diff --git a/clients/php/src/lib/Kafka/Encoder.php b/clients/php/src/lib/Kafka/Encoder.php index bf344193648..3c05cfd8a98 100644 --- a/clients/php/src/lib/Kafka/Encoder.php +++ b/clients/php/src/lib/Kafka/Encoder.php @@ -27,11 +27,12 @@ class Kafka_Encoder * * @var integer */ - const CURRENT_MAGIC_VALUE = 0; + const CURRENT_MAGIC_VALUE = 1; /** * Encode a message. The format of an N byte message is the following: * - 1 byte: "magic" identifier to allow format changes + * - 1 byte: "compression-attributes" for compression alogrithm * - 4 bytes: CRC32 of the payload * - (N - 5) bytes: payload * @@ -39,9 +40,9 @@ class Kafka_Encoder * * @return string */ - static public function encode_message($msg) { - // - return pack('CN', self::CURRENT_MAGIC_VALUE, crc32($msg)) + static public function encode_message($msg, $compression) { + // + return pack('CCN', self::CURRENT_MAGIC_VALUE, $compression, crc32($msg)) . $msg; } @@ -51,14 +52,15 @@ class Kafka_Encoder * @param string $topic Topic * @param integer $partition Partition number * @param array $messages Array of messages to send + * @param compression $compression flag for type of compression * * @return string */ - static public function encode_produce_request($topic, $partition, array $messages) { + static public function encode_produce_request($topic, $partition, array $messages, $compression) { // encode messages as $message_set = ''; foreach ($messages as $message) { - $encoded = self::encode_message($message); + $encoded = self::encode_message($message, $compression); $message_set .= pack('N', strlen($encoded)) . $encoded; } // create the request as diff --git a/clients/php/src/lib/Kafka/Message.php b/clients/php/src/lib/Kafka/Message.php index 555043112f2..d728634af4c 100644 --- a/clients/php/src/lib/Kafka/Message.php +++ b/clients/php/src/lib/Kafka/Message.php @@ -14,6 +14,7 @@ /** * A message. The format of an N byte message is the following: * 1 byte "magic" identifier to allow format changes + * 1 byte compression-attribute * 4 byte CRC32 of the payload * N - 5 byte payload * @@ -25,15 +26,6 @@ */ class Kafka_Message { - /* - private $currentMagicValue = Kafka_Encoder::CURRENT_MAGIC_VALUE; - private $magicOffset = 0; - private $magicLength = 1; - private $crcOffset = 1; // MagicOffset + MagicLength - private $crcLength = 4; - private $payloadOffset = 5; // CrcOffset + CrcLength - private $headerSize = 5; // PayloadOffset - */ /** * @var string @@ -45,6 +37,11 @@ class Kafka_Message */ private $size = 0; + /** + * @var integer + */ + private $compression = 0; + /** * @var string */ @@ -56,10 +53,12 @@ class Kafka_Message * @param string $data Message payload */ public function __construct($data) { - $this->payload = substr($data, 5); + $this->payload = substr($data, 6); + $this->compression = substr($data,1,1); $this->crc = crc32($this->payload); $this->size = strlen($this->payload); } + /** * Encode a message @@ -121,7 +120,7 @@ class Kafka_Message * @return string */ public function __toString() { - return 'message(magic = ' . Kafka_Encoder::CURRENT_MAGIC_VALUE . ', crc = ' . $this->crc . - ', payload = ' . $this->payload . ')'; + return 'message(magic = ' . Kafka_Encoder::CURRENT_MAGIC_VALUE . ', compression = ' . $this->compression . + ', crc = ' . $this->crc . ', payload = ' . $this->payload . ')'; } } diff --git a/clients/php/src/lib/Kafka/Producer.php b/clients/php/src/lib/Kafka/Producer.php index 1b7f056923a..56a7bd8421e 100644 --- a/clients/php/src/lib/Kafka/Producer.php +++ b/clients/php/src/lib/Kafka/Producer.php @@ -42,6 +42,11 @@ class Kafka_Producer */ protected $port; + /** + * @var integer + */ + protected $compression; + /** * Constructor * @@ -52,6 +57,7 @@ class Kafka_Producer $this->request_key = 0; $this->host = $host; $this->port = $port; + $this->compression = 0; } /** @@ -91,7 +97,7 @@ class Kafka_Producer */ public function send(array $messages, $topic, $partition = 0xFFFFFFFF) { $this->connect(); - return fwrite($this->conn, Kafka_Encoder::encode_produce_request($topic, $partition, $messages)); + return fwrite($this->conn, Kafka_Encoder::encode_produce_request($topic, $partition, $messages, $this->compression)); } /**