diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
index b24960f5270..f7af624b1ad 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
@@ -82,8 +82,7 @@ public abstract class InternalTopicConfig {
     public void setNumberOfPartitions(final int numberOfPartitions) {
         if (hasEnforcedNumberOfPartitions()) {
-            throw new UnsupportedOperationException("number of partitions are enforced on topic " +
-                "" + name() + " and can't be altered.");
+            throw new UnsupportedOperationException("number of partitions are enforced on topic " + name() + " and can't be altered.");
         }
 
         validateNumberOfPartitions(numberOfPartitions);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 6e79616d30a..e95e098c26c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -119,11 +119,9 @@ public class SinkNode extends ProcessorNode {
      */
     @Override
     public String toString(final String indent) {
-        final StringBuilder sb = new StringBuilder(super.toString(indent));
-        sb.append(indent).append("\ttopic:\t\t");
-        sb.append(topicExtractor);
-        sb.append("\n");
-        return sb.toString();
+        return super.toString(indent) + indent + "\ttopic:\t\t" +
+            topicExtractor +
+            "\n";
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
index 473586492e6..ca8c7accdaf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
@@ -78,13 +78,10 @@ public class StreamsMetadataState {
     }
 
     public String toString(final String indent) {
-        final StringBuilder builder = new StringBuilder();
-        builder.append(indent).append("GlobalMetadata: ").append(allMetadata).append("\n");
-        builder.append(indent).append("GlobalStores: ").append(globalStores).append("\n");
-        builder.append(indent).append("My HostInfo: ").append(thisHost).append("\n");
-        builder.append(indent).append("PartitionsByTopic: ").append(partitionsByTopic).append("\n");
-
-        return builder.toString();
+        return indent + "GlobalMetadata: " + allMetadata + "\n" +
+            indent + "GlobalStores: " + globalStores + "\n" +
+            indent + "My HostInfo: " + thisHost + "\n" +
+            indent + "PartitionsByTopic: " + partitionsByTopic + "\n";
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index ad20af61bda..a6546c1edb5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -73,7 +73,7 @@ public class MeteredSessionStore
     private InternalProcessorContext context;
     private TaskId taskId;
 
-    private LongAdder numOpenIterators = new LongAdder();
+    private final LongAdder numOpenIterators = new LongAdder();
     private final NavigableSet openIterators = new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
 
     @SuppressWarnings("rawtypes")
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 05d423a985c..e59665fb2eb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -76,8 +76,8 @@ public class MeteredWindowStore
     private InternalProcessorContext context;
     private TaskId taskId;
 
-    private LongAdder numOpenIterators = new LongAdder();
-    private NavigableSet openIterators = new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
+    private final LongAdder numOpenIterators = new LongAdder();
+    private final NavigableSet openIterators = new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
 
     @SuppressWarnings("rawtypes")
     private final Map queryHandlers =
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
index b6711bc74c8..555786e81e4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
@@ -312,7 +312,7 @@ public class IQv2IntegrationTest {
         public KeyValueStore get() {
             return new KeyValueStore() {
                 private boolean open = false;
-                private Map map = new HashMap<>();
+                private final Map map = new HashMap<>();
                 private Position position;
                 private StateStoreContext context;
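Note on the MeteredSessionStore/MeteredWindowStore hunks above: both stores track their open iterators for metrics, and `final` is safe there because the LongAdder and the ConcurrentSkipListSet are only mutated in place, never reassigned. A minimal, self-contained sketch of that pattern, with illustrative names that are not from the Kafka codebase:

    import java.util.Comparator;
    import java.util.NavigableSet;
    import java.util.OptionalLong;
    import java.util.concurrent.ConcurrentSkipListSet;
    import java.util.concurrent.atomic.LongAdder;

    public class OpenIteratorTracker {
        interface TimedIterator {
            long startTimestamp();
        }

        // Both containers are mutated concurrently but never reassigned,
        // so the fields can (and should) be final.
        private final LongAdder numOpenIterators = new LongAdder();
        private final NavigableSet<TimedIterator> openIterators =
            new ConcurrentSkipListSet<>(Comparator.comparingLong(TimedIterator::startTimestamp));

        void onOpen(final TimedIterator iterator) {
            numOpenIterators.increment();
            openIterators.add(iterator);
        }

        void onClose(final TimedIterator iterator) {
            numOpenIterators.decrement();
            openIterators.remove(iterator);
        }

        long openCount() {
            return numOpenIterators.sum();
        }

        // The comparator orders iterators by start time, so first() is the
        // oldest. A production version would break ties between iterators
        // with equal start timestamps, which this sketch ignores.
        OptionalLong oldestStartTimestamp() {
            return openIterators.isEmpty()
                ? OptionalLong.empty()
                : OptionalLong.of(openIterators.first().startTimestamp());
        }

        public static void main(final String[] args) {
            final OpenIteratorTracker tracker = new OpenIteratorTracker();
            final TimedIterator it = () -> 5L;
            tracker.onOpen(it);
            System.out.println(tracker.openCount());            // 1
            System.out.println(tracker.oldestStartTimestamp()); // OptionalLong[5]
            tracker.onClose(it);
            System.out.println(tracker.openCount());            // 0
        }
    }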
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java
index 09feadf2f98..ba941910b35 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java
@@ -49,6 +49,7 @@ import org.junit.jupiter.api.Timeout;
 import java.io.IOException;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
@@ -175,7 +176,7 @@ public class JoinGracePeriodDurabilityIntegrationTest {
             // flush those recovered buffered events out.
             produceSynchronouslyToPartitionZero(
                 streamInput,
-                asList(
+                Collections.singletonList(
                     new KeyValueTimestamp<>("k6", "v6", scaledTime(20L))
                 )
             );
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
index 21f11847be9..a2b0750550e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
@@ -126,7 +126,7 @@ public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
             new KeyValue<>("ID123-4", "ID123-A4")
         );
 
-        final List> table2 = asList(
+        final List> table2 = Collections.singletonList(
             new KeyValue<>("ID123", "BBB")
         );
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java
index 1a583dd15f6..25c8723e867 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java
@@ -135,7 +135,7 @@ public class SelfJoinUpgradeIntegrationTest {
             "1",
             "A",
             currentTime + 42L,
-            asList(new KeyValueTimestamp<>("1", "AA", currentTime + 42L))
+            singletonList(new KeyValueTimestamp<>("1", "AA", currentTime + 42L))
         );
 
         processKeyValueAndVerifyCount(
@@ -200,7 +200,7 @@ public class SelfJoinUpgradeIntegrationTest {
             "1",
             "A",
             currentTime + 42L,
-            asList(new KeyValueTimestamp<>("1", "AA", currentTime + 42L))
+            singletonList(new KeyValueTimestamp<>("1", "AA", currentTime + 42L))
         );
 
         processKeyValueAndVerifyCount(
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
index ab2f66a1ddd..dc3d0bc6d4f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
@@ -232,7 +232,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
-            Arrays.asList(
+            Collections.singletonList(
                 new TestRecord<>(null, "E-null", null, 16L)),
             null
         );
@@ -285,7 +285,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
-            Arrays.asList(
+            Collections.singletonList(
                 new TestRecord<>(null, "E-null", null, 16L)),
             null
         );
@@ -340,9 +340,9 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
-            Arrays.asList(
+            Collections.singletonList(
                 new TestRecord<>(null, "E-null", null, 16L)),
-            Arrays.asList(
+            Collections.singletonList(
                 new TestRecord<>(null, "null-e", null, 17L))
         );
@@ -394,9 +394,9 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
                 new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
-            Arrays.asList(
+            Collections.singletonList(
                 new TestRecord<>(null, "E-null", null, 16L)),
-            Arrays.asList(
+            Collections.singletonList(
                 new TestRecord<>(null, "null-e", null, 17L))
         );
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
index 9ab5c7d574c..6a61d781f51 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
@@ -104,7 +104,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
     private String outputTopic2;
     private final StreamsBuilder builder = new StreamsBuilder();
     private final List processorValueCollector = new ArrayList<>();
-    private static AtomicBoolean throwError = new AtomicBoolean(true);
+    private static final AtomicBoolean THROW_ERROR = new AtomicBoolean(true);
     private Properties properties;
 
@@ -226,10 +226,10 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
         @Override
         public void process(final Record record) {
             valueList.add(record.value().toString());
-            if (throwError.get()) {
+            if (THROW_ERROR.get()) {
                 throw new StreamsException(Thread.currentThread().getName());
             }
-            throwError.set(true);
+            THROW_ERROR.set(true);
         }
     }
 
@@ -382,7 +382,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
         final AtomicInteger count = new AtomicInteger();
         kafkaStreams.setUncaughtExceptionHandler(exception -> {
             if (count.incrementAndGet() == numThreads) {
-                throwError.set(false);
+                THROW_ERROR.set(false);
             }
             return REPLACE_THREAD;
         });
@@ -390,7 +390,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
         produceMessages(NOW, inputTopic, "A");
         TestUtils.waitForCondition(() -> count.get() == numThreads, "finished replacing threads");
-        TestUtils.waitForCondition(() -> throwError.get(), "finished replacing threads");
+        TestUtils.waitForCondition(THROW_ERROR::get, "finished replacing threads");
         kafkaStreams.close();
 
         waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
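The Collections.singletonList swaps in the preceding test hunks all follow one rule: when a list is known to hold exactly one element, singletonList is both cheaper and more explicit than Arrays.asList. A small sketch of the difference, with an illustrative class name:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class SingletonListDemo {
        public static void main(final String[] args) {
            // Arrays.asList allocates a varargs array plus a wrapper list,
            // and the result supports set(), so it can be mutated in place.
            final List<String> viaAsList = Arrays.asList("k6");

            // Collections.singletonList allocates one small immutable object
            // and states the single-element intent explicitly.
            final List<String> viaSingleton = Collections.singletonList("k6");

            System.out.println(viaAsList.equals(viaSingleton)); // true
        }
    }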
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
index 9f025ee414a..ab36eda4190 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
@@ -78,7 +78,7 @@ public class TaskMetadataIntegrationTest {
     private String inputTopic;
     private static StreamsBuilder builder;
     private static Properties properties;
-    private static String appIdPrefix = "TaskMetadataTest_";
+    private static final String APP_ID_PREFIX = "TaskMetadataTest_";
     private static String appId;
     private AtomicBoolean process;
     private AtomicBoolean commit;
 
@@ -86,7 +86,7 @@ public class TaskMetadataIntegrationTest {
     @BeforeEach
     public void setup(final TestInfo testInfo) {
         final String testId = safeUniqueTestName(testInfo);
-        appId = appIdPrefix + testId;
+        appId = APP_ID_PREFIX + testId;
         inputTopic = "input" + testId;
         IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java
index 0bfb00cc7ca..d96b0f08b81 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java
@@ -70,7 +70,7 @@ public class PrintedTest {
         try (final InputStream stream = Files.newInputStream(file.toPath())) {
             final byte[] data = new byte[stream.available()];
             stream.read(data);
-            assertThat(new String(data, StandardCharsets.UTF_8.name()), equalTo("[processor]: hi, 1\n"));
+            assertThat(new String(data, StandardCharsets.UTF_8), equalTo("[processor]: hi, 1\n"));
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index 773e342f0ea..755944227b7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -1234,9 +1234,7 @@ public class InternalStreamsBuilderTest {
             return currentNode;
         }
         for (final GraphNode child: currentNode.children()) {
-            if (!visited.contains(child)) {
-                visited.add(child);
-            }
+            visited.add(child);
             final GraphNode result = getNodeByType(child, clazz, visited);
             if (result != null) {
                 return result;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index c5767e62ff8..5a1c2fa1f70 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -1351,7 +1351,7 @@ public class KStreamImplTest {
         final StreamsBuilder builder = new StreamsBuilder();
         final String input = "topic";
         final KStream stream = builder.stream(input, stringConsumed);
-        stream.to((key, value, context) -> context.topic() + "-" + key + "-" + value.substring(0, 1),
+        stream.to((key, value, context) -> context.topic() + "-" + key + "-" + value.charAt(0),
             Produced.with(Serdes.String(), Serdes.String()));
         builder.stream(input + "-a-v", stringConsumed).process(processorSupplier);
         builder.stream(input + "-b-v", stringConsumed).process(processorSupplier);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSplitTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSplitTest.java
index 9298a2dfb64..f78012070cc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSplitTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSplitTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
 import java.util.function.Consumer;
@@ -71,8 +72,8 @@ public class KStreamSplitTest {
             final TestOutputTopic x3 = driver.createOutputTopic("x3", new IntegerDeserializer(), new StringDeserializer());
             final TestOutputTopic x5 = driver.createOutputTopic("x5", new IntegerDeserializer(), new StringDeserializer());
             assertEquals(Arrays.asList("V0", "V2", "V4", "V6"), x2.readValuesToList());
-            assertEquals(Arrays.asList("V3"), x3.readValuesToList());
-            assertEquals(Arrays.asList("V5"), x5.readValuesToList());
+            assertEquals(Collections.singletonList("V3"), x3.readValuesToList());
+            assertEquals(Collections.singletonList("V5"), x5.readValuesToList());
         });
     }
 
@@ -128,9 +129,9 @@ public class KStreamSplitTest {
             final TestOutputTopic x7 = driver.createOutputTopic("foo-5", new IntegerDeserializer(), new StringDeserializer());
             final TestOutputTopic defaultBranch = driver.createOutputTopic("foo-0", new IntegerDeserializer(), new StringDeserializer());
             assertEquals(Arrays.asList("V0", "V2", "V4", "V6"), even.readValuesToList());
-            assertEquals(Arrays.asList("V-1"), negative.readValuesToList());
-            assertEquals(Arrays.asList("V7"), x7.readValuesToList());
-            assertEquals(Arrays.asList("V1"), defaultBranch.readValuesToList());
+            assertEquals(Collections.singletonList("V-1"), negative.readValuesToList());
+            assertEquals(Collections.singletonList("V7"), x7.readValuesToList());
+            assertEquals(Collections.singletonList("V1"), defaultBranch.readValuesToList());
         });
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
index b7be9f29638..3a9cad80aad 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
@@ -52,7 +52,7 @@ public class TimestampedTupleForwarderTest {
         new TimestampedTupleForwarder<>(
             store,
-            (org.apache.kafka.streams.processor.api.ProcessorContext>) null,
+            null,
             flushListener,
             sendOldValues
         );
@@ -82,7 +82,7 @@ public class TimestampedTupleForwarderTest {
         final TimestampedTupleForwarder forwarder =
             new TimestampedTupleForwarder<>(
                 store,
-                (org.apache.kafka.streams.processor.api.ProcessorContext>) context,
+                context,
                 null,
                 sendOldValues
             );
@@ -102,7 +102,7 @@ public class TimestampedTupleForwarderTest {
         final TimestampedTupleForwarder forwarder =
             new TimestampedTupleForwarder<>(
                 store,
-                (org.apache.kafka.streams.processor.api.ProcessorContext>) context,
+                context,
                 null,
                 false
             );
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindowTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindowTest.java
index 2c78bb6c6d2..7864f41e183 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindowTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindowTest.java
@@ -23,7 +23,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class UnlimitedWindowTest {
 
-    private long start = 50;
+    private final long start = 50;
     private final UnlimitedWindow window = new UnlimitedWindow(start);
     private final SessionWindow sessionWindow = new SessionWindow(start, start);
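The InternalStreamsBuilderTest hunk above drops a redundant contains() guard around Set.add. The add() method already answers the "was it new?" question, so the guard only buys a second hash lookup. A tiny sketch (illustrative names):

    import java.util.HashSet;
    import java.util.Set;

    public class SetAddDemo {
        public static void main(final String[] args) {
            final Set<String> visited = new HashSet<>();

            // add() returns whether the element was newly inserted,
            // so there is no need to call contains() first.
            System.out.println(visited.add("node-1")); // true: newly added
            System.out.println(visited.add("node-1")); // false: already present
            System.out.println(visited.size());        // 1
        }
    }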
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
index 5c83283b540..eb8f95f96f1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
@@ -38,12 +38,12 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class WindowedStreamPartitionerTest {
 
-    private String topicName = "topic";
+    private final String topicName = "topic";
 
-    private IntegerSerializer intSerializer = new IntegerSerializer();
-    private StringSerializer stringSerializer = new StringSerializer();
+    private final IntegerSerializer intSerializer = new IntegerSerializer();
+    private final StringSerializer stringSerializer = new StringSerializer();
 
-    private List infos = Arrays.asList(
+    private final List infos = Arrays.asList(
         new PartitionInfo(topicName, 0, Node.noNode(), new Node[0], new Node[0]),
         new PartitionInfo(topicName, 1, Node.noNode(), new Node[0], new Node[0]),
         new PartitionInfo(topicName, 2, Node.noNode(), new Node[0], new Node[0]),
@@ -52,7 +52,7 @@ public class WindowedStreamPartitionerTest {
         new PartitionInfo(topicName, 5, Node.noNode(), new Node[0], new Node[0])
     );
 
-    private Cluster cluster = new Cluster("cluster", Collections.singletonList(Node.noNode()), infos,
+    private final Cluster cluster = new Cluster("cluster", Collections.singletonList(Node.noNode()), infos,
         Collections.emptySet(), Collections.emptySet());
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index 5da75676690..b0152da0be4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -63,7 +63,6 @@ import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -362,8 +361,8 @@ public class InternalTopicManagerTest {
         final InternalTopicManager internalTopicManager = new InternalTopicManager(time, mockAdminClient, new StreamsConfig(config));
         try {
-            final Set topic1set = new HashSet(Arrays.asList(topic1));
-            final Set topic2set = new HashSet(Arrays.asList(topic2));
+            final Set topic1set = new HashSet(Collections.singletonList(topic1));
+            final Set topic2set = new HashSet(Collections.singletonList(topic2));
             internalTopicManager.getNumPartitions(topic1set, topic2set);
 
@@ -374,8 +373,8 @@ public class InternalTopicManagerTest {
         mockAdminClient.timeoutNextRequest(1);
         try {
-            final Set topic1set = new HashSet(Arrays.asList(topic1));
-            final Set topic2set = new HashSet(Arrays.asList(topic2));
+            final Set topic1set = new HashSet(Collections.singletonList(topic1));
+            final Set topic2set = new HashSet(Collections.singletonList(topic2));
             internalTopicManager.getNumPartitions(topic1set, topic2set);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java
index 8d0f8c7a6b0..49d18d888ed 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java
@@ -27,7 +27,7 @@ import java.util.Set;
 
 public class MockChangelogReader implements ChangelogReader {
 
     private final Set restoringPartitions = new HashSet<>();
-    private Map restoredOffsets = Collections.emptyMap();
+    private final Map restoredOffsets = Collections.emptyMap();
 
     public boolean isPartitionRegistered(final TopicPartition partition) {
         return restoringPartitions.contains(partition);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 336832e02ab..a46c54ee05b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -1126,7 +1126,7 @@ public class ProcessorStateManagerTest {
     }
 
     public static class StateStorePositionCommit implements CommitCallback {
-        private File file;
+        private final File file;
         private final OffsetCheckpoint checkpointFile;
         private final Position position;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
index f65945fabbb..4bb8ca5b2da 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
@@ -69,7 +69,7 @@ public class StateManagerUtilTest {
     @Mock
     private InternalProcessorContext processorContext;
 
-    private Logger logger = new LogContext("test").logger(AbstractTask.class);
+    private final Logger logger = new LogContext("test").logger(AbstractTask.class);
 
     private final TaskId taskId = new TaskId(0, 0);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
index fd6d174cc09..7c7047b7516 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
@@ -373,19 +373,9 @@ public final class AssignmentTestUtils {
             .collect(entriesToMap(TreeMap::new));
 
         if (!misassigned.isEmpty()) {
-            assertThat(
-                new StringBuilder().append("Found some over- or under-assigned tasks in the final assignment with ")
-                    .append(numStandbyReplicas)
-                    .append(" and max warmups ")
-                    .append(maxWarmupReplicas)
-                    .append(" standby replicas, stateful tasks:")
-                    .append(statefulTasks)
-                    .append(", and stateless tasks:")
-                    .append(statelessTasks)
-                    .append(failureContext)
-                    .toString(),
-                misassigned,
-                is(emptyMap()));
+            assertThat("Found some over- or under-assigned tasks in the final assignment with " + numStandbyReplicas +
+                " and max warmups " + maxWarmupReplicas + " standby replicas, stateful tasks:" + statefulTasks +
+                ", and stateless tasks:" + statelessTasks + failureContext, misassigned, is(emptyMap()));
         }
     }
 
@@ -396,29 +386,13 @@ public final class AssignmentTestUtils {
         final Map.Entry entry) {
         for (final TaskId standbyTask : entry.getValue().standbyTasks()) {
             if (statelessTasks.contains(standbyTask)) {
-                throw new AssertionError(
-                    new StringBuilder().append("Found a standby task for stateless task ")
-                        .append(standbyTask)
-                        .append(" on client ")
-                        .append(entry)
-                        .append(" stateless tasks:")
-                        .append(statelessTasks)
-                        .append(failureContext)
-                        .toString()
-                );
+                throw new AssertionError("Found a standby task for stateless task " + standbyTask + " on client " +
+                    entry + " stateless tasks:" + statelessTasks + failureContext);
             } else if (assignments.containsKey(standbyTask)) {
                 assignments.get(standbyTask).add(entry.getKey());
             } else {
-                throw new AssertionError(
-                    new StringBuilder().append("Found an extra standby task ")
-                        .append(standbyTask)
-                        .append(" on client ")
-                        .append(entry)
-                        .append(" but expected stateful tasks:")
-                        .append(statefulTasks)
-                        .append(failureContext)
-                        .toString()
-                );
+                throw new AssertionError("Found an extra standby task " + standbyTask + " on client " +
+                    entry + " but expected stateful tasks:" + statefulTasks + failureContext);
             }
         }
     }
@@ -432,18 +406,7 @@ public final class AssignmentTestUtils {
             if (assignments.containsKey(activeTask)) {
                 assignments.get(activeTask).add(entry.getKey());
             } else {
-                throw new AssertionError(
-                    new StringBuilder().append("Found an extra active task ")
-                        .append(activeTask)
-                        .append(" on client ")
-                        .append(entry)
-                        .append(" but expected stateful tasks:")
-                        .append(statefulTasks)
-                        .append(" and stateless tasks:")
-                        .append(statelessTasks)
-                        .append(failureContext)
-                        .toString()
-                );
+                throw new AssertionError("Found an extra active task " + activeTask + " on client " + entry + " but expected stateful tasks:" + statefulTasks + " and stateless tasks:" + statelessTasks + failureContext);
             }
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
index 0ff84e75946..936c9eb6a85 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
@@ -573,13 +573,7 @@ public class TaskAssignorConvergenceTest {
             assertBalancedStatefulAssignment(allStatefulTasks, clientStates, failureContext);
             final AssignmentTestUtils.TaskSkewReport taskSkewReport = AssignmentTestUtils.analyzeTaskAssignmentBalance(harness.clientStates, skewThreshold);
             if (taskSkewReport.totalSkewedTasks() > 0) {
-                fail(
-                    new StringBuilder().append("Expected a balanced task assignment, but was: ")
-                        .append(taskSkewReport)
-                        .append('\n')
-                        .append(failureContext)
-                        .toString()
-                );
+                fail("Expected a balanced task assignment, but was: " + taskSkewReport + '\n' + failureContext);
             }
         }
 
@@ -628,13 +622,7 @@ public class TaskAssignorConvergenceTest {
         }
 
         if (rebalancePending) {
-            final StringBuilder message =
-                new StringBuilder().append("Rebalances have not converged after iteration cutoff: ")
-                    .append(iterationLimit)
-                    .append(harness.history);
-            fail(message.toString());
+            fail("Rebalances have not converged after iteration cutoff: " + iterationLimit + harness.history);
         }
     }
-
-
 }
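The two test-utility diffs above collapse hand-written StringBuilder chains into plain `+` concatenation. For a one-shot message this loses nothing: javac compiles a single concatenation expression into equivalent StringBuilder calls (or an invokedynamic concatenation on modern JDKs). A builder still pays off when appending inside a loop. A sketch with illustrative values:

    public class ConcatDemo {
        public static void main(final String[] args) {
            final int taskSkew = 2;
            final String failureContext = " (seed=42)";

            // One-shot message: the compiler already optimizes this single
            // expression, so spelling the builder out by hand only adds noise.
            final String message = "Expected a balanced task assignment, but was: "
                + taskSkew + '\n' + failureContext;
            System.out.println(message);

            // A builder still earns its keep when appending inside a loop,
            // where repeated += would re-copy the string every iteration.
            final StringBuilder report = new StringBuilder();
            for (int task = 0; task < 3; task++) {
                report.append("task ").append(task).append('\n');
            }
            System.out.print(report);
        }
    }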
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
index 8567cda824c..31bea36e8c6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
@@ -482,7 +482,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
         try (final KeyValueIterator values = bytesStore.fetch(
             null, Bytes.wrap(keyA.getBytes()), startEdgeTime, endEdgeTime - 1L)) {
 
-            final List, Long>> expected = asList(
+            final List, Long>> expected = Collections.singletonList(
                 KeyValue.pair(new Windowed<>(keyA, startEdgeWindow), 10L)
             );
 
@@ -492,7 +492,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
         try (final KeyValueIterator values = bytesStore.fetch(
             Bytes.wrap(keyB.getBytes()), null, startEdgeTime + 1, endEdgeTime)) {
 
-            final List, Long>> expected = asList(
+            final List, Long>> expected = Collections.singletonList(
                 KeyValue.pair(new Windowed<>(keyB, endEdgeWindow), 150L)
            );
 
@@ -662,7 +662,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
         try (final KeyValueIterator values = bytesStore.backwardFetch(
             null, Bytes.wrap(keyA.getBytes()), startEdgeTime, endEdgeTime - 1L)) {
 
-            final List, Long>> expected = asList(
+            final List, Long>> expected = Collections.singletonList(
                 KeyValue.pair(new Windowed<>(keyA, startEdgeWindow), 10L)
             );
 
@@ -672,7 +672,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
         try (final KeyValueIterator values = bytesStore.backwardFetch(
             Bytes.wrap(keyB.getBytes()), null, startEdgeTime + 1, endEdgeTime)) {
 
-            final List, Long>> expected = asList(
+            final List, Long>> expected = Collections.singletonList(
                 KeyValue.pair(new Windowed<>(keyB, endEdgeWindow), 150L)
             );
 
@@ -740,7 +740,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
         try (final KeyValueIterator values = bytesStore.fetch(
             Bytes.wrap(keyA.getBytes()), 0, Long.MAX_VALUE)) {
 
-            final List, Long>> expected = asList(
+            final List, Long>> expected = Collections.singletonList(
                 KeyValue.pair(new Windowed<>(keyA, maxWindow), 10L)
             );
 
@@ -823,7 +823,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
         try (final KeyValueIterator values = bytesStore.backwardFetch(
             Bytes.wrap(keyA.getBytes()), 0, Long.MAX_VALUE)) {
 
-            final List, Long>> expected = asList(
+            final List, Long>> expected = Collections.singletonList(
                 KeyValue.pair(new Windowed<>(keyA, maxWindow), 10L)
             );
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
index 8377c611ddd..161f790c142 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
@@ -582,7 +582,7 @@ public abstract class AbstractSessionBytesStoreTest {
         try (final KeyValueIterator, Long> iterator = sessionStore.findSessions("a", "aa", 10, 0)
         ) {
-            assertThat(valuesToSet(iterator), equalTo(new HashSet<>(asList(2L))));
+            assertThat(valuesToSet(iterator), equalTo(new HashSet<>(Collections.singletonList(2L))));
         }
 
         try (final KeyValueIterator, Long> iterator =
@@ -638,7 +638,7 @@ public abstract class AbstractSessionBytesStoreTest {
         try (final KeyValueIterator, Long> iterator = sessionStore.backwardFindSessions("a", "aa", 10, 0)
         ) {
-            assertThat(valuesToSet(iterator), equalTo(new HashSet<>(asList(2L))));
+            assertThat(valuesToSet(iterator), equalTo(new HashSet<>(Collections.singletonList(2L))));
         }
 
         try (final KeyValueIterator, Long> iterator =
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
index 17c515f3825..29d43bcca55 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
@@ -103,7 +103,7 @@ public class MeteredVersionedKeyValueStoreTest {
     private final Metrics metrics = new Metrics();
     private final Time mockTime = new MockTime();
     private final String threadId = Thread.currentThread().getName();
-    private InternalProcessorContext context = mock(InternalProcessorContext.class);
+    private final InternalProcessorContext context = mock(InternalProcessorContext.class);
     private Map tags;
 
     private MeteredVersionedKeyValueStore store;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBBlockCacheMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBBlockCacheMetricsTest.java
index 624edbb8eb4..8d189144504 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBBlockCacheMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBBlockCacheMetricsTest.java
@@ -52,13 +52,13 @@ public class RocksDBBlockCacheMetricsTest {
     private static final String STORE_NAME = "test";
     private static final String METRICS_SCOPE = "test-scope";
 
-    private static TaskId taskId = new TaskId(0, 0);
+    private static final TaskId TASK_ID = new TaskId(0, 0);
 
     public static Stream stores() {
         final File stateDir = TestUtils.tempDirectory("state");
         return Stream.of(
-            Arguments.of(new RocksDBStore(STORE_NAME, METRICS_SCOPE), new MockInternalProcessorContext(new Properties(), taskId, stateDir)),
-            Arguments.of(new RocksDBTimestampedStore(STORE_NAME, METRICS_SCOPE), new MockInternalProcessorContext(new Properties(), taskId, stateDir))
+            Arguments.of(new RocksDBStore(STORE_NAME, METRICS_SCOPE), new MockInternalProcessorContext(new Properties(), TASK_ID, stateDir)),
+            Arguments.of(new RocksDBTimestampedStore(STORE_NAME, METRICS_SCOPE), new MockInternalProcessorContext(new Properties(), TASK_ID, stateDir))
         );
     }
 
@@ -109,7 +109,7 @@ public class RocksDBBlockCacheMetricsTest {
             metricName,
             group,
             "Ignored",
-            storeLevelTagMap(taskId.toString(), METRICS_SCOPE, STORE_NAME)
+            storeLevelTagMap(TASK_ID.toString(), METRICS_SCOPE, STORE_NAME)
         );
         final KafkaMetric metric = (KafkaMetric) metrics.metrics().get(name);
         assertEquals(expected, metric.metricValue(), String.format("Value for metric '%s-%s' was incorrect", group, metricName));
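Several hunks here and below rename `static` fields that gain `final` (taskId to TASK_ID, throwError to THROW_ERROR, terminated to TERMINATED): once a static field is a true constant reference, the usual Java convention is an UPPER_SNAKE_CASE name. A minimal sketch of the convention (illustrative class):

    public class NamingDemo {
        // A static final field is a constant, so it gets an
        // UPPER_SNAKE_CASE name and can never be reassigned.
        private static final String APP_ID_PREFIX = "TaskMetadataTest_";

        // An instance field assigned exactly once can simply be final;
        // it keeps its camelCase name.
        private final long start = 50;

        public static void main(final String[] args) {
            System.out.println(APP_ID_PREFIX + "run-1");
            System.out.println(new NamingDemo().start);
        }
    }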
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
index a8058a1cd3a..f62d0c480fc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
@@ -60,7 +60,7 @@ public class EosTestDriver extends SmokeTestUtil {
     private static final long MAX_IDLE_TIME_MS = 600000L;
 
     private static volatile boolean isRunning = true;
-    private static CountDownLatch terminated = new CountDownLatch(1);
+    private static final CountDownLatch TERMINATED = new CountDownLatch(1);
 
     private static int numRecordsProduced = 0;
 
@@ -74,7 +74,7 @@ public class EosTestDriver extends SmokeTestUtil {
             isRunning = false;
 
             try {
-                if (terminated.await(5L, TimeUnit.MINUTES)) {
+                if (TERMINATED.await(5L, TimeUnit.MINUTES)) {
                     System.out.println("Terminated");
                 } else {
                     System.out.println("Terminated with timeout");
@@ -167,7 +167,7 @@ public class EosTestDriver extends SmokeTestUtil {
             }
             System.out.flush();
         } finally {
-            terminated.countDown();
+            TERMINATED.countDown();
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberTestClient.java
index e4b96fe1053..f64619199fa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberTestClient.java
@@ -30,12 +30,12 @@ import java.util.Properties;
 
 public class StaticMemberTestClient {
 
-    private static String testName = "StaticMemberTestClient";
+    private static final String TEST_NAME = "StaticMemberTestClient";
 
     @SuppressWarnings("unchecked")
     public static void main(final String[] args) throws Exception {
         if (args.length < 1) {
-            System.err.println(testName + " requires one argument (properties-file) but none provided: ");
+            System.err.println(TEST_NAME + " requires one argument (properties-file) but none provided: ");
         }
 
         System.out.println("StreamsTest instance started");
@@ -46,7 +46,7 @@ public class StaticMemberTestClient {
         final String groupInstanceId = Objects.requireNonNull(streamsProperties.getProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG));
 
-        System.out.println(testName + " instance started with group.instance.id " + groupInstanceId);
+        System.out.println(TEST_NAME + " instance started with group.instance.id " + groupInstanceId);
         System.out.println("props=" + streamsProperties);
         System.out.flush();
 
@@ -54,10 +54,10 @@ public class StaticMemberTestClient {
         final String inputTopic = (String) (Objects.requireNonNull(streamsProperties.remove("input.topic")));
 
         final KStream dataStream = builder.stream(inputTopic);
-        dataStream.peek((k, v) -> System.out.println(String.format("PROCESSED key=%s value=%s", k, v)));
+        dataStream.peek((k, v) -> System.out.printf("PROCESSED key=%s value=%s%n", k, v));
 
         final Properties config = new Properties();
-        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, testName);
+        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, TEST_NAME);
         config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
         config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
         config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
index 90c2bb94ece..c966b2de1fe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
@@ -80,11 +80,11 @@ public class StreamsBrokerDownResilienceTest {
         }
 
         if (!confirmCorrectConfigs(streamsProperties)) {
-            System.err.println(String.format("ERROR: Did not have all required configs expected to contain %s %s %s %s",
+            System.err.printf("ERROR: Did not have all required configs expected to contain %s %s %s %s%n",
                 StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
                 StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG),
                 StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG),
-                StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG)));
+                StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG));
 
             Exit.exit(1);
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
index af3614c7326..f7be9430d66 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
@@ -66,7 +66,7 @@ public class StreamsNamedRepartitionTest {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final KStream sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
-        sourceStream.peek((k, v) -> System.out.println(String.format("input data key=%s, value=%s", k, v)));
+        sourceStream.peek((k, v) -> System.out.printf("input data key=%s, value=%s%n", k, v));
 
         final KStream mappedStream = sourceStream.selectKey((k, v) -> keyFunction.apply(v));
 
@@ -81,7 +81,7 @@ public class StreamsNamedRepartitionTest {
         maybeUpdatedStream.groupByKey(Grouped.with("grouped-stream", Serdes.String(), Serdes.String()))
             .aggregate(initializer, aggregator, Materialized.>as("count-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.Integer()))
             .toStream()
-            .peek((k, v) -> System.out.println(String.format("AGGREGATED key=%s value=%s", k, v)))
+            .peek((k, v) -> System.out.printf("AGGREGATED key=%s value=%s%n", k, v))
             .to(aggregationTopic, Produced.with(Serdes.String(), Serdes.Integer()));
 
         final Properties config = new Properties();
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
index 95945b1b446..187a072071b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
@@ -90,20 +90,20 @@ public class StreamsOptimizedTest {
                 aggregator,
                 Materialized.with(Serdes.String(), Serdes.Integer()))
             .toStream()
-            .peek((k, v) -> System.out.println(String.format("AGGREGATED key=%s value=%s", k, v)))
+            .peek((k, v) -> System.out.printf("AGGREGATED key=%s value=%s%n", k, v))
             .to(aggregationTopic, Produced.with(Serdes.String(), Serdes.Integer()));
 
         mappedStream.groupByKey()
             .reduce(reducer, Materialized.with(Serdes.String(), Serdes.String()))
             .toStream()
-            .peek((k, v) -> System.out.println(String.format("REDUCED key=%s value=%s", k, v)))
+            .peek((k, v) -> System.out.printf("REDUCED key=%s value=%s%n", k, v))
             .to(reduceTopic, Produced.with(Serdes.String(), Serdes.String()));
 
         mappedStream.join(countStream, (v1, v2) -> v1 + ":" + v2.toString(),
             JoinWindows.of(ofMillis(500)),
             StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.Long()))
-            .peek((k, v) -> System.out.println(String.format("JOINED key=%s value=%s", k, v)))
+            .peek((k, v) -> System.out.printf("JOINED key=%s value=%s%n", k, v))
             .to(joinTopic, Produced.with(Serdes.String(), Serdes.String()));
 
         final Properties config = new Properties();
@@ -125,7 +125,7 @@ public class StreamsOptimizedTest {
         streams.setStateListener((newState, oldState) -> {
             if (oldState == State.REBALANCING && newState == State.RUNNING) {
                 final int repartitionTopicCount = getCountOfRepartitionTopicsFound(topology.describe().toString(), repartitionTopicPattern);
-                System.out.println(String.format("REBALANCING -> RUNNING with REPARTITION TOPIC COUNT=%d", repartitionTopicCount));
+                System.out.printf("REBALANCING -> RUNNING with REPARTITION TOPIC COUNT=%d%n", repartitionTopicCount);
                 System.out.flush();
             }
         });
@@ -149,7 +149,7 @@ public class StreamsOptimizedTest {
         final List repartitionTopicsFound = new ArrayList<>();
         while (matcher.find()) {
             final String repartitionTopic = matcher.group();
-            System.out.println(String.format("REPARTITION TOPIC found -> %s", repartitionTopic));
+            System.out.printf("REPARTITION TOPIC found -> %s%n", repartitionTopic);
             repartitionTopicsFound.add(repartitionTopic);
         }
         return repartitionTopicsFound.size();
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
index 2568b498c97..d23915790b2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
@@ -86,11 +86,11 @@ public class StreamsStandByReplicaTest {
         final String sinkTopic2 = updated.remove("sinkTopic2");
 
         if (sourceTopic == null || sinkTopic1 == null || sinkTopic2 == null) {
-            System.err.println(String.format(
-                "one or more required topics null sourceTopic[%s], sinkTopic1[%s], sinkTopic2[%s]",
+            System.err.printf(
+                "one or more required topics null sourceTopic[%s], sinkTopic1[%s], sinkTopic2[%s]%n",
                 sourceTopic,
                 sinkTopic1,
-                sinkTopic2));
+                sinkTopic2);
             System.err.flush();
             Exit.exit(1);
         }
@@ -98,11 +98,13 @@ public class StreamsStandByReplicaTest {
         streamsProperties.putAll(updated);
 
         if (!confirmCorrectConfigs(streamsProperties)) {
-            System.err.println(String.format("ERROR: Did not have all required configs expected to contain %s, %s, %s, %s",
-                StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
-                StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG),
-                StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG),
-                StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG)));
+            System.err.printf(
+                "ERROR: Did not have all required configs expected to contain %s, %s, %s, %s%n",
+                StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
+                StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG),
+                StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG),
+                StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG)
+            );
 
             Exit.exit(1);
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
index 6d7da29e3ac..0a7bbe14f5c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
@@ -70,7 +70,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
             @Override
             public void apply(final String key, final String value) {
                 if (recordCounter++ % reportInterval == 0) {
-                    System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
+                    System.out.printf("%sProcessed %d records so far%n", upgradePhase, recordCounter);
                     System.out.flush();
                 }
             }
@@ -81,7 +81,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
 
         streams.setStateListener((newState, oldState) -> {
             if (newState == State.RUNNING && oldState == State.REBALANCING) {
-                System.out.println(String.format("%sSTREAMS in a RUNNING State", upgradePhase));
+                System.out.printf("%sSTREAMS in a RUNNING State%n", upgradePhase);
                 final Set allThreadMetadata = streams.metadataForLocalThreads();
                 final StringBuilder taskReportBuilder = new StringBuilder();
                 final List activeTasks = new ArrayList<>();
@@ -101,7 +101,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
             }
 
             if (newState == State.REBALANCING) {
-                System.out.println(String.format("%sStarting a REBALANCE", upgradePhase));
+                System.out.printf("%sStarting a REBALANCE%n", upgradePhase);
             }
         });
diff --git a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
index 88f24e8d9bd..84ad3e3f800 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
@@ -43,7 +43,7 @@ public class MockClientSupplier implements KafkaClientSupplier {
     private String applicationId;
 
     public MockAdminClient adminClient = new MockAdminClient();
-    private List> preparedProducers = new LinkedList<>();
+    private final List> preparedProducers = new LinkedList<>();
     public final List> producers = new LinkedList<>();
 
     public final MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
     public final MockConsumer restoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalNewProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockInternalNewProcessorContext.java
index a1c9ac16445..a3c7194680d 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockInternalNewProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalNewProcessorContext.java
@@ -48,7 +48,7 @@ public class MockInternalNewProcessorContext extends MockProcessorCo
     private ProcessorNode currentNode;
     private long currentSystemTimeMs;
 
-    private TaskType taskType = TaskType.ACTIVE;
+    private final TaskType taskType = TaskType.ACTIVE;
     private long timestamp = 0;
     private Headers headers = new RecordHeaders();
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
index 12508ee7c55..4d4ad0e4dc0 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
@@ -51,7 +51,7 @@ public class MockInternalProcessorContext extends org.apache.kafka.streams.proce
     private ProcessorNode currentNode;
     private RecordCollector recordCollector;
     private long currentSystemTimeMs;
-    private TaskType taskType = TaskType.ACTIVE;
+    private final TaskType taskType = TaskType.ACTIVE;
     private ProcessorMetadata processorMetadata;
 
     public MockInternalProcessorContext() {
diff --git a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
index 77678bc383e..0bc1457a029 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
@@ -39,7 +39,7 @@ public class MockRestoreConsumer extends MockConsumer {
     private long endOffset = 0L;
     private long currentOffset = 0L;
 
-    private ArrayList> recordBuffer = new ArrayList<>();
+    private final ArrayList> recordBuffer = new ArrayList<>();
 
     @SuppressWarnings("this-escape")
     public MockRestoreConsumer(final Serializer keySerializer, final Serializer valueSerializer) {
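The test-driver diffs above and the upgrade-system-tests diffs below all make the same substitution: `System.out.println(String.format(...))` becomes a single `System.out.printf(...)`, with the trailing newline folded into the format string as `%n`. A sketch of the equivalence, with illustrative values:

    public class PrintfDemo {
        public static void main(final String[] args) {
            final String upgradePhase = "";
            final int recordCounter = 100;

            // Before: format first, then print with a trailing newline.
            System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));

            // After: one call; %n appends the platform line separator,
            // which is the same separator println would have added.
            System.out.printf("%sProcessed %d records so far%n", upgradePhase, recordCounter);
        }
    }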
diff --git a/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
index ee15b1dfbc4..1528b2c472b 100644
--- a/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
+++ b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
@@ -67,7 +67,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
             @Override
             public void apply(final String key, final String value) {
                 if (recordCounter++ % reportInterval == 0) {
-                    System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
+                    System.out.printf("%sProcessed %d records so far%n", upgradePhase, recordCounter);
                     System.out.flush();
                 }
             }
@@ -82,7 +82,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
 
         Runtime.getRuntime().addShutdownHook(new Thread(() -> {
             streams.close();
-            System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
+            System.out.printf("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED%n", upgradePhase);
             System.out.flush();
         }));
     }
diff --git a/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
index 6b339b64f40..4efe70911ab 100644
--- a/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
+++ b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
@@ -66,7 +66,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
             @Override
             public void apply(final String key, final String value) {
                 if (recordCounter++ % reportInterval == 0) {
-                    System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
+                    System.out.printf("%sProcessed %d records so far%n", upgradePhase, recordCounter);
                     System.out.flush();
                 }
             }
@@ -81,7 +81,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
 
         Runtime.getRuntime().addShutdownHook(new Thread(() -> {
             streams.close();
-            System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
+            System.out.printf("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED%n", upgradePhase);
             System.out.flush();
         }));
     }
diff --git a/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
index 32ef2ebe50b..1cc115f3c06 100644
--- a/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
+++ b/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
@@ -62,7 +62,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
             @Override
             public void apply(final String key, final String value) {
                 if (recordCounter++ % reportInterval == 0) {
-                    System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
+                    System.out.printf("%sProcessed %d records so far%n", upgradePhase, recordCounter);
                     System.out.flush();
                 }
             }
@@ -77,7 +77,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
 
         Runtime.getRuntime().addShutdownHook(new Thread(() -> {
             streams.close();
-            System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
+            System.out.printf("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED%n", upgradePhase);
             System.out.flush();
         }));
     }
diff --git a/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
index a2ffa9d14a5..25685bb42ea 100644
--- a/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
+++ b/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
@@ -61,7 +61,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
             @Override
             public void apply(final String key, final String value) {
                 if (recordCounter++ % reportInterval == 0) {
-                    System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
+                    System.out.printf("%sProcessed %d records so far%n", upgradePhase, recordCounter);
                     System.out.flush();
                 }
             }
@@ -76,7 +76,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
 
         Runtime.getRuntime().addShutdownHook(new Thread(() -> {
             streams.close();
-            System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
+            System.out.printf("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED%n", upgradePhase);
             System.out.flush();
         }));
     }
diff --git a/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
index bda6ac45833..c66aae3b614 100644
--- a/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
+++ b/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
@@ -70,7 +70,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
             @Override
             public void apply(final String key, final String value) {
                 if (recordCounter++ % reportInterval == 0) {
-                    System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
+                    System.out.printf("%sProcessed %d records so far%n", upgradePhase, recordCounter);
                     System.out.flush();
                 }
             }
@@ -81,7 +81,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
 
         streams.setStateListener((newState, oldState) -> {
             if (newState == State.RUNNING && oldState == State.REBALANCING) {
-                System.out.println(String.format("%sSTREAMS in a RUNNING State", upgradePhase));
+                System.out.printf("%sSTREAMS in a RUNNING State%n", upgradePhase);
                 final Set allThreadMetadata = streams.localThreadsMetadata();
                 final StringBuilder taskReportBuilder = new StringBuilder();
                 final List activeTasks = new ArrayList<>();
@@ -101,7 +101,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
             }
 
             if (newState == State.REBALANCING) {
-                System.out.println(String.format("%sStarting a REBALANCE", upgradePhase));
+                System.out.printf("%sStarting a REBALANCE%n", upgradePhase);
             }
         });
 
@@ -110,7 +110,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
         Runtime.getRuntime().addShutdownHook(new Thread(() -> {
             streams.close();
-            System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
+            System.out.printf("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED%n", upgradePhase);
             System.out.flush();
         }));
     }
diff --git a/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
index 6643d29fad8..55e07f1e394 100644
--- a/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
+++ b/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
@@ -70,7 +70,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
             @Override
             public void apply(final String key, final String value) {
                 if (recordCounter++ % reportInterval == 0) {
-                    System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
+                    System.out.printf("%sProcessed %d records so far%n", upgradePhase, recordCounter);
                     System.out.flush();
                 }
             }
@@ -81,7 +81,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
 
         streams.setStateListener((newState, oldState) -> {
            if (newState == State.RUNNING && oldState == State.REBALANCING) {
-                System.out.println(String.format("%sSTREAMS in a RUNNING State", upgradePhase));
+                System.out.printf("%sSTREAMS in a RUNNING State%n", upgradePhase);
                 final Set allThreadMetadata = streams.localThreadsMetadata();
                 final StringBuilder taskReportBuilder = new StringBuilder();
                 final List activeTasks = new ArrayList<>();
@@ -101,7 +101,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
             }
 
             if (newState == State.REBALANCING) {
-                System.out.println(String.format("%sStarting a REBALANCE", upgradePhase));
+                System.out.printf("%sStarting a REBALANCE%n", upgradePhase);
             }
         });
 
@@ -110,7 +110,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
         Runtime.getRuntime().addShutdownHook(new Thread(() -> {
             streams.close();
-            System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
+            System.out.printf("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED%n", upgradePhase);
             System.out.flush();
         }));
     }
diff --git a/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
index 0c697f6b4cf..0b7fcc48207 100644
--- a/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
+++ b/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
@@ -70,7 +70,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
             @Override
             public void apply(final String key, final String value) {
                 if (recordCounter++ % reportInterval == 0) {
-                    System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
+                    System.out.printf("%sProcessed %d records so far%n", upgradePhase, recordCounter);
                     System.out.flush();
                 }
             }
@@ -81,7 +81,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
 
         streams.setStateListener((newState, oldState) -> {
             if (newState == State.RUNNING && oldState == State.REBALANCING) {
-                System.out.println(String.format("%sSTREAMS in a RUNNING State", upgradePhase));
+                System.out.printf("%sSTREAMS in a RUNNING State%n", upgradePhase);
                 final Set allThreadMetadata = streams.localThreadsMetadata();
                 final StringBuilder taskReportBuilder = new StringBuilder();
                 final List activeTasks = new ArrayList<>();
@@ -101,7 +101,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
             }
 
             if (newState == State.REBALANCING) {
-                System.out.println(String.format("%sStarting a REBALANCE", upgradePhase));
+                System.out.printf("%sStarting a REBALANCE%n", upgradePhase);
             }
         });
 
@@ -110,7 +110,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
         Runtime.getRuntime().addShutdownHook(new Thread(() -> {
             streams.close();
-            System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
+            System.out.printf("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED%n", upgradePhase);
             System.out.flush();
         }));
     }
diff --git a/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
index 299fffacaaf..b430607a096 100644
--- a/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
+++ b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
@@ -70,7 +70,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
             @Override
             public void apply(final String key, final String value) {
                 if (recordCounter++ % reportInterval == 0) {
-                    System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
+                    System.out.printf("%sProcessed %d records so far%n", upgradePhase, recordCounter);
                     System.out.flush();
                 }
             }
@@ -81,7 +81,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
 
         streams.setStateListener((newState, oldState) -> {
             if (newState == State.RUNNING && oldState == State.REBALANCING) {
-                System.out.println(String.format("%sSTREAMS in a RUNNING State", upgradePhase));
+                System.out.printf("%sSTREAMS in a RUNNING State%n", upgradePhase);
                 final Set allThreadMetadata = streams.localThreadsMetadata();
                 final StringBuilder taskReportBuilder = new StringBuilder();
                 final List activeTasks = new ArrayList<>();
@@ -101,7 +101,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
             }
 
             if (newState == State.REBALANCING) {
-                System.out.println(String.format("%sStarting a REBALANCE", upgradePhase));
+                System.out.printf("%sStarting a REBALANCE%n", upgradePhase);
             }
         });
 
@@ -110,7 +110,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
         Runtime.getRuntime().addShutdownHook(new Thread(() -> {
             streams.close();
-            System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
+            System.out.printf("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED%n", upgradePhase);
             System.out.flush();
         }));
     }
diff --git a/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
index 299fffacaaf..b430607a096 100644
--- a/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
+++ b/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
@@ -70,7 +70,7 @@ public class
StreamsUpgradeToCooperativeRebalanceTest { @Override public void apply(final String key, final String value) { if (recordCounter++ % reportInterval == 0) { - System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter)); + System.out.printf("%sProcessed %d records so far%n", upgradePhase, recordCounter); System.out.flush(); } } @@ -81,7 +81,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest { streams.setStateListener((newState, oldState) -> { if (newState == State.RUNNING && oldState == State.REBALANCING) { - System.out.println(String.format("%sSTREAMS in a RUNNING State", upgradePhase)); + System.out.printf("%sSTREAMS in a RUNNING State%n", upgradePhase); final Set allThreadMetadata = streams.localThreadsMetadata(); final StringBuilder taskReportBuilder = new StringBuilder(); final List activeTasks = new ArrayList<>(); @@ -101,7 +101,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest { } if (newState == State.REBALANCING) { - System.out.println(String.format("%sStarting a REBALANCE", upgradePhase)); + System.out.printf("%sStarting a REBALANCE%n", upgradePhase); } }); @@ -110,7 +110,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest { Runtime.getRuntime().addShutdownHook(new Thread(() -> { streams.close(); - System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase)); + System.out.printf("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED%n", upgradePhase); System.out.flush(); })); } diff --git a/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java index 0a7a48fac1c..8dfdd954d48 100644 --- a/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java +++ b/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java @@ -68,7 +68,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest { @Override public void apply(final String key, final String value) { if (recordCounter++ % reportInterval == 0) { - System.out.println(String.format("Processed %d records so far", recordCounter)); + System.out.printf("Processed %d records so far%n", recordCounter); System.out.flush(); } }
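
Note: every hunk above applies the same mechanical change, replacing System.out.println(String.format(...)) with System.out.printf(...). A minimal sketch of the equivalence (the literal arguments below are made up for illustration and do not appear in the patch):

    // Before: String.format() renders the message, then println() appends the platform line separator.
    System.out.println(String.format("%sProcessed %d records so far", "UPGRADE-", 1000));

    // After: printf() formats and writes in one call; the trailing "%n" expands to the same
    // platform line separator, so the two forms print identical output.
    System.out.printf("%sProcessed %d records so far%n", "UPGRADE-", 1000);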