mirror of https://github.com/apache/kafka.git
MINOR: Code cleanup Kafka Streams (#16050)
Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
parent
0d44415bac
commit
8faeb9390d
|
@ -82,8 +82,7 @@ public abstract class InternalTopicConfig {
|
|||
|
||||
public void setNumberOfPartitions(final int numberOfPartitions) {
|
||||
if (hasEnforcedNumberOfPartitions()) {
|
||||
throw new UnsupportedOperationException("number of partitions are enforced on topic " +
|
||||
"" + name() + " and can't be altered.");
|
||||
throw new UnsupportedOperationException("number of partitions are enforced on topic " + name() + " and can't be altered.");
|
||||
}
|
||||
|
||||
validateNumberOfPartitions(numberOfPartitions);
|
||||
|
|
|
@ -119,11 +119,9 @@ public class SinkNode<KIn, VIn> extends ProcessorNode<KIn, VIn, Void, Void> {
|
|||
*/
|
||||
@Override
|
||||
public String toString(final String indent) {
|
||||
final StringBuilder sb = new StringBuilder(super.toString(indent));
|
||||
sb.append(indent).append("\ttopic:\t\t");
|
||||
sb.append(topicExtractor);
|
||||
sb.append("\n");
|
||||
return sb.toString();
|
||||
return super.toString(indent) + indent + "\ttopic:\t\t" +
|
||||
topicExtractor +
|
||||
"\n";
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -78,13 +78,10 @@ public class StreamsMetadataState {
|
|||
}
|
||||
|
||||
public String toString(final String indent) {
|
||||
final StringBuilder builder = new StringBuilder();
|
||||
builder.append(indent).append("GlobalMetadata: ").append(allMetadata).append("\n");
|
||||
builder.append(indent).append("GlobalStores: ").append(globalStores).append("\n");
|
||||
builder.append(indent).append("My HostInfo: ").append(thisHost).append("\n");
|
||||
builder.append(indent).append("PartitionsByTopic: ").append(partitionsByTopic).append("\n");
|
||||
|
||||
return builder.toString();
|
||||
return indent + "GlobalMetadata: " + allMetadata + "\n" +
|
||||
indent + "GlobalStores: " + globalStores + "\n" +
|
||||
indent + "My HostInfo: " + thisHost + "\n" +
|
||||
indent + "PartitionsByTopic: " + partitionsByTopic + "\n";
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -73,7 +73,7 @@ public class MeteredSessionStore<K, V>
|
|||
private InternalProcessorContext<?, ?> context;
|
||||
private TaskId taskId;
|
||||
|
||||
private LongAdder numOpenIterators = new LongAdder();
|
||||
private final LongAdder numOpenIterators = new LongAdder();
|
||||
private final NavigableSet<MeteredIterator> openIterators = new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
|
|
|
@ -76,8 +76,8 @@ public class MeteredWindowStore<K, V>
|
|||
private InternalProcessorContext<?, ?> context;
|
||||
private TaskId taskId;
|
||||
|
||||
private LongAdder numOpenIterators = new LongAdder();
|
||||
private NavigableSet<MeteredIterator> openIterators = new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
|
||||
private final LongAdder numOpenIterators = new LongAdder();
|
||||
private final NavigableSet<MeteredIterator> openIterators = new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
private final Map<Class, QueryHandler> queryHandlers =
|
||||
|
|
|
@ -312,7 +312,7 @@ public class IQv2IntegrationTest {
|
|||
public KeyValueStore<Bytes, byte[]> get() {
|
||||
return new KeyValueStore<Bytes, byte[]>() {
|
||||
private boolean open = false;
|
||||
private Map<Bytes, byte[]> map = new HashMap<>();
|
||||
private final Map<Bytes, byte[]> map = new HashMap<>();
|
||||
private Position position;
|
||||
private StateStoreContext context;
|
||||
|
||||
|
|
|
@ -49,6 +49,7 @@ import org.junit.jupiter.api.Timeout;
|
|||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Properties;
|
||||
|
@ -175,7 +176,7 @@ public class JoinGracePeriodDurabilityIntegrationTest {
|
|||
// flush those recovered buffered events out.
|
||||
produceSynchronouslyToPartitionZero(
|
||||
streamInput,
|
||||
asList(
|
||||
Collections.singletonList(
|
||||
new KeyValueTimestamp<>("k6", "v6", scaledTime(20L))
|
||||
)
|
||||
);
|
||||
|
|
|
@ -126,7 +126,7 @@ public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
|
|||
new KeyValue<>("ID123-4", "ID123-A4")
|
||||
);
|
||||
|
||||
final List<KeyValue<String, String>> table2 = asList(
|
||||
final List<KeyValue<String, String>> table2 = Collections.singletonList(
|
||||
new KeyValue<>("ID123", "BBB")
|
||||
);
|
||||
|
||||
|
|
|
@ -135,7 +135,7 @@ public class SelfJoinUpgradeIntegrationTest {
|
|||
"1",
|
||||
"A",
|
||||
currentTime + 42L,
|
||||
asList(new KeyValueTimestamp<>("1", "AA", currentTime + 42L))
|
||||
singletonList(new KeyValueTimestamp<>("1", "AA", currentTime + 42L))
|
||||
);
|
||||
|
||||
processKeyValueAndVerifyCount(
|
||||
|
@ -200,7 +200,7 @@ public class SelfJoinUpgradeIntegrationTest {
|
|||
"1",
|
||||
"A",
|
||||
currentTime + 42L,
|
||||
asList(new KeyValueTimestamp<>("1", "AA", currentTime + 42L))
|
||||
singletonList(new KeyValueTimestamp<>("1", "AA", currentTime + 42L))
|
||||
);
|
||||
|
||||
processKeyValueAndVerifyCount(
|
||||
|
|
|
@ -232,7 +232,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
|
|||
new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
|
||||
new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
|
||||
new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
|
||||
Arrays.asList(
|
||||
Collections.singletonList(
|
||||
new TestRecord<>(null, "E-null", null, 16L)),
|
||||
null
|
||||
);
|
||||
|
@ -285,7 +285,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
|
|||
new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
|
||||
new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
|
||||
new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
|
||||
Arrays.asList(
|
||||
Collections.singletonList(
|
||||
new TestRecord<>(null, "E-null", null, 16L)),
|
||||
null
|
||||
);
|
||||
|
@ -340,9 +340,9 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
|
|||
new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
|
||||
new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
|
||||
new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
|
||||
Arrays.asList(
|
||||
Collections.singletonList(
|
||||
new TestRecord<>(null, "E-null", null, 16L)),
|
||||
Arrays.asList(
|
||||
Collections.singletonList(
|
||||
new TestRecord<>(null, "null-e", null, 17L))
|
||||
);
|
||||
|
||||
|
@ -394,9 +394,9 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
|
|||
new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
|
||||
new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
|
||||
new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
|
||||
Arrays.asList(
|
||||
Collections.singletonList(
|
||||
new TestRecord<>(null, "E-null", null, 16L)),
|
||||
Arrays.asList(
|
||||
Collections.singletonList(
|
||||
new TestRecord<>(null, "null-e", null, 17L))
|
||||
);
|
||||
|
||||
|
|
|
@ -104,7 +104,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
|
|||
private String outputTopic2;
|
||||
private final StreamsBuilder builder = new StreamsBuilder();
|
||||
private final List<String> processorValueCollector = new ArrayList<>();
|
||||
private static AtomicBoolean throwError = new AtomicBoolean(true);
|
||||
private static final AtomicBoolean THROW_ERROR = new AtomicBoolean(true);
|
||||
|
||||
private Properties properties;
|
||||
|
||||
|
@ -226,10 +226,10 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
|
|||
@Override
|
||||
public void process(final Record<KIn, VIn> record) {
|
||||
valueList.add(record.value().toString());
|
||||
if (throwError.get()) {
|
||||
if (THROW_ERROR.get()) {
|
||||
throw new StreamsException(Thread.currentThread().getName());
|
||||
}
|
||||
throwError.set(true);
|
||||
THROW_ERROR.set(true);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -382,7 +382,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
|
|||
final AtomicInteger count = new AtomicInteger();
|
||||
kafkaStreams.setUncaughtExceptionHandler(exception -> {
|
||||
if (count.incrementAndGet() == numThreads) {
|
||||
throwError.set(false);
|
||||
THROW_ERROR.set(false);
|
||||
}
|
||||
return REPLACE_THREAD;
|
||||
});
|
||||
|
@ -390,7 +390,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
|
|||
|
||||
produceMessages(NOW, inputTopic, "A");
|
||||
TestUtils.waitForCondition(() -> count.get() == numThreads, "finished replacing threads");
|
||||
TestUtils.waitForCondition(() -> throwError.get(), "finished replacing threads");
|
||||
TestUtils.waitForCondition(THROW_ERROR::get, "finished replacing threads");
|
||||
kafkaStreams.close();
|
||||
waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
|
||||
|
||||
|
|
|
@ -78,7 +78,7 @@ public class TaskMetadataIntegrationTest {
|
|||
private String inputTopic;
|
||||
private static StreamsBuilder builder;
|
||||
private static Properties properties;
|
||||
private static String appIdPrefix = "TaskMetadataTest_";
|
||||
private static final String APP_ID_PREFIX = "TaskMetadataTest_";
|
||||
private static String appId;
|
||||
private AtomicBoolean process;
|
||||
private AtomicBoolean commit;
|
||||
|
@ -86,7 +86,7 @@ public class TaskMetadataIntegrationTest {
|
|||
@BeforeEach
|
||||
public void setup(final TestInfo testInfo) {
|
||||
final String testId = safeUniqueTestName(testInfo);
|
||||
appId = appIdPrefix + testId;
|
||||
appId = APP_ID_PREFIX + testId;
|
||||
inputTopic = "input" + testId;
|
||||
IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
|
||||
|
||||
|
|
|
@ -70,7 +70,7 @@ public class PrintedTest {
|
|||
try (final InputStream stream = Files.newInputStream(file.toPath())) {
|
||||
final byte[] data = new byte[stream.available()];
|
||||
stream.read(data);
|
||||
assertThat(new String(data, StandardCharsets.UTF_8.name()), equalTo("[processor]: hi, 1\n"));
|
||||
assertThat(new String(data, StandardCharsets.UTF_8), equalTo("[processor]: hi, 1\n"));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1234,9 +1234,7 @@ public class InternalStreamsBuilderTest {
|
|||
return currentNode;
|
||||
}
|
||||
for (final GraphNode child: currentNode.children()) {
|
||||
if (!visited.contains(child)) {
|
||||
visited.add(child);
|
||||
}
|
||||
visited.add(child);
|
||||
final GraphNode result = getNodeByType(child, clazz, visited);
|
||||
if (result != null) {
|
||||
return result;
|
||||
|
|
|
@ -1351,7 +1351,7 @@ public class KStreamImplTest {
|
|||
final StreamsBuilder builder = new StreamsBuilder();
|
||||
final String input = "topic";
|
||||
final KStream<String, String> stream = builder.stream(input, stringConsumed);
|
||||
stream.to((key, value, context) -> context.topic() + "-" + key + "-" + value.substring(0, 1),
|
||||
stream.to((key, value, context) -> context.topic() + "-" + key + "-" + value.charAt(0),
|
||||
Produced.with(Serdes.String(), Serdes.String()));
|
||||
builder.stream(input + "-a-v", stringConsumed).process(processorSupplier);
|
||||
builder.stream(input + "-b-v", stringConsumed).process(processorSupplier);
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.kafka.test.StreamsTestUtils;
|
|||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.function.Consumer;
|
||||
|
@ -71,8 +72,8 @@ public class KStreamSplitTest {
|
|||
final TestOutputTopic<Integer, String> x3 = driver.createOutputTopic("x3", new IntegerDeserializer(), new StringDeserializer());
|
||||
final TestOutputTopic<Integer, String> x5 = driver.createOutputTopic("x5", new IntegerDeserializer(), new StringDeserializer());
|
||||
assertEquals(Arrays.asList("V0", "V2", "V4", "V6"), x2.readValuesToList());
|
||||
assertEquals(Arrays.asList("V3"), x3.readValuesToList());
|
||||
assertEquals(Arrays.asList("V5"), x5.readValuesToList());
|
||||
assertEquals(Collections.singletonList("V3"), x3.readValuesToList());
|
||||
assertEquals(Collections.singletonList("V5"), x5.readValuesToList());
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -128,9 +129,9 @@ public class KStreamSplitTest {
|
|||
final TestOutputTopic<Integer, String> x7 = driver.createOutputTopic("foo-5", new IntegerDeserializer(), new StringDeserializer());
|
||||
final TestOutputTopic<Integer, String> defaultBranch = driver.createOutputTopic("foo-0", new IntegerDeserializer(), new StringDeserializer());
|
||||
assertEquals(Arrays.asList("V0", "V2", "V4", "V6"), even.readValuesToList());
|
||||
assertEquals(Arrays.asList("V-1"), negative.readValuesToList());
|
||||
assertEquals(Arrays.asList("V7"), x7.readValuesToList());
|
||||
assertEquals(Arrays.asList("V1"), defaultBranch.readValuesToList());
|
||||
assertEquals(Collections.singletonList("V-1"), negative.readValuesToList());
|
||||
assertEquals(Collections.singletonList("V7"), x7.readValuesToList());
|
||||
assertEquals(Collections.singletonList("V1"), defaultBranch.readValuesToList());
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
@ -52,7 +52,7 @@ public class TimestampedTupleForwarderTest {
|
|||
|
||||
new TimestampedTupleForwarder<>(
|
||||
store,
|
||||
(org.apache.kafka.streams.processor.api.ProcessorContext<Object, Change<Object>>) null,
|
||||
null,
|
||||
flushListener,
|
||||
sendOldValues
|
||||
);
|
||||
|
@ -82,7 +82,7 @@ public class TimestampedTupleForwarderTest {
|
|||
final TimestampedTupleForwarder<String, String> forwarder =
|
||||
new TimestampedTupleForwarder<>(
|
||||
store,
|
||||
(org.apache.kafka.streams.processor.api.ProcessorContext<String, Change<String>>) context,
|
||||
context,
|
||||
null,
|
||||
sendOldValues
|
||||
);
|
||||
|
@ -102,7 +102,7 @@ public class TimestampedTupleForwarderTest {
|
|||
final TimestampedTupleForwarder<String, String> forwarder =
|
||||
new TimestampedTupleForwarder<>(
|
||||
store,
|
||||
(org.apache.kafka.streams.processor.api.ProcessorContext<String, Change<String>>) context,
|
||||
context,
|
||||
null,
|
||||
false
|
||||
);
|
||||
|
|
|
@ -23,7 +23,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
|
|||
|
||||
public class UnlimitedWindowTest {
|
||||
|
||||
private long start = 50;
|
||||
private final long start = 50;
|
||||
private final UnlimitedWindow window = new UnlimitedWindow(start);
|
||||
private final SessionWindow sessionWindow = new SessionWindow(start, start);
|
||||
|
||||
|
|
|
@ -38,12 +38,12 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
|
|||
|
||||
public class WindowedStreamPartitionerTest {
|
||||
|
||||
private String topicName = "topic";
|
||||
private final String topicName = "topic";
|
||||
|
||||
private IntegerSerializer intSerializer = new IntegerSerializer();
|
||||
private StringSerializer stringSerializer = new StringSerializer();
|
||||
private final IntegerSerializer intSerializer = new IntegerSerializer();
|
||||
private final StringSerializer stringSerializer = new StringSerializer();
|
||||
|
||||
private List<PartitionInfo> infos = Arrays.asList(
|
||||
private final List<PartitionInfo> infos = Arrays.asList(
|
||||
new PartitionInfo(topicName, 0, Node.noNode(), new Node[0], new Node[0]),
|
||||
new PartitionInfo(topicName, 1, Node.noNode(), new Node[0], new Node[0]),
|
||||
new PartitionInfo(topicName, 2, Node.noNode(), new Node[0], new Node[0]),
|
||||
|
@ -52,7 +52,7 @@ public class WindowedStreamPartitionerTest {
|
|||
new PartitionInfo(topicName, 5, Node.noNode(), new Node[0], new Node[0])
|
||||
);
|
||||
|
||||
private Cluster cluster = new Cluster("cluster", Collections.singletonList(Node.noNode()), infos,
|
||||
private final Cluster cluster = new Cluster("cluster", Collections.singletonList(Node.noNode()), infos,
|
||||
Collections.emptySet(), Collections.emptySet());
|
||||
|
||||
@Test
|
||||
|
|
|
@ -63,7 +63,6 @@ import org.mockito.junit.jupiter.MockitoSettings;
|
|||
import org.mockito.quality.Strictness;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
|
@ -362,8 +361,8 @@ public class InternalTopicManagerTest {
|
|||
final InternalTopicManager internalTopicManager =
|
||||
new InternalTopicManager(time, mockAdminClient, new StreamsConfig(config));
|
||||
try {
|
||||
final Set<String> topic1set = new HashSet<String>(Arrays.asList(topic1));
|
||||
final Set<String> topic2set = new HashSet<String>(Arrays.asList(topic2));
|
||||
final Set<String> topic1set = new HashSet<String>(Collections.singletonList(topic1));
|
||||
final Set<String> topic2set = new HashSet<String>(Collections.singletonList(topic2));
|
||||
|
||||
internalTopicManager.getNumPartitions(topic1set, topic2set);
|
||||
|
||||
|
@ -374,8 +373,8 @@ public class InternalTopicManagerTest {
|
|||
mockAdminClient.timeoutNextRequest(1);
|
||||
|
||||
try {
|
||||
final Set<String> topic1set = new HashSet<String>(Arrays.asList(topic1));
|
||||
final Set<String> topic2set = new HashSet<String>(Arrays.asList(topic2));
|
||||
final Set<String> topic1set = new HashSet<String>(Collections.singletonList(topic1));
|
||||
final Set<String> topic2set = new HashSet<String>(Collections.singletonList(topic2));
|
||||
|
||||
internalTopicManager.getNumPartitions(topic1set, topic2set);
|
||||
|
||||
|
|
|
@ -27,7 +27,7 @@ import java.util.Set;
|
|||
|
||||
public class MockChangelogReader implements ChangelogReader {
|
||||
private final Set<TopicPartition> restoringPartitions = new HashSet<>();
|
||||
private Map<TopicPartition, Long> restoredOffsets = Collections.emptyMap();
|
||||
private final Map<TopicPartition, Long> restoredOffsets = Collections.emptyMap();
|
||||
|
||||
public boolean isPartitionRegistered(final TopicPartition partition) {
|
||||
return restoringPartitions.contains(partition);
|
||||
|
|
|
@ -1126,7 +1126,7 @@ public class ProcessorStateManagerTest {
|
|||
}
|
||||
|
||||
public static class StateStorePositionCommit implements CommitCallback {
|
||||
private File file;
|
||||
private final File file;
|
||||
private final OffsetCheckpoint checkpointFile;
|
||||
private final Position position;
|
||||
|
||||
|
|
|
@ -69,7 +69,7 @@ public class StateManagerUtilTest {
|
|||
@Mock
|
||||
private InternalProcessorContext processorContext;
|
||||
|
||||
private Logger logger = new LogContext("test").logger(AbstractTask.class);
|
||||
private final Logger logger = new LogContext("test").logger(AbstractTask.class);
|
||||
|
||||
private final TaskId taskId = new TaskId(0, 0);
|
||||
|
||||
|
|
|
@ -373,19 +373,9 @@ public final class AssignmentTestUtils {
|
|||
.collect(entriesToMap(TreeMap::new));
|
||||
|
||||
if (!misassigned.isEmpty()) {
|
||||
assertThat(
|
||||
new StringBuilder().append("Found some over- or under-assigned tasks in the final assignment with ")
|
||||
.append(numStandbyReplicas)
|
||||
.append(" and max warmups ")
|
||||
.append(maxWarmupReplicas)
|
||||
.append(" standby replicas, stateful tasks:")
|
||||
.append(statefulTasks)
|
||||
.append(", and stateless tasks:")
|
||||
.append(statelessTasks)
|
||||
.append(failureContext)
|
||||
.toString(),
|
||||
misassigned,
|
||||
is(emptyMap()));
|
||||
assertThat("Found some over- or under-assigned tasks in the final assignment with " + numStandbyReplicas +
|
||||
" and max warmups " + maxWarmupReplicas + " standby replicas, stateful tasks:" + statefulTasks +
|
||||
", and stateless tasks:" + statelessTasks + failureContext, misassigned, is(emptyMap()));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -396,29 +386,13 @@ public final class AssignmentTestUtils {
|
|||
final Map.Entry<ProcessId, ClientState> entry) {
|
||||
for (final TaskId standbyTask : entry.getValue().standbyTasks()) {
|
||||
if (statelessTasks.contains(standbyTask)) {
|
||||
throw new AssertionError(
|
||||
new StringBuilder().append("Found a standby task for stateless task ")
|
||||
.append(standbyTask)
|
||||
.append(" on client ")
|
||||
.append(entry)
|
||||
.append(" stateless tasks:")
|
||||
.append(statelessTasks)
|
||||
.append(failureContext)
|
||||
.toString()
|
||||
);
|
||||
throw new AssertionError("Found a standby task for stateless task " + standbyTask + " on client " +
|
||||
entry + " stateless tasks:" + statelessTasks + failureContext);
|
||||
} else if (assignments.containsKey(standbyTask)) {
|
||||
assignments.get(standbyTask).add(entry.getKey());
|
||||
} else {
|
||||
throw new AssertionError(
|
||||
new StringBuilder().append("Found an extra standby task ")
|
||||
.append(standbyTask)
|
||||
.append(" on client ")
|
||||
.append(entry)
|
||||
.append(" but expected stateful tasks:")
|
||||
.append(statefulTasks)
|
||||
.append(failureContext)
|
||||
.toString()
|
||||
);
|
||||
throw new AssertionError("Found an extra standby task " + standbyTask + " on client " +
|
||||
entry + " but expected stateful tasks:" + statefulTasks + failureContext);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -432,18 +406,7 @@ public final class AssignmentTestUtils {
|
|||
if (assignments.containsKey(activeTask)) {
|
||||
assignments.get(activeTask).add(entry.getKey());
|
||||
} else {
|
||||
throw new AssertionError(
|
||||
new StringBuilder().append("Found an extra active task ")
|
||||
.append(activeTask)
|
||||
.append(" on client ")
|
||||
.append(entry)
|
||||
.append(" but expected stateful tasks:")
|
||||
.append(statefulTasks)
|
||||
.append(" and stateless tasks:")
|
||||
.append(statelessTasks)
|
||||
.append(failureContext)
|
||||
.toString()
|
||||
);
|
||||
throw new AssertionError("Found an extra active task " + activeTask + " on client " + entry + " but expected stateful tasks:" + statefulTasks + " and stateless tasks:" + statelessTasks + failureContext);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -573,13 +573,7 @@ public class TaskAssignorConvergenceTest {
|
|||
assertBalancedStatefulAssignment(allStatefulTasks, clientStates, failureContext);
|
||||
final AssignmentTestUtils.TaskSkewReport taskSkewReport = AssignmentTestUtils.analyzeTaskAssignmentBalance(harness.clientStates, skewThreshold);
|
||||
if (taskSkewReport.totalSkewedTasks() > 0) {
|
||||
fail(
|
||||
new StringBuilder().append("Expected a balanced task assignment, but was: ")
|
||||
.append(taskSkewReport)
|
||||
.append('\n')
|
||||
.append(failureContext)
|
||||
.toString()
|
||||
);
|
||||
fail("Expected a balanced task assignment, but was: " + taskSkewReport + '\n' + failureContext);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -628,13 +622,7 @@ public class TaskAssignorConvergenceTest {
|
|||
}
|
||||
|
||||
if (rebalancePending) {
|
||||
final StringBuilder message =
|
||||
new StringBuilder().append("Rebalances have not converged after iteration cutoff: ")
|
||||
.append(iterationLimit)
|
||||
.append(harness.history);
|
||||
fail(message.toString());
|
||||
fail("Rebalances have not converged after iteration cutoff: " + iterationLimit + harness.history);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -482,7 +482,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
|
|||
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
|
||||
null, Bytes.wrap(keyA.getBytes()), startEdgeTime, endEdgeTime - 1L)) {
|
||||
|
||||
final List<KeyValue<Windowed<String>, Long>> expected = asList(
|
||||
final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
|
||||
KeyValue.pair(new Windowed<>(keyA, startEdgeWindow), 10L)
|
||||
);
|
||||
|
||||
|
@ -492,7 +492,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
|
|||
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
|
||||
Bytes.wrap(keyB.getBytes()), null, startEdgeTime + 1, endEdgeTime)) {
|
||||
|
||||
final List<KeyValue<Windowed<String>, Long>> expected = asList(
|
||||
final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
|
||||
KeyValue.pair(new Windowed<>(keyB, endEdgeWindow), 150L)
|
||||
);
|
||||
|
||||
|
@ -662,7 +662,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
|
|||
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
|
||||
null, Bytes.wrap(keyA.getBytes()), startEdgeTime, endEdgeTime - 1L)) {
|
||||
|
||||
final List<KeyValue<Windowed<String>, Long>> expected = asList(
|
||||
final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
|
||||
KeyValue.pair(new Windowed<>(keyA, startEdgeWindow), 10L)
|
||||
);
|
||||
|
||||
|
@ -672,7 +672,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
|
|||
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
|
||||
Bytes.wrap(keyB.getBytes()), null, startEdgeTime + 1, endEdgeTime)) {
|
||||
|
||||
final List<KeyValue<Windowed<String>, Long>> expected = asList(
|
||||
final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
|
||||
KeyValue.pair(new Windowed<>(keyB, endEdgeWindow), 150L)
|
||||
);
|
||||
|
||||
|
@ -740,7 +740,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
|
|||
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
|
||||
Bytes.wrap(keyA.getBytes()), 0, Long.MAX_VALUE)) {
|
||||
|
||||
final List<KeyValue<Windowed<String>, Long>> expected = asList(
|
||||
final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
|
||||
KeyValue.pair(new Windowed<>(keyA, maxWindow), 10L)
|
||||
);
|
||||
|
||||
|
@ -823,7 +823,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
|
|||
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
|
||||
Bytes.wrap(keyA.getBytes()), 0, Long.MAX_VALUE)) {
|
||||
|
||||
final List<KeyValue<Windowed<String>, Long>> expected = asList(
|
||||
final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
|
||||
KeyValue.pair(new Windowed<>(keyA, maxWindow), 10L)
|
||||
);
|
||||
|
||||
|
|
|
@ -582,7 +582,7 @@ public abstract class AbstractSessionBytesStoreTest {
|
|||
try (final KeyValueIterator<Windowed<String>, Long> iterator =
|
||||
sessionStore.findSessions("a", "aa", 10, 0)
|
||||
) {
|
||||
assertThat(valuesToSet(iterator), equalTo(new HashSet<>(asList(2L))));
|
||||
assertThat(valuesToSet(iterator), equalTo(new HashSet<>(Collections.singletonList(2L))));
|
||||
}
|
||||
|
||||
try (final KeyValueIterator<Windowed<String>, Long> iterator =
|
||||
|
@ -638,7 +638,7 @@ public abstract class AbstractSessionBytesStoreTest {
|
|||
try (final KeyValueIterator<Windowed<String>, Long> iterator =
|
||||
sessionStore.backwardFindSessions("a", "aa", 10, 0)
|
||||
) {
|
||||
assertThat(valuesToSet(iterator), equalTo(new HashSet<>(asList(2L))));
|
||||
assertThat(valuesToSet(iterator), equalTo(new HashSet<>(Collections.singletonList(2L))));
|
||||
}
|
||||
|
||||
try (final KeyValueIterator<Windowed<String>, Long> iterator =
|
||||
|
|
|
@ -103,7 +103,7 @@ public class MeteredVersionedKeyValueStoreTest {
|
|||
private final Metrics metrics = new Metrics();
|
||||
private final Time mockTime = new MockTime();
|
||||
private final String threadId = Thread.currentThread().getName();
|
||||
private InternalProcessorContext context = mock(InternalProcessorContext.class);
|
||||
private final InternalProcessorContext context = mock(InternalProcessorContext.class);
|
||||
private Map<String, String> tags;
|
||||
|
||||
private MeteredVersionedKeyValueStore<String, String> store;
|
||||
|
|
|
@ -52,13 +52,13 @@ public class RocksDBBlockCacheMetricsTest {
|
|||
private static final String STORE_NAME = "test";
|
||||
private static final String METRICS_SCOPE = "test-scope";
|
||||
|
||||
private static TaskId taskId = new TaskId(0, 0);
|
||||
private static final TaskId TASK_ID = new TaskId(0, 0);
|
||||
|
||||
public static Stream<Arguments> stores() {
|
||||
final File stateDir = TestUtils.tempDirectory("state");
|
||||
return Stream.of(
|
||||
Arguments.of(new RocksDBStore(STORE_NAME, METRICS_SCOPE), new MockInternalProcessorContext(new Properties(), taskId, stateDir)),
|
||||
Arguments.of(new RocksDBTimestampedStore(STORE_NAME, METRICS_SCOPE), new MockInternalProcessorContext(new Properties(), taskId, stateDir))
|
||||
Arguments.of(new RocksDBStore(STORE_NAME, METRICS_SCOPE), new MockInternalProcessorContext(new Properties(), TASK_ID, stateDir)),
|
||||
Arguments.of(new RocksDBTimestampedStore(STORE_NAME, METRICS_SCOPE), new MockInternalProcessorContext(new Properties(), TASK_ID, stateDir))
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -109,7 +109,7 @@ public class RocksDBBlockCacheMetricsTest {
|
|||
metricName,
|
||||
group,
|
||||
"Ignored",
|
||||
storeLevelTagMap(taskId.toString(), METRICS_SCOPE, STORE_NAME)
|
||||
storeLevelTagMap(TASK_ID.toString(), METRICS_SCOPE, STORE_NAME)
|
||||
);
|
||||
final KafkaMetric metric = (KafkaMetric) metrics.metrics().get(name);
|
||||
assertEquals(expected, metric.metricValue(), String.format("Value for metric '%s-%s' was incorrect", group, metricName));
|
||||
|
|
|
@ -60,7 +60,7 @@ public class EosTestDriver extends SmokeTestUtil {
|
|||
private static final long MAX_IDLE_TIME_MS = 600000L;
|
||||
|
||||
private static volatile boolean isRunning = true;
|
||||
private static CountDownLatch terminated = new CountDownLatch(1);
|
||||
private static final CountDownLatch TERMINATED = new CountDownLatch(1);
|
||||
|
||||
private static int numRecordsProduced = 0;
|
||||
|
||||
|
@ -74,7 +74,7 @@ public class EosTestDriver extends SmokeTestUtil {
|
|||
isRunning = false;
|
||||
|
||||
try {
|
||||
if (terminated.await(5L, TimeUnit.MINUTES)) {
|
||||
if (TERMINATED.await(5L, TimeUnit.MINUTES)) {
|
||||
System.out.println("Terminated");
|
||||
} else {
|
||||
System.out.println("Terminated with timeout");
|
||||
|
@ -167,7 +167,7 @@ public class EosTestDriver extends SmokeTestUtil {
|
|||
}
|
||||
System.out.flush();
|
||||
} finally {
|
||||
terminated.countDown();
|
||||
TERMINATED.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -30,12 +30,12 @@ import java.util.Properties;
|
|||
|
||||
public class StaticMemberTestClient {
|
||||
|
||||
private static String testName = "StaticMemberTestClient";
|
||||
private static final String TEST_NAME = "StaticMemberTestClient";
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static void main(final String[] args) throws Exception {
|
||||
if (args.length < 1) {
|
||||
System.err.println(testName + " requires one argument (properties-file) but none provided: ");
|
||||
System.err.println(TEST_NAME + " requires one argument (properties-file) but none provided: ");
|
||||
}
|
||||
|
||||
System.out.println("StreamsTest instance started");
|
||||
|
@ -46,7 +46,7 @@ public class StaticMemberTestClient {
|
|||
|
||||
final String groupInstanceId = Objects.requireNonNull(streamsProperties.getProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG));
|
||||
|
||||
System.out.println(testName + " instance started with group.instance.id " + groupInstanceId);
|
||||
System.out.println(TEST_NAME + " instance started with group.instance.id " + groupInstanceId);
|
||||
System.out.println("props=" + streamsProperties);
|
||||
System.out.flush();
|
||||
|
||||
|
@ -54,10 +54,10 @@ public class StaticMemberTestClient {
|
|||
final String inputTopic = (String) (Objects.requireNonNull(streamsProperties.remove("input.topic")));
|
||||
|
||||
final KStream<String, String> dataStream = builder.stream(inputTopic);
|
||||
dataStream.peek((k, v) -> System.out.println(String.format("PROCESSED key=%s value=%s", k, v)));
|
||||
dataStream.peek((k, v) -> System.out.printf("PROCESSED key=%s value=%s%n", k, v));
|
||||
|
||||
final Properties config = new Properties();
|
||||
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, testName);
|
||||
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, TEST_NAME);
|
||||
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
|
||||
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
|
||||
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
|
||||
|
|
|
@ -80,11 +80,11 @@ public class StreamsBrokerDownResilienceTest {
|
|||
}
|
||||
|
||||
if (!confirmCorrectConfigs(streamsProperties)) {
|
||||
System.err.println(String.format("ERROR: Did not have all required configs expected to contain %s %s %s %s",
|
||||
System.err.printf("ERROR: Did not have all required configs expected to contain %s %s %s %s%n",
|
||||
StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
|
||||
StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG),
|
||||
StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG),
|
||||
StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG)));
|
||||
StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG));
|
||||
|
||||
Exit.exit(1);
|
||||
}
|
||||
|
|
|
@ -66,7 +66,7 @@ public class StreamsNamedRepartitionTest {
|
|||
final StreamsBuilder builder = new StreamsBuilder();
|
||||
|
||||
final KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
|
||||
sourceStream.peek((k, v) -> System.out.println(String.format("input data key=%s, value=%s", k, v)));
|
||||
sourceStream.peek((k, v) -> System.out.printf("input data key=%s, value=%s%n", k, v));
|
||||
|
||||
final KStream<String, String> mappedStream = sourceStream.selectKey((k, v) -> keyFunction.apply(v));
|
||||
|
||||
|
@ -81,7 +81,7 @@ public class StreamsNamedRepartitionTest {
|
|||
maybeUpdatedStream.groupByKey(Grouped.with("grouped-stream", Serdes.String(), Serdes.String()))
|
||||
.aggregate(initializer, aggregator, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("count-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.Integer()))
|
||||
.toStream()
|
||||
.peek((k, v) -> System.out.println(String.format("AGGREGATED key=%s value=%s", k, v)))
|
||||
.peek((k, v) -> System.out.printf("AGGREGATED key=%s value=%s%n", k, v))
|
||||
.to(aggregationTopic, Produced.with(Serdes.String(), Serdes.Integer()));
|
||||
|
||||
final Properties config = new Properties();
|
||||
|
|
|
@ -90,20 +90,20 @@ public class StreamsOptimizedTest {
|
|||
aggregator,
|
||||
Materialized.with(Serdes.String(), Serdes.Integer()))
|
||||
.toStream()
|
||||
.peek((k, v) -> System.out.println(String.format("AGGREGATED key=%s value=%s", k, v)))
|
||||
.peek((k, v) -> System.out.printf("AGGREGATED key=%s value=%s%n", k, v))
|
||||
.to(aggregationTopic, Produced.with(Serdes.String(), Serdes.Integer()));
|
||||
|
||||
|
||||
mappedStream.groupByKey()
|
||||
.reduce(reducer, Materialized.with(Serdes.String(), Serdes.String()))
|
||||
.toStream()
|
||||
.peek((k, v) -> System.out.println(String.format("REDUCED key=%s value=%s", k, v)))
|
||||
.peek((k, v) -> System.out.printf("REDUCED key=%s value=%s%n", k, v))
|
||||
.to(reduceTopic, Produced.with(Serdes.String(), Serdes.String()));
|
||||
|
||||
mappedStream.join(countStream, (v1, v2) -> v1 + ":" + v2.toString(),
|
||||
JoinWindows.of(ofMillis(500)),
|
||||
StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.Long()))
|
||||
.peek((k, v) -> System.out.println(String.format("JOINED key=%s value=%s", k, v)))
|
||||
.peek((k, v) -> System.out.printf("JOINED key=%s value=%s%n", k, v))
|
||||
.to(joinTopic, Produced.with(Serdes.String(), Serdes.String()));
|
||||
|
||||
final Properties config = new Properties();
|
||||
|
@ -125,7 +125,7 @@ public class StreamsOptimizedTest {
|
|||
streams.setStateListener((newState, oldState) -> {
|
||||
if (oldState == State.REBALANCING && newState == State.RUNNING) {
|
||||
final int repartitionTopicCount = getCountOfRepartitionTopicsFound(topology.describe().toString(), repartitionTopicPattern);
|
||||
System.out.println(String.format("REBALANCING -> RUNNING with REPARTITION TOPIC COUNT=%d", repartitionTopicCount));
|
||||
System.out.printf("REBALANCING -> RUNNING with REPARTITION TOPIC COUNT=%d%n", repartitionTopicCount);
|
||||
System.out.flush();
|
||||
}
|
||||
});
|
||||
|
@ -149,7 +149,7 @@ public class StreamsOptimizedTest {
|
|||
final List<String> repartitionTopicsFound = new ArrayList<>();
|
||||
while (matcher.find()) {
|
||||
final String repartitionTopic = matcher.group();
|
||||
System.out.println(String.format("REPARTITION TOPIC found -> %s", repartitionTopic));
|
||||
System.out.printf("REPARTITION TOPIC found -> %s%n", repartitionTopic);
|
||||
repartitionTopicsFound.add(repartitionTopic);
|
||||
}
|
||||
return repartitionTopicsFound.size();
|
||||
|
|
|
@ -86,11 +86,11 @@ public class StreamsStandByReplicaTest {
|
|||
final String sinkTopic2 = updated.remove("sinkTopic2");
|
||||
|
||||
if (sourceTopic == null || sinkTopic1 == null || sinkTopic2 == null) {
|
||||
System.err.println(String.format(
|
||||
"one or more required topics null sourceTopic[%s], sinkTopic1[%s], sinkTopic2[%s]",
|
||||
System.err.printf(
|
||||
"one or more required topics null sourceTopic[%s], sinkTopic1[%s], sinkTopic2[%s]%n",
|
||||
sourceTopic,
|
||||
sinkTopic1,
|
||||
sinkTopic2));
|
||||
sinkTopic2);
|
||||
System.err.flush();
|
||||
Exit.exit(1);
|
||||
}
|
||||
|
@ -98,11 +98,13 @@ public class StreamsStandByReplicaTest {
|
|||
streamsProperties.putAll(updated);
|
||||
|
||||
if (!confirmCorrectConfigs(streamsProperties)) {
|
||||
System.err.println(String.format("ERROR: Did not have all required configs expected to contain %s, %s, %s, %s",
|
||||
StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
|
||||
StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG),
|
||||
StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG),
|
||||
StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG)));
|
||||
System.err.printf(
|
||||
"ERROR: Did not have all required configs expected to contain %s, %s, %s, %s%n",
|
||||
StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
|
||||
StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG),
|
||||
StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG),
|
||||
StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG)
|
||||
);
|
||||
|
||||
Exit.exit(1);
|
||||
}
|
||||
|
|
|
@ -70,7 +70,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
|
|||
@Override
|
||||
public void apply(final String key, final String value) {
|
||||
if (recordCounter++ % reportInterval == 0) {
|
||||
System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
|
||||
System.out.printf("%sProcessed %d records so far%n", upgradePhase, recordCounter);
|
||||
System.out.flush();
|
||||
}
|
||||
}
|
||||
|
@ -81,7 +81,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
|
|||
|
||||
streams.setStateListener((newState, oldState) -> {
|
||||
if (newState == State.RUNNING && oldState == State.REBALANCING) {
|
||||
System.out.println(String.format("%sSTREAMS in a RUNNING State", upgradePhase));
|
||||
System.out.printf("%sSTREAMS in a RUNNING State%n", upgradePhase);
|
||||
final Set<ThreadMetadata> allThreadMetadata = streams.metadataForLocalThreads();
|
||||
final StringBuilder taskReportBuilder = new StringBuilder();
|
||||
final List<String> activeTasks = new ArrayList<>();
|
||||
|
@ -101,7 +101,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
|
|||
}
|
||||
|
||||
if (newState == State.REBALANCING) {
|
||||
System.out.println(String.format("%sStarting a REBALANCE", upgradePhase));
|
||||
System.out.printf("%sStarting a REBALANCE%n", upgradePhase);
|
||||
}
|
||||
});
|
||||
|
||||
|
|
|
@ -43,7 +43,7 @@ public class MockClientSupplier implements KafkaClientSupplier {
|
|||
private String applicationId;
|
||||
|
||||
public MockAdminClient adminClient = new MockAdminClient();
|
||||
private List<MockProducer<byte[], byte[]>> preparedProducers = new LinkedList<>();
|
||||
private final List<MockProducer<byte[], byte[]>> preparedProducers = new LinkedList<>();
|
||||
public final List<MockProducer<byte[], byte[]>> producers = new LinkedList<>();
|
||||
public final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
|
||||
public final MockConsumer<byte[], byte[]> restoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
|
||||
|
|
|
@ -48,7 +48,7 @@ public class MockInternalNewProcessorContext<KOut, VOut> extends MockProcessorCo
|
|||
|
||||
private ProcessorNode currentNode;
|
||||
private long currentSystemTimeMs;
|
||||
private TaskType taskType = TaskType.ACTIVE;
|
||||
private final TaskType taskType = TaskType.ACTIVE;
|
||||
|
||||
private long timestamp = 0;
|
||||
private Headers headers = new RecordHeaders();
|
||||
|
|
|
@ -51,7 +51,7 @@ public class MockInternalProcessorContext extends org.apache.kafka.streams.proce
|
|||
private ProcessorNode currentNode;
|
||||
private RecordCollector recordCollector;
|
||||
private long currentSystemTimeMs;
|
||||
private TaskType taskType = TaskType.ACTIVE;
|
||||
private final TaskType taskType = TaskType.ACTIVE;
|
||||
private ProcessorMetadata processorMetadata;
|
||||
|
||||
public MockInternalProcessorContext() {
|
||||
|
|
|
@ -39,7 +39,7 @@ public class MockRestoreConsumer<K, V> extends MockConsumer<byte[], byte[]> {
|
|||
private long endOffset = 0L;
|
||||
private long currentOffset = 0L;
|
||||
|
||||
private ArrayList<ConsumerRecord<byte[], byte[]>> recordBuffer = new ArrayList<>();
|
||||
private final ArrayList<ConsumerRecord<byte[], byte[]>> recordBuffer = new ArrayList<>();
|
||||
|
||||
@SuppressWarnings("this-escape")
|
||||
public MockRestoreConsumer(final Serializer<K> keySerializer, final Serializer<V> valueSerializer) {
|
||||
|
|
|
@ -67,7 +67,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
|
|||
@Override
|
||||
public void apply(final String key, final String value) {
|
||||
if (recordCounter++ % reportInterval == 0) {
|
||||
System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
|
||||
System.out.printf("%sProcessed %d records so far%n", upgradePhase, recordCounter);
|
||||
System.out.flush();
|
||||
}
|
||||
}
|
||||
|
@ -82,7 +82,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
|
|||
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||
streams.close();
|
||||
System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
|
||||
System.out.printf("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED%n", upgradePhase);
|
||||
System.out.flush();
|
||||
}));
|
||||
}
|
||||
|
|
|
@ -66,7 +66,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
|
|||
@Override
|
||||
public void apply(final String key, final String value) {
|
||||
if (recordCounter++ % reportInterval == 0) {
|
||||
System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
|
||||
System.out.printf("%sProcessed %d records so far%n", upgradePhase, recordCounter);
|
||||
System.out.flush();
|
||||
}
|
||||
}
|
||||
|
@ -81,7 +81,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
|
|||
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||
streams.close();
|
||||
System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
|
||||
System.out.printf("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED%n", upgradePhase);
|
||||
System.out.flush();
|
||||
}));
|
||||
}
|
||||
|
|
|
@ -62,7 +62,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
|
|||
@Override
|
||||
public void apply(final String key, final String value) {
|
||||
if (recordCounter++ % reportInterval == 0) {
|
||||
System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
|
||||
System.out.printf("%sProcessed %d records so far%n", upgradePhase, recordCounter);
|
||||
System.out.flush();
|
||||
}
|
||||
}
|
||||
|
@ -77,7 +77,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
|
|||
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||
streams.close();
|
||||
System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
|
||||
System.out.printf("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED%n", upgradePhase);
|
||||
System.out.flush();
|
||||
}));
|
||||
}
|
||||
|
|
|
@ -61,7 +61,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
|
|||
@Override
|
||||
public void apply(final String key, final String value) {
|
||||
if (recordCounter++ % reportInterval == 0) {
|
||||
System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
|
||||
System.out.printf("%sProcessed %d records so far%n", upgradePhase, recordCounter);
|
||||
System.out.flush();
|
||||
}
|
||||
}
|
||||
|
@ -76,7 +76,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
|
|||
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||
streams.close();
|
||||
System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
|
||||
System.out.printf("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED%n", upgradePhase);
|
||||
System.out.flush();
|
||||
}));
|
||||
}
|
||||
|
|
|
@ -70,7 +70,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
|
|||
@Override
|
||||
public void apply(final String key, final String value) {
|
||||
if (recordCounter++ % reportInterval == 0) {
|
||||
System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
|
||||
System.out.printf("%sProcessed %d records so far%n", upgradePhase, recordCounter);
|
||||
System.out.flush();
|
||||
}
|
||||
}
|
||||
|
@ -81,7 +81,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
|
|||
|
||||
streams.setStateListener((newState, oldState) -> {
|
||||
if (newState == State.RUNNING && oldState == State.REBALANCING) {
|
||||
System.out.println(String.format("%sSTREAMS in a RUNNING State", upgradePhase));
|
||||
System.out.printf("%sSTREAMS in a RUNNING State%n", upgradePhase);
|
||||
final Set<ThreadMetadata> allThreadMetadata = streams.localThreadsMetadata();
|
||||
final StringBuilder taskReportBuilder = new StringBuilder();
|
||||
final List<String> activeTasks = new ArrayList<>();
|
||||
|
@ -101,7 +101,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
|
|||
}
|
||||
|
||||
if (newState == State.REBALANCING) {
|
||||
System.out.println(String.format("%sStarting a REBALANCE", upgradePhase));
|
||||
System.out.printf("%sStarting a REBALANCE%n", upgradePhase);
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -110,7 +110,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
|
|||
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||
streams.close();
|
||||
System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
|
||||
System.out.printf("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED%n", upgradePhase);
|
||||
System.out.flush();
|
||||
}));
|
||||
}
|
||||
|
|
|
@ -70,7 +70,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
|
|||
@Override
|
||||
public void apply(final String key, final String value) {
|
||||
if (recordCounter++ % reportInterval == 0) {
|
||||
System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
|
||||
System.out.printf("%sProcessed %d records so far%n", upgradePhase, recordCounter);
|
||||
System.out.flush();
|
||||
}
|
||||
}
|
||||
|
@ -81,7 +81,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
|
|||
|
||||
streams.setStateListener((newState, oldState) -> {
|
||||
if (newState == State.RUNNING && oldState == State.REBALANCING) {
|
||||
System.out.println(String.format("%sSTREAMS in a RUNNING State", upgradePhase));
|
||||
System.out.printf("%sSTREAMS in a RUNNING State%n", upgradePhase);
|
||||
final Set<ThreadMetadata> allThreadMetadata = streams.localThreadsMetadata();
|
||||
final StringBuilder taskReportBuilder = new StringBuilder();
|
||||
final List<String> activeTasks = new ArrayList<>();
|
||||
|
@ -101,7 +101,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
|
|||
}
|
||||
|
||||
if (newState == State.REBALANCING) {
|
||||
System.out.println(String.format("%sStarting a REBALANCE", upgradePhase));
|
||||
System.out.printf("%sStarting a REBALANCE%n", upgradePhase);
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -110,7 +110,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
|
|||
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||
streams.close();
|
||||
System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
|
||||
System.out.printf("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED%n", upgradePhase);
|
||||
System.out.flush();
|
||||
}));
|
||||
}
|
||||
|
|
|
@ -70,7 +70,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
|
|||
@Override
|
||||
public void apply(final String key, final String value) {
|
||||
if (recordCounter++ % reportInterval == 0) {
|
||||
System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
|
||||
System.out.printf("%sProcessed %d records so far%n", upgradePhase, recordCounter);
|
||||
System.out.flush();
|
||||
}
|
||||
}
|
||||
|
@ -81,7 +81,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
|
|||
|
||||
streams.setStateListener((newState, oldState) -> {
|
||||
if (newState == State.RUNNING && oldState == State.REBALANCING) {
|
||||
System.out.println(String.format("%sSTREAMS in a RUNNING State", upgradePhase));
|
||||
System.out.printf("%sSTREAMS in a RUNNING State%n", upgradePhase);
|
||||
final Set<ThreadMetadata> allThreadMetadata = streams.localThreadsMetadata();
|
||||
final StringBuilder taskReportBuilder = new StringBuilder();
|
||||
final List<String> activeTasks = new ArrayList<>();
|
||||
|
@ -101,7 +101,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
|
|||
}
|
||||
|
||||
if (newState == State.REBALANCING) {
|
||||
System.out.println(String.format("%sStarting a REBALANCE", upgradePhase));
|
||||
System.out.printf("%sStarting a REBALANCE%n", upgradePhase);
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -110,7 +110,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
|
|||
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||
streams.close();
|
||||
System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
|
||||
System.out.printf("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED%n", upgradePhase);
|
||||
System.out.flush();
|
||||
}));
|
||||
}
|
||||
|
|
|
@ -70,7 +70,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
|
|||
@Override
|
||||
public void apply(final String key, final String value) {
|
||||
if (recordCounter++ % reportInterval == 0) {
|
||||
System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
|
||||
System.out.printf("%sProcessed %d records so far%n", upgradePhase, recordCounter);
|
||||
System.out.flush();
|
||||
}
|
||||
}
|
||||
|
@ -81,7 +81,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
|
|||
|
||||
streams.setStateListener((newState, oldState) -> {
|
||||
if (newState == State.RUNNING && oldState == State.REBALANCING) {
|
||||
System.out.println(String.format("%sSTREAMS in a RUNNING State", upgradePhase));
|
||||
System.out.printf("%sSTREAMS in a RUNNING State%n", upgradePhase);
|
||||
final Set<ThreadMetadata> allThreadMetadata = streams.localThreadsMetadata();
|
||||
final StringBuilder taskReportBuilder = new StringBuilder();
|
||||
final List<String> activeTasks = new ArrayList<>();
|
||||
|
@ -101,7 +101,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
|
|||
}
|
||||
|
||||
if (newState == State.REBALANCING) {
|
||||
System.out.println(String.format("%sStarting a REBALANCE", upgradePhase));
|
||||
System.out.printf("%sStarting a REBALANCE%n", upgradePhase);
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -110,7 +110,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
|
|||
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||
streams.close();
|
||||
System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
|
||||
System.out.printf("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED%n", upgradePhase);
|
||||
System.out.flush();
|
||||
}));
|
||||
}
|
||||
|
|
|
@ -70,7 +70,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
|
|||
@Override
|
||||
public void apply(final String key, final String value) {
|
||||
if (recordCounter++ % reportInterval == 0) {
|
||||
System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
|
||||
System.out.printf("%sProcessed %d records so far%n", upgradePhase, recordCounter);
|
||||
System.out.flush();
|
||||
}
|
||||
}
|
||||
|
@ -81,7 +81,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
|
|||
|
||||
streams.setStateListener((newState, oldState) -> {
|
||||
if (newState == State.RUNNING && oldState == State.REBALANCING) {
|
||||
System.out.println(String.format("%sSTREAMS in a RUNNING State", upgradePhase));
|
||||
System.out.printf("%sSTREAMS in a RUNNING State%n", upgradePhase);
|
||||
final Set<ThreadMetadata> allThreadMetadata = streams.localThreadsMetadata();
|
||||
final StringBuilder taskReportBuilder = new StringBuilder();
|
||||
final List<String> activeTasks = new ArrayList<>();
|
||||
|
@ -101,7 +101,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
|
|||
}
|
||||
|
||||
if (newState == State.REBALANCING) {
|
||||
System.out.println(String.format("%sStarting a REBALANCE", upgradePhase));
|
||||
System.out.printf("%sStarting a REBALANCE%n", upgradePhase);
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -110,7 +110,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
|
|||
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||
streams.close();
|
||||
System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
|
||||
System.out.printf("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED%n", upgradePhase);
|
||||
System.out.flush();
|
||||
}));
|
||||
}
|
||||
|
|
|
@ -68,7 +68,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
|
|||
@Override
|
||||
public void apply(final String key, final String value) {
|
||||
if (recordCounter++ % reportInterval == 0) {
|
||||
System.out.println(String.format("Processed %d records so far", recordCounter));
|
||||
System.out.printf("Processed %d records so far%n", recordCounter);
|
||||
System.out.flush();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue