From 64b909b598d8f36306601a4838af1cb699207c83 Mon Sep 17 00:00:00 2001 From: xijiu <422766572@qq.com> Date: Mon, 9 Sep 2024 01:48:24 +0800 Subject: [PATCH] KAFKA-13588 We should consolidate `changelogFor` methods to simplify the generation of internal topic names (#17125) Reviewers: Chia-Ping Tsai --- .../processor/internals/ProcessorContextUtils.java | 11 ----------- .../state/internals/MeteredKeyValueStore.java | 10 ---------- .../internals/MeteredVersionedKeyValueStore.java | 13 ------------- 3 files changed, 34 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java index d1789c12129..ae404b5c7ff 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java @@ -48,17 +48,6 @@ public final class ProcessorContextUtils { return (StreamsMetricsImpl) context.metrics(); } - public static String changelogFor(final ProcessorContext context, final String storeName, final Boolean newChangelogTopic) { - final String prefix = topicNamePrefix(context.appConfigs(), context.applicationId()); - if (context instanceof InternalProcessorContext && !newChangelogTopic) { - final String changelogTopic = ((InternalProcessorContext) context).changelogFor(storeName); - if (changelogTopic != null) - return changelogTopic; - - } - return ProcessorStateManager.storeChangelogTopic(prefix, storeName, context.taskId().topologyName()); - } - public static String changelogFor(final StateStoreContext context, final String storeName, final Boolean newChangelogTopic) { final String prefix = topicNamePrefix(context.appConfigs(), context.applicationId()); if (context instanceof InternalProcessorContext && !newChangelogTopic) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index 6816910758e..e8e613acdbf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -25,7 +25,6 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.TaskId; @@ -163,15 +162,6 @@ public class MeteredKeyValueStore return WrappingNullableUtils.prepareValueSerde(valueSerde, getter); } - - @Deprecated - protected void initStoreSerde(final ProcessorContext context) { - final String storeName = name(); - final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - serdes = StoreSerdeInitializer.prepareStoreSerde( - context, storeName, changelogTopic, keySerde, valueSerde, this::prepareValueSerdeForStore); - } - protected void initStoreSerde(final StateStoreContext context) { final String storeName = name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java index acdc3796646..de9efa7eb6a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; @@ -294,18 +293,6 @@ public class MeteredVersionedKeyValueStore } } - @Deprecated - @Override - protected void initStoreSerde(final ProcessorContext context) { - super.initStoreSerde(context); - - // additionally init raw value serde - final String storeName = super.name(); - final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - plainValueSerdes = StoreSerdeInitializer.prepareStoreSerde( - context, storeName, changelogTopic, keySerde, plainValueSerde, WrappingNullableUtils::prepareValueSerde); - } - @Override protected void initStoreSerde(final StateStoreContext context) { super.initStoreSerde(context);