mirror of https://github.com/apache/kafka.git
KAFKA-13588 We should consolidate `changelogFor` methods to simplify the generation of internal topic names (#17125)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
6354f49645
commit
64b909b598
|
@ -48,17 +48,6 @@ public final class ProcessorContextUtils {
|
||||||
return (StreamsMetricsImpl) context.metrics();
|
return (StreamsMetricsImpl) context.metrics();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String changelogFor(final ProcessorContext context, final String storeName, final Boolean newChangelogTopic) {
|
|
||||||
final String prefix = topicNamePrefix(context.appConfigs(), context.applicationId());
|
|
||||||
if (context instanceof InternalProcessorContext && !newChangelogTopic) {
|
|
||||||
final String changelogTopic = ((InternalProcessorContext) context).changelogFor(storeName);
|
|
||||||
if (changelogTopic != null)
|
|
||||||
return changelogTopic;
|
|
||||||
|
|
||||||
}
|
|
||||||
return ProcessorStateManager.storeChangelogTopic(prefix, storeName, context.taskId().topologyName());
|
|
||||||
}
|
|
||||||
|
|
||||||
public static String changelogFor(final StateStoreContext context, final String storeName, final Boolean newChangelogTopic) {
|
public static String changelogFor(final StateStoreContext context, final String storeName, final Boolean newChangelogTopic) {
|
||||||
final String prefix = topicNamePrefix(context.appConfigs(), context.applicationId());
|
final String prefix = topicNamePrefix(context.appConfigs(), context.applicationId());
|
||||||
if (context instanceof InternalProcessorContext && !newChangelogTopic) {
|
if (context instanceof InternalProcessorContext && !newChangelogTopic) {
|
||||||
|
|
|
@ -25,7 +25,6 @@ import org.apache.kafka.streams.KeyValue;
|
||||||
import org.apache.kafka.streams.errors.ProcessorStateException;
|
import org.apache.kafka.streams.errors.ProcessorStateException;
|
||||||
import org.apache.kafka.streams.kstream.internals.Change;
|
import org.apache.kafka.streams.kstream.internals.Change;
|
||||||
import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
|
import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
|
||||||
import org.apache.kafka.streams.processor.ProcessorContext;
|
|
||||||
import org.apache.kafka.streams.processor.StateStore;
|
import org.apache.kafka.streams.processor.StateStore;
|
||||||
import org.apache.kafka.streams.processor.StateStoreContext;
|
import org.apache.kafka.streams.processor.StateStoreContext;
|
||||||
import org.apache.kafka.streams.processor.TaskId;
|
import org.apache.kafka.streams.processor.TaskId;
|
||||||
|
@ -163,15 +162,6 @@ public class MeteredKeyValueStore<K, V>
|
||||||
return WrappingNullableUtils.prepareValueSerde(valueSerde, getter);
|
return WrappingNullableUtils.prepareValueSerde(valueSerde, getter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
protected void initStoreSerde(final ProcessorContext context) {
|
|
||||||
final String storeName = name();
|
|
||||||
final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);
|
|
||||||
serdes = StoreSerdeInitializer.prepareStoreSerde(
|
|
||||||
context, storeName, changelogTopic, keySerde, valueSerde, this::prepareValueSerdeForStore);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void initStoreSerde(final StateStoreContext context) {
|
protected void initStoreSerde(final StateStoreContext context) {
|
||||||
final String storeName = name();
|
final String storeName = name();
|
||||||
final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);
|
final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);
|
||||||
|
|
|
@ -21,7 +21,6 @@ import org.apache.kafka.common.utils.Bytes;
|
||||||
import org.apache.kafka.common.utils.Time;
|
import org.apache.kafka.common.utils.Time;
|
||||||
import org.apache.kafka.streams.errors.ProcessorStateException;
|
import org.apache.kafka.streams.errors.ProcessorStateException;
|
||||||
import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
|
import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
|
||||||
import org.apache.kafka.streams.processor.ProcessorContext;
|
|
||||||
import org.apache.kafka.streams.processor.StateStore;
|
import org.apache.kafka.streams.processor.StateStore;
|
||||||
import org.apache.kafka.streams.processor.StateStoreContext;
|
import org.apache.kafka.streams.processor.StateStoreContext;
|
||||||
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
|
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
|
||||||
|
@ -294,18 +293,6 @@ public class MeteredVersionedKeyValueStore<K, V>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
@Override
|
|
||||||
protected void initStoreSerde(final ProcessorContext context) {
|
|
||||||
super.initStoreSerde(context);
|
|
||||||
|
|
||||||
// additionally init raw value serde
|
|
||||||
final String storeName = super.name();
|
|
||||||
final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);
|
|
||||||
plainValueSerdes = StoreSerdeInitializer.prepareStoreSerde(
|
|
||||||
context, storeName, changelogTopic, keySerde, plainValueSerde, WrappingNullableUtils::prepareValueSerde);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void initStoreSerde(final StateStoreContext context) {
|
protected void initStoreSerde(final StateStoreContext context) {
|
||||||
super.initStoreSerde(context);
|
super.initStoreSerde(context);
|
||||||
|
|
Loading…
Reference in New Issue