KAFKA-16007 Merge batch records during ZK migration (#15007)

To avoid creating many small KRaft batches during the ZK migration, this patch adds a mechanism that merges batches until they contain at least 1,000 records. This reduces the number of batches sent to Raft, which in turn reduces the time spent blocking on commits.

Since migrations use metadata transactions, the batch boundaries of migrated records are not significant. Even so, this implementation does not break up existing batches; it only combines them into larger batches to meet the minimum size.

Reviewers: José Armando García Sancio <jsancio@apache.org>
David Arthur 2023-12-15 19:33:15 -05:00 committed by GitHub
parent d9e8d0ddc1
commit 7f763d327f
8 changed files with 363 additions and 44 deletions

View File: BufferingBatchConsumer.java (new file)

@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata.migration;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * A record batch consumer that merges incoming batches into batches of at least a given minimum size. It does so
 * by buffering the records into an array that is later flushed to a downstream consumer. Batches consumed
 * by this class will not be broken apart, only combined with other batches to reach the minimum batch size.
 * <p>
 * Note that {@link #flush()} must be called after the last batch has been accepted in order to flush any
 * buffered records.
 */
public class BufferingBatchConsumer<T> implements Consumer<List<T>> {

    private final Consumer<List<T>> delegateConsumer;
    private final int minBatchSize;
    private List<T> bufferedBatch;

    BufferingBatchConsumer(Consumer<List<T>> delegateConsumer, int minBatchSize) {
        this.delegateConsumer = delegateConsumer;
        this.minBatchSize = minBatchSize;
        this.bufferedBatch = new ArrayList<>(minBatchSize);
    }

    @Override
    public void accept(List<T> batch) {
        bufferedBatch.addAll(batch);
        if (bufferedBatch.size() >= minBatchSize) {
            flush();
        }
    }

    public void flush() {
        if (!bufferedBatch.isEmpty()) {
            delegateConsumer.accept(bufferedBatch);
            bufferedBatch = new ArrayList<>(minBatchSize);
        }
    }
}
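
To see the coalescing behavior concretely, here is a minimal, hypothetical harness for the class above (the demo class and its batch sizes are invented, and it must live in the same package because the constructor is package-private). Ten 300-record batches come out as two 1200-record batches plus a 600-record remainder that only the final flush() delivers:

package org.apache.kafka.metadata.migration;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical demo, not part of this patch.
public class BufferingBatchConsumerDemo {
    public static void main(String[] args) {
        List<Integer> downstreamSizes = new ArrayList<>();
        // The downstream consumer just records the size of each batch it receives.
        BufferingBatchConsumer<Integer> consumer =
            new BufferingBatchConsumer<>(batch -> downstreamSizes.add(batch.size()), 1000);
        // Ten small batches of 300 records each.
        for (int i = 0; i < 10; i++) {
            consumer.accept(Collections.nCopies(300, 0));
        }
        // Without this call the last 600 buffered records would never reach the delegate.
        consumer.flush();
        System.out.println(downstreamSizes); // prints [1200, 1200, 600]
    }
}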

View File: KRaftMigrationDriver.java

@@ -34,6 +34,7 @@ import org.apache.kafka.queue.EventQueue;
 import org.apache.kafka.queue.KafkaEventQueue;
 import org.apache.kafka.raft.LeaderAndEpoch;
 import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.fault.FaultHandler;
 import org.apache.kafka.server.util.Deadline;
 import org.apache.kafka.server.util.FutureUtils;
@@ -87,7 +88,9 @@ public class KRaftMigrationDriver implements MetadataPublisher {
      * amount of time. A large value is selected to avoid timeouts in the common case, but prevent us from
      * blocking indefinitely.
      */
-    private final static int METADATA_COMMIT_MAX_WAIT_MS = 300_000;
+    final static int METADATA_COMMIT_MAX_WAIT_MS = 300_000;
+    final static int MIGRATION_MIN_BATCH_SIZE = 1_000;
 
     private final Time time;
     private final Logger log;
@@ -645,6 +648,29 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         }
     }
 
+    private BufferingBatchConsumer<ApiMessageAndVersion> buildMigrationBatchConsumer(
+        MigrationManifest.Builder manifestBuilder
+    ) {
+        return new BufferingBatchConsumer<>(batch -> {
+            try {
+                if (log.isTraceEnabled()) {
+                    batch.forEach(apiMessageAndVersion ->
+                        log.trace(recordRedactor.toLoggableString(apiMessageAndVersion.message())));
+                }
+                CompletableFuture<?> future = zkRecordConsumer.acceptBatch(batch);
+                long batchStart = time.nanoseconds();
+                FutureUtils.waitWithLogging(KRaftMigrationDriver.this.log, "",
+                    "the metadata layer to commit " + batch.size() + " migration records",
+                    future, Deadline.fromDelay(time, METADATA_COMMIT_MAX_WAIT_MS, TimeUnit.MILLISECONDS), time);
+                long batchEnd = time.nanoseconds();
+                manifestBuilder.acceptBatch(batch, batchEnd - batchStart);
+            } catch (Throwable e) {
+                // This will cause readAllMetadata to throw since this batch consumer is called directly from readAllMetadata
+                throw new RuntimeException(e);
+            }
+        }, MIGRATION_MIN_BATCH_SIZE);
+    }
+
     class MigrateMetadataEvent extends MigrationEvent {
         @Override
         public void run() throws Exception {
@@ -664,23 +690,12 @@
                     super.handleException(t);
                 }
                 try {
-                    zkMigrationClient.readAllMetadata(batch -> {
-                        try {
-                            log.info("Migrating {} records from ZK", batch.size());
-                            if (log.isTraceEnabled()) {
-                                batch.forEach(apiMessageAndVersion ->
-                                    log.trace(recordRedactor.toLoggableString(apiMessageAndVersion.message())));
-                            }
-                            CompletableFuture<?> future = zkRecordConsumer.acceptBatch(batch);
-                            FutureUtils.waitWithLogging(KRaftMigrationDriver.this.log, "",
-                                "the metadata layer to commit migration record batch",
-                                future, Deadline.fromDelay(time, METADATA_COMMIT_MAX_WAIT_MS, TimeUnit.MILLISECONDS), time);
-                            manifestBuilder.acceptBatch(batch);
-                        } catch (Throwable e) {
-                            // This will cause readAllMetadata to throw since this batch consumer is called directly from readAllMetadata
-                            throw new RuntimeException(e);
-                        }
-                    }, brokersInMetadata::add);
+                    BufferingBatchConsumer<ApiMessageAndVersion> migrationBatchConsumer = buildMigrationBatchConsumer(manifestBuilder);
+                    zkMigrationClient.readAllMetadata(
+                        migrationBatchConsumer,
+                        brokersInMetadata::add
+                    );
+                    migrationBatchConsumer.flush();
                     CompletableFuture<OffsetAndEpoch> completeMigrationFuture = zkRecordConsumer.completeMigration();
                     OffsetAndEpoch offsetAndEpochAfterMigration = FutureUtils.waitWithLogging(
                         KRaftMigrationDriver.this.log, "",
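
Stripped of Kafka's helpers, the consumer built above does roughly the following for each merged batch. This is a sketch under invented names (BatchCommitSketch, commitAndTime), with FutureUtils.waitWithLogging and Deadline replaced by a plain bounded get():

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch only: block on the commit future with a generous deadline and return
// how long the wait took, i.e. the duration fed to MigrationManifest.Builder#acceptBatch.
public class BatchCommitSketch {
    static final int METADATA_COMMIT_MAX_WAIT_MS = 300_000;

    static long commitAndTime(CompletableFuture<?> commitFuture) throws Exception {
        long start = System.nanoTime();
        try {
            commitFuture.get(METADATA_COMMIT_MAX_WAIT_MS, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Mirrors the driver's behavior of rethrowing so the surrounding read aborts.
            throw new RuntimeException("Timed out waiting for the metadata layer to commit the batch", e);
        }
        return System.nanoTime() - start;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(commitAndTime(CompletableFuture.completedFuture(null)) + " ns");
    }
}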

View File: MigrationManifest.java

@@ -41,6 +41,7 @@ public class MigrationManifest {
         private final Map<MetadataRecordType, Integer> counts = new HashMap<>();
         private int batches = 0;
         private int total = 0;
+        private long batchDurationsNs = 0;
         private long endTimeNanos = 0;
 
         Builder(Time time) {
@@ -48,8 +49,9 @@
             this.startTimeNanos = time.nanoseconds();
         }
 
-        public void acceptBatch(List<ApiMessageAndVersion> recordBatch) {
+        public void acceptBatch(List<ApiMessageAndVersion> recordBatch, long durationNs) {
             batches++;
+            batchDurationsNs += durationNs;
             recordBatch.forEach(apiMessageAndVersion -> {
                 MetadataRecordType type = MetadataRecordType.fromId(apiMessageAndVersion.message().apiKey());
                 counts.merge(type, 1, Integer::sum);
@@ -62,23 +64,26 @@
                 endTimeNanos = time.nanoseconds();
             }
             Map<MetadataRecordType, Integer> orderedCounts = new TreeMap<>(counts);
-            return new MigrationManifest(total, batches, endTimeNanos - startTimeNanos, orderedCounts);
+            return new MigrationManifest(total, batches, batchDurationsNs, endTimeNanos - startTimeNanos, orderedCounts);
         }
     }
 
     private final int totalRecords;
     private final int totalBatches;
+    private final long totalBatchDurationsNs;
     private final long durationNanos;
     private final Map<MetadataRecordType, Integer> recordTypeCounts;
 
     MigrationManifest(
         int totalRecords,
         int totalBatches,
+        long totalBatchDurationsNs,
         long durationNanos,
         Map<MetadataRecordType, Integer> recordTypeCounts
     ) {
         this.totalRecords = totalRecords;
         this.totalBatches = totalBatches;
+        this.totalBatchDurationsNs = totalBatchDurationsNs;
         this.durationNanos = durationNanos;
         this.recordTypeCounts = Collections.unmodifiableMap(recordTypeCounts);
     }
@@ -91,6 +96,13 @@
         return TimeUnit.NANOSECONDS.toMillis(durationNanos);
     }
 
+    public double avgBatchDurationMs() {
+        if (totalBatches == 0) {
+            return -1;
+        }
+        return 1.0 * TimeUnit.NANOSECONDS.toMillis(totalBatchDurationsNs) / totalBatches;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
@@ -98,17 +110,20 @@
         MigrationManifest that = (MigrationManifest) o;
         return totalRecords == that.totalRecords &&
             totalBatches == that.totalBatches &&
+            totalBatchDurationsNs == that.totalBatchDurationsNs &&
             durationNanos == that.durationNanos &&
             recordTypeCounts.equals(that.recordTypeCounts);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(totalRecords, totalBatches, durationNanos, recordTypeCounts);
+        return Objects.hash(totalRecords, totalBatches, totalBatchDurationsNs, durationNanos, recordTypeCounts);
    }
 
     public String toString() {
-        return String.format("%d records were generated in %d ms across %d batches. The record types were %s",
-            totalRecords, durationMs(), totalBatches, recordTypeCounts);
+        return String.format(
+            "%d records were generated in %d ms across %d batches. The average time spent waiting on a " +
+            "batch was %.2f ms. The record types were %s",
+            totalRecords, durationMs(), totalBatches, avgBatchDurationMs(), recordTypeCounts);
     }
 }
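
One subtlety in avgBatchDurationMs() above: the accumulated nanoseconds are truncated to whole milliseconds before dividing, so sub-millisecond batches average to 0.00 ms, and a migration with no batches reports -1 as a sentinel. A standalone sketch of that arithmetic (class and method names invented), matching the expectations in MigrationManifestTest below:

import java.util.concurrent.TimeUnit;

// Sketch of the arithmetic in MigrationManifest#avgBatchDurationMs.
public class AvgBatchDurationSketch {
    static double avg(long totalBatchDurationsNs, int totalBatches) {
        if (totalBatches == 0) {
            return -1; // sentinel: no batches were committed
        }
        // toMillis truncates first, then the division happens in floating point.
        return 1.0 * TimeUnit.NANOSECONDS.toMillis(totalBatchDurationsNs) / totalBatches;
    }

    public static void main(String[] args) {
        System.out.println(avg(20, 1));         // 0.0: a 20 ns batch truncates to 0 ms
        System.out.println(avg(45_000_000, 3)); // 15.0: 20 ms + 20 ms + 5 ms over 3 batches
        System.out.println(avg(0, 0));          // -1.0: the empty-migration sentinel
    }
}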

View File: BufferingBatchConsumerTest.java (new file)

@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata.migration;

import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class BufferingBatchConsumerTest {

    @Test
    public void testEmptyBatches() {
        List<List<Integer>> batches = new ArrayList<>();
        BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 4);
        consumer.accept(Collections.emptyList());
        consumer.accept(Collections.emptyList());
        assertEquals(0, batches.size());
        consumer.flush();
        assertEquals(0, batches.size());
    }

    @Test
    public void testOneBatchSameAsMinSize() {
        List<List<Integer>> batches = new ArrayList<>();
        BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 4);
        consumer.accept(Arrays.asList(1, 2, 3, 4));
        assertEquals(1, batches.size());
        assertEquals(Arrays.asList(1, 2, 3, 4), batches.get(0));
        consumer.flush();
        assertEquals(1, batches.size());
        assertEquals(Arrays.asList(1, 2, 3, 4), batches.get(0));
    }

    @Test
    public void testOneBatchSmallerThanMinSize() {
        List<List<Integer>> batches = new ArrayList<>();
        BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 4);
        consumer.accept(Arrays.asList(1, 2, 3));
        assertEquals(0, batches.size());
        consumer.flush();
        assertEquals(1, batches.size());
        assertEquals(Arrays.asList(1, 2, 3), batches.get(0));
    }

    @Test
    public void testOneBatchLargerThanMinSize() {
        List<List<Integer>> batches = new ArrayList<>();
        BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 4);
        consumer.accept(Arrays.asList(1, 2, 3, 4, 5));
        assertEquals(1, batches.size());
        assertEquals(Arrays.asList(1, 2, 3, 4, 5), batches.get(0));
        consumer.flush();
        assertEquals(1, batches.size());
        assertEquals(Arrays.asList(1, 2, 3, 4, 5), batches.get(0));
    }

    @Test
    public void testMultiBatchSameAsMinSize() {
        List<List<Integer>> batches = new ArrayList<>();
        BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 6);
        consumer.accept(Arrays.asList(1, 2));
        consumer.accept(Arrays.asList(3, 4));
        consumer.accept(Arrays.asList(5, 6));
        assertEquals(1, batches.size());
        assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), batches.get(0));
        consumer.flush();
        assertEquals(1, batches.size());
        assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), batches.get(0));
    }

    @Test
    public void testMultiBatchSmallerThanMinSize() {
        List<List<Integer>> batches = new ArrayList<>();
        BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 6);
        consumer.accept(Arrays.asList(1, 2));
        consumer.accept(Arrays.asList(3, 4));
        consumer.accept(Collections.singletonList(5));
        assertEquals(0, batches.size());
        consumer.flush();
        assertEquals(1, batches.size());
        assertEquals(Arrays.asList(1, 2, 3, 4, 5), batches.get(0));
    }

    @Test
    public void testMultiBatchLargerThanMinSize() {
        List<List<Integer>> batches = new ArrayList<>();
        BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 6);
        consumer.accept(Arrays.asList(1, 2));
        consumer.accept(Arrays.asList(3, 4));
        consumer.accept(Arrays.asList(5, 6));
        consumer.accept(Arrays.asList(7, 8));
        consumer.accept(Arrays.asList(9, 10));
        assertEquals(1, batches.size());
        assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), batches.get(0));
        consumer.flush();
        assertEquals(2, batches.size());
        assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), batches.get(0));
        assertEquals(Arrays.asList(7, 8, 9, 10), batches.get(1));
    }
}

View File: CapturingMigrationClient.java

@@ -30,6 +30,20 @@ import java.util.stream.IntStream;
 
 class CapturingMigrationClient implements MigrationClient {
 
+    static final MigrationBatchSupplier EMPTY_BATCH_SUPPLIER = new MigrationBatchSupplier() {
+    };
+
+    interface MigrationBatchSupplier {
+        default List<List<ApiMessageAndVersion>> recordBatches() {
+            return Collections.emptyList();
+        }
+
+        default List<Integer> brokerIds() {
+            return Collections.emptyList();
+        }
+    }
+
     static Builder newBuilder() {
         return new Builder();
     }
@@ -40,6 +54,8 @@
         ConfigMigrationClient configMigrationClient = new CapturingConfigMigrationClient();
         AclMigrationClient aclMigrationClient = new CapturingAclMigrationClient();
         DelegationTokenMigrationClient delegationTokenMigrationClient = new CapturingDelegationTokenMigrationClient();
+        MigrationBatchSupplier batchSupplier = EMPTY_BATCH_SUPPLIER;
 
         public Builder setBrokersInZk(int... brokerIds) {
             brokersInZk = IntStream.of(brokerIds).boxed().collect(Collectors.toSet());
@@ -66,13 +82,19 @@
             return this;
         }
 
+        public Builder setBatchSupplier(MigrationBatchSupplier batchSupplier) {
+            this.batchSupplier = batchSupplier;
+            return this;
+        }
+
         public CapturingMigrationClient build() {
             return new CapturingMigrationClient(
                 brokersInZk,
                 topicMigrationClient,
                 configMigrationClient,
                 aclMigrationClient,
-                delegationTokenMigrationClient
+                delegationTokenMigrationClient,
+                batchSupplier
             );
         }
     }
@@ -82,7 +104,7 @@
     private final ConfigMigrationClient configMigrationClient;
     private final AclMigrationClient aclMigrationClient;
     private final DelegationTokenMigrationClient delegationTokenMigrationClient;
+    private final MigrationBatchSupplier batchSupplier;
 
     private ZkMigrationLeadershipState state = null;
 
     CapturingMigrationClient(
@@ -90,13 +112,15 @@
         TopicMigrationClient topicMigrationClient,
         ConfigMigrationClient configMigrationClient,
         AclMigrationClient aclMigrationClient,
-        DelegationTokenMigrationClient delegationTokenMigrationClient
+        DelegationTokenMigrationClient delegationTokenMigrationClient,
+        MigrationBatchSupplier batchSupplier
     ) {
         this.brokerIds = brokerIdsInZk;
         this.topicMigrationClient = topicMigrationClient;
         this.configMigrationClient = configMigrationClient;
         this.aclMigrationClient = aclMigrationClient;
         this.delegationTokenMigrationClient = delegationTokenMigrationClient;
+        this.batchSupplier = batchSupplier;
     }
 
     @Override
@@ -165,7 +189,8 @@
         Consumer<List<ApiMessageAndVersion>> batchConsumer,
         Consumer<Integer> brokerIdConsumer
     ) {
+        batchSupplier.recordBatches().forEach(batchConsumer);
+        batchSupplier.brokerIds().forEach(brokerIdConsumer);
     }
 
     @Override

View File: CapturingTopicMigrationClient.java

@@ -48,7 +48,7 @@ public class CapturingTopicMigrationClient implements TopicMigrationClient {
     @Override
     public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor visitor) {
     }
 
     @Override

View File: KRaftMigrationDriverTest.java

@@ -18,11 +18,13 @@ package org.apache.kafka.metadata.migration;
 
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
 import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.controller.QuorumFeatures;
@@ -53,8 +55,11 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -72,6 +77,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -305,7 +311,8 @@
                 new CapturingTopicMigrationClient(),
                 new CapturingConfigMigrationClient(),
                 new CapturingAclMigrationClient(),
-                new CapturingDelegationTokenMigrationClient()) {
+                new CapturingDelegationTokenMigrationClient(),
+                CapturingMigrationClient.EMPTY_BATCH_SUPPLIER) {
             @Override
             public ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershipState state) {
                 if (claimLeaderAttempts.getCount() == 0) {
@@ -447,11 +454,8 @@
     @Test
     public void testSkipWaitForBrokersInDualWrite() throws Exception {
         CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
-        CapturingMigrationClient migrationClient = new CapturingMigrationClient(Collections.emptySet(),
-            new CapturingTopicMigrationClient(),
-            new CapturingConfigMigrationClient(),
-            new CapturingAclMigrationClient(),
-            new CapturingDelegationTokenMigrationClient());
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder().build();
         MockFaultHandler faultHandler = new MockFaultHandler("testMigrationClientExpiration");
         KRaftMigrationDriver.Builder builder = defaultTestBuilder()
             .setZkMigrationClient(migrationClient)
@@ -761,7 +765,7 @@
         };
         CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
         CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder().setBrokersInZk(1, 2, 3).build();
-        MockFaultHandler faultHandler = new MockFaultHandler("testTwoMigrateMetadataEvents");
+        MockFaultHandler faultHandler = new MockFaultHandler("testBeginMigrationOnce");
         KRaftMigrationDriver.Builder builder = defaultTestBuilder()
             .setZkMigrationClient(migrationClient)
             .setZkRecordConsumer(recordConsumer)
@@ -795,4 +799,83 @@
             assertEquals(1, migrationBeginCalls.get());
         }
     }
+
+    private List<ApiMessageAndVersion> fillBatch(int size) {
+        ApiMessageAndVersion[] batch = new ApiMessageAndVersion[size];
+        Arrays.fill(batch, new ApiMessageAndVersion(new TopicRecord().setName("topic-fill").setTopicId(Uuid.randomUuid()), (short) 0));
+        return Arrays.asList(batch);
+    }
+
+    static Stream<Arguments> batchSizes() {
+        return Stream.of(
+            Arguments.of(Arrays.asList(0, 0, 0, 0), 0, 0),
+            Arguments.of(Arrays.asList(0, 0, 1, 0), 1, 1),
+            Arguments.of(Arrays.asList(1, 1, 1, 1), 1, 4),
+            Arguments.of(Collections.singletonList(KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE - 1), 1, 999),
+            Arguments.of(Collections.singletonList(KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE), 1, 1000),
+            Arguments.of(Collections.singletonList(KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE + 1), 1, 1001),
+            Arguments.of(Arrays.asList(KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE, 1), 2, 1001),
+            Arguments.of(Arrays.asList(0, 0, 0, 0), 0, 0),
+            Arguments.of(Arrays.asList(KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE, KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE, KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE), 3, 3000)
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("batchSizes")
+    public void testCoalesceMigrationRecords(List<Integer> batchSizes, int expectedBatchCount, int expectedRecordCount) throws Exception {
+        List<List<ApiMessageAndVersion>> batchesPassedToController = new ArrayList<>();
+        NoOpRecordConsumer recordConsumer = new NoOpRecordConsumer() {
+            @Override
+            public CompletableFuture<?> acceptBatch(List<ApiMessageAndVersion> recordBatch) {
+                batchesPassedToController.add(recordBatch);
+                return CompletableFuture.completedFuture(null);
+            }
+        };
+        CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+            .setBrokersInZk(1, 2, 3)
+            .setBatchSupplier(new CapturingMigrationClient.MigrationBatchSupplier() {
+                @Override
+                public List<List<ApiMessageAndVersion>> recordBatches() {
+                    List<List<ApiMessageAndVersion>> batches = new ArrayList<>();
+                    for (int batchSize : batchSizes) {
+                        batches.add(fillBatch(batchSize));
+                    }
+                    return batches;
+                }
+            })
+            .build();
+        MockFaultHandler faultHandler = new MockFaultHandler("testRebatchMigrationRecords");
+        KRaftMigrationDriver.Builder builder = defaultTestBuilder()
+            .setZkMigrationClient(migrationClient)
+            .setZkRecordConsumer(recordConsumer)
+            .setPropagator(metadataPropagator)
+            .setFaultHandler(faultHandler);
+        try (KRaftMigrationDriver driver = builder.build()) {
+            MetadataImage image = MetadataImage.EMPTY;
+            MetadataDelta delta = new MetadataDelta(image);
+
+            driver.start();
+            setupDeltaForMigration(delta, true);
+            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
+            delta.replay(zkBrokerRecord(1));
+            delta.replay(zkBrokerRecord(2));
+            delta.replay(zkBrokerRecord(3));
+            MetadataProvenance provenance = new MetadataProvenance(100, 1, 1);
+            image = delta.apply(provenance);
+
+            driver.onControllerChange(new LeaderAndEpoch(OptionalInt.of(3000), 1));
+            driver.onMetadataUpdate(delta, image, logDeltaManifestBuilder(provenance,
+                new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());
+            driver.onMetadataUpdate(delta, image, logDeltaManifestBuilder(provenance,
+                new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());
+
+            TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
+                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
+
+            assertEquals(expectedBatchCount, batchesPassedToController.size());
+            assertEquals(expectedRecordCount, batchesPassedToController.stream().mapToInt(List::size).sum());
+        }
+    }
 }

View File: MigrationManifestTest.java

@@ -38,7 +38,7 @@ public class MigrationManifestTest {
         MigrationManifest manifest = manifestBuilder.build();
         assertEquals(0L, manifest.durationMs());
         assertEquals(
-            "0 records were generated in 0 ms across 0 batches. The record types were {}",
+            "0 records were generated in 0 ms across 0 batches. The average time spent waiting on a batch was -1.00 ms. The record types were {}",
             manifest.toString());
     }
@@ -60,11 +60,12 @@
             new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
             new ApiMessageAndVersion(new ConfigRecord(), (short) 0),
             new ApiMessageAndVersion(new ConfigRecord(), (short) 0)
-        ));
+        ), 20);
+        time.sleep(10);
         MigrationManifest manifest = manifestBuilder.build();
-        assertEquals(0L, manifest.durationMs());
+        assertEquals(10L, manifest.durationMs());
         assertEquals(
-            "13 records were generated in 0 ms across 1 batches. The record types were {TOPIC_RECORD=2, PARTITION_RECORD=9, CONFIG_RECORD=2}",
+            "13 records were generated in 10 ms across 1 batches. The average time spent waiting on a batch was 0.00 ms. The record types were {TOPIC_RECORD=2, PARTITION_RECORD=9, CONFIG_RECORD=2}",
             manifest.toString()
         );
     }
@@ -79,7 +80,7 @@
             new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
             new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
             new ApiMessageAndVersion(new PartitionRecord(), (short) 0)
-        ));
+        ), 20_000_000);
         manifestBuilder.acceptBatch(Arrays.asList(
             new ApiMessageAndVersion(new TopicRecord(), (short) 0),
             new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
@@ -88,14 +89,16 @@
             new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
             new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
             new ApiMessageAndVersion(new ConfigRecord(), (short) 0)
-        ));
+        ), 20_000_000);
         manifestBuilder.acceptBatch(Collections.singletonList(
             new ApiMessageAndVersion(new ConfigRecord(), (short) 0)
-        ));
+        ), 5_000_000);
+        time.sleep(60);
         MigrationManifest manifest = manifestBuilder.build();
-        assertEquals(0L, manifest.durationMs());
+        assertEquals(60L, manifest.durationMs());
         assertEquals(
-            "13 records were generated in 0 ms across 3 batches. The record types were {TOPIC_RECORD=2, PARTITION_RECORD=9, CONFIG_RECORD=2}",
+            "13 records were generated in 60 ms across 3 batches. The average time spent waiting on a " +
+            "batch was 15.00 ms. The record types were {TOPIC_RECORD=2, PARTITION_RECORD=9, CONFIG_RECORD=2}",
             manifest.toString()
         );
     }