mirror of https://github.com/apache/kafka.git
MINOR: Small cleanups in clients tests (#19634)
- Removed unused fields and methods in clients tests - Fixed IDEA code inspection warnings Reviewers: Ken Huang <s7133700@gmail.com>, PoAn Yang <payang@apache.org>, Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, TengYao Chi <frankvicky@apache.org>
This commit is contained in:
parent
424e7251d6
commit
0810650da1
|
|
@ -165,8 +165,8 @@ public class TransactionsExpirationTest {
|
|||
|
||||
private void testTransactionAfterProducerIdExpires(ClusterInstance clusterInstance, boolean isTV2Enabled) throws InterruptedException {
|
||||
clusterInstance.createTopic(TOPIC1, 4, (short) 3);
|
||||
long oldProducerId = 0;
|
||||
long oldProducerEpoch = 0;
|
||||
long oldProducerId;
|
||||
long oldProducerEpoch;
|
||||
|
||||
try (Producer<byte[], byte[]> producer = clusterInstance.producer(Map.of(
|
||||
ProducerConfig.TRANSACTIONAL_ID_CONFIG, TRANSACTION_ID
|
||||
|
|
|
|||
|
|
@ -113,7 +113,7 @@ public class DescribeAuthorizedOperationsTest {
|
|||
public void testConsumerGroupAuthorizedOperations(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
|
||||
setupSecurity(clusterInstance);
|
||||
try (Admin admin = clusterInstance.admin(createAdminConfig(JaasUtils.KAFKA_PLAIN_ADMIN, JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD));
|
||||
Admin user1 = clusterInstance.admin(createAdminConfig(JaasUtils.KAFKA_PLAIN_USER1, JaasUtils.KAFKA_PLAIN_USER1_PASSWORD));
|
||||
Admin user1 = clusterInstance.admin(createAdminConfig(JaasUtils.KAFKA_PLAIN_USER1, JaasUtils.KAFKA_PLAIN_USER1_PASSWORD))
|
||||
) {
|
||||
admin.createTopics(List.of(new NewTopic("topic1", 1, (short) 1)));
|
||||
clusterInstance.waitForTopic("topic1", 1);
|
||||
|
|
|
|||
|
|
@ -112,9 +112,7 @@ public class RackAwareAutoTopicCreationTest {
|
|||
private static Map<Integer, String> getBrokerToRackMap(ClusterInstance cluster) throws Exception {
|
||||
Map<Integer, String> actualBrokerToRackMap = new HashMap<>();
|
||||
try (Admin admin = cluster.admin()) {
|
||||
admin.describeCluster().nodes().get().forEach(node -> {
|
||||
actualBrokerToRackMap.put(node.id(), node.rack());
|
||||
});
|
||||
admin.describeCluster().nodes().get().forEach(node -> actualBrokerToRackMap.put(node.id(), node.rack()));
|
||||
}
|
||||
return actualBrokerToRackMap;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1809,8 +1809,7 @@ public class ShareConsumerTest {
|
|||
// Set the auto offset reset to 3 hours before current time
|
||||
// so the consumer should consume all messages (3 records)
|
||||
alterShareAutoOffsetReset("group2", "by_duration:PT3H");
|
||||
try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group2");
|
||||
Producer<byte[], byte[]> producer = createProducer()) {
|
||||
try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group2")) {
|
||||
|
||||
shareConsumer.subscribe(Set.of(tp.topic()));
|
||||
List<ConsumerRecord<byte[], byte[]>> records = consumeRecords(shareConsumer, 3);
|
||||
|
|
@ -1820,7 +1819,7 @@ public class ShareConsumerTest {
|
|||
}
|
||||
|
||||
@ClusterTest
|
||||
public void testShareAutoOffsetResetByDurationInvalidFormat() throws Exception {
|
||||
public void testShareAutoOffsetResetByDurationInvalidFormat() {
|
||||
// Test invalid duration format
|
||||
ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, "group1");
|
||||
Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new HashMap<>();
|
||||
|
|
@ -1966,10 +1965,7 @@ public class ShareConsumerTest {
|
|||
);
|
||||
|
||||
// top the producer after some time (but after coordinator shutdown)
|
||||
service.schedule(() -> {
|
||||
prodState.done().set(true);
|
||||
}, 10L, TimeUnit.SECONDS
|
||||
);
|
||||
service.schedule(() -> prodState.done().set(true), 10L, TimeUnit.SECONDS);
|
||||
|
||||
// wait for both producer and consumer to finish
|
||||
TestUtils.waitForCondition(
|
||||
|
|
@ -2097,10 +2093,7 @@ public class ShareConsumerTest {
|
|||
);
|
||||
|
||||
// let the complex consumer read the messages
|
||||
service.schedule(() -> {
|
||||
prodState.done().set(true);
|
||||
}, 10L, TimeUnit.SECONDS
|
||||
);
|
||||
service.schedule(() -> prodState.done().set(true), 10L, TimeUnit.SECONDS);
|
||||
|
||||
// all messages which can be read are read, some would be redelivered
|
||||
TestUtils.waitForCondition(complexCons1::isDone, 45_000L, () -> "did not close!");
|
||||
|
|
@ -2594,8 +2587,7 @@ public class ShareConsumerTest {
|
|||
int maxPolls,
|
||||
boolean commit) {
|
||||
return assertDoesNotThrow(() -> {
|
||||
try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
|
||||
groupId)) {
|
||||
try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId)) {
|
||||
shareConsumer.subscribe(Set.of(tp.topic()));
|
||||
return consumeMessages(shareConsumer, totalMessagesConsumed, totalMessages, consumerNumber, maxPolls, commit);
|
||||
}
|
||||
|
|
@ -2862,10 +2854,6 @@ public class ShareConsumerTest {
|
|||
this.configs.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||
}
|
||||
|
||||
void stop() {
|
||||
state.done().set(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try (ShareConsumer<K, V> consumer = new KafkaShareConsumer<>(configs)) {
|
||||
|
|
|
|||
|
|
@ -102,7 +102,7 @@ public class InFlightRequestsTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testCompleteNextThrowsIfNoInflights() {
|
||||
public void testCompleteNextThrowsIfNoInFlights() {
|
||||
assertThrows(IllegalStateException.class, () -> inFlightRequests.completeNext(dest));
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -748,24 +748,6 @@ public class NetworkClientTest {
|
|||
assertEquals(0, client.throttleDelayMs(node, time.milliseconds()));
|
||||
}
|
||||
|
||||
// Creates expected ApiVersionsResponse from the specified node, where the max protocol version for the specified
|
||||
// key is set to the specified version.
|
||||
private ApiVersionsResponse createExpectedApiVersionsResponse(ApiKeys key, short maxVersion) {
|
||||
ApiVersionCollection versionList = new ApiVersionCollection();
|
||||
for (ApiKeys apiKey : ApiKeys.values()) {
|
||||
if (apiKey == key) {
|
||||
versionList.add(new ApiVersion()
|
||||
.setApiKey(apiKey.id)
|
||||
.setMinVersion((short) 0)
|
||||
.setMaxVersion(maxVersion));
|
||||
} else versionList.add(ApiVersionsResponse.toApiVersion(apiKey));
|
||||
}
|
||||
return new ApiVersionsResponse(new ApiVersionsResponseData()
|
||||
.setErrorCode(Errors.NONE.code())
|
||||
.setThrottleTimeMs(0)
|
||||
.setApiKeys(versionList));
|
||||
}
|
||||
|
||||
private int sendEmptyProduceRequest() {
|
||||
return sendEmptyProduceRequest(client, node.idString());
|
||||
}
|
||||
|
|
|
|||
|
|
@ -59,7 +59,7 @@ public class DeleteConsumerGroupOffsetsResultTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testTopLevelErrorConstructor() throws InterruptedException {
|
||||
public void testTopLevelErrorConstructor() {
|
||||
partitionFutures.completeExceptionally(Errors.GROUP_AUTHORIZATION_FAILED.exception());
|
||||
DeleteConsumerGroupOffsetsResult topLevelErrorResult =
|
||||
new DeleteConsumerGroupOffsetsResult(partitionFutures, partitions);
|
||||
|
|
|
|||
|
|
@ -503,7 +503,7 @@ public class KafkaAdminClientTest {
|
|||
.map(r -> (ClientTelemetryReporter) r)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
assertEquals(telemetryReporterList.size(), 1);
|
||||
assertEquals(1, telemetryReporterList.size());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -2086,7 +2086,7 @@ public class KafkaAdminClientTest {
|
|||
ElectLeadersResult results = env.adminClient().electLeaders(
|
||||
electionType,
|
||||
new HashSet<>(asList(topic1, topic2)));
|
||||
assertEquals(results.partitions().get().get(topic2).get().getClass(), ClusterAuthorizationException.class);
|
||||
assertEquals(ClusterAuthorizationException.class, results.partitions().get().get(topic2).get().getClass());
|
||||
|
||||
// Test a call where there are no errors. By mutating the internal of election results
|
||||
partition1Result.setErrorCode(ApiError.NONE.error().code());
|
||||
|
|
@ -2292,7 +2292,7 @@ public class KafkaAdminClientTest {
|
|||
|
||||
private static DescribeLogDirsResponse prepareEmptyDescribeLogDirsResponse(Optional<Errors> error) {
|
||||
DescribeLogDirsResponseData data = new DescribeLogDirsResponseData();
|
||||
if (error.isPresent()) data.setErrorCode(error.get().code());
|
||||
error.ifPresent(e -> data.setErrorCode(e.code()));
|
||||
return new DescribeLogDirsResponse(data);
|
||||
}
|
||||
|
||||
|
|
@ -8389,7 +8389,7 @@ public class KafkaAdminClientTest {
|
|||
options.timeoutMs(10000);
|
||||
final KafkaFuture<FeatureMetadata> future = env.adminClient().describeFeatures(options).featureMetadata();
|
||||
final ExecutionException e = assertThrows(ExecutionException.class, future::get);
|
||||
assertEquals(e.getCause().getClass(), Errors.INVALID_REQUEST.exception().getClass());
|
||||
assertEquals(Errors.INVALID_REQUEST.exception().getClass(), e.getCause().getClass());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -8978,15 +8978,15 @@ public class KafkaAdminClientTest {
|
|||
|
||||
DescribeClientQuotasResult result = env.adminClient().describeClientQuotas(filter);
|
||||
Map<ClientQuotaEntity, Map<String, Double>> resultData = result.entities().get();
|
||||
assertEquals(resultData.size(), 2);
|
||||
assertEquals(2, resultData.size());
|
||||
assertTrue(resultData.containsKey(entity1));
|
||||
Map<String, Double> config1 = resultData.get(entity1);
|
||||
assertEquals(config1.size(), 1);
|
||||
assertEquals(config1.get("consumer_byte_rate"), 10000.0, 1e-6);
|
||||
assertEquals(1, config1.size());
|
||||
assertEquals(10000.0, config1.get("consumer_byte_rate"), 1e-6);
|
||||
assertTrue(resultData.containsKey(entity2));
|
||||
Map<String, Double> config2 = resultData.get(entity2);
|
||||
assertEquals(config2.size(), 1);
|
||||
assertEquals(config2.get("producer_byte_rate"), 20000.0, 1e-6);
|
||||
assertEquals(1, config2.size());
|
||||
assertEquals(20000.0, config2.get("producer_byte_rate"), 1e-6);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -68,6 +68,7 @@ import java.util.Collections;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
|
@ -460,14 +461,14 @@ public class MockAdminClient extends AdminClient {
|
|||
@Override
|
||||
public synchronized DescribeTopicsResult describeTopics(TopicCollection topics, DescribeTopicsOptions options) {
|
||||
if (topics instanceof TopicIdCollection)
|
||||
return DescribeTopicsResult.ofTopicIds(new HashMap<>(handleDescribeTopicsUsingIds(((TopicIdCollection) topics).topicIds(), options)));
|
||||
return DescribeTopicsResult.ofTopicIds(new HashMap<>(handleDescribeTopicsUsingIds(((TopicIdCollection) topics).topicIds())));
|
||||
else if (topics instanceof TopicNameCollection)
|
||||
return DescribeTopicsResult.ofTopicNames(new HashMap<>(handleDescribeTopicsByNames(((TopicNameCollection) topics).topicNames(), options)));
|
||||
return DescribeTopicsResult.ofTopicNames(new HashMap<>(handleDescribeTopicsByNames(((TopicNameCollection) topics).topicNames())));
|
||||
else
|
||||
throw new IllegalArgumentException("The TopicCollection provided did not match any supported classes for describeTopics.");
|
||||
}
|
||||
|
||||
private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNames(Collection<String> topicNames, DescribeTopicsOptions options) {
|
||||
private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNames(Collection<String> topicNames) {
|
||||
Map<String, KafkaFuture<TopicDescription>> topicDescriptions = new HashMap<>();
|
||||
|
||||
if (timeoutNextRequests > 0) {
|
||||
|
|
@ -507,7 +508,7 @@ public class MockAdminClient extends AdminClient {
|
|||
return topicDescriptions;
|
||||
}
|
||||
|
||||
public synchronized Map<Uuid, KafkaFuture<TopicDescription>> handleDescribeTopicsUsingIds(Collection<Uuid> topicIds, DescribeTopicsOptions options) {
|
||||
public synchronized Map<Uuid, KafkaFuture<TopicDescription>> handleDescribeTopicsUsingIds(Collection<Uuid> topicIds) {
|
||||
|
||||
Map<Uuid, KafkaFuture<TopicDescription>> topicDescriptions = new HashMap<>();
|
||||
|
||||
|
|
@ -553,15 +554,15 @@ public class MockAdminClient extends AdminClient {
|
|||
public synchronized DeleteTopicsResult deleteTopics(TopicCollection topics, DeleteTopicsOptions options) {
|
||||
DeleteTopicsResult result;
|
||||
if (topics instanceof TopicIdCollection)
|
||||
result = DeleteTopicsResult.ofTopicIds(new HashMap<>(handleDeleteTopicsUsingIds(((TopicIdCollection) topics).topicIds(), options)));
|
||||
result = DeleteTopicsResult.ofTopicIds(new HashMap<>(handleDeleteTopicsUsingIds(((TopicIdCollection) topics).topicIds())));
|
||||
else if (topics instanceof TopicNameCollection)
|
||||
result = DeleteTopicsResult.ofTopicNames(new HashMap<>(handleDeleteTopicsUsingNames(((TopicNameCollection) topics).topicNames(), options)));
|
||||
result = DeleteTopicsResult.ofTopicNames(new HashMap<>(handleDeleteTopicsUsingNames(((TopicNameCollection) topics).topicNames())));
|
||||
else
|
||||
throw new IllegalArgumentException("The TopicCollection provided did not match any supported classes for deleteTopics.");
|
||||
return result;
|
||||
}
|
||||
|
||||
private Map<String, KafkaFuture<Void>> handleDeleteTopicsUsingNames(Collection<String> topicNameCollection, DeleteTopicsOptions options) {
|
||||
private Map<String, KafkaFuture<Void>> handleDeleteTopicsUsingNames(Collection<String> topicNameCollection) {
|
||||
Map<String, KafkaFuture<Void>> deleteTopicsResult = new HashMap<>();
|
||||
Collection<String> topicNames = new ArrayList<>(topicNameCollection);
|
||||
|
||||
|
|
@ -590,7 +591,7 @@ public class MockAdminClient extends AdminClient {
|
|||
return deleteTopicsResult;
|
||||
}
|
||||
|
||||
private Map<Uuid, KafkaFuture<Void>> handleDeleteTopicsUsingIds(Collection<Uuid> topicIdCollection, DeleteTopicsOptions options) {
|
||||
private Map<Uuid, KafkaFuture<Void>> handleDeleteTopicsUsingIds(Collection<Uuid> topicIdCollection) {
|
||||
Map<Uuid, KafkaFuture<Void>> deleteTopicsResult = new HashMap<>();
|
||||
Collection<Uuid> topicIds = new ArrayList<>(topicIdCollection);
|
||||
|
||||
|
|
@ -1118,11 +1119,7 @@ public class MockAdminClient extends AdminClient {
|
|||
DescribeLogDirsResponse.INVALID_OFFSET_LAG));
|
||||
} else {
|
||||
ReplicaLogDirInfo info = replicaMoves.get(replica);
|
||||
if (info == null) {
|
||||
future.complete(new ReplicaLogDirInfo(currentLogDir, 0, null, 0));
|
||||
} else {
|
||||
future.complete(info);
|
||||
}
|
||||
future.complete(Objects.requireNonNullElseGet(info, () -> new ReplicaLogDirInfo(currentLogDir, 0, null, 0)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1172,8 +1169,7 @@ public class MockAdminClient extends AdminClient {
|
|||
Optional<Set<TopicPartition>> partitions,
|
||||
ListPartitionReassignmentsOptions options) {
|
||||
Map<TopicPartition, PartitionReassignment> map = new HashMap<>();
|
||||
for (TopicPartition partition : partitions.isPresent() ?
|
||||
partitions.get() : reassignments.keySet()) {
|
||||
for (TopicPartition partition : partitions.orElseGet(reassignments::keySet)) {
|
||||
PartitionReassignment reassignment = findPartitionReassignment(partition);
|
||||
if (reassignment != null) {
|
||||
map.put(partition, reassignment);
|
||||
|
|
|
|||
|
|
@ -58,7 +58,7 @@ public class RemoveMembersFromConsumerGroupResultTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testTopLevelErrorConstructor() throws InterruptedException {
|
||||
public void testTopLevelErrorConstructor() {
|
||||
memberFutures.completeExceptionally(Errors.GROUP_AUTHORIZATION_FAILED.exception());
|
||||
RemoveMembersFromConsumerGroupResult topLevelErrorResult =
|
||||
new RemoveMembersFromConsumerGroupResult(memberFutures, membersToRemove);
|
||||
|
|
|
|||
|
|
@ -73,7 +73,7 @@ public class ListConsumerGroupOffsetsHandlerTest {
|
|||
private final Map<String, ListConsumerGroupOffsetsSpec> singleRequestMap = Collections.singletonMap(groupZero,
|
||||
new ListConsumerGroupOffsetsSpec().topicPartitions(Arrays.asList(t0p0, t0p1, t1p0, t1p1)));
|
||||
private final Map<String, ListConsumerGroupOffsetsSpec> batchedRequestMap =
|
||||
new HashMap<String, ListConsumerGroupOffsetsSpec>() {{
|
||||
new HashMap<>() {{
|
||||
put(groupZero, new ListConsumerGroupOffsetsSpec().topicPartitions(singletonList(t0p0)));
|
||||
put(groupOne, new ListConsumerGroupOffsetsSpec().topicPartitions(Arrays.asList(t0p0, t1p0, t1p1)));
|
||||
put(groupTwo, new ListConsumerGroupOffsetsSpec().topicPartitions(Arrays.asList(t0p0, t1p0, t1p1, t2p0, t2p1, t2p2)));
|
||||
|
|
|
|||
|
|
@ -1505,7 +1505,7 @@ public class AbstractCoordinatorTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testWakeupAfterSyncGroupReceived() throws Exception {
|
||||
public void testWakeupAfterSyncGroupReceived() {
|
||||
setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, REBALANCE_TIMEOUT_MS,
|
||||
Optional.empty(), Optional.of(() -> mock(BaseHeartbeatThread.class)));
|
||||
|
||||
|
|
@ -1537,7 +1537,7 @@ public class AbstractCoordinatorTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testWakeupAfterSyncGroupReceivedExternalCompletion() throws Exception {
|
||||
public void testWakeupAfterSyncGroupReceivedExternalCompletion() {
|
||||
setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, REBALANCE_TIMEOUT_MS,
|
||||
Optional.empty(), Optional.of(() -> mock(BaseHeartbeatThread.class)));
|
||||
|
||||
|
|
|
|||
|
|
@ -29,7 +29,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
|
|||
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
||||
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
|
||||
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
|
||||
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
|
||||
import org.apache.kafka.clients.consumer.SubscriptionPattern;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
|
||||
|
|
@ -68,7 +67,6 @@ import org.apache.kafka.common.config.ConfigException;
|
|||
import org.apache.kafka.common.errors.GroupAuthorizationException;
|
||||
import org.apache.kafka.common.errors.InterruptException;
|
||||
import org.apache.kafka.common.errors.InvalidGroupIdException;
|
||||
import org.apache.kafka.common.errors.RetriableException;
|
||||
import org.apache.kafka.common.errors.TimeoutException;
|
||||
import org.apache.kafka.common.errors.UnsupportedVersionException;
|
||||
import org.apache.kafka.common.errors.WakeupException;
|
||||
|
|
@ -637,9 +635,7 @@ public class AsyncKafkaConsumerTest {
|
|||
consumer.assign(Collections.singleton(new TopicPartition("foo", 0)));
|
||||
assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));
|
||||
markReconcileAndAutoCommitCompleteForPollEvent();
|
||||
assertMockCommitCallbackInvoked(() -> consumer.poll(Duration.ZERO),
|
||||
callback,
|
||||
null);
|
||||
assertMockCommitCallbackInvoked(() -> consumer.poll(Duration.ZERO), callback);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -649,9 +645,7 @@ public class AsyncKafkaConsumerTest {
|
|||
MockCommitCallback callback = new MockCommitCallback();
|
||||
completeCommitAsyncApplicationEventSuccessfully();
|
||||
assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));
|
||||
assertMockCommitCallbackInvoked(() -> consumer.close(),
|
||||
callback,
|
||||
null);
|
||||
assertMockCommitCallbackInvoked(() -> consumer.close(), callback);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -782,15 +776,10 @@ public class AsyncKafkaConsumerTest {
|
|||
verify(applicationEventHandler, never()).add(any(SyncCommitEvent.class));
|
||||
}
|
||||
|
||||
private void assertMockCommitCallbackInvoked(final Executable task,
|
||||
final MockCommitCallback callback,
|
||||
final Errors errors) {
|
||||
private void assertMockCommitCallbackInvoked(final Executable task, final MockCommitCallback callback) {
|
||||
assertDoesNotThrow(task);
|
||||
assertEquals(1, callback.invoked);
|
||||
if (errors == null)
|
||||
assertNull(callback.exception);
|
||||
else if (errors.exception() instanceof RetriableException)
|
||||
assertInstanceOf(RetriableCommitFailedException.class, callback.exception);
|
||||
assertNull(callback.exception);
|
||||
}
|
||||
|
||||
private static class MockCommitCallback implements OffsetCommitCallback {
|
||||
|
|
@ -1185,14 +1174,14 @@ public class AsyncKafkaConsumerTest {
|
|||
@Test
|
||||
public void testRefreshCommittedOffsetsShouldNotResetIfFailedWithTimeout() {
|
||||
consumer = newConsumer();
|
||||
testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(true);
|
||||
testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshCommittedOffsetsNotCalledIfNoGroupId() {
|
||||
// Create consumer without group id so committed offsets are not used for updating positions
|
||||
consumer = newConsumerWithoutGroupId();
|
||||
testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(false);
|
||||
testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -1647,7 +1636,7 @@ public class AsyncKafkaConsumerTest {
|
|||
return props;
|
||||
}
|
||||
|
||||
private void testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(boolean committedOffsetsEnabled) {
|
||||
private void testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout() {
|
||||
completeFetchedCommittedOffsetApplicationEventExceptionally(new TimeoutException());
|
||||
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
|
||||
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
|
||||
|
|
|
|||
|
|
@ -429,16 +429,6 @@ public class CommitRequestManagerTest {
|
|||
assertExceptionHandling(commitRequestManager, error, true);
|
||||
}
|
||||
|
||||
private static Stream<Arguments> commitSyncExpectedExceptions() {
|
||||
return Stream.of(
|
||||
Arguments.of(Errors.UNKNOWN_MEMBER_ID, CommitFailedException.class),
|
||||
Arguments.of(Errors.OFFSET_METADATA_TOO_LARGE, Errors.OFFSET_METADATA_TOO_LARGE.exception().getClass()),
|
||||
Arguments.of(Errors.INVALID_COMMIT_OFFSET_SIZE, Errors.INVALID_COMMIT_OFFSET_SIZE.exception().getClass()),
|
||||
Arguments.of(Errors.GROUP_AUTHORIZATION_FAILED, Errors.GROUP_AUTHORIZATION_FAILED.exception().getClass()),
|
||||
Arguments.of(Errors.CORRUPT_MESSAGE, KafkaException.class),
|
||||
Arguments.of(Errors.UNKNOWN_SERVER_ERROR, KafkaException.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCommitSyncFailsWithCommitFailedExceptionIfUnknownMemberId() {
|
||||
CommitRequestManager commitRequestManager = create(false, 100);
|
||||
|
|
|
|||
|
|
@ -163,7 +163,7 @@ public abstract class ConsumerCoordinatorTest {
|
|||
private final String consumerId2 = "consumer2";
|
||||
|
||||
private MockClient client;
|
||||
private final MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith(1, new HashMap<String, Integer>() {
|
||||
private final MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith(1, new HashMap<>() {
|
||||
{
|
||||
put(topic1, 1);
|
||||
put(topic2, 1);
|
||||
|
|
@ -367,7 +367,7 @@ public abstract class ConsumerCoordinatorTest {
|
|||
|
||||
List<JoinGroupResponseData.JoinGroupResponseMember> metadata = new ArrayList<>();
|
||||
for (Map.Entry<String, List<String>> subscriptionEntry : memberSubscriptions.entrySet()) {
|
||||
ByteBuffer buf = null;
|
||||
ByteBuffer buf;
|
||||
if (subscriptionEntry.getKey().equals(consumerId)) {
|
||||
buf = ConsumerProtocol.serializeSubscription(subscriptionConsumer1);
|
||||
} else {
|
||||
|
|
@ -614,9 +614,7 @@ public abstract class ConsumerCoordinatorTest {
|
|||
Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(tp, new OffsetAndMetadata(123));
|
||||
|
||||
final AtomicBoolean committed = new AtomicBoolean();
|
||||
coordinator.commitOffsetsAsync(offsets, (committedOffsets, exception) -> {
|
||||
committed.set(true);
|
||||
});
|
||||
coordinator.commitOffsetsAsync(offsets, (committedOffsets, exception) -> committed.set(true));
|
||||
|
||||
assertFalse(coordinator.commitOffsetsSync(Collections.emptyMap(), time.timer(100L)), "expected sync commit to fail");
|
||||
assertFalse(committed.get());
|
||||
|
|
@ -683,7 +681,7 @@ public abstract class ConsumerCoordinatorTest {
|
|||
);
|
||||
|
||||
consumerClient.send(coordinator.checkAndGetCoordinator(), OffsetCommitRequest.Builder.forTopicNames(offsetCommitRequestData))
|
||||
.compose(new RequestFutureAdapter<ClientResponse, Object>() {
|
||||
.compose(new RequestFutureAdapter<>() {
|
||||
@Override
|
||||
public void onSuccess(ClientResponse value, RequestFuture<Object> future) {}
|
||||
|
||||
|
|
@ -1294,7 +1292,7 @@ public abstract class ConsumerCoordinatorTest {
|
|||
public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() {
|
||||
try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)) {
|
||||
subscriptions.subscribe(Pattern.compile("test.*"), Optional.of(rebalanceListener));
|
||||
client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, new HashMap<String, Integer>() {
|
||||
client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, new HashMap<>() {
|
||||
{
|
||||
put(topic1, 1);
|
||||
put(topic2, 1);
|
||||
|
|
@ -1306,11 +1304,7 @@ public abstract class ConsumerCoordinatorTest {
|
|||
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
|
||||
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
|
||||
|
||||
MetadataResponse deletedMetadataResponse = RequestTestUtils.metadataUpdateWith(1, new HashMap<String, Integer>() {
|
||||
{
|
||||
put(topic1, 1);
|
||||
}
|
||||
});
|
||||
MetadataResponse deletedMetadataResponse = RequestTestUtils.metadataUpdateWith(1, Map.of(topic1, 1));
|
||||
// Instrument the test so that metadata will contain only one topic after next refresh.
|
||||
client.prepareMetadataUpdate(deletedMetadataResponse);
|
||||
|
||||
|
|
@ -2697,12 +2691,7 @@ public abstract class ConsumerCoordinatorTest {
|
|||
final OffsetAndMetadata firstOffset = new OffsetAndMetadata(0L);
|
||||
final OffsetAndMetadata secondOffset = new OffsetAndMetadata(1L);
|
||||
|
||||
coordinator.commitOffsetsAsync(singletonMap(t1p, firstOffset), new OffsetCommitCallback() {
|
||||
@Override
|
||||
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
|
||||
committedOffsets.add(firstOffset);
|
||||
}
|
||||
});
|
||||
coordinator.commitOffsetsAsync(singletonMap(t1p, firstOffset), (offsets, exception) -> committedOffsets.add(firstOffset));
|
||||
|
||||
// Do a synchronous commit in the background so that we can send both responses at the same time
|
||||
Thread thread = new Thread() {
|
||||
|
|
|
|||
|
|
@ -647,17 +647,6 @@ public class ConsumerHeartbeatRequestManagerTest {
|
|||
clearInvocations(backgroundEventHandler);
|
||||
}
|
||||
|
||||
private void mockErrorResponse(Errors error, String exceptionCustomMsg) {
|
||||
time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
|
||||
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
|
||||
assertEquals(1, result.unsentRequests.size());
|
||||
|
||||
when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true);
|
||||
ClientResponse response = createHeartbeatResponse(
|
||||
result.unsentRequests.get(0), error, exceptionCustomMsg);
|
||||
result.unsentRequests.get(0).handler().onComplete(response);
|
||||
}
|
||||
|
||||
private void mockResponseWithException(UnsupportedVersionException exception, boolean isFromBroker) {
|
||||
time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
|
||||
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
|
||||
|
|
|
|||
|
|
@ -23,7 +23,6 @@ import org.apache.kafka.common.TopicPartition;
|
|||
import org.apache.kafka.common.Uuid;
|
||||
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
|
||||
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData.Assignment;
|
||||
import org.apache.kafka.common.metrics.KafkaMetric;
|
||||
import org.apache.kafka.common.metrics.Metrics;
|
||||
import org.apache.kafka.common.protocol.Errors;
|
||||
import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest;
|
||||
|
|
@ -57,7 +56,6 @@ import java.util.stream.Collectors;
|
|||
import java.util.stream.Stream;
|
||||
|
||||
import static org.apache.kafka.clients.consumer.internals.AbstractMembershipManager.TOPIC_PARTITION_COMPARATOR;
|
||||
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP_PREFIX;
|
||||
import static org.apache.kafka.common.requests.ShareGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
|
||||
import static org.apache.kafka.common.utils.Utils.mkEntry;
|
||||
import static org.apache.kafka.common.utils.Utils.mkMap;
|
||||
|
|
@ -1228,23 +1226,6 @@ public class ShareMembershipManagerTest {
|
|||
return membershipManager;
|
||||
}
|
||||
|
||||
private void mockPartitionOwnedAndNewPartitionAdded(String topicName,
|
||||
int partitionOwned,
|
||||
int partitionAdded,
|
||||
CounterConsumerRebalanceListener listener,
|
||||
ShareMembershipManager membershipManager) {
|
||||
Uuid topicId = Uuid.randomUuid();
|
||||
TopicPartition owned = new TopicPartition(topicName, partitionOwned);
|
||||
when(subscriptionState.assignedPartitions()).thenReturn(Collections.singleton(owned));
|
||||
membershipManager.updateAssignment(Collections.singletonMap(topicId, mkSortedSet(partitionOwned)));
|
||||
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, topicName));
|
||||
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
|
||||
when(subscriptionState.rebalanceListener()).thenReturn(Optional.ofNullable(listener));
|
||||
|
||||
// Receive assignment adding a new partition
|
||||
receiveAssignment(topicId, Arrays.asList(partitionOwned, partitionAdded), membershipManager);
|
||||
}
|
||||
|
||||
private SortedSet<TopicIdPartition> topicIdPartitionsSet(Uuid topicId, String topicName, int... partitions) {
|
||||
SortedSet<TopicIdPartition> topicIdPartitions = new TreeSet<>(new Utils.TopicIdPartitionComparator());
|
||||
|
||||
|
|
@ -1573,17 +1554,6 @@ public class ShareMembershipManagerTest {
|
|||
doNothing().when(subscriptionState).markPendingRevocation(anySet());
|
||||
}
|
||||
|
||||
private void mockPrepareLeaving(ShareMembershipManager membershipManager) {
|
||||
String topicName = "topic1";
|
||||
TopicPartition ownedPartition = new TopicPartition(topicName, 0);
|
||||
|
||||
// Start leaving group, blocked waiting for callback to complete.
|
||||
when(subscriptionState.assignedPartitions()).thenReturn(Collections.singleton(ownedPartition));
|
||||
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
|
||||
doNothing().when(subscriptionState).markPendingRevocation(anySet());
|
||||
membershipManager.leaveGroup();
|
||||
}
|
||||
|
||||
private void testStateUpdateOnFatalFailure(ShareMembershipManager membershipManager) {
|
||||
String memberId = membershipManager.memberId();
|
||||
int lastEpoch = membershipManager.memberEpoch();
|
||||
|
|
@ -1641,10 +1611,6 @@ public class ShareMembershipManagerTest {
|
|||
));
|
||||
}
|
||||
|
||||
private KafkaMetric getMetric(final String name) {
|
||||
return metrics.metrics().get(metrics.metricName(name, CONSUMER_SHARE_METRIC_GROUP_PREFIX + "-coordinator-metrics"));
|
||||
}
|
||||
|
||||
private ShareMembershipManager memberJoinWithAssignment() {
|
||||
Uuid topicId = Uuid.randomUuid();
|
||||
ShareMembershipManager membershipManager = mockJoinAndReceiveAssignment(true);
|
||||
|
|
|
|||
|
|
@ -158,9 +158,8 @@ public class ShareSessionHandlerTest {
|
|||
Uuid memberId = Uuid.randomUuid();
|
||||
ShareSessionHandler handler = new ShareSessionHandler(LOG_CONTEXT, 1, memberId);
|
||||
|
||||
Map<String, Uuid> topicIds = new HashMap<>();
|
||||
Map<Uuid, String> topicNames = new HashMap<>();
|
||||
Uuid fooId = addTopicId(topicIds, topicNames, "foo");
|
||||
Uuid fooId = addTopicId(topicNames, "foo");
|
||||
TopicIdPartition foo0 = new TopicIdPartition(fooId, 0, "foo");
|
||||
TopicIdPartition foo1 = new TopicIdPartition(fooId, 1, "foo");
|
||||
handler.addPartitionToFetch(foo0, null);
|
||||
|
|
@ -180,7 +179,7 @@ public class ShareSessionHandlerTest {
|
|||
handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
|
||||
|
||||
// Test a fetch request which adds one partition
|
||||
Uuid barId = addTopicId(topicIds, topicNames, "bar");
|
||||
Uuid barId = addTopicId(topicNames, "bar");
|
||||
TopicIdPartition bar0 = new TopicIdPartition(barId, 0, "bar");
|
||||
handler.addPartitionToFetch(foo0, null);
|
||||
handler.addPartitionToFetch(foo1, null);
|
||||
|
|
@ -225,10 +224,9 @@ public class ShareSessionHandlerTest {
|
|||
Uuid memberId = Uuid.randomUuid();
|
||||
ShareSessionHandler handler = new ShareSessionHandler(LOG_CONTEXT, 1, memberId);
|
||||
|
||||
Map<String, Uuid> topicIds = new HashMap<>();
|
||||
Map<Uuid, String> topicNames = new HashMap<>();
|
||||
Uuid fooId = addTopicId(topicIds, topicNames, "foo");
|
||||
Uuid barId = addTopicId(topicIds, topicNames, "bar");
|
||||
Uuid fooId = addTopicId(topicNames, "foo");
|
||||
Uuid barId = addTopicId(topicNames, "bar");
|
||||
TopicIdPartition foo0 = new TopicIdPartition(fooId, 0, "foo");
|
||||
TopicIdPartition foo1 = new TopicIdPartition(fooId, 1, "foo");
|
||||
TopicIdPartition bar0 = new TopicIdPartition(barId, 0, "bar");
|
||||
|
|
@ -292,9 +290,8 @@ public class ShareSessionHandlerTest {
|
|||
Uuid memberId = Uuid.randomUuid();
|
||||
ShareSessionHandler handler = new ShareSessionHandler(LOG_CONTEXT, 1, memberId);
|
||||
|
||||
Map<String, Uuid> topicIds = new HashMap<>();
|
||||
Map<Uuid, String> topicNames = new HashMap<>();
|
||||
Uuid topicId1 = addTopicId(topicIds, topicNames, "foo");
|
||||
Uuid topicId1 = addTopicId(topicNames, "foo");
|
||||
TopicIdPartition tp = new TopicIdPartition(topicId1, 0, "foo");
|
||||
handler.addPartitionToFetch(tp, null);
|
||||
ShareFetchRequestData requestData1 = handler.newShareFetchBuilder(groupId, fetchConfig).build().data();
|
||||
|
|
@ -312,7 +309,7 @@ public class ShareSessionHandlerTest {
|
|||
handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
|
||||
|
||||
// Try to add a new topic ID
|
||||
Uuid topicId2 = addTopicId(topicIds, topicNames, "foo");
|
||||
Uuid topicId2 = addTopicId(topicNames, "foo");
|
||||
TopicIdPartition tp2 = new TopicIdPartition(topicId2, 0, "foo");
|
||||
// Use the same data besides the topic ID
|
||||
handler.addPartitionToFetch(tp2, null);
|
||||
|
|
@ -337,9 +334,8 @@ public class ShareSessionHandlerTest {
|
|||
ShareSessionHandler handler = new ShareSessionHandler(LOG_CONTEXT, 1, memberId);
|
||||
|
||||
// We want to test when all topics are removed from the session
|
||||
Map<String, Uuid> topicIds = new HashMap<>();
|
||||
Map<Uuid, String> topicNames = new HashMap<>();
|
||||
Uuid topicId = addTopicId(topicIds, topicNames, "foo");
|
||||
Uuid topicId = addTopicId(topicNames, "foo");
|
||||
TopicIdPartition foo0 = new TopicIdPartition(topicId, 0, "foo");
|
||||
handler.addPartitionToFetch(foo0, null);
|
||||
ShareFetchRequestData requestData1 = handler.newShareFetchBuilder(groupId, fetchConfig).build().data();
|
||||
|
|
@ -372,9 +368,8 @@ public class ShareSessionHandlerTest {
|
|||
ShareSessionHandler handler = new ShareSessionHandler(LOG_CONTEXT, 1, memberId);
|
||||
|
||||
// We want to test when all topics are removed from the session
|
||||
Map<String, Uuid> topicIds = new HashMap<>();
|
||||
Map<Uuid, String> topicNames = new HashMap<>();
|
||||
Uuid topicId = addTopicId(topicIds, topicNames, "foo");
|
||||
Uuid topicId = addTopicId(topicNames, "foo");
|
||||
TopicIdPartition foo0 = new TopicIdPartition(topicId, 0, "foo");
|
||||
handler.addPartitionToFetch(foo0, null);
|
||||
ShareFetchRequestData requestData1 = handler.newShareFetchBuilder(groupId, fetchConfig).build().data();
|
||||
|
|
@ -405,9 +400,8 @@ public class ShareSessionHandlerTest {
|
|||
Uuid memberId = Uuid.randomUuid();
|
||||
ShareSessionHandler handler = new ShareSessionHandler(LOG_CONTEXT, 1, memberId);
|
||||
|
||||
Map<String, Uuid> topicIds = new HashMap<>();
|
||||
Map<Uuid, String> topicNames = new HashMap<>();
|
||||
Uuid topicId = addTopicId(topicIds, topicNames, "foo");
|
||||
Uuid topicId = addTopicId(topicNames, "foo");
|
||||
handler.addPartitionToFetch(new TopicIdPartition(topicId, 0, "foo"), null);
|
||||
ShareFetchRequestData requestData1 = handler.newShareFetchBuilder(groupId, fetchConfig).build().data();
|
||||
assertMapsEqual(reqMap(new TopicIdPartition(topicId, 0, "foo")),
|
||||
|
|
@ -431,7 +425,7 @@ public class ShareSessionHandlerTest {
|
|||
handler.handleResponse(resp2, ApiKeys.SHARE_FETCH.latestVersion(true));
|
||||
|
||||
// After the topic is removed, add a recreated topic with a new ID
|
||||
Uuid topicId2 = addTopicId(topicIds, topicNames, "foo");
|
||||
Uuid topicId2 = addTopicId(topicNames, "foo");
|
||||
handler.addPartitionToFetch(new TopicIdPartition(topicId2, 0, "foo"), null);
|
||||
ShareFetchRequestData requestData3 = handler.newShareFetchBuilder(groupId, fetchConfig).build().data();
|
||||
|
||||
|
|
@ -446,9 +440,8 @@ public class ShareSessionHandlerTest {
|
|||
Uuid memberId = Uuid.randomUuid();
|
||||
ShareSessionHandler handler = new ShareSessionHandler(LOG_CONTEXT, 1, memberId);
|
||||
|
||||
Map<String, Uuid> topicIds = new HashMap<>();
|
||||
Map<Uuid, String> topicNames = new HashMap<>();
|
||||
Uuid fooId = addTopicId(topicIds, topicNames, "foo");
|
||||
Uuid fooId = addTopicId(topicNames, "foo");
|
||||
TopicIdPartition foo0 = new TopicIdPartition(fooId, 0, "foo");
|
||||
|
||||
Acknowledgements acknowledgements = Acknowledgements.empty();
|
||||
|
|
@ -473,9 +466,7 @@ public class ShareSessionHandlerTest {
|
|||
assertEquals(memberId.toString(), requestData.memberId());
|
||||
}
|
||||
|
||||
private Uuid addTopicId(Map<String, Uuid> topicIds, Map<Uuid, String> topicNames, String name) {
|
||||
// If the same topic name is added more than once, the latest mapping will be in the
|
||||
// topicIds, but all mappings will be in topicNames. This is needed in the replace tests.
|
||||
private Uuid addTopicId(Map<Uuid, String> topicNames, String name) {
|
||||
Uuid id = Uuid.randomUuid();
|
||||
topicNames.put(id, name);
|
||||
return id;
|
||||
|
|
|
|||
|
|
@ -203,7 +203,7 @@ public class RecordAccumulatorTest {
|
|||
}
|
||||
|
||||
private void verifyTopicPartitionInBatches(Map<Integer, List<ProducerBatch>> nodeBatches, TopicPartition... tp) {
|
||||
int allTpBatchCount = (int) nodeBatches.values().stream().flatMap(Collection::stream).count();
|
||||
int allTpBatchCount = (int) nodeBatches.values().stream().mapToLong(Collection::size).sum();
|
||||
assertEquals(tp.length, allTpBatchCount);
|
||||
List<TopicPartition> topicPartitionsInBatch = new ArrayList<>();
|
||||
for (Map.Entry<Integer, List<ProducerBatch>> entry : nodeBatches.entrySet()) {
|
||||
|
|
@ -1610,22 +1610,6 @@ public class RecordAccumulatorTest {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the offset delta when there is no key.
|
||||
*/
|
||||
private int expectedNumAppendsNoKey(int batchSize) {
|
||||
int size = 0;
|
||||
int offsetDelta = 0;
|
||||
while (true) {
|
||||
int recordSize = DefaultRecord.sizeInBytes(offsetDelta, 0, 0, value.length,
|
||||
Record.EMPTY_HEADERS);
|
||||
if (size + recordSize > batchSize)
|
||||
return offsetDelta;
|
||||
offsetDelta += 1;
|
||||
size += recordSize;
|
||||
}
|
||||
}
|
||||
|
||||
private RecordAccumulator createTestRecordAccumulator(int batchSize, long totalSize, Compression compression, int lingerMs) {
|
||||
int deliveryTimeoutMs = 3200;
|
||||
return createTestRecordAccumulator(deliveryTimeoutMs, batchSize, totalSize, compression, lingerMs);
|
||||
|
|
|
|||
|
|
@ -3153,7 +3153,7 @@ public class TransactionManagerTest {
|
|||
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testMaybeResolveSequencesTransactionalProducer(boolean transactionV2Enabled) throws Exception {
|
||||
public void testMaybeResolveSequencesTransactionalProducer(boolean transactionV2Enabled) {
|
||||
initializeTransactionManager(Optional.of(transactionalId), transactionV2Enabled);
|
||||
|
||||
// Initialize transaction with initial producer ID and epoch.
|
||||
|
|
@ -4056,7 +4056,7 @@ public class TransactionManagerTest {
|
|||
long producerId,
|
||||
short producerEpoch
|
||||
) {
|
||||
prepareInitPidResponse(error, shouldDisconnect, producerId, producerEpoch, false, false, (long) -1, (short) -1);
|
||||
prepareInitPidResponse(error, shouldDisconnect, producerId, producerEpoch, false, false, -1, (short) -1);
|
||||
}
|
||||
|
||||
private void prepareInitPidResponse(
|
||||
|
|
|
|||
|
|
@ -183,11 +183,6 @@ public final class NioEchoServer extends Thread {
|
|||
}
|
||||
}
|
||||
|
||||
public void verifyAuthenticationNoReauthMetric(int successfulAuthenticationNoReauths) throws InterruptedException {
|
||||
waitForMetrics("successful-authentication-no-reauth", successfulAuthenticationNoReauths,
|
||||
EnumSet.of(MetricType.TOTAL));
|
||||
}
|
||||
|
||||
public void waitForMetric(String name, final double expectedValue) throws InterruptedException {
|
||||
waitForMetrics(name, expectedValue, EnumSet.of(MetricType.TOTAL, MetricType.RATE));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1096,14 +1096,14 @@ public class SslTransportLayerTest {
|
|||
|
||||
CertStores invalidCertStores = certBuilder(true, "server", args.useInlinePem).addHostName("127.0.0.1").build();
|
||||
Map<String, Object> invalidConfigs = args.getTrustingConfig(invalidCertStores, args.clientCertStores);
|
||||
verifyInvalidReconfigure(reconfigurableBuilder, invalidConfigs, "keystore with different SubjectAltName");
|
||||
verifyInvalidReconfigure(reconfigurableBuilder, invalidConfigs);
|
||||
|
||||
Map<String, Object> missingStoreConfigs = new HashMap<>();
|
||||
missingStoreConfigs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12");
|
||||
missingStoreConfigs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "some.keystore.path");
|
||||
missingStoreConfigs.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, new Password("some.keystore.password"));
|
||||
missingStoreConfigs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, new Password("some.key.password"));
|
||||
verifyInvalidReconfigure(reconfigurableBuilder, missingStoreConfigs, "keystore not found");
|
||||
verifyInvalidReconfigure(reconfigurableBuilder, missingStoreConfigs);
|
||||
|
||||
// Verify that new connections continue to work with the server with previously configured keystore after failed reconfiguration
|
||||
newClientSelector.connect("3", addr, BUFFER_SIZE, BUFFER_SIZE);
|
||||
|
|
@ -1167,7 +1167,7 @@ public class SslTransportLayerTest {
|
|||
for (String propName : CertStores.KEYSTORE_PROPS) {
|
||||
invalidKeystoreConfigs.put(propName, invalidConfig.get(propName));
|
||||
}
|
||||
verifyInvalidReconfigure(reconfigurableBuilder, invalidKeystoreConfigs, "keystore without existing SubjectAltName");
|
||||
verifyInvalidReconfigure(reconfigurableBuilder, invalidKeystoreConfigs);
|
||||
String node3 = "3";
|
||||
selector.connect(node3, addr, BUFFER_SIZE, BUFFER_SIZE);
|
||||
NetworkTestUtils.checkClientConnection(selector, node3, 100, 10);
|
||||
|
|
@ -1223,13 +1223,13 @@ public class SslTransportLayerTest {
|
|||
|
||||
Map<String, Object> invalidConfigs = new HashMap<>(newTruststoreConfigs);
|
||||
invalidConfigs.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "INVALID_TYPE");
|
||||
verifyInvalidReconfigure(reconfigurableBuilder, invalidConfigs, "invalid truststore type");
|
||||
verifyInvalidReconfigure(reconfigurableBuilder, invalidConfigs);
|
||||
|
||||
Map<String, Object> missingStoreConfigs = new HashMap<>();
|
||||
missingStoreConfigs.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PKCS12");
|
||||
missingStoreConfigs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "some.truststore.path");
|
||||
missingStoreConfigs.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, new Password("some.truststore.password"));
|
||||
verifyInvalidReconfigure(reconfigurableBuilder, missingStoreConfigs, "truststore not found");
|
||||
verifyInvalidReconfigure(reconfigurableBuilder, missingStoreConfigs);
|
||||
|
||||
// Verify that new connections continue to work with the server with previously configured keystore after failed reconfiguration
|
||||
newClientSelector.connect("3", addr, BUFFER_SIZE, BUFFER_SIZE);
|
||||
|
|
@ -1280,7 +1280,7 @@ public class SslTransportLayerTest {
|
|||
}
|
||||
|
||||
private void verifyInvalidReconfigure(ListenerReconfigurable reconfigurable,
|
||||
Map<String, Object> invalidConfigs, String errorMessage) {
|
||||
Map<String, Object> invalidConfigs) {
|
||||
assertThrows(KafkaException.class, () -> reconfigurable.validateReconfiguration(invalidConfigs));
|
||||
assertThrows(KafkaException.class, () -> reconfigurable.reconfigure(invalidConfigs));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,7 +22,6 @@ import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
|
|||
import org.apache.kafka.common.utils.BufferSupplier;
|
||||
import org.apache.kafka.common.utils.ByteBufferOutputStream;
|
||||
import org.apache.kafka.common.utils.CloseableIterator;
|
||||
import org.apache.kafka.common.utils.Time;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.test.TestUtils;
|
||||
|
||||
|
|
@ -50,7 +49,6 @@ import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V1;
|
|||
import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V2;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
|
|
@ -91,8 +89,6 @@ public class MemoryRecordsBuilderTest {
|
|||
}
|
||||
}
|
||||
|
||||
private final Time time = Time.SYSTEM;
|
||||
|
||||
@Test
|
||||
public void testUnsupportedCompress() {
|
||||
BiFunction<Byte, Compression, MemoryRecordsBuilder> builderBiFunction = (magic, compression) ->
|
||||
|
|
@ -638,31 +634,6 @@ public class MemoryRecordsBuilderTest {
|
|||
}
|
||||
}
|
||||
|
||||
private void verifyRecordsProcessingStats(Compression compression, RecordValidationStats processingStats,
|
||||
int numRecords, int numRecordsConverted, long finalBytes,
|
||||
long preConvertedBytes) {
|
||||
assertNotNull(processingStats, "Records processing info is null");
|
||||
assertEquals(numRecordsConverted, processingStats.numRecordsConverted());
|
||||
// Since nanoTime accuracy on build machines may not be sufficient to measure small conversion times,
|
||||
// only check if the value >= 0. Default is -1, so this checks if time has been recorded.
|
||||
assertTrue(processingStats.conversionTimeNanos() >= 0, "Processing time not recorded: " + processingStats);
|
||||
long tempBytes = processingStats.temporaryMemoryBytes();
|
||||
if (compression.type() == CompressionType.NONE) {
|
||||
if (numRecordsConverted == 0)
|
||||
assertEquals(finalBytes, tempBytes);
|
||||
else if (numRecordsConverted == numRecords)
|
||||
assertEquals(preConvertedBytes + finalBytes, tempBytes);
|
||||
else {
|
||||
assertTrue(tempBytes > finalBytes && tempBytes < finalBytes + preConvertedBytes,
|
||||
String.format("Unexpected temp bytes %d final %d pre %d", tempBytes, finalBytes, preConvertedBytes));
|
||||
}
|
||||
} else {
|
||||
long compressedBytes = finalBytes - Records.LOG_OVERHEAD - LegacyRecord.RECORD_OVERHEAD_V0;
|
||||
assertTrue(tempBytes > compressedBytes,
|
||||
String.format("Uncompressed size expected temp=%d, compressed=%d", tempBytes, compressedBytes));
|
||||
}
|
||||
}
|
||||
|
||||
private ByteBuffer allocateBuffer(int size, Args args) {
|
||||
ByteBuffer buffer = ByteBuffer.allocate(size);
|
||||
buffer.position(args.bufferOffset);
|
||||
|
|
|
|||
|
|
@ -44,9 +44,6 @@ public class CreateAclsRequestTest {
|
|||
private static final AclBinding LITERAL_ACL1 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "foo", PatternType.LITERAL),
|
||||
new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY));
|
||||
|
||||
private static final AclBinding LITERAL_ACL2 = new AclBinding(new ResourcePattern(ResourceType.GROUP, "group", PatternType.LITERAL),
|
||||
new AccessControlEntry("User:*", "127.0.0.1", AclOperation.WRITE, AclPermissionType.ALLOW));
|
||||
|
||||
private static final AclBinding PREFIXED_ACL1 = new AclBinding(new ResourcePattern(ResourceType.GROUP, "prefix", PatternType.PREFIXED),
|
||||
new AccessControlEntry("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW));
|
||||
|
||||
|
|
|
|||
|
|
@ -33,9 +33,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
|
|||
public class DescribeAclsRequestTest {
|
||||
private static final short V1 = 1;
|
||||
|
||||
private static final AclBindingFilter LITERAL_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, "foo", PatternType.LITERAL),
|
||||
new AccessControlEntryFilter("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY));
|
||||
|
||||
private static final AclBindingFilter PREFIXED_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.GROUP, "prefix", PatternType.PREFIXED),
|
||||
new AccessControlEntryFilter("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW));
|
||||
|
||||
|
|
|
|||
|
|
@ -74,12 +74,6 @@ public class DescribeAclsResponseTest {
|
|||
PatternType.LITERAL,
|
||||
Collections.singletonList(ALLOW_CREATE_ACL));
|
||||
|
||||
private static final DescribeAclsResource LITERAL_ACL2 = buildResource(
|
||||
"group",
|
||||
ResourceType.GROUP,
|
||||
PatternType.LITERAL,
|
||||
Collections.singletonList(DENY_READ_ACL));
|
||||
|
||||
@Test
|
||||
public void shouldThrowIfUnknown() {
|
||||
assertThrows(IllegalArgumentException.class,
|
||||
|
|
|
|||
|
|
@ -1124,7 +1124,7 @@ public class RequestResponseTest {
|
|||
case ALTER_CLIENT_QUOTAS: return createAlterClientQuotasResponse();
|
||||
case DESCRIBE_USER_SCRAM_CREDENTIALS: return createDescribeUserScramCredentialsResponse();
|
||||
case ALTER_USER_SCRAM_CREDENTIALS: return createAlterUserScramCredentialsResponse();
|
||||
case VOTE: return createVoteResponse(version);
|
||||
case VOTE: return createVoteResponse();
|
||||
case BEGIN_QUORUM_EPOCH: return createBeginQuorumEpochResponse();
|
||||
case END_QUORUM_EPOCH: return createEndQuorumEpochResponse();
|
||||
case DESCRIBE_QUORUM: return createDescribeQuorumResponse();
|
||||
|
|
@ -1187,7 +1187,7 @@ public class RequestResponseTest {
|
|||
.setGroupEpoch(0)
|
||||
.setAssignmentEpoch(0)
|
||||
.setAssignorName("range")
|
||||
.setMembers(new ArrayList<ConsumerGroupDescribeResponseData.Member>(0))
|
||||
.setMembers(new ArrayList<>(0))
|
||||
))
|
||||
.setThrottleTimeMs(1000);
|
||||
return new ConsumerGroupDescribeResponse(data);
|
||||
|
|
@ -1660,7 +1660,7 @@ public class RequestResponseTest {
|
|||
return new VoteRequest.Builder(data).build(version);
|
||||
}
|
||||
|
||||
private VoteResponse createVoteResponse(short version) {
|
||||
private VoteResponse createVoteResponse() {
|
||||
VoteResponseData.PartitionData partitionData = new VoteResponseData.PartitionData()
|
||||
.setErrorCode(Errors.NONE.code())
|
||||
.setLeaderEpoch(0)
|
||||
|
|
|
|||
|
|
@ -128,7 +128,7 @@ public class SaslServerAuthenticatorTest {
|
|||
return headerBuffer.remaining();
|
||||
});
|
||||
|
||||
assertThrows(InvalidRequestException.class, () -> authenticator.authenticate());
|
||||
assertThrows(InvalidRequestException.class, authenticator::authenticate);
|
||||
verify(transportLayer, times(2)).read(any(ByteBuffer.class));
|
||||
}
|
||||
|
||||
|
|
@ -156,7 +156,7 @@ public class SaslServerAuthenticatorTest {
|
|||
return headerBuffer.remaining();
|
||||
});
|
||||
|
||||
assertThrows(InvalidRequestException.class, () -> authenticator.authenticate());
|
||||
assertThrows(InvalidRequestException.class, authenticator::authenticate);
|
||||
verify(transportLayer, times(2)).read(any(ByteBuffer.class));
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -359,7 +359,6 @@ public abstract class SslFactoryTest {
|
|||
sslConfig = new TestSecurityConfig(props);
|
||||
sslFactory.reconfigure(sslConfig.values());
|
||||
assertNotSame(sslEngineFactory, sslFactory.sslEngineFactory(), "SslEngineFactory not recreated");
|
||||
sslEngineFactory = sslFactory.sslEngineFactory();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -400,15 +399,15 @@ public abstract class SslFactoryTest {
|
|||
|
||||
@Test
|
||||
public void testKeystoreVerifiableUsingTruststore() throws Exception {
|
||||
verifyKeystoreVerifiableUsingTruststore(false, tlsProtocol);
|
||||
verifyKeystoreVerifiableUsingTruststore(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPemKeystoreVerifiableUsingTruststore() throws Exception {
|
||||
verifyKeystoreVerifiableUsingTruststore(true, tlsProtocol);
|
||||
verifyKeystoreVerifiableUsingTruststore(true);
|
||||
}
|
||||
|
||||
private void verifyKeystoreVerifiableUsingTruststore(boolean usePem, String tlsProtocol) throws Exception {
|
||||
private void verifyKeystoreVerifiableUsingTruststore(boolean usePem) throws Exception {
|
||||
File trustStoreFile1 = usePem ? null : TestUtils.tempFile("truststore1", ".jks");
|
||||
Map<String, Object> sslConfig1 = sslConfigsBuilder(ConnectionMode.SERVER)
|
||||
.createNewTrustStore(trustStoreFile1)
|
||||
|
|
@ -436,15 +435,15 @@ public abstract class SslFactoryTest {
|
|||
|
||||
@Test
|
||||
public void testCertificateEntriesValidation() throws Exception {
|
||||
verifyCertificateEntriesValidation(false, tlsProtocol);
|
||||
verifyCertificateEntriesValidation(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPemCertificateEntriesValidation() throws Exception {
|
||||
verifyCertificateEntriesValidation(true, tlsProtocol);
|
||||
verifyCertificateEntriesValidation(true);
|
||||
}
|
||||
|
||||
private void verifyCertificateEntriesValidation(boolean usePem, String tlsProtocol) throws Exception {
|
||||
private void verifyCertificateEntriesValidation(boolean usePem) throws Exception {
|
||||
File trustStoreFile = usePem ? null : TestUtils.tempFile("truststore", ".jks");
|
||||
Map<String, Object> serverSslConfig = sslConfigsBuilder(ConnectionMode.SERVER)
|
||||
.createNewTrustStore(trustStoreFile)
|
||||
|
|
|
|||
|
|
@ -372,7 +372,7 @@ public class TestSslUtils {
|
|||
try (PemWriter pemWriter = new PemWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))) {
|
||||
pemWriter.writeObject(new JcaMiscPEMGenerator(cert));
|
||||
}
|
||||
return new String(out.toByteArray(), StandardCharsets.UTF_8);
|
||||
return out.toString(StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
static String pem(PrivateKey privateKey, Password password) throws IOException {
|
||||
|
|
@ -390,7 +390,7 @@ public class TestSslUtils {
|
|||
}
|
||||
}
|
||||
}
|
||||
return new String(out.toByteArray(), StandardCharsets.UTF_8);
|
||||
return out.toString(StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
public static class CertificateBuilder {
|
||||
|
|
|
|||
|
|
@ -505,7 +505,7 @@ public class TestUtils {
|
|||
assertNotNull(clusterId);
|
||||
|
||||
// Base 64 encoded value is 22 characters
|
||||
assertEquals(clusterId.length(), 22);
|
||||
assertEquals(22, clusterId.length());
|
||||
|
||||
Pattern clusterIdPattern = Pattern.compile("[a-zA-Z0-9_\\-]+");
|
||||
Matcher matcher = clusterIdPattern.matcher(clusterId);
|
||||
|
|
@ -516,7 +516,7 @@ public class TestUtils {
|
|||
byte[] decodedUuid = Base64.getDecoder().decode(originalClusterId);
|
||||
|
||||
// We expect 16 bytes, same as the input UUID.
|
||||
assertEquals(decodedUuid.length, 16);
|
||||
assertEquals(16, decodedUuid.length);
|
||||
|
||||
//Check if it can be converted back to a UUID.
|
||||
try {
|
||||
|
|
@ -570,17 +570,6 @@ public class TestUtils {
|
|||
return toBuffer(records.toSend());
|
||||
}
|
||||
|
||||
public static Set<TopicPartition> generateRandomTopicPartitions(int numTopic, int numPartitionPerTopic) {
|
||||
Set<TopicPartition> tps = new HashSet<>();
|
||||
for (int i = 0; i < numTopic; i++) {
|
||||
String topic = randomString(32);
|
||||
for (int j = 0; j < numPartitionPerTopic; j++) {
|
||||
tps.add(new TopicPartition(topic, j));
|
||||
}
|
||||
}
|
||||
return tps;
|
||||
}
|
||||
|
||||
/**
|
||||
* Assert that a future raises an expected exception cause type.
|
||||
* This method will wait for the future to complete or timeout(15000 milliseconds).
|
||||
|
|
|
|||
Loading…
Reference in New Issue