diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java index e8984fb8943..92199869793 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java @@ -118,8 +118,11 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore { public Future set(final Map values, final Callback callback) { SetCallbackFuture producerCallback = new SetCallbackFuture(values.size(), callback); - for (Map.Entry entry : values.entrySet()) - offsetLog.send(entry.getKey().array(), entry.getValue().array(), producerCallback); + for (Map.Entry entry : values.entrySet()) { + ByteBuffer key = entry.getKey(); + ByteBuffer value = entry.getValue(); + offsetLog.send(key == null ? null : key.array(), value == null ? null : value.array(), producerCallback); + } return producerCallback; } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java index 38e0f7b1b9b..4a244f0a6b6 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java @@ -53,6 +53,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -270,6 +271,76 @@ public class KafkaOffsetBackingStoreTest { PowerMock.verifyAll(); } + @Test + public void testGetSetNull() throws Exception { + expectConfigure(); + expectStart(Collections.EMPTY_LIST); + + // Set offsets + Capture callback0 = EasyMock.newCapture(); + storeLog.send(EasyMock.isNull(byte[].class), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0)); + PowerMock.expectLastCall(); + Capture callback1 = EasyMock.newCapture(); + storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.isNull(byte[].class), EasyMock.capture(callback1)); + PowerMock.expectLastCall(); + + // Second get() should get the produced data and return the new values + final Capture> secondGetReadToEndCallback = EasyMock.newCapture(); + storeLog.readToEnd(EasyMock.capture(secondGetReadToEndCallback)); + PowerMock.expectLastCall().andAnswer(new IAnswer() { + @Override + public Object answer() throws Throwable { + capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (byte[]) null, TP0_VALUE.array())); + capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), (byte[]) null)); + secondGetReadToEndCallback.getValue().onCompletion(null, null); + return null; + } + }); + + expectStop(); + + PowerMock.replayAll(); + + store.configure(DEFAULT_DISTRIBUTED_CONFIG); + store.start(); + + // Set offsets using null keys and values + Map toSet = new HashMap<>(); + toSet.put(null, TP0_VALUE); + toSet.put(TP1_KEY, null); + final AtomicBoolean invoked = new AtomicBoolean(false); + Future setFuture = store.set(toSet, new Callback() { + @Override + public void onCompletion(Throwable error, Void result) { + invoked.set(true); + } + }); + assertFalse(setFuture.isDone()); + // Out of order callbacks shouldn't matter, should still require all to be invoked before invoking the callback + // for the store's set callback + callback1.getValue().onCompletion(null, null); + assertFalse(invoked.get()); + callback0.getValue().onCompletion(null, null); + setFuture.get(10000, TimeUnit.MILLISECONDS); + assertTrue(invoked.get()); + + // Getting data should read to end of our published data and return it + final AtomicBoolean secondGetInvokedAndPassed = new AtomicBoolean(false); + store.get(Arrays.asList(null, TP1_KEY), new Callback>() { + @Override + public void onCompletion(Throwable error, Map result) { + assertEquals(TP0_VALUE, result.get(null)); + assertNull(result.get(TP1_KEY)); + secondGetInvokedAndPassed.set(true); + } + }).get(10000, TimeUnit.MILLISECONDS); + assertTrue(secondGetInvokedAndPassed.get()); + + store.stop(); + + PowerMock.verifyAll(); + } + @Test public void testSetFailure() throws Exception { expectConfigure();