MINOR: Various cleanups in raft (#15805)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Mickael Maison 2024-04-26 15:20:09 +02:00 committed by GitHub
parent 2db87f04b8
commit e7792258df
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 125 additions and 155 deletions

View File

@ -93,7 +93,7 @@ public class FileBasedStateStore implements QuorumStateStore {
} }
if (dataVersionNode.asInt() != 0) { if (dataVersionNode.asInt() != 0) {
throw new UnsupportedVersionException("Unknown data version of " + dataVersionNode.toString()); throw new UnsupportedVersionException("Unknown data version of " + dataVersionNode);
} }
final short dataVersion = dataVersionNode.shortValue(); final short dataVersion = dataVersionNode.shortValue();

View File

@ -1881,12 +1881,10 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
clusterId, clusterId,
quorum().localIdOrSentinel(), quorum().localIdOrSentinel(),
log.topicPartition(), log.topicPartition(),
snapshotPartition -> { snapshotPartition -> snapshotPartition
return snapshotPartition .setCurrentLeaderEpoch(quorum.epoch())
.setCurrentLeaderEpoch(quorum.epoch()) .setSnapshotId(requestSnapshotId)
.setSnapshotId(requestSnapshotId) .setPosition(snapshotSize)
.setPosition(snapshotSize);
}
); );
return request.setReplicaId(quorum.localIdOrSentinel()); return request.setReplicaId(quorum.localIdOrSentinel());

View File

@ -264,7 +264,7 @@ public class QuorumConfig {
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
List<String> voterStrings = (List) value; List<String> voterStrings = (List<String>) value;
// Attempt to parse the connect strings // Attempt to parse the connect strings
parseVoterConnections(voterStrings); parseVoterConnections(voterStrings);

View File

@ -130,15 +130,9 @@ public class FileBasedStateStoreTest {
// We initialized a state from the metadata log // We initialized a state from the metadata log
assertTrue(stateFile.exists()); assertTrue(stateFile.exists());
final int epoch = 3012;
final int leaderId = 9990;
final int follower1 = leaderId + 1;
final int follower2 = follower1 + 1;
Set<Integer> voters = Utils.mkSet(leaderId, follower1, follower2);
writeToStateFile(stateFile, jsonString); writeToStateFile(stateFile, jsonString);
assertThrows(UnsupportedVersionException.class, () -> { assertThrows(UnsupportedVersionException.class, () -> stateStore.readElectionState());
stateStore.readElectionState(); });
stateStore.clear(); stateStore.clear();
assertFalse(stateFile.exists()); assertFalse(stateFile.exists());

View File

@ -83,7 +83,6 @@ public class KafkaNetworkChannelTest {
ApiKeys.FETCH ApiKeys.FETCH
); );
private final String clusterId = "clusterId";
private final int requestTimeoutMs = 30000; private final int requestTimeoutMs = 30000;
private final Time time = new MockTime(); private final Time time = new MockTime();
private final MockClient client = new MockClient(time, new StubMetadataUpdater()); private final MockClient client = new MockClient(time, new StubMetadataUpdater());
@ -220,11 +219,11 @@ public class KafkaNetworkChannelTest {
AbstractRequest request = client.requests().peek().requestBuilder().build(version); AbstractRequest request = client.requests().peek().requestBuilder().build(version);
if (version < 15) { if (version < 15) {
assertTrue(((FetchRequest) request).data().replicaId() == 1); assertEquals(1, ((FetchRequest) request).data().replicaId());
assertTrue(((FetchRequest) request).data().replicaState().replicaId() == -1); assertEquals(-1, ((FetchRequest) request).data().replicaState().replicaId());
} else { } else {
assertTrue(((FetchRequest) request).data().replicaId() == -1); assertEquals(-1, ((FetchRequest) request).data().replicaId());
assertTrue(((FetchRequest) request).data().replicaState().replicaId() == 1); assertEquals(1, ((FetchRequest) request).data().replicaState().replicaId());
} }
} }
@ -256,6 +255,7 @@ public class KafkaNetworkChannelTest {
private ApiMessage buildTestRequest(ApiKeys key) { private ApiMessage buildTestRequest(ApiKeys key) {
int leaderEpoch = 5; int leaderEpoch = 5;
int leaderId = 1; int leaderId = 1;
String clusterId = "clusterId";
switch (key) { switch (key) {
case BEGIN_QUORUM_EPOCH: case BEGIN_QUORUM_EPOCH:
return BeginQuorumEpochRequest.singletonRequest(topicPartition, clusterId, leaderEpoch, leaderId); return BeginQuorumEpochRequest.singletonRequest(topicPartition, clusterId, leaderEpoch, leaderId);

View File

@ -1873,12 +1873,10 @@ final public class KafkaRaftClientSnapshotTest {
clusterId, clusterId,
replicaId, replicaId,
topicPartition, topicPartition,
snapshotPartition -> { snapshotPartition -> snapshotPartition
return snapshotPartition .setCurrentLeaderEpoch(epoch)
.setCurrentLeaderEpoch(epoch) .setSnapshotId(snapshotId)
.setSnapshotId(snapshotId) .setPosition(position)
.setPosition(position);
}
); );
return request.setMaxBytes(maxBytes); return request.setMaxBytes(maxBytes);
@ -1963,8 +1961,8 @@ final public class KafkaRaftClientSnapshotTest {
private final static class MemorySnapshotWriter implements RawSnapshotWriter { private final static class MemorySnapshotWriter implements RawSnapshotWriter {
private final OffsetAndEpoch snapshotId; private final OffsetAndEpoch snapshotId;
private final AtomicLong frozenPosition;
private ByteBuffer data; private ByteBuffer data;
private AtomicLong frozenPosition;
public MemorySnapshotWriter(OffsetAndEpoch snapshotId) { public MemorySnapshotWriter(OffsetAndEpoch snapshotId) {
this.snapshotId = snapshotId; this.snapshotId = snapshotId;

View File

@ -571,7 +571,7 @@ public class LeaderStateTest {
} }
private DescribeQuorumResponseData.ReplicaState describeVoterState( private DescribeQuorumResponseData.ReplicaState describeVoterState(
LeaderState state, LeaderState<?> state,
int voterId, int voterId,
long currentTimeMs long currentTimeMs
) { ) {
@ -580,7 +580,7 @@ public class LeaderStateTest {
} }
private DescribeQuorumResponseData.ReplicaState describeObserverState( private DescribeQuorumResponseData.ReplicaState describeObserverState(
LeaderState state, LeaderState<?> state,
int observerId, int observerId,
long currentTimeMs long currentTimeMs
) { ) {

View File

@ -35,7 +35,6 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -217,7 +216,7 @@ public class MockLogTest {
} }
@Test @Test
public void testAppendAsFollower() throws IOException { public void testAppendAsFollower() {
final long initialOffset = 5; final long initialOffset = 5;
final int epoch = 3; final int epoch = 3;
SimpleRecord recordFoo = new SimpleRecord("foo".getBytes()); SimpleRecord recordFoo = new SimpleRecord("foo".getBytes());
@ -332,8 +331,6 @@ public class MockLogTest {
LogFetchInfo readInfo = log.read(5, Isolation.UNCOMMITTED); LogFetchInfo readInfo = log.read(5, Isolation.UNCOMMITTED);
assertEquals(5L, readInfo.startOffsetMetadata.offset); assertEquals(5L, readInfo.startOffsetMetadata.offset);
assertTrue(readInfo.startOffsetMetadata.metadata.isPresent()); assertTrue(readInfo.startOffsetMetadata.metadata.isPresent());
MockLog.MockOffsetMetadata offsetMetadata = (MockLog.MockOffsetMetadata)
readInfo.startOffsetMetadata.metadata.get();
// Update to a high watermark with valid offset metadata // Update to a high watermark with valid offset metadata
log.updateHighWatermark(readInfo.startOffsetMetadata); log.updateHighWatermark(readInfo.startOffsetMetadata);
@ -381,7 +378,7 @@ public class MockLogTest {
} }
@Test @Test
public void testReadOutOfRangeOffset() throws IOException { public void testReadOutOfRangeOffset() {
final long initialOffset = 5L; final long initialOffset = 5L;
final int epoch = 3; final int epoch = 3;
SimpleRecord recordFoo = new SimpleRecord("foo".getBytes()); SimpleRecord recordFoo = new SimpleRecord("foo".getBytes());
@ -431,7 +428,7 @@ public class MockLogTest {
} }
@Test @Test
public void testCreateSnapshot() throws IOException { public void testCreateSnapshot() {
int numberOfRecords = 10; int numberOfRecords = 10;
int epoch = 0; int epoch = 0;
OffsetAndEpoch snapshotId = new OffsetAndEpoch(numberOfRecords, epoch); OffsetAndEpoch snapshotId = new OffsetAndEpoch(numberOfRecords, epoch);
@ -588,7 +585,7 @@ public class MockLogTest {
} }
@Test @Test
public void testUpdateLogStartOffset() throws IOException { public void testUpdateLogStartOffset() {
int offset = 10; int offset = 10;
int epoch = 0; int epoch = 0;
OffsetAndEpoch snapshotId = new OffsetAndEpoch(offset, epoch); OffsetAndEpoch snapshotId = new OffsetAndEpoch(offset, epoch);
@ -634,7 +631,7 @@ public class MockLogTest {
} }
@Test @Test
public void testFailToIncreaseLogStartPastHighWatermark() throws IOException { public void testFailToIncreaseLogStartPastHighWatermark() {
int offset = 10; int offset = 10;
int epoch = 0; int epoch = 0;
OffsetAndEpoch snapshotId = new OffsetAndEpoch(2 * offset, epoch); OffsetAndEpoch snapshotId = new OffsetAndEpoch(2 * offset, epoch);
@ -653,7 +650,7 @@ public class MockLogTest {
} }
@Test @Test
public void testTruncateFullyToLatestSnapshot() throws IOException { public void testTruncateFullyToLatestSnapshot() {
int numberOfRecords = 10; int numberOfRecords = 10;
int epoch = 0; int epoch = 0;
OffsetAndEpoch sameEpochSnapshotId = new OffsetAndEpoch(2 * numberOfRecords, epoch); OffsetAndEpoch sameEpochSnapshotId = new OffsetAndEpoch(2 * numberOfRecords, epoch);
@ -686,7 +683,7 @@ public class MockLogTest {
} }
@Test @Test
public void testDoesntTruncateFully() throws IOException { public void testDoesntTruncateFully() {
int numberOfRecords = 10; int numberOfRecords = 10;
int epoch = 1; int epoch = 1;
@ -710,7 +707,7 @@ public class MockLogTest {
} }
@Test @Test
public void testTruncateWillRemoveOlderSnapshot() throws IOException { public void testTruncateWillRemoveOlderSnapshot() {
int numberOfRecords = 10; int numberOfRecords = 10;
int epoch = 1; int epoch = 1;
@ -734,7 +731,7 @@ public class MockLogTest {
} }
@Test @Test
public void testUpdateLogStartOffsetWillRemoveOlderSnapshot() throws IOException { public void testUpdateLogStartOffsetWillRemoveOlderSnapshot() {
int numberOfRecords = 10; int numberOfRecords = 10;
int epoch = 1; int epoch = 1;
@ -771,7 +768,7 @@ public class MockLogTest {
} }
@Test @Test
public void testValidateEpochLessThanOldestSnapshotEpoch() throws IOException { public void testValidateEpochLessThanOldestSnapshotEpoch() {
int offset = 1; int offset = 1;
int epoch = 1; int epoch = 1;
@ -786,7 +783,7 @@ public class MockLogTest {
} }
@Test @Test
public void testValidateOffsetLessThanOldestSnapshotOffset() throws IOException { public void testValidateOffsetLessThanOldestSnapshotOffset() {
int offset = 2; int offset = 2;
int epoch = 1; int epoch = 1;
@ -801,7 +798,7 @@ public class MockLogTest {
} }
@Test @Test
public void testValidateOffsetEqualToOldestSnapshotOffset() throws IOException { public void testValidateOffsetEqualToOldestSnapshotOffset() {
int offset = 2; int offset = 2;
int epoch = 1; int epoch = 1;
@ -816,7 +813,7 @@ public class MockLogTest {
} }
@Test @Test
public void testValidateUnknownEpochLessThanLastKnownGreaterThanOldestSnapshot() throws IOException { public void testValidateUnknownEpochLessThanLastKnownGreaterThanOldestSnapshot() {
int numberOfRecords = 5; int numberOfRecords = 5;
int offset = 10; int offset = 10;
@ -836,7 +833,7 @@ public class MockLogTest {
} }
@Test @Test
public void testValidateEpochLessThanFirstEpochInLog() throws IOException { public void testValidateEpochLessThanFirstEpochInLog() {
int numberOfRecords = 5; int numberOfRecords = 5;
int offset = 10; int offset = 10;
@ -980,7 +977,7 @@ public class MockLogTest {
Records records = log.read(currentOffset, Isolation.UNCOMMITTED).records; Records records = log.read(currentOffset, Isolation.UNCOMMITTED).records;
List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator()); List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
assertTrue(batches.size() > 0); assertFalse(batches.isEmpty());
for (RecordBatch batch : batches) { for (RecordBatch batch : batches) {
assertTrue(batch.countOrNull() > 0); assertTrue(batch.countOrNull() > 0);
assertEquals(currentOffset, batch.baseOffset()); assertEquals(currentOffset, batch.baseOffset());

View File

@ -23,7 +23,6 @@ import org.apache.kafka.raft.internals.BatchAccumulator;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
import java.io.IOException;
import java.io.UncheckedIOException; import java.io.UncheckedIOException;
import java.util.Collections; import java.util.Collections;
import java.util.Optional; import java.util.Optional;
@ -45,8 +44,7 @@ public class QuorumStateTest {
private final int electionTimeoutMs = 5000; private final int electionTimeoutMs = 5000;
private final int fetchTimeoutMs = 10000; private final int fetchTimeoutMs = 10000;
private final MockableRandom random = new MockableRandom(1L); private final MockableRandom random = new MockableRandom(1L);
private final BatchAccumulator<?> accumulator = Mockito.mock(BatchAccumulator.class);
private BatchAccumulator<?> accumulator = Mockito.mock(BatchAccumulator.class);
private QuorumState buildQuorumState(Set<Integer> voters) { private QuorumState buildQuorumState(Set<Integer> voters) {
return buildQuorumState(OptionalInt.of(localId), voters); return buildQuorumState(OptionalInt.of(localId), voters);
@ -69,7 +67,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testInitializePrimordialEpoch() throws IOException { public void testInitializePrimordialEpoch() {
Set<Integer> voters = Utils.mkSet(localId); Set<Integer> voters = Utils.mkSet(localId);
assertNull(store.readElectionState()); assertNull(store.readElectionState());
@ -83,7 +81,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testInitializeAsUnattached() throws IOException { public void testInitializeAsUnattached() {
int node1 = 1; int node1 = 1;
int node2 = 2; int node2 = 2;
int epoch = 5; int epoch = 5;
@ -104,7 +102,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testInitializeAsFollower() throws IOException { public void testInitializeAsFollower() {
int node1 = 1; int node1 = 1;
int node2 = 2; int node2 = 2;
int epoch = 5; int epoch = 5;
@ -123,7 +121,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testInitializeAsVoted() throws IOException { public void testInitializeAsVoted() {
int node1 = 1; int node1 = 1;
int node2 = 2; int node2 = 2;
int epoch = 5; int epoch = 5;
@ -146,7 +144,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testInitializeAsResignedCandidate() throws IOException { public void testInitializeAsResignedCandidate() {
int node1 = 1; int node1 = 1;
int node2 = 2; int node2 = 2;
int epoch = 5; int epoch = 5;
@ -173,7 +171,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testInitializeAsResignedLeader() throws IOException { public void testInitializeAsResignedLeader() {
int node1 = 1; int node1 = 1;
int node2 = 2; int node2 = 2;
int epoch = 5; int epoch = 5;
@ -203,7 +201,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testCandidateToCandidate() throws IOException { public void testCandidateToCandidate() {
int node1 = 1; int node1 = 1;
int node2 = 2; int node2 = 2;
Set<Integer> voters = Utils.mkSet(localId, node1, node2); Set<Integer> voters = Utils.mkSet(localId, node1, node2);
@ -246,7 +244,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testCandidateToResigned() throws IOException { public void testCandidateToResigned() {
int node1 = 1; int node1 = 1;
int node2 = 2; int node2 = 2;
Set<Integer> voters = Utils.mkSet(localId, node1, node2); Set<Integer> voters = Utils.mkSet(localId, node1, node2);
@ -263,7 +261,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testCandidateToLeader() throws IOException { public void testCandidateToLeader() {
Set<Integer> voters = Utils.mkSet(localId); Set<Integer> voters = Utils.mkSet(localId);
assertNull(store.readElectionState()); assertNull(store.readElectionState());
@ -273,14 +271,14 @@ public class QuorumStateTest {
assertEquals(1, state.epoch()); assertEquals(1, state.epoch());
state.transitionToLeader(0L, accumulator); state.transitionToLeader(0L, accumulator);
LeaderState leaderState = state.leaderStateOrThrow(); LeaderState<Object> leaderState = state.leaderStateOrThrow();
assertTrue(state.isLeader()); assertTrue(state.isLeader());
assertEquals(1, leaderState.epoch()); assertEquals(1, leaderState.epoch());
assertEquals(Optional.empty(), leaderState.highWatermark()); assertEquals(Optional.empty(), leaderState.highWatermark());
} }
@Test @Test
public void testCandidateToLeaderWithoutGrantedVote() throws IOException { public void testCandidateToLeaderWithoutGrantedVote() {
int otherNodeId = 1; int otherNodeId = 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId); Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
QuorumState state = initializeEmptyState(voters); QuorumState state = initializeEmptyState(voters);
@ -295,7 +293,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testCandidateToFollower() throws IOException { public void testCandidateToFollower() {
int otherNodeId = 1; int otherNodeId = 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId); Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
QuorumState state = initializeEmptyState(voters); QuorumState state = initializeEmptyState(voters);
@ -309,7 +307,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testCandidateToUnattached() throws IOException { public void testCandidateToUnattached() {
int otherNodeId = 1; int otherNodeId = 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId); Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
QuorumState state = initializeEmptyState(voters); QuorumState state = initializeEmptyState(voters);
@ -323,7 +321,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testCandidateToVoted() throws IOException { public void testCandidateToVoted() {
int otherNodeId = 1; int otherNodeId = 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId); Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
QuorumState state = initializeEmptyState(voters); QuorumState state = initializeEmptyState(voters);
@ -340,7 +338,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testCandidateToAnyStateLowerEpoch() throws IOException { public void testCandidateToAnyStateLowerEpoch() {
int otherNodeId = 1; int otherNodeId = 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId); Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
QuorumState state = initializeEmptyState(voters); QuorumState state = initializeEmptyState(voters);
@ -355,7 +353,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testLeaderToLeader() throws IOException { public void testLeaderToLeader() {
Set<Integer> voters = Utils.mkSet(localId); Set<Integer> voters = Utils.mkSet(localId);
assertNull(store.readElectionState()); assertNull(store.readElectionState());
@ -372,7 +370,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testLeaderToResigned() throws IOException { public void testLeaderToResigned() {
Set<Integer> voters = Utils.mkSet(localId); Set<Integer> voters = Utils.mkSet(localId);
assertNull(store.readElectionState()); assertNull(store.readElectionState());
@ -393,7 +391,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testLeaderToCandidate() throws IOException { public void testLeaderToCandidate() {
Set<Integer> voters = Utils.mkSet(localId); Set<Integer> voters = Utils.mkSet(localId);
assertNull(store.readElectionState()); assertNull(store.readElectionState());
@ -410,7 +408,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testLeaderToFollower() throws IOException { public void testLeaderToFollower() {
int otherNodeId = 1; int otherNodeId = 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId); Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@ -427,7 +425,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testLeaderToUnattached() throws IOException { public void testLeaderToUnattached() {
int otherNodeId = 1; int otherNodeId = 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId); Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
QuorumState state = initializeEmptyState(voters); QuorumState state = initializeEmptyState(voters);
@ -442,7 +440,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testLeaderToVoted() throws IOException { public void testLeaderToVoted() {
int otherNodeId = 1; int otherNodeId = 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId); Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
QuorumState state = initializeEmptyState(voters); QuorumState state = initializeEmptyState(voters);
@ -460,7 +458,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testLeaderToAnyStateLowerEpoch() throws IOException { public void testLeaderToAnyStateLowerEpoch() {
int otherNodeId = 1; int otherNodeId = 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId); Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
QuorumState state = initializeEmptyState(voters); QuorumState state = initializeEmptyState(voters);
@ -477,7 +475,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testCannotFollowOrVoteForSelf() throws IOException { public void testCannotFollowOrVoteForSelf() {
Set<Integer> voters = Utils.mkSet(localId); Set<Integer> voters = Utils.mkSet(localId);
assertNull(store.readElectionState()); assertNull(store.readElectionState());
QuorumState state = initializeEmptyState(voters); QuorumState state = initializeEmptyState(voters);
@ -487,7 +485,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testUnattachedToLeaderOrResigned() throws IOException { public void testUnattachedToLeaderOrResigned() {
int leaderId = 1; int leaderId = 1;
int epoch = 5; int epoch = 5;
Set<Integer> voters = Utils.mkSet(localId, leaderId); Set<Integer> voters = Utils.mkSet(localId, leaderId);
@ -500,7 +498,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testUnattachedToVotedSameEpoch() throws IOException { public void testUnattachedToVotedSameEpoch() {
int otherNodeId = 1; int otherNodeId = 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId); Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
QuorumState state = initializeEmptyState(voters); QuorumState state = initializeEmptyState(voters);
@ -522,7 +520,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testUnattachedToVotedHigherEpoch() throws IOException { public void testUnattachedToVotedHigherEpoch() {
int otherNodeId = 1; int otherNodeId = 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId); Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
QuorumState state = initializeEmptyState(voters); QuorumState state = initializeEmptyState(voters);
@ -537,7 +535,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testUnattachedToCandidate() throws IOException { public void testUnattachedToCandidate() {
int otherNodeId = 1; int otherNodeId = 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId); Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
QuorumState state = initializeEmptyState(voters); QuorumState state = initializeEmptyState(voters);
@ -556,7 +554,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testUnattachedToUnattached() throws IOException { public void testUnattachedToUnattached() {
int otherNodeId = 1; int otherNodeId = 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId); Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
QuorumState state = initializeEmptyState(voters); QuorumState state = initializeEmptyState(voters);
@ -576,7 +574,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testUnattachedToFollowerSameEpoch() throws IOException { public void testUnattachedToFollowerSameEpoch() {
int otherNodeId = 1; int otherNodeId = 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId); Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
QuorumState state = initializeEmptyState(voters); QuorumState state = initializeEmptyState(voters);
@ -592,7 +590,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testUnattachedToFollowerHigherEpoch() throws IOException { public void testUnattachedToFollowerHigherEpoch() {
int otherNodeId = 1; int otherNodeId = 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId); Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
QuorumState state = initializeEmptyState(voters); QuorumState state = initializeEmptyState(voters);
@ -608,7 +606,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testUnattachedToAnyStateLowerEpoch() throws IOException { public void testUnattachedToAnyStateLowerEpoch() {
int otherNodeId = 1; int otherNodeId = 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId); Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
QuorumState state = initializeEmptyState(voters); QuorumState state = initializeEmptyState(voters);
@ -622,7 +620,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testVotedToInvalidLeaderOrResigned() throws IOException { public void testVotedToInvalidLeaderOrResigned() {
int node1 = 1; int node1 = 1;
int node2 = 2; int node2 = 2;
Set<Integer> voters = Utils.mkSet(localId, node1, node2); Set<Integer> voters = Utils.mkSet(localId, node1, node2);
@ -634,7 +632,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testVotedToCandidate() throws IOException { public void testVotedToCandidate() {
int node1 = 1; int node1 = 1;
int node2 = 2; int node2 = 2;
Set<Integer> voters = Utils.mkSet(localId, node1, node2); Set<Integer> voters = Utils.mkSet(localId, node1, node2);
@ -653,7 +651,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testVotedToVotedSameEpoch() throws IOException { public void testVotedToVotedSameEpoch() {
int node1 = 1; int node1 = 1;
int node2 = 2; int node2 = 2;
Set<Integer> voters = Utils.mkSet(localId, node1, node2); Set<Integer> voters = Utils.mkSet(localId, node1, node2);
@ -666,7 +664,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testVotedToFollowerSameEpoch() throws IOException { public void testVotedToFollowerSameEpoch() {
int node1 = 1; int node1 = 1;
int node2 = 2; int node2 = 2;
Set<Integer> voters = Utils.mkSet(localId, node1, node2); Set<Integer> voters = Utils.mkSet(localId, node1, node2);
@ -682,7 +680,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testVotedToFollowerHigherEpoch() throws IOException { public void testVotedToFollowerHigherEpoch() {
int node1 = 1; int node1 = 1;
int node2 = 2; int node2 = 2;
Set<Integer> voters = Utils.mkSet(localId, node1, node2); Set<Integer> voters = Utils.mkSet(localId, node1, node2);
@ -698,7 +696,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testVotedToUnattachedSameEpoch() throws IOException { public void testVotedToUnattachedSameEpoch() {
int node1 = 1; int node1 = 1;
int node2 = 2; int node2 = 2;
Set<Integer> voters = Utils.mkSet(localId, node1, node2); Set<Integer> voters = Utils.mkSet(localId, node1, node2);
@ -709,7 +707,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testVotedToUnattachedHigherEpoch() throws IOException { public void testVotedToUnattachedHigherEpoch() {
int otherNodeId = 1; int otherNodeId = 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId); Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
QuorumState state = initializeEmptyState(voters); QuorumState state = initializeEmptyState(voters);
@ -729,7 +727,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testVotedToAnyStateLowerEpoch() throws IOException { public void testVotedToAnyStateLowerEpoch() {
int otherNodeId = 1; int otherNodeId = 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId); Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
QuorumState state = initializeEmptyState(voters); QuorumState state = initializeEmptyState(voters);
@ -743,7 +741,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testFollowerToFollowerSameEpoch() throws IOException { public void testFollowerToFollowerSameEpoch() {
int node1 = 1; int node1 = 1;
int node2 = 2; int node2 = 2;
Set<Integer> voters = Utils.mkSet(localId, node1, node2); Set<Integer> voters = Utils.mkSet(localId, node1, node2);
@ -760,7 +758,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testFollowerToFollowerHigherEpoch() throws IOException { public void testFollowerToFollowerHigherEpoch() {
int node1 = 1; int node1 = 1;
int node2 = 2; int node2 = 2;
Set<Integer> voters = Utils.mkSet(localId, node1, node2); Set<Integer> voters = Utils.mkSet(localId, node1, node2);
@ -776,7 +774,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testFollowerToLeaderOrResigned() throws IOException { public void testFollowerToLeaderOrResigned() {
int node1 = 1; int node1 = 1;
int node2 = 2; int node2 = 2;
Set<Integer> voters = Utils.mkSet(localId, node1, node2); Set<Integer> voters = Utils.mkSet(localId, node1, node2);
@ -788,7 +786,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testFollowerToCandidate() throws IOException { public void testFollowerToCandidate() {
int node1 = 1; int node1 = 1;
int node2 = 2; int node2 = 2;
Set<Integer> voters = Utils.mkSet(localId, node1, node2); Set<Integer> voters = Utils.mkSet(localId, node1, node2);
@ -807,7 +805,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testFollowerToUnattachedSameEpoch() throws IOException { public void testFollowerToUnattachedSameEpoch() {
int node1 = 1; int node1 = 1;
int node2 = 2; int node2 = 2;
Set<Integer> voters = Utils.mkSet(localId, node1, node2); Set<Integer> voters = Utils.mkSet(localId, node1, node2);
@ -818,7 +816,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testFollowerToUnattachedHigherEpoch() throws IOException { public void testFollowerToUnattachedHigherEpoch() {
int node1 = 1; int node1 = 1;
int node2 = 2; int node2 = 2;
Set<Integer> voters = Utils.mkSet(localId, node1, node2); Set<Integer> voters = Utils.mkSet(localId, node1, node2);
@ -837,7 +835,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testFollowerToVotedSameEpoch() throws IOException { public void testFollowerToVotedSameEpoch() {
int node1 = 1; int node1 = 1;
int node2 = 2; int node2 = 2;
Set<Integer> voters = Utils.mkSet(localId, node1, node2); Set<Integer> voters = Utils.mkSet(localId, node1, node2);
@ -851,7 +849,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testFollowerToVotedHigherEpoch() throws IOException { public void testFollowerToVotedHigherEpoch() {
int node1 = 1; int node1 = 1;
int node2 = 2; int node2 = 2;
Set<Integer> voters = Utils.mkSet(localId, node1, node2); Set<Integer> voters = Utils.mkSet(localId, node1, node2);
@ -871,7 +869,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testFollowerToAnyStateLowerEpoch() throws IOException { public void testFollowerToAnyStateLowerEpoch() {
int otherNodeId = 1; int otherNodeId = 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId); Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
QuorumState state = initializeEmptyState(voters); QuorumState state = initializeEmptyState(voters);
@ -885,7 +883,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testCannotBecomeFollowerOfNonVoter() throws IOException { public void testCannotBecomeFollowerOfNonVoter() {
int otherNodeId = 1; int otherNodeId = 1;
int nonVoterId = 2; int nonVoterId = 2;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId); Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@ -896,7 +894,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testObserverCannotBecomeCandidateOrLeaderOrVoted() throws IOException { public void testObserverCannotBecomeCandidateOrLeaderOrVoted() {
int otherNodeId = 1; int otherNodeId = 1;
Set<Integer> voters = Utils.mkSet(otherNodeId); Set<Integer> voters = Utils.mkSet(otherNodeId);
QuorumState state = initializeEmptyState(voters); QuorumState state = initializeEmptyState(voters);
@ -908,7 +906,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testObserverFollowerToUnattached() throws IOException { public void testObserverFollowerToUnattached() {
int node1 = 1; int node1 = 1;
int node2 = 2; int node2 = 2;
Set<Integer> voters = Utils.mkSet(node1, node2); Set<Integer> voters = Utils.mkSet(node1, node2);
@ -927,7 +925,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testObserverUnattachedToFollower() throws IOException { public void testObserverUnattachedToFollower() {
int node1 = 1; int node1 = 1;
int node2 = 2; int node2 = 2;
Set<Integer> voters = Utils.mkSet(node1, node2); Set<Integer> voters = Utils.mkSet(node1, node2);
@ -959,7 +957,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testInconsistentVotersBetweenConfigAndState() throws IOException { public void testInconsistentVotersBetweenConfigAndState() {
int otherNodeId = 1; int otherNodeId = 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId); Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@ -975,7 +973,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testHasRemoteLeader() throws IOException { public void testHasRemoteLeader() {
int otherNodeId = 1; int otherNodeId = 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId); Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@ -1000,7 +998,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testHighWatermarkRetained() throws IOException { public void testHighWatermarkRetained() {
int otherNodeId = 1; int otherNodeId = 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId); Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@ -1031,7 +1029,7 @@ public class QuorumStateTest {
} }
@Test @Test
public void testInitializeWithEmptyLocalId() throws IOException { public void testInitializeWithEmptyLocalId() {
QuorumState state = buildQuorumState(OptionalInt.empty(), Utils.mkSet(0, 1)); QuorumState state = buildQuorumState(OptionalInt.empty(), Utils.mkSet(0, 1));
state.initialize(new OffsetAndEpoch(0L, 0)); state.initialize(new OffsetAndEpoch(0L, 0));
@ -1064,7 +1062,7 @@ public class QuorumStateTest {
assertThrows(IllegalStateException.class, () -> state2.initialize(new OffsetAndEpoch(0, 0))); assertThrows(IllegalStateException.class, () -> state2.initialize(new OffsetAndEpoch(0, 0)));
} }
private QuorumState initializeEmptyState(Set<Integer> voters) throws IOException { private QuorumState initializeEmptyState(Set<Integer> voters) {
QuorumState state = buildQuorumState(voters); QuorumState state = buildQuorumState(voters);
store.writeElectionState(ElectionState.withUnknownLeader(0, voters)); store.writeElectionState(ElectionState.withUnknownLeader(0, voters));
state.initialize(new OffsetAndEpoch(0L, logEndEpoch)); state.initialize(new OffsetAndEpoch(0L, logEndEpoch));

View File

@ -134,10 +134,10 @@ public final class RaftClientTestContext {
private final MockableRandom random = new MockableRandom(1L); private final MockableRandom random = new MockableRandom(1L);
private final LogContext logContext = new LogContext(); private final LogContext logContext = new LogContext();
private final MockLog log = new MockLog(METADATA_PARTITION, Uuid.METADATA_TOPIC_ID, logContext); private final MockLog log = new MockLog(METADATA_PARTITION, Uuid.METADATA_TOPIC_ID, logContext);
private final Uuid clusterId = Uuid.randomUuid();
private final Set<Integer> voters; private final Set<Integer> voters;
private final OptionalInt localId; private final OptionalInt localId;
private Uuid clusterId = Uuid.randomUuid();
private int requestTimeoutMs = DEFAULT_REQUEST_TIMEOUT_MS; private int requestTimeoutMs = DEFAULT_REQUEST_TIMEOUT_MS;
private int electionTimeoutMs = DEFAULT_ELECTION_TIMEOUT_MS; private int electionTimeoutMs = DEFAULT_ELECTION_TIMEOUT_MS;
private int appendLingerMs = DEFAULT_APPEND_LINGER_MS; private int appendLingerMs = DEFAULT_APPEND_LINGER_MS;
@ -152,17 +152,17 @@ public final class RaftClientTestContext {
this.localId = localId; this.localId = localId;
} }
Builder withElectedLeader(int epoch, int leaderId) throws IOException { Builder withElectedLeader(int epoch, int leaderId) {
quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, leaderId, voters)); quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, leaderId, voters));
return this; return this;
} }
Builder withUnknownLeader(int epoch) throws IOException { Builder withUnknownLeader(int epoch) {
quorumStateStore.writeElectionState(ElectionState.withUnknownLeader(epoch, voters)); quorumStateStore.writeElectionState(ElectionState.withUnknownLeader(epoch, voters));
return this; return this;
} }
Builder withVotedCandidate(int epoch, int votedId) throws IOException { Builder withVotedCandidate(int epoch, int votedId) {
quorumStateStore.writeElectionState(ElectionState.withVotedCandidate(epoch, votedId, voters)); quorumStateStore.writeElectionState(ElectionState.withVotedCandidate(epoch, votedId, voters));
return this; return this;
} }
@ -201,14 +201,14 @@ public final class RaftClientTestContext {
return this; return this;
} }
Builder withEmptySnapshot(OffsetAndEpoch snapshotId) throws IOException { Builder withEmptySnapshot(OffsetAndEpoch snapshotId) {
try (RawSnapshotWriter snapshot = log.storeSnapshot(snapshotId).get()) { try (RawSnapshotWriter snapshot = log.storeSnapshot(snapshotId).get()) {
snapshot.freeze(); snapshot.freeze();
} }
return this; return this;
} }
Builder deleteBeforeSnapshot(OffsetAndEpoch snapshotId) throws IOException { Builder deleteBeforeSnapshot(OffsetAndEpoch snapshotId) {
if (snapshotId.offset() > log.highWatermark().offset) { if (snapshotId.offset() > log.highWatermark().offset) {
log.updateHighWatermark(new LogOffsetMetadata(snapshotId.offset())); log.updateHighWatermark(new LogOffsetMetadata(snapshotId.offset()));
} }
@ -227,11 +227,6 @@ public final class RaftClientTestContext {
return this; return this;
} }
Builder withClusterId(Uuid clusterId) {
this.clusterId = clusterId;
return this;
}
public RaftClientTestContext build() throws IOException { public RaftClientTestContext build() throws IOException {
Metrics metrics = new Metrics(time); Metrics metrics = new Metrics(time);
MockNetworkChannel channel = new MockNetworkChannel(voters); MockNetworkChannel channel = new MockNetworkChannel(voters);
@ -432,19 +427,19 @@ public final class RaftClientTestContext {
pollUntil(channel::hasSentRequests); pollUntil(channel::hasSentRequests);
} }
void assertVotedCandidate(int epoch, int leaderId) throws IOException { void assertVotedCandidate(int epoch, int leaderId) {
assertEquals(ElectionState.withVotedCandidate(epoch, leaderId, voters), quorumStateStore.readElectionState()); assertEquals(ElectionState.withVotedCandidate(epoch, leaderId, voters), quorumStateStore.readElectionState());
} }
public void assertElectedLeader(int epoch, int leaderId) throws IOException { public void assertElectedLeader(int epoch, int leaderId) {
assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), quorumStateStore.readElectionState()); assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), quorumStateStore.readElectionState());
} }
void assertUnknownLeader(int epoch) throws IOException { void assertUnknownLeader(int epoch) {
assertEquals(ElectionState.withUnknownLeader(epoch, voters), quorumStateStore.readElectionState()); assertEquals(ElectionState.withUnknownLeader(epoch, voters), quorumStateStore.readElectionState());
} }
void assertResignedLeader(int epoch, int leaderId) throws IOException { void assertResignedLeader(int epoch, int leaderId) {
assertTrue(client.quorum().isResigned()); assertTrue(client.quorum().isResigned());
assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), quorumStateStore.readElectionState()); assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), quorumStateStore.readElectionState());
} }
@ -528,7 +523,7 @@ public final class RaftClientTestContext {
long lastEpochOffset long lastEpochOffset
) { ) {
List<RaftRequest.Outbound> voteRequests = new ArrayList<>(); List<RaftRequest.Outbound> voteRequests = new ArrayList<>();
for (RaftMessage raftMessage : channel.drainSendQueue()) { for (RaftRequest.Outbound raftMessage : channel.drainSendQueue()) {
if (raftMessage.data() instanceof VoteRequestData) { if (raftMessage.data() instanceof VoteRequestData) {
VoteRequestData request = (VoteRequestData) raftMessage.data(); VoteRequestData request = (VoteRequestData) raftMessage.data();
VoteRequestData.PartitionData partitionRequest = unwrap(request); VoteRequestData.PartitionData partitionRequest = unwrap(request);
@ -537,7 +532,7 @@ public final class RaftClientTestContext {
assertEquals(localIdOrThrow(), partitionRequest.candidateId()); assertEquals(localIdOrThrow(), partitionRequest.candidateId());
assertEquals(lastEpoch, partitionRequest.lastOffsetEpoch()); assertEquals(lastEpoch, partitionRequest.lastOffsetEpoch());
assertEquals(lastEpochOffset, partitionRequest.lastOffset()); assertEquals(lastEpochOffset, partitionRequest.lastOffset());
voteRequests.add((RaftRequest.Outbound) raftMessage); voteRequests.add(raftMessage);
} }
} }
return voteRequests; return voteRequests;
@ -764,7 +759,7 @@ public final class RaftClientTestContext {
) { ) {
List<RaftRequest.Outbound> endQuorumRequests = new ArrayList<>(); List<RaftRequest.Outbound> endQuorumRequests = new ArrayList<>();
Set<Integer> collectedDestinationIdSet = new HashSet<>(); Set<Integer> collectedDestinationIdSet = new HashSet<>();
for (RaftMessage raftMessage : channel.drainSendQueue()) { for (RaftRequest.Outbound raftMessage : channel.drainSendQueue()) {
if (raftMessage.data() instanceof EndQuorumEpochRequestData) { if (raftMessage.data() instanceof EndQuorumEpochRequestData) {
EndQuorumEpochRequestData request = (EndQuorumEpochRequestData) raftMessage.data(); EndQuorumEpochRequestData request = (EndQuorumEpochRequestData) raftMessage.data();
@ -777,9 +772,8 @@ public final class RaftClientTestContext {
assertEquals(preferredSuccessors, partitionRequest.preferredSuccessors()); assertEquals(preferredSuccessors, partitionRequest.preferredSuccessors());
}); });
RaftRequest.Outbound outboundRequest = (RaftRequest.Outbound) raftMessage; collectedDestinationIdSet.add(raftMessage.destinationId());
collectedDestinationIdSet.add(outboundRequest.destinationId()); endQuorumRequests.add(raftMessage);
endQuorumRequests.add(outboundRequest);
} }
} }
assertEquals(destinationIdSet, collectedDestinationIdSet); assertEquals(destinationIdSet, collectedDestinationIdSet);

View File

@ -494,7 +494,7 @@ class BatchAccumulatorTest {
} }
int recordSizeInBytes(String record, int numberOfRecords) { int recordSizeInBytes(String record, int numberOfRecords) {
int serdeSize = serde.recordSize("a", new ObjectSerializationCache()); int serdeSize = serde.recordSize(record, new ObjectSerializationCache());
int recordSizeInBytes = DefaultRecord.sizeOfBodyInBytes( int recordSizeInBytes = DefaultRecord.sizeOfBodyInBytes(
numberOfRecords, numberOfRecords,
@ -507,23 +507,15 @@ class BatchAccumulatorTest {
return ByteUtils.sizeOfVarint(recordSizeInBytes) + recordSizeInBytes; return ByteUtils.sizeOfVarint(recordSizeInBytes) + recordSizeInBytes;
} }
static interface Appender { interface Appender {
Long call(BatchAccumulator<String> acc, int epoch, List<String> records); Long call(BatchAccumulator<String> acc, int epoch, List<String> records);
} }
static final Appender APPEND_ATOMIC = new Appender() { static final Appender APPEND_ATOMIC = (acc, epoch, records) ->
@Override acc.append(epoch, records, OptionalLong.empty(), true);
public Long call(BatchAccumulator<String> acc, int epoch, List<String> records) {
return acc.append(epoch, records, OptionalLong.empty(), true);
}
};
static final Appender APPEND = new Appender() { static final Appender APPEND = (acc, epoch, records) ->
@Override acc.append(epoch, records, OptionalLong.empty(), false);
public Long call(BatchAccumulator<String> acc, int epoch, List<String> records) {
return acc.append(epoch, records, OptionalLong.empty(), false);
}
};
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = {false, true}) @ValueSource(booleans = {false, true})

View File

@ -35,8 +35,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
class BatchBuilderTest { class BatchBuilderTest {
private StringSerde serde = new StringSerde(); private final StringSerde serde = new StringSerde();
private MockTime time = new MockTime(); private final MockTime time = new MockTime();
@ParameterizedTest @ParameterizedTest
@EnumSource(CompressionType.class) @EnumSource(CompressionType.class)

View File

@ -31,7 +31,6 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.OptionalInt; import java.util.OptionalInt;
import java.util.OptionalLong; import java.util.OptionalLong;
@ -51,7 +50,7 @@ public class KafkaRaftMetricsTest {
private final Random random = new Random(1); private final Random random = new Random(1);
private KafkaRaftMetrics raftMetrics; private KafkaRaftMetrics raftMetrics;
private BatchAccumulator<?> accumulator = Mockito.mock(BatchAccumulator.class); private final BatchAccumulator<?> accumulator = Mockito.mock(BatchAccumulator.class);
@AfterEach @AfterEach
public void tearDown() { public void tearDown() {
@ -75,7 +74,7 @@ public class KafkaRaftMetricsTest {
} }
@Test @Test
public void shouldRecordVoterQuorumState() throws IOException { public void shouldRecordVoterQuorumState() {
QuorumState state = buildQuorumState(Utils.mkSet(localId, 1, 2)); QuorumState state = buildQuorumState(Utils.mkSet(localId, 1, 2));
state.initialize(new OffsetAndEpoch(0L, 0)); state.initialize(new OffsetAndEpoch(0L, 0));
@ -132,7 +131,7 @@ public class KafkaRaftMetricsTest {
} }
@Test @Test
public void shouldRecordNonVoterQuorumState() throws IOException { public void shouldRecordNonVoterQuorumState() {
QuorumState state = buildQuorumState(Utils.mkSet(1, 2, 3)); QuorumState state = buildQuorumState(Utils.mkSet(1, 2, 3));
state.initialize(new OffsetAndEpoch(0L, 0)); state.initialize(new OffsetAndEpoch(0L, 0));
raftMetrics = new KafkaRaftMetrics(metrics, "raft", state); raftMetrics = new KafkaRaftMetrics(metrics, "raft", state);
@ -162,7 +161,7 @@ public class KafkaRaftMetricsTest {
} }
@Test @Test
public void shouldRecordLogEnd() throws IOException { public void shouldRecordLogEnd() {
QuorumState state = buildQuorumState(Collections.singleton(localId)); QuorumState state = buildQuorumState(Collections.singleton(localId));
state.initialize(new OffsetAndEpoch(0L, 0)); state.initialize(new OffsetAndEpoch(0L, 0));
raftMetrics = new KafkaRaftMetrics(metrics, "raft", state); raftMetrics = new KafkaRaftMetrics(metrics, "raft", state);
@ -177,7 +176,7 @@ public class KafkaRaftMetricsTest {
} }
@Test @Test
public void shouldRecordNumUnknownVoterConnections() throws IOException { public void shouldRecordNumUnknownVoterConnections() {
QuorumState state = buildQuorumState(Collections.singleton(localId)); QuorumState state = buildQuorumState(Collections.singleton(localId));
state.initialize(new OffsetAndEpoch(0L, 0)); state.initialize(new OffsetAndEpoch(0L, 0));
raftMetrics = new KafkaRaftMetrics(metrics, "raft", state); raftMetrics = new KafkaRaftMetrics(metrics, "raft", state);
@ -262,7 +261,7 @@ public class KafkaRaftMetricsTest {
} }
@Test @Test
public void shouldRecordLatency() throws IOException { public void shouldRecordLatency() {
QuorumState state = buildQuorumState(Collections.singleton(localId)); QuorumState state = buildQuorumState(Collections.singleton(localId));
state.initialize(new OffsetAndEpoch(0L, 0)); state.initialize(new OffsetAndEpoch(0L, 0));
raftMetrics = new KafkaRaftMetrics(metrics, "raft", state); raftMetrics = new KafkaRaftMetrics(metrics, "raft", state);
@ -293,7 +292,7 @@ public class KafkaRaftMetricsTest {
} }
@Test @Test
public void shouldRecordRate() throws IOException { public void shouldRecordRate() {
QuorumState state = buildQuorumState(Collections.singleton(localId)); QuorumState state = buildQuorumState(Collections.singleton(localId));
state.initialize(new OffsetAndEpoch(0L, 0)); state.initialize(new OffsetAndEpoch(0L, 0));
raftMetrics = new KafkaRaftMetrics(metrics, "raft", state); raftMetrics = new KafkaRaftMetrics(metrics, "raft", state);

View File

@ -85,7 +85,7 @@ public final class FileRawSnapshotTest {
} }
@Test @Test
public void testWriteReadSnapshot() throws IOException { public void testWriteReadSnapshot() {
OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(10L, 3); OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(10L, 3);
int bufferSize = 256; int bufferSize = 256;
int numberOfBatches = 10; int numberOfBatches = 10;
@ -129,7 +129,7 @@ public final class FileRawSnapshotTest {
} }
@Test @Test
public void testPartialWriteReadSnapshot() throws IOException { public void testPartialWriteReadSnapshot() {
Path tempDir = TestUtils.tempDirectory().toPath(); Path tempDir = TestUtils.tempDirectory().toPath();
OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(10L, 3); OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(10L, 3);
@ -169,7 +169,7 @@ public final class FileRawSnapshotTest {
} }
@Test @Test
public void testBatchWriteReadSnapshot() throws IOException { public void testBatchWriteReadSnapshot() {
OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(10L, 3); OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(10L, 3);
int bufferSize = 256; int bufferSize = 256;
int batchSize = 3; int batchSize = 3;