MINOR: change Streams topic-level metrics tag from 'topic-name' to 'topic' (#12310)

Changes the tag name from 'topic-name' to just 'topic' to conform to the way this tag is named elsewhere (i.e., in the clients).
Also:
    - fixes a comment about dynamic topic routing
    - fixes some indentation in MockRecordCollector
    - undoes the changes to KStreamSplitTest.scala and TestTopicsTest, which are no longer necessary after this hotfix

Reviewers: Bruno Cadonna <cadonna@apache.org>
A. Sophie Blee-Goldman 2022-06-21 04:10:36 -07:00 committed by GitHub
parent b7b7615db3
commit 0928666987
6 changed files with 71 additions and 84 deletions
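
For context, the user-visible effect of this commit is that topic-level metrics now carry a 'topic' tag instead of 'topic-name'. A minimal sketch of reading them over JMX; the 'stream-topic-metrics' group and the 'records-produced-total' attribute are assumptions based on the topic-level metrics from KIP-846, not taken from this diff:

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class TopicMetricsLookup {
    public static void main(final String[] args) throws Exception {
        final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Assumed group name; after this commit the tag key is 'topic', not 'topic-name'.
        final ObjectName pattern = new ObjectName("kafka.streams:type=stream-topic-metrics,*");
        for (final ObjectName name : server.queryNames(pattern, null)) {
            final String topic = name.getKeyProperty("topic"); // previously "topic-name"
            // Assumed attribute name from the topic-level produced metrics.
            System.out.println(topic + " -> " + server.getAttribute(name, "records-produced-total"));
        }
    }
}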

RecordCollectorImpl.java

@@ -219,12 +219,13 @@ public class RecordCollectorImpl implements RecordCollector {
         }
         if (!topic.endsWith("-changelog")) {
-            // we may not have created a sensor yet if the node uses dynamic topic routing
             final Map<String, Sensor> producedSensorByTopic = sinkNodeToProducedSensorByTopic.get(processorNodeId);
             if (producedSensorByTopic == null) {
                 log.error("Unable to records bytes produced to topic {} by sink node {} as the node is not recognized.\n"
                               + "Known sink nodes are {}.", topic, processorNodeId, sinkNodeToProducedSensorByTopic.keySet());
             } else {
+                // we may not have created a sensor during initialization if the node uses dynamic topic routing,
+                // as all topics are not known up front, so create the sensor for that topic if absent
                 final Sensor topicProducedSensor = producedSensorByTopic.computeIfAbsent(
                     topic,
                     t -> TopicMetrics.producedSensor(
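
The restored two-line comment is the point of this hunk: with dynamic topic routing, output topics are only discovered at send time, so the produced sensor is created lazily. A self-contained sketch of that pattern, with hypothetical names standing in for the Streams internals:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch (not the actual Streams internals) of lazy per-topic
// sensor creation: when a TopicNameExtractor routes records dynamically, the
// full topic set is unknown at init time.
class ProducedSensors {
    // Stand-in for org.apache.kafka.common.metrics.Sensor in this sketch.
    static final class Sensor {
        final String name;
        Sensor(final String name) { this.name = name; }
    }

    private final Map<String, Sensor> sensorByTopic = new ConcurrentHashMap<>();

    Sensor sensorFor(final String topic) {
        // Create the sensor on first send to a topic that was unknown when
        // the sink node was initialized.
        return sensorByTopic.computeIfAbsent(topic, t -> new Sensor("produced-" + t));
    }
}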

StreamsMetricsImpl.java

@@ -117,7 +117,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
     public static final String THREAD_ID_TAG = "thread-id";
     public static final String TASK_ID_TAG = "task-id";
     public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id";
-    public static final String TOPIC_NAME_TAG = "topic-name";
+    public static final String TOPIC_NAME_TAG = "topic";
     public static final String STORE_ID_TAG = "state-id";
     public static final String RECORD_CACHE_ID_TAG = "record-cache-id";
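
Only the constant's value changes, so every tag map built from TOPIC_NAME_TAG now emits the key "topic". As an illustration (hypothetical helper, not the actual StreamsMetricsImpl code), a topic-level tag map now looks like:

import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: the topic-level tag map is now keyed by "topic",
// matching the client metrics.
class TopicTagMap {
    static Map<String, String> topicLevelTagMap(final String threadId,
                                                final String taskId,
                                                final String processorNodeId,
                                                final String topic) {
        final Map<String, String> tags = new LinkedHashMap<>();
        tags.put("thread-id", threadId);
        tags.put("task-id", taskId);
        tags.put("processor-node-id", processorNodeId);
        tags.put("topic", topic); // was "topic-name" before this commit
        return tags;
    }
}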

TopicMetricsTest.java

@@ -39,7 +39,7 @@ public class TopicMetricsTest {
     private static final String THREAD_ID = "test-thread";
     private static final String TASK_ID = "test-task";
     private static final String PROCESSOR_NODE_ID = "test-processor";
-    private static final String TOPIC_NAME = "topic";
+    private static final String TOPIC = "topic";
     private final Map<String, String> tagMap = Collections.singletonMap("hello", "world");
@@ -59,14 +59,14 @@ public class TopicMetricsTest {
         final String descriptionOfRecordsTotal = "The total number of records consumed from this topic";
         final String descriptionOfBytesTotal = "The total number of bytes consumed from this topic";
-        when(streamsMetrics.topicLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME, "consumed", RecordingLevel.INFO))
+        when(streamsMetrics.topicLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC, "consumed", RecordingLevel.INFO))
             .thenReturn(expectedSensor);
-        when(streamsMetrics.topicLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME, "consumed", RecordingLevel.INFO))
+        when(streamsMetrics.topicLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC, "consumed", RecordingLevel.INFO))
            .thenReturn(expectedSensor);
-        when(streamsMetrics.topicLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME)).thenReturn(tagMap);
+        when(streamsMetrics.topicLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC)).thenReturn(tagMap);
         verifySensor(
-            () -> TopicMetrics.consumedSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME, streamsMetrics)
+            () -> TopicMetrics.consumedSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC, streamsMetrics)
         );
         STREAMS_METRICS_STATIC_MOCK.verify(
@@ -89,13 +89,13 @@ public class TopicMetricsTest {
         final String descriptionOfRecordsTotal = "The total number of records produced to this topic";
         final String descriptionOfBytesTotal = "The total number of bytes produced to this topic";
-        when(streamsMetrics.topicLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME, "produced", RecordingLevel.INFO))
+        when(streamsMetrics.topicLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC, "produced", RecordingLevel.INFO))
            .thenReturn(expectedSensor);
-        when(streamsMetrics.topicLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME, "produced", RecordingLevel.INFO))
+        when(streamsMetrics.topicLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC, "produced", RecordingLevel.INFO))
            .thenReturn(expectedSensor);
-        when(streamsMetrics.topicLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME)).thenReturn(tagMap);
-        verifySensor(() -> TopicMetrics.producedSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME, streamsMetrics));
+        when(streamsMetrics.topicLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC)).thenReturn(tagMap);
+        verifySensor(() -> TopicMetrics.producedSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC, streamsMetrics));
         STREAMS_METRICS_STATIC_MOCK.verify(
             () -> StreamsMetricsImpl.addTotalCountAndSumMetricsToSensor(
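
The verification above goes through a MockedStatic handle; a minimal sketch of that Mockito pattern (Mockito 3.4+ with mockito-inline; Totals is a hypothetical stand-in for StreamsMetricsImpl):

import org.mockito.MockedStatic;
import org.mockito.Mockito;

// Sketch of the static-mock pattern behind STREAMS_METRICS_STATIC_MOCK: the
// MockedStatic records calls to the static helper so they can be verified.
class StaticMockSketch {
    static class Totals {
        static void addTotalCountAndSumMetricsToSensor() { }
    }

    static void example() {
        try (MockedStatic<Totals> mocked = Mockito.mockStatic(Totals.class)) {
            Totals.addTotalCountAndSumMetricsToSensor();               // code under test runs
            mocked.verify(Totals::addTotalCountAndSumMetricsToSensor); // assert the static call
        }
    }
}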

MockRecordCollector.java

@@ -50,12 +50,14 @@ public class MockRecordCollector implements RecordCollector {
                              final Serializer<V> valueSerializer,
                              final String processorNodeId,
                              final InternalProcessorContext<Void, Void> context) {
-        collected.add(new ProducerRecord<>(topic,
+        collected.add(new ProducerRecord<>(
+            topic,
             partition,
             timestamp,
             key,
             value,
-            headers));
+            headers)
+        );
     }
     @Override
@@ -69,12 +71,14 @@ public class MockRecordCollector implements RecordCollector {
                              final String processorNodeId,
                              final InternalProcessorContext<Void, Void> context,
                              final StreamPartitioner<? super K, ? super V> partitioner) {
-        collected.add(new ProducerRecord<>(topic,
+        collected.add(new ProducerRecord<>(
+            topic,
             0, // partition id
             timestamp,
             key,
             value,
-            headers));
+            headers)
+        );
     }
     @Override

KStreamSplitTest.scala

@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.scala.kstream
 import org.apache.kafka.streams.kstream.Named
-import org.apache.kafka.streams.KeyValue
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.StreamsBuilder
 import org.apache.kafka.streams.scala.serialization.Serdes._
@@ -36,7 +35,7 @@ class KStreamSplitTest extends TestDriver {
     val sinkTopic = Array("default", "even", "three");
     val m = builder
-      .stream[Int, Int](sourceTopic)
+      .stream[Integer, Integer](sourceTopic)
       .split(Named.as("_"))
       .branch((_, v) => v % 2 == 0)
       .branch((_, v) => v % 3 == 0)
@@ -47,17 +46,14 @@ class KStreamSplitTest extends TestDriver {
     m("_2").to(sinkTopic(2))
     val testDriver = createTestDriver(builder)
-    val testInput = testDriver.createInput[Int, Int](sourceTopic)
-    val testOutput = sinkTopic.map(name => testDriver.createOutput[Int, Int](name))
-    testInput pipeKeyValueList List(
-      new KeyValue(1, 1),
-      new KeyValue(1, 2),
-      new KeyValue(1, 3),
-      new KeyValue(1, 4),
-      new KeyValue(1, 5)
-    ).asJava
+    val testInput = testDriver.createInput[Integer, Integer](sourceTopic)
+    val testOutput = sinkTopic.map(name => testDriver.createOutput[Integer, Integer](name))
+    testInput.pipeValueList(
+      List(1, 2, 3, 4, 5)
+        .map(Integer.valueOf)
+        .asJava
+    )
     assertEquals(List(1, 5), testOutput(0).readValuesToList().asScala)
     assertEquals(List(2, 4), testOutput(1).readValuesToList().asScala)
     assertEquals(List(3), testOutput(2).readValuesToList().asScala)
@@ -71,7 +67,7 @@ class KStreamSplitTest extends TestDriver {
     val sourceTopic = "source"
     val m = builder
-      .stream[Int, Int](sourceTopic)
+      .stream[Integer, Integer](sourceTopic)
       .split(Named.as("_"))
       .branch((_, v) => v % 2 == 0, Branched.withConsumer(ks => ks.to("even"), "consumedEvens"))
       .branch((_, v) => v % 3 == 0, Branched.withFunction(ks => ks.mapValues(x => x * x), "mapped"))
@@ -80,18 +76,15 @@ class KStreamSplitTest extends TestDriver {
     m("_mapped").to("mapped")
     val testDriver = createTestDriver(builder)
-    val testInput = testDriver.createInput[Int, Int](sourceTopic)
-    testInput pipeKeyValueList List(
-      new KeyValue(1, 1),
-      new KeyValue(1, 2),
-      new KeyValue(1, 3),
-      new KeyValue(1, 4),
-      new KeyValue(1, 5),
-      new KeyValue(1, 9)
-    ).asJava
-    val even = testDriver.createOutput[Int, Int]("even")
-    val mapped = testDriver.createOutput[Int, Int]("mapped")
+    val testInput = testDriver.createInput[Integer, Integer](sourceTopic)
+    testInput.pipeValueList(
+      List(1, 2, 3, 4, 5, 9)
+        .map(Integer.valueOf)
+        .asJava
+    )
+    val even = testDriver.createOutput[Integer, Integer]("even")
+    val mapped = testDriver.createOutput[Integer, Integer]("mapped")
     assertEquals(List(2, 4), even.readValuesToList().asScala)
     assertEquals(List(9, 81), mapped.readValuesToList().asScala)
@@ -105,7 +98,7 @@ class KStreamSplitTest extends TestDriver {
     val sourceTopic = "source"
     val m = builder
-      .stream[Int, Int](sourceTopic)
+      .stream[Integer, Integer](sourceTopic)
       .split(Named.as("_"))
       .branch((_, v) => v % 2 == 0, Branched.withConsumer(ks => ks.to("even")))
       .branch((_, v) => v % 3 == 0, Branched.withFunction(ks => ks.mapValues(x => x * x)))
@@ -114,23 +107,19 @@ class KStreamSplitTest extends TestDriver {
     m("_2").to("mapped")
     val testDriver = createTestDriver(builder)
-    val testInput = testDriver.createInput[Int, Int](sourceTopic)
-    testInput pipeKeyValueList List(
-      new KeyValue(1, 1),
-      new KeyValue(1, 2),
-      new KeyValue(1, 3),
-      new KeyValue(1, 4),
-      new KeyValue(1, 5),
-      new KeyValue(1, 9)
-    ).asJava
-    val even = testDriver.createOutput[Int, Int]("even")
-    val mapped = testDriver.createOutput[Int, Int]("mapped")
+    val testInput = testDriver.createInput[Integer, Integer](sourceTopic)
+    testInput.pipeValueList(
+      List(1, 2, 3, 4, 5, 9)
+        .map(Integer.valueOf)
+        .asJava
+    )
+    val even = testDriver.createOutput[Integer, Integer]("even")
+    val mapped = testDriver.createOutput[Integer, Integer]("mapped")
     assertEquals(List(2, 4), even.readValuesToList().asScala)
     assertEquals(List(9, 81), mapped.readValuesToList().asScala)
     testDriver.close()
   }
 }
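
The revert swaps pipeKeyValueList back to pipeValueList; the difference, shown here against the Java test-utils API with a hypothetical input topic, is just whether records carry an explicit key:

import java.util.Arrays;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.TestInputTopic;

// Hypothetical helper: 'input' would come from a TopologyTestDriver.
class PipeStyles {
    static void pipe(final TestInputTopic<Integer, Integer> input) {
        // pipeValueList sends each value with a null key, which is all the
        // branch assertions in this test need:
        input.pipeValueList(Arrays.asList(1, 2, 3, 4, 5));
        // pipeKeyValueList attaches an explicit key to every record:
        input.pipeKeyValueList(Arrays.asList(new KeyValue<>(1, 1), new KeyValue<>(1, 2)));
    }
}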

TestTopicsTest.java

@@ -41,7 +41,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Properties;
-import java.util.stream.Collectors;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -86,12 +85,12 @@ public class TestTopicsTest {
     @Test
     public void testValue() {
-        final TestInputTopic<Long, String> inputTopic =
-            testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer());
+        final TestInputTopic<String, String> inputTopic =
+            testDriver.createInputTopic(INPUT_TOPIC, stringSerde.serializer(), stringSerde.serializer());
         final TestOutputTopic<String, String> outputTopic =
             testDriver.createOutputTopic(OUTPUT_TOPIC, stringSerde.deserializer(), stringSerde.deserializer());
-        //Feed word "Hello" to inputTopic, timestamp and key irrelevant in this case
-        inputTopic.pipeInput(1L, "Hello");
+        //Feed word "Hello" to inputTopic and no kafka key, timestamp is irrelevant in this case
+        inputTopic.pipeInput("Hello");
         assertThat(outputTopic.readValue(), equalTo("Hello"));
         //No more output in topic
         assertThat(outputTopic.isEmpty(), is(true));
@@ -99,20 +98,16 @@ public class TestTopicsTest {
     @Test
     public void testValueList() {
-        final TestInputTopic<Long, String> inputTopic =
-            testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer());
+        final TestInputTopic<String, String> inputTopic =
+            testDriver.createInputTopic(INPUT_TOPIC, stringSerde.serializer(), stringSerde.serializer());
         final TestOutputTopic<String, String> outputTopic =
             testDriver.createOutputTopic(OUTPUT_TOPIC, stringSerde.deserializer(), stringSerde.deserializer());
-        final List<KeyValue<Long, String>> inputList = Arrays.asList(
-            new KeyValue<>(1L, "This"),
-            new KeyValue<>(2L, "is"),
-            new KeyValue<>(3L, "an"),
-            new KeyValue<>(4L, "example"));
-        //Feed list of words to inputTopic, key and timestamp are irrelevant in this case
-        inputTopic.pipeKeyValueList(inputList);
+        final List<String> inputList = Arrays.asList("This", "is", "an", "example");
+        //Feed list of words to inputTopic and no kafka key, timestamp is irrelevant in this case
+        inputTopic.pipeValueList(inputList);
         final List<String> output = outputTopic.readValuesToList();
         assertThat(output, hasItems("This", "is", "an", "example"));
-        assertThat(output, is(equalTo(inputList.stream().map(kv -> kv.value).collect(Collectors.toList()))));
+        assertThat(output, is(equalTo(inputList)));
     }
     @Test
@@ -229,8 +224,8 @@ public class TestTopicsTest {
             testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer());
         final TestOutputTopic<Long, String> outputTopic =
             testDriver.createOutputTopic(OUTPUT_TOPIC, longSerde.deserializer(), stringSerde.deserializer());
-        inputTopic.pipeInput(1L, "Hello", baseTime);
-        assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(1L, "Hello", null, baseTime))));
+        inputTopic.pipeInput(null, "Hello", baseTime);
+        assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(null, "Hello", null, baseTime))));
         inputTopic.pipeInput(2L, "Kafka", ++baseTime);
         assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(2L, "Kafka", null, baseTime))));
@@ -238,15 +233,13 @@ public class TestTopicsTest {
         inputTopic.pipeInput(2L, "Kafka", testBaseTime);
         assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(2L, "Kafka", testBaseTime))));
-        final List<KeyValue<Long, String>> inputList = Arrays.asList(
-            new KeyValue<>(1L, "Advancing"),
-            new KeyValue<>(2L, "time"));
+        final List<String> inputList = Arrays.asList("Advancing", "time");
         //Feed list of words to inputTopic and no kafka key, timestamp advancing from testInstant
         final Duration advance = Duration.ofSeconds(15);
         final Instant recordInstant = testBaseTime.plus(Duration.ofDays(1));
-        inputTopic.pipeKeyValueList(inputList, recordInstant, advance);
-        assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(1L, "Advancing", recordInstant))));
-        assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(2L, "time", null, recordInstant.plus(advance)))));
+        inputTopic.pipeValueList(inputList, recordInstant, advance);
+        assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(null, "Advancing", recordInstant))));
+        assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(null, "time", null, recordInstant.plus(advance)))));
     }
     @Test
@@ -294,8 +287,8 @@ public class TestTopicsTest {
             testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer(), testBaseTime, advance);
         final TestOutputTopic<Long, String> outputTopic =
             testDriver.createOutputTopic(OUTPUT_TOPIC, longSerde.deserializer(), stringSerde.deserializer());
-        inputTopic.pipeInput(1L, "Hello");
-        assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(1L, "Hello", testBaseTime))));
+        inputTopic.pipeInput("Hello");
+        assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(null, "Hello", testBaseTime))));
         inputTopic.pipeInput(2L, "Kafka");
         assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(2L, "Kafka", testBaseTime.plus(advance)))));
@@ -339,12 +332,12 @@ public class TestTopicsTest {
     @Test
     public void testEmptyTopic() {
-        final TestInputTopic<Long, String> inputTopic =
-            testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer());
+        final TestInputTopic<String, String> inputTopic =
+            testDriver.createInputTopic(INPUT_TOPIC, stringSerde.serializer(), stringSerde.serializer());
         final TestOutputTopic<String, String> outputTopic =
             testDriver.createOutputTopic(OUTPUT_TOPIC, stringSerde.deserializer(), stringSerde.deserializer());
         //Feed word "Hello" to inputTopic and no kafka key, timestamp is irrelevant in this case
-        inputTopic.pipeInput(1L, "Hello");
+        inputTopic.pipeInput("Hello");
         assertThat(outputTopic.readValue(), equalTo("Hello"));
         //No more output in topic
         assertThrows(NoSuchElementException.class, outputTopic::readRecord, "Empty topic");
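
Taken together, the reverted assertions pin down two test-driver behaviors: value-only pipeInput(...) calls yield null-key records, and a topic created with a base time plus an advance steps each record's timestamp forward. A runnable sketch under those assumptions (pass-through topology, hypothetical topic names):

import java.time.Duration;
import java.time.Instant;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;

public class AutoAdvanceSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("in").to("out"); // pass-through topology, for illustration only

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "auto-advance-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.LongSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final Instant baseTime = Instant.parse("2022-06-21T00:00:00Z");
            final Duration advance = Duration.ofSeconds(10);
            final TestInputTopic<Long, String> in = driver.createInputTopic(
                "in", Serdes.Long().serializer(), Serdes.String().serializer(), baseTime, advance);
            final TestOutputTopic<Long, String> out = driver.createOutputTopic(
                "out", Serdes.Long().deserializer(), Serdes.String().deserializer());

            in.pipeInput("Hello");     // no key: read back with a null key at baseTime
            in.pipeInput(2L, "Kafka"); // explicit key: timestamp advanced by 10s
            System.out.println(out.readRecord()); // TestRecord with null key, recordTime = baseTime
            System.out.println(out.readRecord()); // TestRecord with key 2, recordTime = baseTime + 10s
        }
    }
}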