diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index 15961e7c721..b131db59973 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -106,15 +106,15 @@ public class MeteredWindowStore extends WrappedStateStore.AbstractStateSto @Override public V fetch(final K key, final long timestamp) { final long startNs = time.nanoseconds(); - V ret; try { final byte[] result = inner.fetch(keyBytes(key), timestamp); - ret = serdes.valueFrom(result); + if (result == null) { + return null; + } + return serdes.valueFrom(result); } finally { metrics.recordLatency(this.fetchTime, startNs, time.nanoseconds()); } - - return ret; } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java index 58c345a7867..732f3d62a42 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java @@ -110,6 +110,9 @@ public class RocksDBWindowStore extends WrappedStateStore.AbstractStateSto @Override public V fetch(final K key, final long timestamp) { final byte[] bytesValue = bytesStore.get(WindowStoreUtils.toBinaryKey(key, timestamp, seqnum, serdes)); + if (bytesValue == null) { + return null; + } return serdes.valueFrom(bytesValue); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java index 3ff343a7155..0e3b4e8223e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java @@ -81,6 +81,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest { @After public void after() { + super.after(); context.close(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java index bda385ebcb9..16ef47c1c1b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java @@ -28,7 +28,6 @@ import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; -import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.test.MockProcessorContext; @@ -76,7 +75,7 @@ public class CachingSessionStoreTest { Segments.segmentInterval(retention, numSegments) ); cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); - context = new MockProcessorContext(TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache); + context = new MockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache); context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, "topic")); cachingStore.init(context, cachingStore); } @@ -87,10 +86,6 @@ public class CachingSessionStoreTest { cachingStore.close(); } - private Bytes bytesKey(final String key) { - return Bytes.wrap(key.getBytes()); - } - @Test public void shouldPutFetchFromCache() { cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java index c8aec6babfb..98224e9c611 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java @@ -69,9 +69,8 @@ public class ChangeLoggingSessionBytesStoreTest { private final Windowed key1 = new Windowed<>(bytesKey, new SessionWindow(0, 0)); @Before - public void setUp() throws Exception { + public void setUp() { store = new ChangeLoggingSessionBytesStore(inner); - } private void init() { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java index 5a4f952a78d..59c7ade8f97 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java @@ -40,13 +40,14 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; public class MeteredWindowStoreTest { private MockProcessorContext context; @SuppressWarnings("unchecked") private final WindowStore innerStoreMock = EasyMock.createNiceMock(WindowStore.class); - private final MeteredWindowStore store = new MeteredWindowStore<>(innerStoreMock, "scope", new MockTime(), Serdes.String(), Serdes.String()); + private final MeteredWindowStore store = new MeteredWindowStore<>(innerStoreMock, "scope", new MockTime(), Serdes.String(), new SerdeThatDoesntHandleNull()); private final Set latencyRecorded = new HashSet<>(); private final Set throughputRecorded = new HashSet<>(); @@ -118,7 +119,7 @@ public class MeteredWindowStoreTest { } @Test - public void shouldRecordRestoreLatencyOnInit() throws Exception { + public void shouldRecordRestoreLatencyOnInit() { innerStoreMock.init(context, store); EasyMock.expectLastCall(); EasyMock.replay(innerStoreMock); @@ -127,7 +128,7 @@ public class MeteredWindowStoreTest { } @Test - public void shouldRecordPutLatency() throws Exception { + public void shouldRecordPutLatency() { final byte[] bytes = "a".getBytes(); innerStoreMock.put(EasyMock.eq(Bytes.wrap(bytes)), EasyMock.anyObject(), EasyMock.eq(context.timestamp())); EasyMock.expectLastCall(); @@ -140,7 +141,7 @@ public class MeteredWindowStoreTest { } @Test - public void shouldRecordFetchLatency() throws Exception { + public void shouldRecordFetchLatency() { EasyMock.expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1)).andReturn(KeyValueIterators.emptyWindowStoreIterator()); EasyMock.replay(innerStoreMock); @@ -151,7 +152,7 @@ public class MeteredWindowStoreTest { } @Test - public void shouldRecordFetchRangeLatency() throws Exception { + public void shouldRecordFetchRangeLatency() { EasyMock.expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1)).andReturn(KeyValueIterators., byte[]>emptyIterator()); EasyMock.replay(innerStoreMock); @@ -163,7 +164,7 @@ public class MeteredWindowStoreTest { @Test - public void shouldRecordFlushLatency() throws Exception { + public void shouldRecordFlushLatency() { innerStoreMock.flush(); EasyMock.expectLastCall(); EasyMock.replay(innerStoreMock); @@ -176,7 +177,7 @@ public class MeteredWindowStoreTest { @Test - public void shouldCloseUnderlyingStore() throws Exception { + public void shouldCloseUnderlyingStore() { innerStoreMock.close(); EasyMock.expectLastCall(); EasyMock.replay(innerStoreMock); @@ -187,4 +188,13 @@ public class MeteredWindowStoreTest { } + @Test + public void shouldNotExceptionIfFetchReturnsNull() { + EasyMock.expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 0)).andReturn(null); + EasyMock.replay(innerStoreMock); + + store.init(context, store); + assertNull(store.fetch("a", 0)); + } + } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java index ec2e3383aca..bdf0379ddb3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java @@ -51,7 +51,6 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest { } public static class TheRocksDbConfigSetter implements RocksDBConfigSetter { - static boolean called = false; @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index 408733656e2..a89dc60e665 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -79,6 +79,7 @@ public class RocksDBStoreTest { @After public void tearDown() { rocksDBStore.close(); + context.close(); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index f7572987f39..a7a978a8d75 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -795,6 +795,19 @@ public class RocksDBWindowStoreTest { windowStore.fetch(1, null, 1L, 2L); } + @Test + public void shouldNoNullPointerWhenSerdeDoesntHandleNull() { + windowStore = new RocksDBWindowStore<>( + new RocksDBSegmentedBytesStore(windowName, retentionPeriod, numSegments, new WindowKeySchema()), + Serdes.Integer(), + new SerdeThatDoesntHandleNull(), + false, + windowSize); + windowStore.init(context, windowStore); + + assertNull(windowStore.fetch(1, 0)); + } + @Test public void shouldFetchAndIterateOverExactBinaryKeys() { final WindowStore windowStore = Stores.windowStoreBuilder( diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SerdeThatDoesntHandleNull.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SerdeThatDoesntHandleNull.java new file mode 100644 index 00000000000..bf1d030b9e0 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SerdeThatDoesntHandleNull.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; + +import java.util.Map; + +class SerdeThatDoesntHandleNull implements Serde { + @Override + public void configure(final Map configs, final boolean isKey) { + + } + + @Override + public void close() { + + } + + @Override + public Serializer serializer() { + return new StringSerializer(); + } + + @Override + public Deserializer deserializer() { + return new StringDeserializer() { + @Override + public String deserialize(final String topic, final byte[] data) { + if (data == null) { + throw new NullPointerException(); + } + return super.deserialize(topic, data); + } + }; + } +}