diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java index d1789c12129..ae404b5c7ff 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java @@ -48,17 +48,6 @@ public final class ProcessorContextUtils { return (StreamsMetricsImpl) context.metrics(); } - public static String changelogFor(final ProcessorContext context, final String storeName, final Boolean newChangelogTopic) { - final String prefix = topicNamePrefix(context.appConfigs(), context.applicationId()); - if (context instanceof InternalProcessorContext && !newChangelogTopic) { - final String changelogTopic = ((InternalProcessorContext) context).changelogFor(storeName); - if (changelogTopic != null) - return changelogTopic; - - } - return ProcessorStateManager.storeChangelogTopic(prefix, storeName, context.taskId().topologyName()); - } - public static String changelogFor(final StateStoreContext context, final String storeName, final Boolean newChangelogTopic) { final String prefix = topicNamePrefix(context.appConfigs(), context.applicationId()); if (context instanceof InternalProcessorContext && !newChangelogTopic) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index 6816910758e..e8e613acdbf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -25,7 +25,6 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.TaskId; @@ -163,15 +162,6 @@ public class MeteredKeyValueStore return WrappingNullableUtils.prepareValueSerde(valueSerde, getter); } - - @Deprecated - protected void initStoreSerde(final ProcessorContext context) { - final String storeName = name(); - final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - serdes = StoreSerdeInitializer.prepareStoreSerde( - context, storeName, changelogTopic, keySerde, valueSerde, this::prepareValueSerdeForStore); - } - protected void initStoreSerde(final StateStoreContext context) { final String storeName = name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java index acdc3796646..de9efa7eb6a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; @@ -294,18 +293,6 @@ public class MeteredVersionedKeyValueStore } } - @Deprecated - @Override - protected void initStoreSerde(final ProcessorContext context) { - super.initStoreSerde(context); - - // additionally init raw value serde - final String storeName = super.name(); - final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - plainValueSerdes = StoreSerdeInitializer.prepareStoreSerde( - context, storeName, changelogTopic, keySerde, plainValueSerde, WrappingNullableUtils::prepareValueSerde); - } - @Override protected void initStoreSerde(final StateStoreContext context) { super.initStoreSerde(context);