mirror of https://github.com/apache/kafka.git
MINOR: Fix KStreamKTableJoinTest and StreamTaskTest (#9357)
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
parent
a15387f34d
commit
95986a8f48
|
@ -208,6 +208,9 @@
|
|||
<suppress checks="MethodLength"
|
||||
files="KStreamSlidingWindowAggregateTest.java"/>
|
||||
|
||||
<suppress checks="ClassFanOutComplexity"
|
||||
files="StreamTaskTest.java"/>
|
||||
|
||||
<!-- Streams test-utils -->
|
||||
<suppress checks="ClassFanOutComplexity"
|
||||
files="TopologyTestDriver.java"/>
|
||||
|
|
|
@ -255,7 +255,7 @@ public class KStreamKTableJoinTest {
|
|||
}
|
||||
assertThat(
|
||||
appender.getMessages(),
|
||||
hasItem("Skipping record due to null key or value. key=[null] value=[A] topic=[streamTopic] partition=[0] "
|
||||
hasItem("Skipping record due to null join key or value. key=[null] value=[A] topic=[streamTopic] partition=[0] "
|
||||
+ "offset=[0]"));
|
||||
}
|
||||
}
|
||||
|
@ -282,7 +282,7 @@ public class KStreamKTableJoinTest {
|
|||
|
||||
assertThat(
|
||||
appender.getMessages(),
|
||||
hasItem("Skipping record due to null key or value. key=[1] value=[null] topic=[streamTopic] partition=[0] "
|
||||
hasItem("Skipping record due to null join key or value. key=[1] value=[null] topic=[streamTopic] partition=[0] "
|
||||
+ "offset=[0]")
|
||||
);
|
||||
}
|
||||
|
|
|
@ -2050,22 +2050,22 @@ public class StreamTaskTest {
|
|||
final ProcessorTopology topology = withSources(asList(), mkMap());
|
||||
|
||||
final TopologyException exception = assertThrows(
|
||||
TopologyException.class,
|
||||
() -> new StreamTask(
|
||||
taskId,
|
||||
partitions,
|
||||
topology,
|
||||
consumer,
|
||||
createConfig(false, "100"),
|
||||
metrics,
|
||||
stateDirectory,
|
||||
cache,
|
||||
time,
|
||||
stateManager,
|
||||
recordCollector,
|
||||
context
|
||||
)
|
||||
);
|
||||
TopologyException.class,
|
||||
() -> new StreamTask(
|
||||
taskId,
|
||||
partitions,
|
||||
topology,
|
||||
consumer,
|
||||
createConfig(false, "100"),
|
||||
metrics,
|
||||
stateDirectory,
|
||||
cache,
|
||||
time,
|
||||
stateManager,
|
||||
recordCollector,
|
||||
context
|
||||
)
|
||||
);
|
||||
|
||||
assertThat(exception.getMessage(), equalTo("Invalid topology: " +
|
||||
"Topic is unkown to the topology. This may happen if different KafkaStreams instances of the same " +
|
||||
|
|
Loading…
Reference in New Issue