KAFKA-13507: GlobalProcessor ignores user specified names (#11573)

Use the name specified via consumed parameter in InternalStreamsBuilder#addGlobalStore method for initializing the source name and processor name. If not specified, the names are generated.

Reviewers: Luke Chen <showuon@gmail.com>, Bill Bejeck <bbejeck@apache.org>
This commit is contained in:
Tamara Skokova 2021-12-09 18:42:00 +04:00 committed by GitHub
parent f6360d1dc1
commit 133b515b5e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 39 additions and 2 deletions

View File

@ -206,8 +206,10 @@ public class InternalStreamsBuilder implements InternalNameProvider {
final org.apache.kafka.streams.processor.api.ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
// explicitly disable logging for global stores
storeBuilder.withLoggingDisabled();
final String sourceName = newProcessorName(KStreamImpl.SOURCE_NAME);
final String processorName = newProcessorName(KTableImpl.SOURCE_NAME);
final NamedInternal named = new NamedInternal(consumed.name());
final String sourceName = named.suffixWithOrElseGet(TABLE_SOURCE_SUFFIX, this, KStreamImpl.SOURCE_NAME);
final String processorName = named.orElseGenerateWithPrefix(this, KTableImpl.SOURCE_NAME);
final GraphNode globalStoreNode = new GlobalStoreNode<>(
storeBuilder,

View File

@ -49,6 +49,7 @@ import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockPredicate;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.NoopValueTransformer;
import org.apache.kafka.test.NoopValueTransformerWithKey;
import org.apache.kafka.test.StreamsTestUtils;
@ -998,6 +999,40 @@ public class StreamsBuilderTest {
STREAM_OPERATION_NAME);
}
@Test
public void shouldUseSpecifiedNameForGlobalStoreProcessor() {
builder.addGlobalStore(Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("store"),
Serdes.String(),
Serdes.String()
),
"topic",
Consumed.with(Serdes.String(), Serdes.String()).withName("test"),
new MockApiProcessorSupplier<>()
);
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildGlobalStateTopology();
assertNamesForOperation(topology, "test-source", "test");
}
@Test
public void shouldUseDefaultNameForGlobalStoreProcessor() {
builder.addGlobalStore(Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("store"),
Serdes.String(),
Serdes.String()
),
"topic",
Consumed.with(Serdes.String(), Serdes.String()),
new MockApiProcessorSupplier<>()
);
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildGlobalStateTopology();
assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", "KTABLE-SOURCE-0000000001");
}
@Test
public void shouldAllowStreamsFromSameTopic() {
builder.stream("topic");