Php Client support for compression attribute; patched by AaronR; KAFKA-159

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1185774 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jun Rao 2011-10-18 17:57:29 +00:00
parent e39c11abbc
commit c2dc9c0841
3 changed files with 26 additions and 19 deletions

View File

@ -27,11 +27,12 @@ class Kafka_Encoder
*
* @var integer
*/
const CURRENT_MAGIC_VALUE = 0;
const CURRENT_MAGIC_VALUE = 1;
/**
* Encode a message. The format of an N byte message is the following:
* - 1 byte: "magic" identifier to allow format changes
* - 1 byte: "compression-attributes" for compression alogrithm
* - 4 bytes: CRC32 of the payload
* - (N - 5) bytes: payload
*
@ -39,9 +40,9 @@ class Kafka_Encoder
*
* @return string
*/
static public function encode_message($msg) {
// <MAGIC_BYTE: 1 byte> <CRC32: 4 bytes bigendian> <PAYLOAD: N bytes>
return pack('CN', self::CURRENT_MAGIC_VALUE, crc32($msg))
static public function encode_message($msg, $compression) {
// <MAGIC_BYTE: 1 byte> <COMPRESSION: 1 byte> <CRC32: 4 bytes bigendian> <PAYLOAD: N bytes>
return pack('CCN', self::CURRENT_MAGIC_VALUE, $compression, crc32($msg))
. $msg;
}
@ -51,14 +52,15 @@ class Kafka_Encoder
* @param string $topic Topic
* @param integer $partition Partition number
* @param array $messages Array of messages to send
* @param compression $compression flag for type of compression
*
* @return string
*/
static public function encode_produce_request($topic, $partition, array $messages) {
static public function encode_produce_request($topic, $partition, array $messages, $compression) {
// encode messages as <LEN: int><MESSAGE_BYTES>
$message_set = '';
foreach ($messages as $message) {
$encoded = self::encode_message($message);
$encoded = self::encode_message($message, $compression);
$message_set .= pack('N', strlen($encoded)) . $encoded;
}
// create the request as <REQUEST_SIZE: int> <REQUEST_ID: short> <TOPIC: bytes> <PARTITION: int> <BUFFER_SIZE: int> <BUFFER: bytes>

View File

@ -14,6 +14,7 @@
/**
* A message. The format of an N byte message is the following:
* 1 byte "magic" identifier to allow format changes
* 1 byte compression-attribute
* 4 byte CRC32 of the payload
* N - 5 byte payload
*
@ -25,15 +26,6 @@
*/
class Kafka_Message
{
/*
private $currentMagicValue = Kafka_Encoder::CURRENT_MAGIC_VALUE;
private $magicOffset = 0;
private $magicLength = 1;
private $crcOffset = 1; // MagicOffset + MagicLength
private $crcLength = 4;
private $payloadOffset = 5; // CrcOffset + CrcLength
private $headerSize = 5; // PayloadOffset
*/
/**
* @var string
@ -45,6 +37,11 @@ class Kafka_Message
*/
private $size = 0;
/**
* @var integer
*/
private $compression = 0;
/**
* @var string
*/
@ -56,11 +53,13 @@ class Kafka_Message
* @param string $data Message payload
*/
public function __construct($data) {
$this->payload = substr($data, 5);
$this->payload = substr($data, 6);
$this->compression = substr($data,1,1);
$this->crc = crc32($this->payload);
$this->size = strlen($this->payload);
}
/**
* Encode a message
*
@ -121,7 +120,7 @@ class Kafka_Message
* @return string
*/
public function __toString() {
return 'message(magic = ' . Kafka_Encoder::CURRENT_MAGIC_VALUE . ', crc = ' . $this->crc .
', payload = ' . $this->payload . ')';
return 'message(magic = ' . Kafka_Encoder::CURRENT_MAGIC_VALUE . ', compression = ' . $this->compression .
', crc = ' . $this->crc . ', payload = ' . $this->payload . ')';
}
}

View File

@ -42,6 +42,11 @@ class Kafka_Producer
*/
protected $port;
/**
* @var integer
*/
protected $compression;
/**
* Constructor
*
@ -52,6 +57,7 @@ class Kafka_Producer
$this->request_key = 0;
$this->host = $host;
$this->port = $port;
$this->compression = 0;
}
/**
@ -91,7 +97,7 @@ class Kafka_Producer
*/
public function send(array $messages, $topic, $partition = 0xFFFFFFFF) {
$this->connect();
return fwrite($this->conn, Kafka_Encoder::encode_produce_request($topic, $partition, $messages));
return fwrite($this->conn, Kafka_Encoder::encode_produce_request($topic, $partition, $messages, $this->compression));
}
/**