mirror of https://github.com/apache/kafka.git
KAFKA 158 Support for compression in go clients; patched by jeffregydamick; reviewed by nehanarkhede
git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1189773 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d9a8af7782
commit
14a1f0dcd6
|
|
@ -6,6 +6,7 @@ GOFILES=\
|
||||||
src/message.go\
|
src/message.go\
|
||||||
src/converts.go\
|
src/converts.go\
|
||||||
src/consumer.go\
|
src/consumer.go\
|
||||||
|
src/payload_codec.go\
|
||||||
src/publisher.go\
|
src/publisher.go\
|
||||||
src/timing.go\
|
src/timing.go\
|
||||||
src/request.go\
|
src/request.go\
|
||||||
|
|
|
||||||
|
|
@ -1,11 +1,16 @@
|
||||||
# Kafka.go - Publisher & Consumer for Kafka in Go #
|
# Kafka.go - Publisher & Consumer for Kafka in Go #
|
||||||
|
|
||||||
Kafka is a distributed publish-subscribe messaging system: (http://sna-projects.com/kafka/)
|
Kafka is a distributed publish-subscribe messaging system: (http://incubator.apache.org/kafka/)
|
||||||
|
|
||||||
Go language: (http://golang.org/) <br/>
|
Go language: (http://golang.org/) <br/>
|
||||||
|
|
||||||
## Get up and running ##
|
## Get up and running ##
|
||||||
|
|
||||||
|
Install go: <br/>
|
||||||
|
For more info see: http://golang.org/doc/install.html#install
|
||||||
|
|
||||||
|
Make sure to set your GOROOT properly (http://golang.org/doc/install.html#environment).
|
||||||
|
|
||||||
Install kafka.go package: <br/>
|
Install kafka.go package: <br/>
|
||||||
<code>make install</code>
|
<code>make install</code>
|
||||||
<br/>
|
<br/>
|
||||||
|
|
@ -13,7 +18,7 @@ Make the tools (publisher & consumer) <br/>
|
||||||
<code>make tools</code>
|
<code>make tools</code>
|
||||||
<br/>
|
<br/>
|
||||||
Start zookeeper, Kafka server <br/>
|
Start zookeeper, Kafka server <br/>
|
||||||
For more info on Kafka, see: http://sna-projects.com/kafka/quickstart.php
|
For more info on Kafka, see: http://incubator.apache.org/kafka/quickstart.html
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -48,6 +53,17 @@ broker.Publish(kafka.NewMessage([]byte("tesing 1 2 3")))
|
||||||
|
|
||||||
</code></pre>
|
</code></pre>
|
||||||
|
|
||||||
|
|
||||||
|
### Publishing Compressed Messages ###
|
||||||
|
|
||||||
|
<pre><code>
|
||||||
|
|
||||||
|
broker := kafka.NewBrokerPublisher("localhost:9092", "mytesttopic", 0)
|
||||||
|
broker.Publish(kafka.NewCompressedMessage([]byte("tesing 1 2 3")))
|
||||||
|
|
||||||
|
</code></pre>
|
||||||
|
|
||||||
|
|
||||||
### Consumer ###
|
### Consumer ###
|
||||||
|
|
||||||
<pre><code>
|
<pre><code>
|
||||||
|
|
|
||||||
|
|
@ -20,20 +20,19 @@
|
||||||
* of their respective owners.
|
* of their respective owners.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
|
||||||
package kafka
|
package kafka
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
//"fmt"
|
//"fmt"
|
||||||
"bytes"
|
"bytes"
|
||||||
"container/list"
|
"compress/gzip"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestMessageCreation(t *testing.T) {
|
func TestMessageCreation(t *testing.T) {
|
||||||
payload := []byte("testing")
|
payload := []byte("testing")
|
||||||
msg := NewMessage(payload)
|
msg := NewMessage(payload)
|
||||||
if msg.magic != 0 {
|
if msg.magic != 1 {
|
||||||
t.Errorf("magic incorrect")
|
t.Errorf("magic incorrect")
|
||||||
t.Fail()
|
t.Fail()
|
||||||
}
|
}
|
||||||
|
|
@ -45,31 +44,183 @@ func TestMessageCreation(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMagic0MessageEncoding(t *testing.T) {
|
||||||
|
// generated by kafka-rb:
|
||||||
|
// test the old message format
|
||||||
|
expected := []byte{0x00, 0x00, 0x00, 0x0c, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67}
|
||||||
|
length, msgsDecoded := Decode(expected, DefaultCodecsMap)
|
||||||
|
|
||||||
|
if length == 0 || msgsDecoded == nil {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
msgDecoded := msgsDecoded[0]
|
||||||
|
|
||||||
|
payload := []byte("testing")
|
||||||
|
if !bytes.Equal(payload, msgDecoded.payload) {
|
||||||
|
t.Fatal("bytes not equal")
|
||||||
|
}
|
||||||
|
chksum := []byte{0xE8, 0xF3, 0x5A, 0x06}
|
||||||
|
if !bytes.Equal(chksum, msgDecoded.checksum[:]) {
|
||||||
|
t.Fatal("checksums do not match")
|
||||||
|
}
|
||||||
|
if msgDecoded.magic != 0 {
|
||||||
|
t.Fatal("magic incorrect")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestMessageEncoding(t *testing.T) {
|
func TestMessageEncoding(t *testing.T) {
|
||||||
|
|
||||||
payload := []byte("testing")
|
payload := []byte("testing")
|
||||||
msg := NewMessage(payload)
|
msg := NewMessage(payload)
|
||||||
|
|
||||||
// generated by kafka-rb:
|
// generated by kafka-rb:
|
||||||
expected := []byte{0x00, 0x00, 0x00, 0x0c, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67}
|
expected := []byte{0x00, 0x00, 0x00, 0x0d, 0x01, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67}
|
||||||
if !bytes.Equal(expected, msg.Encode()) {
|
if !bytes.Equal(expected, msg.Encode()) {
|
||||||
t.Fail()
|
t.Fatalf("expected: % X\n but got: % X", expected, msg.Encode())
|
||||||
}
|
}
|
||||||
|
|
||||||
// verify round trip
|
// verify round trip
|
||||||
msgDecoded := Decode(msg.Encode())
|
length, msgsDecoded := DecodeWithDefaultCodecs(msg.Encode())
|
||||||
if !bytes.Equal(msgDecoded.payload, payload) {
|
|
||||||
t.Fail()
|
if length == 0 || msgsDecoded == nil {
|
||||||
|
t.Fatal("message is nil")
|
||||||
}
|
}
|
||||||
|
msgDecoded := msgsDecoded[0]
|
||||||
|
|
||||||
if !bytes.Equal(msgDecoded.payload, payload) {
|
if !bytes.Equal(msgDecoded.payload, payload) {
|
||||||
t.Fail()
|
t.Fatal("bytes not equal")
|
||||||
}
|
}
|
||||||
chksum := []byte{0xE8, 0xF3, 0x5A, 0x06}
|
chksum := []byte{0xE8, 0xF3, 0x5A, 0x06}
|
||||||
if !bytes.Equal(msgDecoded.checksum[:], chksum) {
|
if !bytes.Equal(chksum, msgDecoded.checksum[:]) {
|
||||||
t.Fail()
|
t.Fatal("checksums do not match")
|
||||||
}
|
}
|
||||||
if msgDecoded.magic != 0 {
|
if msgDecoded.magic != 1 {
|
||||||
t.Fail()
|
t.Fatal("magic incorrect")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCompressedMessageEncodingCompare(t *testing.T) {
|
||||||
|
payload := []byte("testing")
|
||||||
|
uncompressedMsgBytes := NewMessage(payload).Encode()
|
||||||
|
|
||||||
|
msgGzipBytes := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID]).Encode()
|
||||||
|
msgDefaultBytes := NewCompressedMessage(payload).Encode()
|
||||||
|
if !bytes.Equal(msgDefaultBytes, msgGzipBytes) {
|
||||||
|
t.Fatalf("uncompressed: % X \npayload: % X bytes not equal", msgDefaultBytes, msgGzipBytes)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCompressedMessageEncoding(t *testing.T) {
|
||||||
|
payload := []byte("testing")
|
||||||
|
uncompressedMsgBytes := NewMessage(payload).Encode()
|
||||||
|
|
||||||
|
msg := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID])
|
||||||
|
|
||||||
|
expectedPayload := []byte{0x1F, 0x8B, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04,
|
||||||
|
0xFF, 0x62, 0x60, 0x60, 0xE0, 0x65, 0x64, 0x78, 0xF1, 0x39, 0x8A,
|
||||||
|
0xAD, 0x24, 0xB5, 0xB8, 0x24, 0x33, 0x2F, 0x1D, 0x10, 0x00, 0x00,
|
||||||
|
0xFF, 0xFF, 0x0C, 0x6A, 0x82, 0x91, 0x11, 0x00, 0x00, 0x00}
|
||||||
|
|
||||||
|
expectedHeader := []byte{0x00, 0x00, 0x00, 0x2F, 0x01, 0x01, 0x07, 0xFD, 0xC3, 0x76}
|
||||||
|
|
||||||
|
expected := make([]byte, len(expectedHeader)+len(expectedPayload))
|
||||||
|
n := copy(expected, expectedHeader)
|
||||||
|
copy(expected[n:], expectedPayload)
|
||||||
|
|
||||||
|
if msg.compression != 1 {
|
||||||
|
t.Fatalf("expected compression: 1 but got: %b", msg.compression)
|
||||||
|
}
|
||||||
|
|
||||||
|
zipper, _ := gzip.NewReader(bytes.NewBuffer(msg.payload))
|
||||||
|
uncompressed := make([]byte, 100)
|
||||||
|
n, _ = zipper.Read(uncompressed)
|
||||||
|
uncompressed = uncompressed[:n]
|
||||||
|
zipper.Close()
|
||||||
|
|
||||||
|
if !bytes.Equal(uncompressed, uncompressedMsgBytes) {
|
||||||
|
t.Fatalf("uncompressed: % X \npayload: % X bytes not equal", uncompressed, uncompressedMsgBytes)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !bytes.Equal(expected, msg.Encode()) {
|
||||||
|
t.Fatalf("expected: % X\n but got: % X", expected, msg.Encode())
|
||||||
|
}
|
||||||
|
|
||||||
|
// verify round trip
|
||||||
|
length, msgsDecoded := Decode(msg.Encode(), DefaultCodecsMap)
|
||||||
|
|
||||||
|
if length == 0 || msgsDecoded == nil {
|
||||||
|
t.Fatal("message is nil")
|
||||||
|
}
|
||||||
|
msgDecoded := msgsDecoded[0]
|
||||||
|
|
||||||
|
if !bytes.Equal(msgDecoded.payload, payload) {
|
||||||
|
t.Fatal("bytes not equal")
|
||||||
|
}
|
||||||
|
chksum := []byte{0xE8, 0xF3, 0x5A, 0x06}
|
||||||
|
if !bytes.Equal(chksum, msgDecoded.checksum[:]) {
|
||||||
|
t.Fatalf("checksums do not match, expected: % X but was: % X",
|
||||||
|
chksum, msgDecoded.checksum[:])
|
||||||
|
}
|
||||||
|
if msgDecoded.magic != 1 {
|
||||||
|
t.Fatal("magic incorrect")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLongCompressedMessageRoundTrip(t *testing.T) {
|
||||||
|
payloadBuf := bytes.NewBuffer([]byte{})
|
||||||
|
// make the test bigger than buffer allocated in the Decode
|
||||||
|
for i := 0; i < 15; i++ {
|
||||||
|
payloadBuf.Write([]byte("testing123 "))
|
||||||
|
}
|
||||||
|
|
||||||
|
uncompressedMsgBytes := NewMessage(payloadBuf.Bytes()).Encode()
|
||||||
|
msg := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID])
|
||||||
|
|
||||||
|
zipper, _ := gzip.NewReader(bytes.NewBuffer(msg.payload))
|
||||||
|
uncompressed := make([]byte, 200)
|
||||||
|
n, _ := zipper.Read(uncompressed)
|
||||||
|
uncompressed = uncompressed[:n]
|
||||||
|
zipper.Close()
|
||||||
|
|
||||||
|
if !bytes.Equal(uncompressed, uncompressedMsgBytes) {
|
||||||
|
t.Fatalf("uncompressed: % X \npayload: % X bytes not equal",
|
||||||
|
uncompressed, uncompressedMsgBytes)
|
||||||
|
}
|
||||||
|
|
||||||
|
// verify round trip
|
||||||
|
length, msgsDecoded := Decode(msg.Encode(), DefaultCodecsMap)
|
||||||
|
|
||||||
|
if length == 0 || msgsDecoded == nil {
|
||||||
|
t.Fatal("message is nil")
|
||||||
|
}
|
||||||
|
msgDecoded := msgsDecoded[0]
|
||||||
|
|
||||||
|
if !bytes.Equal(msgDecoded.payload, payloadBuf.Bytes()) {
|
||||||
|
t.Fatal("bytes not equal")
|
||||||
|
}
|
||||||
|
if msgDecoded.magic != 1 {
|
||||||
|
t.Fatal("magic incorrect")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMultipleCompressedMessages(t *testing.T) {
|
||||||
|
msgs := []*Message{NewMessage([]byte("testing")),
|
||||||
|
NewMessage([]byte("multiple")),
|
||||||
|
NewMessage([]byte("messages")),
|
||||||
|
}
|
||||||
|
msg := NewCompressedMessages(msgs...)
|
||||||
|
|
||||||
|
length, msgsDecoded := DecodeWithDefaultCodecs(msg.Encode())
|
||||||
|
if length == 0 || msgsDecoded == nil {
|
||||||
|
t.Fatal("msgsDecoded is nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
// make sure the decompressed messages match what was put in
|
||||||
|
for index, decodedMsg := range msgsDecoded {
|
||||||
|
if !bytes.Equal(msgs[index].payload, decodedMsg.payload) {
|
||||||
|
t.Fatalf("Payload doesn't match, expected: % X but was: % X\n",
|
||||||
|
msgs[index].payload, decodedMsg.payload)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -88,24 +239,22 @@ func TestRequestHeaderEncoding(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
func TestPublishRequestEncoding(t *testing.T) {
|
func TestPublishRequestEncoding(t *testing.T) {
|
||||||
payload := []byte("testing")
|
payload := []byte("testing")
|
||||||
msg := NewMessage(payload)
|
msg := NewMessage(payload)
|
||||||
|
|
||||||
messages := list.New()
|
|
||||||
messages.PushBack(msg)
|
|
||||||
pubBroker := NewBrokerPublisher("localhost:9092", "test", 0)
|
pubBroker := NewBrokerPublisher("localhost:9092", "test", 0)
|
||||||
request := pubBroker.broker.EncodePublishRequest(messages)
|
request := pubBroker.broker.EncodePublishRequest(msg)
|
||||||
|
|
||||||
// generated by kafka-rb:
|
// generated by kafka-rb:
|
||||||
expected := []byte{0x00, 0x00, 0x00, 0x20, 0x00, 0x00, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74,
|
expected := []byte{0x00, 0x00, 0x00, 0x21, 0x00, 0x00, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74,
|
||||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x0c,
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x11, 0x00, 0x00, 0x00, 0x0d,
|
||||||
0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67}
|
/* magic comp ...... chksum .... .. payload .. */
|
||||||
|
0x01, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67}
|
||||||
|
|
||||||
if !bytes.Equal(expected, request) {
|
if !bytes.Equal(expected, request) {
|
||||||
t.Errorf("expected length: %d but got: %d", len(expected), len(request))
|
t.Errorf("expected length: %d but got: %d", len(expected), len(request))
|
||||||
t.Errorf("expected: %X\n but got: %X", expected, request)
|
t.Errorf("expected: % X\n but got: % X", expected, request)
|
||||||
t.Fail()
|
t.Fail()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -122,7 +271,7 @@ func TestConsumeRequestEncoding(t *testing.T) {
|
||||||
|
|
||||||
if !bytes.Equal(expected, request) {
|
if !bytes.Equal(expected, request) {
|
||||||
t.Errorf("expected length: %d but got: %d", len(expected), len(request))
|
t.Errorf("expected length: %d but got: %d", len(expected), len(request))
|
||||||
t.Errorf("expected: %X\n but got: %X", expected, request)
|
t.Errorf("expected: % X\n but got: % X", expected, request)
|
||||||
t.Fail()
|
t.Fail()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -34,6 +34,7 @@ type BrokerConsumer struct {
|
||||||
broker *Broker
|
broker *Broker
|
||||||
offset uint64
|
offset uint64
|
||||||
maxSize uint32
|
maxSize uint32
|
||||||
|
codecs map[byte]PayloadCodec
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a new broker consumer
|
// Create a new broker consumer
|
||||||
|
|
@ -45,7 +46,8 @@ type BrokerConsumer struct {
|
||||||
func NewBrokerConsumer(hostname string, topic string, partition int, offset uint64, maxSize uint32) *BrokerConsumer {
|
func NewBrokerConsumer(hostname string, topic string, partition int, offset uint64, maxSize uint32) *BrokerConsumer {
|
||||||
return &BrokerConsumer{broker: newBroker(hostname, topic, partition),
|
return &BrokerConsumer{broker: newBroker(hostname, topic, partition),
|
||||||
offset: offset,
|
offset: offset,
|
||||||
maxSize: maxSize}
|
maxSize: maxSize,
|
||||||
|
codecs: DefaultCodecsMap}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Simplified consumer that defaults the offset and maxSize to 0.
|
// Simplified consumer that defaults the offset and maxSize to 0.
|
||||||
|
|
@ -55,9 +57,18 @@ func NewBrokerConsumer(hostname string, topic string, partition int, offset uint
|
||||||
func NewBrokerOffsetConsumer(hostname string, topic string, partition int) *BrokerConsumer {
|
func NewBrokerOffsetConsumer(hostname string, topic string, partition int) *BrokerConsumer {
|
||||||
return &BrokerConsumer{broker: newBroker(hostname, topic, partition),
|
return &BrokerConsumer{broker: newBroker(hostname, topic, partition),
|
||||||
offset: 0,
|
offset: 0,
|
||||||
maxSize: 0}
|
maxSize: 0,
|
||||||
|
codecs: DefaultCodecsMap}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Add Custom Payload Codecs for Consumer Decoding
|
||||||
|
// payloadCodecs - an array of PayloadCodec implementations
|
||||||
|
func (consumer *BrokerConsumer) AddCodecs(payloadCodecs []PayloadCodec) {
|
||||||
|
// merge to the default map, so one 'could' override the default codecs..
|
||||||
|
for k, v := range codecsMap(payloadCodecs) {
|
||||||
|
consumer.codecs[k] = v, true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTimeoutMs int64, quit chan bool) (int, os.Error) {
|
func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTimeoutMs int64, quit chan bool) (int, os.Error) {
|
||||||
conn, err := consumer.broker.connect()
|
conn, err := consumer.broker.connect()
|
||||||
|
|
@ -77,14 +88,15 @@ func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTime
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != os.EOF {
|
if err != os.EOF {
|
||||||
log.Println("Fatal Error: ", err)
|
log.Println("Fatal Error: ", err)
|
||||||
|
panic(err)
|
||||||
}
|
}
|
||||||
|
quit <- true // force quit
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
time.Sleep(pollTimeoutMs * 1000000)
|
time.Sleep(pollTimeoutMs * 1000000)
|
||||||
}
|
}
|
||||||
done <- true
|
done <- true
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// wait to be told to stop..
|
// wait to be told to stop..
|
||||||
<-quit
|
<-quit
|
||||||
conn.Close()
|
conn.Close()
|
||||||
|
|
@ -111,7 +123,6 @@ func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc) (int, os
|
||||||
return num, err
|
return num, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc MessageHandlerFunc) (int, os.Error) {
|
func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc MessageHandlerFunc) (int, os.Error) {
|
||||||
_, err := conn.Write(consumer.broker.EncodeConsumeRequest(consumer.offset, consumer.maxSize))
|
_, err := conn.Write(consumer.broker.EncodeConsumeRequest(consumer.offset, consumer.maxSize))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -129,14 +140,19 @@ func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc M
|
||||||
// parse out the messages
|
// parse out the messages
|
||||||
var currentOffset uint64 = 0
|
var currentOffset uint64 = 0
|
||||||
for currentOffset <= uint64(length-4) {
|
for currentOffset <= uint64(length-4) {
|
||||||
msg := Decode(payload[currentOffset:])
|
totalLength, msgs := Decode(payload[currentOffset:], consumer.codecs)
|
||||||
if msg == nil {
|
if msgs == nil {
|
||||||
return num, os.NewError("Error Decoding Message")
|
return num, os.NewError("Error Decoding Message")
|
||||||
}
|
}
|
||||||
msg.offset = consumer.offset + currentOffset
|
msgOffset := consumer.offset + currentOffset
|
||||||
currentOffset += uint64(4 + msg.totalLength)
|
for _, msg := range msgs {
|
||||||
handlerFunc(msg)
|
// update all of the messages offset
|
||||||
num += 1
|
// multiple messages can be at the same offset (compressed for example)
|
||||||
|
msg.offset = msgOffset
|
||||||
|
handlerFunc(&msg)
|
||||||
|
num += 1
|
||||||
|
}
|
||||||
|
currentOffset += uint64(4 + totalLength)
|
||||||
}
|
}
|
||||||
// update the broker's offset for next consumption
|
// update the broker's offset for next consumption
|
||||||
consumer.offset += currentOffset
|
consumer.offset += currentOffset
|
||||||
|
|
@ -145,7 +161,6 @@ func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc M
|
||||||
return num, err
|
return num, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// Get a list of valid offsets (up to maxNumOffsets) before the given time, where
|
// Get a list of valid offsets (up to maxNumOffsets) before the given time, where
|
||||||
// time is in milliseconds (-1, from the latest offset available, -2 from the smallest offset available)
|
// time is in milliseconds (-1, from the latest offset available, -2 from the smallest offset available)
|
||||||
// The result is a list of offsets, in descending order.
|
// The result is a list of offsets, in descending order.
|
||||||
|
|
|
||||||
|
|
@ -22,12 +22,10 @@
|
||||||
|
|
||||||
package kafka
|
package kafka
|
||||||
|
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
func uint16bytes(value int) []byte {
|
func uint16bytes(value int) []byte {
|
||||||
result := make([]byte, 2)
|
result := make([]byte, 2)
|
||||||
binary.BigEndian.PutUint16(result, uint16(value))
|
binary.BigEndian.PutUint16(result, uint16(value))
|
||||||
|
|
|
||||||
|
|
@ -28,17 +28,14 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"fmt"
|
"fmt"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"strconv"
|
|
||||||
"io"
|
"io"
|
||||||
"bufio"
|
"bufio"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
MAGIC_DEFAULT = 0
|
NETWORK = "tcp"
|
||||||
NETWORK = "tcp"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
type Broker struct {
|
type Broker struct {
|
||||||
topic string
|
topic string
|
||||||
partition int
|
partition int
|
||||||
|
|
@ -51,7 +48,6 @@ func newBroker(hostname string, topic string, partition int) *Broker {
|
||||||
hostname: hostname}
|
hostname: hostname}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
func (b *Broker) connect() (conn *net.TCPConn, error os.Error) {
|
func (b *Broker) connect() (conn *net.TCPConn, error os.Error) {
|
||||||
raddr, err := net.ResolveTCPAddr(NETWORK, b.hostname)
|
raddr, err := net.ResolveTCPAddr(NETWORK, b.hostname)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -91,7 +87,9 @@ func (b *Broker) readResponse(conn *net.TCPConn) (uint32, []byte, os.Error) {
|
||||||
|
|
||||||
errorCode := binary.BigEndian.Uint16(messages[0:2])
|
errorCode := binary.BigEndian.Uint16(messages[0:2])
|
||||||
if errorCode != 0 {
|
if errorCode != 0 {
|
||||||
return 0, []byte{}, os.NewError(strconv.Uitoa(uint(errorCode)))
|
log.Println("errorCode: ", errorCode)
|
||||||
|
return 0, []byte{}, os.NewError(
|
||||||
|
fmt.Sprintf("Broker Response Error: %d", errorCode))
|
||||||
}
|
}
|
||||||
return expectedLength, messages[2:], nil
|
return expectedLength, messages[2:], nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,6 @@
|
||||||
* of their respective owners.
|
* of their respective owners.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
|
||||||
package kafka
|
package kafka
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
|
@ -30,13 +29,21 @@ import (
|
||||||
"log"
|
"log"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// Compression Support uses '1' - https://cwiki.apache.org/confluence/display/KAFKA/Compression
|
||||||
|
MAGIC_DEFAULT = 1
|
||||||
|
// magic + compression + chksum
|
||||||
|
NO_LEN_HEADER_SIZE = 1 + 1 + 4
|
||||||
|
)
|
||||||
|
|
||||||
type Message struct {
|
type Message struct {
|
||||||
magic byte
|
magic byte
|
||||||
|
compression byte
|
||||||
checksum [4]byte
|
checksum [4]byte
|
||||||
payload []byte
|
payload []byte
|
||||||
offset uint64 // only used after decoding
|
offset uint64 // only used after decoding
|
||||||
totalLength uint32 // total length of the message (decoding)
|
totalLength uint32 // total length of the raw message (from decoding)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Message) Offset() uint64 {
|
func (m *Message) Offset() uint64 {
|
||||||
|
|
@ -51,57 +58,125 @@ func (m *Message) PayloadString() string {
|
||||||
return string(m.payload)
|
return string(m.payload)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMessage(payload []byte) *Message {
|
func NewMessageWithCodec(payload []byte, codec PayloadCodec) *Message {
|
||||||
message := &Message{}
|
message := &Message{}
|
||||||
message.magic = byte(MAGIC_DEFAULT)
|
message.magic = byte(MAGIC_DEFAULT)
|
||||||
binary.BigEndian.PutUint32(message.checksum[0:], crc32.ChecksumIEEE(payload))
|
message.compression = codec.Id()
|
||||||
message.payload = payload
|
message.payload = codec.Encode(payload)
|
||||||
|
binary.BigEndian.PutUint32(message.checksum[0:], crc32.ChecksumIEEE(message.payload))
|
||||||
return message
|
return message
|
||||||
}
|
}
|
||||||
|
|
||||||
// MESSAGE SET: <MESSAGE LENGTH: uint32><MAGIC: 1 byte><CHECKSUM: uint32><MESSAGE PAYLOAD: bytes>
|
// Default is is create a message with no compression
|
||||||
|
func NewMessage(payload []byte) *Message {
|
||||||
|
return NewMessageWithCodec(payload, DefaultCodecsMap[NO_COMPRESSION_ID])
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a Message using the default compression method (gzip)
|
||||||
|
func NewCompressedMessage(payload []byte) *Message {
|
||||||
|
return NewCompressedMessages(NewMessage(payload))
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewCompressedMessages(messages ...*Message) *Message {
|
||||||
|
buf := bytes.NewBuffer([]byte{})
|
||||||
|
for _, message := range messages {
|
||||||
|
buf.Write(message.Encode())
|
||||||
|
}
|
||||||
|
return NewMessageWithCodec(buf.Bytes(), DefaultCodecsMap[GZIP_COMPRESSION_ID])
|
||||||
|
}
|
||||||
|
|
||||||
|
// MESSAGE SET: <MESSAGE LENGTH: uint32><MAGIC: 1 byte><COMPRESSION: 1 byte><CHECKSUM: uint32><MESSAGE PAYLOAD: bytes>
|
||||||
func (m *Message) Encode() []byte {
|
func (m *Message) Encode() []byte {
|
||||||
msgLen := 1 + 4 + len(m.payload)
|
msgLen := NO_LEN_HEADER_SIZE + len(m.payload)
|
||||||
msg := make([]byte, 4+msgLen)
|
msg := make([]byte, 4+msgLen)
|
||||||
binary.BigEndian.PutUint32(msg[0:], uint32(msgLen))
|
binary.BigEndian.PutUint32(msg[0:], uint32(msgLen))
|
||||||
msg[4] = m.magic
|
msg[4] = m.magic
|
||||||
copy(msg[5:], m.checksum[0:])
|
msg[5] = m.compression
|
||||||
copy(msg[9:], m.payload)
|
|
||||||
|
copy(msg[6:], m.checksum[0:])
|
||||||
|
copy(msg[10:], m.payload)
|
||||||
|
|
||||||
return msg
|
return msg
|
||||||
}
|
}
|
||||||
|
|
||||||
func Decode(packet []byte) *Message {
|
func DecodeWithDefaultCodecs(packet []byte) (uint32, []Message) {
|
||||||
|
return Decode(packet, DefaultCodecsMap)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Decode(packet []byte, payloadCodecsMap map[byte]PayloadCodec) (uint32, []Message) {
|
||||||
|
messages := []Message{}
|
||||||
|
|
||||||
|
length, message := decodeMessage(packet, payloadCodecsMap)
|
||||||
|
|
||||||
|
if length > 0 && message != nil {
|
||||||
|
if message.compression != NO_COMPRESSION_ID {
|
||||||
|
// wonky special case for compressed messages having embedded messages
|
||||||
|
payloadLen := uint32(len(message.payload))
|
||||||
|
messageLenLeft := payloadLen
|
||||||
|
for messageLenLeft > 0 {
|
||||||
|
start := payloadLen - messageLenLeft
|
||||||
|
innerLen, innerMsg := decodeMessage(message.payload[start:], payloadCodecsMap)
|
||||||
|
messageLenLeft = messageLenLeft - innerLen - 4 // message length uint32
|
||||||
|
messages = append(messages, *innerMsg)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
messages = append(messages, *message)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return length, messages
|
||||||
|
}
|
||||||
|
|
||||||
|
func decodeMessage(packet []byte, payloadCodecsMap map[byte]PayloadCodec) (uint32, *Message) {
|
||||||
length := binary.BigEndian.Uint32(packet[0:])
|
length := binary.BigEndian.Uint32(packet[0:])
|
||||||
if length > uint32(len(packet[4:])) {
|
if length > uint32(len(packet[4:])) {
|
||||||
log.Printf("length mismatch, expected at least: %X, was: %X\n", length, len(packet[4:]))
|
log.Printf("length mismatch, expected at least: %X, was: %X\n", length, len(packet[4:]))
|
||||||
return nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
msg := Message{}
|
msg := Message{}
|
||||||
msg.totalLength = length
|
msg.totalLength = length
|
||||||
msg.magic = packet[4]
|
msg.magic = packet[4]
|
||||||
copy(msg.checksum[:], packet[5:9])
|
|
||||||
payloadLength := length - 1 - 4
|
rawPayload := []byte{}
|
||||||
msg.payload = packet[9 : 9+payloadLength]
|
if msg.magic == 0 {
|
||||||
|
msg.compression = byte(0)
|
||||||
|
copy(msg.checksum[:], packet[5:9])
|
||||||
|
payloadLength := length - 1 - 4
|
||||||
|
rawPayload = packet[9 : 9+payloadLength]
|
||||||
|
} else if msg.magic == MAGIC_DEFAULT {
|
||||||
|
msg.compression = packet[5]
|
||||||
|
copy(msg.checksum[:], packet[6:10])
|
||||||
|
payloadLength := length - NO_LEN_HEADER_SIZE
|
||||||
|
rawPayload = packet[10 : 10+payloadLength]
|
||||||
|
} else {
|
||||||
|
log.Printf("incorrect magic, expected: %X was: %X\n", MAGIC_DEFAULT, msg.magic)
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
payloadChecksum := make([]byte, 4)
|
payloadChecksum := make([]byte, 4)
|
||||||
binary.BigEndian.PutUint32(payloadChecksum, crc32.ChecksumIEEE(msg.payload))
|
binary.BigEndian.PutUint32(payloadChecksum, crc32.ChecksumIEEE(rawPayload))
|
||||||
if !bytes.Equal(payloadChecksum, msg.checksum[:]) {
|
if !bytes.Equal(payloadChecksum, msg.checksum[:]) {
|
||||||
log.Printf("checksum mismatch, expected: %X was: %X\n", payloadChecksum, msg.checksum[:])
|
msg.Print()
|
||||||
return nil
|
log.Printf("checksum mismatch, expected: % X was: % X\n", payloadChecksum, msg.checksum[:])
|
||||||
|
return 0, nil
|
||||||
}
|
}
|
||||||
return &msg
|
msg.payload = payloadCodecsMap[msg.compression].Decode(rawPayload)
|
||||||
|
|
||||||
|
return length, &msg
|
||||||
}
|
}
|
||||||
|
|
||||||
func (msg *Message) Print() {
|
func (msg *Message) Print() {
|
||||||
log.Println("----- Begin Message ------")
|
log.Println("----- Begin Message ------")
|
||||||
log.Printf("magic: %X\n", msg.magic)
|
log.Printf("magic: %X\n", msg.magic)
|
||||||
|
log.Printf("compression: %X\n", msg.compression)
|
||||||
log.Printf("checksum: %X\n", msg.checksum)
|
log.Printf("checksum: %X\n", msg.checksum)
|
||||||
if len(msg.payload) < 1048576 { // 1 MB
|
if len(msg.payload) < 1048576 { // 1 MB
|
||||||
log.Printf("payload: %X\n", msg.payload)
|
log.Printf("payload: % X\n", msg.payload)
|
||||||
log.Printf("payload(string): %s\n", msg.PayloadString())
|
log.Printf("payload(string): %s\n", msg.PayloadString())
|
||||||
} else {
|
} else {
|
||||||
log.Printf("long payload, length: %d\n", len(msg.payload))
|
log.Printf("long payload, length: %d\n", len(msg.payload))
|
||||||
}
|
}
|
||||||
|
log.Printf("length: %d\n", msg.totalLength)
|
||||||
log.Printf("offset: %d\n", msg.offset)
|
log.Printf("offset: %d\n", msg.offset)
|
||||||
log.Println("----- End Message ------")
|
log.Println("----- End Message ------")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,116 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2011 NeuStar, Inc.
|
||||||
|
* All rights reserved.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*
|
||||||
|
* NeuStar, the Neustar logo and related names and logos are registered
|
||||||
|
* trademarks, service marks or tradenames of NeuStar, Inc. All other
|
||||||
|
* product names, company names, marks, logos and symbols may be trademarks
|
||||||
|
* of their respective owners.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package kafka
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"compress/gzip"
|
||||||
|
// "log"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
NO_COMPRESSION_ID = 0
|
||||||
|
GZIP_COMPRESSION_ID = 1
|
||||||
|
)
|
||||||
|
|
||||||
|
type PayloadCodec interface {
|
||||||
|
|
||||||
|
// the 1 byte id of the codec
|
||||||
|
Id() byte
|
||||||
|
|
||||||
|
// encoder interface for compression implementation
|
||||||
|
Encode(data []byte) []byte
|
||||||
|
|
||||||
|
// decoder interface for decompression implementation
|
||||||
|
Decode(data []byte) []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
// Default Codecs
|
||||||
|
|
||||||
|
var DefaultCodecs = []PayloadCodec{
|
||||||
|
new(NoCompressionPayloadCodec),
|
||||||
|
new(GzipPayloadCodec),
|
||||||
|
}
|
||||||
|
|
||||||
|
var DefaultCodecsMap = codecsMap(DefaultCodecs)
|
||||||
|
|
||||||
|
func codecsMap(payloadCodecs []PayloadCodec) map[byte]PayloadCodec {
|
||||||
|
payloadCodecsMap := make(map[byte]PayloadCodec, len(payloadCodecs))
|
||||||
|
for _, c := range payloadCodecs {
|
||||||
|
payloadCodecsMap[c.Id()] = c, true
|
||||||
|
}
|
||||||
|
return payloadCodecsMap
|
||||||
|
}
|
||||||
|
|
||||||
|
// No compression codec, noop
|
||||||
|
|
||||||
|
type NoCompressionPayloadCodec struct {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (codec *NoCompressionPayloadCodec) Id() byte {
|
||||||
|
return NO_COMPRESSION_ID
|
||||||
|
}
|
||||||
|
|
||||||
|
func (codec *NoCompressionPayloadCodec) Encode(data []byte) []byte {
|
||||||
|
return data
|
||||||
|
}
|
||||||
|
|
||||||
|
func (codec *NoCompressionPayloadCodec) Decode(data []byte) []byte {
|
||||||
|
return data
|
||||||
|
}
|
||||||
|
|
||||||
|
// Gzip Codec
|
||||||
|
|
||||||
|
type GzipPayloadCodec struct {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (codec *GzipPayloadCodec) Id() byte {
|
||||||
|
return GZIP_COMPRESSION_ID
|
||||||
|
}
|
||||||
|
|
||||||
|
func (codec *GzipPayloadCodec) Encode(data []byte) []byte {
|
||||||
|
buf := bytes.NewBuffer([]byte{})
|
||||||
|
zipper, _ := gzip.NewWriterLevel(buf, gzip.BestSpeed)
|
||||||
|
zipper.Write(data)
|
||||||
|
zipper.Close()
|
||||||
|
return buf.Bytes()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (codec *GzipPayloadCodec) Decode(data []byte) []byte {
|
||||||
|
buf := bytes.NewBuffer([]byte{})
|
||||||
|
zipper, _ := gzip.NewReader(bytes.NewBuffer(data))
|
||||||
|
unzipped := make([]byte, 100)
|
||||||
|
for {
|
||||||
|
n, err := zipper.Read(unzipped)
|
||||||
|
if n > 0 && err == nil {
|
||||||
|
buf.Write(unzipped[0:n])
|
||||||
|
} else {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
zipper.Close()
|
||||||
|
return buf.Bytes()
|
||||||
|
}
|
||||||
|
|
@ -23,11 +23,9 @@
|
||||||
package kafka
|
package kafka
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"container/list"
|
|
||||||
"os"
|
"os"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
type BrokerPublisher struct {
|
type BrokerPublisher struct {
|
||||||
broker *Broker
|
broker *Broker
|
||||||
}
|
}
|
||||||
|
|
@ -36,21 +34,19 @@ func NewBrokerPublisher(hostname string, topic string, partition int) *BrokerPub
|
||||||
return &BrokerPublisher{broker: newBroker(hostname, topic, partition)}
|
return &BrokerPublisher{broker: newBroker(hostname, topic, partition)}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
func (b *BrokerPublisher) Publish(message *Message) (int, os.Error) {
|
func (b *BrokerPublisher) Publish(message *Message) (int, os.Error) {
|
||||||
messages := list.New()
|
return b.BatchPublish(message)
|
||||||
messages.PushBack(message)
|
|
||||||
return b.BatchPublish(messages)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *BrokerPublisher) BatchPublish(messages *list.List) (int, os.Error) {
|
func (b *BrokerPublisher) BatchPublish(messages ...*Message) (int, os.Error) {
|
||||||
conn, err := b.broker.connect()
|
conn, err := b.broker.connect()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return -1, err
|
return -1, err
|
||||||
}
|
}
|
||||||
defer conn.Close()
|
defer conn.Close()
|
||||||
// TODO: MULTIPRODUCE
|
// TODO: MULTIPRODUCE
|
||||||
num, err := conn.Write(b.broker.EncodePublishRequest(messages))
|
request := b.broker.EncodePublishRequest(messages...)
|
||||||
|
num, err := conn.Write(request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return -1, err
|
return -1, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -25,22 +25,19 @@ package kafka
|
||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"bytes"
|
"bytes"
|
||||||
"container/list"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
type RequestType uint16
|
type RequestType uint16
|
||||||
|
|
||||||
// Request Types
|
// Request Types
|
||||||
const (
|
const (
|
||||||
REQUEST_PRODUCE RequestType = 0
|
REQUEST_PRODUCE RequestType = 0
|
||||||
REQUEST_FETCH = 1
|
REQUEST_FETCH = 1
|
||||||
REQUEST_MULTIFETCH = 2
|
REQUEST_MULTIFETCH = 2
|
||||||
REQUEST_MULTIPRODUCE = 3
|
REQUEST_MULTIPRODUCE = 3
|
||||||
REQUEST_OFFSETS = 4
|
REQUEST_OFFSETS = 4
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
// Request Header: <REQUEST_SIZE: uint32><REQUEST_TYPE: uint16><TOPIC SIZE: uint16><TOPIC: bytes><PARTITION: uint32>
|
// Request Header: <REQUEST_SIZE: uint32><REQUEST_TYPE: uint16><TOPIC SIZE: uint16><TOPIC: bytes><PARTITION: uint32>
|
||||||
func (b *Broker) EncodeRequestHeader(requestType RequestType) *bytes.Buffer {
|
func (b *Broker) EncodeRequestHeader(requestType RequestType) *bytes.Buffer {
|
||||||
request := bytes.NewBuffer([]byte{})
|
request := bytes.NewBuffer([]byte{})
|
||||||
|
|
@ -70,7 +67,6 @@ func (b *Broker) EncodeOffsetRequest(time int64, maxNumOffsets uint32) []byte {
|
||||||
return request.Bytes()
|
return request.Bytes()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// <Request Header><OFFSET: uint64><MAX SIZE: uint32>
|
// <Request Header><OFFSET: uint64><MAX SIZE: uint32>
|
||||||
func (b *Broker) EncodeConsumeRequest(offset uint64, maxSize uint32) []byte {
|
func (b *Broker) EncodeConsumeRequest(offset uint64, maxSize uint32) []byte {
|
||||||
request := b.EncodeRequestHeader(REQUEST_FETCH)
|
request := b.EncodeRequestHeader(REQUEST_FETCH)
|
||||||
|
|
@ -83,9 +79,8 @@ func (b *Broker) EncodeConsumeRequest(offset uint64, maxSize uint32) []byte {
|
||||||
return request.Bytes()
|
return request.Bytes()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// <Request Header><MESSAGE SET SIZE: uint32><MESSAGE SETS>
|
// <Request Header><MESSAGE SET SIZE: uint32><MESSAGE SETS>
|
||||||
func (b *Broker) EncodePublishRequest(messages *list.List) []byte {
|
func (b *Broker) EncodePublishRequest(messages ...*Message) []byte {
|
||||||
// 4 + 2 + 2 + topicLength + 4 + 4
|
// 4 + 2 + 2 + topicLength + 4 + 4
|
||||||
request := b.EncodeRequestHeader(REQUEST_PRODUCE)
|
request := b.EncodeRequestHeader(REQUEST_PRODUCE)
|
||||||
|
|
||||||
|
|
@ -93,8 +88,7 @@ func (b *Broker) EncodePublishRequest(messages *list.List) []byte {
|
||||||
request.Write(uint32bytes(0)) // placeholder message len
|
request.Write(uint32bytes(0)) // placeholder message len
|
||||||
|
|
||||||
written := 0
|
written := 0
|
||||||
for element := messages.Front(); element != nil; element = element.Next() {
|
for _, message := range messages {
|
||||||
message := element.Value.(*Message)
|
|
||||||
wrote, _ := request.Write(message.Encode())
|
wrote, _ := request.Write(message.Encode())
|
||||||
written += wrote
|
written += wrote
|
||||||
}
|
}
|
||||||
|
|
@ -103,6 +97,5 @@ func (b *Broker) EncodePublishRequest(messages *list.List) []byte {
|
||||||
binary.BigEndian.PutUint32(request.Bytes()[messageSetSizePos:], uint32(written))
|
binary.BigEndian.PutUint32(request.Bytes()[messageSetSizePos:], uint32(written))
|
||||||
// now add the size of the whole to the first uint32
|
// now add the size of the whole to the first uint32
|
||||||
encodeRequestSize(request)
|
encodeRequestSize(request)
|
||||||
|
|
||||||
return request.Bytes()
|
return request.Bytes()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,6 @@
|
||||||
* of their respective owners.
|
* of their respective owners.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
|
||||||
package kafka
|
package kafka
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,6 @@
|
||||||
* of their respective owners.
|
* of their respective owners.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
|
@ -53,7 +52,6 @@ func init() {
|
||||||
flag.BoolVar(&printmessage, "printmessage", true, "print the message details to stdout")
|
flag.BoolVar(&printmessage, "printmessage", true, "print the message details to stdout")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
fmt.Println("Consuming Messages :")
|
fmt.Println("Consuming Messages :")
|
||||||
|
|
@ -87,7 +85,7 @@ func main() {
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
sig := <-signal.Incoming
|
sig := <-signal.Incoming
|
||||||
if sig.(signal.UnixSignal) == syscall.SIGINT {
|
if sig.(os.UnixSignal) == syscall.SIGINT {
|
||||||
quit <- true
|
quit <- true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -34,6 +34,7 @@ var topic string
|
||||||
var partition int
|
var partition int
|
||||||
var message string
|
var message string
|
||||||
var messageFile string
|
var messageFile string
|
||||||
|
var compress bool
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
flag.StringVar(&hostname, "hostname", "localhost:9092", "host:port string for the kafka server")
|
flag.StringVar(&hostname, "hostname", "localhost:9092", "host:port string for the kafka server")
|
||||||
|
|
@ -41,6 +42,7 @@ func init() {
|
||||||
flag.IntVar(&partition, "partition", 0, "partition to publish to")
|
flag.IntVar(&partition, "partition", 0, "partition to publish to")
|
||||||
flag.StringVar(&message, "message", "", "message to publish")
|
flag.StringVar(&message, "message", "", "message to publish")
|
||||||
flag.StringVar(&messageFile, "messagefile", "", "read message from this file")
|
flag.StringVar(&messageFile, "messagefile", "", "read message from this file")
|
||||||
|
flag.BoolVar(&compress, "compress", false, "compress the messages published")
|
||||||
}
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
|
@ -64,12 +66,24 @@ func main() {
|
||||||
payload := make([]byte, stat.Size)
|
payload := make([]byte, stat.Size)
|
||||||
file.Read(payload)
|
file.Read(payload)
|
||||||
timing := kafka.StartTiming("Sending")
|
timing := kafka.StartTiming("Sending")
|
||||||
broker.Publish(kafka.NewMessage(payload))
|
|
||||||
|
if compress {
|
||||||
|
broker.Publish(kafka.NewCompressedMessage(payload))
|
||||||
|
} else {
|
||||||
|
broker.Publish(kafka.NewMessage(payload))
|
||||||
|
}
|
||||||
|
|
||||||
timing.Print()
|
timing.Print()
|
||||||
file.Close()
|
file.Close()
|
||||||
} else {
|
} else {
|
||||||
timing := kafka.StartTiming("Sending")
|
timing := kafka.StartTiming("Sending")
|
||||||
broker.Publish(kafka.NewMessage([]byte(message)))
|
|
||||||
|
if compress {
|
||||||
|
broker.Publish(kafka.NewCompressedMessage([]byte(message)))
|
||||||
|
} else {
|
||||||
|
broker.Publish(kafka.NewMessage([]byte(message)))
|
||||||
|
}
|
||||||
|
|
||||||
timing.Print()
|
timing.Print()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue