KAFKA-3500: Handle null keys and values in KafkaOffsetBackingStore.

Author: Ewen Cheslack-Postava <me@ewencp.org>

Reviewers: Ismael Juma, Jason Gustafson, Gwen Shapira

Closes #1662 from ewencp/kafka-3500-kafka-offset-backing-store-null

(cherry picked from commit 4059f07216)
Signed-off-by: Gwen Shapira <cshapi@gmail.com>
This commit is contained in:
Ewen Cheslack-Postava 2016-07-26 19:43:32 -07:00 committed by Gwen Shapira
parent dc93f3bf49
commit 817f237683
2 changed files with 76 additions and 2 deletions

View File

@ -118,8 +118,11 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values, final Callback<Void> callback) { public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values, final Callback<Void> callback) {
SetCallbackFuture producerCallback = new SetCallbackFuture(values.size(), callback); SetCallbackFuture producerCallback = new SetCallbackFuture(values.size(), callback);
for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) {
offsetLog.send(entry.getKey().array(), entry.getValue().array(), producerCallback); ByteBuffer key = entry.getKey();
ByteBuffer value = entry.getValue();
offsetLog.send(key == null ? null : key.array(), value == null ? null : value.array(), producerCallback);
}
return producerCallback; return producerCallback;
} }

View File

@ -53,6 +53,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -270,6 +271,76 @@ public class KafkaOffsetBackingStoreTest {
PowerMock.verifyAll(); PowerMock.verifyAll();
} }
@Test
public void testGetSetNull() throws Exception {
expectConfigure();
expectStart(Collections.EMPTY_LIST);
// Set offsets
Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
storeLog.send(EasyMock.isNull(byte[].class), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0));
PowerMock.expectLastCall();
Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.isNull(byte[].class), EasyMock.capture(callback1));
PowerMock.expectLastCall();
// Second get() should get the produced data and return the new values
final Capture<Callback<Void>> secondGetReadToEndCallback = EasyMock.newCapture();
storeLog.readToEnd(EasyMock.capture(secondGetReadToEndCallback));
PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
@Override
public Object answer() throws Throwable {
capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (byte[]) null, TP0_VALUE.array()));
capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), (byte[]) null));
secondGetReadToEndCallback.getValue().onCompletion(null, null);
return null;
}
});
expectStop();
PowerMock.replayAll();
store.configure(DEFAULT_DISTRIBUTED_CONFIG);
store.start();
// Set offsets using null keys and values
Map<ByteBuffer, ByteBuffer> toSet = new HashMap<>();
toSet.put(null, TP0_VALUE);
toSet.put(TP1_KEY, null);
final AtomicBoolean invoked = new AtomicBoolean(false);
Future<Void> setFuture = store.set(toSet, new Callback<Void>() {
@Override
public void onCompletion(Throwable error, Void result) {
invoked.set(true);
}
});
assertFalse(setFuture.isDone());
// Out of order callbacks shouldn't matter, should still require all to be invoked before invoking the callback
// for the store's set callback
callback1.getValue().onCompletion(null, null);
assertFalse(invoked.get());
callback0.getValue().onCompletion(null, null);
setFuture.get(10000, TimeUnit.MILLISECONDS);
assertTrue(invoked.get());
// Getting data should read to end of our published data and return it
final AtomicBoolean secondGetInvokedAndPassed = new AtomicBoolean(false);
store.get(Arrays.asList(null, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
@Override
public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
assertEquals(TP0_VALUE, result.get(null));
assertNull(result.get(TP1_KEY));
secondGetInvokedAndPassed.set(true);
}
}).get(10000, TimeUnit.MILLISECONDS);
assertTrue(secondGetInvokedAndPassed.get());
store.stop();
PowerMock.verifyAll();
}
@Test @Test
public void testSetFailure() throws Exception { public void testSetFailure() throws Exception {
expectConfigure(); expectConfigure();