mirror of https://github.com/apache/kafka.git
MINOR: Various cleanups in metadata (#14734)
- Remove unused code and suppressions
- Simplify/fix test assertions
- Javadoc cleanups

Reviewers: Josep Prat <josep.prat@aiven.io>
commit 832627fc78
parent 49d3122d42
@@ -35,7 +35,7 @@ public enum BrokerRegistrationFencingChange {
 
     private final static Map<Byte, BrokerRegistrationFencingChange> VALUE_TO_ENUM =
         Arrays.stream(BrokerRegistrationFencingChange.values()).
-            collect(Collectors.toMap(v -> Byte.valueOf(v.value()), Function.identity()));
+            collect(Collectors.toMap(v -> v.value(), Function.identity()));
 
     public static Optional<BrokerRegistrationFencingChange> fromValue(byte value) {
         return Optional.ofNullable(VALUE_TO_ENUM.get(value));
@@ -36,7 +36,7 @@ public enum BrokerRegistrationInControlledShutdownChange {
 
     private final static Map<Byte, BrokerRegistrationInControlledShutdownChange> VALUE_TO_ENUM =
         Arrays.stream(BrokerRegistrationInControlledShutdownChange.values()).
-            collect(Collectors.toMap(v -> Byte.valueOf(v.value()), Function.identity()));
+            collect(Collectors.toMap(v -> v.value(), Function.identity()));
 
     public static Optional<BrokerRegistrationInControlledShutdownChange> fromValue(byte value) {
         return Optional.ofNullable(VALUE_TO_ENUM.get(value));
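The two hunks above touch the same idiom: a static map from a byte tag to its enum constant, used when decoding metadata records. A standalone sketch of the cleaned-up pattern (the `SampleChange` enum here is hypothetical, not the Kafka source):

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

public enum SampleChange {
    NONE((byte) 0), FENCE((byte) 1), UNFENCE((byte) 2);

    private final byte value;

    SampleChange(byte value) {
        this.value = value;
    }

    public byte value() {
        return value;
    }

    // v.value() is a byte; the compiler autoboxes it to Byte to match the
    // Map<Byte, ...> target type, so the Byte.valueOf(...) the commit removes
    // was redundant.
    private static final Map<Byte, SampleChange> VALUE_TO_ENUM =
        Arrays.stream(SampleChange.values()).
            collect(Collectors.toMap(v -> v.value(), Function.identity()));

    public static Optional<SampleChange> fromValue(byte value) {
        return Optional.ofNullable(VALUE_TO_ENUM.get(value));
    }
}
```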
@@ -33,7 +33,7 @@ import java.util.Objects;
  */
 public final class DelegationTokenData {
 
-    private TokenInformation tokenInformation;
+    private final TokenInformation tokenInformation;
 
     public static DelegationTokenData fromRecord(DelegationTokenRecord record) {
         List<KafkaPrincipal> renewers = new ArrayList<>();
@@ -62,7 +62,7 @@ public final class DelegationTokenData
         return new DelegationTokenRecord()
             .setOwner(tokenInformation.ownerAsString())
             .setRequester(tokenInformation.tokenRequesterAsString())
-            .setRenewers(new ArrayList<String>(tokenInformation.renewersAsString()))
+            .setRenewers(new ArrayList<>(tokenInformation.renewersAsString()))
             .setIssueTimestamp(tokenInformation.issueTimestamp())
             .setMaxTimestamp(tokenInformation.maxTimestamp())
             .setExpirationTimestamp(tokenInformation.expiryTimestamp())
@@ -56,7 +56,7 @@ public enum LeaderRecoveryState {
 
     private final byte value;
 
-    private LeaderRecoveryState(byte value) {
+    LeaderRecoveryState(byte value) {
         this.value = value;
    }
 
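The change above is safe because Java enum constructors are implicitly private; writing `private` is legal but redundant (and `public` or `protected` would be a compile error). A minimal illustration with a hypothetical `Mode` enum:

```java
public enum Mode {
    ON((byte) 1), OFF((byte) 0);

    private final byte value;

    // Implicitly private: no modifier needed, and no caller outside the enum
    // can ever invoke this constructor anyway.
    Mode(byte value) {
        this.value = value;
    }

    public byte value() {
        return value;
    }
}
```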
@@ -358,8 +358,8 @@ public class PartitionRegistration {
         if (options.metadataVersion().isDirectoryAssignmentSupported()) {
             record.setDirectories(Uuid.toList(directories));
         } else {
-            for (int i = 0; i < directories.length; i++) {
-                if (!DirectoryId.UNASSIGNED.equals(directories[i])) {
+            for (Uuid directory : directories) {
+                if (!DirectoryId.UNASSIGNED.equals(directory)) {
                     options.handleLoss("the directory assignment state of one or more replicas");
                     break;
                 }
@@ -122,7 +122,7 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer {
         Map<Endpoint, CompletableFuture<Void>> result = new HashMap<>();
         for (Endpoint endpoint : serverInfo.endpoints()) {
             if (serverInfo.earlyStartListeners().contains(
-                    endpoint.listenerName().orElseGet(() -> ""))) {
+                    endpoint.listenerName().orElse(""))) {
                 result.put(endpoint, CompletableFuture.completedFuture(null));
             } else {
                 result.put(endpoint, initialLoadFuture);
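For a constant fallback like the empty string, `Optional.orElse` and `Optional.orElseGet` behave identically; the supplier form only pays off when the fallback is expensive to construct. A standalone comparison (illustrative only):

```java
import java.util.Optional;

public class OrElseDemo {
    public static void main(String[] args) {
        Optional<String> listenerName = Optional.empty();

        // orElse: the fallback value already exists; the simplest form.
        String a = listenerName.orElse("");

        // orElseGet: the Supplier runs only when the Optional is empty, which
        // matters only when building the fallback is costly.
        String b = listenerName.orElseGet(() -> "");

        System.out.println(a.isEmpty() && b.isEmpty()); // true
    }
}
```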
@@ -33,32 +33,30 @@ import org.apache.kafka.metadata.OptionalStringComparator;
 
 /**
  * The striped replica placer.
- *
- *
- * GOALS
- * The design of this placer attempts to satisfy a few competing goals. Firstly, we want
+ * <p>
+ * <h3>Goals</h3>
+ * <p>The design of this placer attempts to satisfy a few competing goals. Firstly, we want
  * to spread the replicas as evenly as we can across racks. In the simple case where
  * broker racks have not been configured, this goal is a no-op, of course. But it is the
  * highest priority goal in multi-rack clusters.
  *
- * Our second goal is to spread the replicas evenly across brokers. Since we are placing
+ * <p>Our second goal is to spread the replicas evenly across brokers. Since we are placing
  * multiple partitions, we try to avoid putting each partition on the same set of
  * replicas, even if it does satisfy the rack placement goal. If any specific broker is
  * fenced, we would like the new leaders to distributed evenly across the remaining
  * brokers.
  *
- * However, we treat the rack placement goal as higher priority than this goal-- if you
+ * <p>However, we treat the rack placement goal as higher priority than this goal-- if you
  * configure 10 brokers in rack A and B, and 1 broker in rack C, you will end up with a
  * lot of partitions on that one broker in rack C. If you were to place a lot of
  * partitions with replication factor 3, each partition would try to get a replica there.
  * In general racks are supposed to be about the same size -- if they aren't, this is a
  * user error.
  *
- * Finally, we would prefer to place replicas on unfenced brokers, rather than on fenced
+ * <p>Finally, we would prefer to place replicas on unfenced brokers, rather than on fenced
  * brokers.
  *
- *
- * CONSTRAINTS
+ * <p>
+ * <h3>Constraints</h3>
  * In addition to these goals, we have two constraints. Unlike the goals, these are not
  * optional -- they are mandatory. Placement will fail if a constraint cannot be
  * satisfied. The first constraint is that we can't place more than one replica on the
@@ -66,55 +64,54 @@ import org.apache.kafka.metadata.OptionalStringComparator;
  * cluster can't have any topics with replication factor 4. This constraint comes from
  * Kafka's internal design.
  *
- * The second constraint is that the leader of each partition must be an unfenced broker.
+ * <p>The second constraint is that the leader of each partition must be an unfenced broker.
  * This constraint is a bit arbitrary. In theory, we could allow people to create
  * new topics even if every broker were fenced. However, this would be confusing for
  * users.
  *
- *
- * ALGORITHM
- * The StripedReplicaPlacer constructor loads the broker data into rack objects. Each
+ * <p>
+ * <h3>Algorithm</h3>
+ * <p>The StripedReplicaPlacer constructor loads the broker data into rack objects. Each
  * rack object contains a sorted list of fenced brokers, and a separate sorted list of
  * unfenced brokers. The racks themselves are organized into a sorted list, stored inside
  * the top-level RackList object.
  *
- * The general idea is that we place replicas on to racks in a round-robin fashion. So if
+ * <p>The general idea is that we place replicas on to racks in a round-robin fashion. So if
  * we had racks A, B, C, and D, and we were creating a new partition with replication
  * factor 3, our first replica might come from A, our second from B, and our third from C.
  * Of course our placement would not be very fair if we always started with rack A.
  * Therefore, we generate a random starting offset when the RackList is created. So one
  * time we might go B, C, D. Another time we might go C, D, A. And so forth.
  *
- * Note that each partition we generate advances the starting offset by one.
+ * <p>Note that each partition we generate advances the starting offset by one.
  * So in our 4-rack cluster, with 3 partitions, we might choose these racks:
  *
  * <pre>
  * partition 1: A, B, C
  * partition 2: B, C, A
  * partition 3: C, A, B
- *
+ * </pre>
  * This is what generates the characteristic "striped" pattern of this placer.
  *
- * So far I haven't said anything about how we choose a replica from within a rack. In
+ * <p>So far I haven't said anything about how we choose a replica from within a rack. In
  * fact, this is also done in a round-robin fashion. So if rack A had replica A0, A1, A2,
  * and A3, we might return A0 the first time, A1, the second, A2 the third, and so on.
  * Just like with the racks, we add a random starting offset to mix things up a bit.
  *
- * So let's say you had a cluster with racks A, B, and C, and each rack had 3 replicas,
+ * <p>So let's say you had a cluster with racks A, B, and C, and each rack had 3 replicas,
  * for 9 nodes in total.
  * If all the offsets were 0, you'd get placements like this:
  *
  * <pre>
  * partition 1: A0, B0, C0
  * partition 2: B1, C1, A1
  * partition 3: C2, A2, B2
- *
- * One additional complication with choosing a replica within a rack is that we want to
+ * </pre>
+ * <p>One additional complication with choosing a replica within a rack is that we want to
  * choose the unfenced replicas first. In a big cluster with lots of nodes available,
  * we'd prefer not to place a new partition on a node that is fenced. Therefore, we
  * actually maintain two lists, rather than the single list I described above.
  * We only start using the fenced node list when the unfenced node list is totally
  * exhausted.
  *
- * Furthermore, we cannot place the first replica (the leader) of a new partition on a
+ * <p>Furthermore, we cannot place the first replica (the leader) of a new partition on a
  * fenced replica. Therefore, we have some special logic to ensure that this doesn't
  * happen.
  */
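The round-robin striping this Javadoc describes can be reproduced in a few lines. A toy sketch (hypothetical `StripingDemo`, not the Kafka implementation, which additionally randomizes the starting offset and prefers unfenced brokers): advancing the starting rack by one per partition yields exactly the "striped" tables above.

```java
import java.util.ArrayList;
import java.util.List;

public class StripingDemo {
    public static void main(String[] args) {
        List<String> racks = List.of("A", "B", "C");
        int replicationFactor = 3;
        int startOffset = 0; // the real placer randomizes this per RackList
        for (int partition = 0; partition < 3; partition++) {
            List<String> placement = new ArrayList<>();
            for (int replica = 0; replica < replicationFactor; replica++) {
                // The rack index advances by one for each partition placed,
                // then round-robins across the racks for each replica.
                placement.add(racks.get((startOffset + partition + replica) % racks.size()));
            }
            System.out.println("partition " + (partition + 1) + ": " + placement);
        }
    }
}
```

With all offsets zero this prints `[A, B, C]`, `[B, C, A]`, `[C, A, B]` for partitions 1 through 3, matching the pattern in the Javadoc.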
@@ -272,7 +269,6 @@ public class StripedReplicaPlacer implements ReplicaPlacer {
 
     /**
      * The names of all the racks in the cluster.
-     *
      * Racks which have at least one unfenced broker come first (in sorted order),
      * followed by racks which have only fenced brokers (also in sorted order).
      */
@@ -113,8 +113,7 @@ public final class BatchFileReader implements Iterator<BatchFileReader.BatchAndT
 
     private BatchAndType nextControlBatch(FileChannelRecordBatch input) {
         List<ApiMessageAndVersion> messages = new ArrayList<>();
-        for (Iterator<Record> iter = input.iterator(); iter.hasNext(); ) {
-            Record record = iter.next();
+        for (Record record : input) {
             try {
                 short typeId = ControlRecordType.parseTypeId(record.key());
                 ControlRecordType type = ControlRecordType.fromTypeId(typeId);
@@ -179,6 +178,6 @@ public final class BatchFileReader implements Iterator<BatchFileReader.BatchAndT
             } catch (Exception e) {
                 log.error("Error closing fileRecords", e);
             }
-            this.batchIterator = Collections.<FileChannelRecordBatch>emptyList().iterator();
+            this.batchIterator = Collections.emptyIterator();
         }
     }
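Both forms above yield an empty iterator; the new one just skips the detour through the empty-list singleton. A standalone comparison (hypothetical demo class):

```java
import java.util.Collections;
import java.util.Iterator;

public class EmptyIteratorDemo {
    public static void main(String[] args) {
        // Old form: fetch the shared empty-list singleton, then ask it for
        // an iterator.
        Iterator<String> viaList = Collections.<String>emptyList().iterator();

        // New form: return the shared empty-iterator singleton directly.
        Iterator<String> direct = Collections.emptyIterator();

        System.out.println(viaList.hasNext() || direct.hasNext()); // false
    }
}
```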
@@ -49,7 +49,7 @@ public final class RecordRedactor {
             case USER_SCRAM_CREDENTIAL_RECORD: {
                 UserScramCredentialRecord record = (UserScramCredentialRecord) message;
                 return "UserScramCredentialRecord("
-                    + "name=" + ((record.name() == null) ? "null" : "'" + record.name().toString() + "'")
+                    + "name=" + ((record.name() == null) ? "null" : "'" + record.name() + "'")
                     + ", mechanism=" + record.mechanism()
                     + ", salt=(redacted)"
                     + ", storedKey=(redacted)"
@@ -240,7 +240,7 @@ public class ClusterControlManagerTest {
     }
 
     @Test
-    public void testRegistrationWithIncorrectClusterId() throws Exception {
+    public void testRegistrationWithIncorrectClusterId() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         FeatureControlManager featureControl = new FeatureControlManager.Builder().
             setSnapshotRegistry(snapshotRegistry).
@@ -317,7 +317,7 @@ public class ClusterControlManagerTest {
     }
 
     @Test
-    public void testUnregister() throws Exception {
+    public void testUnregister() {
         RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
             setBrokerId(1).
             setBrokerEpoch(100).
@@ -365,7 +365,7 @@ public class ClusterControlManagerTest {
 
     @ParameterizedTest
     @ValueSource(ints = {3, 10})
-    public void testPlaceReplicas(int numUsableBrokers) throws Exception {
+    public void testPlaceReplicas(int numUsableBrokers) {
         MockTime time = new MockTime(0, 0, 0);
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         FeatureControlManager featureControl = new FeatureControlManager.Builder().
@@ -418,7 +418,7 @@ public class ClusterControlManagerTest {
 
     @ParameterizedTest
     @EnumSource(value = MetadataVersion.class, names = {"IBP_3_3_IV2", "IBP_3_3_IV3"})
-    public void testRegistrationsToRecords(MetadataVersion metadataVersion) throws Exception {
+    public void testRegistrationsToRecords(MetadataVersion metadataVersion) {
         MockTime time = new MockTime(0, 0, 0);
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         FeatureControlManager featureControl = new FeatureControlManager.Builder().
@@ -122,8 +122,6 @@ public class ProducerIdControlManagerTest {
 
     @Test
     public void testUnknownBrokerOrEpoch() {
-        ControllerResult<ProducerIdsBlock> result;
-
         assertThrows(StaleBrokerEpochException.class, () ->
             producerIdControlManager.generateNextProducerId(99, 0));
 
@@ -30,8 +30,6 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.Optional;
@@ -46,13 +44,13 @@ import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.f
 import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.pause;
 import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.registerBrokersAndUnfence;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 
 @Timeout(value = 40)
 public class QuorumControllerMetricsIntegrationTest {
-    private final static Logger log = LoggerFactory.getLogger(QuorumControllerMetricsIntegrationTest.class);
 
     static class MockControllerMetrics extends QuorumControllerMetrics {
         final AtomicBoolean closed = new AtomicBoolean(false);
@@ -179,7 +177,7 @@ public class QuorumControllerMetricsIntegrationTest {
             for (QuorumController controller : controlEnv.controllers()) {
                 // Inactive controllers don't set these metrics.
                 if (!controller.isActive()) {
-                    assertEquals(false, controller.controllerMetrics().active());
+                    assertFalse(controller.controllerMetrics().active());
                     assertEquals(0L, controller.controllerMetrics().timedOutHeartbeats());
                     assertEquals(0L, controller.controllerMetrics().operationsTimedOut());
                 }
@@ -1102,8 +1102,8 @@ public class QuorumControllerTest {
     }
 
     static class InitialSnapshot implements AutoCloseable {
-        File tempDir = null;
-        BatchFileWriter writer = null;
+        File tempDir;
+        BatchFileWriter writer;
 
         public InitialSnapshot(List<ApiMessageAndVersion> records) throws Exception {
             tempDir = TestUtils.tempDirectory();
@@ -1292,7 +1292,7 @@
                     controllerBuilder.setZkMigrationEnabled(migrationEnabled);
                 }).
                 setBootstrapMetadata(BootstrapMetadata.fromVersion(metadataVersion, "test")).
-                build();
+                build()
         ) {
             QuorumController active = controlEnv.activeController();
             ZkMigrationState zkMigrationState = active.appendReadEvent("read migration state", OptionalLong.empty(),
@@ -1317,7 +1317,7 @@
                     controllerBuilder.setZkMigrationEnabled(true);
                 }).
                 setBootstrapMetadata(bootstrapMetadata).
-                build();
+                build()
         ) {
             QuorumController active = controlEnv.activeController();
             assertEquals(active.featureControl().zkMigrationState(), ZkMigrationState.MIGRATION);
@@ -1453,7 +1453,7 @@
     @Test
     public void testMigrationsEnabledForOldBootstrapMetadataVersion() throws Exception {
         try (
-            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build()
         ) {
             QuorumControllerTestEnv.Builder controlEnvBuilder = new QuorumControllerTestEnv.Builder(logEnv).
                 setControllerBuilderInitializer(controllerBuilder -> {
@@ -1571,7 +1571,7 @@
     @Test
     public void testFailoverDuringMigrationTransaction() throws Exception {
         try (
-            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build();
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build()
         ) {
             QuorumControllerTestEnv.Builder controlEnvBuilder = new QuorumControllerTestEnv.Builder(logEnv).
                 setControllerBuilderInitializer(controllerBuilder -> controllerBuilder.setZkMigrationEnabled(true)).
@@ -1616,7 +1616,7 @@
     @EnumSource(value = MetadataVersion.class, names = {"IBP_3_4_IV0", "IBP_3_5_IV0", "IBP_3_6_IV0", "IBP_3_6_IV1"})
     public void testBrokerHeartbeatDuringMigration(MetadataVersion metadataVersion) throws Exception {
         try (
-            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build()
         ) {
             QuorumControllerTestEnv.Builder controlEnvBuilder = new QuorumControllerTestEnv.Builder(logEnv).
                 setControllerBuilderInitializer(controllerBuilder ->
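The `build();` to `build()` changes in the hunks above are all the same cleanup: in a try-with-resources header, semicolons separate resources, and the one after the last resource is optional, so it can be dropped when the closing parenthesis moves to its own line. A minimal illustration (hypothetical demo class):

```java
import java.io.StringReader;

public class TryWithResourcesDemo {
    public static void main(String[] args) throws Exception {
        try (
            StringReader first = new StringReader("a");
            // No semicolon needed after the *last* resource.
            StringReader second = new StringReader("b")
        ) {
            System.out.println((char) first.read());
            System.out.println((char) second.read());
        }
    }
}
```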
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.controller;
 
-import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
 import org.apache.kafka.metalog.LocalLogManagerTestEnv;
 import org.apache.kafka.raft.LeaderAndEpoch;
@@ -98,7 +97,6 @@ public class QuorumControllerTestEnv implements AutoCloseable {
         int numControllers = logEnv.logManagers().size();
         this.controllers = new ArrayList<>(numControllers);
         try {
-            ApiVersions apiVersions = new ApiVersions();
             List<Integer> nodeIds = IntStream.range(0, numControllers).boxed().collect(Collectors.toList());
             for (int nodeId = 0; nodeId < numControllers; nodeId++) {
                 QuorumController.Builder builder = new QuorumController.Builder(nodeId, logEnv.clusterId());
@@ -921,8 +921,8 @@ public class ReplicationControlManagerTest {
             shrinkIsrResult, topicIdPartition, NONE);
         assertConsistentAlterPartitionResponse(replicationControl, topicIdPartition, shrinkIsrResponse);
         PartitionRegistration partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
-        assertTrue(Arrays.equals(new int[]{1, 2}, partition.elr), partition.toString());
-        assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
+        assertArrayEquals(new int[]{1, 2}, partition.elr, partition.toString());
+        assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
 
         PartitionData expandIsrRequest = newAlterPartition(
             replicationControl, topicIdPartition, isrWithDefaultEpoch(0, 1), LeaderRecoveryState.RECOVERED);
@@ -932,8 +932,8 @@ public class ReplicationControlManagerTest {
             expandIsrResult, topicIdPartition, NONE);
         assertConsistentAlterPartitionResponse(replicationControl, topicIdPartition, expandIsrResponse);
         partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
-        assertTrue(Arrays.equals(new int[]{}, partition.elr), partition.toString());
-        assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
+        assertArrayEquals(new int[]{}, partition.elr, partition.toString());
+        assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
     }
 
     @Test
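The `assertTrue(Arrays.equals(...))` to `assertArrayEquals` swaps in these test hunks change diagnostics, not behavior: both pass on equal arrays, but only `assertArrayEquals` reports which element differs on failure. A JUnit 5 sketch of the difference (hypothetical test class):

```java
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.Arrays;
import org.junit.jupiter.api.Test;

class AssertionStyleDemo {
    @Test
    void arrayAssertions() {
        int[] expected = {1, 2};
        int[] actual = {1, 2};

        // On failure this would only report "expected: <true> but was: <false>";
        // the array contents never appear in the message.
        assertTrue(Arrays.equals(expected, actual));

        // On failure this pinpoints the first differing index and both values.
        assertArrayEquals(expected, actual);
    }
}
```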
@@ -952,19 +952,19 @@
         ctx.fenceBrokers(Utils.mkSet(2, 3));
 
         PartitionRegistration partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
-        assertTrue(Arrays.equals(new int[]{3}, partition.elr), partition.toString());
-        assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
+        assertArrayEquals(new int[]{3}, partition.elr, partition.toString());
+        assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
 
         ctx.fenceBrokers(Utils.mkSet(1, 2, 3));
 
         partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
-        assertTrue(Arrays.equals(new int[]{1, 3}, partition.elr), partition.toString());
-        assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
+        assertArrayEquals(new int[]{1, 3}, partition.elr, partition.toString());
+        assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
 
         ctx.unfenceBrokers(0, 1, 2, 3);
         partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
-        assertTrue(Arrays.equals(new int[]{1, 3}, partition.elr), partition.toString());
-        assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
+        assertArrayEquals(new int[]{1, 3}, partition.elr, partition.toString());
+        assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
     }
 
     @Test
@@ -1000,16 +1000,16 @@
         ctx.fenceBrokers(Utils.mkSet(1, 2, 3));
 
         PartitionRegistration partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
-        assertTrue(Arrays.equals(new int[]{2, 3}, partition.elr), partition.toString());
-        assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
+        assertArrayEquals(new int[]{2, 3}, partition.elr, partition.toString());
+        assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
 
         ctx.unfenceBrokers(2);
         ctx.fenceBrokers(Utils.mkSet(0, 1));
         partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
-        assertTrue(Arrays.equals(new int[]{0, 3}, partition.elr), partition.toString());
-        assertTrue(Arrays.equals(new int[]{2}, partition.isr), partition.toString());
+        assertArrayEquals(new int[]{0, 3}, partition.elr, partition.toString());
+        assertArrayEquals(new int[]{2}, partition.isr, partition.toString());
         assertEquals(2, partition.leader, partition.toString());
-        assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
+        assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
     }
 
     @ParameterizedTest
@@ -1152,7 +1152,7 @@
         long brokerEpoch,
         Uuid topicId,
         AlterPartitionRequestData.PartitionData partitionData
-    ) throws Exception {
+    ) {
         AlterPartitionRequestData request = new AlterPartitionRequestData()
             .setBrokerId(brokerId)
             .setBrokerEpoch(brokerEpoch);
@@ -1424,7 +1424,6 @@
             anonymousContextFor(ApiKeys.CREATE_TOPICS);
         ControllerResult<CreateTopicsResponseData> createResult =
             replicationControl.createTopics(createTopicsRequestContext, request, Collections.singleton("foo"));
-        CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
         CreatableTopicResult createdTopic = createResult.response().topics().find("foo");
         assertEquals(NONE.code(), createdTopic.errorCode());
         ctx.replay(createResult.records());
@@ -37,7 +37,7 @@ import java.util.Optional;
 @Timeout(40)
 public class AclsDeltaTest {
 
-    private Uuid aclId = Uuid.fromString("iOZpss6VQUmD6blnqzl50g");
+    private final Uuid aclId = Uuid.fromString("iOZpss6VQUmD6blnqzl50g");
 
     @Test
     public void testRemovesDeleteIfNotInImage() {
@@ -37,8 +37,6 @@ import org.apache.kafka.metadata.VersionRange;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -55,7 +53,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @Timeout(value = 40)
 public class ClusterImageTest {
-    private static final Logger log = LoggerFactory.getLogger(ClusterImageTest.class);
 
     public final static ClusterImage IMAGE1;
 
@@ -56,7 +56,7 @@ public class DelegationTokenImageTest {
             tokenId,
             SecurityUtils.parseKafkaPrincipal(KafkaPrincipal.USER_TYPE + ":" + "fred"),
             SecurityUtils.parseKafkaPrincipal(KafkaPrincipal.USER_TYPE + ":" + "fred"),
-            new ArrayList<KafkaPrincipal>(),
+            new ArrayList<>(),
             0,
             1000,
             expireTimestamp);
@@ -28,7 +28,7 @@ import java.util.List;
 
 public class FakeSnapshotWriter implements SnapshotWriter<ApiMessageAndVersion> {
     private final OffsetAndEpoch snapshotId;
-    private List<List<ApiMessageAndVersion>> batches = new ArrayList<>();
+    private final List<List<ApiMessageAndVersion>> batches = new ArrayList<>();
     private boolean frozen = false;
     private boolean closed = false;
 
@@ -79,7 +79,7 @@ public class FakeSnapshotWriter implements SnapshotWriter<ApiMessageAndVersion>
     @Override
     public long freeze() {
         frozen = true;
-        return batches.size() * 100;
+        return batches.size() * 100L;
     }
 
     @Override
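The `* 100` to `* 100L` change above fixes a subtle arithmetic hazard: multiplying two ints is performed in 32-bit arithmetic and only afterwards widened to `long` for the return value, so a large product silently overflows first. A standalone demonstration (hypothetical demo class):

```java
public class WideningDemo {
    public static void main(String[] args) {
        int size = 30_000_000;

        // int * int overflows in 32-bit arithmetic *before* the widening
        // conversion to long happens.
        long wrong = size * 100;

        // Making one operand a long promotes the whole multiplication to
        // 64-bit arithmetic, which is exactly what "* 100L" does.
        long right = size * 100L;

        System.out.println(wrong); // -1294967296
        System.out.println(right); // 3000000000
    }
}
```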
@@ -103,7 +103,7 @@ public class ImageDowngradeTest {
      * Test downgrading to a MetadataVersion that doesn't support inControlledShutdown.
      */
     @Test
-    public void testPreControlledShutdownStateVersion() throws Throwable {
+    public void testPreControlledShutdownStateVersion() {
         writeWithExpectedLosses(MetadataVersion.IBP_3_3_IV2,
             Arrays.asList(
                 "the inControlledShutdown state of one or more brokers"),
@@ -118,10 +118,6 @@ public class MetadataImageTest {
             .build(), Optional.empty());
     }
 
-    private static void testToImage(MetadataImage image, ImageWriterOptions options) {
-        testToImage(image, options, Optional.empty());
-    }
-
     static void testToImage(MetadataImage image, ImageWriterOptions options, Optional<List<ApiMessageAndVersion>> fromRecords) {
         testToImage(image, fromRecords.orElseGet(() -> getImageRecords(image, options)));
     }
@@ -19,8 +19,6 @@ package org.apache.kafka.image;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1;
 import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV0;
@@ -31,7 +29,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @Timeout(value = 40)
 public class MetadataVersionChangeTest {
-    private static final Logger log = LoggerFactory.getLogger(MetadataVersionChangeTest.class);
 
     private final static MetadataVersionChange CHANGE_3_0_IV1_TO_3_3_IV0 =
         new MetadataVersionChange(IBP_3_0_IV1, IBP_3_3_IV0);
@@ -40,19 +37,19 @@ public class MetadataVersionChangeTest {
         new MetadataVersionChange(IBP_3_3_IV0, IBP_3_0_IV1);
 
     @Test
-    public void testIsUpgrade() throws Throwable {
+    public void testIsUpgrade() {
         assertTrue(CHANGE_3_0_IV1_TO_3_3_IV0.isUpgrade());
         assertFalse(CHANGE_3_3_IV0_TO_3_0_IV1.isUpgrade());
     }
 
     @Test
-    public void testIsDowngrade() throws Throwable {
+    public void testIsDowngrade() {
         assertFalse(CHANGE_3_0_IV1_TO_3_3_IV0.isDowngrade());
         assertTrue(CHANGE_3_3_IV0_TO_3_0_IV1.isDowngrade());
     }
 
     @Test
-    public void testMetadataVersionChangeExceptionToString() throws Throwable {
+    public void testMetadataVersionChangeExceptionToString() {
         assertEquals("org.apache.kafka.image.MetadataVersionChangeException: The metadata " +
             "version is changing from 3.0-IV1 to 3.3-IV0",
             new MetadataVersionChangeException(CHANGE_3_0_IV1_TO_3_3_IV0).toString());
@@ -45,10 +45,6 @@ public class MetadataLoaderMetricsTest {
             new AtomicReference<>(MetadataProvenance.EMPTY);
         final MetadataLoaderMetrics metrics;
 
-        FakeMetadataLoaderMetrics() {
-            this(Optional.empty());
-        }
-
         FakeMetadataLoaderMetrics(MetricsRegistry registry) {
             this(Optional.of(registry));
         }
@@ -135,7 +135,7 @@ public class SnapshotEmitterTest {
     }
 
     @Test
-    public void testEmit() throws Exception {
+    public void testEmit() {
         MockRaftClient mockRaftClient = new MockRaftClient();
         MockTime time = new MockTime(0, 10000L, 20000L);
         SnapshotEmitter emitter = new SnapshotEmitter.Builder().
@@ -21,12 +21,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @Timeout(value = 40)
 public class BrokerStateTest {
-    private static final Logger log = LoggerFactory.getLogger(BrokerStateTest.class);
 
     @Test
     public void testFromValue() {
@@ -116,14 +116,14 @@ public class ListenerInfoTest {
     }
 
     @Test
-    public void testToControllerRegistrationRequestFailsOnNullHost() throws Exception {
+    public void testToControllerRegistrationRequestFailsOnNullHost() {
         assertThrows(RuntimeException.class,
             () -> ListenerInfo.create(Arrays.asList(INTERNAL)).
                 toControllerRegistrationRequest());
     }
 
     @Test
-    public void testToControllerRegistrationRequestFailsOnZeroPort() throws Exception {
+    public void testToControllerRegistrationRequestFailsOnZeroPort() {
         assertThrows(RuntimeException.class,
             () -> ListenerInfo.create(Arrays.asList(INTERNAL)).
                 withWildcardHostnamesResolved().
@@ -141,14 +141,14 @@ public class ListenerInfoTest {
     }
 
     @Test
-    public void testToControllerRegistrationRecordFailsOnNullHost() throws Exception {
+    public void testToControllerRegistrationRecordFailsOnNullHost() {
         assertThrows(RuntimeException.class,
             () -> ListenerInfo.create(Arrays.asList(INTERNAL)).
                 toControllerRegistrationRecord());
     }
 
     @Test
-    public void testToControllerRegistrationRecordFailsOnZeroPort() throws Exception {
+    public void testToControllerRegistrationRecordFailsOnZeroPort() {
         assertThrows(RuntimeException.class,
             () -> ListenerInfo.create(Arrays.asList(INTERNAL)).
                 withWildcardHostnamesResolved().
@@ -166,14 +166,14 @@ public class ListenerInfoTest {
     }
 
     @Test
-    public void testToBrokerRegistrationRequestFailsOnNullHost() throws Exception {
+    public void testToBrokerRegistrationRequestFailsOnNullHost() {
         assertThrows(RuntimeException.class,
             () -> ListenerInfo.create(Arrays.asList(INTERNAL)).
                 toBrokerRegistrationRequest());
     }
 
     @Test
-    public void testToBrokerRegistrationRequestFailsOnZeroPort() throws Exception {
+    public void testToBrokerRegistrationRequestFailsOnZeroPort() {
         assertThrows(RuntimeException.class,
             () -> ListenerInfo.create(Arrays.asList(INTERNAL)).
                 withWildcardHostnamesResolved().
@@ -191,14 +191,14 @@ public class ListenerInfoTest {
     }
 
     @Test
-    public void testToBrokerRegistrationRecordFailsOnNullHost() throws Exception {
+    public void testToBrokerRegistrationRecordFailsOnNullHost() {
         assertThrows(RuntimeException.class,
             () -> ListenerInfo.create(Arrays.asList(INTERNAL)).
                 toBrokerRegistrationRecord());
     }
 
     @Test
-    public void testToBrokerRegistrationRecordFailsOnZeroPort() throws Exception {
+    public void testToBrokerRegistrationRecordFailsOnZeroPort() {
         assertThrows(RuntimeException.class,
             () -> ListenerInfo.create(Arrays.asList(INTERNAL)).
                 withWildcardHostnamesResolved().
@@ -280,8 +280,8 @@ public class PartitionRegistrationTest {
         PartitionRecord expectRecord = new PartitionRecord().
             setTopicId(topicID).
             setPartitionId(0).
-            setReplicas(Arrays.asList(new Integer[]{0, 1, 2, 3, 4})).
-            setIsr(Arrays.asList(new Integer[]{0, 1})).
+            setReplicas(Arrays.asList(0, 1, 2, 3, 4)).
+            setIsr(Arrays.asList(0, 1)).
             setLeader(0).
             setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()).
             setLeaderEpoch(0).
@@ -290,8 +290,8 @@
         when(metadataVersion.partitionRecordVersion()).thenReturn(version);
         if (version > 0) {
             expectRecord.
-                setEligibleLeaderReplicas(Arrays.asList(new Integer[]{2, 3})).
-                setLastKnownELR(Arrays.asList(new Integer[]{4}));
+                setEligibleLeaderReplicas(Arrays.asList(2, 3)).
+                setLastKnownELR(Arrays.asList(4));
         } else {
             when(metadataVersion.isElrSupported()).thenReturn(false);
         }
@@ -318,6 +318,7 @@
         assertEquals(Replicas.toList(Replicas.NONE), Replicas.toList(partitionRegistration.addingReplicas));
     }
 
+    @Test
     public void testPartitionRegistrationToRecord_ElrShouldBeNullIfEmpty() {
         PartitionRegistration.Builder builder = new PartitionRegistration.Builder().
             setReplicas(new int[]{0, 1, 2, 3, 4}).
@@ -331,8 +332,8 @@
         PartitionRecord expectRecord = new PartitionRecord().
             setTopicId(topicID).
             setPartitionId(0).
-            setReplicas(Arrays.asList(new Integer[]{0, 1, 2, 3, 4})).
-            setIsr(Arrays.asList(new Integer[]{0, 1})).
+            setReplicas(Arrays.asList(0, 1, 2, 3, 4)).
+            setIsr(Arrays.asList(0, 1)).
             setLeader(0).
             setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()).
             setLeaderEpoch(0).
@@ -342,8 +343,9 @@
             setMetadataVersion(MetadataVersion.latest()).
             setLossHandler(exceptions::add).
             build();
-        assertEquals(new ApiMessageAndVersion(expectRecord, (short) 2), partitionRegistration.toRecord(topicID, 0, options));
+        assertEquals(new ApiMessageAndVersion(expectRecord, (short) 1), partitionRegistration.toRecord(topicID, 0, options));
         assertEquals(Replicas.toList(Replicas.NONE), Replicas.toList(partitionRegistration.addingReplicas));
+        assertTrue(exceptions.isEmpty());
     }
 
     @Property
@@ -273,7 +273,6 @@ public class RecordTestUtils {
      *
      * @param o The input object. It will be modified in-place.
      */
-    @SuppressWarnings("unchecked")
     public static void deepSortRecords(Object o) throws Exception {
         if (o == null) {
             return;
@@ -26,7 +26,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @Timeout(value = 40)
 public class VersionRangeTest {
-    @SuppressWarnings("unchecked")
     private static VersionRange v(int a, int b) {
         assertTrue(a <= Short.MAX_VALUE);
         assertTrue(a >= Short.MIN_VALUE);
@@ -48,7 +48,7 @@ public class BootstrapDirectoryTest {
     static class BootstrapTestDirectory implements AutoCloseable {
         File directory = null;
 
-        synchronized BootstrapTestDirectory createDirectory() throws Exception {
+        synchronized BootstrapTestDirectory createDirectory() {
             directory = TestUtils.tempDirectory("BootstrapTestDirectory");
             return this;
         }
@@ -98,7 +98,7 @@
     }
 
     @Test
-    public void testMissingDirectory() throws Exception {
+    public void testMissingDirectory() {
         assertEquals("No such directory as ./non/existent/directory",
             assertThrows(RuntimeException.class, () ->
                 new BootstrapDirectory("./non/existent/directory", Optional.empty()).read()).getMessage());
@@ -48,7 +48,7 @@ public class BootstrapMetadataTest {
             setFeatureLevel((short) 6), (short) 0)));
 
     @Test
-    public void testFromVersion() throws Exception {
+    public void testFromVersion() {
         assertEquals(new BootstrapMetadata(Collections.singletonList(
             new ApiMessageAndVersion(new FeatureLevelRecord().
                 setName(FEATURE_NAME).
@@ -58,20 +58,20 @@
     }
 
     @Test
-    public void testFromRecordsList() throws Exception {
+    public void testFromRecordsList() {
         assertEquals(new BootstrapMetadata(SAMPLE_RECORDS1, IBP_3_3_IV2, "bar"),
             BootstrapMetadata.fromRecords(SAMPLE_RECORDS1, "bar"));
     }
 
     @Test
-    public void testFromRecordsListWithoutMetadataVersion() throws Exception {
+    public void testFromRecordsListWithoutMetadataVersion() {
         assertEquals("No FeatureLevelRecord for metadata.version was found in the bootstrap " +
             "metadata from quux", assertThrows(RuntimeException.class,
             () -> BootstrapMetadata.fromRecords(emptyList(), "quux")).getMessage());
     }
 
     @Test
-    public void testCopyWithOnlyVersion() throws Exception {
+    public void testCopyWithOnlyVersion() {
         assertEquals(new BootstrapMetadata(SAMPLE_RECORDS1.subList(2, 3), IBP_3_3_IV2, "baz"),
             BootstrapMetadata.fromRecords(SAMPLE_RECORDS1, "baz").copyWithOnlyVersion());
     }
@@ -82,7 +82,7 @@
             setFeatureLevel(IBP_3_0_IV1.featureLevel()), (short) 0)));
 
     @Test
-    public void testFromRecordsListWithOldMetadataVersion() throws Exception {
+    public void testFromRecordsListWithOldMetadataVersion() {
         RuntimeException exception = assertThrows(RuntimeException.class,
             () -> BootstrapMetadata.fromRecords(RECORDS_WITH_OLD_METADATA_VERSION, "quux"));
         assertEquals("Bootstrap metadata versions before 3.3-IV0 are not supported. Can't load " +
@@ -36,7 +36,7 @@ public class CapturingDelegationTokenMigrationClient implements DelegationTokenM
 
     @Override
     public List<String> getDelegationTokens() {
-        return new ArrayList<String>();
+        return new ArrayList<>();
     }
 
     @Override
@@ -102,7 +102,7 @@ final public class BatchFileWriterReaderTest {
             assertEquals(0, apiMessageAndVersion.version());
 
             SnapshotFooterRecord footerRecord = (SnapshotFooterRecord) apiMessageAndVersion.message();
-            assertEquals(0, headerRecord.version());
+            assertEquals(0, footerRecord.version());
         }
     }
 }
@@ -176,6 +176,11 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
          */
         private final TreeMap<Long, LocalBatch> batches = new TreeMap<>();
 
+        /**
+         * Maps committed offset to snapshot reader.
+         */
+        private final NavigableMap<Long, RawSnapshotReader> snapshots = new TreeMap<>();
+
         /**
          * The current leader.
          */
@@ -192,11 +197,6 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
          */
         private long initialMaxReadOffset = Long.MAX_VALUE;
 
-        /**
-         * Maps committed offset to snapshot reader.
-         */
-        private NavigableMap<Long, RawSnapshotReader> snapshots = new TreeMap<>();
-
         public SharedLogData(Optional<RawSnapshotReader> snapshot) {
             if (snapshot.isPresent()) {
                 RawSnapshotReader initialSnapshot = snapshot.get();
@@ -515,7 +515,7 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
      * result is half the records getting appended with leader election following that.
      * This is done to emulate having some of the records not getting committed.
      */
-    private AtomicBoolean resignAfterNonAtomicCommit = new AtomicBoolean(false);
+    private final AtomicBoolean resignAfterNonAtomicCommit = new AtomicBoolean(false);
 
     public LocalLogManager(LogContext logContext,
                            int nodeId,
@@ -827,7 +827,6 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
                 // the leader epoch has already advanced. resign is a no op.
                 log.debug("Ignoring call to resign from epoch {}. Either we are not the leader or the provided epoch is " +
                     "smaller than the current epoch {}", epoch, currentEpoch);
-                return;
             }
         }
 
@@ -108,7 +108,7 @@ public class LocalLogManagerTest {
         long highestOffset = -1;
         for (String event : listener.serializedEvents()) {
             if (event.startsWith(LAST_COMMITTED_OFFSET)) {
-                long offset = Long.valueOf(
+                long offset = Long.parseLong(
                     event.substring(LAST_COMMITTED_OFFSET.length() + 1));
                 if (offset < highestOffset) {
                     throw new RuntimeException("Invalid offset: " + offset +
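`Long.valueOf` and `Long.parseLong` parse identically; the difference is that `valueOf` returns a boxed `Long` that is immediately unboxed when assigned to a primitive, while `parseLong` returns the primitive directly. A standalone comparison (hypothetical demo class):

```java
public class ParseDemo {
    public static void main(String[] args) {
        // Boxes to a Long, then unboxes on assignment: a needless round trip
        // (and a potential allocation) on a hot path.
        long boxed = Long.valueOf("42");

        // Returns the primitive long directly.
        long primitive = Long.parseLong("42");

        System.out.println(boxed == primitive); // true
    }
}
```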
@@ -54,13 +54,9 @@ public class MockMetaLogManagerListener implements RaftClient.Listener<ApiMessag
 
                 for (ApiMessageAndVersion messageAndVersion : batch.records()) {
                     ApiMessage message = messageAndVersion.message();
-                    StringBuilder bld = new StringBuilder();
-                    bld.append(COMMIT).append(" ").append(message.toString());
-                    serializedEvents.add(bld.toString());
+                    serializedEvents.add(COMMIT + " " + message.toString());
                 }
-                StringBuilder bld = new StringBuilder();
-                bld.append(LAST_COMMITTED_OFFSET).append(" ").append(lastCommittedOffset);
-                serializedEvents.add(bld.toString());
+                serializedEvents.add(LAST_COMMITTED_OFFSET + " " + lastCommittedOffset);
             }
         } finally {
             reader.close();
@@ -76,13 +72,9 @@ public class MockMetaLogManagerListener implements RaftClient.Listener<ApiMessag
 
                 for (ApiMessageAndVersion messageAndVersion : batch.records()) {
                     ApiMessage message = messageAndVersion.message();
-                    StringBuilder bld = new StringBuilder();
-                    bld.append(SNAPSHOT).append(" ").append(message.toString());
-                    serializedEvents.add(bld.toString());
+                    serializedEvents.add(SNAPSHOT + " " + message.toString());
                 }
-                StringBuilder bld = new StringBuilder();
-                bld.append(LAST_COMMITTED_OFFSET).append(" ").append(lastCommittedOffset);
-                serializedEvents.add(bld.toString());
+                serializedEvents.add(LAST_COMMITTED_OFFSET + " " + lastCommittedOffset);
             }
         } finally {
             reader.close();
@@ -95,14 +87,10 @@ public class MockMetaLogManagerListener implements RaftClient.Listener<ApiMessag
         this.leaderAndEpoch = newLeaderAndEpoch;
 
         if (newLeaderAndEpoch.isLeader(nodeId)) {
-            StringBuilder bld = new StringBuilder();
-            bld.append(NEW_LEADER).append(" ").
-                append(nodeId).append(" ").append(newLeaderAndEpoch.epoch());
-            serializedEvents.add(bld.toString());
+            String bld = NEW_LEADER + " " + nodeId + " " + newLeaderAndEpoch.epoch();
+            serializedEvents.add(bld);
         } else if (oldLeaderAndEpoch.isLeader(nodeId)) {
-            StringBuilder bld = new StringBuilder();
-            bld.append(RENOUNCE).append(" ").append(newLeaderAndEpoch.epoch());
-            serializedEvents.add(bld.toString());
+            serializedEvents.add(RENOUNCE + " " + newLeaderAndEpoch.epoch());
         }
     }
 
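The StringBuilder removals in this file are safe because javac already compiles a single concatenation expression into an equivalent StringBuilder chain (or an invokedynamic call to StringConcatFactory on modern JDKs), so spelling it out by hand only adds noise. A standalone comparison (hypothetical demo class):

```java
public class ConcatDemo {
    static final String NEW_LEADER = "NEW_LEADER";

    public static void main(String[] args) {
        int nodeId = 3;
        int epoch = 7;

        // Manual StringBuilder, as the old test code did:
        StringBuilder bld = new StringBuilder();
        bld.append(NEW_LEADER).append(" ").append(nodeId).append(" ").append(epoch);

        // Plain concatenation compiles to an equivalent (or better) form.
        String concat = NEW_LEADER + " " + nodeId + " " + epoch;

        System.out.println(bld.toString().equals(concat)); // true
    }
}
```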