KAFKA-10847: Set shared outer store to an in-memory store when in-memory stores are supplied (#10613)

When users supply in-memory stores for left/outer joins, then the internal shared outer store must be switch to in-memory store too. This will allow users who want to keep all stores in memory to continue doing so.

Added unit tests to validate topology and left/outer joins work fine with an in-memory shared store.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
Sergio Peña 2021-05-05 12:21:43 -05:00 committed by GitHub
parent a1367f57f5
commit d915ce58d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 276 additions and 31 deletions

View File

@ -144,18 +144,7 @@ class KStreamImplJoin {
Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, LeftOrRightValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
if (leftOuter) {
final String outerJoinSuffix = rightOuter ? "-outer-shared-join" : "-left-shared-join";
// Get the suffix index of the joinThisGeneratedName to build the outer join store name.
final String outerJoinStoreGeneratedName = KStreamImpl.OUTERSHARED_NAME
+ joinThisGeneratedName.substring(
rightOuter
? KStreamImpl.OUTERTHIS_NAME.length()
: KStreamImpl.JOINTHIS_NAME.length());
final String outerJoinStoreName = userProvidedBaseStoreName == null ? outerJoinStoreGeneratedName : userProvidedBaseStoreName + outerJoinSuffix;
outerJoinWindowStore = Optional.of(sharedOuterJoinWindowStoreBuilder(outerJoinStoreName, windows, streamJoinedInternal));
outerJoinWindowStore = Optional.of(sharedOuterJoinWindowStoreBuilder(windows, streamJoinedInternal, joinThisGeneratedName));
}
// Time shared between joins to keep track of the maximum stream time
@ -263,20 +252,57 @@ class KStreamImplJoin {
return builder;
}
private <K, V1, V2> String buildOuterJoinWindowStoreName(final StreamJoinedInternal<K, V1, V2> streamJoinedInternal, final String joinThisGeneratedName) {
final String outerJoinSuffix = rightOuter ? "-outer-shared-join" : "-left-shared-join";
if (streamJoinedInternal.thisStoreSupplier() != null && !streamJoinedInternal.thisStoreSupplier().name().isEmpty()) {
return streamJoinedInternal.thisStoreSupplier().name() + outerJoinSuffix;
} else if (streamJoinedInternal.storeName() != null) {
return streamJoinedInternal.storeName() + outerJoinSuffix;
} else {
return KStreamImpl.OUTERSHARED_NAME
+ joinThisGeneratedName.substring(
rightOuter
? KStreamImpl.OUTERTHIS_NAME.length()
: KStreamImpl.JOINTHIS_NAME.length());
}
}
@SuppressWarnings("unchecked")
private static <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> sharedOuterJoinWindowStoreBuilder(final String storeName,
final JoinWindows windows,
final StreamJoinedInternal<K, V1, V2> streamJoinedInternal) {
final StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> builder = new TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>(
persistentTimeOrderedWindowStore(
storeName + "-store",
Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
Duration.ofMillis(windows.size())
),
new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()),
new LeftOrRightValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde()),
Time.SYSTEM
);
private <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> sharedOuterJoinWindowStoreBuilder(final JoinWindows windows,
final StreamJoinedInternal<K, V1, V2> streamJoinedInternal,
final String joinThisGeneratedName) {
final boolean persistent = streamJoinedInternal.thisStoreSupplier() == null || streamJoinedInternal.thisStoreSupplier().get().persistent();
final String storeName = buildOuterJoinWindowStoreName(streamJoinedInternal, joinThisGeneratedName);
final KeyAndJoinSideSerde keyAndJoinSideSerde = new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde());
final LeftOrRightValueSerde leftOrRightValueSerde = new LeftOrRightValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde());
final StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> builder;
if (persistent) {
builder = new TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>(
persistentTimeOrderedWindowStore(
storeName + "-store",
Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
Duration.ofMillis(windows.size())
),
keyAndJoinSideSerde,
leftOrRightValueSerde,
Time.SYSTEM
);
} else {
builder = Stores.windowStoreBuilder(
Stores.inMemoryWindowStore(
storeName + "-store",
Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
Duration.ofMillis(windows.size()),
false
),
keyAndJoinSideSerde,
leftOrRightValueSerde
);
}
if (streamJoinedInternal.loggingEnabled()) {
builder.withLoggingEnabled(streamJoinedInternal.logConfig());
} else {

View File

@ -36,6 +36,8 @@ import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockKeyValueStore;
@ -45,6 +47,7 @@ import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@ -778,6 +781,60 @@ public class TopologyTest {
describe.toString());
}
@Test
public void streamStreamJoinTopologyWithCustomStoresSuppliers() {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
stream1 = builder.stream("input-topic1");
stream2 = builder.stream("input-topic2");
final JoinWindows joinWindows = JoinWindows.of(ofMillis(100));
final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store",
Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
Duration.ofMillis(joinWindows.size()), true);
final WindowBytesStoreSupplier otherStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store-other",
Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
Duration.ofMillis(joinWindows.size()), true);
stream1.join(
stream2,
MockValueJoiner.TOSTRING_JOINER,
joinWindows,
StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
.withThisStoreSupplier(thisStoreSupplier)
.withOtherStoreSupplier(otherStoreSupplier));
final TopologyDescription describe = builder.build().describe();
assertEquals(
"Topologies:\n" +
" Sub-topology: 0\n" +
" Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic1])\n" +
" --> KSTREAM-WINDOWED-0000000002\n" +
" Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic2])\n" +
" --> KSTREAM-WINDOWED-0000000003\n" +
" Processor: KSTREAM-WINDOWED-0000000002 (stores: [in-memory-join-store])\n" +
" --> KSTREAM-JOINTHIS-0000000004\n" +
" <-- KSTREAM-SOURCE-0000000000\n" +
" Processor: KSTREAM-WINDOWED-0000000003 (stores: [in-memory-join-store-other])\n" +
" --> KSTREAM-JOINOTHER-0000000005\n" +
" <-- KSTREAM-SOURCE-0000000001\n" +
" Processor: KSTREAM-JOINOTHER-0000000005 (stores: [in-memory-join-store])\n" +
" --> KSTREAM-MERGE-0000000006\n" +
" <-- KSTREAM-WINDOWED-0000000003\n" +
" Processor: KSTREAM-JOINTHIS-0000000004 (stores: [in-memory-join-store-other])\n" +
" --> KSTREAM-MERGE-0000000006\n" +
" <-- KSTREAM-WINDOWED-0000000002\n" +
" Processor: KSTREAM-MERGE-0000000006 (stores: [])\n" +
" --> none\n" +
" <-- KSTREAM-JOINTHIS-0000000004, KSTREAM-JOINOTHER-0000000005\n\n",
describe.toString());
}
@Test
public void streamStreamLeftJoinTopologyWithDefaultStoresNames() {
final StreamsBuilder builder = new StreamsBuilder();
@ -863,6 +920,60 @@ public class TopologyTest {
describe.toString());
}
@Test
public void streamStreamLeftJoinTopologyWithCustomStoresSuppliers() {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
stream1 = builder.stream("input-topic1");
stream2 = builder.stream("input-topic2");
final JoinWindows joinWindows = JoinWindows.of(ofMillis(100));
final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store",
Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
Duration.ofMillis(joinWindows.size()), true);
final WindowBytesStoreSupplier otherStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store-other",
Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
Duration.ofMillis(joinWindows.size()), true);
stream1.leftJoin(
stream2,
MockValueJoiner.TOSTRING_JOINER,
joinWindows,
StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
.withThisStoreSupplier(thisStoreSupplier)
.withOtherStoreSupplier(otherStoreSupplier));
final TopologyDescription describe = builder.build().describe();
assertEquals(
"Topologies:\n" +
" Sub-topology: 0\n" +
" Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic1])\n" +
" --> KSTREAM-WINDOWED-0000000002\n" +
" Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic2])\n" +
" --> KSTREAM-WINDOWED-0000000003\n" +
" Processor: KSTREAM-WINDOWED-0000000002 (stores: [in-memory-join-store])\n" +
" --> KSTREAM-JOINTHIS-0000000004\n" +
" <-- KSTREAM-SOURCE-0000000000\n" +
" Processor: KSTREAM-WINDOWED-0000000003 (stores: [in-memory-join-store-other])\n" +
" --> KSTREAM-OUTEROTHER-0000000005\n" +
" <-- KSTREAM-SOURCE-0000000001\n" +
" Processor: KSTREAM-JOINTHIS-0000000004 (stores: [in-memory-join-store-other, in-memory-join-store-left-shared-join-store])\n" +
" --> KSTREAM-MERGE-0000000006\n" +
" <-- KSTREAM-WINDOWED-0000000002\n" +
" Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [in-memory-join-store, in-memory-join-store-left-shared-join-store])\n" +
" --> KSTREAM-MERGE-0000000006\n" +
" <-- KSTREAM-WINDOWED-0000000003\n" +
" Processor: KSTREAM-MERGE-0000000006 (stores: [])\n" +
" --> none\n" +
" <-- KSTREAM-JOINTHIS-0000000004, KSTREAM-OUTEROTHER-0000000005\n\n",
describe.toString());
}
@Test
public void streamStreamOuterJoinTopologyWithDefaultStoresNames() {
final StreamsBuilder builder = new StreamsBuilder();
@ -948,6 +1059,60 @@ public class TopologyTest {
describe.toString());
}
@Test
public void streamStreamOuterJoinTopologyWithCustomStoresSuppliers() {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
stream1 = builder.stream("input-topic1");
stream2 = builder.stream("input-topic2");
final JoinWindows joinWindows = JoinWindows.of(ofMillis(100));
final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store",
Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
Duration.ofMillis(joinWindows.size()), true);
final WindowBytesStoreSupplier otherStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store-other",
Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
Duration.ofMillis(joinWindows.size()), true);
stream1.outerJoin(
stream2,
MockValueJoiner.TOSTRING_JOINER,
joinWindows,
StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
.withThisStoreSupplier(thisStoreSupplier)
.withOtherStoreSupplier(otherStoreSupplier));
final TopologyDescription describe = builder.build().describe();
assertEquals(
"Topologies:\n" +
" Sub-topology: 0\n" +
" Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic1])\n" +
" --> KSTREAM-WINDOWED-0000000002\n" +
" Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic2])\n" +
" --> KSTREAM-WINDOWED-0000000003\n" +
" Processor: KSTREAM-WINDOWED-0000000002 (stores: [in-memory-join-store])\n" +
" --> KSTREAM-OUTERTHIS-0000000004\n" +
" <-- KSTREAM-SOURCE-0000000000\n" +
" Processor: KSTREAM-WINDOWED-0000000003 (stores: [in-memory-join-store-other])\n" +
" --> KSTREAM-OUTEROTHER-0000000005\n" +
" <-- KSTREAM-SOURCE-0000000001\n" +
" Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [in-memory-join-store-outer-shared-join-store, in-memory-join-store])\n" +
" --> KSTREAM-MERGE-0000000006\n" +
" <-- KSTREAM-WINDOWED-0000000003\n" +
" Processor: KSTREAM-OUTERTHIS-0000000004 (stores: [in-memory-join-store-other, in-memory-join-store-outer-shared-join-store])\n" +
" --> KSTREAM-MERGE-0000000006\n" +
" <-- KSTREAM-WINDOWED-0000000002\n" +
" Processor: KSTREAM-MERGE-0000000006 (stores: [])\n" +
" --> none\n" +
" <-- KSTREAM-OUTERTHIS-0000000004, KSTREAM-OUTEROTHER-0000000005\n\n",
describe.toString());
}
@Test
public void topologyWithDynamicRoutingShouldDescribeExtractorClass() {
final StreamsBuilder builder = new StreamsBuilder();

View File

@ -30,6 +30,8 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
@ -429,7 +431,32 @@ public class KStreamKStreamLeftJoinTest {
}
@Test
public void testLeftJoin() {
public void testLeftJoinWithInMemoryCustomSuppliers() {
final JoinWindows joinWindows = JoinWindows.of(ofMillis(100));
final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store",
Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
Duration.ofMillis(joinWindows.size()), true);
final WindowBytesStoreSupplier otherStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store-other",
Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
Duration.ofMillis(joinWindows.size()), true);
final StreamJoined<Integer, String, String> streamJoined = StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String());
runLeftJoin(streamJoined.withThisStoreSupplier(thisStoreSupplier).withOtherStoreSupplier(otherStoreSupplier), joinWindows);
}
@Test
public void testLeftJoinWithDefaultSuppliers() {
final JoinWindows joinWindows = JoinWindows.of(ofMillis(100));
final StreamJoined<Integer, String, String> streamJoined = StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String());
runLeftJoin(streamJoined, joinWindows);
}
public void runLeftJoin(final StreamJoined<Integer, String, String> streamJoined,
final JoinWindows joinWindows) {
final StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[] {0, 1, 2, 3};
@ -444,8 +471,8 @@ public class KStreamKStreamLeftJoinTest {
joined = stream1.leftJoin(
stream2,
MockValueJoiner.TOSTRING_JOINER,
JoinWindows.of(ofMillis(100)),
StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
joinWindows,
streamJoined);
joined.process(supplier);
final Collection<Set<String>> copartitionGroups =

View File

@ -30,6 +30,8 @@ import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
@ -498,7 +500,32 @@ public class KStreamKStreamOuterJoinTest {
}
@Test
public void testOuterJoin() {
public void testOuterJoinWithInMemoryCustomSuppliers() {
final JoinWindows joinWindows = JoinWindows.of(ofMillis(100));
final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store",
Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
Duration.ofMillis(joinWindows.size()), true);
final WindowBytesStoreSupplier otherStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store-other",
Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
Duration.ofMillis(joinWindows.size()), true);
final StreamJoined<Integer, String, String> streamJoined = StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String());
runOuterJoin(streamJoined.withThisStoreSupplier(thisStoreSupplier).withOtherStoreSupplier(otherStoreSupplier), joinWindows);
}
@Test
public void testOuterJoinWithDefaultSuppliers() {
final JoinWindows joinWindows = JoinWindows.of(ofMillis(100));
final StreamJoined<Integer, String, String> streamJoined = StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String());
runOuterJoin(streamJoined, joinWindows);
}
public void runOuterJoin(final StreamJoined<Integer, String, String> streamJoined,
final JoinWindows joinWindows) {
final StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[] {0, 1, 2, 3};
@ -513,8 +540,8 @@ public class KStreamKStreamOuterJoinTest {
joined = stream1.outerJoin(
stream2,
MockValueJoiner.TOSTRING_JOINER,
JoinWindows.of(ofMillis(100)).grace(ofMillis(0)),
StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
joinWindows,
streamJoined);
joined.process(supplier);
final Collection<Set<String>> copartitionGroups =