mirror of https://github.com/apache/kafka.git
KAFKA-19733 Fix arguments to assertEquals() in clients module (#20586)
The given PR mostly fixes the order of arguments in `assertEquals()` for the Clients module. Some minor cleanups were included with the same too. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
14917ae727
commit
97c8c6b595
|
@ -204,10 +204,10 @@ public class ProducerIdExpirationTest {
|
|||
// Update the producer ID expiration ms to a very high value.
|
||||
admin.incrementalAlterConfigs(producerIdExpirationConfig("100000"));
|
||||
|
||||
cluster.brokers().values().forEach(broker -> {
|
||||
cluster.brokers().values().forEach(broker ->
|
||||
TestUtils.waitUntilTrue(() -> broker.logManager().producerStateManagerConfig().producerIdExpirationMs() == 100000,
|
||||
() -> "Configuration was not updated.", DEFAULT_MAX_WAIT_MS, 100);
|
||||
});
|
||||
() -> "Configuration was not updated.", DEFAULT_MAX_WAIT_MS, 100)
|
||||
);
|
||||
// Send more records to send producer ID back to brokers.
|
||||
producer.send(new ProducerRecord<>(topic1, 0, null, "key".getBytes(), "value".getBytes()));
|
||||
producer.flush();
|
||||
|
@ -226,10 +226,10 @@ public class ProducerIdExpirationTest {
|
|||
kafkaBroker.awaitShutdown();
|
||||
kafkaBroker.startup();
|
||||
cluster.waitForReadyBrokers();
|
||||
cluster.brokers().values().forEach(broker -> {
|
||||
cluster.brokers().values().forEach(broker ->
|
||||
TestUtils.waitUntilTrue(() -> broker.logManager().producerStateManagerConfig().producerIdExpirationMs() == 100,
|
||||
() -> "Configuration was not updated.", DEFAULT_MAX_WAIT_MS, 100);
|
||||
});
|
||||
() -> "Configuration was not updated.", DEFAULT_MAX_WAIT_MS, 100)
|
||||
);
|
||||
|
||||
// Ensure producer ID expires quickly again.
|
||||
waitProducerIdExpire(admin);
|
||||
|
|
|
@ -184,9 +184,8 @@ public class ProducerSendWhileDeletionTest {
|
|||
try (var producer = createProducer()) {
|
||||
for (int i = 1; i <= numRecords; i++) {
|
||||
producer.send(new ProducerRecord<>(topic, null, ("value" + i).getBytes()),
|
||||
(metadata, exception) -> {
|
||||
numAcks.incrementAndGet();
|
||||
});
|
||||
(metadata, exception) -> numAcks.incrementAndGet()
|
||||
);
|
||||
}
|
||||
producer.flush();
|
||||
}
|
||||
|
|
|
@ -186,7 +186,7 @@ public class ClusterConnectionStatesTest {
|
|||
|
||||
connectionStates.authenticationFailed(nodeId1, time.milliseconds(), new AuthenticationException("No path to CA for certificate!"));
|
||||
time.sleep(1000);
|
||||
assertEquals(connectionStates.connectionState(nodeId1), ConnectionState.AUTHENTICATION_FAILED);
|
||||
assertEquals(ConnectionState.AUTHENTICATION_FAILED, connectionStates.connectionState(nodeId1));
|
||||
assertNotNull(connectionStates.authenticationException(nodeId1));
|
||||
assertFalse(connectionStates.hasReadyNodes(time.milliseconds()));
|
||||
assertFalse(connectionStates.canConnect(nodeId1, time.milliseconds()));
|
||||
|
@ -210,7 +210,7 @@ public class ClusterConnectionStatesTest {
|
|||
connectionStates.remove(nodeId1);
|
||||
assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds()));
|
||||
assertFalse(connectionStates.isBlackedOut(nodeId1, time.milliseconds()));
|
||||
assertEquals(connectionStates.connectionDelay(nodeId1, time.milliseconds()), 0L);
|
||||
assertEquals(0L, connectionStates.connectionDelay(nodeId1, time.milliseconds()));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -361,28 +361,28 @@ public class MetadataTest {
|
|||
// Metadata with newer epoch is handled
|
||||
metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10);
|
||||
metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
|
||||
assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10));
|
||||
assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(10, leaderAndEpoch.intValue()));
|
||||
|
||||
// Don't update to an older one
|
||||
assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 1));
|
||||
assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10));
|
||||
assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(10, leaderAndEpoch.intValue()));
|
||||
|
||||
// Don't cause update if it's the same one
|
||||
assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 10));
|
||||
assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10));
|
||||
assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(10, leaderAndEpoch.intValue()));
|
||||
|
||||
// Update if we see newer epoch
|
||||
assertTrue(metadata.updateLastSeenEpochIfNewer(tp, 12));
|
||||
assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
|
||||
assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(12, leaderAndEpoch.intValue()));
|
||||
|
||||
metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 12);
|
||||
metadata.updateWithCurrentRequestVersion(metadataResponse, false, 2L);
|
||||
assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
|
||||
assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(12, leaderAndEpoch.intValue()));
|
||||
|
||||
// Don't overwrite metadata with older epoch
|
||||
metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 11);
|
||||
metadata.updateWithCurrentRequestVersion(metadataResponse, false, 3L);
|
||||
assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
|
||||
assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(12, leaderAndEpoch.intValue()));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -465,7 +465,7 @@ public class MetadataTest {
|
|||
metadata.updateWithCurrentRequestVersion(metadataResponse, false, 10L);
|
||||
assertNotNull(metadata.fetch().partition(tp));
|
||||
assertTrue(metadata.lastSeenLeaderEpoch(tp).isPresent());
|
||||
assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100);
|
||||
assertEquals(100, metadata.lastSeenLeaderEpoch(tp).get().longValue());
|
||||
}
|
||||
|
||||
// Fake an empty ISR, but with an older epoch, should reject it
|
||||
|
@ -475,8 +475,8 @@ public class MetadataTest {
|
|||
new MetadataResponse.PartitionMetadata(error, partition, leader,
|
||||
leaderEpoch, replicas, Collections.emptyList(), offlineReplicas), ApiKeys.METADATA.latestVersion(), Collections.emptyMap());
|
||||
metadata.updateWithCurrentRequestVersion(metadataResponse, false, 20L);
|
||||
assertEquals(metadata.fetch().partition(tp).inSyncReplicas().length, 1);
|
||||
assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100);
|
||||
assertEquals(1, metadata.fetch().partition(tp).inSyncReplicas().length);
|
||||
assertEquals(100, metadata.lastSeenLeaderEpoch(tp).get().longValue());
|
||||
}
|
||||
|
||||
// Fake an empty ISR, with same epoch, accept it
|
||||
|
@ -486,8 +486,8 @@ public class MetadataTest {
|
|||
new MetadataResponse.PartitionMetadata(error, partition, leader,
|
||||
leaderEpoch, replicas, Collections.emptyList(), offlineReplicas), ApiKeys.METADATA.latestVersion(), Collections.emptyMap());
|
||||
metadata.updateWithCurrentRequestVersion(metadataResponse, false, 20L);
|
||||
assertEquals(metadata.fetch().partition(tp).inSyncReplicas().length, 0);
|
||||
assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100);
|
||||
assertEquals(0, metadata.fetch().partition(tp).inSyncReplicas().length);
|
||||
assertEquals(100, metadata.lastSeenLeaderEpoch(tp).get().longValue());
|
||||
}
|
||||
|
||||
// Empty metadata response, should not keep old partition but should keep the last-seen epoch
|
||||
|
@ -495,7 +495,7 @@ public class MetadataTest {
|
|||
MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.emptyMap());
|
||||
metadata.updateWithCurrentRequestVersion(metadataResponse, false, 20L);
|
||||
assertNull(metadata.fetch().partition(tp));
|
||||
assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100);
|
||||
assertEquals(100, metadata.lastSeenLeaderEpoch(tp).get().longValue());
|
||||
}
|
||||
|
||||
// Back in the metadata, with old epoch, should not get added
|
||||
|
@ -503,7 +503,7 @@ public class MetadataTest {
|
|||
MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, _tp -> 99);
|
||||
metadata.updateWithCurrentRequestVersion(metadataResponse, false, 10L);
|
||||
assertNull(metadata.fetch().partition(tp));
|
||||
assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100);
|
||||
assertEquals(100, metadata.lastSeenLeaderEpoch(tp).get().longValue());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -522,31 +522,31 @@ public class MetadataTest {
|
|||
metadata.updateWithCurrentRequestVersion(metadataResponse, false, 10L);
|
||||
assertNotNull(metadata.fetch().partition(tp));
|
||||
assertTrue(metadata.lastSeenLeaderEpoch(tp).isPresent());
|
||||
assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100);
|
||||
assertEquals(100, metadata.lastSeenLeaderEpoch(tp).get().longValue());
|
||||
|
||||
// Simulate a leader epoch from another response, like a fetch response or list offsets
|
||||
assertTrue(metadata.updateLastSeenEpochIfNewer(tp, 101));
|
||||
|
||||
// Cache of partition stays, but current partition info is not available since it's stale
|
||||
assertNotNull(metadata.fetch().partition(tp));
|
||||
assertEquals(Objects.requireNonNull(metadata.fetch().partitionCountForTopic("topic-1")).longValue(), 5);
|
||||
assertEquals(5, Objects.requireNonNull(metadata.fetch().partitionCountForTopic("topic-1")).longValue());
|
||||
assertFalse(metadata.partitionMetadataIfCurrent(tp).isPresent());
|
||||
assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 101);
|
||||
assertEquals(101, metadata.lastSeenLeaderEpoch(tp).get().longValue());
|
||||
|
||||
// Metadata with older epoch is rejected, metadata state is unchanged
|
||||
metadata.updateWithCurrentRequestVersion(metadataResponse, false, 20L);
|
||||
assertNotNull(metadata.fetch().partition(tp));
|
||||
assertEquals(Objects.requireNonNull(metadata.fetch().partitionCountForTopic("topic-1")).longValue(), 5);
|
||||
assertEquals(5, Objects.requireNonNull(metadata.fetch().partitionCountForTopic("topic-1")).longValue());
|
||||
assertFalse(metadata.partitionMetadataIfCurrent(tp).isPresent());
|
||||
assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 101);
|
||||
assertEquals(101, metadata.lastSeenLeaderEpoch(tp).get().longValue());
|
||||
|
||||
// Metadata with equal or newer epoch is accepted
|
||||
metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, _tp -> 101);
|
||||
metadata.updateWithCurrentRequestVersion(metadataResponse, false, 30L);
|
||||
assertNotNull(metadata.fetch().partition(tp));
|
||||
assertEquals(Objects.requireNonNull(metadata.fetch().partitionCountForTopic("topic-1")).longValue(), 5);
|
||||
assertEquals(5, Objects.requireNonNull(metadata.fetch().partitionCountForTopic("topic-1")).longValue());
|
||||
assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
|
||||
assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 101);
|
||||
assertEquals(101, metadata.lastSeenLeaderEpoch(tp).get().longValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -585,18 +585,18 @@ public class MetadataTest {
|
|||
metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
|
||||
|
||||
Cluster cluster = metadata.fetch();
|
||||
assertEquals(cluster.clusterResource().clusterId(), "dummy");
|
||||
assertEquals(cluster.nodes().size(), 4);
|
||||
assertEquals("dummy", cluster.clusterResource().clusterId());
|
||||
assertEquals(4, cluster.nodes().size());
|
||||
|
||||
// topic counts
|
||||
assertEquals(cluster.invalidTopics(), Collections.singleton("topic3"));
|
||||
assertEquals(cluster.unauthorizedTopics(), Collections.singleton("topic4"));
|
||||
assertEquals(cluster.topics().size(), 3);
|
||||
assertEquals(3, cluster.topics().size());
|
||||
assertEquals(cluster.internalTopics(), Collections.singleton(Topic.GROUP_METADATA_TOPIC_NAME));
|
||||
|
||||
// partition counts
|
||||
assertEquals(cluster.partitionsForTopic("topic1").size(), 2);
|
||||
assertEquals(cluster.partitionsForTopic("topic2").size(), 3);
|
||||
assertEquals(2, cluster.partitionsForTopic("topic1").size());
|
||||
assertEquals(3, cluster.partitionsForTopic("topic2").size());
|
||||
|
||||
// Sentinel instances
|
||||
InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 0);
|
||||
|
@ -798,10 +798,10 @@ public class MetadataTest {
|
|||
|
||||
TopicPartition tp = new TopicPartition("topic-1", 0);
|
||||
|
||||
assertOptional(metadata.fetch().nodeIfOnline(tp, 0), node -> assertEquals(node.id(), 0));
|
||||
assertOptional(metadata.fetch().nodeIfOnline(tp, 0), node -> assertEquals(0, node.id()));
|
||||
assertFalse(metadata.fetch().nodeIfOnline(tp, 1).isPresent());
|
||||
assertEquals(metadata.fetch().nodeById(0).id(), 0);
|
||||
assertEquals(metadata.fetch().nodeById(1).id(), 1);
|
||||
assertEquals(0, metadata.fetch().nodeById(0).id());
|
||||
assertEquals(1, metadata.fetch().nodeById(1).id());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -831,7 +831,7 @@ public class MetadataTest {
|
|||
|
||||
TopicPartition tp = new TopicPartition("topic-1", 0);
|
||||
|
||||
assertEquals(metadata.fetch().nodeById(0).id(), 0);
|
||||
assertEquals(0, metadata.fetch().nodeById(0).id());
|
||||
assertNull(metadata.fetch().partition(tp));
|
||||
assertEquals(metadata.fetch().nodeIfOnline(tp, 0), Optional.empty());
|
||||
}
|
||||
|
@ -955,13 +955,13 @@ public class MetadataTest {
|
|||
// Update the metadata to add a new topic variant, "new", which will be retained with "keep". Note this
|
||||
// means that all of the "old" topics should be dropped.
|
||||
Cluster cluster = metadata.fetch();
|
||||
assertEquals(cluster.clusterResource().clusterId(), oldClusterId);
|
||||
assertEquals(cluster.nodes().size(), oldNodes);
|
||||
assertEquals(oldClusterId, cluster.clusterResource().clusterId());
|
||||
assertEquals(oldNodes, cluster.nodes().size());
|
||||
assertEquals(cluster.invalidTopics(), Set.of("oldInvalidTopic", "keepInvalidTopic"));
|
||||
assertEquals(cluster.unauthorizedTopics(), Set.of("oldUnauthorizedTopic", "keepUnauthorizedTopic"));
|
||||
assertEquals(cluster.topics(), Set.of("oldValidTopic", "keepValidTopic"));
|
||||
assertEquals(cluster.partitionsForTopic("oldValidTopic").size(), 2);
|
||||
assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 3);
|
||||
assertEquals(2, cluster.partitionsForTopic("oldValidTopic").size());
|
||||
assertEquals(3, cluster.partitionsForTopic("keepValidTopic").size());
|
||||
assertEquals(new HashSet<>(cluster.topicIds()), new HashSet<>(topicIds.values()));
|
||||
|
||||
String newClusterId = "newClusterId";
|
||||
|
@ -990,13 +990,13 @@ public class MetadataTest {
|
|||
assertNull(metadataTopicIds2.get("oldValidTopic"));
|
||||
|
||||
cluster = metadata.fetch();
|
||||
assertEquals(cluster.clusterResource().clusterId(), newClusterId);
|
||||
assertEquals(newClusterId, cluster.clusterResource().clusterId());
|
||||
assertEquals(cluster.nodes().size(), newNodes);
|
||||
assertEquals(cluster.invalidTopics(), Set.of("keepInvalidTopic", "newInvalidTopic"));
|
||||
assertEquals(cluster.unauthorizedTopics(), Set.of("keepUnauthorizedTopic", "newUnauthorizedTopic"));
|
||||
assertEquals(cluster.topics(), Set.of("keepValidTopic", "newValidTopic"));
|
||||
assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 2);
|
||||
assertEquals(cluster.partitionsForTopic("newValidTopic").size(), 4);
|
||||
assertEquals(2, cluster.partitionsForTopic("keepValidTopic").size());
|
||||
assertEquals(4, cluster.partitionsForTopic("newValidTopic").size());
|
||||
assertEquals(new HashSet<>(cluster.topicIds()), new HashSet<>(topicIds.values()));
|
||||
|
||||
// Perform another metadata update, but this time all topic metadata should be cleared.
|
||||
|
@ -1008,7 +1008,7 @@ public class MetadataTest {
|
|||
topicIds.forEach((topicName, topicId) -> assertNull(metadataTopicIds3.get(topicName)));
|
||||
|
||||
cluster = metadata.fetch();
|
||||
assertEquals(cluster.clusterResource().clusterId(), newClusterId);
|
||||
assertEquals(newClusterId, cluster.clusterResource().clusterId());
|
||||
assertEquals(cluster.nodes().size(), newNodes);
|
||||
assertEquals(cluster.invalidTopics(), Collections.emptySet());
|
||||
assertEquals(cluster.unauthorizedTopics(), Collections.emptySet());
|
||||
|
|
|
@ -64,7 +64,7 @@ public class ConfigTest {
|
|||
assertEquals(config, config);
|
||||
assertEquals(config, new Config(config.entries()));
|
||||
assertNotEquals(new Config(Collections.singletonList(E1)), config);
|
||||
assertNotEquals(config, "this");
|
||||
assertNotEquals("this", config);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -81,7 +81,7 @@ public class CooperativeStickyAssignorTest extends AbstractStickyAssignorTest {
|
|||
|
||||
Optional<Integer> encodedGeneration = ((CooperativeStickyAssignor) assignor).memberData(subscription).generation;
|
||||
assertTrue(encodedGeneration.isPresent());
|
||||
assertEquals(encodedGeneration.get(), DEFAULT_GENERATION);
|
||||
assertEquals(DEFAULT_GENERATION, encodedGeneration.get());
|
||||
|
||||
int generation = 10;
|
||||
assignor.onAssignment(null, new ConsumerGroupMetadata("dummy-group-id", generation, "dummy-member-id", Optional.empty()));
|
||||
|
@ -90,7 +90,7 @@ public class CooperativeStickyAssignorTest extends AbstractStickyAssignorTest {
|
|||
encodedGeneration = ((CooperativeStickyAssignor) assignor).memberData(subscription).generation;
|
||||
|
||||
assertTrue(encodedGeneration.isPresent());
|
||||
assertEquals(encodedGeneration.get(), generation);
|
||||
assertEquals(generation, encodedGeneration.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -209,9 +209,7 @@ public class MockConsumerTest {
|
|||
consumer.assign(Collections.singleton(partition));
|
||||
consumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));
|
||||
|
||||
IntStream.range(0, 10).forEach(offset -> {
|
||||
consumer.addRecord(new ConsumerRecord<>("test", 0, offset, null, null));
|
||||
});
|
||||
IntStream.range(0, 10).forEach(offset -> consumer.addRecord(new ConsumerRecord<>("test", 0, offset, null, null)));
|
||||
|
||||
consumer.setMaxPollRecords(2L);
|
||||
|
||||
|
|
|
@ -1025,7 +1025,7 @@ public abstract class AbstractStickyAssignorTest {
|
|||
|
||||
Map<String, List<TopicPartition>> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions);
|
||||
assertTrue(assignor.partitionsTransferringOwnership.isEmpty());
|
||||
assertEquals(assignment.values().stream().mapToInt(List::size).sum(), 1 + 100);
|
||||
assertEquals(1 + 100, assignment.values().stream().mapToInt(List::size).sum());
|
||||
assertEquals(Collections.singleton(consumerId), assignment.keySet());
|
||||
assertTrue(isFullyBalanced(assignment));
|
||||
}
|
||||
|
@ -1043,7 +1043,7 @@ public abstract class AbstractStickyAssignorTest {
|
|||
|
||||
assignment = assignor.assign(Collections.emptyMap(), subscriptions);
|
||||
assertTrue(assignor.partitionsTransferringOwnership.isEmpty());
|
||||
assertEquals(assignment.size(), 1);
|
||||
assertEquals(1, assignment.size());
|
||||
assertTrue(assignment.get(consumerId).isEmpty());
|
||||
}
|
||||
|
||||
|
|
|
@ -568,13 +568,13 @@ public abstract class ConsumerCoordinatorTest {
|
|||
assertFalse(client.hasInFlightRequests());
|
||||
|
||||
// should try to find coordinator since we are commit async
|
||||
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), (offsets, exception) -> {
|
||||
fail("Commit should not get responses, but got offsets:" + offsets + ", and exception:" + exception);
|
||||
});
|
||||
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), (offsets, exception) ->
|
||||
fail("Commit should not get responses, but got offsets:" + offsets + ", and exception:" + exception)
|
||||
);
|
||||
coordinator.poll(time.timer(0));
|
||||
assertTrue(coordinator.coordinatorUnknown());
|
||||
assertTrue(client.hasInFlightRequests());
|
||||
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
|
||||
assertEquals(0, coordinator.inFlightAsyncCommits.get());
|
||||
|
||||
client.respond(groupCoordinatorResponse(node, Errors.NONE));
|
||||
coordinator.poll(time.timer(0));
|
||||
|
@ -582,7 +582,7 @@ public abstract class ConsumerCoordinatorTest {
|
|||
// after we've discovered the coordinator we should send
|
||||
// out the commit request immediately
|
||||
assertTrue(client.hasInFlightRequests());
|
||||
assertEquals(coordinator.inFlightAsyncCommits.get(), 1);
|
||||
assertEquals(1, coordinator.inFlightAsyncCommits.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -619,13 +619,13 @@ public abstract class ConsumerCoordinatorTest {
|
|||
|
||||
assertFalse(coordinator.commitOffsetsSync(Collections.emptyMap(), time.timer(100L)), "expected sync commit to fail");
|
||||
assertFalse(committed.get());
|
||||
assertEquals(coordinator.inFlightAsyncCommits.get(), 1);
|
||||
assertEquals(1, coordinator.inFlightAsyncCommits.get());
|
||||
|
||||
prepareOffsetCommitRequest(singletonMap(tp, 123L), Errors.NONE);
|
||||
|
||||
assertTrue(coordinator.commitOffsetsSync(Collections.emptyMap(), time.timer(Long.MAX_VALUE)), "expected sync commit to succeed");
|
||||
assertTrue(committed.get(), "expected commit callback to be invoked");
|
||||
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
|
||||
assertEquals(0, coordinator.inFlightAsyncCommits.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -646,13 +646,13 @@ public abstract class ConsumerCoordinatorTest {
|
|||
"Unexpected exception cause type: " + (cause == null ? null : cause.getClass()));
|
||||
});
|
||||
}
|
||||
assertEquals(coordinator.inFlightAsyncCommits.get(), numRequests);
|
||||
assertEquals(numRequests, coordinator.inFlightAsyncCommits.get());
|
||||
|
||||
coordinator.markCoordinatorUnknown("test cause");
|
||||
consumerClient.pollNoWakeup();
|
||||
coordinator.invokeCompletedOffsetCommitCallbacks();
|
||||
assertEquals(numRequests, responses.get());
|
||||
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
|
||||
assertEquals(0, coordinator.inFlightAsyncCommits.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -697,7 +697,7 @@ public abstract class ConsumerCoordinatorTest {
|
|||
coordinator.markCoordinatorUnknown("test cause");
|
||||
consumerClient.pollNoWakeup();
|
||||
assertTrue(asyncCallbackInvoked.get());
|
||||
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
|
||||
assertEquals(0, coordinator.inFlightAsyncCommits.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -2350,7 +2350,7 @@ public abstract class ConsumerCoordinatorTest {
|
|||
MockCommitCallback secondCommitCallback = new MockCommitCallback();
|
||||
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), firstCommitCallback);
|
||||
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), secondCommitCallback);
|
||||
assertEquals(coordinator.inFlightAsyncCommits.get(), 2);
|
||||
assertEquals(2, coordinator.inFlightAsyncCommits.get());
|
||||
|
||||
respondToOffsetCommitRequest(singletonMap(t1p, 100L), error);
|
||||
consumerClient.pollNoWakeup();
|
||||
|
@ -2360,7 +2360,7 @@ public abstract class ConsumerCoordinatorTest {
|
|||
assertTrue(coordinator.coordinatorUnknown());
|
||||
assertInstanceOf(RetriableCommitFailedException.class, firstCommitCallback.exception);
|
||||
assertInstanceOf(RetriableCommitFailedException.class, secondCommitCallback.exception);
|
||||
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
|
||||
assertEquals(0, coordinator.inFlightAsyncCommits.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -2549,7 +2549,7 @@ public abstract class ConsumerCoordinatorTest {
|
|||
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
|
||||
prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
|
||||
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), mockOffsetCommitCallback);
|
||||
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
|
||||
assertEquals(0, coordinator.inFlightAsyncCommits.get());
|
||||
coordinator.invokeCompletedOffsetCommitCallbacks();
|
||||
assertEquals(invokedBeforeTest + 1, mockOffsetCommitCallback.invoked);
|
||||
assertNull(mockOffsetCommitCallback.exception);
|
||||
|
@ -2580,7 +2580,7 @@ public abstract class ConsumerCoordinatorTest {
|
|||
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), callback(success));
|
||||
coordinator.invokeCompletedOffsetCommitCallbacks();
|
||||
assertTrue(success.get());
|
||||
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
|
||||
assertEquals(0, coordinator.inFlightAsyncCommits.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -2590,7 +2590,7 @@ public abstract class ConsumerCoordinatorTest {
|
|||
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
|
||||
prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE);
|
||||
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), mockOffsetCommitCallback);
|
||||
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
|
||||
assertEquals(0, coordinator.inFlightAsyncCommits.get());
|
||||
coordinator.invokeCompletedOffsetCommitCallbacks();
|
||||
assertEquals(invokedBeforeTest + 1, mockOffsetCommitCallback.invoked);
|
||||
assertInstanceOf(RetriableCommitFailedException.class, mockOffsetCommitCallback.exception);
|
||||
|
@ -2605,7 +2605,7 @@ public abstract class ConsumerCoordinatorTest {
|
|||
MockCommitCallback cb = new MockCommitCallback();
|
||||
prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE);
|
||||
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), cb);
|
||||
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
|
||||
assertEquals(0, coordinator.inFlightAsyncCommits.get());
|
||||
coordinator.invokeCompletedOffsetCommitCallbacks();
|
||||
|
||||
assertTrue(coordinator.coordinatorUnknown());
|
||||
|
@ -2622,7 +2622,7 @@ public abstract class ConsumerCoordinatorTest {
|
|||
MockCommitCallback cb = new MockCommitCallback();
|
||||
prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NOT_COORDINATOR);
|
||||
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), cb);
|
||||
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
|
||||
assertEquals(0, coordinator.inFlightAsyncCommits.get());
|
||||
coordinator.invokeCompletedOffsetCommitCallbacks();
|
||||
|
||||
assertTrue(coordinator.coordinatorUnknown());
|
||||
|
@ -2639,7 +2639,7 @@ public abstract class ConsumerCoordinatorTest {
|
|||
MockCommitCallback cb = new MockCommitCallback();
|
||||
prepareOffsetCommitRequestDisconnect(singletonMap(t1p, 100L));
|
||||
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), cb);
|
||||
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
|
||||
assertEquals(0, coordinator.inFlightAsyncCommits.get());
|
||||
coordinator.invokeCompletedOffsetCommitCallbacks();
|
||||
|
||||
assertTrue(coordinator.coordinatorUnknown());
|
||||
|
@ -2703,7 +2703,7 @@ public abstract class ConsumerCoordinatorTest {
|
|||
}
|
||||
};
|
||||
|
||||
assertEquals(coordinator.inFlightAsyncCommits.get(), 1);
|
||||
assertEquals(1, coordinator.inFlightAsyncCommits.get());
|
||||
thread.start();
|
||||
|
||||
client.waitForRequests(2, 5000);
|
||||
|
@ -2711,7 +2711,7 @@ public abstract class ConsumerCoordinatorTest {
|
|||
respondToOffsetCommitRequest(singletonMap(t1p, secondOffset.offset()), Errors.NONE);
|
||||
|
||||
thread.join();
|
||||
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
|
||||
assertEquals(0, coordinator.inFlightAsyncCommits.get());
|
||||
|
||||
assertEquals(Arrays.asList(firstOffset, secondOffset), committedOffsets);
|
||||
}
|
||||
|
@ -3100,7 +3100,7 @@ public abstract class ConsumerCoordinatorTest {
|
|||
assertEquals(Collections.emptySet(), subscriptions.initializingPartitions());
|
||||
assertFalse(subscriptions.hasAllFetchPositions());
|
||||
assertTrue(subscriptions.awaitingValidation(t1p));
|
||||
assertEquals(subscriptions.position(t1p).offset, 100L);
|
||||
assertEquals(100L, subscriptions.position(t1p).offset);
|
||||
assertNull(subscriptions.validPosition(t1p));
|
||||
}
|
||||
|
||||
|
@ -3470,7 +3470,7 @@ public abstract class ConsumerCoordinatorTest {
|
|||
assertThrows(FencedInstanceIdException.class, this::receiveFencedInstanceIdException);
|
||||
assertThrows(FencedInstanceIdException.class, () ->
|
||||
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), new MockCommitCallback()));
|
||||
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
|
||||
assertEquals(0, coordinator.inFlightAsyncCommits.get());
|
||||
assertThrows(FencedInstanceIdException.class, () ->
|
||||
coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L)), time.timer(Long.MAX_VALUE)));
|
||||
}
|
||||
|
@ -3739,7 +3739,7 @@ public abstract class ConsumerCoordinatorTest {
|
|||
prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.FENCED_INSTANCE_ID);
|
||||
|
||||
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), new MockCommitCallback());
|
||||
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
|
||||
assertEquals(0, coordinator.inFlightAsyncCommits.get());
|
||||
coordinator.invokeCompletedOffsetCommitCallbacks();
|
||||
}
|
||||
|
||||
|
|
|
@ -1815,7 +1815,7 @@ public class FetchRequestManagerTest {
|
|||
assertEquals(1, oorExceptions.size());
|
||||
OffsetOutOfRangeException oor = oorExceptions.get(0);
|
||||
assertTrue(oor.offsetOutOfRangePartitions().containsKey(tp0));
|
||||
assertEquals(oor.offsetOutOfRangePartitions().size(), 1);
|
||||
assertEquals(1, oor.offsetOutOfRangePartitions().size());
|
||||
|
||||
fetchRecordsInto(fetchedRecords);
|
||||
|
||||
|
@ -2359,7 +2359,7 @@ public class FetchRequestManagerTest {
|
|||
|
||||
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetchRecords();
|
||||
assertTrue(fetchedRecords.containsKey(tp0));
|
||||
assertEquals(fetchedRecords.get(tp0).size(), 2);
|
||||
assertEquals(2, fetchedRecords.get(tp0).size());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -2477,7 +2477,7 @@ public class FetchRequestManagerTest {
|
|||
|
||||
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetchRecords();
|
||||
assertTrue(fetchedRecords.containsKey(tp0));
|
||||
assertEquals(fetchedRecords.get(tp0).size(), 2);
|
||||
assertEquals(2, fetchedRecords.get(tp0).size());
|
||||
List<ConsumerRecord<byte[], byte[]>> fetchedConsumerRecords = fetchedRecords.get(tp0);
|
||||
Set<String> expectedCommittedKeys = Set.of("commit1-1", "commit1-2");
|
||||
Set<String> actuallyCommittedKeys = new HashSet<>();
|
||||
|
@ -2854,7 +2854,7 @@ public class FetchRequestManagerTest {
|
|||
|
||||
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetchRecords();
|
||||
assertTrue(fetchedRecords.containsKey(tp0));
|
||||
assertEquals(fetchedRecords.get(tp0).size(), 2);
|
||||
assertEquals(2, fetchedRecords.get(tp0).size());
|
||||
}
|
||||
|
||||
private MemoryRecords buildRecords(long baseOffset, int count, long firstMessageId) {
|
||||
|
@ -2939,8 +2939,8 @@ public class FetchRequestManagerTest {
|
|||
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchRecords();
|
||||
assertTrue(partitionRecords.containsKey(tp0));
|
||||
|
||||
assertEquals(subscriptions.position(tp0).offset, 3L);
|
||||
assertOptional(subscriptions.position(tp0).offsetEpoch, value -> assertEquals(value.intValue(), 1));
|
||||
assertEquals(3L, subscriptions.position(tp0).offset);
|
||||
assertOptional(subscriptions.position(tp0).offsetEpoch, value -> assertEquals(1, value.intValue()));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -3110,7 +3110,7 @@ public class FetchRequestManagerTest {
|
|||
fetchRecords();
|
||||
|
||||
Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
|
||||
assertEquals(selected.id(), 1);
|
||||
assertEquals(1, selected.id());
|
||||
|
||||
assertEquals(1, sendFetches());
|
||||
assertFalse(fetcher.hasCompletedFetches());
|
||||
|
@ -3124,7 +3124,7 @@ public class FetchRequestManagerTest {
|
|||
fetchRecords();
|
||||
|
||||
selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
|
||||
assertEquals(selected.id(), -1);
|
||||
assertEquals(-1, selected.id());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -1802,7 +1802,7 @@ public class FetcherTest {
|
|||
assertEquals(1, oorExceptions.size());
|
||||
OffsetOutOfRangeException oor = oorExceptions.get(0);
|
||||
assertTrue(oor.offsetOutOfRangePartitions().containsKey(tp0));
|
||||
assertEquals(oor.offsetOutOfRangePartitions().size(), 1);
|
||||
assertEquals(1, oor.offsetOutOfRangePartitions().size());
|
||||
|
||||
fetchRecordsInto(fetchedRecords);
|
||||
|
||||
|
@ -2346,7 +2346,7 @@ public class FetcherTest {
|
|||
|
||||
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetchRecords();
|
||||
assertTrue(fetchedRecords.containsKey(tp0));
|
||||
assertEquals(fetchedRecords.get(tp0).size(), 2);
|
||||
assertEquals(2, fetchedRecords.get(tp0).size());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -2464,7 +2464,7 @@ public class FetcherTest {
|
|||
|
||||
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetchRecords();
|
||||
assertTrue(fetchedRecords.containsKey(tp0));
|
||||
assertEquals(fetchedRecords.get(tp0).size(), 2);
|
||||
assertEquals(2, fetchedRecords.get(tp0).size());
|
||||
List<ConsumerRecord<byte[], byte[]>> fetchedConsumerRecords = fetchedRecords.get(tp0);
|
||||
Set<String> expectedCommittedKeys = Set.of("commit1-1", "commit1-2");
|
||||
Set<String> actuallyCommittedKeys = new HashSet<>();
|
||||
|
@ -3054,7 +3054,7 @@ public class FetcherTest {
|
|||
|
||||
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetchRecords();
|
||||
assertTrue(fetchedRecords.containsKey(tp0));
|
||||
assertEquals(fetchedRecords.get(tp0).size(), 2);
|
||||
assertEquals(2, fetchedRecords.get(tp0).size());
|
||||
}
|
||||
|
||||
private MemoryRecords buildRecords(long baseOffset, int count, long firstMessageId) {
|
||||
|
@ -3139,8 +3139,8 @@ public class FetcherTest {
|
|||
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchRecords();
|
||||
assertTrue(partitionRecords.containsKey(tp0));
|
||||
|
||||
assertEquals(subscriptions.position(tp0).offset, 3L);
|
||||
assertOptional(subscriptions.position(tp0).offsetEpoch, value -> assertEquals(value.intValue(), 1));
|
||||
assertEquals(3L, subscriptions.position(tp0).offset);
|
||||
assertOptional(subscriptions.position(tp0).offsetEpoch, value -> assertEquals(1, value.intValue()));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -3217,8 +3217,8 @@ public class FetcherTest {
|
|||
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchRecords();
|
||||
assertTrue(partitionRecords.containsKey(tp0));
|
||||
|
||||
assertEquals(subscriptions.position(tp0).offset, 3L);
|
||||
assertOptional(subscriptions.position(tp0).offsetEpoch, value -> assertEquals(value.intValue(), 1));
|
||||
assertEquals(3L, subscriptions.position(tp0).offset);
|
||||
assertOptional(subscriptions.position(tp0).offsetEpoch, value -> assertEquals(1, value.intValue()));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -3388,7 +3388,7 @@ public class FetcherTest {
|
|||
fetchRecords();
|
||||
|
||||
Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
|
||||
assertEquals(selected.id(), 1);
|
||||
assertEquals(1, selected.id());
|
||||
|
||||
assertEquals(1, sendFetches());
|
||||
assertFalse(fetcher.hasCompletedFetches());
|
||||
|
@ -3402,7 +3402,7 @@ public class FetcherTest {
|
|||
fetchRecords();
|
||||
|
||||
selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
|
||||
assertEquals(selected.id(), -1);
|
||||
assertEquals(-1, selected.id());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -99,8 +99,8 @@ class KafkaConsumerMetricsTest {
|
|||
|
||||
private void assertMetricValue(final String name) {
|
||||
assertEquals(
|
||||
metrics.metric(metrics.metricName(name, CONSUMER_METRIC_GROUP)).metricValue(),
|
||||
(double) METRIC_VALUE
|
||||
(double) METRIC_VALUE,
|
||||
metrics.metric(metrics.metricName(name, CONSUMER_METRIC_GROUP)).metricValue()
|
||||
);
|
||||
}
|
||||
}
|
|
@ -246,7 +246,7 @@ public class OffsetFetcherTest {
|
|||
assertTrue(subscriptions.hasValidPosition(tp0));
|
||||
assertFalse(subscriptions.isOffsetResetNeeded(tp0));
|
||||
assertTrue(subscriptions.isFetchable(tp0));
|
||||
assertEquals(subscriptions.position(tp0).offset, 5L);
|
||||
assertEquals(5L, subscriptions.position(tp0).offset);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -395,7 +395,7 @@ public class OffsetFetcherTest {
|
|||
|
||||
assertFalse(subscriptions.isOffsetResetNeeded(tp0));
|
||||
assertTrue(metadata.updateRequested());
|
||||
assertOptional(metadata.lastSeenLeaderEpoch(tp0), epoch -> assertEquals((long) epoch, 2));
|
||||
assertOptional(metadata.lastSeenLeaderEpoch(tp0), epoch -> assertEquals(2, (long) epoch));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -902,7 +902,7 @@ public class OffsetFetcherTest {
|
|||
ListOffsetsRequest offsetRequest = (ListOffsetsRequest) body;
|
||||
int epoch = offsetRequest.topics().get(0).partitions().get(0).currentLeaderEpoch();
|
||||
assertTrue(epoch != ListOffsetsResponse.UNKNOWN_EPOCH, "Expected Fetcher to set leader epoch in request");
|
||||
assertEquals(epoch, 99, "Expected leader epoch to match epoch from metadata update");
|
||||
assertEquals(99, epoch, "Expected leader epoch to match epoch from metadata update");
|
||||
return true;
|
||||
} else {
|
||||
fail("Should have seen ListOffsetRequest");
|
||||
|
|
|
@ -102,8 +102,8 @@ public class OffsetForLeaderEpochClientTest {
|
|||
assertTrue(result.partitionsToRetry().isEmpty());
|
||||
assertTrue(result.endOffsets().containsKey(tp0));
|
||||
assertEquals(result.endOffsets().get(tp0).errorCode(), Errors.NONE.code());
|
||||
assertEquals(result.endOffsets().get(tp0).leaderEpoch(), 1);
|
||||
assertEquals(result.endOffsets().get(tp0).endOffset(), 10L);
|
||||
assertEquals(1, result.endOffsets().get(tp0).leaderEpoch());
|
||||
assertEquals(10L, result.endOffsets().get(tp0).endOffset());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -121,7 +121,7 @@ public class OffsetForLeaderEpochClientTest {
|
|||
consumerClient.pollNoWakeup();
|
||||
|
||||
assertTrue(future.failed());
|
||||
assertEquals(future.exception().getClass(), TopicAuthorizationException.class);
|
||||
assertEquals(TopicAuthorizationException.class, future.exception().getClass());
|
||||
assertTrue(((TopicAuthorizationException) future.exception()).unauthorizedTopics().contains(tp0.topic()));
|
||||
}
|
||||
|
||||
|
|
|
@ -439,14 +439,10 @@ class StreamsGroupHeartbeatRequestManagerTest {
|
|||
try (
|
||||
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
|
||||
HeartbeatRequestState.class,
|
||||
(mock, context) -> {
|
||||
when(mock.heartbeatIntervalMs()).thenReturn(heartbeatIntervalMs);
|
||||
});
|
||||
(mock, context) -> when(mock.heartbeatIntervalMs()).thenReturn(heartbeatIntervalMs));
|
||||
final MockedConstruction<Timer> pollTimerMockedConstruction = mockConstruction(
|
||||
Timer.class,
|
||||
(mock, context) -> {
|
||||
when(mock.isExpired()).thenReturn(true);
|
||||
});
|
||||
(mock, context) -> when(mock.isExpired()).thenReturn(true));
|
||||
final MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateMockedConstruction = mockConstruction(
|
||||
StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
|
||||
) {
|
||||
|
@ -473,14 +469,10 @@ class StreamsGroupHeartbeatRequestManagerTest {
|
|||
try (
|
||||
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
|
||||
HeartbeatRequestState.class,
|
||||
(mock, context) -> {
|
||||
when(mock.timeToNextHeartbeatMs(time.milliseconds())).thenReturn(timeToNextHeartbeatMs);
|
||||
});
|
||||
(mock, context) -> when(mock.timeToNextHeartbeatMs(time.milliseconds())).thenReturn(timeToNextHeartbeatMs));
|
||||
final MockedConstruction<Timer> pollTimerMockedConstruction = mockConstruction(
|
||||
Timer.class,
|
||||
(mock, context) -> {
|
||||
when(mock.isExpired()).thenReturn(true);
|
||||
});
|
||||
(mock, context) -> when(mock.isExpired()).thenReturn(true));
|
||||
final MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateMockedConstruction = mockConstruction(
|
||||
StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
|
||||
) {
|
||||
|
@ -508,14 +500,10 @@ class StreamsGroupHeartbeatRequestManagerTest {
|
|||
try (
|
||||
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
|
||||
HeartbeatRequestState.class,
|
||||
(mock, context) -> {
|
||||
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
|
||||
});
|
||||
(mock, context) -> when(mock.canSendRequest(time.milliseconds())).thenReturn(true));
|
||||
final MockedConstruction<Timer> pollTimerMockedConstruction = mockConstruction(
|
||||
Timer.class,
|
||||
(mock, context) -> {
|
||||
when(mock.isExpired()).thenReturn(true);
|
||||
})
|
||||
(mock, context) -> when(mock.isExpired()).thenReturn(true))
|
||||
) {
|
||||
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
|
||||
final HeartbeatRequestState heartbeatRequestState = heartbeatRequestStateMockedConstruction.constructed().get(0);
|
||||
|
@ -551,9 +539,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
|
|||
try (
|
||||
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
|
||||
HeartbeatRequestState.class,
|
||||
(mock, context) -> {
|
||||
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
|
||||
})
|
||||
(mock, context) -> when(mock.canSendRequest(time.milliseconds())).thenReturn(true))
|
||||
) {
|
||||
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
|
||||
final HeartbeatRequestState heartbeatRequestState = heartbeatRequestStateMockedConstruction.constructed().get(0);
|
||||
|
@ -1001,9 +987,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
|
|||
try (
|
||||
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
|
||||
HeartbeatRequestState.class,
|
||||
(mock, context) -> {
|
||||
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
|
||||
});
|
||||
(mock, context) -> when(mock.canSendRequest(time.milliseconds())).thenReturn(true));
|
||||
final MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateMockedConstruction = mockConstruction(
|
||||
StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
|
||||
) {
|
||||
|
@ -1032,9 +1016,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
|
|||
try (
|
||||
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
|
||||
HeartbeatRequestState.class,
|
||||
(mock, context) -> {
|
||||
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
|
||||
});
|
||||
(mock, context) -> when(mock.canSendRequest(time.milliseconds())).thenReturn(true));
|
||||
final MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateMockedConstruction = mockConstruction(
|
||||
StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
|
||||
) {
|
||||
|
@ -1073,9 +1055,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
|
|||
try (
|
||||
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
|
||||
HeartbeatRequestState.class,
|
||||
(mock, context) -> {
|
||||
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
|
||||
});
|
||||
(mock, context) -> when(mock.canSendRequest(time.milliseconds())).thenReturn(true));
|
||||
final MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateMockedConstruction = mockConstruction(
|
||||
StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
|
||||
) {
|
||||
|
@ -1111,9 +1091,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
|
|||
try (
|
||||
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
|
||||
HeartbeatRequestState.class,
|
||||
(mock, context) -> {
|
||||
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
|
||||
});
|
||||
(mock, context) -> when(mock.canSendRequest(time.milliseconds())).thenReturn(true));
|
||||
final MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateMockedConstruction = mockConstruction(
|
||||
StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
|
||||
) {
|
||||
|
@ -1145,9 +1123,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
|
|||
try (
|
||||
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
|
||||
HeartbeatRequestState.class,
|
||||
(mock, context) -> {
|
||||
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
|
||||
});
|
||||
(mock, context) -> when(mock.canSendRequest(time.milliseconds())).thenReturn(true));
|
||||
final MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateMockedConstruction = mockConstruction(
|
||||
StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
|
||||
) {
|
||||
|
@ -1173,9 +1149,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
|
|||
try (
|
||||
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
|
||||
HeartbeatRequestState.class,
|
||||
(mock, context) -> {
|
||||
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
|
||||
});
|
||||
(mock, context) -> when(mock.canSendRequest(time.milliseconds())).thenReturn(true));
|
||||
final MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateMockedConstruction = mockConstruction(
|
||||
StreamsGroupHeartbeatRequestManager.HeartbeatState.class);
|
||||
final LogCaptureAppender logAppender = LogCaptureAppender.createAndRegister(StreamsGroupHeartbeatRequestManager.class)
|
||||
|
@ -1212,9 +1186,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
|
|||
try (
|
||||
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
|
||||
HeartbeatRequestState.class,
|
||||
(mock, context) -> {
|
||||
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
|
||||
});
|
||||
(mock, context) -> when(mock.canSendRequest(time.milliseconds())).thenReturn(true));
|
||||
final MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateMockedConstruction = mockConstruction(
|
||||
StreamsGroupHeartbeatRequestManager.HeartbeatState.class);
|
||||
final LogCaptureAppender logAppender = LogCaptureAppender.createAndRegister(StreamsGroupHeartbeatRequestManager.class)
|
||||
|
@ -1261,9 +1233,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
|
|||
try (
|
||||
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
|
||||
HeartbeatRequestState.class,
|
||||
(mock, context) -> {
|
||||
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
|
||||
});
|
||||
(mock, context) -> when(mock.canSendRequest(time.milliseconds())).thenReturn(true));
|
||||
final MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateMockedConstruction = mockConstruction(
|
||||
StreamsGroupHeartbeatRequestManager.HeartbeatState.class);
|
||||
final LogCaptureAppender logAppender = LogCaptureAppender.createAndRegister(StreamsGroupHeartbeatRequestManager.class)
|
||||
|
@ -1312,9 +1282,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
|
|||
try (
|
||||
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
|
||||
HeartbeatRequestState.class,
|
||||
(mock, context) -> {
|
||||
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
|
||||
});
|
||||
(mock, context) -> when(mock.canSendRequest(time.milliseconds())).thenReturn(true));
|
||||
final MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateMockedConstruction = mockConstruction(
|
||||
StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
|
||||
) {
|
||||
|
@ -1343,9 +1311,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
|
|||
try (
|
||||
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
|
||||
HeartbeatRequestState.class,
|
||||
(mock, context) -> {
|
||||
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
|
||||
});
|
||||
(mock, context) -> when(mock.canSendRequest(time.milliseconds())).thenReturn(true));
|
||||
final MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateMockedConstruction = mockConstruction(
|
||||
StreamsGroupHeartbeatRequestManager.HeartbeatState.class);
|
||||
final LogCaptureAppender logAppender = LogCaptureAppender.createAndRegister(StreamsGroupHeartbeatRequestManager.class)
|
||||
|
@ -1424,14 +1390,11 @@ class StreamsGroupHeartbeatRequestManagerTest {
|
|||
@Test
|
||||
public void testMaximumTimeToWaitPollTimerExpired() {
|
||||
try (
|
||||
final MockedConstruction<Timer> timerMockedConstruction = mockConstruction(Timer.class, (mock, context) -> {
|
||||
when(mock.isExpired()).thenReturn(true);
|
||||
});
|
||||
final MockedConstruction<Timer> timerMockedConstruction =
|
||||
mockConstruction(Timer.class, (mock, context) -> when(mock.isExpired()).thenReturn(true));
|
||||
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
|
||||
HeartbeatRequestState.class,
|
||||
(mock, context) -> {
|
||||
when(mock.requestInFlight()).thenReturn(false);
|
||||
})
|
||||
(mock, context) -> when(mock.requestInFlight()).thenReturn(false))
|
||||
) {
|
||||
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
|
||||
final Timer pollTimer = timerMockedConstruction.constructed().get(0);
|
||||
|
@ -1450,9 +1413,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
|
|||
final MockedConstruction<Timer> timerMockedConstruction = mockConstruction(Timer.class);
|
||||
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
|
||||
HeartbeatRequestState.class,
|
||||
(mock, context) -> {
|
||||
when(mock.requestInFlight()).thenReturn(false);
|
||||
})
|
||||
(mock, context) -> when(mock.requestInFlight()).thenReturn(false))
|
||||
) {
|
||||
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
|
||||
final Timer pollTimer = timerMockedConstruction.constructed().get(0);
|
||||
|
@ -1473,9 +1434,8 @@ class StreamsGroupHeartbeatRequestManagerTest {
|
|||
final long remainingMs = 12L;
|
||||
final long timeToNextHeartbeatMs = 6L;
|
||||
try (
|
||||
final MockedConstruction<Timer> timerMockedConstruction = mockConstruction(Timer.class, (mock, context) -> {
|
||||
when(mock.remainingMs()).thenReturn(remainingMs);
|
||||
});
|
||||
final MockedConstruction<Timer> timerMockedConstruction =
|
||||
mockConstruction(Timer.class, (mock, context) -> when(mock.remainingMs()).thenReturn(remainingMs));
|
||||
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
|
||||
HeartbeatRequestState.class,
|
||||
(mock, context) -> {
|
||||
|
@ -1500,14 +1460,11 @@ class StreamsGroupHeartbeatRequestManagerTest {
|
|||
public void testMaximumTimeToWaitSelectingMinimumWaitTime(final long remainingMs,
|
||||
final long timeToNextHeartbeatMs) {
|
||||
try (
|
||||
final MockedConstruction<Timer> timerMockedConstruction = mockConstruction(Timer.class, (mock, context) -> {
|
||||
when(mock.remainingMs()).thenReturn(remainingMs);
|
||||
});
|
||||
final MockedConstruction<Timer> timerMockedConstruction =
|
||||
mockConstruction(Timer.class, (mock, context) -> when(mock.remainingMs()).thenReturn(remainingMs));
|
||||
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
|
||||
HeartbeatRequestState.class,
|
||||
(mock, context) -> {
|
||||
when(mock.timeToNextHeartbeatMs(anyLong())).thenReturn(timeToNextHeartbeatMs);
|
||||
})
|
||||
(mock, context) -> when(mock.timeToNextHeartbeatMs(anyLong())).thenReturn(timeToNextHeartbeatMs))
|
||||
) {
|
||||
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
|
||||
final Timer pollTimer = timerMockedConstruction.constructed().get(0);
|
||||
|
|
|
@ -62,7 +62,7 @@ public class StreamsRebalanceDataTest {
|
|||
public void testTaskIdCompareTo() {
|
||||
final StreamsRebalanceData.TaskId task = new StreamsRebalanceData.TaskId("subtopologyId1", 1);
|
||||
|
||||
assertTrue(task.compareTo(new StreamsRebalanceData.TaskId(task.subtopologyId(), task.partitionId())) == 0);
|
||||
assertEquals(0, task.compareTo(new StreamsRebalanceData.TaskId(task.subtopologyId(), task.partitionId())));
|
||||
assertTrue(task.compareTo(new StreamsRebalanceData.TaskId(task.subtopologyId() + "1", task.partitionId())) < 0);
|
||||
assertTrue(task.compareTo(new StreamsRebalanceData.TaskId(task.subtopologyId(), task.partitionId() + 1)) < 0);
|
||||
assertTrue(new StreamsRebalanceData.TaskId(task.subtopologyId() + "1", task.partitionId()).compareTo(task) > 0);
|
||||
|
|
|
@ -113,13 +113,13 @@ public class AsyncConsumerMetricsTest {
|
|||
|
||||
// Then:
|
||||
assertEquals(
|
||||
(double) 10,
|
||||
metrics.metric(
|
||||
metrics.metricName(
|
||||
"application-event-queue-size",
|
||||
groupName
|
||||
)
|
||||
).metricValue(),
|
||||
(double) 10
|
||||
).metricValue()
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -156,13 +156,13 @@ public class AsyncConsumerMetricsTest {
|
|||
|
||||
// Then:
|
||||
assertEquals(
|
||||
(double) 10,
|
||||
metrics.metric(
|
||||
metrics.metricName(
|
||||
"unsent-requests-queue-size",
|
||||
groupName
|
||||
)
|
||||
).metricValue(),
|
||||
(double) 10
|
||||
).metricValue()
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -187,13 +187,13 @@ public class AsyncConsumerMetricsTest {
|
|||
|
||||
// Then:
|
||||
assertEquals(
|
||||
(double) 10,
|
||||
metrics.metric(
|
||||
metrics.metricName(
|
||||
"background-event-queue-size",
|
||||
groupName
|
||||
)
|
||||
).metricValue(),
|
||||
(double) 10
|
||||
).metricValue()
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -223,13 +223,13 @@ public class AsyncConsumerMetricsTest {
|
|||
|
||||
private void assertMetricValue(final String name, final String groupName) {
|
||||
assertEquals(
|
||||
(double) METRIC_VALUE,
|
||||
metrics.metric(
|
||||
metrics.metricName(
|
||||
name,
|
||||
groupName
|
||||
)
|
||||
).metricValue(),
|
||||
(double) METRIC_VALUE
|
||||
).metricValue()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -724,10 +724,10 @@ public class MockProducerTest {
|
|||
buildMockProducer(false);
|
||||
Future<RecordMetadata> metadata = producer.send(record2, (md, exception) -> {
|
||||
assertNotNull(md);
|
||||
assertEquals(md.offset(), -1L, "Invalid offset");
|
||||
assertEquals(md.timestamp(), RecordBatch.NO_TIMESTAMP, "Invalid timestamp");
|
||||
assertEquals(md.serializedKeySize(), -1L, "Invalid Serialized Key size");
|
||||
assertEquals(md.serializedValueSize(), -1L, "Invalid Serialized value size");
|
||||
assertEquals(-1L, md.offset(), "Invalid offset");
|
||||
assertEquals(RecordBatch.NO_TIMESTAMP, md.timestamp(), "Invalid timestamp");
|
||||
assertEquals(-1L, md.serializedKeySize(), "Invalid Serialized Key size");
|
||||
assertEquals(-1L, md.serializedValueSize(), "Invalid Serialized value size");
|
||||
});
|
||||
IllegalArgumentException e = new IllegalArgumentException("dummy exception");
|
||||
assertTrue(producer.errorNext(e), "Complete the second request with an error");
|
||||
|
|
|
@ -219,7 +219,7 @@ public class BufferPoolTest {
|
|||
t1.join();
|
||||
t2.join();
|
||||
// both the allocate() called by threads t1 and t2 should have been interrupted and the waiters queue should be empty
|
||||
assertEquals(pool.queued(), 0);
|
||||
assertEquals(0, pool.queued());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -332,7 +332,7 @@ public class BufferPoolTest {
|
|||
|
||||
}
|
||||
|
||||
assertEquals(bufferPool.availableMemory(), 1024);
|
||||
assertEquals(1024, bufferPool.availableMemory());
|
||||
}
|
||||
|
||||
public static class StressTestThread extends Thread {
|
||||
|
|
|
@ -121,8 +121,8 @@ class KafkaProducerMetricsTest {
|
|||
|
||||
private void assertMetricValue(final String name) {
|
||||
assertEquals(
|
||||
metrics.metric(metrics.metricName(name, KafkaProducerMetrics.GROUP)).metricValue(),
|
||||
(double) METRIC_VALUE
|
||||
(double) METRIC_VALUE,
|
||||
metrics.metric(metrics.metricName(name, KafkaProducerMetrics.GROUP)).metricValue()
|
||||
);
|
||||
}
|
||||
}
|
|
@ -892,7 +892,7 @@ public class RecordAccumulatorTest {
|
|||
readyNodes = accum.ready(metadataCache, time.milliseconds()).readyNodes;
|
||||
assertEquals(Collections.singleton(node1), readyNodes, "Our partition's leader should be ready");
|
||||
Map<Integer, List<ProducerBatch>> drained = accum.drain(metadataCache, readyNodes, Integer.MAX_VALUE, time.milliseconds());
|
||||
assertEquals(drained.get(node1.id()).size(), 1, "There should be only one batch.");
|
||||
assertEquals(1, drained.get(node1.id()).size(), "There should be only one batch.");
|
||||
time.sleep(1000L);
|
||||
accum.reenqueue(drained.get(node1.id()).get(0), time.milliseconds());
|
||||
|
||||
|
@ -1788,6 +1788,6 @@ public class RecordAccumulatorTest {
|
|||
}
|
||||
|
||||
// Verify all original records are accounted for (no data loss)
|
||||
assertEquals(keyFoundMap.size(), 100, "All original 100 records should be present after splitting");
|
||||
assertEquals(100, keyFoundMap.size(), "All original 100 records should be present after splitting");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -524,7 +524,7 @@ public class SenderTest {
|
|||
// Verify node is throttled a little bit. In real-life Apache Kafka, we observe that this can happen
|
||||
// as done above by throttling or with a disconnect / backoff.
|
||||
long currentPollDelay = client.pollDelayMs(nodeToThrottle, startTime);
|
||||
assertEquals(currentPollDelay, throttleTimeMs);
|
||||
assertEquals(throttleTimeMs, currentPollDelay);
|
||||
|
||||
txnManager.beginTransaction();
|
||||
txnManager.maybeAddPartition(tp0);
|
||||
|
@ -3268,7 +3268,7 @@ public class SenderTest {
|
|||
fail("Expected abortable error to be thrown for commit");
|
||||
} catch (KafkaException e) {
|
||||
assertTrue(transactionManager.hasAbortableError());
|
||||
assertEquals(commitResult.error().getClass(), TransactionAbortableException.class);
|
||||
assertEquals(TransactionAbortableException.class, commitResult.error().getClass());
|
||||
}
|
||||
|
||||
// Abort API with TRANSACTION_ABORTABLE error should convert to Fatal error i.e. KafkaException
|
||||
|
@ -3287,7 +3287,7 @@ public class SenderTest {
|
|||
// Verify TM is in FATAL_ERROR state
|
||||
assertTrue(transactionManager.hasFatalError());
|
||||
assertFalse(e instanceof TransactionAbortableException);
|
||||
assertEquals(abortResult.error().getClass(), KafkaException.class);
|
||||
assertEquals(KafkaException.class, abortResult.error().getClass());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -623,9 +623,9 @@ public class TransactionManagerTest {
|
|||
@ValueSource(booleans = {true, false})
|
||||
public void testDefaultSequenceNumber(boolean transactionV2Enabled) {
|
||||
initializeTransactionManager(Optional.empty(), transactionV2Enabled);
|
||||
assertEquals(transactionManager.sequenceNumber(tp0), 0);
|
||||
assertEquals(0, transactionManager.sequenceNumber(tp0));
|
||||
transactionManager.incrementSequenceNumber(tp0, 3);
|
||||
assertEquals(transactionManager.sequenceNumber(tp0), 3);
|
||||
assertEquals(3, transactionManager.sequenceNumber(tp0));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
|
@ -849,13 +849,13 @@ public class TransactionManagerTest {
|
|||
@ValueSource(booleans = {true, false})
|
||||
public void testSequenceNumberOverflow(boolean transactionV2Enabled) {
|
||||
initializeTransactionManager(Optional.empty(), transactionV2Enabled);
|
||||
assertEquals(transactionManager.sequenceNumber(tp0), 0);
|
||||
assertEquals(0, transactionManager.sequenceNumber(tp0));
|
||||
transactionManager.incrementSequenceNumber(tp0, Integer.MAX_VALUE);
|
||||
assertEquals(transactionManager.sequenceNumber(tp0), Integer.MAX_VALUE);
|
||||
assertEquals(Integer.MAX_VALUE, transactionManager.sequenceNumber(tp0));
|
||||
transactionManager.incrementSequenceNumber(tp0, 100);
|
||||
assertEquals(transactionManager.sequenceNumber(tp0), 99);
|
||||
assertEquals(99, transactionManager.sequenceNumber(tp0));
|
||||
transactionManager.incrementSequenceNumber(tp0, Integer.MAX_VALUE);
|
||||
assertEquals(transactionManager.sequenceNumber(tp0), 98);
|
||||
assertEquals(98, transactionManager.sequenceNumber(tp0));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
|
@ -863,17 +863,17 @@ public class TransactionManagerTest {
|
|||
public void testProducerIdReset(boolean transactionV2Enabled) {
|
||||
initializeTransactionManager(Optional.empty(), transactionV2Enabled);
|
||||
initializeIdempotentProducerId(15L, Short.MAX_VALUE);
|
||||
assertEquals(transactionManager.sequenceNumber(tp0), 0);
|
||||
assertEquals(transactionManager.sequenceNumber(tp1), 0);
|
||||
assertEquals(0, transactionManager.sequenceNumber(tp0));
|
||||
assertEquals(0, transactionManager.sequenceNumber(tp1));
|
||||
transactionManager.incrementSequenceNumber(tp0, 3);
|
||||
assertEquals(transactionManager.sequenceNumber(tp0), 3);
|
||||
assertEquals(3, transactionManager.sequenceNumber(tp0));
|
||||
transactionManager.incrementSequenceNumber(tp1, 3);
|
||||
assertEquals(transactionManager.sequenceNumber(tp1), 3);
|
||||
assertEquals(3, transactionManager.sequenceNumber(tp1));
|
||||
|
||||
transactionManager.requestIdempotentEpochBumpForPartition(tp0);
|
||||
transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
|
||||
assertEquals(transactionManager.sequenceNumber(tp0), 0);
|
||||
assertEquals(transactionManager.sequenceNumber(tp1), 3);
|
||||
assertEquals(0, transactionManager.sequenceNumber(tp0));
|
||||
assertEquals(3, transactionManager.sequenceNumber(tp1));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1101,7 +1101,7 @@ public class TransactionManagerTest {
|
|||
transactionManager.initializeTransactions(false);
|
||||
client.prepareUnsupportedVersionResponse(body -> {
|
||||
FindCoordinatorRequest findCoordinatorRequest = (FindCoordinatorRequest) body;
|
||||
assertEquals(CoordinatorType.forId(findCoordinatorRequest.data().keyType()), CoordinatorType.TRANSACTION);
|
||||
assertEquals(CoordinatorType.TRANSACTION, CoordinatorType.forId(findCoordinatorRequest.data().keyType()));
|
||||
assertTrue(findCoordinatorRequest.data().key().isEmpty());
|
||||
assertEquals(1, findCoordinatorRequest.data().coordinatorKeys().size());
|
||||
assertTrue(findCoordinatorRequest.data().coordinatorKeys().contains(transactionalId));
|
||||
|
|
|
@ -130,7 +130,7 @@ public class SupportedVersionRangeTest {
|
|||
public void testEquals() {
|
||||
SupportedVersionRange tested = new SupportedVersionRange((short) 1, (short) 1);
|
||||
assertEquals(tested, tested);
|
||||
assertNotEquals(tested, new SupportedVersionRange((short) 1, (short) 2));
|
||||
assertNotEquals(new SupportedVersionRange((short) 1, (short) 2), tested);
|
||||
assertNotEquals(null, tested);
|
||||
}
|
||||
|
||||
|
|
|
@ -684,8 +684,8 @@ public class MetricsTest {
|
|||
MetricName inheritedMetric = inherited.metricInstance(SampleMetrics.METRIC_WITH_INHERITED_TAGS, childTagsWithValues);
|
||||
|
||||
Map<String, String> filledOutTags = inheritedMetric.tags();
|
||||
assertEquals(filledOutTags.get("parent-tag"), "parent-tag-value", "parent-tag should be set properly");
|
||||
assertEquals(filledOutTags.get("child-tag"), "child-tag-value", "child-tag should be set properly");
|
||||
assertEquals("parent-tag-value", filledOutTags.get("parent-tag"), "parent-tag should be set properly");
|
||||
assertEquals("child-tag-value", filledOutTags.get("child-tag"), "child-tag should be set properly");
|
||||
|
||||
assertThrows(IllegalArgumentException.class,
|
||||
() -> inherited.metricInstance(SampleMetrics.METRIC_WITH_INHERITED_TAGS, parentTagsWithValues),
|
||||
|
|
|
@ -70,12 +70,12 @@ public class SensorTest {
|
|||
assertTrue(Sensor.RecordingLevel.DEBUG.shouldRecord(configLevel.id));
|
||||
assertTrue(Sensor.RecordingLevel.TRACE.shouldRecord(configLevel.id));
|
||||
|
||||
assertEquals(Sensor.RecordingLevel.valueOf(Sensor.RecordingLevel.DEBUG.toString()),
|
||||
Sensor.RecordingLevel.DEBUG);
|
||||
assertEquals(Sensor.RecordingLevel.valueOf(Sensor.RecordingLevel.INFO.toString()),
|
||||
Sensor.RecordingLevel.INFO);
|
||||
assertEquals(Sensor.RecordingLevel.valueOf(Sensor.RecordingLevel.TRACE.toString()),
|
||||
Sensor.RecordingLevel.TRACE);
|
||||
assertEquals(Sensor.RecordingLevel.DEBUG,
|
||||
Sensor.RecordingLevel.valueOf(Sensor.RecordingLevel.DEBUG.toString()));
|
||||
assertEquals(Sensor.RecordingLevel.INFO,
|
||||
Sensor.RecordingLevel.valueOf(Sensor.RecordingLevel.INFO.toString()));
|
||||
assertEquals(Sensor.RecordingLevel.TRACE,
|
||||
Sensor.RecordingLevel.valueOf(Sensor.RecordingLevel.TRACE.toString()));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -84,9 +84,9 @@ public class UpdateFeaturesRequestTest {
|
|||
request = UpdateFeaturesRequest.parse(readable, UpdateFeaturesRequestData.LOWEST_SUPPORTED_VERSION);
|
||||
|
||||
List<UpdateFeaturesRequest.FeatureUpdateItem> updates = new ArrayList<>(request.featureUpdates());
|
||||
assertEquals(updates.size(), 2);
|
||||
assertEquals(updates.get(0).upgradeType(), FeatureUpdate.UpgradeType.SAFE_DOWNGRADE);
|
||||
assertEquals(updates.get(1).upgradeType(), FeatureUpdate.UpgradeType.UPGRADE);
|
||||
assertEquals(2, updates.size());
|
||||
assertEquals(FeatureUpdate.UpgradeType.SAFE_DOWNGRADE, updates.get(0).upgradeType());
|
||||
assertEquals(FeatureUpdate.UpgradeType.UPGRADE, updates.get(1).upgradeType());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -114,9 +114,9 @@ public class UpdateFeaturesRequestTest {
|
|||
request = UpdateFeaturesRequest.parse(readable, UpdateFeaturesRequestData.HIGHEST_SUPPORTED_VERSION);
|
||||
|
||||
List<UpdateFeaturesRequest.FeatureUpdateItem> updates = new ArrayList<>(request.featureUpdates());
|
||||
assertEquals(updates.size(), 2);
|
||||
assertEquals(updates.get(0).upgradeType(), FeatureUpdate.UpgradeType.SAFE_DOWNGRADE);
|
||||
assertEquals(updates.get(1).upgradeType(), FeatureUpdate.UpgradeType.UPGRADE);
|
||||
assertEquals(2, updates.size());
|
||||
assertEquals(FeatureUpdate.UpgradeType.SAFE_DOWNGRADE, updates.get(0).upgradeType());
|
||||
assertEquals(FeatureUpdate.UpgradeType.UPGRADE, updates.get(1).upgradeType());
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -292,7 +292,7 @@ public class SaslServerAuthenticatorTest {
|
|||
when(saslServer.isComplete()).thenReturn(false).thenReturn(true);
|
||||
mockRequest(saslAuthenticateRequest(), transportLayer);
|
||||
|
||||
Throwable t = assertThrows(IllegalArgumentException.class, () -> authenticator.authenticate());
|
||||
Throwable t = assertThrows(IllegalArgumentException.class, authenticator::authenticate);
|
||||
assertEquals(ArithmeticException.class, t.getCause().getClass());
|
||||
assertEquals("Cannot convert " + Long.MAX_VALUE + " millisecond to nanosecond due to arithmetic overflow",
|
||||
t.getMessage());
|
||||
|
|
|
@ -26,12 +26,12 @@ public class KerberosRuleTest {
|
|||
@Test
|
||||
public void testReplaceParameters() throws BadFormatString {
|
||||
// positive test cases
|
||||
assertEquals(KerberosRule.replaceParameters("", new String[0]), "");
|
||||
assertEquals(KerberosRule.replaceParameters("hello", new String[0]), "hello");
|
||||
assertEquals(KerberosRule.replaceParameters("", new String[]{"too", "many", "parameters", "are", "ok"}), "");
|
||||
assertEquals(KerberosRule.replaceParameters("hello", new String[]{"too", "many", "parameters", "are", "ok"}), "hello");
|
||||
assertEquals(KerberosRule.replaceParameters("hello $0", new String[]{"too", "many", "parameters", "are", "ok"}), "hello too");
|
||||
assertEquals(KerberosRule.replaceParameters("hello $0", new String[]{"no recursion $1"}), "hello no recursion $1");
|
||||
assertEquals("", KerberosRule.replaceParameters("", new String[0]));
|
||||
assertEquals("hello", KerberosRule.replaceParameters("hello", new String[0]));
|
||||
assertEquals("", KerberosRule.replaceParameters("", new String[]{"too", "many", "parameters", "are", "ok"}));
|
||||
assertEquals("hello", KerberosRule.replaceParameters("hello", new String[]{"too", "many", "parameters", "are", "ok"}));
|
||||
assertEquals("hello too", KerberosRule.replaceParameters("hello $0", new String[]{"too", "many", "parameters", "are", "ok"}));
|
||||
assertEquals("hello no recursion $1", KerberosRule.replaceParameters("hello $0", new String[]{"no recursion $1"}));
|
||||
|
||||
// negative test cases
|
||||
assertThrows(
|
||||
|
|
|
@ -152,7 +152,7 @@ public class OAuthBearerUnsecuredLoginCallbackHandlerTest {
|
|||
private static void confirmCorrectValues(OAuthBearerUnsecuredJws jws, String user, long startMs,
|
||||
long lifetimeSeconds) throws OAuthBearerIllegalTokenException {
|
||||
Map<String, Object> header = jws.header();
|
||||
assertEquals(header.size(), 1);
|
||||
assertEquals(1, header.size());
|
||||
assertEquals("none", header.get("alg"));
|
||||
assertEquals(user != null ? user : "<unknown>", jws.principalName());
|
||||
assertEquals(Long.valueOf(startMs), jws.startTimeMs());
|
||||
|
|
|
@ -533,7 +533,7 @@ public class ClientTelemetryReporterTest {
|
|||
|
||||
assertEquals(ClientTelemetryState.PUSH_NEEDED, telemetrySender.state());
|
||||
|
||||
assertThrows(KafkaException.class, () -> telemetrySender.createRequest());
|
||||
assertThrows(KafkaException.class, telemetrySender::createRequest);
|
||||
assertEquals(ClientTelemetryState.TERMINATED, telemetrySender.state());
|
||||
|
||||
// === Test 3: After termination, no more requests ===
|
||||
|
|
|
@ -38,8 +38,8 @@ public class AlterConfigPolicyTest {
|
|||
|
||||
assertEquals(requestMetadata, requestMetadata);
|
||||
|
||||
assertNotEquals(requestMetadata, null);
|
||||
assertNotEquals(requestMetadata, new Object());
|
||||
assertNotEquals(null, requestMetadata);
|
||||
assertNotEquals(new Object(), requestMetadata);
|
||||
assertNotEquals(requestMetadata, new RequestMetadata(
|
||||
new ConfigResource(Type.BROKER, "1"),
|
||||
Collections.singletonMap("foo", "bar")
|
||||
|
|
|
@ -444,14 +444,19 @@ public class TestSslUtils {
|
|||
SubjectPublicKeyInfo subPubKeyInfo = SubjectPublicKeyInfo.getInstance(keyPair.getPublic().getEncoded());
|
||||
BcContentSignerBuilder signerBuilder;
|
||||
String keyAlgorithm = keyPair.getPublic().getAlgorithm();
|
||||
if (keyAlgorithm.equals("RSA"))
|
||||
signerBuilder = new BcRSAContentSignerBuilder(sigAlgId, digAlgId);
|
||||
else if (keyAlgorithm.equals("DSA"))
|
||||
signerBuilder = new BcDSAContentSignerBuilder(sigAlgId, digAlgId);
|
||||
else if (keyAlgorithm.equals("EC"))
|
||||
signerBuilder = new BcECContentSignerBuilder(sigAlgId, digAlgId);
|
||||
else
|
||||
throw new IllegalArgumentException("Unsupported algorithm " + keyAlgorithm);
|
||||
switch (keyAlgorithm) {
|
||||
case "RSA":
|
||||
signerBuilder = new BcRSAContentSignerBuilder(sigAlgId, digAlgId);
|
||||
break;
|
||||
case "DSA":
|
||||
signerBuilder = new BcDSAContentSignerBuilder(sigAlgId, digAlgId);
|
||||
break;
|
||||
case "EC":
|
||||
signerBuilder = new BcECContentSignerBuilder(sigAlgId, digAlgId);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Unsupported algorithm " + keyAlgorithm);
|
||||
}
|
||||
ContentSigner sigGen = signerBuilder.build(privateKeyAsymKeyParam);
|
||||
// Negative numbers for "days" can be used to generate expired certificates
|
||||
Date now = new Date();
|
||||
|
@ -520,14 +525,19 @@ public class TestSslUtils {
|
|||
SubjectPublicKeyInfo.getInstance(keyPair.getPublic().getEncoded());
|
||||
BcContentSignerBuilder signerBuilder;
|
||||
String keyAlgorithm = keyPair.getPublic().getAlgorithm();
|
||||
if (keyAlgorithm.equals("RSA"))
|
||||
signerBuilder = new BcRSAContentSignerBuilder(sigAlgId, digAlgId);
|
||||
else if (keyAlgorithm.equals("DSA"))
|
||||
signerBuilder = new BcDSAContentSignerBuilder(sigAlgId, digAlgId);
|
||||
else if (keyAlgorithm.equals("EC"))
|
||||
signerBuilder = new BcECContentSignerBuilder(sigAlgId, digAlgId);
|
||||
else
|
||||
throw new IllegalArgumentException("Unsupported algorithm " + keyAlgorithm);
|
||||
switch (keyAlgorithm) {
|
||||
case "RSA":
|
||||
signerBuilder = new BcRSAContentSignerBuilder(sigAlgId, digAlgId);
|
||||
break;
|
||||
case "DSA":
|
||||
signerBuilder = new BcDSAContentSignerBuilder(sigAlgId, digAlgId);
|
||||
break;
|
||||
case "EC":
|
||||
signerBuilder = new BcECContentSignerBuilder(sigAlgId, digAlgId);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Unsupported algorithm " + keyAlgorithm);
|
||||
}
|
||||
ContentSigner sigGen = signerBuilder.build(privateKeyAsymKeyParam);
|
||||
// Negative numbers for "days" can be used to generate expired certificates
|
||||
Date now = new Date();
|
||||
|
|
Loading…
Reference in New Issue