MINOR: remove get prefix for internal Kafka Streams methods (#16722)

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
Matthias J. Sax authored on 2024-08-21 14:27:14 -07:00; committed by GitHub
parent 9d81a67009
commit e4cc5d18f4
45 changed files with 140 additions and 141 deletions
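
The pattern behind all 45 files: internal accessors drop the `get` prefix, and the one true factory in the set is renamed to say what it does (`getMetrics` becomes `createMetrics` in KafkaStreams.java below). A minimal sketch on an invented class, not code from this commit:

    // Hypothetical class, illustrating the naming convention only.
    final class ThreadStats {
        private final int liveThreads;

        ThreadStats(final int liveThreads) {
            this.liveThreads = liveThreads;
        }

        // before: int getLiveThreads() -- a plain accessor, so the prefix goes
        int liveThreads() {
            return liveThreads;
        }

        // before: static ThreadStats getStats(int n) -- it constructs, so "create" is explicit
        static ThreadStats createStats(final int n) {
            return new ThreadStats(n);
        }
    }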

WordCountDemo.java

@@ -49,7 +49,7 @@ public final class WordCountDemo {
     public static final String INPUT_TOPIC = "streams-plaintext-input";
     public static final String OUTPUT_TOPIC = "streams-wordcount-output";

-    static Properties getStreamsConfig(final String[] args) throws IOException {
+    static Properties streamsConfig(final String[] args) throws IOException {
         final Properties props = new Properties();
         if (args != null && args.length > 0) {
             try (final FileInputStream fis = new FileInputStream(args[0])) {
@@ -85,7 +85,7 @@ public final class WordCountDemo {
     }

     public static void main(final String[] args) throws IOException {
-        final Properties props = getStreamsConfig(args);
+        final Properties props = streamsConfig(args);
         final StreamsBuilder builder = new StreamsBuilder();
         createWordCountStream(builder);

WordCountDemoTest.java

@@ -58,7 +58,7 @@ public class WordCountDemoTest {
         final StreamsBuilder builder = new StreamsBuilder();
         //Create Actual Stream Processing pipeline
         WordCountDemo.createWordCountStream(builder);
-        testDriver = new TopologyTestDriver(builder.build(), WordCountDemo.getStreamsConfig(null));
+        testDriver = new TopologyTestDriver(builder.build(), WordCountDemo.streamsConfig(null));
         inputTopic = testDriver.createInputTopic(WordCountDemo.INPUT_TOPIC, new StringSerializer(), new StringSerializer());
         outputTopic = testDriver.createOutputTopic(WordCountDemo.OUTPUT_TOPIC, new StringDeserializer(), new LongDeserializer());
     }
@@ -111,13 +111,13 @@ public class WordCountDemoTest {
     }

     @Test
-    public void testGetStreamsConfig() throws IOException {
+    public void testStreamsConfig() throws IOException {
         final File tmp = TestUtils.tempFile("bootstrap.servers=localhost:1234");
         try {
-            Properties config = WordCountDemo.getStreamsConfig(new String[] {tmp.getPath()});
+            Properties config = WordCountDemo.streamsConfig(new String[] {tmp.getPath()});
             assertThat("localhost:1234", equalTo(config.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)));
-            config = WordCountDemo.getStreamsConfig(new String[] {tmp.getPath(), "extra", "args"});
+            config = WordCountDemo.streamsConfig(new String[] {tmp.getPath(), "extra", "args"});
             assertThat("localhost:1234", equalTo(config.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)));
         } finally {
             Files.deleteIfExists(tmp.toPath());

KafkaStreams.java

@@ -112,7 +112,7 @@ import java.util.stream.Collectors;
 import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
 import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
-import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize;
+import static org.apache.kafka.streams.internals.StreamsConfigUtils.totalCacheSize;
 import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsets;
 import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
@@ -546,7 +546,7 @@ public class KafkaStreams implements AutoCloseable {
                 closeToError();
                 break;
             case SHUTDOWN_APPLICATION:
-                if (getNumLiveStreamThreads() == 1) {
+                if (numLiveStreamThreads() == 1) {
                     log.warn("Attempt to shut down the application requires adding a thread to communicate the shutdown. No processing will be done on this thread");
                     addStreamThread();
                 }
@@ -555,7 +555,7 @@
                         "but the uncaught exception was an Error, which means this runtime is no " +
                         "longer in a well-defined state. Attempting to send the shutdown command anyway.", throwable);
                 }
-                if (Thread.currentThread().equals(globalStreamThread) && getNumLiveStreamThreads() == 0) {
+                if (Thread.currentThread().equals(globalStreamThread) && numLiveStreamThreads() == 0) {
                     log.error("Exception in global thread caused the application to attempt to shutdown." +
                             " This action will succeed only if there is at least one StreamThread running on this client." +
                             " Currently there are no running threads so will now close the client.");
@@ -991,12 +991,12 @@
         // use client id instead of thread client id since this admin client may be shared among threads
         this.clientSupplier = clientSupplier;
-        adminClient = clientSupplier.getAdmin(applicationConfigs.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId)));
+        adminClient = clientSupplier.getAdmin(applicationConfigs.getAdminConfigs(ClientUtils.adminClientId(clientId)));
         log.info("Kafka Streams version: {}", ClientMetrics.version());
         log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId());
-        metrics = getMetrics(applicationConfigs, time, clientId);
+        metrics = createMetrics(applicationConfigs, time, clientId);
         streamsMetrics = new StreamsMetricsImpl(
             metrics,
             clientId,
@@ -1010,7 +1010,7 @@
         ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, (metricsConfig, now) -> this.topologyMetadata.topologyDescriptionString());
         ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> state);
         threads = Collections.synchronizedList(new LinkedList<>());
-        ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> getNumLiveStreamThreads());
+        ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> numLiveStreamThreads());
         streamsMetadataState = new StreamsMetadataState(
             this.topologyMetadata,
@@ -1023,9 +1023,9 @@
         delegatingStateRestoreListener = new DelegatingStateRestoreListener();
         delegatingStandbyUpdateListener = new DelegatingStandbyUpdateListener();
-        totalCacheSize = getTotalCacheSize(applicationConfigs);
-        final int numStreamThreads = topologyMetadata.getNumStreamThreads(applicationConfigs);
-        final long cacheSizePerThread = getCacheSizePerThread(numStreamThreads);
+        totalCacheSize = totalCacheSize(applicationConfigs);
+        final int numStreamThreads = topologyMetadata.numStreamThreads(applicationConfigs);
+        final long cacheSizePerThread = cacheSizePerThread(numStreamThreads);
         GlobalStreamThread.State globalThreadState = null;
         if (hasGlobalTopology) {
@@ -1088,7 +1088,7 @@
         return streamThread;
     }

-    static Metrics getMetrics(final StreamsConfig config, final Time time, final String clientId) {
+    private static Metrics createMetrics(final StreamsConfig config, final Time time, final String clientId) {
         final MetricConfig metricConfig = new MetricConfig()
             .samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
             .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
@@ -1117,9 +1117,9 @@
         if (isRunningOrRebalancing()) {
             final StreamThread streamThread;
             synchronized (changeThreadCount) {
-                final int threadIdx = getNextThreadIndex();
-                final int numLiveThreads = getNumLiveStreamThreads();
-                final long cacheSizePerThread = getCacheSizePerThread(numLiveThreads + 1);
+                final int threadIdx = nextThreadIndex();
+                final int numLiveThreads = numLiveStreamThreads();
+                final long cacheSizePerThread = cacheSizePerThread(numLiveThreads + 1);
                 log.info("Adding StreamThread-{}, there will now be {} live threads and the new cache size per thread is {}",
                          threadIdx, numLiveThreads + 1, cacheSizePerThread);
                 resizeThreadCache(cacheSizePerThread);
@@ -1136,7 +1136,7 @@
                 log.warn("Terminating the new thread because the Kafka Streams client is in state {}", state);
                 streamThread.shutdown();
                 threads.remove(streamThread);
-                final long cacheSizePerThread = getCacheSizePerThread(getNumLiveStreamThreads());
+                final long cacheSizePerThread = cacheSizePerThread(numLiveStreamThreads());
                 log.info("Resizing thread cache due to terminating added thread, new cache size per thread is {}", cacheSizePerThread);
                 resizeThreadCache(cacheSizePerThread);
                 return Optional.empty();
@@ -1194,7 +1194,7 @@
             // make a copy of threads to avoid holding lock
             for (final StreamThread streamThread : new ArrayList<>(threads)) {
                 final boolean callingThreadIsNotCurrentStreamThread = !streamThread.getName().equals(Thread.currentThread().getName());
-                if (streamThread.isThreadAlive() && (callingThreadIsNotCurrentStreamThread || getNumLiveStreamThreads() == 1)) {
+                if (streamThread.isThreadAlive() && (callingThreadIsNotCurrentStreamThread || numLiveStreamThreads() == 1)) {
                     log.info("Removing StreamThread " + streamThread.getName());
                     final Optional<String> groupInstanceID = streamThread.getGroupInstanceID();
                     streamThread.requestLeaveGroupDuringShutdown();
@@ -1216,7 +1216,7 @@
                             + "for it to complete shutdown as this will result in deadlock.", streamThread.getName());
                     }
-                    final long cacheSizePerThread = getCacheSizePerThread(getNumLiveStreamThreads());
+                    final long cacheSizePerThread = cacheSizePerThread(numLiveStreamThreads());
                     log.info("Resizing thread cache due to thread removal, new cache size per thread is {}", cacheSizePerThread);
                     resizeThreadCache(cacheSizePerThread);
                     if (groupInstanceID.isPresent() && callingThreadIsNotCurrentStreamThread) {
@@ -1282,7 +1282,7 @@
     * threads lock when looping threads.
     * @return number of alive stream threads
     */
-    private int getNumLiveStreamThreads() {
+    private int numLiveStreamThreads() {
        final AtomicInteger numLiveThreads = new AtomicInteger(0);
        synchronized (threads) {
@@ -1301,7 +1301,7 @@
         }
     }

-    private int getNextThreadIndex() {
+    private int nextThreadIndex() {
         final HashSet<String> allLiveThreadNames = new HashSet<>();
         final AtomicInteger maxThreadId = new AtomicInteger(1);
         synchronized (threads) {
@@ -1333,7 +1333,7 @@
         }
     }

-    private long getCacheSizePerThread(final int numStreamThreads) {
+    private long cacheSizePerThread(final int numStreamThreads) {
         if (numStreamThreads == 0) {
             return totalCacheSize;
         }
@@ -1831,7 +1831,7 @@
     */
    public void pause() {
        if (topologyMetadata.hasNamedTopologies()) {
-            for (final NamedTopology namedTopology : topologyMetadata.getAllNamedTopologies()) {
+            for (final NamedTopology namedTopology : topologyMetadata.allNamedTopologies()) {
                topologyMetadata.pauseTopology(namedTopology.name());
            }
        } else {
@@ -1844,7 +1844,7 @@
     */
    public boolean isPaused() {
        if (topologyMetadata.hasNamedTopologies()) {
-            return topologyMetadata.getAllNamedTopologies().stream()
+            return topologyMetadata.allNamedTopologies().stream()
                .map(NamedTopology::name)
                .allMatch(topologyMetadata::isPaused);
        } else {
@@ -1857,7 +1857,7 @@
     */
    public void resume() {
        if (topologyMetadata.hasNamedTopologies()) {
-            for (final NamedTopology namedTopology : topologyMetadata.getAllNamedTopologies()) {
+            for (final NamedTopology namedTopology : topologyMetadata.allNamedTopologies()) {
                topologyMetadata.resumeTopology(namedTopology.name());
            }
        } else {

StreamsConfig.java

@@ -1252,14 +1252,14 @@ public class StreamsConfig extends AbstractConfig {
         // Private API to enable the state updater (i.e. state updating on a dedicated thread)
         public static final String STATE_UPDATER_ENABLED = "__state.updater.enabled__";

-        public static boolean getStateUpdaterEnabled(final Map<String, Object> configs) {
+        public static boolean stateUpdaterEnabled(final Map<String, Object> configs) {
             return InternalConfig.getBoolean(configs, InternalConfig.STATE_UPDATER_ENABLED, true);
         }

         // Private API to enable processing threads (i.e. polling is decoupled from processing)
         public static final String PROCESSING_THREADS_ENABLED = "__processing.threads.enabled__";

-        public static boolean getProcessingThreadsEnabled(final Map<String, Object> configs) {
+        public static boolean processingThreadsEnabled(final Map<String, Object> configs) {
             return InternalConfig.getBoolean(configs, InternalConfig.PROCESSING_THREADS_ENABLED, false);
         }
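
A short usage sketch of the two renamed flags, assuming only what the hunk above shows (the keys, the defaults, and that callers pass a raw config map such as `config.originals()`). These are private APIs; the snippet is illustrative, not something applications should rely on:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.streams.StreamsConfig.InternalConfig;

    public class InternalFlagsSketch {
        public static void main(final String[] args) {
            final Map<String, Object> configs = new HashMap<>();
            configs.put(InternalConfig.STATE_UPDATER_ENABLED, false);
            configs.put(InternalConfig.PROCESSING_THREADS_ENABLED, true);

            // Absent keys fall back to the defaults above: true and false respectively.
            System.out.println(InternalConfig.stateUpdaterEnabled(configs));      // false
            System.out.println(InternalConfig.processingThreadsEnabled(configs)); // true
        }
    }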

TopologyConfig.java

@@ -60,7 +60,7 @@ import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_
 import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_DOC;
 import static org.apache.kafka.streams.StreamsConfig.TASK_TIMEOUT_MS_CONFIG;
 import static org.apache.kafka.streams.StreamsConfig.TASK_TIMEOUT_MS_DOC;
-import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize;
+import static org.apache.kafka.streams.internals.StreamsConfigUtils.totalCacheSize;

 /**
  * Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the
@@ -167,7 +167,7 @@ public class TopologyConfig extends AbstractConfig {
         final boolean cacheMaxBytesBufferingOverridden = isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides);

         if (!stateStoreCacheMaxBytesOverridden && !cacheMaxBytesBufferingOverridden) {
-            cacheSize = getTotalCacheSize(globalAppConfigs);
+            cacheSize = totalCacheSize(globalAppConfigs);
         } else {
             if (stateStoreCacheMaxBytesOverridden && cacheMaxBytesBufferingOverridden) {
                 cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);

StreamsConfigUtils.java

@@ -77,7 +77,7 @@ public class StreamsConfigUtils {
     }

     @SuppressWarnings("deprecation")
-    public static long getTotalCacheSize(final StreamsConfig config) {
+    public static long totalCacheSize(final StreamsConfig config) {
         // both deprecated and new config set. Warn and use the new one.
         if (config.originals().containsKey(CACHE_MAX_BYTES_BUFFERING_CONFIG) && config.originals().containsKey(STATESTORE_CACHE_MAX_BYTES_CONFIG)) {
             if (!config.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG).equals(config.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG))) {
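
A hedged usage sketch of the renamed helper (internal API; the application id and bootstrap servers are hypothetical). With only the non-deprecated `statestore.cache.max.bytes` set, `totalCacheSize` simply returns that value; per the comment above it warns and prefers the new config when both keys are present:

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.internals.StreamsConfigUtils;

    public class CacheSizeSketch {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "demo-app");          // hypothetical
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical
            props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10 * 1024 * 1024L);

            final long cacheBytes = StreamsConfigUtils.totalCacheSize(new StreamsConfig(props));
            System.out.println(cacheBytes); // 10485760
        }
    }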

UpgradeFromValues.java

@@ -48,7 +48,7 @@ public enum UpgradeFromValues {
         this.value = value;
     }

-    public static UpgradeFromValues getValueFromString(final String upgradeFrom) {
+    public static UpgradeFromValues fromString(final String upgradeFrom) {
         return UpgradeFromValues.valueOf("UPGRADE_FROM_" + upgradeFrom.replace(".", ""));
     }

     public String toString() {
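
The body above makes the mapping mechanical: strip the dots, prefix `UPGRADE_FROM_`, and resolve with `valueOf`. A sketch, assuming the enum's package matches the other internals classes in this commit:

    import org.apache.kafka.streams.internals.UpgradeFromValues;

    public class UpgradeFromSketch {
        public static void main(final String[] args) {
            // "0.10.0" -> "UPGRADE_FROM_" + "0100"
            final UpgradeFromValues v = UpgradeFromValues.fromString("0.10.0");
            System.out.println(v == UpgradeFromValues.UPGRADE_FROM_0100); // true

            // A version string with no matching constant makes valueOf(...) throw
            // IllegalArgumentException, so callers must pass a supported version.
        }
    }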

ChangedSerializer.java

@@ -57,7 +57,7 @@ public class ChangedSerializer<T> implements Serializer<Change<T>>, WrappingNull
             return false;
         }

-        switch (UpgradeFromValues.getValueFromString((String) upgradeFrom)) {
+        switch (UpgradeFromValues.fromString((String) upgradeFrom)) {
             case UPGRADE_FROM_0100:
             case UPGRADE_FROM_0101:
             case UPGRADE_FROM_0102:

KTableRepartitionMap.java

@@ -99,7 +99,7 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableRepartitionMapS
             return true;
         }

-        switch (UpgradeFromValues.getValueFromString((String) upgradeFrom)) {
+        switch (UpgradeFromValues.fromString((String) upgradeFrom)) {
             case UPGRADE_FROM_0100:
             case UPGRADE_FROM_0101:
             case UPGRADE_FROM_0102:

SubscriptionWrapperSerde.java

@@ -75,7 +75,7 @@ public class SubscriptionWrapperSerde<K> extends WrappingNullableSerde<Subscript
             return false;
         }

-        switch (UpgradeFromValues.getValueFromString((String) upgradeFrom)) {
+        switch (UpgradeFromValues.fromString((String) upgradeFrom)) {
             case UPGRADE_FROM_0100:
             case UPGRADE_FROM_0101:
             case UPGRADE_FROM_0102:

ActiveTaskCreator.java

@@ -47,8 +47,8 @@ import java.util.stream.Collectors;
 import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA;
 import static org.apache.kafka.streams.internals.StreamsConfigUtils.eosEnabled;
 import static org.apache.kafka.streams.internals.StreamsConfigUtils.processingMode;
-import static org.apache.kafka.streams.processor.internals.ClientUtils.getTaskProducerClientId;
-import static org.apache.kafka.streams.processor.internals.ClientUtils.getThreadProducerClientId;
+import static org.apache.kafka.streams.processor.internals.ClientUtils.taskProducerClientId;
+import static org.apache.kafka.streams.processor.internals.ClientUtils.threadProducerClientId;

 class ActiveTaskCreator {
     private final TopologyMetadata topologyMetadata;
@@ -269,7 +269,7 @@
             inputPartitions,
             topology,
             consumer,
-            topologyMetadata.getTaskConfigFor(taskId),
+            topologyMetadata.taskConfig(taskId),
             streamsMetrics,
             stateDirectory,
             cache,
@@ -319,11 +319,11 @@
     Set<String> producerClientIds() {
         if (threadProducer != null) {
-            return Collections.singleton(getThreadProducerClientId(threadId));
+            return Collections.singleton(threadProducerClientId(threadId));
         } else {
             return taskProducers.keySet()
                                 .stream()
-                                .map(taskId -> getTaskProducerClientId(threadId, taskId))
+                                .map(taskId -> taskProducerClientId(threadId, taskId))
                                 .collect(Collectors.toSet());
         }
     }

ClientUtils.java

@@ -66,24 +66,23 @@ public class ClientUtils {
     }

-    // currently admin client is shared among all threads
-    public static String getSharedAdminClientId(final String clientId) {
+    public static String adminClientId(final String clientId) {
         return clientId + "-admin";
     }

-    public static String getConsumerClientId(final String threadClientId) {
+    public static String consumerClientId(final String threadClientId) {
         return threadClientId + "-consumer";
     }

-    public static String getRestoreConsumerClientId(final String threadClientId) {
+    public static String restoreConsumerClientId(final String threadClientId) {
         return threadClientId + "-restore-consumer";
     }

-    public static String getThreadProducerClientId(final String threadClientId) {
+    public static String threadProducerClientId(final String threadClientId) {
         return threadClientId + "-producer";
     }

-    public static String getTaskProducerClientId(final String threadClientId, final TaskId taskId) {
+    public static String taskProducerClientId(final String threadClientId, final TaskId taskId) {
         return threadClientId + "-" + taskId + "-producer";
     }
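
Because the helpers above are plain concatenation, the whole client-id naming scheme is visible at a glance. A sketch with invented ids (the `-StreamThread-1` suffix follows the thread naming seen in the KafkaStreams.java log messages above):

    import org.apache.kafka.streams.processor.internals.ClientUtils;

    public class ClientIdSketch {
        public static void main(final String[] args) {
            final String clientId = "wordcount-client";                 // hypothetical
            final String threadClientId = clientId + "-StreamThread-1"; // hypothetical

            System.out.println(ClientUtils.adminClientId(clientId));                 // wordcount-client-admin
            System.out.println(ClientUtils.consumerClientId(threadClientId));        // wordcount-client-StreamThread-1-consumer
            System.out.println(ClientUtils.restoreConsumerClientId(threadClientId)); // wordcount-client-StreamThread-1-restore-consumer
            System.out.println(ClientUtils.threadProducerClientId(threadClientId));  // wordcount-client-StreamThread-1-producer
        }
    }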

InternalTopicConfig.java

@@ -66,7 +66,7 @@ public abstract class InternalTopicConfig {
      * @param additionalRetentionMs - added to retention to allow for clock drift etc
      * @return Properties to be used when creating the topic
      */
-    public abstract Map<String, String> getProperties(final Map<String, String> defaultProperties, final long additionalRetentionMs);
+    public abstract Map<String, String> properties(final Map<String, String> defaultProperties, final long additionalRetentionMs);

     public boolean hasEnforcedNumberOfPartitions() {
         return enforceNumberOfPartitions;

InternalTopicManager.java

@@ -317,7 +317,7 @@ public class InternalTopicManager {
         final long brokerSideRetentionMs =
             Long.parseLong(getBrokerSideConfigValue(brokerSideTopicConfig, TopicConfig.RETENTION_MS_CONFIG, topicName));
         final Map<String, String> streamsSideConfig =
-            topicConfig.getProperties(defaultTopicConfigs, windowChangeLogAdditionalRetention);
+            topicConfig.properties(defaultTopicConfigs, windowChangeLogAdditionalRetention);
         final long streamsSideRetentionMs = Long.parseLong(streamsSideConfig.get(TopicConfig.RETENTION_MS_CONFIG));
         if (brokerSideRetentionMs < streamsSideRetentionMs) {
             validationResult.addMisconfiguration(
@@ -356,7 +356,7 @@
         final long brokerSideCompactionLagMs =
             Long.parseLong(getBrokerSideConfigValue(brokerSideTopicConfig, TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, topicName));
         final Map<String, String> streamsSideConfig =
-            topicConfig.getProperties(defaultTopicConfigs, windowChangeLogAdditionalRetention);
+            topicConfig.properties(defaultTopicConfigs, windowChangeLogAdditionalRetention);
         final long streamsSideCompactionLagMs = Long.parseLong(streamsSideConfig.get(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG));
         if (brokerSideCompactionLagMs < streamsSideCompactionLagMs) {
             validationResult.addMisconfiguration(
@@ -482,7 +482,7 @@
                 continue;
             }
             final InternalTopicConfig internalTopicConfig = Objects.requireNonNull(topics.get(topicName));
-            final Map<String, String> topicConfig = internalTopicConfig.getProperties(defaultTopicConfigs, windowChangeLogAdditionalRetention);
+            final Map<String, String> topicConfig = internalTopicConfig.properties(defaultTopicConfigs, windowChangeLogAdditionalRetention);
             log.debug("Going to create topic {} with {} partitions and config {}.",
                 internalTopicConfig.name(),
@@ -696,7 +696,7 @@
         final Map<String, Map<String, String>> streamsSideTopicConfigs = topicConfigs.values().stream()
             .collect(Collectors.toMap(
                 InternalTopicConfig::name,
-                topicConfig -> topicConfig.getProperties(defaultTopicConfigs, windowChangeLogAdditionalRetention)
+                topicConfig -> topicConfig.properties(defaultTopicConfigs, windowChangeLogAdditionalRetention)
             ));
         final Set<String> createdTopics = new HashSet<>();
         final Set<String> topicStillToCreate = new HashSet<>(topicConfigs.keySet());

InternalTopicProperties.java

@@ -25,7 +25,7 @@ public class InternalTopicProperties {
         this.numberOfPartitions = numberOfPartitions;
     }

-    public Optional<Integer> getNumberOfPartitions() {
+    public Optional<Integer> numberOfPartitions() {
         return Optional.ofNullable(numberOfPartitions);
     }

InternalTopologyBuilder.java

@@ -749,8 +749,8 @@ public class InternalTopologyBuilder {
         final Map<String, Integer> numberOfPartitionsPerTopic = new HashMap<>();
         copartition.forEach(topic -> {
             final InternalTopicProperties prop = internalTopicNamesWithProperties.get(topic);
-            if (prop != null && prop.getNumberOfPartitions().isPresent()) {
-                numberOfPartitionsPerTopic.put(topic, prop.getNumberOfPartitions().get());
+            if (prop != null && prop.numberOfPartitions().isPresent()) {
+                numberOfPartitionsPerTopic.put(topic, prop.numberOfPartitions().get());
             }
         });
         if (!numberOfPartitionsPerTopic.isEmpty() && copartition.equals(numberOfPartitionsPerTopic.keySet())) {
@@ -1199,7 +1199,7 @@
             final RepartitionTopicConfig repartitionTopicConfig = buildRepartitionTopicConfig(
                 internalTopic,
-                internalTopicNamesWithProperties.get(topic).getNumberOfPartitions()
+                internalTopicNamesWithProperties.get(topic).numberOfPartitions()
             );
             repartitionTopics.put(repartitionTopicConfig.name(), repartitionTopicConfig);

ProcessorContextUtils.java

@@ -37,14 +37,14 @@ public final class ProcessorContextUtils {
     /**
      * Should be removed as part of KAFKA-10217
      */
-    public static StreamsMetricsImpl getMetricsImpl(final ProcessorContext context) {
+    public static StreamsMetricsImpl metricsImpl(final ProcessorContext context) {
         return (StreamsMetricsImpl) context.metrics();
     }

     /**
      * Should be removed as part of KAFKA-10217
      */
-    public static StreamsMetricsImpl getMetricsImpl(final StateStoreContext context) {
+    public static StreamsMetricsImpl metricsImpl(final StateStoreContext context) {
         return (StreamsMetricsImpl) context.metrics();
     }

RepartitionTopicConfig.java

@@ -57,7 +57,7 @@ public class RepartitionTopicConfig extends InternalTopicConfig {
      * @return Properties to be used when creating the topic
      */
     @Override
-    public Map<String, String> getProperties(final Map<String, String> defaultProperties, final long additionalRetentionMs) {
+    public Map<String, String> properties(final Map<String, String> defaultProperties, final long additionalRetentionMs) {
         // internal topic config overridden rule: library overrides < global config overrides < per-topic config overrides
         final Map<String, String> topicConfig = new HashMap<>(REPARTITION_TOPIC_DEFAULT_OVERRIDES);
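
A hedged sketch of the precedence comment above; the map names and merge order are assumptions for illustration, not the actual method body, but with `putAll` the later layers win, which is what "library overrides < global config overrides < per-topic config overrides" requires:

    import java.util.HashMap;
    import java.util.Map;

    public class OverrideLayeringSketch {
        static Map<String, String> layered(final Map<String, String> libraryDefaults,
                                           final Map<String, String> globalOverrides,
                                           final Map<String, String> perTopicOverrides) {
            // Later putAll calls overwrite earlier entries, so per-topic wins.
            final Map<String, String> topicConfig = new HashMap<>(libraryDefaults);
            topicConfig.putAll(globalOverrides);
            topicConfig.putAll(perTopicOverrides);
            return topicConfig;
        }
    }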

StandbyTaskCreator.java

@@ -152,7 +152,7 @@ class StandbyTaskCreator {
             taskId,
             inputPartitions,
             topology,
-            topologyMetadata.getTaskConfigFor(taskId),
+            topologyMetadata.taskConfig(taskId),
             streamsMetrics,
             stateManager,
             stateDirectory,

StoreChangelogReader.java

@@ -236,7 +236,7 @@ public class StoreChangelogReader implements ChangelogReader {
         this.stateRestoreListener = stateRestoreListener;
         this.standbyUpdateListener = standbyUpdateListener;
-        this.stateUpdaterEnabled = InternalConfig.getStateUpdaterEnabled(config.originals());
+        this.stateUpdaterEnabled = InternalConfig.stateUpdaterEnabled(config.originals());
         this.groupId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
         this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));

StreamThread.java

@@ -81,9 +81,9 @@ import java.util.stream.Collectors;
 import static org.apache.kafka.streams.internals.StreamsConfigUtils.eosEnabled;
 import static org.apache.kafka.streams.internals.StreamsConfigUtils.processingMode;
-import static org.apache.kafka.streams.processor.internals.ClientUtils.getConsumerClientId;
-import static org.apache.kafka.streams.processor.internals.ClientUtils.getRestoreConsumerClientId;
-import static org.apache.kafka.streams.processor.internals.ClientUtils.getSharedAdminClientId;
+import static org.apache.kafka.streams.processor.internals.ClientUtils.adminClientId;
+import static org.apache.kafka.streams.processor.internals.ClientUtils.consumerClientId;
+import static org.apache.kafka.streams.processor.internals.ClientUtils.restoreConsumerClientId;

 public class StreamThread extends Thread implements ProcessingThread {
@@ -381,7 +381,7 @@
         referenceContainer.clientTags = config.getClientTags();

         log.info("Creating restore consumer client");
-        final Map<String, Object> restoreConsumerConfigs = config.getRestoreConsumerConfigs(getRestoreConsumerClientId(threadId));
+        final Map<String, Object> restoreConsumerConfigs = config.getRestoreConsumerConfigs(restoreConsumerClientId(threadId));
         final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs);

         final StoreChangelogReader changelogReader = new StoreChangelogReader(
@@ -396,8 +396,8 @@
         final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);

-        final boolean stateUpdaterEnabled = InternalConfig.getStateUpdaterEnabled(config.originals());
-        final boolean proceessingThreadsEnabled = InternalConfig.getProcessingThreadsEnabled(config.originals());
+        final boolean stateUpdaterEnabled = InternalConfig.stateUpdaterEnabled(config.originals());
+        final boolean proceessingThreadsEnabled = InternalConfig.processingThreadsEnabled(config.originals());
         final ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator(
             topologyMetadata,
             config,
@@ -425,7 +425,7 @@
         final Tasks tasks = new Tasks(new LogContext(logPrefix));
         final boolean processingThreadsEnabled =
-            InternalConfig.getProcessingThreadsEnabled(config.originals());
+            InternalConfig.processingThreadsEnabled(config.originals());
         final DefaultTaskManager schedulingTaskManager =
             maybeCreateSchedulingTaskManager(processingThreadsEnabled, stateUpdaterEnabled, topologyMetadata, time, threadId, tasks);
@@ -460,7 +460,7 @@
         log.info("Creating consumer client");
         final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
-        final Map<String, Object> consumerConfigs = config.getMainConsumerConfigs(applicationId, getConsumerClientId(threadId), threadIdx);
+        final Map<String, Object> consumerConfigs = config.getMainConsumerConfigs(applicationId, consumerClientId(threadId), threadIdx);
         consumerConfigs.put(StreamsConfig.InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, referenceContainer);
         final String originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
@@ -495,7 +495,7 @@
             cache::resize
         );

-        return streamThread.updateThreadMetadata(getSharedAdminClientId(clientId));
+        return streamThread.updateThreadMetadata(adminClientId(clientId));
     }

     private static DefaultTaskManager maybeCreateSchedulingTaskManager(final boolean processingThreadsEnabled,
@@ -641,8 +641,8 @@
         this.numIterations = 1;
         this.eosEnabled = eosEnabled(config);
         this.processingMode = processingMode(config);
-        this.stateUpdaterEnabled = InternalConfig.getStateUpdaterEnabled(config.originals());
-        this.processingThreadsEnabled = InternalConfig.getProcessingThreadsEnabled(config.originals());
+        this.stateUpdaterEnabled = InternalConfig.stateUpdaterEnabled(config.originals());
+        this.processingThreadsEnabled = InternalConfig.processingThreadsEnabled(config.originals());
         this.logSummaryIntervalMs = config.getLong(StreamsConfig.LOG_SUMMARY_INTERVAL_MS_CONFIG);
     }
@@ -1500,8 +1500,8 @@
         threadMetadata = new ThreadMetadataImpl(
             getName(),
             state().name(),
-            getConsumerClientId(getName()),
-            getRestoreConsumerClientId(getName()),
+            consumerClientId(getName()),
+            restoreConsumerClientId(getName()),
             taskManager.producerClientIds(),
             adminClientId,
             Collections.emptySet(),
@@ -1537,8 +1537,8 @@
         threadMetadata = new ThreadMetadataImpl(
             getName(),
             state().name(),
-            getConsumerClientId(getName()),
-            getRestoreConsumerClientId(getName()),
+            consumerClientId(getName()),
+            restoreConsumerClientId(getName()),
             taskManager.producerClientIds(),
             adminClientId,
             activeTasksMetadata,

StreamsMetadataState.java

@@ -567,7 +567,7 @@ public class StreamsMetadataState {
     }

     public String getStoreForChangelogTopic(final String topicName) {
-        return topologyMetadata.getStoreForChangelogTopic(topicName);
+        return topologyMetadata.storeForChangelogTopic(topicName);
     }

     private class SourceTopicsInfo {

StreamsProducer.java

@@ -56,8 +56,8 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;

 import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
-import static org.apache.kafka.streams.processor.internals.ClientUtils.getTaskProducerClientId;
-import static org.apache.kafka.streams.processor.internals.ClientUtils.getThreadProducerClientId;
+import static org.apache.kafka.streams.processor.internals.ClientUtils.taskProducerClientId;
+import static org.apache.kafka.streams.processor.internals.ClientUtils.threadProducerClientId;

 /**
  * {@code StreamsProducer} manages the producers within a Kafka Streams application.
@@ -101,14 +101,14 @@
         final Map<String, Object> producerConfigs;
         switch (processingMode) {
             case AT_LEAST_ONCE: {
-                producerConfigs = config.getProducerConfigs(getThreadProducerClientId(threadId));
+                producerConfigs = config.getProducerConfigs(threadProducerClientId(threadId));
                 eosV2ProducerConfigs = null;

                 break;
             }
             case EXACTLY_ONCE_ALPHA: {
                 producerConfigs = config.getProducerConfigs(
-                    getTaskProducerClientId(
+                    taskProducerClientId(
                         threadId,
                         Objects.requireNonNull(taskId, "taskId cannot be null for exactly-once alpha")
                     )
@@ -122,7 +122,7 @@
                 break;
             }
             case EXACTLY_ONCE_V2: {
-                producerConfigs = config.getProducerConfigs(getThreadProducerClientId(threadId));
+                producerConfigs = config.getProducerConfigs(threadProducerClientId(threadId));
                 final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
                 producerConfigs.put(

TopologyMetadata.java

@@ -189,7 +189,7 @@ public class TopologyMetadata {
     public void maybeNotifyTopologyVersionListeners() {
         try {
             lock();
-            final long minThreadVersion = getMinimumThreadVersion();
+            final long minThreadVersion = minimumThreadVersion();
             final Iterator<TopologyVersionListener> iterator = version.activeTopologyUpdateListeners.listIterator();
             TopologyVersionListener topologyVersionListener;
             while (iterator.hasNext()) {
@@ -207,7 +207,7 @@
     }

     // Return the minimum version across all live threads, or Long.MAX_VALUE if there are no threads running
-    private long getMinimumThreadVersion() {
+    private long minimumThreadVersion() {
         final Optional<Long> minVersion = threadVersions.values().stream().min(Long::compare);
         return minVersion.orElse(Long.MAX_VALUE);
     }
@@ -312,7 +312,7 @@
         return removeTopologyFuture;
     }

-    public TaskConfig getTaskConfigFor(final TaskId taskId) {
+    public TaskConfig taskConfig(final TaskId taskId) {
         final InternalTopologyBuilder builder = lookupBuilderForTask(taskId);
         return builder.topologyConfigs().getTaskConfig();
     }
@@ -360,7 +360,7 @@
         allInputTopics.addAll(newInputTopics);
     }

-    public int getNumStreamThreads(final StreamsConfig config) {
+    public int numStreamThreads(final StreamsConfig config) {
         final int configuredNumStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);

         // If there are named topologies but some are empty, this indicates a bug in user code
@@ -531,7 +531,7 @@
         return stateStoreNameToSourceTopics;
     }

-    public String getStoreForChangelogTopic(final String topicName) {
+    public String storeForChangelogTopic(final String topicName) {
         for (final InternalTopologyBuilder builder : builders.values()) {
             final String store = builder.getStoreForChangelogTopic(topicName);
             if (store != null) {
@@ -614,7 +614,7 @@
         }
     }

-    public Collection<NamedTopology> getAllNamedTopologies() {
+    public Collection<NamedTopology> allNamedTopologies() {
         return builders.values()
                        .stream()
                        .map(InternalTopologyBuilder::namedTopology)

UnwindowedUnversionedChangelogTopicConfig.java

@@ -47,7 +47,7 @@ public class UnwindowedUnversionedChangelogTopicConfig extends InternalTopicConfig {
      * @return Properties to be used when creating the topic
      */
     @Override
-    public Map<String, String> getProperties(final Map<String, String> defaultProperties, final long additionalRetentionMs) {
+    public Map<String, String> properties(final Map<String, String> defaultProperties, final long additionalRetentionMs) {
         // internal topic config overridden rule: library overrides < global config overrides < per-topic config overrides
         final Map<String, String> topicConfig = new HashMap<>(UNWINDOWED_STORE_CHANGELOG_TOPIC_DEFAULT_OVERRIDES);

VersionedChangelogTopicConfig.java

@@ -59,7 +59,7 @@ public class VersionedChangelogTopicConfig extends InternalTopicConfig {
      * @return Properties to be used when creating the topic
      */
     @Override
-    public Map<String, String> getProperties(final Map<String, String> defaultProperties, final long windowStoreAdditionalRetentionMs) {
+    public Map<String, String> properties(final Map<String, String> defaultProperties, final long windowStoreAdditionalRetentionMs) {
         // internal topic config override rule: library overrides < global config overrides < per-topic config overrides
         final Map<String, String> topicConfig = new HashMap<>(VERSIONED_STORE_CHANGELOG_TOPIC_DEFAULT_OVERRIDES);

WindowedChangelogTopicConfig.java

@@ -51,7 +51,7 @@ public class WindowedChangelogTopicConfig extends InternalTopicConfig {
      * @return Properties to be used when creating the topic
      */
     @Override
-    public Map<String, String> getProperties(final Map<String, String> defaultProperties, final long additionalRetentionMs) {
+    public Map<String, String> properties(final Map<String, String> defaultProperties, final long additionalRetentionMs) {
         // internal topic config overridden rule: library overrides < global config overrides < per-topic config overrides
         final Map<String, String> topicConfig = new HashMap<>(WINDOWED_STORE_CHANGELOG_TOPIC_DEFAULT_OVERRIDES);

AssignorConfiguration.java

@@ -97,7 +97,7 @@ public final class AssignorConfiguration {
     public RebalanceProtocol rebalanceProtocol() {
         final String upgradeFrom = streamsConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG);
         if (upgradeFrom != null) {
-            switch (UpgradeFromValues.getValueFromString(upgradeFrom)) {
+            switch (UpgradeFromValues.fromString(upgradeFrom)) {
                 case UPGRADE_FROM_0100:
                 case UPGRADE_FROM_0101:
                 case UPGRADE_FROM_0102:
@@ -154,7 +154,7 @@
     public int configuredMetadataVersion(final int priorVersion) {
         final String upgradeFrom = streamsConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG);
         if (upgradeFrom != null) {
-            switch (UpgradeFromValues.getValueFromString(upgradeFrom)) {
+            switch (UpgradeFromValues.fromString(upgradeFrom)) {
                 case UPGRADE_FROM_0100:
                     log.info(
                         "Downgrading metadata.version from {} to 1 for upgrade from 0.10.0.x.",

KafkaStreamsNamedTopologyWrapper.java

@@ -152,7 +152,7 @@ public class KafkaStreamsNamedTopologyWrapper extends KafkaStreams {
     }

     public Collection<NamedTopology> getAllTopologies() {
-        return topologyMetadata.getAllNamedTopologies();
+        return topologyMetadata.allNamedTopologies();
     }

     /**

AbstractDualSchemaRocksDBSegmentedBytesStore.java

@@ -247,7 +247,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Segment>
                      final StateStore root) {
         this.context = context;

-        final StreamsMetricsImpl metrics = ProcessorContextUtils.getMetricsImpl(context);
+        final StreamsMetricsImpl metrics = ProcessorContextUtils.metricsImpl(context);
         final String threadId = Thread.currentThread().getName();
         final String taskName = context.taskId().toString();

AbstractRocksDBSegmentedBytesStore.java

@@ -300,7 +300,7 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
                      final StateStore root) {
         this.context = context;

-        final StreamsMetricsImpl metrics = ProcessorContextUtils.getMetricsImpl(context);
+        final StreamsMetricsImpl metrics = ProcessorContextUtils.metricsImpl(context);
         final String threadId = Thread.currentThread().getName();
         final String taskName = context.taskId().toString();

InMemoryWindowStore.java

@@ -106,7 +106,7 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
     public void init(final ProcessorContext context, final StateStore root) {
         this.context = context;

-        final StreamsMetricsImpl metrics = ProcessorContextUtils.getMetricsImpl(context);
+        final StreamsMetricsImpl metrics = ProcessorContextUtils.metricsImpl(context);
         final String threadId = Thread.currentThread().getName();
         final String taskName = context.taskId().toString();
         expiredRecordSensor = TaskMetrics.droppedRecordsSensor(

KeyValueSegments.java

@@ -64,7 +64,7 @@ class KeyValueSegments extends AbstractSegments<KeyValueSegment> {
     @Override
     public void openExisting(final ProcessorContext context, final long streamTime) {
-        metricsRecorder.init(ProcessorContextUtils.getMetricsImpl(context), context.taskId());
+        metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId());
         super.openExisting(context, streamTime);
     }
 }

LogicalKeyValueSegments.java

@@ -104,7 +104,7 @@ public class LogicalKeyValueSegments extends AbstractSegments<LogicalKeyValueSegment> {
     @Override
     public void openExisting(final ProcessorContext context, final long streamTime) {
-        metricsRecorder.init(ProcessorContextUtils.getMetricsImpl(context), context.taskId());
+        metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId());
         physicalStore.openDB(context.appConfigs(), context.stateDir());
     }

RocksDBStore.java

@@ -85,7 +85,7 @@ import java.util.stream.Collectors;
 import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
 import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG;
-import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.getMetricsImpl;
+import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.metricsImpl;

 /**
  * A persistent key-value store based on RocksDB.
@@ -172,7 +172,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
     public void init(final StateStoreContext context,
                      final StateStore root) {
         // open the DB dir
-        metricsRecorder.init(getMetricsImpl(context), context.taskId());
+        metricsRecorder.init(metricsImpl(context), context.taskId());
         openDB(context.appConfigs(), context.stateDir());

         final File positionCheckpointFile = new File(context.stateDir(), name() + ".position");


@@ -355,7 +355,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
     public void init(final ProcessorContext context, final StateStore root) {
         this.context = context;
-        final StreamsMetricsImpl metrics = ProcessorContextUtils.getMetricsImpl(context);
+        final StreamsMetricsImpl metrics = ProcessorContextUtils.metricsImpl(context);
         final String threadId = Thread.currentThread().getName();
         final String taskName = context.taskId().toString();
@@ -365,7 +365,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
             metrics
         );
-        metricsRecorder.init(ProcessorContextUtils.getMetricsImpl(context), context.taskId());
+        metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId());
         final File positionCheckpointFile = new File(context.stateDir(), name() + ".position");
         positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);


@@ -64,7 +64,7 @@ class TimestampedSegments extends AbstractSegments<TimestampedSegment> {
     @Override
     public void openExisting(final ProcessorContext context, final long streamTime) {
-        metricsRecorder.init(ProcessorContextUtils.getMetricsImpl(context), context.taskId());
+        metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId());
         super.openExisting(context, streamTime);
     }
 }


@@ -265,7 +265,7 @@ public class KafkaStreamsTest {
         streamsConfigUtils = mockStatic(StreamsConfigUtils.class);
         streamsConfigUtils.when(() -> StreamsConfigUtils.processingMode(any(StreamsConfig.class))).thenReturn(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE);
         streamsConfigUtils.when(() -> StreamsConfigUtils.eosEnabled(any(StreamsConfig.class))).thenReturn(false);
-        streamsConfigUtils.when(() -> StreamsConfigUtils.getTotalCacheSize(any(StreamsConfig.class))).thenReturn(10 * 1024 * 1024L);
+        streamsConfigUtils.when(() -> StreamsConfigUtils.totalCacheSize(any(StreamsConfig.class))).thenReturn(10 * 1024 * 1024L);
         // setup global threads
         final AtomicReference<GlobalStreamThread.State> globalThreadState = new AtomicReference<>(GlobalStreamThread.State.CREATED);

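The static-mock stubbing in this test references the production method directly, so the rename is enforced at compile time and must happen in lockstep. The pattern in use, sketched in its try-with-resources form (class name and values are illustrative):

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

public class TotalCacheSizeStubbingExample {

    @Test
    public void shouldStubTotalCacheSize() {
        try (MockedStatic<StreamsConfigUtils> mocked = Mockito.mockStatic(StreamsConfigUtils.class)) {
            mocked.when(() -> StreamsConfigUtils.totalCacheSize(Mockito.any(StreamsConfig.class)))
                  .thenReturn(10 * 1024 * 1024L);
            // any code under test executed here observes a 10 MiB total cache size
        }
    }
}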

@@ -74,7 +74,7 @@ import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFI
 import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix;
 import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
 import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
-import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize;
+import static org.apache.kafka.streams.internals.StreamsConfigUtils.totalCacheSize;
 import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.hasItem;
@@ -1282,7 +1282,7 @@ public class StreamsConfigTest {
         props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 100);
         props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10);
         final StreamsConfig config = new StreamsConfig(props);
-        assertEquals(getTotalCacheSize(config), 100);
+        assertEquals(totalCacheSize(config), 100);
     }

     @Test
@@ -1290,20 +1290,20 @@ public class StreamsConfigTest {
     public void shouldUseCacheMaxBytesBufferingConfigWhenOnlyDeprecatedConfigIsSet() {
         props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10);
         final StreamsConfig config = new StreamsConfig(props);
-        assertEquals(getTotalCacheSize(config), 10);
+        assertEquals(totalCacheSize(config), 10);
     }

     @Test
     public void shouldUseStateStoreCacheMaxBytesWhenNewConfigIsSet() {
         props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10);
         final StreamsConfig config = new StreamsConfig(props);
-        assertEquals(getTotalCacheSize(config), 10);
+        assertEquals(totalCacheSize(config), 10);
     }

     @Test
     public void shouldUseDefaultStateStoreCacheMaxBytesConfigWhenNoConfigIsSet() {
         final StreamsConfig config = new StreamsConfig(props);
-        assertEquals(getTotalCacheSize(config), 10 * 1024 * 1024);
+        assertEquals(totalCacheSize(config), 10 * 1024 * 1024);
     }

     @Test

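Taken together, these four tests pin down the precedence between the new statestore.cache.max.bytes and the deprecated cache.max.bytes.buffering. A sketch of resolution logic consistent with them — illustrative only; the real StreamsConfigUtils.totalCacheSize() may differ in detail:

import java.util.Map;
import org.apache.kafka.streams.StreamsConfig;

public static long totalCacheSize(final StreamsConfig config) {
    final Map<String, Object> originals = config.originals();
    final boolean newConfigSet =
        originals.containsKey(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG);
    final boolean deprecatedConfigSet =
        originals.containsKey(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG);
    // Fall back to the deprecated config only when it alone was set explicitly;
    // otherwise the new config (or its 10 MiB default) wins.
    if (!newConfigSet && deprecatedConfigSet) {
        return config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG);
    }
    return config.getLong(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG);
}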

@@ -219,7 +219,7 @@ public class KStreamKStreamJoinTest {
         assertThat(internalTopologyBuilder.subtopologyToTopicsInfo().get(SUBTOPOLOGY_0).stateChangelogTopics.size(), equalTo(2));
         for (final InternalTopicConfig config : internalTopologyBuilder.subtopologyToTopicsInfo().get(SUBTOPOLOGY_0).stateChangelogTopics.values()) {
             assertThat(
-                config.getProperties(Collections.emptyMap(), 0).get("test"),
+                config.properties(Collections.emptyMap(), 0).get("test"),
                 equalTo("property")
             );
         }

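The "test" → "property" assertion checks that user-supplied changelog overrides survive topology building. A plausible setup producing it — assumed here, since the hunk does not show it — routes the override map through the join's store configuration:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.StreamJoined;

import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;

final StreamJoined<String, Integer, Integer> joined =
    StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer())
                .withLoggingEnabled(mkMap(mkEntry("test", "property")));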

@@ -49,7 +49,7 @@ public class InternalTopicConfigTest {
     public void shouldSetCreateTimeByDefaultForWindowedChangelog() {
         final WindowedChangelogTopicConfig topicConfig = new WindowedChangelogTopicConfig("name", Collections.emptyMap(), 10);
-        final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(), 0);
+        final Map<String, String> properties = topicConfig.properties(Collections.emptyMap(), 0);
         assertEquals("CreateTime", properties.get(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG));
     }
@@ -57,7 +57,7 @@ public class InternalTopicConfigTest {
     public void shouldSetCreateTimeByDefaultForUnwindowedUnversionedChangelog() {
         final UnwindowedUnversionedChangelogTopicConfig topicConfig = new UnwindowedUnversionedChangelogTopicConfig("name", Collections.emptyMap());
-        final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(), 0);
+        final Map<String, String> properties = topicConfig.properties(Collections.emptyMap(), 0);
         assertEquals("CreateTime", properties.get(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG));
     }
@@ -65,7 +65,7 @@ public class InternalTopicConfigTest {
     public void shouldSetCreateTimeByDefaultForVersionedChangelog() {
         final VersionedChangelogTopicConfig topicConfig = new VersionedChangelogTopicConfig("name", Collections.emptyMap(), 12);
-        final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(), 0);
+        final Map<String, String> properties = topicConfig.properties(Collections.emptyMap(), 0);
         assertEquals("CreateTime", properties.get(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG));
     }
@@ -73,20 +73,20 @@ public class InternalTopicConfigTest {
     public void shouldSetCreateTimeByDefaultForRepartitionTopic() {
         final RepartitionTopicConfig topicConfig = new RepartitionTopicConfig("name", Collections.emptyMap());
-        final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(), 0);
+        final Map<String, String> properties = topicConfig.properties(Collections.emptyMap(), 0);
         assertEquals("CreateTime", properties.get(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG));
     }

     @Test
     public void shouldAugmentRetentionMsWithWindowedChangelog() {
         final WindowedChangelogTopicConfig topicConfig = new WindowedChangelogTopicConfig("name", Collections.emptyMap(), 10);
-        assertEquals("30", topicConfig.getProperties(Collections.emptyMap(), 20).get(TopicConfig.RETENTION_MS_CONFIG));
+        assertEquals("30", topicConfig.properties(Collections.emptyMap(), 20).get(TopicConfig.RETENTION_MS_CONFIG));
     }

     @Test
     public void shouldAugmentCompactionLagMsWithVersionedChangelog() {
         final VersionedChangelogTopicConfig topicConfig = new VersionedChangelogTopicConfig("name", Collections.emptyMap(), 12);
-        assertEquals(Long.toString(12 + 24 * 60 * 60 * 1000L), topicConfig.getProperties(Collections.emptyMap(), 20).get(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG));
+        assertEquals(Long.toString(12 + 24 * 60 * 60 * 1000L), topicConfig.properties(Collections.emptyMap(), 20).get(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG));
     }

     @Test
@@ -96,7 +96,7 @@ public class InternalTopicConfigTest {
         final WindowedChangelogTopicConfig topicConfig = new WindowedChangelogTopicConfig("name", configs, 10);
-        final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(), 0);
+        final Map<String, String> properties = topicConfig.properties(Collections.emptyMap(), 0);
         assertEquals("LogAppendTime", properties.get(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG));
     }
@@ -107,7 +107,7 @@ public class InternalTopicConfigTest {
         final VersionedChangelogTopicConfig topicConfig = new VersionedChangelogTopicConfig("name", configs, 12);
-        final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(), 0);
+        final Map<String, String> properties = topicConfig.properties(Collections.emptyMap(), 0);
         assertEquals("LogAppendTime", properties.get(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG));
     }
@@ -120,7 +120,7 @@ public class InternalTopicConfigTest {
         final UnwindowedUnversionedChangelogTopicConfig topicConfig = new UnwindowedUnversionedChangelogTopicConfig("name", configs);
-        final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(), 0);
+        final Map<String, String> properties = topicConfig.properties(Collections.emptyMap(), 0);
         assertEquals("1000", properties.get(TopicConfig.RETENTION_MS_CONFIG));
         assertEquals("10000", properties.get(TopicConfig.RETENTION_BYTES_CONFIG));
         assertEquals("LogAppendTime", properties.get(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG));
@@ -134,7 +134,7 @@ public class InternalTopicConfigTest {
         final RepartitionTopicConfig topicConfig = new RepartitionTopicConfig("name", configs);
-        final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(), 0);
+        final Map<String, String> properties = topicConfig.properties(Collections.emptyMap(), 0);
         assertEquals("1000", properties.get(TopicConfig.RETENTION_MS_CONFIG));
         assertEquals("LogAppendTime", properties.get(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG));
     }

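The two augmentation tests fix the arithmetic: a windowed changelog's retention.ms is the store retention plus the caller-supplied additional retention, and a versioned changelog's min.compaction.lag.ms is the history retention plus one day. A sketch of the windowed case, with helper name and structure illustrative — only the arithmetic and defaults are pinned down by the tests above:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.TopicConfig;

static Map<String, String> windowedChangelogProperties(final long retentionMs,
                                                       final long additionalRetentionMs,
                                                       final Map<String, String> userOverrides) {
    final Map<String, String> props = new HashMap<>();
    props.put(TopicConfig.CLEANUP_POLICY_CONFIG,
              TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE);
    props.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime");
    // 10 + 20 yields the "30" asserted in shouldAugmentRetentionMsWithWindowedChangelog
    props.put(TopicConfig.RETENTION_MS_CONFIG,
              String.valueOf(retentionMs + additionalRetentionMs));
    props.putAll(userOverrides);  // explicit user configs win over the defaults
    return props;
}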

@@ -1782,7 +1782,7 @@ public class InternalTopicManagerTest {
                 topicName,
                 topicConfig.numberOfPartitions(),
                 Optional.of(streamsConfig.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG).shortValue())
-            ).configs(topicConfig.getProperties(
+            ).configs(topicConfig.properties(
                 Collections.emptyMap(),
                 streamsConfig.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG))
             );

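A usage sketch tying the pieces together — method name and replication factor are illustrative: the map returned by properties() goes verbatim into the Admin client's topic-creation request, which is what the expectation above verifies.

import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;

static void createInternalTopic(final Admin admin, final InternalTopicConfig topicConfig)
        throws ExecutionException, InterruptedException {
    final NewTopic topic = new NewTopic(
            topicConfig.name(),
            topicConfig.numberOfPartitions(),
            Optional.of((short) 3)  // replication factor: illustrative
        ).configs(topicConfig.properties(Collections.emptyMap(), 24 * 60 * 60 * 1000L));
    admin.createTopics(Collections.singleton(topic)).all().get();
}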

@@ -920,14 +920,14 @@ public class InternalTopologyBuilderTest {
         final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.subtopologyToTopicsInfo();
         final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
         final InternalTopicConfig topicConfig1 = topicsInfo.stateChangelogTopics.get("appId-store1-changelog");
-        final Map<String, String> properties1 = topicConfig1.getProperties(Collections.emptyMap(), 10000);
+        final Map<String, String> properties1 = topicConfig1.properties(Collections.emptyMap(), 10000);
         assertEquals(3, properties1.size());
         assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE, properties1.get(TopicConfig.CLEANUP_POLICY_CONFIG));
         assertEquals("40000", properties1.get(TopicConfig.RETENTION_MS_CONFIG));
         assertEquals("appId-store1-changelog", topicConfig1.name());
         assertInstanceOf(WindowedChangelogTopicConfig.class, topicConfig1);
         final InternalTopicConfig topicConfig2 = topicsInfo.stateChangelogTopics.get("appId-store2-changelog");
-        final Map<String, String> properties2 = topicConfig2.getProperties(Collections.emptyMap(), 10000);
+        final Map<String, String> properties2 = topicConfig2.properties(Collections.emptyMap(), 10000);
         assertEquals(3, properties2.size());
         assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE, properties2.get(TopicConfig.CLEANUP_POLICY_CONFIG));
         assertEquals("40000", properties2.get(TopicConfig.RETENTION_MS_CONFIG));
@@ -952,7 +952,7 @@ public class InternalTopologyBuilderTest {
         final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.subtopologyToTopicsInfo();
         final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
         final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-vstore-changelog");
-        final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(), 10000);
+        final Map<String, String> properties = topicConfig.properties(Collections.emptyMap(), 10000);
         assertEquals(3, properties.size());
         assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG));
         assertEquals(Long.toString(60_000L + 24 * 60 * 60 * 1000L), properties.get(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG));
@@ -970,7 +970,7 @@ public class InternalTopologyBuilderTest {
         final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.subtopologyToTopicsInfo();
         final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
         final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-testStore-changelog");
-        final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(), 10000);
+        final Map<String, String> properties = topicConfig.properties(Collections.emptyMap(), 10000);
         assertEquals(2, properties.size());
         assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG));
         assertEquals("appId-testStore-changelog", topicConfig.name());
@@ -985,7 +985,7 @@ public class InternalTopologyBuilderTest {
         builder.buildTopology();
         final InternalTopologyBuilder.TopicsInfo topicsInfo = builder.subtopologyToTopicsInfo().values().iterator().next();
         final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo");
-        final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(), 10000);
+        final Map<String, String> properties = topicConfig.properties(Collections.emptyMap(), 10000);
         assertEquals(4, properties.size());
         assertEquals(String.valueOf(-1), properties.get(TopicConfig.RETENTION_MS_CONFIG));
         assertEquals(TopicConfig.CLEANUP_POLICY_DELETE, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG));

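The final hunk documents the repartition-topic defaults: delete-only cleanup with unlimited time retention, since Streams purges consumed repartition records itself rather than relying on broker retention. A sketch of defaults consistent with those assertions — helper name illustrative, and note that the real RepartitionTopicConfig presumably pins one more property, since the test asserts four entries and only three are shown here:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.TopicConfig;

static Map<String, String> repartitionTopicProperties(final Map<String, String> userOverrides) {
    final Map<String, String> props = new HashMap<>();
    props.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
    props.put(TopicConfig.RETENTION_MS_CONFIG, "-1");  // unbounded; Streams deletes records explicitly
    props.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime");
    props.putAll(userOverrides);
    return props;
}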

@@ -139,7 +139,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.apache.kafka.streams.processor.internals.ClientUtils.getSharedAdminClientId;
+import static org.apache.kafka.streams.processor.internals.ClientUtils.adminClientId;
 import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
 import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask;
 import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
@@ -1404,7 +1404,7 @@ public class StreamThreadTest {
         final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
-            .updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+            .updateThreadMetadata(adminClientId(CLIENT_ID));
         thread.setStateListener(
             (t, newState, oldState) -> {
                 if (oldState == StreamThread.State.CREATED && newState == StreamThread.State.STARTING) {
@@ -1478,7 +1478,7 @@ public class StreamThreadTest {
             null,
             HANDLER,
             null
-        ).updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+        ).updateThreadMetadata(adminClientId(CLIENT_ID));

         final StreamsException thrown = assertThrows(StreamsException.class, thread::run);
@@ -1504,7 +1504,7 @@ public class StreamThreadTest {
         final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
-            .updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+            .updateThreadMetadata(adminClientId(CLIENT_ID));
         thread.shutdown();
         verify(taskManager).shutdown(true);
@@ -1524,7 +1524,7 @@ public class StreamThreadTest {
         final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
-            .updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+            .updateThreadMetadata(adminClientId(CLIENT_ID));
         thread.shutdown();
         // Execute the run method. Verification of the mock will check that shutdown was only done once
         thread.run();
@@ -2628,7 +2628,7 @@ public class StreamThreadTest {
         final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
-            .updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+            .updateThreadMetadata(adminClientId(CLIENT_ID));

         consumer.schedulePollTask(() -> {
             thread.setState(StreamThread.State.PARTITIONS_REVOKED);
@@ -2658,7 +2658,7 @@ public class StreamThreadTest {
         final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
-            .updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+            .updateThreadMetadata(adminClientId(CLIENT_ID));

         consumer.schedulePollTask(() -> {
             thread.setState(StreamThread.State.PARTITIONS_REVOKED);
@@ -2720,7 +2720,7 @@ public class StreamThreadTest {
                 setState(State.PENDING_SHUTDOWN);
                 throw new TaskCorruptedException(corruptedTasks);
             }
-        }.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+        }.updateThreadMetadata(adminClientId(CLIENT_ID));

         thread.run();
@@ -2778,7 +2778,7 @@ public class StreamThreadTest {
                 setState(State.PENDING_SHUTDOWN);
                 throw new TaskCorruptedException(corruptedTasks);
             }
-        }.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+        }.updateThreadMetadata(adminClientId(CLIENT_ID));

         final AtomicBoolean exceptionHandlerInvoked = new AtomicBoolean(false);
@@ -2845,7 +2845,7 @@ public class StreamThreadTest {
                 setState(State.PENDING_SHUTDOWN);
                 throw new TaskCorruptedException(corruptedTasks);
             }
-        }.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+        }.updateThreadMetadata(adminClientId(CLIENT_ID));

         thread.setState(StreamThread.State.STARTING);
         thread.runLoop();
@@ -2908,7 +2908,7 @@ public class StreamThreadTest {
                 setState(State.PENDING_SHUTDOWN);
                 throw new TaskCorruptedException(corruptedTasks);
             }
-        }.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+        }.updateThreadMetadata(adminClientId(CLIENT_ID));

         thread.setState(StreamThread.State.STARTING);
         thread.runLoop();
@@ -2968,7 +2968,7 @@ public class StreamThreadTest {
                 setState(State.PENDING_SHUTDOWN);
                 throw new TaskCorruptedException(corruptedTasks);
             }
-        }.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+        }.updateThreadMetadata(adminClientId(CLIENT_ID));

         thread.setState(StreamThread.State.STARTING);
         thread.runLoop();

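ClientUtils follows the same rename pattern. A minimal sketch of the helper's likely shape — the exact suffix is an assumption; only the rename from getSharedAdminClientId is established by this commit:

public final class ClientUtils {

    private ClientUtils() {}

    // Derives the shared admin client's id from the Streams client id.
    // The "-admin" suffix shown here is illustrative.
    public static String adminClientId(final String clientId) {
        return clientId + "-admin";
    }
}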

@@ -104,7 +104,7 @@ public class RocksDBBlockCacheMetricsTest {
     }

     public <T> void assertMetric(final StateStoreContext context, final String group, final String metricName, final T expected) {
-        final StreamsMetricsImpl metrics = ProcessorContextUtils.getMetricsImpl(context);
+        final StreamsMetricsImpl metrics = ProcessorContextUtils.metricsImpl(context);
         final MetricName name = metrics.metricsRegistry().metricName(
             metricName,
             group,