diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java index 6f6c51eddea..d577cf7ab8e 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java @@ -875,10 +875,11 @@ public class TopologyTestDriverTest { } private void flushStore() { - final KeyValueIterator it = store.all(); - while (it.hasNext()) { - final KeyValue next = it.next(); - context.forward(next.key, next.value); + try (final KeyValueIterator it = store.all()) { + while (it.hasNext()) { + final KeyValue next = it.next(); + context.forward(next.key, next.value); + } } } @@ -942,21 +943,20 @@ public class TopologyTestDriverTest { config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName()); - { - final TopologyTestDriver testDriver = new TopologyTestDriver(topology, config); + try (final TopologyTestDriver testDriver = new TopologyTestDriver(topology, config)) { Assert.assertNull(testDriver.getKeyValueStore("storeProcessorStore").get("a")); testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L)); Assert.assertEquals(1L, testDriver.getKeyValueStore("storeProcessorStore").get("a")); - testDriver.close(); } - { - final TopologyTestDriver testDriver = new TopologyTestDriver(topology, config); + + try (final TopologyTestDriver testDriver = new TopologyTestDriver(topology, config)) { Assert.assertNull( "Closing the prior test driver should have cleaned up this store and value.", testDriver.getKeyValueStore("storeProcessorStore").get("a") ); } + } @Test