MINOR: Disallow unused local variables (#18963)

Recently, we found a regression that could have been detected by static
analysis: during a refactoring, a local variable stopped being passed to
a method and was left unused. It was fixed in
[7a749b5](7a749b589f),
but it almost slipped into 4.0. Unused local variables are typically
flagged by IDEs, but that alone is not enough to prevent these kinds of
bugs. This change enables unused local variable detection in checkstyle
for Kafka.

A few notes on the usage:
- There are two situations in which people actually want to declare a local
variable but not use it (see the sketch after this list). First, there are
`for (Type ignored : collection)` loops that need to iterate once per element
of `collection` but do not use `ignored` in the loop body. These are
typically still easier to read than a classical `for` loop. Second, some
IDEs warn when the return value of a method such as `File.delete` is
not used. In that case, people sometimes store the result in an
unused local variable to make ignoring the return value explicit and to
avoid the squiggly lines.
- In Java 22, unused local variables can be declared as a single
underscore `_` (an unnamed variable). This is supported by checkstyle. In
pre-22 versions, IntelliJ allows such variables to be named `ignored` to
suppress the unused-local-variable warning. This pattern is used often
(but not consistently) in the Kafka codebase. It is, however, not
supported by checkstyle.
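
For illustration, here is a minimal sketch of the two patterns described above, written the way this change handles them (the class name and the `countEntries`/`deleteAll` helpers are made up for the example; they are not from the Kafka codebase):

```java
import java.io.File;
import java.util.List;

public class UnusedLocalVariableExamples {

    // Pattern 1: iterate once per element without using the loop variable.
    // Checkstyle's UnusedLocalVariable flags the loop variable, so it is suppressed explicitly.
    static int countEntries(final List<String> collection) {
        int count = 0;
        for (@SuppressWarnings("UnusedLocalVariable") String ignored : collection) {
            count++;
        }
        return count;
    }

    // Pattern 2: store a return value only to make ignoring it explicit
    // (and to silence IDE warnings about the unused result of File.delete).
    static void deleteAll(final List<File> files) {
        for (final File file : files) {
            @SuppressWarnings("UnusedLocalVariable")
            boolean ignored = file.delete();
        }
    }
}
```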

Since we cannot switch to Java 22 yet, and we want automated detection
via checkstyle, we have to resort to annotating the affected local
variables with `@SuppressWarnings("UnusedLocalVariable")`. We have to
apply this in 11 places across the Kafka codebase. While not pretty, I'd
argue it's worth it to prevent bugs like the one fixed in
[7a749b5](7a749b589f).
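
As a sketch of what the workaround looks like next to the future Java 22 alternative (the `AwaitTimeoutExample` class and `awaitIgnoringTimeout` method are hypothetical, loosely mirroring the `Condition.await` call sites touched below):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;

class AwaitTimeoutExample {

    // Pre-Java 22, as applied in this change: keep the named variable and
    // suppress the checkstyle warning explicitly.
    // (Callers must hold the lock associated with the condition.)
    static void awaitIgnoringTimeout(final Condition condition, final long timeoutMs) throws InterruptedException {
        @SuppressWarnings("UnusedLocalVariable")
        boolean ignored = condition.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    // With Java 22+ unnamed variables, the suppression would become unnecessary:
    //     boolean _ = condition.await(timeoutMs, TimeUnit.MILLISECONDS);
}
```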

Reviewers: Andrew Schofield <aschofield@confluent.io>, David Arthur
<mumrah@gmail.com>, Matthias J. Sax <matthias@confluent.io>, Bruno
Cadonna <cadonna@apache.org>, Kirk True <ktrue@confluent.io>
Lucas Brutschy 2025-03-10 09:37:35 +01:00 committed by GitHub
parent 832dfa36da
commit fc2e3dfce9
37 changed files with 37 additions and 95 deletions

View File

@@ -43,6 +43,7 @@
     <property name="tokens" value="IDENT, NUM_DOUBLE, LAMBDA, TEXT_BLOCK_LITERAL_BEGIN, UNARY_MINUS, UNARY_PLUS, INC, DEC, POST_INC, POST_DEC" />
   </module>
   <module name="SimplifyBooleanReturn"/>
+  <module name="UnusedLocalVariable"/>

   <!-- style -->
   <module name="DefaultComesLast"/>

View File

@@ -68,11 +68,12 @@ public class DescribeUserScramCredentialsRequest extends AbstractRequest {
             .setThrottleTimeMs(throttleTimeMs)
             .setErrorCode(apiError.error().code())
             .setErrorMessage(apiError.message());
-        for (DescribeUserScramCredentialsRequestData.UserName user : data.users()) {
+        data.users().forEach(__ ->
             response.results().add(new DescribeUserScramCredentialsResponseData.DescribeUserScramCredentialsResult()
                 .setErrorCode(apiError.error().code())
-                .setErrorMessage(apiError.message()));
-        }
+                .setErrorMessage(apiError.message()))
+        );
         return new DescribeUserScramCredentialsResponse(response);
     }
 }

View File

@@ -444,7 +444,7 @@ public class ConsumerHeartbeatRequestManagerTest {
         time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
         NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
         assertEquals(1, result.unsentRequests.size());
-        ClientResponse response = createHeartbeatResponse(result.unsentRequests.get(0), Errors.GROUP_AUTHORIZATION_FAILED);
+        createHeartbeatResponse(result.unsentRequests.get(0), Errors.GROUP_AUTHORIZATION_FAILED);
         result.unsentRequests.get(0).handler().onFailure(time.milliseconds(), new AuthenticationException("Fatal error in HB"));
         // The error should be propagated before notifying the group manager. This ensures that the app thread is aware

View File

@@ -2881,11 +2881,11 @@ public class FetcherTest {
             field.setAccessible(true);
             LinkedHashMap<?, ?> sessionPartitions =
                 (LinkedHashMap<?, ?>) field.get(handler);
-            for (Map.Entry<?, ?> entry : sessionPartitions.entrySet()) {
-                // If `sessionPartitions` are modified on another thread, Thread.yield will increase the
-                // possibility of ConcurrentModificationException if appropriate synchronization is not used.
-                Thread.yield();
-            }
+            // If `sessionPartitions` are modified on another thread, Thread.yield will increase the
+            // possibility of ConcurrentModificationException if appropriate synchronization is not used.
+            sessionPartitions.forEach(
+                (key, value) -> Thread.yield()
+            );
         } catch (Exception e) {
             throw new RuntimeException(e);
         }

View File

@@ -397,7 +397,7 @@ public class RecordAccumulatorTest {
         List<ProducerBatch> batches = accum.drain(metadataCache, nodes, 5 * 1024, 0).get(node1.id());
         if (batches != null) {
             for (ProducerBatch batch : batches) {
-                for (Record record : batch.records().records())
+                for (@SuppressWarnings("UnusedLocalVariable") Record ignored : batch.records().records())
                     read++;
                 accum.deallocate(batch);
             }

View File

@@ -1674,7 +1674,7 @@ public class SenderTest {
         Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, 10,
                 senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions);
-        Future<RecordMetadata> failedResponse = appendToAccumulator(tp0);
+        appendToAccumulator(tp0);
         Future<RecordMetadata> successfulResponse = appendToAccumulator(tp1);
         sender.runOnce(); // connect and send.

View File

@@ -527,11 +527,6 @@ public class RequestResponseTest {
     public void fetchResponseVersionTest() {
         Uuid id = Uuid.randomUuid();
         MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(10));
-        FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
-                .setPartitionIndex(0)
-                .setHighWatermark(1000000)
-                .setLogStartOffset(-1)
-                .setRecords(records);
         LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> idResponseData = new LinkedHashMap<>();
         idResponseData.put(new TopicIdPartition(id, new TopicPartition("test", 0)),
             new FetchResponseData.PartitionData()

View File

@@ -20,7 +20,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.Header;
@@ -283,8 +282,6 @@ public class MirrorSourceTaskTest {
         @SuppressWarnings("unchecked")
         KafkaConsumer<byte[], byte[]> consumer = mock(KafkaConsumer.class);
-        @SuppressWarnings("unchecked")
-        KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
         MirrorSourceMetrics metrics = mock(MirrorSourceMetrics.class);
         String sourceClusterName = "cluster1";

View File

@@ -60,9 +60,7 @@ public class MonitorableSinkConnector extends TestableSinkConnector {
         @Override
         public void put(Collection<SinkRecord> records) {
             super.put(records);
-            for (SinkRecord ignore : records) {
-                count++;
-            }
+            count += records.size();
         }
     }

View File

@@ -752,9 +752,6 @@ public class AbstractWorkerSourceTaskTest {
         expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC);
-        TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, null, Collections.emptyList(), Collections.emptyList());
-        TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo));
         workerTask.toSend = Arrays.asList(record1);
         // The transformation errored out so the error should be ignored & the record skipped with error tolerance all

View File

@@ -621,6 +621,7 @@ public class WorkerConnectorTest {
         String type = metrics.currentMetricValueAsString(metricGroup, "connector-type");
         String clazz = metrics.currentMetricValueAsString(metricGroup, "connector-class");
         String version = metrics.currentMetricValueAsString(metricGroup, "connector-version");
+        assertEquals("unassigned", status);
         assertEquals(expectedType, type);
         assertNotNull(clazz);
         assertEquals(VERSION, version);

View File

@@ -960,6 +960,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader {
                 // back to the caller. It's important to note that the task being executed is already
                 // cancelled before the executing thread is interrupted. The caller is responsible
                 // for handling the exception gracefully by checking if the task is already cancelled.
+                @SuppressWarnings("UnusedLocalVariable")
                 boolean ignored = copyQuotaManagerLockCondition.await(quotaTimeout().toMillis(), TimeUnit.MILLISECONDS);
                 throttleTimeMs = rlmCopyQuotaManager.getThrottleTimeMs();
             }

View File

@@ -74,7 +74,7 @@ public class EvolutionVerifier {
             verifyVersionsMatchTopLevelMessage(what, topLevelMessage, field);
         }
         for (StructSpec struct : topLevelMessage.commonStructs()) {
-            for (FieldSpec field : topLevelMessage.fields()) {
+            for (FieldSpec field : struct.fields()) {
                 verifyVersionsMatchTopLevelMessage(what, topLevelMessage, field);
             }
         }

View File

@@ -2967,7 +2967,7 @@ public class OffsetMetadataManagerTest {
         assertEquals(1, group.generationId());
         group.transitionTo(ClassicGroupState.STABLE);
-        CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> result = context.commitOffset(
+        context.commitOffset(
             new OffsetCommitRequestData()
                 .setGroupId("foo")
                 .setMemberId("member")

View File

@@ -21,10 +21,6 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetada
 import org.junit.jupiter.api.Test;
-import java.util.Map;
-import java.util.Set;
-import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -49,7 +45,6 @@ public class TopicMetadataTest {
     @Test
     public void testEquals() {
         Uuid topicId = Uuid.randomUuid();
-        Map<Integer, Set<String>> partitionRacks = mkMapOfPartitionRacks(15);
         TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15);
         assertEquals(new TopicMetadata(topicId, "foo", 15), topicMetadata);

View File

@@ -198,7 +198,7 @@ public class QuorumControllerMetricsIntegrationTest {
                 build()
             ) {
                 QuorumController active = controlEnv.activeController();
-                Map<Integer, Long> brokerEpochs = registerBrokersAndUnfence(active, 3);
+                registerBrokersAndUnfence(active, 3);
                 // Test that a new operation increments operationsStarted. We retry this if needed
                 // to handle the case where another operation is performed in between loading

View File

@@ -1535,7 +1535,6 @@ public class ReplicationControlManagerTest {
             anonymousContextFor(ApiKeys.CREATE_TOPICS);
         ControllerResult<CreateTopicsResponseData> createResult =
             replicationControl.createTopics(createTopicsRequestContext, request, Collections.singleton("foo"));
-        CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
         CreatableTopicResult createdTopic = createResult.response().topics().find("foo");
         assertEquals(NONE.code(), createdTopic.errorCode());
         ctx.replay(createResult.records());

View File

@@ -189,24 +189,6 @@ public class KafkaRaftClientReconfigTest {
             .withUnknownLeader(0)
             .build();
-        List<List<ControlRecord>> expectedBootstrapRecords = Arrays.asList(
-            Arrays.asList(
-                new ControlRecord(
-                    ControlRecordType.SNAPSHOT_HEADER,
-                    new SnapshotHeaderRecord()
-                        .setVersion((short) 0)
-                        .setLastContainedLogTimestamp(0)
-                )
-            ),
-            Arrays.asList(
-                new ControlRecord(
-                    ControlRecordType.SNAPSHOT_FOOTER,
-                    new SnapshotFooterRecord()
-                        .setVersion((short) 0)
-                )
-            )
-        );
         // check leader does not write bootstrap records to log
         context.unattachedToLeader();

View File

@@ -653,9 +653,8 @@ class PersisterStateManagerTest {
         CompletableFuture<ReadShareGroupStateResponse> resultFuture = handler.result();
-        ReadShareGroupStateResponse result = null;
         try {
-            result = resultFuture.get();
+            resultFuture.get();
         } catch (Exception e) {
             fail("Failed to get result from future", e);
         }

View File

@@ -1237,7 +1237,6 @@ class ShareCoordinatorShardTest {
         assertEquals(expectedRecords, result.records());
         // invalid state epoch
-        int stateEpoch = 1;
         partition = 0;
         shard.replay(0L, 0L, (short) 0, ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
             GROUP_ID, TOPIC_ID, partition, new ShareGroupOffset.Builder()

View File

@@ -169,6 +169,7 @@ public class LogLoader {
                 long offset = LogFileUtils.offsetFromFile(file);
                 if (offset >= minSwapFileOffset && offset < maxSwapFileOffset) {
                     logger.info("Deleting segment files {} that is compacted but has not been deleted yet.", file.getName());
+                    @SuppressWarnings("UnusedLocalVariable")
                     boolean ignore = file.delete();
                 }
             }
@@ -186,6 +187,7 @@
             }
             if (file.getName().endsWith(LogFileUtils.SWAP_FILE_SUFFIX)) {
                 logger.info("Recovering file {} by renaming from {} files.", file.getName(), LogFileUtils.SWAP_FILE_SUFFIX);
+                @SuppressWarnings("UnusedLocalVariable")
                 boolean ignore = file.renameTo(new File(Utils.replaceSuffix(file.getPath(), LogFileUtils.SWAP_FILE_SUFFIX, "")));
             }
         }

View File

@@ -144,7 +144,6 @@ public class LogSegmentTest {
     })
     public void testAppendForLogSegmentOffsetOverflowException(long baseOffset, long largestOffset) throws IOException {
         try (LogSegment seg = createSegment(baseOffset, 10, Time.SYSTEM)) {
-            long currentTime = Time.SYSTEM.milliseconds();
             MemoryRecords memoryRecords = v1Records(0, "hello");
             assertThrows(LogSegmentOffsetOverflowException.class, () -> seg.append(largestOffset, memoryRecords));
         }

View File

@@ -26,7 +26,6 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -117,7 +116,6 @@ public class KTableEfficientRangeQueryTest {
         final Materialized<String, String, KeyValueStore<Bytes, byte[]>> stateStoreConfig = getStoreConfig(storeType, TABLE_NAME, enableLogging, enableCaching);
         //Create topology: table from input topic
         final StreamsBuilder builder = new StreamsBuilder();
-        final KTable<String, String> table =
-                builder.table("input", stateStoreConfig);
+        builder.table("input", stateStoreConfig);
         final Topology topology = builder.build();

View File

@@ -212,7 +212,7 @@ public class RegexSourceIntegrationTest {
             final StreamsBuilder builder = new StreamsBuilder();
             final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
-            final KStream<String, String> otherStream = builder.stream(Pattern.compile("not-a-match"));
+            builder.stream(Pattern.compile("not-a-match"));
             pattern1Stream
                 .selectKey((k, v) -> k)

View File

@@ -885,7 +885,10 @@ public class DefaultStateUpdater implements StateUpdater {
         restoredActiveTasksLock.lock();
         try {
             while (restoredActiveTasks.isEmpty() && now <= deadline) {
-                final boolean elapsed = restoredActiveTasksCondition.await(deadline - now, TimeUnit.MILLISECONDS);
+                // We can ignore whether the deadline expired during await, as the while loop condition will
+                // check again for deadline expiration.
+                @SuppressWarnings("UnusedLocalVariable")
+                final boolean ignored = restoredActiveTasksCondition.await(deadline - now, TimeUnit.MILLISECONDS);
                 now = time.milliseconds();
             }
             result.addAll(restoredActiveTasks);

View File

@@ -472,7 +472,6 @@ public class InternalStreamsBuilderTest {
         final GraphNode join = getNodeByType(builder.root, StreamStreamJoinNode.class, new HashSet<>());
         assertNotNull(join);
         assertTrue(((StreamStreamJoinNode) join).getSelfJoin());
-        final GraphNode parent = join.parentNodes().stream().findFirst().get();
         final AtomicInteger count = new AtomicInteger();
         countJoinWindowNodes(count, builder.root, new HashSet<>());
         assertEquals(count.get(), 1);

View File

@@ -34,7 +34,6 @@ import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KGroupedStream;
-import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Named;
@@ -816,7 +815,7 @@ public class SuppressScenarioTest {
         final KGroupedStream<String, String> stream1 = builder.stream("one", Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
         final KGroupedStream<String, String> stream2 = builder.stream("two", Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
-        final KStream<Windowed<String>, Object> cogrouped = stream1.cogroup((key, value, aggregate) -> aggregate + value).cogroup(stream2, (key, value, aggregate) -> aggregate + value)
+        stream1.cogroup((key, value, aggregate) -> aggregate + value).cogroup(stream2, (key, value, aggregate) -> aggregate + value)
             .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(15)))
             .aggregate(() -> "", Named.as("test"), Materialized.as("store"))
             .suppress(Suppressed.untilWindowCloses(unbounded()))
@@ -829,7 +828,7 @@ public class SuppressScenarioTest {
         final KGroupedStream<String, String> stream1 = builder.stream("one", Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
         final KGroupedStream<String, String> stream2 = builder.stream("two", Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
-        final KStream<Windowed<String>, Object> cogrouped = stream1.cogroup((key, value, aggregate) -> aggregate + value).cogroup(stream2, (key, value, aggregate) -> aggregate + value)
+        stream1.cogroup((key, value, aggregate) -> aggregate + value).cogroup(stream2, (key, value, aggregate) -> aggregate + value)
            .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(15)))
            .aggregate(() -> "", Named.as("test"), Materialized.as("store"))
            .suppress(Suppressed.untilWindowCloses(unbounded()))
@@ -842,7 +841,7 @@ public class SuppressScenarioTest {
         final KGroupedStream<String, String> stream1 = builder.stream("one", Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
         final KGroupedStream<String, String> stream2 = builder.stream("two", Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
-        final KStream<Windowed<String>, Object> cogrouped = stream1.cogroup((key, value, aggregate) -> aggregate + value).cogroup(stream2, (key, value, aggregate) -> aggregate + value)
+        stream1.cogroup((key, value, aggregate) -> aggregate + value).cogroup(stream2, (key, value, aggregate) -> aggregate + value)
            .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofMinutes(15), Duration.ofMinutes(5)))
            .aggregate(() -> "", (k, v1, v2) -> "", Named.as("test"), Materialized.as("store"))
            .suppress(Suppressed.untilWindowCloses(unbounded()))

View File

@@ -44,7 +44,6 @@ public class ProcessorRecordContextTest {
     @Test
     public void shouldEstimateNullTopicAndEmptyHeadersAsZeroLength() {
-        final Headers headers = new RecordHeaders();
         final ProcessorRecordContext context = new ProcessorRecordContext(
             42L,
             73L,

View File

@@ -1370,8 +1370,6 @@ public class StreamThreadTest {
         final TaskManager taskManager = mock(TaskManager.class);
         final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
-        final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime);
         final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
@@ -1471,8 +1469,6 @@
         final TaskManager taskManager = mock(TaskManager.class);
         final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
-        final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime);
         final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
@@ -1491,8 +1487,6 @@
         final TaskManager taskManager = mock(TaskManager.class);
         final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
-        final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime);
         final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
@@ -2590,8 +2584,6 @@
         doThrow(new TaskMigratedException("Task lost exception", new RuntimeException())).when(taskManager).handleLostAll();
-        final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime);
         final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
@@ -2620,8 +2612,6 @@
         doThrow(new TaskMigratedException("Revocation non fatal exception", new RuntimeException())).when(taskManager).handleRevocation(assignedPartitions);
-        final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime);
         final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
@@ -3113,8 +3103,6 @@
         when(taskManager.producerMetrics()).thenReturn(dummyProducerMetrics);
-        final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime);
         final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = buildStreamThread(consumer, taskManager, config, topologyMetadata);

View File

@@ -1825,9 +1825,6 @@ public class TaskManagerTest {
         final StandbyTask standbyTask = standbyTask(taskId02, taskId02ChangelogPartitions)
             .inState(State.RUNNING)
             .withInputPartitions(taskId02Partitions).build();
-        final StandbyTask unassignedStandbyTask = standbyTask(taskId03, taskId03ChangelogPartitions)
-            .inState(State.CREATED)
-            .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, runningStatefulTask)));

View File

@@ -25,7 +25,6 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
 import org.apache.kafka.streams.processor.assignment.ProcessId;
 import org.apache.kafka.streams.processor.internals.InternalTopicManager;
-import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -968,7 +967,6 @@ public class LegacyStickyTaskAssignorTest {
         final Cluster cluster = getRandomCluster(nodeSize, topicSize, partitionSize);
         final Map<TaskId, Set<TopicPartition>> partitionsForTask = getTaskTopicPartitionMap(topicSize, partitionSize, false);
         final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask = getTaskTopicPartitionMap(topicSize, partitionSize, true);
-        final Map<Subtopology, Set<TaskId>> tasksForTopicGroup = new HashMap<>();
         final Map<ProcessId, Map<String, Optional<String>>> racksForProcessConsumer = getRandomProcessRacks(clientSize, nodeSize);
         final InternalTopicManager internalTopicManager = mockInternalTopicManagerForRandomChangelog(nodeSize, topicSize, partitionSize);

View File

@@ -135,8 +135,8 @@ public class LogicalKeyValueSegmentsTest {
     @Test
     public void shouldCleanupSegmentsThatHaveExpired() {
-        final LogicalKeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context, 0);
-        final LogicalKeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(2, context, SEGMENT_INTERVAL * 2L);
+        segments.getOrCreateSegmentIfLive(0, context, 0);
+        segments.getOrCreateSegmentIfLive(2, context, SEGMENT_INTERVAL * 2L);
         final LogicalKeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(3, context, SEGMENT_INTERVAL * 3L);
         final LogicalKeyValueSegment segment4 = segments.getOrCreateSegmentIfLive(7, context, SEGMENT_INTERVAL * 7L);
@@ -151,7 +151,7 @@ public class LogicalKeyValueSegmentsTest {
     @Test
     public void shouldNotCleanUpReservedSegments() {
         final LogicalKeyValueSegment reservedSegment = segments.createReservedSegment(-1, "reserved");
-        final LogicalKeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(1, context, SEGMENT_INTERVAL);
+        segments.getOrCreateSegmentIfLive(1, context, SEGMENT_INTERVAL);
         final LogicalKeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(2, context, SEGMENT_INTERVAL * 2L);
         segments.cleanupExpiredSegments(SEGMENT_INTERVAL * 6L);

View File

@@ -672,8 +672,6 @@ public class RelationalSmokeTest extends SmokeTestUtil {
         }

         public static boolean verifySync(final String broker, final Instant deadline) throws InterruptedException {
-            final Deserializer<Integer> keyDeserializer = intSerde.deserializer();
             final Deserializer<Article> articleDeserializer = new Article.ArticleDeserializer();
             final Deserializer<AugmentedArticle> augmentedArticleDeserializer =

View File

@@ -77,7 +77,6 @@ public class MockApiFixedKeyProcessor<KIn, VIn, VOut> implements FixedKeyProcess
     public void process(final FixedKeyRecord<KIn, VIn> record) {
         final KIn key = record.key();
         final VIn value = record.value();
-        final KeyValueTimestamp<KIn, VIn> keyValueTimestamp = new KeyValueTimestamp<>(key, value, record.timestamp());
         if (value != null) {
             lastValueAndTimestampPerKey.put(key, ValueAndTimestamp.make(value, record.timestamp()));

View File

@@ -77,7 +77,6 @@ public class MockApiProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, VI
     public void process(final Record<KIn, VIn> record) {
         final KIn key = record.key();
         final VIn value = record.value();
-        final KeyValueTimestamp<KIn, VIn> keyValueTimestamp = new KeyValueTimestamp<>(key, value, record.timestamp());
         if (value != null) {
             lastValueAndTimestampPerKey.put(key, ValueAndTimestamp.make(value, record.timestamp()));

View File

@@ -79,7 +79,8 @@ public class ShareConsumerPerformance {
             shareConsumers.forEach(shareConsumer -> shareConsumersMetrics.add(shareConsumer.metrics()));
         }
         shareConsumers.forEach(shareConsumer -> {
-            Map<TopicIdPartition, Optional<KafkaException>> val = shareConsumer.commitSync();
+            @SuppressWarnings("UnusedLocalVariable")
+            Map<TopicIdPartition, Optional<KafkaException>> ignored = shareConsumer.commitSync();
             shareConsumer.close(Duration.ofMillis(500));
         });

View File

@@ -182,10 +182,8 @@ public class ConfigurableProducerWorker implements TaskWorker {
         long startTimeMs = Time.SYSTEM.milliseconds();
         try {
             try {
-                long sentMessages = 0;
                 while (true) {
                     sendMessage();
-                    sentMessages++;
                 }
             } catch (Exception e) {
                 throw e;