MINOR: prefix topics if internal config is set (#11611)

Reviewers: Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
Walker Carlson 2022-01-10 18:08:48 -06:00 committed by GitHub
parent 1785e1223e
commit c182a431d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 129 additions and 25 deletions

View File

@ -951,6 +951,8 @@ public class StreamsConfig extends AbstractConfig {
public static final String IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED = "__iq.consistency.offset"
+ ".vector.enabled__";
// Private API used to control the prefix of the auto created topics
public static final String TOPIC_PREFIX_ALTERNATIVE = "__internal.override.topic.prefix__";
public static boolean getBoolean(final Map<String, Object> configs, final String key, final boolean defaultValue) {
final Object value = configs.getOrDefault(key, defaultValue);
@ -975,6 +977,16 @@ public class StreamsConfig extends AbstractConfig {
return defaultValue;
}
}
public static String getString(final Map<String, Object> configs, final String key, final String defaultValue) {
final Object value = configs.getOrDefault(key, defaultValue);
if (value instanceof String) {
return (String) value;
} else {
log.warn("Invalid value (" + value + ") on internal configuration '" + key + "'. Please specify a String value.");
return defaultValue;
}
}
}
/**

View File

@ -1057,7 +1057,7 @@ public class InternalTopologyBuilder {
// remember the changelog topic if this state store is change-logging enabled
if (stateStoreFactory.loggingEnabled() && !storeToChangelogTopic.containsKey(stateStoreName)) {
final String changelogTopic =
ProcessorStateManager.storeChangelogTopic(applicationId, stateStoreName, topologyName);
ProcessorStateManager.storeChangelogTopic(getPrefix(), stateStoreName, topologyName);
storeToChangelogTopic.put(stateStoreName, changelogTopic);
changelogTopicToStore.put(changelogTopic, stateStoreName);
}
@ -1341,12 +1341,24 @@ public class InternalTopologyBuilder {
+ "setApplicationId first");
}
if (hasNamedTopology()) {
return applicationId + "-" + topologyName + "-" + topic;
return getPrefix() + "-" + topologyName + "-" + topic;
} else {
return applicationId + "-" + topic;
return getPrefix() + "-" + topic;
}
}
String getPrefix() {
if (topologyConfigs == null) {
return applicationId;
}
return StreamsConfig.InternalConfig.getString(
topologyConfigs.applicationConfigs.originals(),
StreamsConfig.InternalConfig.TOPIC_PREFIX_ALTERNATIVE,
applicationId
);
}
void initializeSubscription() {
if (usesPatternSubscription()) {
log.debug("Found pattern subscribed source topics, initializing consumer's subscription pattern.");

View File

@ -16,10 +16,13 @@
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import java.util.Map;
/**
* This class bridges the gap for components that _should_ be compatible with
* the public ProcessorContext interface, but have come to depend on features
@ -54,15 +57,29 @@ public final class ProcessorContextUtils {
}
public static String changelogFor(final ProcessorContext context, final String storeName) {
final String prefix = getPrefix(context.appConfigs(), context.applicationId());
return context instanceof InternalProcessorContext
? ((InternalProcessorContext) context).changelogFor(storeName)
: ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName, context.taskId().topologyName());
: ProcessorStateManager.storeChangelogTopic(prefix, storeName, context.taskId().topologyName());
}
public static String changelogFor(final StateStoreContext context, final String storeName) {
final String prefix = getPrefix(context.appConfigs(), context.applicationId());
return context instanceof InternalProcessorContext
? ((InternalProcessorContext) context).changelogFor(storeName)
: ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName, context.taskId().topologyName());
: ProcessorStateManager.storeChangelogTopic(prefix, storeName, context.taskId().topologyName());
}
private static String getPrefix(final Map<String, Object> configs, final String applicationId) {
if (configs == null) {
return applicationId;
} else {
return StreamsConfig.InternalConfig.getString(
configs,
StreamsConfig.InternalConfig.TOPIC_PREFIX_ALTERNATIVE,
applicationId
);
}
}
public static InternalProcessorContext asInternalProcessorContext(final ProcessorContext context) {

View File

@ -162,11 +162,11 @@ public class ProcessorStateManager implements StateManager {
private TaskType taskType;
public static String storeChangelogTopic(final String applicationId, final String storeName, final String namedTopology) {
public static String storeChangelogTopic(final String prefix, final String storeName, final String namedTopology) {
if (namedTopology == null) {
return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
return prefix + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
} else {
return applicationId + "-" + namedTopology + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
return prefix + "-" + namedTopology + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
}
}

View File

@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.processor.ProcessorContext;
@ -85,8 +86,13 @@ class CachingWindowStore
}
private void initInternal(final InternalProcessorContext<?, ?> context) {
final String prefix = StreamsConfig.InternalConfig.getString(
context.appConfigs(),
StreamsConfig.InternalConfig.TOPIC_PREFIX_ALTERNATIVE,
context.applicationId()
);
this.context = context;
final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), name(), context.taskId().topologyName());
final String topic = ProcessorStateManager.storeChangelogTopic(prefix, name(), context.taskId().topologyName());
bytesSerdes = new StateSerdes<>(
topic,

View File

@ -25,6 +25,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
import org.apache.kafka.streams.processor.ProcessorContext;
@ -228,7 +229,12 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
);
context.register(root, (RecordBatchingStateRestoreCallback) this::restoreBatch);
changelogTopic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName, context.taskId().topologyName());
final String prefix = StreamsConfig.InternalConfig.getString(
context.appConfigs(),
StreamsConfig.InternalConfig.TOPIC_PREFIX_ALTERNATIVE,
context.applicationId()
);
changelogTopic = ProcessorStateManager.storeChangelogTopic(prefix, storeName, context.taskId().topologyName());
updateBufferMetrics();
open = true;
partition = context.taskId().partition();

View File

@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
@ -171,10 +172,11 @@ public class MeteredKeyValueStore<K, V>
private void initStoreSerde(final ProcessorContext context) {
final String storeName = name();
final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
final String prefix = getPrefix(context.appConfigs(), context.applicationId());
serdes = new StateSerdes<>(
changelogTopic != null ?
changelogTopic :
ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName, taskId.topologyName()),
ProcessorStateManager.storeChangelogTopic(prefix, storeName, taskId.topologyName()),
prepareKeySerde(keySerde, new SerdeGetter(context)),
prepareValueSerdeForStore(valueSerde, new SerdeGetter(context))
);
@ -183,15 +185,28 @@ public class MeteredKeyValueStore<K, V>
private void initStoreSerde(final StateStoreContext context) {
final String storeName = name();
final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
final String prefix = getPrefix(context.appConfigs(), context.applicationId());
serdes = new StateSerdes<>(
changelogTopic != null ?
changelogTopic :
ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName, taskId.topologyName()),
ProcessorStateManager.storeChangelogTopic(prefix, storeName, taskId.topologyName()),
prepareKeySerde(keySerde, new SerdeGetter(context)),
prepareValueSerdeForStore(valueSerde, new SerdeGetter(context))
);
}
private static String getPrefix(final Map<String, Object> configs, final String applicationId) {
if (configs == null) {
return applicationId;
} else {
return StreamsConfig.InternalConfig.getString(
configs,
StreamsConfig.InternalConfig.TOPIC_PREFIX_ALTERNATIVE,
applicationId
);
}
}
@SuppressWarnings("unchecked")
@Override
public boolean setFlushListener(final CacheFlushListener<K, V> listener,

View File

@ -20,6 +20,7 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.Change;
@ -136,10 +137,11 @@ public class MeteredSessionStore<K, V>
private void initStoreSerde(final ProcessorContext context) {
final String storeName = name();
final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
final String prefix = getPrefix(context.appConfigs(), context.applicationId());
serdes = new StateSerdes<>(
changelogTopic != null ?
changelogTopic :
ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName, taskId.topologyName()),
ProcessorStateManager.storeChangelogTopic(prefix, storeName, taskId.topologyName()),
WrappingNullableUtils.prepareKeySerde(keySerde, new SerdeGetter(context)),
WrappingNullableUtils.prepareValueSerde(valueSerde, new SerdeGetter(context))
);
@ -148,15 +150,28 @@ public class MeteredSessionStore<K, V>
private void initStoreSerde(final StateStoreContext context) {
final String storeName = name();
final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
final String prefix = getPrefix(context.appConfigs(), context.applicationId());
serdes = new StateSerdes<>(
changelogTopic != null ?
changelogTopic :
ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName, taskId.topologyName()),
ProcessorStateManager.storeChangelogTopic(prefix, storeName, taskId.topologyName()),
WrappingNullableUtils.prepareKeySerde(keySerde, new SerdeGetter(context)),
WrappingNullableUtils.prepareValueSerde(valueSerde, new SerdeGetter(context))
);
}
private static String getPrefix(final Map<String, Object> configs, final String applicationId) {
if (configs == null) {
return applicationId;
} else {
return StreamsConfig.InternalConfig.getString(
configs,
StreamsConfig.InternalConfig.TOPIC_PREFIX_ALTERNATIVE,
applicationId
);
}
}
@SuppressWarnings("unchecked")
@Override
public boolean setFlushListener(final CacheFlushListener<Windowed<K>, V> listener,

View File

@ -20,6 +20,7 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.Change;
@ -155,10 +156,15 @@ public class MeteredWindowStore<K, V>
private void initStoreSerde(final ProcessorContext context) {
final String storeName = name();
final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
final String prefix = StreamsConfig.InternalConfig.getString(
context.appConfigs(),
StreamsConfig.InternalConfig.TOPIC_PREFIX_ALTERNATIVE,
context.applicationId()
);
serdes = new StateSerdes<>(
changelogTopic != null ?
changelogTopic :
ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName, taskId.topologyName()),
ProcessorStateManager.storeChangelogTopic(prefix, storeName, taskId.topologyName()),
prepareKeySerde(keySerde, new SerdeGetter(context)),
prepareValueSerde(valueSerde, new SerdeGetter(context)));
}
@ -166,10 +172,15 @@ public class MeteredWindowStore<K, V>
private void initStoreSerde(final StateStoreContext context) {
final String storeName = name();
final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
final String prefix = StreamsConfig.InternalConfig.getString(
context.appConfigs(),
StreamsConfig.InternalConfig.TOPIC_PREFIX_ALTERNATIVE,
context.applicationId()
);
serdes = new StateSerdes<>(
changelogTopic != null ?
changelogTopic :
ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName, taskId.topologyName()),
ProcessorStateManager.storeChangelogTopic(prefix, storeName, taskId.topologyName()),
prepareKeySerde(keySerde, new SerdeGetter(context)),
prepareValueSerde(valueSerde, new SerdeGetter(context)));
}

View File

@ -151,6 +151,7 @@ public class NamedTopologyIntegrationTest {
asList(pair("B", 1L), pair("A", 2L), pair("C", 2L)); // output of count operation with caching
private final static List<KeyValue<String, Long>> SUM_OUTPUT_DATA =
asList(pair("B", 200L), pair("A", 400L), pair("C", 350L)); // output of summation with caching
private final static String TOPIC_PREFIX = "unique_topic_prefix";
private final KafkaClientSupplier clientSupplier = new DefaultKafkaClientSupplier();
@ -182,15 +183,16 @@ public class NamedTopologyIntegrationTest {
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
streamsConfiguration.put(StreamsConfig.InternalConfig.TOPIC_PREFIX_ALTERNATIVE, TOPIC_PREFIX);
return streamsConfiguration;
}
@Before
public void setup() throws Exception {
appId = safeUniqueTestName(NamedTopologyIntegrationTest.class, testName);
changelog1 = appId + "-" + TOPOLOGY_1 + "-store-changelog";
changelog2 = appId + "-" + TOPOLOGY_2 + "-store-changelog";
changelog3 = appId + "-" + TOPOLOGY_3 + "-store-changelog";
changelog1 = TOPIC_PREFIX + "-" + TOPOLOGY_1 + "-store-changelog";
changelog2 = TOPIC_PREFIX + "-" + TOPOLOGY_2 + "-store-changelog";
changelog3 = TOPIC_PREFIX + "-" + TOPOLOGY_3 + "-store-changelog";
props = configProps(appId);
streams = new KafkaStreamsNamedTopologyWrapper(props, clientSupplier);
@ -223,8 +225,9 @@ public class NamedTopologyIntegrationTest {
streams2.close(Duration.ofSeconds(30));
}
CLUSTER.getAllTopicsInCluster().stream().filter(t -> t.contains("-changelog")).forEach(t -> {
CLUSTER.getAllTopicsInCluster().stream().filter(t -> t.contains("-changelog") || t.contains("-repartition")).forEach(t -> {
try {
assertThat("topic was not decorated", t.contains(TOPIC_PREFIX));
CLUSTER.deleteTopicsAndWait(t);
} catch (final InterruptedException e) {
e.printStackTrace();
@ -265,11 +268,11 @@ public class NamedTopologyIntegrationTest {
streams.start(asList(fkjBuilder.build(), countBuilder.build()));
waitForApplicationState(singletonList(streams), State.RUNNING, Duration.ofSeconds(60));
final String countTopicPrefix = appId + "-" + countTopologyName;
final String fkjTopicPrefix = appId + "-" + fkjTopologyName;
final String countTopicPrefix = TOPIC_PREFIX + "-" + countTopologyName;
final String fkjTopicPrefix = TOPIC_PREFIX + "-" + fkjTopologyName;
final Set<String> internalTopics = CLUSTER
.getAllTopicsInCluster().stream()
.filter(t -> t.contains(appId))
.filter(t -> t.contains(TOPIC_PREFIX))
.filter(t -> t.endsWith("-repartition") || t.endsWith("-changelog") || t.endsWith("-topic"))
.collect(Collectors.toSet());
assertThat(internalTopics, is(mkSet(
@ -296,8 +299,8 @@ public class NamedTopologyIntegrationTest {
assertThat(results, equalTo(COUNT_OUTPUT_DATA));
final Set<String> allTopics = CLUSTER.getAllTopicsInCluster();
assertThat(allTopics.contains(appId + "-" + "topology-1" + "-store-changelog"), is(true));
assertThat(allTopics.contains(appId + "-" + "topology-1" + "-store-repartition"), is(true));
assertThat(allTopics.contains(TOPIC_PREFIX + "-" + "topology-1" + "-store-changelog"), is(true));
assertThat(allTopics.contains(TOPIC_PREFIX + "-" + "topology-1" + "-store-repartition"), is(true));
}
@Test

View File

@ -45,6 +45,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.niceMock;
import static org.easymock.EasyMock.replay;
@ -58,6 +60,7 @@ import static org.junit.Assert.assertTrue;
public class GlobalStateStoreProviderTest {
private final Map<String, StateStore> stores = new HashMap<>();
private final static Map<String, Object> CONFIGS = mkMap(mkEntry(StreamsConfig.InternalConfig.TOPIC_PREFIX_ALTERNATIVE, "appId"));
@Before
public void before() {
@ -110,6 +113,7 @@ public class GlobalStateStoreProviderTest {
);
expect(mockContext.taskId()).andStubReturn(new TaskId(0, 0));
expect(mockContext.recordCollector()).andStubReturn(null);
expect(mockContext.appConfigs()).andStubReturn(CONFIGS);
expectSerdes(mockContext);
replay(mockContext);
for (final StateStore store : stores.values()) {

View File

@ -99,6 +99,8 @@ public class MeteredTimestampedKeyValueStoreTest {
@Mock(type = MockType.NICE)
private InternalProcessorContext context;
private final static Map<String, Object> CONFIGS = mkMap(mkEntry(StreamsConfig.InternalConfig.TOPIC_PREFIX_ALTERNATIVE, APPLICATION_ID));
private MeteredTimestampedKeyValueStore<String, String> metered;
private final KeyValue<Bytes, byte[]> byteKeyValueTimestampPair = KeyValue.pair(KEY_BYTES,
VALUE_AND_TIMESTAMP_BYTES
@ -124,6 +126,7 @@ public class MeteredTimestampedKeyValueStoreTest {
expect(context.changelogFor(STORE_NAME)).andStubReturn(CHANGELOG_TOPIC);
expectSerdes();
expect(inner.name()).andStubReturn(STORE_NAME);
expect(context.appConfigs()).andStubReturn(CONFIGS);
tags = mkMap(
mkEntry(THREAD_ID_TAG_KEY, threadId),
mkEntry("task-id", taskId.toString()),