mirror of https://github.com/apache/kafka.git
KAFKA-15435 Fix counts in MigrationManifest (#14342)
Reviewers: Liu Zeyu <zeyu.luke@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
parent
eb39c95080
commit
65e2ecffab
|
@ -718,14 +718,15 @@ public class KRaftMigrationDriver implements MetadataPublisher {
|
|||
// Ignore sending RPCs to the brokers since we're no longer in the state.
|
||||
if (checkDriverState(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM)) {
|
||||
if (image.highestOffsetAndEpoch().compareTo(migrationLeadershipState.offsetAndEpoch()) >= 0) {
|
||||
log.trace("Sending RPCs to broker before moving to dual-write mode using " +
|
||||
log.info("Sending RPCs to broker before moving to dual-write mode using " +
|
||||
"at offset and epoch {}", image.highestOffsetAndEpoch());
|
||||
propagator.sendRPCsToBrokersFromMetadataImage(image, migrationLeadershipState.zkControllerEpoch());
|
||||
// Migration leadership state doesn't change since we're not doing any Zk writes.
|
||||
transitionTo(MigrationDriverState.DUAL_WRITE);
|
||||
} else {
|
||||
log.trace("Ignoring using metadata image since migration leadership state is at a greater offset and epoch {}",
|
||||
migrationLeadershipState.offsetAndEpoch());
|
||||
log.info("Not sending metadata RPCs with current metadata image since does not contain the offset " +
|
||||
"that was last written to ZK during the migration. Image offset {} is less than migration " +
|
||||
"leadership state offset {}", image.highestOffsetAndEpoch(), migrationLeadershipState.offsetAndEpoch());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
|
@ -51,7 +52,7 @@ public class MigrationManifest {
|
|||
batches++;
|
||||
recordBatch.forEach(apiMessageAndVersion -> {
|
||||
MetadataRecordType type = MetadataRecordType.fromId(apiMessageAndVersion.message().apiKey());
|
||||
counts.merge(type, 1, (__, count) -> count + 1);
|
||||
counts.merge(type, 1, Integer::sum);
|
||||
total++;
|
||||
});
|
||||
}
|
||||
|
@ -60,7 +61,8 @@ public class MigrationManifest {
|
|||
if (endTimeNanos == 0) {
|
||||
endTimeNanos = time.nanoseconds();
|
||||
}
|
||||
return new MigrationManifest(total, batches, endTimeNanos - startTimeNanos, counts);
|
||||
Map<MetadataRecordType, Integer> orderedCounts = new TreeMap<>(counts);
|
||||
return new MigrationManifest(total, batches, endTimeNanos - startTimeNanos, orderedCounts);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,102 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.metadata.migration;
|
||||
|
||||
import org.apache.kafka.common.metadata.ConfigRecord;
|
||||
import org.apache.kafka.common.metadata.PartitionRecord;
|
||||
import org.apache.kafka.common.metadata.TopicRecord;
|
||||
import org.apache.kafka.common.utils.MockTime;
|
||||
import org.apache.kafka.common.utils.Time;
|
||||
import org.apache.kafka.server.common.ApiMessageAndVersion;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
public class MigrationManifestTest {
|
||||
@Test
|
||||
public void testEmpty() {
|
||||
Time time = new MockTime();
|
||||
MigrationManifest.Builder manifestBuilder = MigrationManifest.newBuilder(time);
|
||||
MigrationManifest manifest = manifestBuilder.build();
|
||||
assertEquals(0L, manifest.durationMs());
|
||||
assertEquals(
|
||||
"0 records were generated in 0 ms across 0 batches. The record types were {}",
|
||||
manifest.toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOneBatch() {
|
||||
Time time = new MockTime();
|
||||
MigrationManifest.Builder manifestBuilder = MigrationManifest.newBuilder(time);
|
||||
manifestBuilder.acceptBatch(Arrays.asList(
|
||||
new ApiMessageAndVersion(new TopicRecord(), (short) 0),
|
||||
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
|
||||
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
|
||||
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
|
||||
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
|
||||
new ApiMessageAndVersion(new TopicRecord(), (short) 0),
|
||||
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
|
||||
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
|
||||
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
|
||||
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
|
||||
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
|
||||
new ApiMessageAndVersion(new ConfigRecord(), (short) 0),
|
||||
new ApiMessageAndVersion(new ConfigRecord(), (short) 0)
|
||||
));
|
||||
MigrationManifest manifest = manifestBuilder.build();
|
||||
assertEquals(0L, manifest.durationMs());
|
||||
assertEquals(
|
||||
"13 records were generated in 0 ms across 1 batches. The record types were {TOPIC_RECORD=2, PARTITION_RECORD=9, CONFIG_RECORD=2}",
|
||||
manifest.toString()
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testManyBatch() {
|
||||
Time time = new MockTime();
|
||||
MigrationManifest.Builder manifestBuilder = MigrationManifest.newBuilder(time);
|
||||
manifestBuilder.acceptBatch(Arrays.asList(
|
||||
new ApiMessageAndVersion(new TopicRecord(), (short) 0),
|
||||
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
|
||||
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
|
||||
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
|
||||
new ApiMessageAndVersion(new PartitionRecord(), (short) 0)
|
||||
));
|
||||
manifestBuilder.acceptBatch(Arrays.asList(
|
||||
new ApiMessageAndVersion(new TopicRecord(), (short) 0),
|
||||
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
|
||||
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
|
||||
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
|
||||
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
|
||||
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
|
||||
new ApiMessageAndVersion(new ConfigRecord(), (short) 0)
|
||||
));
|
||||
manifestBuilder.acceptBatch(Collections.singletonList(
|
||||
new ApiMessageAndVersion(new ConfigRecord(), (short) 0)
|
||||
));
|
||||
MigrationManifest manifest = manifestBuilder.build();
|
||||
assertEquals(0L, manifest.durationMs());
|
||||
assertEquals(
|
||||
"13 records were generated in 0 ms across 3 batches. The record types were {TOPIC_RECORD=2, PARTITION_RECORD=9, CONFIG_RECORD=2}",
|
||||
manifest.toString()
|
||||
);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue