mirror of https://github.com/apache/kafka.git
MINOR: fixed missing close of Iterator, used try-with-resource where appropriate (#6562)
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <guozhang@confluent.io>
This commit is contained in:
parent
3b1524c5df
commit
5919e73e5d
|
@ -875,10 +875,11 @@ public class TopologyTestDriverTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void flushStore() {
|
private void flushStore() {
|
||||||
final KeyValueIterator<String, Long> it = store.all();
|
try (final KeyValueIterator<String, Long> it = store.all()) {
|
||||||
while (it.hasNext()) {
|
while (it.hasNext()) {
|
||||||
final KeyValue<String, Long> next = it.next();
|
final KeyValue<String, Long> next = it.next();
|
||||||
context.forward(next.key, next.value);
|
context.forward(next.key, next.value);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -942,21 +943,20 @@ public class TopologyTestDriverTest {
|
||||||
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
|
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
|
||||||
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
|
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
|
||||||
|
|
||||||
{
|
try (final TopologyTestDriver testDriver = new TopologyTestDriver(topology, config)) {
|
||||||
final TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);
|
|
||||||
Assert.assertNull(testDriver.getKeyValueStore("storeProcessorStore").get("a"));
|
Assert.assertNull(testDriver.getKeyValueStore("storeProcessorStore").get("a"));
|
||||||
testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L));
|
testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L));
|
||||||
Assert.assertEquals(1L, testDriver.getKeyValueStore("storeProcessorStore").get("a"));
|
Assert.assertEquals(1L, testDriver.getKeyValueStore("storeProcessorStore").get("a"));
|
||||||
testDriver.close();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
|
||||||
final TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);
|
try (final TopologyTestDriver testDriver = new TopologyTestDriver(topology, config)) {
|
||||||
Assert.assertNull(
|
Assert.assertNull(
|
||||||
"Closing the prior test driver should have cleaned up this store and value.",
|
"Closing the prior test driver should have cleaned up this store and value.",
|
||||||
testDriver.getKeyValueStore("storeProcessorStore").get("a")
|
testDriver.getKeyValueStore("storeProcessorStore").get("a")
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue