From 97c8c6b595bbff31b06e457aa08131965c845040 Mon Sep 17 00:00:00 2001
From: Sanskar Jhajharia
Date: Thu, 25 Sep 2025 21:07:02 +0530
Subject: [PATCH] KAFKA-19733 Fix arguments to assertEquals() in clients module (#20586)

This PR fixes the order of arguments passed to `assertEquals()` across the
clients module so that the expected value comes first and the actual value
second, as the JUnit API defines. A few minor cleanups are included as well,
mostly collapsing single-statement lambda blocks into expression lambdas.

Reviewers: Chia-Ping Tsai
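For context: JUnit's `assertEquals(expected, actual)` treats its first argument
as the expected value when rendering a failure message, so swapped arguments
make a failing test report the two values backwards. A minimal sketch of the
difference (a hypothetical test, not part of this patch):

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;

class AssertEqualsOrderExample {

    // Hypothetical stand-in for code under test that returns a wrong value.
    private int leaderEpoch() {
        return 11;
    }

    @Test
    void correctOrder() {
        // Constant expectation first, computed value second. A failure reads:
        //   expected: <12> but was: <11>   -- points at the code under test
        assertEquals(12, leaderEpoch());
    }

    @Test
    void swappedOrder() {
        // Compiles and fails identically, but the failure message reads:
        //   expected: <11> but was: <12>   -- blames the constant instead
        assertEquals(leaderEpoch(), 12);
    }
}
```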
---
 .../producer/ProducerIdExpirationTest.java    | 12 +--
 .../ProducerSendWhileDeletionTest.java        |  5 +-
 .../clients/ClusterConnectionStatesTest.java  |  4 +-
 .../apache/kafka/clients/MetadataTest.java    | 74 +++++++--------
 .../kafka/clients/admin/ConfigTest.java       |  2 +-
 .../CooperativeStickyAssignorTest.java        |  4 +-
 .../clients/consumer/MockConsumerTest.java    |  4 +-
 .../internals/AbstractStickyAssignorTest.java |  4 +-
 .../internals/ConsumerCoordinatorTest.java    | 46 ++++-----
 .../internals/FetchRequestManagerTest.java    | 16 ++--
 .../consumer/internals/FetcherTest.java       | 20 ++--
 .../internals/KafkaConsumerMetricsTest.java   |  4 +-
 .../consumer/internals/OffsetFetcherTest.java |  6 +-
 .../OffsetForLeaderEpochClientTest.java       |  6 +-
 ...reamsGroupHeartbeatRequestManagerTest.java | 95 +++++--------------
 .../internals/StreamsRebalanceDataTest.java   |  2 +-
 .../metrics/AsyncConsumerMetricsTest.java     | 16 ++--
 .../clients/producer/MockProducerTest.java    |  8 +-
 .../producer/internals/BufferPoolTest.java    |  4 +-
 .../internals/KafkaProducerMetricsTest.java   |  4 +-
 .../internals/RecordAccumulatorTest.java      |  4 +-
 .../producer/internals/SenderTest.java        |  6 +-
 .../internals/TransactionManagerTest.java     | 26 ++---
 .../feature/SupportedVersionRangeTest.java    |  2 +-
 .../kafka/common/metrics/MetricsTest.java     |  4 +-
 .../kafka/common/metrics/SensorTest.java      | 12 +--
 .../requests/UpdateFeaturesRequestTest.java   | 12 +--
 .../SaslServerAuthenticatorTest.java          |  2 +-
 .../security/kerberos/KerberosRuleTest.java   | 12 +--
 ...arerUnsecuredLoginCallbackHandlerTest.java |  2 +-
 .../ClientTelemetryReporterTest.java          |  2 +-
 .../server/policy/AlterConfigPolicyTest.java  |  4 +-
 .../org/apache/kafka/test/TestSslUtils.java   | 42 ++++----
 33 files changed, 215 insertions(+), 251 deletions(-)

diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIdExpirationTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIdExpirationTest.java
index f79b3786253..a9489c88327 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIdExpirationTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIdExpirationTest.java
@@ -204,10 +204,10 @@ public class ProducerIdExpirationTest {
 
         // Update the producer ID expiration ms to a very high value.
         admin.incrementalAlterConfigs(producerIdExpirationConfig("100000"));
-        cluster.brokers().values().forEach(broker -> {
+        cluster.brokers().values().forEach(broker ->
             TestUtils.waitUntilTrue(() -> broker.logManager().producerStateManagerConfig().producerIdExpirationMs() == 100000,
-                () -> "Configuration was not updated.", DEFAULT_MAX_WAIT_MS, 100);
-        });
+                () -> "Configuration was not updated.", DEFAULT_MAX_WAIT_MS, 100)
+        );
 
         // Send more records to send producer ID back to brokers.
         producer.send(new ProducerRecord<>(topic1, 0, null, "key".getBytes(), "value".getBytes()));
         producer.flush();
@@ -226,10 +226,10 @@ public class ProducerIdExpirationTest {
         kafkaBroker.awaitShutdown();
         kafkaBroker.startup();
         cluster.waitForReadyBrokers();
-        cluster.brokers().values().forEach(broker -> {
+        cluster.brokers().values().forEach(broker ->
             TestUtils.waitUntilTrue(() -> broker.logManager().producerStateManagerConfig().producerIdExpirationMs() == 100,
-                () -> "Configuration was not updated.", DEFAULT_MAX_WAIT_MS, 100);
-        });
+                () -> "Configuration was not updated.", DEFAULT_MAX_WAIT_MS, 100)
+        );
 
         // Ensure producer ID expires quickly again.
         waitProducerIdExpire(admin);
diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerSendWhileDeletionTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerSendWhileDeletionTest.java
index 4301eecbb9c..aa93431cf63 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerSendWhileDeletionTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerSendWhileDeletionTest.java
@@ -184,9 +184,8 @@ public class ProducerSendWhileDeletionTest {
         try (var producer = createProducer()) {
             for (int i = 1; i <= numRecords; i++) {
                 producer.send(new ProducerRecord<>(topic, null, ("value" + i).getBytes()),
-                    (metadata, exception) -> {
-                        numAcks.incrementAndGet();
-                    });
+                    (metadata, exception) -> numAcks.incrementAndGet()
+                );
             }
             producer.flush();
         }
diff --git a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
index f647d95445f..9812f490ddd 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
@@ -186,7 +186,7 @@ public class ClusterConnectionStatesTest {
         connectionStates.authenticationFailed(nodeId1, time.milliseconds(),
                 new AuthenticationException("No path to CA for certificate!"));
         time.sleep(1000);
-        assertEquals(connectionStates.connectionState(nodeId1), ConnectionState.AUTHENTICATION_FAILED);
+        assertEquals(ConnectionState.AUTHENTICATION_FAILED, connectionStates.connectionState(nodeId1));
         assertNotNull(connectionStates.authenticationException(nodeId1));
         assertFalse(connectionStates.hasReadyNodes(time.milliseconds()));
         assertFalse(connectionStates.canConnect(nodeId1, time.milliseconds()));
@@ -210,7 +210,7 @@ public class ClusterConnectionStatesTest {
         connectionStates.remove(nodeId1);
         assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds()));
         assertFalse(connectionStates.isBlackedOut(nodeId1, time.milliseconds()));
-        assertEquals(connectionStates.connectionDelay(nodeId1, time.milliseconds()), 0L);
+        assertEquals(0L, connectionStates.connectionDelay(nodeId1, time.milliseconds()));
     }
 
     @Test
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 13c378d3983..9ac75191004 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -361,28 +361,28 @@ public class MetadataTest {
 
         // Metadata with newer epoch is handled
         metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10);
         metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
-        assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10));
+        assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(10, leaderAndEpoch.intValue()));
 
         // Don't update to an older one
         assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 1));
-        assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10));
+        assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(10, leaderAndEpoch.intValue()));
 
         // Don't cause update if it's the same one
         assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 10));
-        assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10));
+        assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(10, leaderAndEpoch.intValue()));
 
         // Update if we see newer epoch
        assertTrue(metadata.updateLastSeenEpochIfNewer(tp, 12));
-        assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
+        assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(12, leaderAndEpoch.intValue()));
 
         metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 12);
         metadata.updateWithCurrentRequestVersion(metadataResponse, false, 2L);
-        assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
+        assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(12, leaderAndEpoch.intValue()));
 
         // Don't overwrite metadata with older epoch
         metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 11);
         metadata.updateWithCurrentRequestVersion(metadataResponse, false, 3L);
-        assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
+        assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(12, leaderAndEpoch.intValue()));
     }
 
     @Test
@@ -465,7 +465,7 @@ public class MetadataTest {
             metadata.updateWithCurrentRequestVersion(metadataResponse, false, 10L);
             assertNotNull(metadata.fetch().partition(tp));
             assertTrue(metadata.lastSeenLeaderEpoch(tp).isPresent());
-            assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100);
+            assertEquals(100, metadata.lastSeenLeaderEpoch(tp).get().longValue());
         }
 
         // Fake an empty ISR, but with an older epoch, should reject it
@@ -475,8 +475,8 @@ public class MetadataTest {
                 new MetadataResponse.PartitionMetadata(error, partition, leader, leaderEpoch, replicas, Collections.emptyList(), offlineReplicas),
                 ApiKeys.METADATA.latestVersion(), Collections.emptyMap());
             metadata.updateWithCurrentRequestVersion(metadataResponse, false, 20L);
-            assertEquals(metadata.fetch().partition(tp).inSyncReplicas().length, 1);
-            assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100);
+            assertEquals(1, metadata.fetch().partition(tp).inSyncReplicas().length);
+            assertEquals(100, metadata.lastSeenLeaderEpoch(tp).get().longValue());
         }
 
         // Fake an empty ISR, with same epoch, accept it
@@ -486,8 +486,8 @@ public class MetadataTest {
                 new MetadataResponse.PartitionMetadata(error, partition, leader, leaderEpoch, replicas, Collections.emptyList(), offlineReplicas),
                 ApiKeys.METADATA.latestVersion(), Collections.emptyMap());
             metadata.updateWithCurrentRequestVersion(metadataResponse, false, 20L);
-            assertEquals(metadata.fetch().partition(tp).inSyncReplicas().length, 0);
-            assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100);
+            assertEquals(0, metadata.fetch().partition(tp).inSyncReplicas().length);
+            assertEquals(100, metadata.lastSeenLeaderEpoch(tp).get().longValue());
         }
 
         // Empty metadata response, should not keep old partition but should keep the last-seen epoch
@@ -495,7 +495,7 @@ public class MetadataTest {
             MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.emptyMap());
             metadata.updateWithCurrentRequestVersion(metadataResponse, false, 20L);
             assertNull(metadata.fetch().partition(tp));
-            assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100);
+            assertEquals(100, metadata.lastSeenLeaderEpoch(tp).get().longValue());
         }
 
         // Back in the metadata, with old epoch, should not get added
@@ -503,7 +503,7 @@ public class MetadataTest {
             MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, _tp -> 99);
             metadata.updateWithCurrentRequestVersion(metadataResponse, false, 10L);
             assertNull(metadata.fetch().partition(tp));
-            assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100);
+            assertEquals(100, metadata.lastSeenLeaderEpoch(tp).get().longValue());
         }
     }
 
@@ -522,31 +522,31 @@ public class MetadataTest {
         metadata.updateWithCurrentRequestVersion(metadataResponse, false, 10L);
         assertNotNull(metadata.fetch().partition(tp));
         assertTrue(metadata.lastSeenLeaderEpoch(tp).isPresent());
-        assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100);
+        assertEquals(100, metadata.lastSeenLeaderEpoch(tp).get().longValue());
 
         // Simulate a leader epoch from another response, like a fetch response or list offsets
         assertTrue(metadata.updateLastSeenEpochIfNewer(tp, 101));
 
         // Cache of partition stays, but current partition info is not available since it's stale
         assertNotNull(metadata.fetch().partition(tp));
-        assertEquals(Objects.requireNonNull(metadata.fetch().partitionCountForTopic("topic-1")).longValue(), 5);
+        assertEquals(5, Objects.requireNonNull(metadata.fetch().partitionCountForTopic("topic-1")).longValue());
         assertFalse(metadata.partitionMetadataIfCurrent(tp).isPresent());
-        assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 101);
+        assertEquals(101, metadata.lastSeenLeaderEpoch(tp).get().longValue());
 
         // Metadata with older epoch is rejected, metadata state is unchanged
         metadata.updateWithCurrentRequestVersion(metadataResponse, false, 20L);
         assertNotNull(metadata.fetch().partition(tp));
-        assertEquals(Objects.requireNonNull(metadata.fetch().partitionCountForTopic("topic-1")).longValue(), 5);
+        assertEquals(5, Objects.requireNonNull(metadata.fetch().partitionCountForTopic("topic-1")).longValue());
         assertFalse(metadata.partitionMetadataIfCurrent(tp).isPresent());
-        assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 101);
+        assertEquals(101, metadata.lastSeenLeaderEpoch(tp).get().longValue());
 
         // Metadata with equal or newer epoch is accepted
         metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, _tp -> 101);
         metadata.updateWithCurrentRequestVersion(metadataResponse, false, 30L);
         assertNotNull(metadata.fetch().partition(tp));
-        assertEquals(Objects.requireNonNull(metadata.fetch().partitionCountForTopic("topic-1")).longValue(), 5);
+        assertEquals(5, Objects.requireNonNull(metadata.fetch().partitionCountForTopic("topic-1")).longValue());
         assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
-        assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 101);
+        assertEquals(101, metadata.lastSeenLeaderEpoch(tp).get().longValue());
     }
 
     @Test
@@ -585,18 +585,18 @@ public class MetadataTest {
         metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
 
         Cluster cluster = metadata.fetch();
-        assertEquals(cluster.clusterResource().clusterId(), "dummy");
-        assertEquals(cluster.nodes().size(), 4);
+        assertEquals("dummy", cluster.clusterResource().clusterId());
+        assertEquals(4, cluster.nodes().size());
 
         // topic counts
         assertEquals(cluster.invalidTopics(), Collections.singleton("topic3"));
         assertEquals(cluster.unauthorizedTopics(), Collections.singleton("topic4"));
-        assertEquals(cluster.topics().size(), 3);
+        assertEquals(3, cluster.topics().size());
         assertEquals(cluster.internalTopics(), Collections.singleton(Topic.GROUP_METADATA_TOPIC_NAME));
 
         // partition counts
-        assertEquals(cluster.partitionsForTopic("topic1").size(), 2);
-        assertEquals(cluster.partitionsForTopic("topic2").size(), 3);
+        assertEquals(2, cluster.partitionsForTopic("topic1").size());
+        assertEquals(3, cluster.partitionsForTopic("topic2").size());
 
         // Sentinel instances
         InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 0);
@@ -798,10 +798,10 @@ public class MetadataTest {
 
         TopicPartition tp = new TopicPartition("topic-1", 0);
 
-        assertOptional(metadata.fetch().nodeIfOnline(tp, 0), node -> assertEquals(node.id(), 0));
+        assertOptional(metadata.fetch().nodeIfOnline(tp, 0), node -> assertEquals(0, node.id()));
         assertFalse(metadata.fetch().nodeIfOnline(tp, 1).isPresent());
-        assertEquals(metadata.fetch().nodeById(0).id(), 0);
-        assertEquals(metadata.fetch().nodeById(1).id(), 1);
+        assertEquals(0, metadata.fetch().nodeById(0).id());
+        assertEquals(1, metadata.fetch().nodeById(1).id());
     }
 
     @Test
@@ -831,7 +831,7 @@ public class MetadataTest {
 
         TopicPartition tp = new TopicPartition("topic-1", 0);
 
-        assertEquals(metadata.fetch().nodeById(0).id(), 0);
+        assertEquals(0, metadata.fetch().nodeById(0).id());
         assertNull(metadata.fetch().partition(tp));
         assertEquals(metadata.fetch().nodeIfOnline(tp, 0), Optional.empty());
     }
@@ -955,13 +955,13 @@ public class MetadataTest {
         // Update the metadata to add a new topic variant, "new", which will be retained with "keep". Note this
         // means that all of the "old" topics should be dropped.
         Cluster cluster = metadata.fetch();
-        assertEquals(cluster.clusterResource().clusterId(), oldClusterId);
-        assertEquals(cluster.nodes().size(), oldNodes);
+        assertEquals(oldClusterId, cluster.clusterResource().clusterId());
+        assertEquals(oldNodes, cluster.nodes().size());
         assertEquals(cluster.invalidTopics(), Set.of("oldInvalidTopic", "keepInvalidTopic"));
         assertEquals(cluster.unauthorizedTopics(), Set.of("oldUnauthorizedTopic", "keepUnauthorizedTopic"));
         assertEquals(cluster.topics(), Set.of("oldValidTopic", "keepValidTopic"));
-        assertEquals(cluster.partitionsForTopic("oldValidTopic").size(), 2);
-        assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 3);
+        assertEquals(2, cluster.partitionsForTopic("oldValidTopic").size());
+        assertEquals(3, cluster.partitionsForTopic("keepValidTopic").size());
         assertEquals(new HashSet<>(cluster.topicIds()), new HashSet<>(topicIds.values()));
 
         String newClusterId = "newClusterId";
@@ -990,13 +990,13 @@ public class MetadataTest {
         assertNull(metadataTopicIds2.get("oldValidTopic"));
 
         cluster = metadata.fetch();
-        assertEquals(cluster.clusterResource().clusterId(), newClusterId);
+        assertEquals(newClusterId, cluster.clusterResource().clusterId());
         assertEquals(cluster.nodes().size(), newNodes);
         assertEquals(cluster.invalidTopics(), Set.of("keepInvalidTopic", "newInvalidTopic"));
         assertEquals(cluster.unauthorizedTopics(), Set.of("keepUnauthorizedTopic", "newUnauthorizedTopic"));
         assertEquals(cluster.topics(), Set.of("keepValidTopic", "newValidTopic"));
-        assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 2);
-        assertEquals(cluster.partitionsForTopic("newValidTopic").size(), 4);
+        assertEquals(2, cluster.partitionsForTopic("keepValidTopic").size());
+        assertEquals(4, cluster.partitionsForTopic("newValidTopic").size());
         assertEquals(new HashSet<>(cluster.topicIds()), new HashSet<>(topicIds.values()));
 
         // Perform another metadata update, but this time all topic metadata should be cleared.
@@ -1008,7 +1008,7 @@ public class MetadataTest {
         topicIds.forEach((topicName, topicId) -> assertNull(metadataTopicIds3.get(topicName)));
 
         cluster = metadata.fetch();
-        assertEquals(cluster.clusterResource().clusterId(), newClusterId);
+        assertEquals(newClusterId, cluster.clusterResource().clusterId());
         assertEquals(cluster.nodes().size(), newNodes);
         assertEquals(cluster.invalidTopics(), Collections.emptySet());
         assertEquals(cluster.unauthorizedTopics(), Collections.emptySet());
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/ConfigTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/ConfigTest.java
index d09cca7ad66..f3b1e73d72e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/ConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/ConfigTest.java
@@ -64,7 +64,7 @@ public class ConfigTest {
         assertEquals(config, config);
         assertEquals(config, new Config(config.entries()));
         assertNotEquals(new Config(Collections.singletonList(E1)), config);
-        assertNotEquals(config, "this");
+        assertNotEquals("this", config);
     }
 
     @Test
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
index b85d000e167..6a6aa919be1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
@@ -81,7 +81,7 @@ public class CooperativeStickyAssignorTest extends AbstractStickyAssignorTest {
 
         Optional<Integer> encodedGeneration = ((CooperativeStickyAssignor) assignor).memberData(subscription).generation;
         assertTrue(encodedGeneration.isPresent());
-        assertEquals(encodedGeneration.get(), DEFAULT_GENERATION);
+        assertEquals(DEFAULT_GENERATION, encodedGeneration.get());
 
         int generation = 10;
         assignor.onAssignment(null, new ConsumerGroupMetadata("dummy-group-id", generation, "dummy-member-id", Optional.empty()));
@@ -90,7 +90,7 @@ public class CooperativeStickyAssignorTest extends AbstractStickyAssignorTest {
 
         encodedGeneration = ((CooperativeStickyAssignor) assignor).memberData(subscription).generation;
         assertTrue(encodedGeneration.isPresent());
-        assertEquals(encodedGeneration.get(), generation);
+        assertEquals(generation, encodedGeneration.get());
     }
 
     @Test
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index 647976b1d1d..6968b45a57b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -209,9 +209,7 @@ public class MockConsumerTest {
         consumer.assign(Collections.singleton(partition));
         consumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));
 
-        IntStream.range(0, 10).forEach(offset -> {
-            consumer.addRecord(new ConsumerRecord<>("test", 0, offset, null, null));
-        });
+        IntStream.range(0, 10).forEach(offset -> consumer.addRecord(new ConsumerRecord<>("test", 0, offset, null, null)));
 
         consumer.setMaxPollRecords(2L);
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
index fe6b4d100ff..4e9525264a0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
@@ -1025,7 +1025,7 @@ public abstract class AbstractStickyAssignorTest {
 
         Map<String, List<TopicPartition>> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions);
         assertTrue(assignor.partitionsTransferringOwnership.isEmpty());
-        assertEquals(assignment.values().stream().mapToInt(List::size).sum(), 1 + 100);
+        assertEquals(1 + 100, assignment.values().stream().mapToInt(List::size).sum());
         assertEquals(Collections.singleton(consumerId), assignment.keySet());
         assertTrue(isFullyBalanced(assignment));
     }
@@ -1043,7 +1043,7 @@ public abstract class AbstractStickyAssignorTest {
 
         assignment = assignor.assign(Collections.emptyMap(), subscriptions);
         assertTrue(assignor.partitionsTransferringOwnership.isEmpty());
-        assertEquals(assignment.size(), 1);
+        assertEquals(1, assignment.size());
         assertTrue(assignment.get(consumerId).isEmpty());
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index fcbbfeb8eeb..623fd765f39 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -568,13 +568,13 @@ public abstract class ConsumerCoordinatorTest {
         assertFalse(client.hasInFlightRequests());
 
         // should try to find coordinator since we are commit async
-        coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), (offsets, exception) -> {
-            fail("Commit should not get responses, but got offsets:" + offsets + ", and exception:" + exception);
-        });
+        coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), (offsets, exception) ->
+            fail("Commit should not get responses, but got offsets:" + offsets + ", and exception:" + exception)
+        );
         coordinator.poll(time.timer(0));
         assertTrue(coordinator.coordinatorUnknown());
         assertTrue(client.hasInFlightRequests());
-        assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+        assertEquals(0, coordinator.inFlightAsyncCommits.get());
 
         client.respond(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.poll(time.timer(0));
@@ -582,7 +582,7 @@ public abstract class ConsumerCoordinatorTest {
         // after we've discovered the coordinator we should send
         // out the commit request immediately
         assertTrue(client.hasInFlightRequests());
-        assertEquals(coordinator.inFlightAsyncCommits.get(), 1);
+        assertEquals(1, coordinator.inFlightAsyncCommits.get());
     }
 
     @Test
@@ -619,13 +619,13 @@ public abstract class ConsumerCoordinatorTest {
 
         assertFalse(coordinator.commitOffsetsSync(Collections.emptyMap(), time.timer(100L)), "expected sync commit to fail");
         assertFalse(committed.get());
-        assertEquals(coordinator.inFlightAsyncCommits.get(), 1);
+        assertEquals(1, coordinator.inFlightAsyncCommits.get());
 
         prepareOffsetCommitRequest(singletonMap(tp, 123L), Errors.NONE);
 
         assertTrue(coordinator.commitOffsetsSync(Collections.emptyMap(), time.timer(Long.MAX_VALUE)), "expected sync commit to succeed");
         assertTrue(committed.get(), "expected commit callback to be invoked");
-        assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+        assertEquals(0, coordinator.inFlightAsyncCommits.get());
     }
 
     @Test
@@ -646,13 +646,13 @@ public abstract class ConsumerCoordinatorTest {
                     "Unexpected exception cause type: " + (cause == null ? null : cause.getClass()));
             });
         }
-        assertEquals(coordinator.inFlightAsyncCommits.get(), numRequests);
+        assertEquals(numRequests, coordinator.inFlightAsyncCommits.get());
         coordinator.markCoordinatorUnknown("test cause");
         consumerClient.pollNoWakeup();
         coordinator.invokeCompletedOffsetCommitCallbacks();
         assertEquals(numRequests, responses.get());
-        assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+        assertEquals(0, coordinator.inFlightAsyncCommits.get());
     }
 
     @Test
@@ -697,7 +697,7 @@ public abstract class ConsumerCoordinatorTest {
         coordinator.markCoordinatorUnknown("test cause");
         consumerClient.pollNoWakeup();
         assertTrue(asyncCallbackInvoked.get());
-        assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+        assertEquals(0, coordinator.inFlightAsyncCommits.get());
     }
 
     @Test
@@ -2350,7 +2350,7 @@ public abstract class ConsumerCoordinatorTest {
         MockCommitCallback secondCommitCallback = new MockCommitCallback();
         coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), firstCommitCallback);
         coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), secondCommitCallback);
-        assertEquals(coordinator.inFlightAsyncCommits.get(), 2);
+        assertEquals(2, coordinator.inFlightAsyncCommits.get());
 
         respondToOffsetCommitRequest(singletonMap(t1p, 100L), error);
         consumerClient.pollNoWakeup();
@@ -2360,7 +2360,7 @@ public abstract class ConsumerCoordinatorTest {
         assertTrue(coordinator.coordinatorUnknown());
         assertInstanceOf(RetriableCommitFailedException.class, firstCommitCallback.exception);
         assertInstanceOf(RetriableCommitFailedException.class, secondCommitCallback.exception);
-        assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+        assertEquals(0, coordinator.inFlightAsyncCommits.get());
     }
 
     @Test
@@ -2549,7 +2549,7 @@ public abstract class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
         prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
         coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), mockOffsetCommitCallback);
-        assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+        assertEquals(0, coordinator.inFlightAsyncCommits.get());
         coordinator.invokeCompletedOffsetCommitCallbacks();
         assertEquals(invokedBeforeTest + 1, mockOffsetCommitCallback.invoked);
         assertNull(mockOffsetCommitCallback.exception);
@@ -2580,7 +2580,7 @@ public abstract class ConsumerCoordinatorTest {
         coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), callback(success));
         coordinator.invokeCompletedOffsetCommitCallbacks();
         assertTrue(success.get());
-        assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+        assertEquals(0, coordinator.inFlightAsyncCommits.get());
     }
 
     @Test
@@ -2590,7 +2590,7 @@ public abstract class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
         prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE);
         coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), mockOffsetCommitCallback);
-        assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+        assertEquals(0, coordinator.inFlightAsyncCommits.get());
         coordinator.invokeCompletedOffsetCommitCallbacks();
         assertEquals(invokedBeforeTest + 1, mockOffsetCommitCallback.invoked);
         assertInstanceOf(RetriableCommitFailedException.class, mockOffsetCommitCallback.exception);
@@ -2605,7 +2605,7 @@ public abstract class ConsumerCoordinatorTest {
         MockCommitCallback cb = new MockCommitCallback();
prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE); coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), cb); - assertEquals(coordinator.inFlightAsyncCommits.get(), 0); + assertEquals(0, coordinator.inFlightAsyncCommits.get()); coordinator.invokeCompletedOffsetCommitCallbacks(); assertTrue(coordinator.coordinatorUnknown()); @@ -2622,7 +2622,7 @@ public abstract class ConsumerCoordinatorTest { MockCommitCallback cb = new MockCommitCallback(); prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NOT_COORDINATOR); coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), cb); - assertEquals(coordinator.inFlightAsyncCommits.get(), 0); + assertEquals(0, coordinator.inFlightAsyncCommits.get()); coordinator.invokeCompletedOffsetCommitCallbacks(); assertTrue(coordinator.coordinatorUnknown()); @@ -2639,7 +2639,7 @@ public abstract class ConsumerCoordinatorTest { MockCommitCallback cb = new MockCommitCallback(); prepareOffsetCommitRequestDisconnect(singletonMap(t1p, 100L)); coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), cb); - assertEquals(coordinator.inFlightAsyncCommits.get(), 0); + assertEquals(0, coordinator.inFlightAsyncCommits.get()); coordinator.invokeCompletedOffsetCommitCallbacks(); assertTrue(coordinator.coordinatorUnknown()); @@ -2703,7 +2703,7 @@ public abstract class ConsumerCoordinatorTest { } }; - assertEquals(coordinator.inFlightAsyncCommits.get(), 1); + assertEquals(1, coordinator.inFlightAsyncCommits.get()); thread.start(); client.waitForRequests(2, 5000); @@ -2711,7 +2711,7 @@ public abstract class ConsumerCoordinatorTest { respondToOffsetCommitRequest(singletonMap(t1p, secondOffset.offset()), Errors.NONE); thread.join(); - assertEquals(coordinator.inFlightAsyncCommits.get(), 0); + assertEquals(0, coordinator.inFlightAsyncCommits.get()); assertEquals(Arrays.asList(firstOffset, secondOffset), committedOffsets); } @@ -3100,7 +3100,7 @@ public abstract class ConsumerCoordinatorTest { assertEquals(Collections.emptySet(), subscriptions.initializingPartitions()); assertFalse(subscriptions.hasAllFetchPositions()); assertTrue(subscriptions.awaitingValidation(t1p)); - assertEquals(subscriptions.position(t1p).offset, 100L); + assertEquals(100L, subscriptions.position(t1p).offset); assertNull(subscriptions.validPosition(t1p)); } @@ -3470,7 +3470,7 @@ public abstract class ConsumerCoordinatorTest { assertThrows(FencedInstanceIdException.class, this::receiveFencedInstanceIdException); assertThrows(FencedInstanceIdException.class, () -> coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), new MockCommitCallback())); - assertEquals(coordinator.inFlightAsyncCommits.get(), 0); + assertEquals(0, coordinator.inFlightAsyncCommits.get()); assertThrows(FencedInstanceIdException.class, () -> coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L)), time.timer(Long.MAX_VALUE))); } @@ -3739,7 +3739,7 @@ public abstract class ConsumerCoordinatorTest { prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.FENCED_INSTANCE_ID); coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), new MockCommitCallback()); - assertEquals(coordinator.inFlightAsyncCommits.get(), 0); + assertEquals(0, coordinator.inFlightAsyncCommits.get()); coordinator.invokeCompletedOffsetCommitCallbacks(); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java index f806ab65b6b..1378e4b53a1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java @@ -1815,7 +1815,7 @@ public class FetchRequestManagerTest { assertEquals(1, oorExceptions.size()); OffsetOutOfRangeException oor = oorExceptions.get(0); assertTrue(oor.offsetOutOfRangePartitions().containsKey(tp0)); - assertEquals(oor.offsetOutOfRangePartitions().size(), 1); + assertEquals(1, oor.offsetOutOfRangePartitions().size()); fetchRecordsInto(fetchedRecords); @@ -2359,7 +2359,7 @@ public class FetchRequestManagerTest { Map>> fetchedRecords = fetchRecords(); assertTrue(fetchedRecords.containsKey(tp0)); - assertEquals(fetchedRecords.get(tp0).size(), 2); + assertEquals(2, fetchedRecords.get(tp0).size()); } @Test @@ -2477,7 +2477,7 @@ public class FetchRequestManagerTest { Map>> fetchedRecords = fetchRecords(); assertTrue(fetchedRecords.containsKey(tp0)); - assertEquals(fetchedRecords.get(tp0).size(), 2); + assertEquals(2, fetchedRecords.get(tp0).size()); List> fetchedConsumerRecords = fetchedRecords.get(tp0); Set expectedCommittedKeys = Set.of("commit1-1", "commit1-2"); Set actuallyCommittedKeys = new HashSet<>(); @@ -2854,7 +2854,7 @@ public class FetchRequestManagerTest { Map>> fetchedRecords = fetchRecords(); assertTrue(fetchedRecords.containsKey(tp0)); - assertEquals(fetchedRecords.get(tp0).size(), 2); + assertEquals(2, fetchedRecords.get(tp0).size()); } private MemoryRecords buildRecords(long baseOffset, int count, long firstMessageId) { @@ -2939,8 +2939,8 @@ public class FetchRequestManagerTest { Map>> partitionRecords = fetchRecords(); assertTrue(partitionRecords.containsKey(tp0)); - assertEquals(subscriptions.position(tp0).offset, 3L); - assertOptional(subscriptions.position(tp0).offsetEpoch, value -> assertEquals(value.intValue(), 1)); + assertEquals(3L, subscriptions.position(tp0).offset); + assertOptional(subscriptions.position(tp0).offsetEpoch, value -> assertEquals(1, value.intValue())); } @Test @@ -3110,7 +3110,7 @@ public class FetchRequestManagerTest { fetchRecords(); Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds()); - assertEquals(selected.id(), 1); + assertEquals(1, selected.id()); assertEquals(1, sendFetches()); assertFalse(fetcher.hasCompletedFetches()); @@ -3124,7 +3124,7 @@ public class FetchRequestManagerTest { fetchRecords(); selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds()); - assertEquals(selected.id(), -1); + assertEquals(-1, selected.id()); } @Test diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index b85daebb8d8..ee051a42ca8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -1802,7 +1802,7 @@ public class FetcherTest { assertEquals(1, oorExceptions.size()); OffsetOutOfRangeException oor = oorExceptions.get(0); assertTrue(oor.offsetOutOfRangePartitions().containsKey(tp0)); - assertEquals(oor.offsetOutOfRangePartitions().size(), 1); + assertEquals(1, oor.offsetOutOfRangePartitions().size()); fetchRecordsInto(fetchedRecords); @@ -2346,7 +2346,7 @@ public class FetcherTest { Map>> fetchedRecords = 
fetchRecords(); assertTrue(fetchedRecords.containsKey(tp0)); - assertEquals(fetchedRecords.get(tp0).size(), 2); + assertEquals(2, fetchedRecords.get(tp0).size()); } @Test @@ -2464,7 +2464,7 @@ public class FetcherTest { Map>> fetchedRecords = fetchRecords(); assertTrue(fetchedRecords.containsKey(tp0)); - assertEquals(fetchedRecords.get(tp0).size(), 2); + assertEquals(2, fetchedRecords.get(tp0).size()); List> fetchedConsumerRecords = fetchedRecords.get(tp0); Set expectedCommittedKeys = Set.of("commit1-1", "commit1-2"); Set actuallyCommittedKeys = new HashSet<>(); @@ -3054,7 +3054,7 @@ public class FetcherTest { Map>> fetchedRecords = fetchRecords(); assertTrue(fetchedRecords.containsKey(tp0)); - assertEquals(fetchedRecords.get(tp0).size(), 2); + assertEquals(2, fetchedRecords.get(tp0).size()); } private MemoryRecords buildRecords(long baseOffset, int count, long firstMessageId) { @@ -3139,8 +3139,8 @@ public class FetcherTest { Map>> partitionRecords = fetchRecords(); assertTrue(partitionRecords.containsKey(tp0)); - assertEquals(subscriptions.position(tp0).offset, 3L); - assertOptional(subscriptions.position(tp0).offsetEpoch, value -> assertEquals(value.intValue(), 1)); + assertEquals(3L, subscriptions.position(tp0).offset); + assertOptional(subscriptions.position(tp0).offsetEpoch, value -> assertEquals(1, value.intValue())); } @Test @@ -3217,8 +3217,8 @@ public class FetcherTest { Map>> partitionRecords = fetchRecords(); assertTrue(partitionRecords.containsKey(tp0)); - assertEquals(subscriptions.position(tp0).offset, 3L); - assertOptional(subscriptions.position(tp0).offsetEpoch, value -> assertEquals(value.intValue(), 1)); + assertEquals(3L, subscriptions.position(tp0).offset); + assertOptional(subscriptions.position(tp0).offsetEpoch, value -> assertEquals(1, value.intValue())); } @Test @@ -3388,7 +3388,7 @@ public class FetcherTest { fetchRecords(); Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds()); - assertEquals(selected.id(), 1); + assertEquals(1, selected.id()); assertEquals(1, sendFetches()); assertFalse(fetcher.hasCompletedFetches()); @@ -3402,7 +3402,7 @@ public class FetcherTest { fetchRecords(); selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds()); - assertEquals(selected.id(), -1); + assertEquals(-1, selected.id()); } @Test diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetricsTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetricsTest.java index 22aa33098ee..7fa9f7e31f1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetricsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetricsTest.java @@ -99,8 +99,8 @@ class KafkaConsumerMetricsTest { private void assertMetricValue(final String name) { assertEquals( - metrics.metric(metrics.metricName(name, CONSUMER_METRIC_GROUP)).metricValue(), - (double) METRIC_VALUE + (double) METRIC_VALUE, + metrics.metric(metrics.metricName(name, CONSUMER_METRIC_GROUP)).metricValue() ); } } \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java index 3b7fe70ea4c..182900c0207 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java +++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java @@ -246,7 +246,7 @@ public class OffsetFetcherTest { assertTrue(subscriptions.hasValidPosition(tp0)); assertFalse(subscriptions.isOffsetResetNeeded(tp0)); assertTrue(subscriptions.isFetchable(tp0)); - assertEquals(subscriptions.position(tp0).offset, 5L); + assertEquals(5L, subscriptions.position(tp0).offset); } @Test @@ -395,7 +395,7 @@ public class OffsetFetcherTest { assertFalse(subscriptions.isOffsetResetNeeded(tp0)); assertTrue(metadata.updateRequested()); - assertOptional(metadata.lastSeenLeaderEpoch(tp0), epoch -> assertEquals((long) epoch, 2)); + assertOptional(metadata.lastSeenLeaderEpoch(tp0), epoch -> assertEquals(2, (long) epoch)); } @Test @@ -902,7 +902,7 @@ public class OffsetFetcherTest { ListOffsetsRequest offsetRequest = (ListOffsetsRequest) body; int epoch = offsetRequest.topics().get(0).partitions().get(0).currentLeaderEpoch(); assertTrue(epoch != ListOffsetsResponse.UNKNOWN_EPOCH, "Expected Fetcher to set leader epoch in request"); - assertEquals(epoch, 99, "Expected leader epoch to match epoch from metadata update"); + assertEquals(99, epoch, "Expected leader epoch to match epoch from metadata update"); return true; } else { fail("Should have seen ListOffsetRequest"); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java index a48b32b43ef..8a3617d61c7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java @@ -102,8 +102,8 @@ public class OffsetForLeaderEpochClientTest { assertTrue(result.partitionsToRetry().isEmpty()); assertTrue(result.endOffsets().containsKey(tp0)); assertEquals(result.endOffsets().get(tp0).errorCode(), Errors.NONE.code()); - assertEquals(result.endOffsets().get(tp0).leaderEpoch(), 1); - assertEquals(result.endOffsets().get(tp0).endOffset(), 10L); + assertEquals(1, result.endOffsets().get(tp0).leaderEpoch()); + assertEquals(10L, result.endOffsets().get(tp0).endOffset()); } @Test @@ -121,7 +121,7 @@ public class OffsetForLeaderEpochClientTest { consumerClient.pollNoWakeup(); assertTrue(future.failed()); - assertEquals(future.exception().getClass(), TopicAuthorizationException.class); + assertEquals(TopicAuthorizationException.class, future.exception().getClass()); assertTrue(((TopicAuthorizationException) future.exception()).unauthorizedTopics().contains(tp0.topic())); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java index 15e23875562..f4a2726b9e5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java @@ -439,14 +439,10 @@ class StreamsGroupHeartbeatRequestManagerTest { try ( final MockedConstruction heartbeatRequestStateMockedConstruction = mockConstruction( HeartbeatRequestState.class, - (mock, context) -> { - when(mock.heartbeatIntervalMs()).thenReturn(heartbeatIntervalMs); - }); + (mock, context) -> 
when(mock.heartbeatIntervalMs()).thenReturn(heartbeatIntervalMs)); final MockedConstruction pollTimerMockedConstruction = mockConstruction( Timer.class, - (mock, context) -> { - when(mock.isExpired()).thenReturn(true); - }); + (mock, context) -> when(mock.isExpired()).thenReturn(true)); final MockedConstruction heartbeatStateMockedConstruction = mockConstruction( StreamsGroupHeartbeatRequestManager.HeartbeatState.class) ) { @@ -473,14 +469,10 @@ class StreamsGroupHeartbeatRequestManagerTest { try ( final MockedConstruction heartbeatRequestStateMockedConstruction = mockConstruction( HeartbeatRequestState.class, - (mock, context) -> { - when(mock.timeToNextHeartbeatMs(time.milliseconds())).thenReturn(timeToNextHeartbeatMs); - }); + (mock, context) -> when(mock.timeToNextHeartbeatMs(time.milliseconds())).thenReturn(timeToNextHeartbeatMs)); final MockedConstruction pollTimerMockedConstruction = mockConstruction( Timer.class, - (mock, context) -> { - when(mock.isExpired()).thenReturn(true); - }); + (mock, context) -> when(mock.isExpired()).thenReturn(true)); final MockedConstruction heartbeatStateMockedConstruction = mockConstruction( StreamsGroupHeartbeatRequestManager.HeartbeatState.class) ) { @@ -508,14 +500,10 @@ class StreamsGroupHeartbeatRequestManagerTest { try ( final MockedConstruction heartbeatRequestStateMockedConstruction = mockConstruction( HeartbeatRequestState.class, - (mock, context) -> { - when(mock.canSendRequest(time.milliseconds())).thenReturn(true); - }); + (mock, context) -> when(mock.canSendRequest(time.milliseconds())).thenReturn(true)); final MockedConstruction pollTimerMockedConstruction = mockConstruction( Timer.class, - (mock, context) -> { - when(mock.isExpired()).thenReturn(true); - }) + (mock, context) -> when(mock.isExpired()).thenReturn(true)) ) { final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager(); final HeartbeatRequestState heartbeatRequestState = heartbeatRequestStateMockedConstruction.constructed().get(0); @@ -551,9 +539,7 @@ class StreamsGroupHeartbeatRequestManagerTest { try ( final MockedConstruction heartbeatRequestStateMockedConstruction = mockConstruction( HeartbeatRequestState.class, - (mock, context) -> { - when(mock.canSendRequest(time.milliseconds())).thenReturn(true); - }) + (mock, context) -> when(mock.canSendRequest(time.milliseconds())).thenReturn(true)) ) { final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager(); final HeartbeatRequestState heartbeatRequestState = heartbeatRequestStateMockedConstruction.constructed().get(0); @@ -1001,9 +987,7 @@ class StreamsGroupHeartbeatRequestManagerTest { try ( final MockedConstruction heartbeatRequestStateMockedConstruction = mockConstruction( HeartbeatRequestState.class, - (mock, context) -> { - when(mock.canSendRequest(time.milliseconds())).thenReturn(true); - }); + (mock, context) -> when(mock.canSendRequest(time.milliseconds())).thenReturn(true)); final MockedConstruction heartbeatStateMockedConstruction = mockConstruction( StreamsGroupHeartbeatRequestManager.HeartbeatState.class) ) { @@ -1032,9 +1016,7 @@ class StreamsGroupHeartbeatRequestManagerTest { try ( final MockedConstruction heartbeatRequestStateMockedConstruction = mockConstruction( HeartbeatRequestState.class, - (mock, context) -> { - when(mock.canSendRequest(time.milliseconds())).thenReturn(true); - }); + (mock, context) -> when(mock.canSendRequest(time.milliseconds())).thenReturn(true)); final MockedConstruction 
heartbeatStateMockedConstruction = mockConstruction( StreamsGroupHeartbeatRequestManager.HeartbeatState.class) ) { @@ -1073,9 +1055,7 @@ class StreamsGroupHeartbeatRequestManagerTest { try ( final MockedConstruction heartbeatRequestStateMockedConstruction = mockConstruction( HeartbeatRequestState.class, - (mock, context) -> { - when(mock.canSendRequest(time.milliseconds())).thenReturn(true); - }); + (mock, context) -> when(mock.canSendRequest(time.milliseconds())).thenReturn(true)); final MockedConstruction heartbeatStateMockedConstruction = mockConstruction( StreamsGroupHeartbeatRequestManager.HeartbeatState.class) ) { @@ -1111,9 +1091,7 @@ class StreamsGroupHeartbeatRequestManagerTest { try ( final MockedConstruction heartbeatRequestStateMockedConstruction = mockConstruction( HeartbeatRequestState.class, - (mock, context) -> { - when(mock.canSendRequest(time.milliseconds())).thenReturn(true); - }); + (mock, context) -> when(mock.canSendRequest(time.milliseconds())).thenReturn(true)); final MockedConstruction heartbeatStateMockedConstruction = mockConstruction( StreamsGroupHeartbeatRequestManager.HeartbeatState.class) ) { @@ -1145,9 +1123,7 @@ class StreamsGroupHeartbeatRequestManagerTest { try ( final MockedConstruction heartbeatRequestStateMockedConstruction = mockConstruction( HeartbeatRequestState.class, - (mock, context) -> { - when(mock.canSendRequest(time.milliseconds())).thenReturn(true); - }); + (mock, context) -> when(mock.canSendRequest(time.milliseconds())).thenReturn(true)); final MockedConstruction heartbeatStateMockedConstruction = mockConstruction( StreamsGroupHeartbeatRequestManager.HeartbeatState.class) ) { @@ -1173,9 +1149,7 @@ class StreamsGroupHeartbeatRequestManagerTest { try ( final MockedConstruction heartbeatRequestStateMockedConstruction = mockConstruction( HeartbeatRequestState.class, - (mock, context) -> { - when(mock.canSendRequest(time.milliseconds())).thenReturn(true); - }); + (mock, context) -> when(mock.canSendRequest(time.milliseconds())).thenReturn(true)); final MockedConstruction heartbeatStateMockedConstruction = mockConstruction( StreamsGroupHeartbeatRequestManager.HeartbeatState.class); final LogCaptureAppender logAppender = LogCaptureAppender.createAndRegister(StreamsGroupHeartbeatRequestManager.class) @@ -1212,9 +1186,7 @@ class StreamsGroupHeartbeatRequestManagerTest { try ( final MockedConstruction heartbeatRequestStateMockedConstruction = mockConstruction( HeartbeatRequestState.class, - (mock, context) -> { - when(mock.canSendRequest(time.milliseconds())).thenReturn(true); - }); + (mock, context) -> when(mock.canSendRequest(time.milliseconds())).thenReturn(true)); final MockedConstruction heartbeatStateMockedConstruction = mockConstruction( StreamsGroupHeartbeatRequestManager.HeartbeatState.class); final LogCaptureAppender logAppender = LogCaptureAppender.createAndRegister(StreamsGroupHeartbeatRequestManager.class) @@ -1261,9 +1233,7 @@ class StreamsGroupHeartbeatRequestManagerTest { try ( final MockedConstruction heartbeatRequestStateMockedConstruction = mockConstruction( HeartbeatRequestState.class, - (mock, context) -> { - when(mock.canSendRequest(time.milliseconds())).thenReturn(true); - }); + (mock, context) -> when(mock.canSendRequest(time.milliseconds())).thenReturn(true)); final MockedConstruction heartbeatStateMockedConstruction = mockConstruction( StreamsGroupHeartbeatRequestManager.HeartbeatState.class); final LogCaptureAppender logAppender = LogCaptureAppender.createAndRegister(StreamsGroupHeartbeatRequestManager.class) @@ -1312,9 
+1282,7 @@ class StreamsGroupHeartbeatRequestManagerTest { try ( final MockedConstruction heartbeatRequestStateMockedConstruction = mockConstruction( HeartbeatRequestState.class, - (mock, context) -> { - when(mock.canSendRequest(time.milliseconds())).thenReturn(true); - }); + (mock, context) -> when(mock.canSendRequest(time.milliseconds())).thenReturn(true)); final MockedConstruction heartbeatStateMockedConstruction = mockConstruction( StreamsGroupHeartbeatRequestManager.HeartbeatState.class) ) { @@ -1343,9 +1311,7 @@ class StreamsGroupHeartbeatRequestManagerTest { try ( final MockedConstruction heartbeatRequestStateMockedConstruction = mockConstruction( HeartbeatRequestState.class, - (mock, context) -> { - when(mock.canSendRequest(time.milliseconds())).thenReturn(true); - }); + (mock, context) -> when(mock.canSendRequest(time.milliseconds())).thenReturn(true)); final MockedConstruction heartbeatStateMockedConstruction = mockConstruction( StreamsGroupHeartbeatRequestManager.HeartbeatState.class); final LogCaptureAppender logAppender = LogCaptureAppender.createAndRegister(StreamsGroupHeartbeatRequestManager.class) @@ -1424,14 +1390,11 @@ class StreamsGroupHeartbeatRequestManagerTest { @Test public void testMaximumTimeToWaitPollTimerExpired() { try ( - final MockedConstruction timerMockedConstruction = mockConstruction(Timer.class, (mock, context) -> { - when(mock.isExpired()).thenReturn(true); - }); + final MockedConstruction timerMockedConstruction = + mockConstruction(Timer.class, (mock, context) -> when(mock.isExpired()).thenReturn(true)); final MockedConstruction heartbeatRequestStateMockedConstruction = mockConstruction( HeartbeatRequestState.class, - (mock, context) -> { - when(mock.requestInFlight()).thenReturn(false); - }) + (mock, context) -> when(mock.requestInFlight()).thenReturn(false)) ) { final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager(); final Timer pollTimer = timerMockedConstruction.constructed().get(0); @@ -1450,9 +1413,7 @@ class StreamsGroupHeartbeatRequestManagerTest { final MockedConstruction timerMockedConstruction = mockConstruction(Timer.class); final MockedConstruction heartbeatRequestStateMockedConstruction = mockConstruction( HeartbeatRequestState.class, - (mock, context) -> { - when(mock.requestInFlight()).thenReturn(false); - }) + (mock, context) -> when(mock.requestInFlight()).thenReturn(false)) ) { final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager(); final Timer pollTimer = timerMockedConstruction.constructed().get(0); @@ -1473,9 +1434,8 @@ class StreamsGroupHeartbeatRequestManagerTest { final long remainingMs = 12L; final long timeToNextHeartbeatMs = 6L; try ( - final MockedConstruction timerMockedConstruction = mockConstruction(Timer.class, (mock, context) -> { - when(mock.remainingMs()).thenReturn(remainingMs); - }); + final MockedConstruction timerMockedConstruction = + mockConstruction(Timer.class, (mock, context) -> when(mock.remainingMs()).thenReturn(remainingMs)); final MockedConstruction heartbeatRequestStateMockedConstruction = mockConstruction( HeartbeatRequestState.class, (mock, context) -> { @@ -1500,14 +1460,11 @@ class StreamsGroupHeartbeatRequestManagerTest { public void testMaximumTimeToWaitSelectingMinimumWaitTime(final long remainingMs, final long timeToNextHeartbeatMs) { try ( - final MockedConstruction timerMockedConstruction = mockConstruction(Timer.class, (mock, context) -> { - 
when(mock.remainingMs()).thenReturn(remainingMs); - }); + final MockedConstruction timerMockedConstruction = + mockConstruction(Timer.class, (mock, context) -> when(mock.remainingMs()).thenReturn(remainingMs)); final MockedConstruction heartbeatRequestStateMockedConstruction = mockConstruction( HeartbeatRequestState.class, - (mock, context) -> { - when(mock.timeToNextHeartbeatMs(anyLong())).thenReturn(timeToNextHeartbeatMs); - }) + (mock, context) -> when(mock.timeToNextHeartbeatMs(anyLong())).thenReturn(timeToNextHeartbeatMs)) ) { final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager(); final Timer pollTimer = timerMockedConstruction.constructed().get(0); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java index 9607a0e9e20..606ba0b7350 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java @@ -62,7 +62,7 @@ public class StreamsRebalanceDataTest { public void testTaskIdCompareTo() { final StreamsRebalanceData.TaskId task = new StreamsRebalanceData.TaskId("subtopologyId1", 1); - assertTrue(task.compareTo(new StreamsRebalanceData.TaskId(task.subtopologyId(), task.partitionId())) == 0); + assertEquals(0, task.compareTo(new StreamsRebalanceData.TaskId(task.subtopologyId(), task.partitionId()))); assertTrue(task.compareTo(new StreamsRebalanceData.TaskId(task.subtopologyId() + "1", task.partitionId())) < 0); assertTrue(task.compareTo(new StreamsRebalanceData.TaskId(task.subtopologyId(), task.partitionId() + 1)) < 0); assertTrue(new StreamsRebalanceData.TaskId(task.subtopologyId() + "1", task.partitionId()).compareTo(task) > 0); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java index d6dd8c30caa..876bc3ffa12 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java @@ -113,13 +113,13 @@ public class AsyncConsumerMetricsTest { // Then: assertEquals( + (double) 10, metrics.metric( metrics.metricName( "application-event-queue-size", groupName ) - ).metricValue(), - (double) 10 + ).metricValue() ); } @@ -156,13 +156,13 @@ public class AsyncConsumerMetricsTest { // Then: assertEquals( + (double) 10, metrics.metric( metrics.metricName( "unsent-requests-queue-size", groupName ) - ).metricValue(), - (double) 10 + ).metricValue() ); } @@ -187,13 +187,13 @@ public class AsyncConsumerMetricsTest { // Then: assertEquals( + (double) 10, metrics.metric( metrics.metricName( "background-event-queue-size", groupName ) - ).metricValue(), - (double) 10 + ).metricValue() ); } @@ -223,13 +223,13 @@ public class AsyncConsumerMetricsTest { private void assertMetricValue(final String name, final String groupName) { assertEquals( + (double) METRIC_VALUE, metrics.metric( metrics.metricName( name, groupName ) - ).metricValue(), - (double) METRIC_VALUE + ).metricValue() ); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index 495578bad92..e66dcca5044 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -724,10 +724,10 @@ public class MockProducerTest {
         buildMockProducer(false);
         Future<RecordMetadata> metadata = producer.send(record2, (md, exception) -> {
             assertNotNull(md);
-            assertEquals(md.offset(), -1L, "Invalid offset");
-            assertEquals(md.timestamp(), RecordBatch.NO_TIMESTAMP, "Invalid timestamp");
-            assertEquals(md.serializedKeySize(), -1L, "Invalid Serialized Key size");
-            assertEquals(md.serializedValueSize(), -1L, "Invalid Serialized value size");
+            assertEquals(-1L, md.offset(), "Invalid offset");
+            assertEquals(RecordBatch.NO_TIMESTAMP, md.timestamp(), "Invalid timestamp");
+            assertEquals(-1L, md.serializedKeySize(), "Invalid Serialized Key size");
+            assertEquals(-1L, md.serializedValueSize(), "Invalid Serialized value size");
         });
         IllegalArgumentException e = new IllegalArgumentException("dummy exception");
         assertTrue(producer.errorNext(e), "Complete the second request with an error");
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
index 128e15ed6c6..727368e8edd 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
@@ -219,7 +219,7 @@ public class BufferPoolTest {
         t1.join();
         t2.join();
         // both the allocate() called by threads t1 and t2 should have been interrupted and the waiters queue should be empty
-        assertEquals(pool.queued(), 0);
+        assertEquals(0, pool.queued());
     }
 
     @Test
@@ -332,7 +332,7 @@ public class BufferPoolTest {
         }
 
-        assertEquals(bufferPool.availableMemory(), 1024);
+        assertEquals(1024, bufferPool.availableMemory());
     }
 
     public static class StressTestThread extends Thread {
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetricsTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetricsTest.java
index 46d1ed329ee..383aa82ee2d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetricsTest.java
@@ -121,8 +121,8 @@ class KafkaProducerMetricsTest {
 
     private void assertMetricValue(final String name) {
         assertEquals(
-            metrics.metric(metrics.metricName(name, KafkaProducerMetrics.GROUP)).metricValue(),
-            (double) METRIC_VALUE
+            (double) METRIC_VALUE,
+            metrics.metric(metrics.metricName(name, KafkaProducerMetrics.GROUP)).metricValue()
         );
     }
 }
\ No newline at end of file
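
The metrics test files above share one helper pattern: metricValue() returns
Object, so the expected long constant is cast to double before the comparison,
and the fix moves that constant into the expected slot. A sketch of the helper
shape, assuming only the public Metrics API (the method mirrors the tests'
assertMetricValue, it is not copied from them):

    import static org.junit.jupiter.api.Assertions.assertEquals;

    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.Metrics;

    class MetricAssertSketch {
        static void assertMetricValue(Metrics metrics, String name, String group, long expected) {
            MetricName metricName = metrics.metricName(name, group);
            // metricValue() returns Object (here a boxed Double), so the expected
            // value is widened to double to make the equality meaningful.
            assertEquals((double) expected, metrics.metric(metricName).metricValue());
        }
    }
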
ready"); Map> drained = accum.drain(metadataCache, readyNodes, Integer.MAX_VALUE, time.milliseconds()); - assertEquals(drained.get(node1.id()).size(), 1, "There should be only one batch."); + assertEquals(1, drained.get(node1.id()).size(), "There should be only one batch."); time.sleep(1000L); accum.reenqueue(drained.get(node1.id()).get(0), time.milliseconds()); @@ -1788,6 +1788,6 @@ public class RecordAccumulatorTest { } // Verify all original records are accounted for (no data loss) - assertEquals(keyFoundMap.size(), 100, "All original 100 records should be present after splitting"); + assertEquals(100, keyFoundMap.size(), "All original 100 records should be present after splitting"); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 6b2d50a52cc..cd984ac2a34 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -524,7 +524,7 @@ public class SenderTest { // Verify node is throttled a little bit. In real-life Apache Kafka, we observe that this can happen // as done above by throttling or with a disconnect / backoff. long currentPollDelay = client.pollDelayMs(nodeToThrottle, startTime); - assertEquals(currentPollDelay, throttleTimeMs); + assertEquals(throttleTimeMs, currentPollDelay); txnManager.beginTransaction(); txnManager.maybeAddPartition(tp0); @@ -3268,7 +3268,7 @@ public class SenderTest { fail("Expected abortable error to be thrown for commit"); } catch (KafkaException e) { assertTrue(transactionManager.hasAbortableError()); - assertEquals(commitResult.error().getClass(), TransactionAbortableException.class); + assertEquals(TransactionAbortableException.class, commitResult.error().getClass()); } // Abort API with TRANSACTION_ABORTABLE error should convert to Fatal error i.e. 
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 6b2d50a52cc..cd984ac2a34 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -524,7 +524,7 @@ public class SenderTest {
         // Verify node is throttled a little bit. In real-life Apache Kafka, we observe that this can happen
         // as done above by throttling or with a disconnect / backoff.
         long currentPollDelay = client.pollDelayMs(nodeToThrottle, startTime);
-        assertEquals(currentPollDelay, throttleTimeMs);
+        assertEquals(throttleTimeMs, currentPollDelay);
 
         txnManager.beginTransaction();
         txnManager.maybeAddPartition(tp0);
@@ -3268,7 +3268,7 @@ public class SenderTest {
             fail("Expected abortable error to be thrown for commit");
         } catch (KafkaException e) {
             assertTrue(transactionManager.hasAbortableError());
-            assertEquals(commitResult.error().getClass(), TransactionAbortableException.class);
+            assertEquals(TransactionAbortableException.class, commitResult.error().getClass());
         }
 
         // Abort API with TRANSACTION_ABORTABLE error should convert to Fatal error i.e. KafkaException
@@ -3287,7 +3287,7 @@ public class SenderTest {
             // Verify TM is in FATAL_ERROR state
             assertTrue(transactionManager.hasFatalError());
             assertFalse(e instanceof TransactionAbortableException);
-            assertEquals(abortResult.error().getClass(), KafkaException.class);
+            assertEquals(KafkaException.class, abortResult.error().getClass());
         }
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 494c715df79..7815b751d80 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -623,9 +623,9 @@ public class TransactionManagerTest {
     @ValueSource(booleans = {true, false})
     public void testDefaultSequenceNumber(boolean transactionV2Enabled) {
         initializeTransactionManager(Optional.empty(), transactionV2Enabled);
-        assertEquals(transactionManager.sequenceNumber(tp0), 0);
+        assertEquals(0, transactionManager.sequenceNumber(tp0));
         transactionManager.incrementSequenceNumber(tp0, 3);
-        assertEquals(transactionManager.sequenceNumber(tp0), 3);
+        assertEquals(3, transactionManager.sequenceNumber(tp0));
     }
 
     @ParameterizedTest
@@ -849,13 +849,13 @@ public class TransactionManagerTest {
     @ValueSource(booleans = {true, false})
     public void testSequenceNumberOverflow(boolean transactionV2Enabled) {
         initializeTransactionManager(Optional.empty(), transactionV2Enabled);
-        assertEquals(transactionManager.sequenceNumber(tp0), 0);
+        assertEquals(0, transactionManager.sequenceNumber(tp0));
         transactionManager.incrementSequenceNumber(tp0, Integer.MAX_VALUE);
-        assertEquals(transactionManager.sequenceNumber(tp0), Integer.MAX_VALUE);
+        assertEquals(Integer.MAX_VALUE, transactionManager.sequenceNumber(tp0));
         transactionManager.incrementSequenceNumber(tp0, 100);
-        assertEquals(transactionManager.sequenceNumber(tp0), 99);
+        assertEquals(99, transactionManager.sequenceNumber(tp0));
         transactionManager.incrementSequenceNumber(tp0, Integer.MAX_VALUE);
-        assertEquals(transactionManager.sequenceNumber(tp0), 98);
+        assertEquals(98, transactionManager.sequenceNumber(tp0));
     }
 
     @ParameterizedTest
@@ -863,17 +863,17 @@ public class TransactionManagerTest {
     public void testProducerIdReset(boolean transactionV2Enabled) {
         initializeTransactionManager(Optional.empty(), transactionV2Enabled);
         initializeIdempotentProducerId(15L, Short.MAX_VALUE);
-        assertEquals(transactionManager.sequenceNumber(tp0), 0);
-        assertEquals(transactionManager.sequenceNumber(tp1), 0);
+        assertEquals(0, transactionManager.sequenceNumber(tp0));
+        assertEquals(0, transactionManager.sequenceNumber(tp1));
         transactionManager.incrementSequenceNumber(tp0, 3);
-        assertEquals(transactionManager.sequenceNumber(tp0), 3);
+        assertEquals(3, transactionManager.sequenceNumber(tp0));
         transactionManager.incrementSequenceNumber(tp1, 3);
-        assertEquals(transactionManager.sequenceNumber(tp1), 3);
+        assertEquals(3, transactionManager.sequenceNumber(tp1));
 
         transactionManager.requestIdempotentEpochBumpForPartition(tp0);
         transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
-        assertEquals(transactionManager.sequenceNumber(tp0), 0);
-        assertEquals(transactionManager.sequenceNumber(tp1), 3);
+        assertEquals(0, transactionManager.sequenceNumber(tp0));
+        assertEquals(3, transactionManager.sequenceNumber(tp1));
     }
 
     @Test
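
The overflow expectations above encode the idempotent producer's sequence
arithmetic: sequences stay in [0, Integer.MAX_VALUE] and wrap modulo 2^31, so
Integer.MAX_VALUE + 100 wraps to 99 and 99 + Integer.MAX_VALUE wraps to 98. A
worked sketch of that arithmetic (an illustrative helper, not the
TransactionManager implementation):

    class SequenceWrapSketch {
        // Reduce the widened sum modulo 2^31 to stay within [0, Integer.MAX_VALUE].
        static int incrementSequence(int sequence, int increment) {
            return (int) (((long) sequence + increment) % (1L << 31));
        }

        public static void main(String[] args) {
            int seq = incrementSequence(0, Integer.MAX_VALUE);  // 2147483647
            seq = incrementSequence(seq, 100);                  // 2147483747 % 2^31 = 99
            seq = incrementSequence(seq, Integer.MAX_VALUE);    // wraps again to 98
            System.out.println(seq);                            // prints 98
        }
    }
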
@@ -1101,7 +1101,7 @@ public class TransactionManagerTest {
         transactionManager.initializeTransactions(false);
         client.prepareUnsupportedVersionResponse(body -> {
             FindCoordinatorRequest findCoordinatorRequest = (FindCoordinatorRequest) body;
-            assertEquals(CoordinatorType.forId(findCoordinatorRequest.data().keyType()), CoordinatorType.TRANSACTION);
+            assertEquals(CoordinatorType.TRANSACTION, CoordinatorType.forId(findCoordinatorRequest.data().keyType()));
             assertTrue(findCoordinatorRequest.data().key().isEmpty());
             assertEquals(1, findCoordinatorRequest.data().coordinatorKeys().size());
             assertTrue(findCoordinatorRequest.data().coordinatorKeys().contains(transactionalId));
diff --git a/clients/src/test/java/org/apache/kafka/common/feature/SupportedVersionRangeTest.java b/clients/src/test/java/org/apache/kafka/common/feature/SupportedVersionRangeTest.java
index 1d6679e62ce..9bc6f05106e 100644
--- a/clients/src/test/java/org/apache/kafka/common/feature/SupportedVersionRangeTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/feature/SupportedVersionRangeTest.java
@@ -130,7 +130,7 @@ public class SupportedVersionRangeTest {
     public void testEquals() {
         SupportedVersionRange tested = new SupportedVersionRange((short) 1, (short) 1);
         assertEquals(tested, tested);
-        assertNotEquals(tested, new SupportedVersionRange((short) 1, (short) 2));
+        assertNotEquals(new SupportedVersionRange((short) 1, (short) 2), tested);
         assertNotEquals(null, tested);
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 4526b42e1aa..eda6648068c 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -684,8 +684,8 @@ public class MetricsTest {
         MetricName inheritedMetric = inherited.metricInstance(SampleMetrics.METRIC_WITH_INHERITED_TAGS, childTagsWithValues);
         Map<String, String> filledOutTags = inheritedMetric.tags();
-        assertEquals(filledOutTags.get("parent-tag"), "parent-tag-value", "parent-tag should be set properly");
-        assertEquals(filledOutTags.get("child-tag"), "child-tag-value", "child-tag should be set properly");
+        assertEquals("parent-tag-value", filledOutTags.get("parent-tag"), "parent-tag should be set properly");
+        assertEquals("child-tag-value", filledOutTags.get("child-tag"), "child-tag should be set properly");
 
         assertThrows(IllegalArgumentException.class,
             () -> inherited.metricInstance(SampleMetrics.METRIC_WITH_INHERITED_TAGS, parentTagsWithValues),
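
assertNotEquals follows the same convention with its first parameter named
"unexpected", so the equality fixes and the inequality fixes read consistently:
the value produced by the code under test goes last. A brief sketch (values
illustrative):

    import static org.junit.jupiter.api.Assertions.assertNotEquals;

    class NotEqualsOrderSketch {
        void demo() {
            String actual = "computed-value";  // illustrative result under test

            // (unexpected, actual): mirrors assertEquals(expected, actual).
            assertNotEquals("some-other-value", actual);
            assertNotEquals(null, actual);  // also guards against a null result
        }
    }
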
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
index 5cdcebc858d..6b806c6bb7b 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
@@ -70,12 +70,12 @@ public class SensorTest {
         assertTrue(Sensor.RecordingLevel.DEBUG.shouldRecord(configLevel.id));
         assertTrue(Sensor.RecordingLevel.TRACE.shouldRecord(configLevel.id));
 
-        assertEquals(Sensor.RecordingLevel.valueOf(Sensor.RecordingLevel.DEBUG.toString()),
-            Sensor.RecordingLevel.DEBUG);
-        assertEquals(Sensor.RecordingLevel.valueOf(Sensor.RecordingLevel.INFO.toString()),
-            Sensor.RecordingLevel.INFO);
-        assertEquals(Sensor.RecordingLevel.valueOf(Sensor.RecordingLevel.TRACE.toString()),
-            Sensor.RecordingLevel.TRACE);
+        assertEquals(Sensor.RecordingLevel.DEBUG,
+            Sensor.RecordingLevel.valueOf(Sensor.RecordingLevel.DEBUG.toString()));
+        assertEquals(Sensor.RecordingLevel.INFO,
+            Sensor.RecordingLevel.valueOf(Sensor.RecordingLevel.INFO.toString()));
+        assertEquals(Sensor.RecordingLevel.TRACE,
+            Sensor.RecordingLevel.valueOf(Sensor.RecordingLevel.TRACE.toString()));
     }
 
     @Test
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/UpdateFeaturesRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/UpdateFeaturesRequestTest.java
index 7450b155996..e63d1949c8a 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/UpdateFeaturesRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/UpdateFeaturesRequestTest.java
@@ -84,9 +84,9 @@ public class UpdateFeaturesRequestTest {
         request = UpdateFeaturesRequest.parse(readable, UpdateFeaturesRequestData.LOWEST_SUPPORTED_VERSION);
         List<UpdateFeaturesRequest.FeatureUpdateItem> updates = new ArrayList<>(request.featureUpdates());
-        assertEquals(updates.size(), 2);
-        assertEquals(updates.get(0).upgradeType(), FeatureUpdate.UpgradeType.SAFE_DOWNGRADE);
-        assertEquals(updates.get(1).upgradeType(), FeatureUpdate.UpgradeType.UPGRADE);
+        assertEquals(2, updates.size());
+        assertEquals(FeatureUpdate.UpgradeType.SAFE_DOWNGRADE, updates.get(0).upgradeType());
+        assertEquals(FeatureUpdate.UpgradeType.UPGRADE, updates.get(1).upgradeType());
     }
 
     @Test
@@ -114,9 +114,9 @@ public class UpdateFeaturesRequestTest {
         request = UpdateFeaturesRequest.parse(readable, UpdateFeaturesRequestData.HIGHEST_SUPPORTED_VERSION);
         List<UpdateFeaturesRequest.FeatureUpdateItem> updates = new ArrayList<>(request.featureUpdates());
-        assertEquals(updates.size(), 2);
-        assertEquals(updates.get(0).upgradeType(), FeatureUpdate.UpgradeType.SAFE_DOWNGRADE);
-        assertEquals(updates.get(1).upgradeType(), FeatureUpdate.UpgradeType.UPGRADE);
+        assertEquals(2, updates.size());
+        assertEquals(FeatureUpdate.UpgradeType.SAFE_DOWNGRADE, updates.get(0).upgradeType());
+        assertEquals(FeatureUpdate.UpgradeType.UPGRADE, updates.get(1).upgradeType());
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
index 76c9109bd8c..710caeb150a 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
@@ -292,7 +292,7 @@ public class SaslServerAuthenticatorTest {
         when(saslServer.isComplete()).thenReturn(false).thenReturn(true);
         mockRequest(saslAuthenticateRequest(), transportLayer);
 
-        Throwable t = assertThrows(IllegalArgumentException.class, () -> authenticator.authenticate());
+        Throwable t = assertThrows(IllegalArgumentException.class, authenticator::authenticate);
         assertEquals(ArithmeticException.class, t.getCause().getClass());
         assertEquals("Cannot convert " + Long.MAX_VALUE + " millisecond to nanosecond due to arithmetic overflow",
             t.getMessage());
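
Two of the fixes above also replace a no-argument lambda with a method reference
in assertThrows; the forms are equivalent, the reference is just shorter. A
self-contained sketch (the Authenticator class here is illustrative):

    import static org.junit.jupiter.api.Assertions.assertEquals;
    import static org.junit.jupiter.api.Assertions.assertThrows;

    class ThrowsSketch {
        static class Authenticator {
            void authenticate() {
                throw new IllegalArgumentException("boom", new ArithmeticException("overflow"));
            }
        }

        void demo() {
            Authenticator authenticator = new Authenticator();

            // Equivalent to assertThrows(..., () -> authenticator.authenticate()).
            Throwable t = assertThrows(IllegalArgumentException.class, authenticator::authenticate);
            assertEquals(ArithmeticException.class, t.getCause().getClass());
        }
    }
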
new String[0]), ""); - assertEquals(KerberosRule.replaceParameters("hello", new String[0]), "hello"); - assertEquals(KerberosRule.replaceParameters("", new String[]{"too", "many", "parameters", "are", "ok"}), ""); - assertEquals(KerberosRule.replaceParameters("hello", new String[]{"too", "many", "parameters", "are", "ok"}), "hello"); - assertEquals(KerberosRule.replaceParameters("hello $0", new String[]{"too", "many", "parameters", "are", "ok"}), "hello too"); - assertEquals(KerberosRule.replaceParameters("hello $0", new String[]{"no recursion $1"}), "hello no recursion $1"); + assertEquals("", KerberosRule.replaceParameters("", new String[0])); + assertEquals("hello", KerberosRule.replaceParameters("hello", new String[0])); + assertEquals("", KerberosRule.replaceParameters("", new String[]{"too", "many", "parameters", "are", "ok"})); + assertEquals("hello", KerberosRule.replaceParameters("hello", new String[]{"too", "many", "parameters", "are", "ok"})); + assertEquals("hello too", KerberosRule.replaceParameters("hello $0", new String[]{"too", "many", "parameters", "are", "ok"})); + assertEquals("hello no recursion $1", KerberosRule.replaceParameters("hello $0", new String[]{"no recursion $1"})); // negative test cases assertThrows( diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandlerTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandlerTest.java index abbe2ef28f9..89e6de42c1d 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandlerTest.java @@ -152,7 +152,7 @@ public class OAuthBearerUnsecuredLoginCallbackHandlerTest { private static void confirmCorrectValues(OAuthBearerUnsecuredJws jws, String user, long startMs, long lifetimeSeconds) throws OAuthBearerIllegalTokenException { Map header = jws.header(); - assertEquals(header.size(), 1); + assertEquals(1, header.size()); assertEquals("none", header.get("alg")); assertEquals(user != null ? 
user : "", jws.principalName()); assertEquals(Long.valueOf(startMs), jws.startTimeMs()); diff --git a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java index c06e853b073..935c02dbf83 100644 --- a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java +++ b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java @@ -533,7 +533,7 @@ public class ClientTelemetryReporterTest { assertEquals(ClientTelemetryState.PUSH_NEEDED, telemetrySender.state()); - assertThrows(KafkaException.class, () -> telemetrySender.createRequest()); + assertThrows(KafkaException.class, telemetrySender::createRequest); assertEquals(ClientTelemetryState.TERMINATED, telemetrySender.state()); // === Test 3: After termination, no more requests === diff --git a/clients/src/test/java/org/apache/kafka/server/policy/AlterConfigPolicyTest.java b/clients/src/test/java/org/apache/kafka/server/policy/AlterConfigPolicyTest.java index 06d5a4e93eb..5a6d8b291b0 100644 --- a/clients/src/test/java/org/apache/kafka/server/policy/AlterConfigPolicyTest.java +++ b/clients/src/test/java/org/apache/kafka/server/policy/AlterConfigPolicyTest.java @@ -38,8 +38,8 @@ public class AlterConfigPolicyTest { assertEquals(requestMetadata, requestMetadata); - assertNotEquals(requestMetadata, null); - assertNotEquals(requestMetadata, new Object()); + assertNotEquals(null, requestMetadata); + assertNotEquals(new Object(), requestMetadata); assertNotEquals(requestMetadata, new RequestMetadata( new ConfigResource(Type.BROKER, "1"), Collections.singletonMap("foo", "bar") diff --git a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java index 47a26aa697d..889ebcbc607 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java @@ -444,14 +444,19 @@ public class TestSslUtils { SubjectPublicKeyInfo subPubKeyInfo = SubjectPublicKeyInfo.getInstance(keyPair.getPublic().getEncoded()); BcContentSignerBuilder signerBuilder; String keyAlgorithm = keyPair.getPublic().getAlgorithm(); - if (keyAlgorithm.equals("RSA")) - signerBuilder = new BcRSAContentSignerBuilder(sigAlgId, digAlgId); - else if (keyAlgorithm.equals("DSA")) - signerBuilder = new BcDSAContentSignerBuilder(sigAlgId, digAlgId); - else if (keyAlgorithm.equals("EC")) - signerBuilder = new BcECContentSignerBuilder(sigAlgId, digAlgId); - else - throw new IllegalArgumentException("Unsupported algorithm " + keyAlgorithm); + switch (keyAlgorithm) { + case "RSA": + signerBuilder = new BcRSAContentSignerBuilder(sigAlgId, digAlgId); + break; + case "DSA": + signerBuilder = new BcDSAContentSignerBuilder(sigAlgId, digAlgId); + break; + case "EC": + signerBuilder = new BcECContentSignerBuilder(sigAlgId, digAlgId); + break; + default: + throw new IllegalArgumentException("Unsupported algorithm " + keyAlgorithm); + } ContentSigner sigGen = signerBuilder.build(privateKeyAsymKeyParam); // Negative numbers for "days" can be used to generate expired certificates Date now = new Date(); @@ -520,14 +525,19 @@ public class TestSslUtils { SubjectPublicKeyInfo.getInstance(keyPair.getPublic().getEncoded()); BcContentSignerBuilder signerBuilder; String keyAlgorithm = keyPair.getPublic().getAlgorithm(); - if (keyAlgorithm.equals("RSA")) - signerBuilder 
@@ -520,14 +525,19 @@ public class TestSslUtils {
                 SubjectPublicKeyInfo.getInstance(keyPair.getPublic().getEncoded());
         BcContentSignerBuilder signerBuilder;
         String keyAlgorithm = keyPair.getPublic().getAlgorithm();
-        if (keyAlgorithm.equals("RSA"))
-            signerBuilder = new BcRSAContentSignerBuilder(sigAlgId, digAlgId);
-        else if (keyAlgorithm.equals("DSA"))
-            signerBuilder = new BcDSAContentSignerBuilder(sigAlgId, digAlgId);
-        else if (keyAlgorithm.equals("EC"))
-            signerBuilder = new BcECContentSignerBuilder(sigAlgId, digAlgId);
-        else
-            throw new IllegalArgumentException("Unsupported algorithm " + keyAlgorithm);
+        switch (keyAlgorithm) {
+            case "RSA":
+                signerBuilder = new BcRSAContentSignerBuilder(sigAlgId, digAlgId);
+                break;
+            case "DSA":
+                signerBuilder = new BcDSAContentSignerBuilder(sigAlgId, digAlgId);
+                break;
+            case "EC":
+                signerBuilder = new BcECContentSignerBuilder(sigAlgId, digAlgId);
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported algorithm " + keyAlgorithm);
+        }
         ContentSigner sigGen = signerBuilder.build(privateKeyAsymKeyParam);
         // Negative numbers for "days" can be used to generate expired certificates
         Date now = new Date();