From e4cc5d18f45d2965e4e1e571adc34bfb80cba642 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Wed, 21 Aug 2024 14:27:14 -0700 Subject: [PATCH] MINOR: remove get prefix for internal Kafka Streams methods (#16722) Reviewers: Lucas Brutschy --- .../examples/wordcount/WordCountDemo.java | 4 +- .../examples/wordcount/WordCountDemoTest.java | 8 ++-- .../apache/kafka/streams/KafkaStreams.java | 44 +++++++++---------- .../apache/kafka/streams/StreamsConfig.java | 4 +- .../apache/kafka/streams/TopologyConfig.java | 4 +- .../streams/internals/StreamsConfigUtils.java | 2 +- .../streams/internals/UpgradeFromValues.java | 2 +- .../kstream/internals/ChangedSerializer.java | 2 +- .../internals/KTableRepartitionMap.java | 2 +- .../SubscriptionWrapperSerde.java | 2 +- .../internals/ActiveTaskCreator.java | 10 ++--- .../processor/internals/ClientUtils.java | 11 +++-- .../internals/InternalTopicConfig.java | 2 +- .../internals/InternalTopicManager.java | 8 ++-- .../internals/InternalTopicProperties.java | 2 +- .../internals/InternalTopologyBuilder.java | 6 +-- .../internals/ProcessorContextUtils.java | 4 +- .../internals/RepartitionTopicConfig.java | 2 +- .../internals/StandbyTaskCreator.java | 2 +- .../internals/StoreChangelogReader.java | 2 +- .../processor/internals/StreamThread.java | 30 ++++++------- .../internals/StreamsMetadataState.java | 2 +- .../processor/internals/StreamsProducer.java | 10 ++--- .../processor/internals/TopologyMetadata.java | 12 ++--- ...ndowedUnversionedChangelogTopicConfig.java | 2 +- .../VersionedChangelogTopicConfig.java | 2 +- .../WindowedChangelogTopicConfig.java | 2 +- .../assignment/AssignorConfiguration.java | 4 +- .../KafkaStreamsNamedTopologyWrapper.java | 2 +- ...tDualSchemaRocksDBSegmentedBytesStore.java | 2 +- .../AbstractRocksDBSegmentedBytesStore.java | 2 +- .../state/internals/InMemoryWindowStore.java | 2 +- .../state/internals/KeyValueSegments.java | 2 +- .../internals/LogicalKeyValueSegments.java | 2 +- .../streams/state/internals/RocksDBStore.java | 4 +- .../internals/RocksDBVersionedStore.java | 4 +- .../state/internals/TimestampedSegments.java | 2 +- .../kafka/streams/KafkaStreamsTest.java | 2 +- .../kafka/streams/StreamsConfigTest.java | 10 ++--- .../internals/KStreamKStreamJoinTest.java | 2 +- .../internals/InternalTopicConfigTest.java | 20 ++++----- .../internals/InternalTopicManagerTest.java | 2 +- .../InternalTopologyBuilderTest.java | 10 ++--- .../processor/internals/StreamThreadTest.java | 24 +++++----- .../metrics/RocksDBBlockCacheMetricsTest.java | 2 +- 45 files changed, 140 insertions(+), 141 deletions(-) diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java index 1664445f098..a9f2be7f64c 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java @@ -49,7 +49,7 @@ public final class WordCountDemo { public static final String INPUT_TOPIC = "streams-plaintext-input"; public static final String OUTPUT_TOPIC = "streams-wordcount-output"; - static Properties getStreamsConfig(final String[] args) throws IOException { + static Properties streamsConfig(final String[] args) throws IOException { final Properties props = new Properties(); if (args != null && args.length > 0) { try (final FileInputStream fis = new FileInputStream(args[0])) { @@ -85,7 +85,7 @@ public final class WordCountDemo { } public static void main(final String[] args) throws IOException { - final Properties props = getStreamsConfig(args); + final Properties props = streamsConfig(args); final StreamsBuilder builder = new StreamsBuilder(); createWordCountStream(builder); diff --git a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountDemoTest.java b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountDemoTest.java index 6ce41cb7147..8435a0ae85a 100644 --- a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountDemoTest.java +++ b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountDemoTest.java @@ -58,7 +58,7 @@ public class WordCountDemoTest { final StreamsBuilder builder = new StreamsBuilder(); //Create Actual Stream Processing pipeline WordCountDemo.createWordCountStream(builder); - testDriver = new TopologyTestDriver(builder.build(), WordCountDemo.getStreamsConfig(null)); + testDriver = new TopologyTestDriver(builder.build(), WordCountDemo.streamsConfig(null)); inputTopic = testDriver.createInputTopic(WordCountDemo.INPUT_TOPIC, new StringSerializer(), new StringSerializer()); outputTopic = testDriver.createOutputTopic(WordCountDemo.OUTPUT_TOPIC, new StringDeserializer(), new LongDeserializer()); } @@ -111,13 +111,13 @@ public class WordCountDemoTest { } @Test - public void testGetStreamsConfig() throws IOException { + public void testStreamsConfig() throws IOException { final File tmp = TestUtils.tempFile("bootstrap.servers=localhost:1234"); try { - Properties config = WordCountDemo.getStreamsConfig(new String[] {tmp.getPath()}); + Properties config = WordCountDemo.streamsConfig(new String[] {tmp.getPath()}); assertThat("localhost:1234", equalTo(config.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG))); - config = WordCountDemo.getStreamsConfig(new String[] {tmp.getPath(), "extra", "args"}); + config = WordCountDemo.streamsConfig(new String[] {tmp.getPath(), "extra", "args"}); assertThat("localhost:1234", equalTo(config.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG))); } finally { Files.deleteIfExists(tmp.toPath()); diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index e9496c0cdc6..74c3568eb64 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -112,7 +112,7 @@ import java.util.stream.Collectors; import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration; -import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize; +import static org.apache.kafka.streams.internals.StreamsConfigUtils.totalCacheSize; import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsets; import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY; @@ -546,7 +546,7 @@ public class KafkaStreams implements AutoCloseable { closeToError(); break; case SHUTDOWN_APPLICATION: - if (getNumLiveStreamThreads() == 1) { + if (numLiveStreamThreads() == 1) { log.warn("Attempt to shut down the application requires adding a thread to communicate the shutdown. No processing will be done on this thread"); addStreamThread(); } @@ -555,7 +555,7 @@ public class KafkaStreams implements AutoCloseable { "but the uncaught exception was an Error, which means this runtime is no " + "longer in a well-defined state. Attempting to send the shutdown command anyway.", throwable); } - if (Thread.currentThread().equals(globalStreamThread) && getNumLiveStreamThreads() == 0) { + if (Thread.currentThread().equals(globalStreamThread) && numLiveStreamThreads() == 0) { log.error("Exception in global thread caused the application to attempt to shutdown." + " This action will succeed only if there is at least one StreamThread running on this client." + " Currently there are no running threads so will now close the client."); @@ -991,12 +991,12 @@ public class KafkaStreams implements AutoCloseable { // use client id instead of thread client id since this admin client may be shared among threads this.clientSupplier = clientSupplier; - adminClient = clientSupplier.getAdmin(applicationConfigs.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId))); + adminClient = clientSupplier.getAdmin(applicationConfigs.getAdminConfigs(ClientUtils.adminClientId(clientId))); log.info("Kafka Streams version: {}", ClientMetrics.version()); log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId()); - metrics = getMetrics(applicationConfigs, time, clientId); + metrics = createMetrics(applicationConfigs, time, clientId); streamsMetrics = new StreamsMetricsImpl( metrics, clientId, @@ -1010,7 +1010,7 @@ public class KafkaStreams implements AutoCloseable { ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, (metricsConfig, now) -> this.topologyMetadata.topologyDescriptionString()); ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> state); threads = Collections.synchronizedList(new LinkedList<>()); - ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> getNumLiveStreamThreads()); + ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> numLiveStreamThreads()); streamsMetadataState = new StreamsMetadataState( this.topologyMetadata, @@ -1023,9 +1023,9 @@ public class KafkaStreams implements AutoCloseable { delegatingStateRestoreListener = new DelegatingStateRestoreListener(); delegatingStandbyUpdateListener = new DelegatingStandbyUpdateListener(); - totalCacheSize = getTotalCacheSize(applicationConfigs); - final int numStreamThreads = topologyMetadata.getNumStreamThreads(applicationConfigs); - final long cacheSizePerThread = getCacheSizePerThread(numStreamThreads); + totalCacheSize = totalCacheSize(applicationConfigs); + final int numStreamThreads = topologyMetadata.numStreamThreads(applicationConfigs); + final long cacheSizePerThread = cacheSizePerThread(numStreamThreads); GlobalStreamThread.State globalThreadState = null; if (hasGlobalTopology) { @@ -1088,7 +1088,7 @@ public class KafkaStreams implements AutoCloseable { return streamThread; } - static Metrics getMetrics(final StreamsConfig config, final Time time, final String clientId) { + private static Metrics createMetrics(final StreamsConfig config, final Time time, final String clientId) { final MetricConfig metricConfig = new MetricConfig() .samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG)) .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG))) @@ -1117,9 +1117,9 @@ public class KafkaStreams implements AutoCloseable { if (isRunningOrRebalancing()) { final StreamThread streamThread; synchronized (changeThreadCount) { - final int threadIdx = getNextThreadIndex(); - final int numLiveThreads = getNumLiveStreamThreads(); - final long cacheSizePerThread = getCacheSizePerThread(numLiveThreads + 1); + final int threadIdx = nextThreadIndex(); + final int numLiveThreads = numLiveStreamThreads(); + final long cacheSizePerThread = cacheSizePerThread(numLiveThreads + 1); log.info("Adding StreamThread-{}, there will now be {} live threads and the new cache size per thread is {}", threadIdx, numLiveThreads + 1, cacheSizePerThread); resizeThreadCache(cacheSizePerThread); @@ -1136,7 +1136,7 @@ public class KafkaStreams implements AutoCloseable { log.warn("Terminating the new thread because the Kafka Streams client is in state {}", state); streamThread.shutdown(); threads.remove(streamThread); - final long cacheSizePerThread = getCacheSizePerThread(getNumLiveStreamThreads()); + final long cacheSizePerThread = cacheSizePerThread(numLiveStreamThreads()); log.info("Resizing thread cache due to terminating added thread, new cache size per thread is {}", cacheSizePerThread); resizeThreadCache(cacheSizePerThread); return Optional.empty(); @@ -1194,7 +1194,7 @@ public class KafkaStreams implements AutoCloseable { // make a copy of threads to avoid holding lock for (final StreamThread streamThread : new ArrayList<>(threads)) { final boolean callingThreadIsNotCurrentStreamThread = !streamThread.getName().equals(Thread.currentThread().getName()); - if (streamThread.isThreadAlive() && (callingThreadIsNotCurrentStreamThread || getNumLiveStreamThreads() == 1)) { + if (streamThread.isThreadAlive() && (callingThreadIsNotCurrentStreamThread || numLiveStreamThreads() == 1)) { log.info("Removing StreamThread " + streamThread.getName()); final Optional groupInstanceID = streamThread.getGroupInstanceID(); streamThread.requestLeaveGroupDuringShutdown(); @@ -1216,7 +1216,7 @@ public class KafkaStreams implements AutoCloseable { + "for it to complete shutdown as this will result in deadlock.", streamThread.getName()); } - final long cacheSizePerThread = getCacheSizePerThread(getNumLiveStreamThreads()); + final long cacheSizePerThread = cacheSizePerThread(numLiveStreamThreads()); log.info("Resizing thread cache due to thread removal, new cache size per thread is {}", cacheSizePerThread); resizeThreadCache(cacheSizePerThread); if (groupInstanceID.isPresent() && callingThreadIsNotCurrentStreamThread) { @@ -1282,7 +1282,7 @@ public class KafkaStreams implements AutoCloseable { * threads lock when looping threads. * @return number of alive stream threads */ - private int getNumLiveStreamThreads() { + private int numLiveStreamThreads() { final AtomicInteger numLiveThreads = new AtomicInteger(0); synchronized (threads) { @@ -1301,7 +1301,7 @@ public class KafkaStreams implements AutoCloseable { } } - private int getNextThreadIndex() { + private int nextThreadIndex() { final HashSet allLiveThreadNames = new HashSet<>(); final AtomicInteger maxThreadId = new AtomicInteger(1); synchronized (threads) { @@ -1333,7 +1333,7 @@ public class KafkaStreams implements AutoCloseable { } } - private long getCacheSizePerThread(final int numStreamThreads) { + private long cacheSizePerThread(final int numStreamThreads) { if (numStreamThreads == 0) { return totalCacheSize; } @@ -1831,7 +1831,7 @@ public class KafkaStreams implements AutoCloseable { */ public void pause() { if (topologyMetadata.hasNamedTopologies()) { - for (final NamedTopology namedTopology : topologyMetadata.getAllNamedTopologies()) { + for (final NamedTopology namedTopology : topologyMetadata.allNamedTopologies()) { topologyMetadata.pauseTopology(namedTopology.name()); } } else { @@ -1844,7 +1844,7 @@ public class KafkaStreams implements AutoCloseable { */ public boolean isPaused() { if (topologyMetadata.hasNamedTopologies()) { - return topologyMetadata.getAllNamedTopologies().stream() + return topologyMetadata.allNamedTopologies().stream() .map(NamedTopology::name) .allMatch(topologyMetadata::isPaused); } else { @@ -1857,7 +1857,7 @@ public class KafkaStreams implements AutoCloseable { */ public void resume() { if (topologyMetadata.hasNamedTopologies()) { - for (final NamedTopology namedTopology : topologyMetadata.getAllNamedTopologies()) { + for (final NamedTopology namedTopology : topologyMetadata.allNamedTopologies()) { topologyMetadata.resumeTopology(namedTopology.name()); } } else { diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index f3644a6ffbd..9767373fc2e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -1252,14 +1252,14 @@ public class StreamsConfig extends AbstractConfig { // Private API to enable the state updater (i.e. state updating on a dedicated thread) public static final String STATE_UPDATER_ENABLED = "__state.updater.enabled__"; - public static boolean getStateUpdaterEnabled(final Map configs) { + public static boolean stateUpdaterEnabled(final Map configs) { return InternalConfig.getBoolean(configs, InternalConfig.STATE_UPDATER_ENABLED, true); } // Private API to enable processing threads (i.e. polling is decoupled from processing) public static final String PROCESSING_THREADS_ENABLED = "__processing.threads.enabled__"; - public static boolean getProcessingThreadsEnabled(final Map configs) { + public static boolean processingThreadsEnabled(final Map configs) { return InternalConfig.getBoolean(configs, InternalConfig.PROCESSING_THREADS_ENABLED, false); } diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java index f53f7f72bc7..86bd4e9eb78 100644 --- a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java @@ -60,7 +60,7 @@ import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_ import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_DOC; import static org.apache.kafka.streams.StreamsConfig.TASK_TIMEOUT_MS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.TASK_TIMEOUT_MS_DOC; -import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize; +import static org.apache.kafka.streams.internals.StreamsConfigUtils.totalCacheSize; /** * Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the @@ -167,7 +167,7 @@ public class TopologyConfig extends AbstractConfig { final boolean cacheMaxBytesBufferingOverridden = isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides); if (!stateStoreCacheMaxBytesOverridden && !cacheMaxBytesBufferingOverridden) { - cacheSize = getTotalCacheSize(globalAppConfigs); + cacheSize = totalCacheSize(globalAppConfigs); } else { if (stateStoreCacheMaxBytesOverridden && cacheMaxBytesBufferingOverridden) { cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG); diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java b/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java index 28af63144f7..d0e92abc471 100644 --- a/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java @@ -77,7 +77,7 @@ public class StreamsConfigUtils { } @SuppressWarnings("deprecation") - public static long getTotalCacheSize(final StreamsConfig config) { + public static long totalCacheSize(final StreamsConfig config) { // both deprecated and new config set. Warn and use the new one. if (config.originals().containsKey(CACHE_MAX_BYTES_BUFFERING_CONFIG) && config.originals().containsKey(STATESTORE_CACHE_MAX_BYTES_CONFIG)) { if (!config.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG).equals(config.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG))) { diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/UpgradeFromValues.java b/streams/src/main/java/org/apache/kafka/streams/internals/UpgradeFromValues.java index 7d65205c555..66e079eecac 100644 --- a/streams/src/main/java/org/apache/kafka/streams/internals/UpgradeFromValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/internals/UpgradeFromValues.java @@ -48,7 +48,7 @@ public enum UpgradeFromValues { this.value = value; } - public static UpgradeFromValues getValueFromString(final String upgradeFrom) { + public static UpgradeFromValues fromString(final String upgradeFrom) { return UpgradeFromValues.valueOf("UPGRADE_FROM_" + upgradeFrom.replace(".", "")); } public String toString() { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java index 7aca5245243..4964c707d9b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java @@ -57,7 +57,7 @@ public class ChangedSerializer implements Serializer>, WrappingNull return false; } - switch (UpgradeFromValues.getValueFromString((String) upgradeFrom)) { + switch (UpgradeFromValues.fromString((String) upgradeFrom)) { case UPGRADE_FROM_0100: case UPGRADE_FROM_0101: case UPGRADE_FROM_0102: diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java index a08bf978d40..f5cc449db7f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java @@ -99,7 +99,7 @@ public class KTableRepartitionMap implements KTableRepartitionMapS return true; } - switch (UpgradeFromValues.getValueFromString((String) upgradeFrom)) { + switch (UpgradeFromValues.fromString((String) upgradeFrom)) { case UPGRADE_FROM_0100: case UPGRADE_FROM_0101: case UPGRADE_FROM_0102: diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java index 5368ce7fb22..39839af1252 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java @@ -75,7 +75,7 @@ public class SubscriptionWrapperSerde extends WrappingNullableSerde producerClientIds() { if (threadProducer != null) { - return Collections.singleton(getThreadProducerClientId(threadId)); + return Collections.singleton(threadProducerClientId(threadId)); } else { return taskProducers.keySet() .stream() - .map(taskId -> getTaskProducerClientId(threadId, taskId)) + .map(taskId -> taskProducerClientId(threadId, taskId)) .collect(Collectors.toSet()); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java index 6afac9e7ae2..46e57a54310 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java @@ -66,24 +66,23 @@ public class ClientUtils { } - // currently admin client is shared among all threads - public static String getSharedAdminClientId(final String clientId) { + public static String adminClientId(final String clientId) { return clientId + "-admin"; } - public static String getConsumerClientId(final String threadClientId) { + public static String consumerClientId(final String threadClientId) { return threadClientId + "-consumer"; } - public static String getRestoreConsumerClientId(final String threadClientId) { + public static String restoreConsumerClientId(final String threadClientId) { return threadClientId + "-restore-consumer"; } - public static String getThreadProducerClientId(final String threadClientId) { + public static String threadProducerClientId(final String threadClientId) { return threadClientId + "-producer"; } - public static String getTaskProducerClientId(final String threadClientId, final TaskId taskId) { + public static String taskProducerClientId(final String threadClientId, final TaskId taskId) { return threadClientId + "-" + taskId + "-producer"; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java index 0b5a32799d8..b24960f5270 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java @@ -66,7 +66,7 @@ public abstract class InternalTopicConfig { * @param additionalRetentionMs - added to retention to allow for clock drift etc * @return Properties to be used when creating the topic */ - public abstract Map getProperties(final Map defaultProperties, final long additionalRetentionMs); + public abstract Map properties(final Map defaultProperties, final long additionalRetentionMs); public boolean hasEnforcedNumberOfPartitions() { return enforceNumberOfPartitions; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java index aeaa84f911d..2af23872615 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java @@ -317,7 +317,7 @@ public class InternalTopicManager { final long brokerSideRetentionMs = Long.parseLong(getBrokerSideConfigValue(brokerSideTopicConfig, TopicConfig.RETENTION_MS_CONFIG, topicName)); final Map streamsSideConfig = - topicConfig.getProperties(defaultTopicConfigs, windowChangeLogAdditionalRetention); + topicConfig.properties(defaultTopicConfigs, windowChangeLogAdditionalRetention); final long streamsSideRetentionMs = Long.parseLong(streamsSideConfig.get(TopicConfig.RETENTION_MS_CONFIG)); if (brokerSideRetentionMs < streamsSideRetentionMs) { validationResult.addMisconfiguration( @@ -356,7 +356,7 @@ public class InternalTopicManager { final long brokerSideCompactionLagMs = Long.parseLong(getBrokerSideConfigValue(brokerSideTopicConfig, TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, topicName)); final Map streamsSideConfig = - topicConfig.getProperties(defaultTopicConfigs, windowChangeLogAdditionalRetention); + topicConfig.properties(defaultTopicConfigs, windowChangeLogAdditionalRetention); final long streamsSideCompactionLagMs = Long.parseLong(streamsSideConfig.get(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG)); if (brokerSideCompactionLagMs < streamsSideCompactionLagMs) { validationResult.addMisconfiguration( @@ -482,7 +482,7 @@ public class InternalTopicManager { continue; } final InternalTopicConfig internalTopicConfig = Objects.requireNonNull(topics.get(topicName)); - final Map topicConfig = internalTopicConfig.getProperties(defaultTopicConfigs, windowChangeLogAdditionalRetention); + final Map topicConfig = internalTopicConfig.properties(defaultTopicConfigs, windowChangeLogAdditionalRetention); log.debug("Going to create topic {} with {} partitions and config {}.", internalTopicConfig.name(), @@ -696,7 +696,7 @@ public class InternalTopicManager { final Map> streamsSideTopicConfigs = topicConfigs.values().stream() .collect(Collectors.toMap( InternalTopicConfig::name, - topicConfig -> topicConfig.getProperties(defaultTopicConfigs, windowChangeLogAdditionalRetention) + topicConfig -> topicConfig.properties(defaultTopicConfigs, windowChangeLogAdditionalRetention) )); final Set createdTopics = new HashSet<>(); final Set topicStillToCreate = new HashSet<>(topicConfigs.keySet()); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicProperties.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicProperties.java index b98780c56fa..206172cbade 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicProperties.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicProperties.java @@ -25,7 +25,7 @@ public class InternalTopicProperties { this.numberOfPartitions = numberOfPartitions; } - public Optional getNumberOfPartitions() { + public Optional numberOfPartitions() { return Optional.ofNullable(numberOfPartitions); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 4d760fd8421..9d89f4a7d72 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -749,8 +749,8 @@ public class InternalTopologyBuilder { final Map numberOfPartitionsPerTopic = new HashMap<>(); copartition.forEach(topic -> { final InternalTopicProperties prop = internalTopicNamesWithProperties.get(topic); - if (prop != null && prop.getNumberOfPartitions().isPresent()) { - numberOfPartitionsPerTopic.put(topic, prop.getNumberOfPartitions().get()); + if (prop != null && prop.numberOfPartitions().isPresent()) { + numberOfPartitionsPerTopic.put(topic, prop.numberOfPartitions().get()); } }); if (!numberOfPartitionsPerTopic.isEmpty() && copartition.equals(numberOfPartitionsPerTopic.keySet())) { @@ -1199,7 +1199,7 @@ public class InternalTopologyBuilder { final RepartitionTopicConfig repartitionTopicConfig = buildRepartitionTopicConfig( internalTopic, - internalTopicNamesWithProperties.get(topic).getNumberOfPartitions() + internalTopicNamesWithProperties.get(topic).numberOfPartitions() ); repartitionTopics.put(repartitionTopicConfig.name(), repartitionTopicConfig); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java index c93297c97cc..50d71f8ba01 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java @@ -37,14 +37,14 @@ public final class ProcessorContextUtils { /** * Should be removed as part of KAFKA-10217 */ - public static StreamsMetricsImpl getMetricsImpl(final ProcessorContext context) { + public static StreamsMetricsImpl metricsImpl(final ProcessorContext context) { return (StreamsMetricsImpl) context.metrics(); } /** * Should be removed as part of KAFKA-10217 */ - public static StreamsMetricsImpl getMetricsImpl(final StateStoreContext context) { + public static StreamsMetricsImpl metricsImpl(final StateStoreContext context) { return (StreamsMetricsImpl) context.metrics(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java index 098bb9f58a1..2419670d94f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java @@ -57,7 +57,7 @@ public class RepartitionTopicConfig extends InternalTopicConfig { * @return Properties to be used when creating the topic */ @Override - public Map getProperties(final Map defaultProperties, final long additionalRetentionMs) { + public Map properties(final Map defaultProperties, final long additionalRetentionMs) { // internal topic config overridden rule: library overrides < global config overrides < per-topic config overrides final Map topicConfig = new HashMap<>(REPARTITION_TOPIC_DEFAULT_OVERRIDES); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java index 5eb74184257..de0c2fcdf62 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java @@ -152,7 +152,7 @@ class StandbyTaskCreator { taskId, inputPartitions, topology, - topologyMetadata.getTaskConfigFor(taskId), + topologyMetadata.taskConfig(taskId), streamsMetrics, stateManager, stateDirectory, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index dbd25b62eb5..59c8b997594 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -236,7 +236,7 @@ public class StoreChangelogReader implements ChangelogReader { this.stateRestoreListener = stateRestoreListener; this.standbyUpdateListener = standbyUpdateListener; - this.stateUpdaterEnabled = InternalConfig.getStateUpdaterEnabled(config.originals()); + this.stateUpdaterEnabled = InternalConfig.stateUpdaterEnabled(config.originals()); this.groupId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG); this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 05c832811ad..d402af82032 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -81,9 +81,9 @@ import java.util.stream.Collectors; import static org.apache.kafka.streams.internals.StreamsConfigUtils.eosEnabled; import static org.apache.kafka.streams.internals.StreamsConfigUtils.processingMode; -import static org.apache.kafka.streams.processor.internals.ClientUtils.getConsumerClientId; -import static org.apache.kafka.streams.processor.internals.ClientUtils.getRestoreConsumerClientId; -import static org.apache.kafka.streams.processor.internals.ClientUtils.getSharedAdminClientId; +import static org.apache.kafka.streams.processor.internals.ClientUtils.adminClientId; +import static org.apache.kafka.streams.processor.internals.ClientUtils.consumerClientId; +import static org.apache.kafka.streams.processor.internals.ClientUtils.restoreConsumerClientId; public class StreamThread extends Thread implements ProcessingThread { @@ -381,7 +381,7 @@ public class StreamThread extends Thread implements ProcessingThread { referenceContainer.clientTags = config.getClientTags(); log.info("Creating restore consumer client"); - final Map restoreConsumerConfigs = config.getRestoreConsumerConfigs(getRestoreConsumerClientId(threadId)); + final Map restoreConsumerConfigs = config.getRestoreConsumerConfigs(restoreConsumerClientId(threadId)); final Consumer restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs); final StoreChangelogReader changelogReader = new StoreChangelogReader( @@ -396,8 +396,8 @@ public class StreamThread extends Thread implements ProcessingThread { final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics); - final boolean stateUpdaterEnabled = InternalConfig.getStateUpdaterEnabled(config.originals()); - final boolean proceessingThreadsEnabled = InternalConfig.getProcessingThreadsEnabled(config.originals()); + final boolean stateUpdaterEnabled = InternalConfig.stateUpdaterEnabled(config.originals()); + final boolean proceessingThreadsEnabled = InternalConfig.processingThreadsEnabled(config.originals()); final ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator( topologyMetadata, config, @@ -425,7 +425,7 @@ public class StreamThread extends Thread implements ProcessingThread { final Tasks tasks = new Tasks(new LogContext(logPrefix)); final boolean processingThreadsEnabled = - InternalConfig.getProcessingThreadsEnabled(config.originals()); + InternalConfig.processingThreadsEnabled(config.originals()); final DefaultTaskManager schedulingTaskManager = maybeCreateSchedulingTaskManager(processingThreadsEnabled, stateUpdaterEnabled, topologyMetadata, time, threadId, tasks); @@ -460,7 +460,7 @@ public class StreamThread extends Thread implements ProcessingThread { log.info("Creating consumer client"); final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG); - final Map consumerConfigs = config.getMainConsumerConfigs(applicationId, getConsumerClientId(threadId), threadIdx); + final Map consumerConfigs = config.getMainConsumerConfigs(applicationId, consumerClientId(threadId), threadIdx); consumerConfigs.put(StreamsConfig.InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, referenceContainer); final String originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG); @@ -495,7 +495,7 @@ public class StreamThread extends Thread implements ProcessingThread { cache::resize ); - return streamThread.updateThreadMetadata(getSharedAdminClientId(clientId)); + return streamThread.updateThreadMetadata(adminClientId(clientId)); } private static DefaultTaskManager maybeCreateSchedulingTaskManager(final boolean processingThreadsEnabled, @@ -641,8 +641,8 @@ public class StreamThread extends Thread implements ProcessingThread { this.numIterations = 1; this.eosEnabled = eosEnabled(config); this.processingMode = processingMode(config); - this.stateUpdaterEnabled = InternalConfig.getStateUpdaterEnabled(config.originals()); - this.processingThreadsEnabled = InternalConfig.getProcessingThreadsEnabled(config.originals()); + this.stateUpdaterEnabled = InternalConfig.stateUpdaterEnabled(config.originals()); + this.processingThreadsEnabled = InternalConfig.processingThreadsEnabled(config.originals()); this.logSummaryIntervalMs = config.getLong(StreamsConfig.LOG_SUMMARY_INTERVAL_MS_CONFIG); } @@ -1500,8 +1500,8 @@ public class StreamThread extends Thread implements ProcessingThread { threadMetadata = new ThreadMetadataImpl( getName(), state().name(), - getConsumerClientId(getName()), - getRestoreConsumerClientId(getName()), + consumerClientId(getName()), + restoreConsumerClientId(getName()), taskManager.producerClientIds(), adminClientId, Collections.emptySet(), @@ -1537,8 +1537,8 @@ public class StreamThread extends Thread implements ProcessingThread { threadMetadata = new ThreadMetadataImpl( getName(), state().name(), - getConsumerClientId(getName()), - getRestoreConsumerClientId(getName()), + consumerClientId(getName()), + restoreConsumerClientId(getName()), taskManager.producerClientIds(), adminClientId, activeTasksMetadata, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java index 9328004a6b0..9b7dbbe94b9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java @@ -567,7 +567,7 @@ public class StreamsMetadataState { } public String getStoreForChangelogTopic(final String topicName) { - return topologyMetadata.getStoreForChangelogTopic(topicName); + return topologyMetadata.storeForChangelogTopic(topicName); } private class SourceTopicsInfo { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java index ccd0f415045..5d5efde6f5f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java @@ -56,8 +56,8 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2; -import static org.apache.kafka.streams.processor.internals.ClientUtils.getTaskProducerClientId; -import static org.apache.kafka.streams.processor.internals.ClientUtils.getThreadProducerClientId; +import static org.apache.kafka.streams.processor.internals.ClientUtils.taskProducerClientId; +import static org.apache.kafka.streams.processor.internals.ClientUtils.threadProducerClientId; /** * {@code StreamsProducer} manages the producers within a Kafka Streams application. @@ -101,14 +101,14 @@ public class StreamsProducer { final Map producerConfigs; switch (processingMode) { case AT_LEAST_ONCE: { - producerConfigs = config.getProducerConfigs(getThreadProducerClientId(threadId)); + producerConfigs = config.getProducerConfigs(threadProducerClientId(threadId)); eosV2ProducerConfigs = null; break; } case EXACTLY_ONCE_ALPHA: { producerConfigs = config.getProducerConfigs( - getTaskProducerClientId( + taskProducerClientId( threadId, Objects.requireNonNull(taskId, "taskId cannot be null for exactly-once alpha") ) @@ -122,7 +122,7 @@ public class StreamsProducer { break; } case EXACTLY_ONCE_V2: { - producerConfigs = config.getProducerConfigs(getThreadProducerClientId(threadId)); + producerConfigs = config.getProducerConfigs(threadProducerClientId(threadId)); final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG); producerConfigs.put( diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java index deb61c7f8db..992d19c2d82 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java @@ -189,7 +189,7 @@ public class TopologyMetadata { public void maybeNotifyTopologyVersionListeners() { try { lock(); - final long minThreadVersion = getMinimumThreadVersion(); + final long minThreadVersion = minimumThreadVersion(); final Iterator iterator = version.activeTopologyUpdateListeners.listIterator(); TopologyVersionListener topologyVersionListener; while (iterator.hasNext()) { @@ -207,7 +207,7 @@ public class TopologyMetadata { } // Return the minimum version across all live threads, or Long.MAX_VALUE if there are no threads running - private long getMinimumThreadVersion() { + private long minimumThreadVersion() { final Optional minVersion = threadVersions.values().stream().min(Long::compare); return minVersion.orElse(Long.MAX_VALUE); } @@ -312,7 +312,7 @@ public class TopologyMetadata { return removeTopologyFuture; } - public TaskConfig getTaskConfigFor(final TaskId taskId) { + public TaskConfig taskConfig(final TaskId taskId) { final InternalTopologyBuilder builder = lookupBuilderForTask(taskId); return builder.topologyConfigs().getTaskConfig(); } @@ -360,7 +360,7 @@ public class TopologyMetadata { allInputTopics.addAll(newInputTopics); } - public int getNumStreamThreads(final StreamsConfig config) { + public int numStreamThreads(final StreamsConfig config) { final int configuredNumStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG); // If there are named topologies but some are empty, this indicates a bug in user code @@ -531,7 +531,7 @@ public class TopologyMetadata { return stateStoreNameToSourceTopics; } - public String getStoreForChangelogTopic(final String topicName) { + public String storeForChangelogTopic(final String topicName) { for (final InternalTopologyBuilder builder : builders.values()) { final String store = builder.getStoreForChangelogTopic(topicName); if (store != null) { @@ -614,7 +614,7 @@ public class TopologyMetadata { } } - public Collection getAllNamedTopologies() { + public Collection allNamedTopologies() { return builders.values() .stream() .map(InternalTopologyBuilder::namedTopology) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/UnwindowedUnversionedChangelogTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/UnwindowedUnversionedChangelogTopicConfig.java index 601d0165d5a..b86260d767c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/UnwindowedUnversionedChangelogTopicConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/UnwindowedUnversionedChangelogTopicConfig.java @@ -47,7 +47,7 @@ public class UnwindowedUnversionedChangelogTopicConfig extends InternalTopicConf * @return Properties to be used when creating the topic */ @Override - public Map getProperties(final Map defaultProperties, final long additionalRetentionMs) { + public Map properties(final Map defaultProperties, final long additionalRetentionMs) { // internal topic config overridden rule: library overrides < global config overrides < per-topic config overrides final Map topicConfig = new HashMap<>(UNWINDOWED_STORE_CHANGELOG_TOPIC_DEFAULT_OVERRIDES); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/VersionedChangelogTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/VersionedChangelogTopicConfig.java index df1d2353926..2e154d91291 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/VersionedChangelogTopicConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/VersionedChangelogTopicConfig.java @@ -59,7 +59,7 @@ public class VersionedChangelogTopicConfig extends InternalTopicConfig { * @return Properties to be used when creating the topic */ @Override - public Map getProperties(final Map defaultProperties, final long windowStoreAdditionalRetentionMs) { + public Map properties(final Map defaultProperties, final long windowStoreAdditionalRetentionMs) { // internal topic config override rule: library overrides < global config overrides < per-topic config overrides final Map topicConfig = new HashMap<>(VERSIONED_STORE_CHANGELOG_TOPIC_DEFAULT_OVERRIDES); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/WindowedChangelogTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/WindowedChangelogTopicConfig.java index 3280693920c..50e8cd63b5e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/WindowedChangelogTopicConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/WindowedChangelogTopicConfig.java @@ -51,7 +51,7 @@ public class WindowedChangelogTopicConfig extends InternalTopicConfig { * @return Properties to be used when creating the topic */ @Override - public Map getProperties(final Map defaultProperties, final long additionalRetentionMs) { + public Map properties(final Map defaultProperties, final long additionalRetentionMs) { // internal topic config overridden rule: library overrides < global config overrides < per-topic config overrides final Map topicConfig = new HashMap<>(WINDOWED_STORE_CHANGELOG_TOPIC_DEFAULT_OVERRIDES); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java index 28257eacfcd..ad6aca2bac8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java @@ -97,7 +97,7 @@ public final class AssignorConfiguration { public RebalanceProtocol rebalanceProtocol() { final String upgradeFrom = streamsConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG); if (upgradeFrom != null) { - switch (UpgradeFromValues.getValueFromString(upgradeFrom)) { + switch (UpgradeFromValues.fromString(upgradeFrom)) { case UPGRADE_FROM_0100: case UPGRADE_FROM_0101: case UPGRADE_FROM_0102: @@ -154,7 +154,7 @@ public final class AssignorConfiguration { public int configuredMetadataVersion(final int priorVersion) { final String upgradeFrom = streamsConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG); if (upgradeFrom != null) { - switch (UpgradeFromValues.getValueFromString(upgradeFrom)) { + switch (UpgradeFromValues.fromString(upgradeFrom)) { case UPGRADE_FROM_0100: log.info( "Downgrading metadata.version from {} to 1 for upgrade from 0.10.0.x.", diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java index 6c08c06f2ae..f9d882bdb0d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java @@ -152,7 +152,7 @@ public class KafkaStreamsNamedTopologyWrapper extends KafkaStreams { } public Collection getAllTopologies() { - return topologyMetadata.getAllNamedTopologies(); + return topologyMetadata.allNamedTopologies(); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java index afdb39f6cd8..f42518a1537 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java @@ -247,7 +247,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore implements Se final StateStore root) { this.context = context; - final StreamsMetricsImpl metrics = ProcessorContextUtils.getMetricsImpl(context); + final StreamsMetricsImpl metrics = ProcessorContextUtils.metricsImpl(context); final String threadId = Thread.currentThread().getName(); final String taskName = context.taskId().toString(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java index 1d24d08ee71..713be8a4a20 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java @@ -106,7 +106,7 @@ public class InMemoryWindowStore implements WindowStore { public void init(final ProcessorContext context, final StateStore root) { this.context = context; - final StreamsMetricsImpl metrics = ProcessorContextUtils.getMetricsImpl(context); + final StreamsMetricsImpl metrics = ProcessorContextUtils.metricsImpl(context); final String threadId = Thread.currentThread().getName(); final String taskName = context.taskId().toString(); expiredRecordSensor = TaskMetrics.droppedRecordsSensor( diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java index c08c106f155..304d77e8259 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java @@ -64,7 +64,7 @@ class KeyValueSegments extends AbstractSegments { @Override public void openExisting(final ProcessorContext context, final long streamTime) { - metricsRecorder.init(ProcessorContextUtils.getMetricsImpl(context), context.taskId()); + metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId()); super.openExisting(context, streamTime); } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java index a13050ccf83..85985f9d373 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java @@ -104,7 +104,7 @@ public class LogicalKeyValueSegments extends AbstractSegments, BatchWritingS public void init(final StateStoreContext context, final StateStore root) { // open the DB dir - metricsRecorder.init(getMetricsImpl(context), context.taskId()); + metricsRecorder.init(metricsImpl(context), context.taskId()); openDB(context.appConfigs(), context.stateDir()); final File positionCheckpointFile = new File(context.stateDir(), name() + ".position"); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java index 7ca4e4cff94..a10f6530691 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java @@ -355,7 +355,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore { @Override public void openExisting(final ProcessorContext context, final long streamTime) { - metricsRecorder.init(ProcessorContextUtils.getMetricsImpl(context), context.taskId()); + metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId()); super.openExisting(context, streamTime); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 6ad4e4afc6e..0aec38de0eb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -265,7 +265,7 @@ public class KafkaStreamsTest { streamsConfigUtils = mockStatic(StreamsConfigUtils.class); streamsConfigUtils.when(() -> StreamsConfigUtils.processingMode(any(StreamsConfig.class))).thenReturn(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE); streamsConfigUtils.when(() -> StreamsConfigUtils.eosEnabled(any(StreamsConfig.class))).thenReturn(false); - streamsConfigUtils.when(() -> StreamsConfigUtils.getTotalCacheSize(any(StreamsConfig.class))).thenReturn(10 * 1024 * 1024L); + streamsConfigUtils.when(() -> StreamsConfigUtils.totalCacheSize(any(StreamsConfig.class))).thenReturn(10 * 1024 * 1024L); // setup global threads final AtomicReference globalThreadState = new AtomicReference<>(GlobalStreamThread.State.CREATED); diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index b903b311028..8e936f328ea 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -74,7 +74,7 @@ import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFI import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix; import static org.apache.kafka.streams.StreamsConfig.consumerPrefix; import static org.apache.kafka.streams.StreamsConfig.producerPrefix; -import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize; +import static org.apache.kafka.streams.internals.StreamsConfigUtils.totalCacheSize; import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.hasItem; @@ -1282,7 +1282,7 @@ public class StreamsConfigTest { props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 100); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10); final StreamsConfig config = new StreamsConfig(props); - assertEquals(getTotalCacheSize(config), 100); + assertEquals(totalCacheSize(config), 100); } @Test @@ -1290,20 +1290,20 @@ public class StreamsConfigTest { public void shouldUseCacheMaxBytesBufferingConfigWhenOnlyDeprecatedConfigIsSet() { props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10); final StreamsConfig config = new StreamsConfig(props); - assertEquals(getTotalCacheSize(config), 10); + assertEquals(totalCacheSize(config), 10); } @Test public void shouldUseStateStoreCacheMaxBytesWhenNewConfigIsSet() { props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10); final StreamsConfig config = new StreamsConfig(props); - assertEquals(getTotalCacheSize(config), 10); + assertEquals(totalCacheSize(config), 10); } @Test public void shouldUseDefaultStateStoreCacheMaxBytesConfigWhenNoConfigIsSet() { final StreamsConfig config = new StreamsConfig(props); - assertEquals(getTotalCacheSize(config), 10 * 1024 * 1024); + assertEquals(totalCacheSize(config), 10 * 1024 * 1024); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java index 26ed077e968..f388441f356 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java @@ -219,7 +219,7 @@ public class KStreamKStreamJoinTest { assertThat(internalTopologyBuilder.subtopologyToTopicsInfo().get(SUBTOPOLOGY_0).stateChangelogTopics.size(), equalTo(2)); for (final InternalTopicConfig config : internalTopologyBuilder.subtopologyToTopicsInfo().get(SUBTOPOLOGY_0).stateChangelogTopics.values()) { assertThat( - config.getProperties(Collections.emptyMap(), 0).get("test"), + config.properties(Collections.emptyMap(), 0).get("test"), equalTo("property") ); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java index 8bb0894d00d..8ba6293d98c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java @@ -49,7 +49,7 @@ public class InternalTopicConfigTest { public void shouldSetCreateTimeByDefaultForWindowedChangelog() { final WindowedChangelogTopicConfig topicConfig = new WindowedChangelogTopicConfig("name", Collections.emptyMap(), 10); - final Map properties = topicConfig.getProperties(Collections.emptyMap(), 0); + final Map properties = topicConfig.properties(Collections.emptyMap(), 0); assertEquals("CreateTime", properties.get(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG)); } @@ -57,7 +57,7 @@ public class InternalTopicConfigTest { public void shouldSetCreateTimeByDefaultForUnwindowedUnversionedChangelog() { final UnwindowedUnversionedChangelogTopicConfig topicConfig = new UnwindowedUnversionedChangelogTopicConfig("name", Collections.emptyMap()); - final Map properties = topicConfig.getProperties(Collections.emptyMap(), 0); + final Map properties = topicConfig.properties(Collections.emptyMap(), 0); assertEquals("CreateTime", properties.get(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG)); } @@ -65,7 +65,7 @@ public class InternalTopicConfigTest { public void shouldSetCreateTimeByDefaultForVersionedChangelog() { final VersionedChangelogTopicConfig topicConfig = new VersionedChangelogTopicConfig("name", Collections.emptyMap(), 12); - final Map properties = topicConfig.getProperties(Collections.emptyMap(), 0); + final Map properties = topicConfig.properties(Collections.emptyMap(), 0); assertEquals("CreateTime", properties.get(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG)); } @@ -73,20 +73,20 @@ public class InternalTopicConfigTest { public void shouldSetCreateTimeByDefaultForRepartitionTopic() { final RepartitionTopicConfig topicConfig = new RepartitionTopicConfig("name", Collections.emptyMap()); - final Map properties = topicConfig.getProperties(Collections.emptyMap(), 0); + final Map properties = topicConfig.properties(Collections.emptyMap(), 0); assertEquals("CreateTime", properties.get(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG)); } @Test public void shouldAugmentRetentionMsWithWindowedChangelog() { final WindowedChangelogTopicConfig topicConfig = new WindowedChangelogTopicConfig("name", Collections.emptyMap(), 10); - assertEquals("30", topicConfig.getProperties(Collections.emptyMap(), 20).get(TopicConfig.RETENTION_MS_CONFIG)); + assertEquals("30", topicConfig.properties(Collections.emptyMap(), 20).get(TopicConfig.RETENTION_MS_CONFIG)); } @Test public void shouldAugmentCompactionLagMsWithVersionedChangelog() { final VersionedChangelogTopicConfig topicConfig = new VersionedChangelogTopicConfig("name", Collections.emptyMap(), 12); - assertEquals(Long.toString(12 + 24 * 60 * 60 * 1000L), topicConfig.getProperties(Collections.emptyMap(), 20).get(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG)); + assertEquals(Long.toString(12 + 24 * 60 * 60 * 1000L), topicConfig.properties(Collections.emptyMap(), 20).get(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG)); } @Test @@ -96,7 +96,7 @@ public class InternalTopicConfigTest { final WindowedChangelogTopicConfig topicConfig = new WindowedChangelogTopicConfig("name", configs, 10); - final Map properties = topicConfig.getProperties(Collections.emptyMap(), 0); + final Map properties = topicConfig.properties(Collections.emptyMap(), 0); assertEquals("LogAppendTime", properties.get(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG)); } @@ -107,7 +107,7 @@ public class InternalTopicConfigTest { final VersionedChangelogTopicConfig topicConfig = new VersionedChangelogTopicConfig("name", configs, 12); - final Map properties = topicConfig.getProperties(Collections.emptyMap(), 0); + final Map properties = topicConfig.properties(Collections.emptyMap(), 0); assertEquals("LogAppendTime", properties.get(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG)); } @@ -120,7 +120,7 @@ public class InternalTopicConfigTest { final UnwindowedUnversionedChangelogTopicConfig topicConfig = new UnwindowedUnversionedChangelogTopicConfig("name", configs); - final Map properties = topicConfig.getProperties(Collections.emptyMap(), 0); + final Map properties = topicConfig.properties(Collections.emptyMap(), 0); assertEquals("1000", properties.get(TopicConfig.RETENTION_MS_CONFIG)); assertEquals("10000", properties.get(TopicConfig.RETENTION_BYTES_CONFIG)); assertEquals("LogAppendTime", properties.get(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG)); @@ -134,7 +134,7 @@ public class InternalTopicConfigTest { final RepartitionTopicConfig topicConfig = new RepartitionTopicConfig("name", configs); - final Map properties = topicConfig.getProperties(Collections.emptyMap(), 0); + final Map properties = topicConfig.properties(Collections.emptyMap(), 0); assertEquals("1000", properties.get(TopicConfig.RETENTION_MS_CONFIG)); assertEquals("LogAppendTime", properties.get(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java index 5496c4de5f4..fdc37f631e2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java @@ -1782,7 +1782,7 @@ public class InternalTopicManagerTest { topicName, topicConfig.numberOfPartitions(), Optional.of(streamsConfig.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG).shortValue()) - ).configs(topicConfig.getProperties( + ).configs(topicConfig.properties( Collections.emptyMap(), streamsConfig.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG)) ); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index 345c3da1ca8..d284c06165e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -920,14 +920,14 @@ public class InternalTopologyBuilderTest { final Map topicGroups = builder.subtopologyToTopicsInfo(); final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next(); final InternalTopicConfig topicConfig1 = topicsInfo.stateChangelogTopics.get("appId-store1-changelog"); - final Map properties1 = topicConfig1.getProperties(Collections.emptyMap(), 10000); + final Map properties1 = topicConfig1.properties(Collections.emptyMap(), 10000); assertEquals(3, properties1.size()); assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE, properties1.get(TopicConfig.CLEANUP_POLICY_CONFIG)); assertEquals("40000", properties1.get(TopicConfig.RETENTION_MS_CONFIG)); assertEquals("appId-store1-changelog", topicConfig1.name()); assertInstanceOf(WindowedChangelogTopicConfig.class, topicConfig1); final InternalTopicConfig topicConfig2 = topicsInfo.stateChangelogTopics.get("appId-store2-changelog"); - final Map properties2 = topicConfig2.getProperties(Collections.emptyMap(), 10000); + final Map properties2 = topicConfig2.properties(Collections.emptyMap(), 10000); assertEquals(3, properties2.size()); assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE, properties2.get(TopicConfig.CLEANUP_POLICY_CONFIG)); assertEquals("40000", properties2.get(TopicConfig.RETENTION_MS_CONFIG)); @@ -952,7 +952,7 @@ public class InternalTopologyBuilderTest { final Map topicGroups = builder.subtopologyToTopicsInfo(); final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next(); final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-vstore-changelog"); - final Map properties = topicConfig.getProperties(Collections.emptyMap(), 10000); + final Map properties = topicConfig.properties(Collections.emptyMap(), 10000); assertEquals(3, properties.size()); assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG)); assertEquals(Long.toString(60_000L + 24 * 60 * 60 * 1000L), properties.get(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG)); @@ -970,7 +970,7 @@ public class InternalTopologyBuilderTest { final Map topicGroups = builder.subtopologyToTopicsInfo(); final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next(); final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-testStore-changelog"); - final Map properties = topicConfig.getProperties(Collections.emptyMap(), 10000); + final Map properties = topicConfig.properties(Collections.emptyMap(), 10000); assertEquals(2, properties.size()); assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG)); assertEquals("appId-testStore-changelog", topicConfig.name()); @@ -985,7 +985,7 @@ public class InternalTopologyBuilderTest { builder.buildTopology(); final InternalTopologyBuilder.TopicsInfo topicsInfo = builder.subtopologyToTopicsInfo().values().iterator().next(); final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo"); - final Map properties = topicConfig.getProperties(Collections.emptyMap(), 10000); + final Map properties = topicConfig.properties(Collections.emptyMap(), 10000); assertEquals(4, properties.size()); assertEquals(String.valueOf(-1), properties.get(TopicConfig.RETENTION_MS_CONFIG)); assertEquals(TopicConfig.CLEANUP_POLICY_DELETE, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG)); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 663d2a81cdb..d8b7518faaa 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -139,7 +139,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; import static org.apache.kafka.common.utils.Utils.mkSet; -import static org.apache.kafka.streams.processor.internals.ClientUtils.getSharedAdminClientId; +import static org.apache.kafka.streams.processor.internals.ClientUtils.adminClientId; import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME; import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask; import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; @@ -1404,7 +1404,7 @@ public class StreamThreadTest { final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = buildStreamThread(consumer, taskManager, config, topologyMetadata) - .updateThreadMetadata(getSharedAdminClientId(CLIENT_ID)); + .updateThreadMetadata(adminClientId(CLIENT_ID)); thread.setStateListener( (t, newState, oldState) -> { if (oldState == StreamThread.State.CREATED && newState == StreamThread.State.STARTING) { @@ -1478,7 +1478,7 @@ public class StreamThreadTest { null, HANDLER, null - ).updateThreadMetadata(getSharedAdminClientId(CLIENT_ID)); + ).updateThreadMetadata(adminClientId(CLIENT_ID)); final StreamsException thrown = assertThrows(StreamsException.class, thread::run); @@ -1504,7 +1504,7 @@ public class StreamThreadTest { final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = buildStreamThread(consumer, taskManager, config, topologyMetadata) - .updateThreadMetadata(getSharedAdminClientId(CLIENT_ID)); + .updateThreadMetadata(adminClientId(CLIENT_ID)); thread.shutdown(); verify(taskManager).shutdown(true); @@ -1524,7 +1524,7 @@ public class StreamThreadTest { final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = buildStreamThread(consumer, taskManager, config, topologyMetadata) - .updateThreadMetadata(getSharedAdminClientId(CLIENT_ID)); + .updateThreadMetadata(adminClientId(CLIENT_ID)); thread.shutdown(); // Execute the run method. Verification of the mock will check that shutdown was only done once thread.run(); @@ -2628,7 +2628,7 @@ public class StreamThreadTest { final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = buildStreamThread(consumer, taskManager, config, topologyMetadata) - .updateThreadMetadata(getSharedAdminClientId(CLIENT_ID)); + .updateThreadMetadata(adminClientId(CLIENT_ID)); consumer.schedulePollTask(() -> { thread.setState(StreamThread.State.PARTITIONS_REVOKED); @@ -2658,7 +2658,7 @@ public class StreamThreadTest { final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = buildStreamThread(consumer, taskManager, config, topologyMetadata) - .updateThreadMetadata(getSharedAdminClientId(CLIENT_ID)); + .updateThreadMetadata(adminClientId(CLIENT_ID)); consumer.schedulePollTask(() -> { thread.setState(StreamThread.State.PARTITIONS_REVOKED); @@ -2720,7 +2720,7 @@ public class StreamThreadTest { setState(State.PENDING_SHUTDOWN); throw new TaskCorruptedException(corruptedTasks); } - }.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID)); + }.updateThreadMetadata(adminClientId(CLIENT_ID)); thread.run(); @@ -2778,7 +2778,7 @@ public class StreamThreadTest { setState(State.PENDING_SHUTDOWN); throw new TaskCorruptedException(corruptedTasks); } - }.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID)); + }.updateThreadMetadata(adminClientId(CLIENT_ID)); final AtomicBoolean exceptionHandlerInvoked = new AtomicBoolean(false); @@ -2845,7 +2845,7 @@ public class StreamThreadTest { setState(State.PENDING_SHUTDOWN); throw new TaskCorruptedException(corruptedTasks); } - }.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID)); + }.updateThreadMetadata(adminClientId(CLIENT_ID)); thread.setState(StreamThread.State.STARTING); thread.runLoop(); @@ -2908,7 +2908,7 @@ public class StreamThreadTest { setState(State.PENDING_SHUTDOWN); throw new TaskCorruptedException(corruptedTasks); } - }.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID)); + }.updateThreadMetadata(adminClientId(CLIENT_ID)); thread.setState(StreamThread.State.STARTING); thread.runLoop(); @@ -2968,7 +2968,7 @@ public class StreamThreadTest { setState(State.PENDING_SHUTDOWN); throw new TaskCorruptedException(corruptedTasks); } - }.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID)); + }.updateThreadMetadata(adminClientId(CLIENT_ID)); thread.setState(StreamThread.State.STARTING); thread.runLoop(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBBlockCacheMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBBlockCacheMetricsTest.java index 3e308bb85b5..624edbb8eb4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBBlockCacheMetricsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBBlockCacheMetricsTest.java @@ -104,7 +104,7 @@ public class RocksDBBlockCacheMetricsTest { } public void assertMetric(final StateStoreContext context, final String group, final String metricName, final T expected) { - final StreamsMetricsImpl metrics = ProcessorContextUtils.getMetricsImpl(context); + final StreamsMetricsImpl metrics = ProcessorContextUtils.metricsImpl(context); final MetricName name = metrics.metricsRegistry().metricName( metricName, group,