mirror of https://github.com/apache/kafka.git
KAFKA-10847: Remove internal config for enabling the fix (#10941)
Also update the upgrade guide indicating about the grace period KIP and its indication on the fix with throughput impact. Reviewers: Luke Chen <showuon@gmail.com>, Matthias J. Sax <mjsax@apache.org>
This commit is contained in:
parent
921a3428a8
commit
3e3264760b
|
@ -134,6 +134,25 @@
|
|||
<p>
|
||||
We removed the default implementation of <code>RocksDBConfigSetter#close()</code>.
|
||||
</p>
|
||||
<p>
|
||||
We dropped the default 24 hours grace period for windowed operations such as Window or Session aggregates, or stream-stream joins.
|
||||
This period determines how long after a window ends any out-of-order records will still be processed.
|
||||
Records coming in after the grace period has elapsed will be dropped from those windows.
|
||||
With a long grace period, though Kafka Streams can handle out-of-order data up to that amount of time, it will also incur a high and confusing latency for users,
|
||||
e.g. suppression operators with the default won't emit results up for 24 hours, while in practice out-of-order data usually has a much smaller time-skewness.
|
||||
Instead of abstracting this config from users with a long default value, we introduced new constructs such as <code>TimeWindows#ofSizeAndGrace</code> to let callers always set it upon constructing the windows;
|
||||
the other setters such as <code>TimeWindows#grace</code> are deprecated and will be removed in the future.
|
||||
</p>
|
||||
<p>
|
||||
Additionally, in older versions Kafka Streams emitted stream-stream left/outer join results eagerly. This behavior may lead to spurious left/outer join result records.
|
||||
In this release, we changed the behavior to avoid spurious results and left/outer join result are only emitted after the join window is closed, i.e., after the grace period elapsed.
|
||||
To maintain backward compatibility, the old API <code>JoinWindows#of(timeDifference)</code> preserves the old eager-emit behavior and only the new
|
||||
APIs <code>JoinWindows#ofTimeDifferenceAndGrace()</code> and <code>JoinsWindows#ofTimeDifferenceNoGrace</code> enable the new behavior.
|
||||
</p>
|
||||
<ul>
|
||||
<li><a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-633%3A+Drop+24+hour+default+of+grace+period+in+Streams">KIP-633</a>: Drop 24 hour default of grace period in Streams</li>
|
||||
<li><a href="https://issues.apache.org/jira/browse/KAFKA-10847">KAFKA-10847</a>: Avoid spurious left/outer join results in stream-stream join</li>
|
||||
</ul>
|
||||
|
||||
<p>
|
||||
The public <code>topicGroupId</code> and <code>partition</code> fields on TaskId have been deprecated and replaced with getters. Please migrate to using the new <code>TaskId.subtopology()</code>
|
||||
|
|
|
@ -944,9 +944,6 @@ public class StreamsConfig extends AbstractConfig {
|
|||
// This is settable in the main Streams config, but it's a private API for testing
|
||||
public static final String ASSIGNMENT_LISTENER = "__assignment.listener__";
|
||||
|
||||
// Private API used to disable the fix on left/outer joins (https://issues.apache.org/jira/browse/KAFKA-10847)
|
||||
public static final String ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX = "__enable.kstreams.outer.join.spurious.results.fix__";
|
||||
|
||||
// Private API used to control the emit latency for left/outer join results (https://issues.apache.org/jira/browse/KAFKA-10847)
|
||||
public static final String EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX = "__emit.interval.ms.kstreams.outer.join.spurious.results.fix__";
|
||||
|
||||
|
|
|
@ -18,7 +18,6 @@ package org.apache.kafka.streams.kstream.internals;
|
|||
|
||||
import org.apache.kafka.common.metrics.Sensor;
|
||||
import org.apache.kafka.streams.KeyValue;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
|
||||
import org.apache.kafka.streams.kstream.Windowed;
|
||||
import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
|
||||
|
@ -29,13 +28,13 @@ import org.apache.kafka.streams.state.WindowStore;
|
|||
import org.apache.kafka.streams.state.WindowStoreIterator;
|
||||
import org.apache.kafka.streams.state.internals.KeyAndJoinSide;
|
||||
import org.apache.kafka.streams.state.internals.LeftOrRightValue;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
|
||||
import static org.apache.kafka.streams.StreamsConfig.InternalConfig.ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
|
||||
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
|
||||
|
||||
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
|
||||
|
@ -96,12 +95,7 @@ class KStreamKStreamJoin<K, R, V1, V2> implements org.apache.kafka.streams.proce
|
|||
droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
|
||||
otherWindowStore = context.getStateStore(otherWindowName);
|
||||
|
||||
if (enableSpuriousResultFix
|
||||
&& StreamsConfig.InternalConfig.getBoolean(
|
||||
context.appConfigs(),
|
||||
ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX,
|
||||
true
|
||||
)) {
|
||||
if (enableSpuriousResultFix) {
|
||||
outerJoinWindowStore = outerJoinWindowName.map(context::getStateStore);
|
||||
sharedTimeTracker.nextTimeToEmit = context.currentSystemTimeMs();
|
||||
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
|
||||
package org.apache.kafka.streams.kstream.internals.graph;
|
||||
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.apache.kafka.streams.kstream.Joined;
|
||||
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
|
||||
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
|
||||
|
@ -26,12 +25,9 @@ import org.apache.kafka.streams.state.WindowStore;
|
|||
import org.apache.kafka.streams.state.internals.KeyAndJoinSide;
|
||||
import org.apache.kafka.streams.state.internals.LeftOrRightValue;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Optional;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.apache.kafka.streams.StreamsConfig.InternalConfig.ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
|
||||
|
||||
/**
|
||||
* Too much information to generalize, so Stream-Stream joins are represented by a specific node.
|
||||
*/
|
||||
|
@ -102,14 +98,7 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
|
|||
topologyBuilder.addStateStore(thisWindowStoreBuilder, thisWindowedStreamProcessorName, otherProcessorName);
|
||||
topologyBuilder.addStateStore(otherWindowStoreBuilder, otherWindowedStreamProcessorName, thisProcessorName);
|
||||
|
||||
if (enableSpuriousResultFix &&
|
||||
(props == null ||
|
||||
StreamsConfig.InternalConfig.getBoolean(
|
||||
new HashMap(props),
|
||||
ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX,
|
||||
true
|
||||
)
|
||||
)) {
|
||||
if (enableSpuriousResultFix) {
|
||||
outerJoinWindowStoreBuilder.ifPresent(builder -> topologyBuilder.addStateStore(builder, thisProcessorName, otherProcessorName));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Serdes;
|
|||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.apache.kafka.common.utils.Bytes;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
|
||||
import org.apache.kafka.streams.Topology.AutoOffsetReset;
|
||||
import org.apache.kafka.streams.errors.TopologyException;
|
||||
import org.apache.kafka.streams.kstream.Branched;
|
||||
|
@ -707,54 +706,35 @@ public class StreamsBuilderTest {
|
|||
STREAM_OPERATION_NAME);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldNotAddThirdStateStoreIfStreamStreamJoinFixIsDisabledViaConfig() {
|
||||
shouldNotAddThirdStateStoreIfStreamStreamJoinFixIsDisabled(
|
||||
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1)),
|
||||
false
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldNotAddThirdStateStoreIfStreamStreamJoinFixIsDisabledViaOldApi() {
|
||||
shouldNotAddThirdStateStoreIfStreamStreamJoinFixIsDisabled(
|
||||
JoinWindows.of(Duration.ofHours(1)),
|
||||
true
|
||||
);
|
||||
}
|
||||
|
||||
private void shouldNotAddThirdStateStoreIfStreamStreamJoinFixIsDisabled(
|
||||
final JoinWindows joinWindows,
|
||||
final boolean enableFix
|
||||
) {
|
||||
final KStream<String, String> streamOne = builder.stream(STREAM_TOPIC);
|
||||
final KStream<String, String> streamTwo = builder.stream(STREAM_TOPIC_TWO);
|
||||
|
||||
streamOne.leftJoin(
|
||||
streamTwo,
|
||||
(value1, value2) -> value1,
|
||||
joinWindows,
|
||||
StreamJoined.<String, String, String>as(STREAM_OPERATION_NAME)
|
||||
.withName(STREAM_OPERATION_NAME)
|
||||
streamTwo,
|
||||
(value1, value2) -> value1,
|
||||
JoinWindows.of(Duration.ofHours(1)),
|
||||
StreamJoined.<String, String, String>as(STREAM_OPERATION_NAME)
|
||||
.withName(STREAM_OPERATION_NAME)
|
||||
);
|
||||
|
||||
final Properties properties = new Properties();
|
||||
properties.put(InternalConfig.ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, enableFix);
|
||||
builder.build(properties);
|
||||
|
||||
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
|
||||
assertNamesForStateStore(topology.stateStores(),
|
||||
STREAM_OPERATION_NAME + "-this-join-store",
|
||||
STREAM_OPERATION_NAME + "-outer-other-join-store"
|
||||
STREAM_OPERATION_NAME + "-this-join-store",
|
||||
STREAM_OPERATION_NAME + "-outer-other-join-store"
|
||||
);
|
||||
assertNamesForOperation(topology,
|
||||
"KSTREAM-SOURCE-0000000000",
|
||||
"KSTREAM-SOURCE-0000000001",
|
||||
STREAM_OPERATION_NAME + "-this-windowed",
|
||||
STREAM_OPERATION_NAME + "-other-windowed",
|
||||
STREAM_OPERATION_NAME + "-this-join",
|
||||
STREAM_OPERATION_NAME + "-outer-other-join",
|
||||
STREAM_OPERATION_NAME + "-merge");
|
||||
"KSTREAM-SOURCE-0000000000",
|
||||
"KSTREAM-SOURCE-0000000001",
|
||||
STREAM_OPERATION_NAME + "-this-windowed",
|
||||
STREAM_OPERATION_NAME + "-other-windowed",
|
||||
STREAM_OPERATION_NAME + "-this-join",
|
||||
STREAM_OPERATION_NAME + "-outer-other-join",
|
||||
STREAM_OPERATION_NAME + "-merge");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Serdes;
|
|||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.apache.kafka.streams.KeyValueTimestamp;
|
||||
import org.apache.kafka.streams.StreamsBuilder;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.apache.kafka.streams.TopologyTestDriver;
|
||||
import org.apache.kafka.streams.TopologyWrapper;
|
||||
import org.apache.kafka.streams.kstream.Consumed;
|
||||
|
@ -29,7 +28,6 @@ import org.apache.kafka.streams.kstream.JoinWindows;
|
|||
import org.apache.kafka.streams.kstream.KStream;
|
||||
import org.apache.kafka.streams.TestInputTopic;
|
||||
import org.apache.kafka.streams.kstream.StreamJoined;
|
||||
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
|
||||
import org.apache.kafka.streams.state.Stores;
|
||||
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
|
||||
import org.apache.kafka.test.MockProcessor;
|
||||
|
@ -46,11 +44,7 @@ import java.util.HashSet;
|
|||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
import static java.time.Duration.ofHours;
|
||||
import static java.time.Duration.ofMillis;
|
||||
import static org.apache.kafka.streams.StreamsConfig.InternalConfig.ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
|
||||
import static org.hamcrest.CoreMatchers.hasItem;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
|
||||
|
@ -63,52 +57,8 @@ public class KStreamKStreamLeftJoinTest {
|
|||
private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
|
||||
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
|
||||
|
||||
@Test
|
||||
public void testLeftJoinWithInvalidSpuriousResultFixFlag() {
|
||||
final StreamsBuilder builder = new StreamsBuilder();
|
||||
|
||||
final KStream<Integer, String> stream1;
|
||||
final KStream<Integer, String> stream2;
|
||||
final KStream<Integer, String> joined;
|
||||
final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
|
||||
stream1 = builder.stream(topic1, consumed);
|
||||
stream2 = builder.stream(topic2, consumed);
|
||||
|
||||
joined = stream1.leftJoin(
|
||||
stream2,
|
||||
MockValueJoiner.TOSTRING_JOINER,
|
||||
JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)),
|
||||
StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
|
||||
);
|
||||
joined.process(supplier);
|
||||
|
||||
props.put(ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, 5);
|
||||
|
||||
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class);
|
||||
final TopologyTestDriver driver = new TopologyTestDriver(builder.build(props), props)) {
|
||||
assertThat(appender.getMessages(), hasItem("Invalid value (5) on internal configuration " +
|
||||
"'" + ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX + "'. Please specify a true/false value."));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLeftJoinWithSpuriousResultFixDisabledViaFeatureFlag() {
|
||||
runLeftJoinWithoutSpuriousResultFix(
|
||||
JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofHours(24L)),
|
||||
false
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLeftJoinWithSpuriousResultFixDisabledOldApi() {
|
||||
runLeftJoinWithoutSpuriousResultFix(
|
||||
JoinWindows.of(ofMillis(100L)),
|
||||
true
|
||||
);
|
||||
}
|
||||
|
||||
private void runLeftJoinWithoutSpuriousResultFix(final JoinWindows joinWindows,
|
||||
final boolean fixEnabled) {
|
||||
final StreamsBuilder builder = new StreamsBuilder();
|
||||
|
||||
final int[] expectedKeys = new int[] {0, 1, 2, 3};
|
||||
|
@ -121,20 +71,18 @@ public class KStreamKStreamLeftJoinTest {
|
|||
stream2 = builder.stream(topic2, consumed);
|
||||
|
||||
joined = stream1.leftJoin(
|
||||
stream2,
|
||||
MockValueJoiner.TOSTRING_JOINER,
|
||||
joinWindows,
|
||||
StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
|
||||
stream2,
|
||||
MockValueJoiner.TOSTRING_JOINER,
|
||||
JoinWindows.of(ofMillis(100L)),
|
||||
StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
|
||||
);
|
||||
joined.process(supplier);
|
||||
|
||||
props.put(ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, fixEnabled);
|
||||
|
||||
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(props), props)) {
|
||||
final TestInputTopic<Integer, String> inputTopic1 =
|
||||
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
|
||||
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
|
||||
final TestInputTopic<Integer, String> inputTopic2 =
|
||||
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
|
||||
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
|
||||
final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
|
||||
|
||||
// Only 2 window stores should be available
|
||||
|
@ -149,8 +97,8 @@ public class KStreamKStreamLeftJoinTest {
|
|||
inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i]);
|
||||
}
|
||||
processor.checkAndClearProcessResult(
|
||||
new KeyValueTimestamp<>(0, "A0+null", 0L),
|
||||
new KeyValueTimestamp<>(1, "A1+null", 0L)
|
||||
new KeyValueTimestamp<>(0, "A0+null", 0L),
|
||||
new KeyValueTimestamp<>(1, "A1+null", 0L)
|
||||
);
|
||||
|
||||
// push two items to the other stream; this should produce two items
|
||||
|
@ -162,29 +110,14 @@ public class KStreamKStreamLeftJoinTest {
|
|||
inputTopic2.pipeInput(expectedKeys[i], "a" + expectedKeys[i]);
|
||||
}
|
||||
processor.checkAndClearProcessResult(
|
||||
new KeyValueTimestamp<>(0, "A0+a0", 0L),
|
||||
new KeyValueTimestamp<>(1, "A1+a1", 0L)
|
||||
new KeyValueTimestamp<>(0, "A0+a0", 0L),
|
||||
new KeyValueTimestamp<>(1, "A1+a1", 0L)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLeftJoinDuplicatesWithSpuriousResultFixDisabledFeatureFlag() {
|
||||
testLeftJoinDuplicatesSpuriousResultFix(
|
||||
JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(10L)),
|
||||
false
|
||||
);
|
||||
}
|
||||
@Test
|
||||
public void testLeftJoinDuplicatesWithSpuriousResultFixDisabledOldApi() {
|
||||
testLeftJoinDuplicatesSpuriousResultFix(
|
||||
JoinWindows.of(ofMillis(100L)).grace(ofMillis(10L)),
|
||||
true
|
||||
);
|
||||
}
|
||||
|
||||
private void testLeftJoinDuplicatesSpuriousResultFix(final JoinWindows joinWindows,
|
||||
final boolean fixEnabled) {
|
||||
final StreamsBuilder builder = new StreamsBuilder();
|
||||
|
||||
final KStream<Integer, String> stream1;
|
||||
|
@ -195,20 +128,18 @@ public class KStreamKStreamLeftJoinTest {
|
|||
stream2 = builder.stream(topic2, consumed);
|
||||
|
||||
joined = stream1.leftJoin(
|
||||
stream2,
|
||||
MockValueJoiner.TOSTRING_JOINER,
|
||||
joinWindows,
|
||||
StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
|
||||
stream2,
|
||||
MockValueJoiner.TOSTRING_JOINER,
|
||||
JoinWindows.of(ofMillis(100L)).grace(ofMillis(10L)),
|
||||
StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
|
||||
);
|
||||
joined.process(supplier);
|
||||
|
||||
props.put(ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, fixEnabled);
|
||||
|
||||
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(props), props)) {
|
||||
final TestInputTopic<Integer, String> inputTopic1 =
|
||||
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
|
||||
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
|
||||
final TestInputTopic<Integer, String> inputTopic2 =
|
||||
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
|
||||
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
|
||||
final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
|
||||
|
||||
// Only 2 window stores should be available
|
||||
|
@ -219,10 +150,10 @@ public class KStreamKStreamLeftJoinTest {
|
|||
inputTopic2.pipeInput(0, "a0", 0L);
|
||||
|
||||
processor.checkAndClearProcessResult(
|
||||
new KeyValueTimestamp<>(0, "A0+null", 0L),
|
||||
new KeyValueTimestamp<>(0, "A0-0+null", 0L),
|
||||
new KeyValueTimestamp<>(0, "A0+a0", 0L),
|
||||
new KeyValueTimestamp<>(0, "A0-0+a0", 0L)
|
||||
new KeyValueTimestamp<>(0, "A0+null", 0L),
|
||||
new KeyValueTimestamp<>(0, "A0-0+null", 0L),
|
||||
new KeyValueTimestamp<>(0, "A0+a0", 0L),
|
||||
new KeyValueTimestamp<>(0, "A0-0+a0", 0L)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Serdes;
|
|||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.apache.kafka.streams.KeyValueTimestamp;
|
||||
import org.apache.kafka.streams.StreamsBuilder;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.apache.kafka.streams.TestInputTopic;
|
||||
import org.apache.kafka.streams.TopologyTestDriver;
|
||||
import org.apache.kafka.streams.TopologyWrapper;
|
||||
|
@ -29,7 +28,6 @@ import org.apache.kafka.streams.kstream.Consumed;
|
|||
import org.apache.kafka.streams.kstream.JoinWindows;
|
||||
import org.apache.kafka.streams.kstream.KStream;
|
||||
import org.apache.kafka.streams.kstream.StreamJoined;
|
||||
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
|
||||
import org.apache.kafka.streams.state.Stores;
|
||||
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
|
||||
import org.apache.kafka.test.MockProcessor;
|
||||
|
@ -47,9 +45,6 @@ import java.util.Properties;
|
|||
import java.util.Set;
|
||||
|
||||
import static java.time.Duration.ofMillis;
|
||||
import static org.apache.kafka.streams.StreamsConfig.InternalConfig.ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
|
||||
import static org.hamcrest.CoreMatchers.hasItem;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
|
||||
|
@ -60,52 +55,8 @@ public class KStreamKStreamOuterJoinTest {
|
|||
private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
|
||||
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
|
||||
|
||||
@Test
|
||||
public void testOuterJoinWithInvalidSpuriousResultFixFlag() {
|
||||
final StreamsBuilder builder = new StreamsBuilder();
|
||||
|
||||
final KStream<Integer, String> stream1;
|
||||
final KStream<Integer, String> stream2;
|
||||
final KStream<Integer, String> joined;
|
||||
final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
|
||||
stream1 = builder.stream(topic1, consumed);
|
||||
stream2 = builder.stream(topic2, consumed);
|
||||
|
||||
joined = stream1.outerJoin(
|
||||
stream2,
|
||||
MockValueJoiner.TOSTRING_JOINER,
|
||||
JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)),
|
||||
StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
|
||||
);
|
||||
joined.process(supplier);
|
||||
|
||||
props.put(ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, 5);
|
||||
|
||||
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class);
|
||||
final TopologyTestDriver driver = new TopologyTestDriver(builder.build(props), props)) {
|
||||
assertThat(appender.getMessages(), hasItem("Invalid value (5) on internal configuration " +
|
||||
"'" + ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX + "'. Please specify a true/false value."));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOuterJoinDuplicatesWithFixDisabledFeatureFlag() {
|
||||
testOuterJoinDuplicatesWithoutFix(
|
||||
JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(10L)),
|
||||
false
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOuterJoinDuplicatesWithFixDisabledOldApi() {
|
||||
testOuterJoinDuplicatesWithoutFix(
|
||||
JoinWindows.of(ofMillis(100L)).grace(ofMillis(10L)),
|
||||
true
|
||||
);
|
||||
}
|
||||
|
||||
private void testOuterJoinDuplicatesWithoutFix(final JoinWindows joinWindows,
|
||||
final boolean fixEnabled) {
|
||||
final StreamsBuilder builder = new StreamsBuilder();
|
||||
|
||||
final KStream<Integer, String> stream1;
|
||||
|
@ -116,20 +67,18 @@ public class KStreamKStreamOuterJoinTest {
|
|||
stream2 = builder.stream(topic2, consumed);
|
||||
|
||||
joined = stream1.outerJoin(
|
||||
stream2,
|
||||
MockValueJoiner.TOSTRING_JOINER,
|
||||
joinWindows,
|
||||
StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
|
||||
stream2,
|
||||
MockValueJoiner.TOSTRING_JOINER,
|
||||
JoinWindows.of(ofMillis(100L)).grace(ofMillis(10L)),
|
||||
StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
|
||||
);
|
||||
joined.process(supplier);
|
||||
|
||||
props.put(ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, fixEnabled);
|
||||
|
||||
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(props), props)) {
|
||||
final TestInputTopic<Integer, String> inputTopic1 =
|
||||
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
|
||||
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
|
||||
final TestInputTopic<Integer, String> inputTopic2 =
|
||||
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
|
||||
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
|
||||
final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
|
||||
|
||||
// Only 2 window stores should be available
|
||||
|
@ -141,11 +90,11 @@ public class KStreamKStreamOuterJoinTest {
|
|||
inputTopic2.pipeInput(1, "b1", 0L);
|
||||
|
||||
processor.checkAndClearProcessResult(
|
||||
new KeyValueTimestamp<>(0, "A0+null", 0L),
|
||||
new KeyValueTimestamp<>(0, "A0-0+null", 0L),
|
||||
new KeyValueTimestamp<>(0, "A0+a0", 0L),
|
||||
new KeyValueTimestamp<>(0, "A0-0+a0", 0L),
|
||||
new KeyValueTimestamp<>(1, "null+b1", 0L)
|
||||
new KeyValueTimestamp<>(0, "A0+null", 0L),
|
||||
new KeyValueTimestamp<>(0, "A0-0+null", 0L),
|
||||
new KeyValueTimestamp<>(0, "A0+a0", 0L),
|
||||
new KeyValueTimestamp<>(0, "A0-0+a0", 0L),
|
||||
new KeyValueTimestamp<>(1, "null+b1", 0L)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue