MINOR: style fixes to raft client (#10809)

Style fixes to KafkaRaftClient

Reviewers: Luke Chen <showuon@gmail.com>
Boyang Chen 2021-06-03 18:51:03 -07:00 committed by GitHub
parent e97cff2702
commit 0358c21ae4
1 changed file with 21 additions and 31 deletions

KafkaRaftClient.java

@@ -309,17 +309,15 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
         for (ListenerContext listenerContext : listenerContexts) {
             listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset -> {
                 if (nextExpectedOffset < log.startOffset() && nextExpectedOffset < highWatermark) {
-                    SnapshotReader<T> snapshot = latestSnapshot().orElseThrow(() -> {
-                        return new IllegalStateException(
-                            String.format(
-                                "Snapshot expected since next offset of %s is %s, log start offset is %s and high-watermark is %s",
-                                listenerContext.listener.getClass().getTypeName(),
-                                nextExpectedOffset,
-                                log.startOffset(),
-                                highWatermark
-                            )
-                        );
-                    });
+                    SnapshotReader<T> snapshot = latestSnapshot().orElseThrow(() -> new IllegalStateException(
+                        String.format(
+                            "Snapshot expected since next offset of %s is %s, log start offset is %s and high-watermark is %s",
+                            listenerContext.listener.getClass().getTypeName(),
+                            nextExpectedOffset,
+                            log.startOffset(),
+                            highWatermark
+                        )
+                    ));
                     listenerContext.fireHandleSnapshot(snapshot);
                 }
             });
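The hunk above swaps a statement lambda (braces plus an explicit return) for an expression lambda in the Supplier handed to orElseThrow, cutting two levels of nesting without changing behavior. A minimal standalone sketch of the same pattern, using a plain Optional<String> and invented names instead of Kafka's SnapshotReader:

    import java.util.Optional;

    public class OrElseThrowStyle {
        public static void main(String[] args) {
            Optional<String> snapshot = Optional.empty();

            // Statement lambda: braces and an explicit return around the
            // exception construction (the style being removed).
            try {
                snapshot.orElseThrow(() -> {
                    return new IllegalStateException("no snapshot available");
                });
            } catch (IllegalStateException e) {
                System.out.println("statement lambda: " + e.getMessage());
            }

            // Expression lambda: the supplier body is a single expression,
            // so the braces and return can be dropped (the style being added).
            try {
                snapshot.orElseThrow(() -> new IllegalStateException("no snapshot available"));
            } catch (IllegalStateException e) {
                System.out.println("expression lambda: " + e.getMessage());
            }
        }
    }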
@@ -347,14 +345,10 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
     private void maybeFireHandleCommit(long baseOffset, int epoch, List<T> records) {
         for (ListenerContext listenerContext : listenerContexts) {
             OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset();
-            if (!nextExpectedOffsetOpt.isPresent()) {
-                continue;
-            }
-
-            long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong();
-            if (nextExpectedOffset == baseOffset) {
-                listenerContext.fireHandleCommit(baseOffset, epoch, records);
-            }
+            nextExpectedOffsetOpt.ifPresent(nextOffset -> {
+                if (nextOffset == baseOffset)
+                    listenerContext.fireHandleCommit(baseOffset, epoch, records);
+            });
         }
     }
 
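This hunk collapses an explicit isPresent()/getAsLong() guard into OptionalLong.ifPresent, which hands the unboxed value straight to a LongConsumer. A runnable sketch of the same refactoring, with a println standing in for fireHandleCommit:

    import java.util.OptionalLong;

    public class IfPresentStyle {
        public static void main(String[] args) {
            long baseOffset = 42L;
            OptionalLong nextExpectedOffsetOpt = OptionalLong.of(42L);

            // Before: manual guard, early exit, and an explicit getAsLong().
            if (nextExpectedOffsetOpt.isPresent()) {
                long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong();
                if (nextExpectedOffset == baseOffset) {
                    System.out.println("fire handleCommit at " + baseOffset);
                }
            }

            // After: guard and unboxing collapse into a single callback.
            nextExpectedOffsetOpt.ifPresent(nextOffset -> {
                if (nextOffset == baseOffset) {
                    System.out.println("fire handleCommit at " + baseOffset);
                }
            });
        }
    }

One subtlety: inside the original loop the guard used continue, which ifPresent expresses for free by simply not invoking the callback on an empty OptionalLong.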
@@ -388,7 +382,6 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
             if (quorum.isVoter()
                 && quorum.remoteVoters().isEmpty()
                 && !quorum.isCandidate()) {
-
                 transitionToCandidate(currentTimeMs);
             }
         } catch (IOException e) {
@@ -449,7 +442,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
     }
 
     private void flushLeaderLog(LeaderState<T> state, long currentTimeMs) {
-        // We update the end offset before flushing so that parked fetches can return sooner
+        // We update the end offset before flushing so that parked fetches can return sooner.
         updateLeaderEndOffsetAndTimestamp(state, currentTimeMs);
         log.flush();
     }
@@ -502,11 +495,11 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
         resetConnections();
 
         // After becoming a follower, we need to complete all pending fetches so that
-        // they can be resent to the leader without waiting for their expiration
+        // they can be re-sent to the leader without waiting for their expirations
         fetchPurgatory.completeAllExceptionally(new NotLeaderOrFollowerException(
             "Cannot process the fetch request because the node is no longer the leader."));
 
-        // Clearing the append purgatory should complete all future exceptionally since this node is no longer the leader
+        // Clearing the append purgatory should complete all futures exceptionally since this node is no longer the leader
         appendPurgatory.completeAllExceptionally(new NotLeaderOrFollowerException(
             "Failed to receive sufficient acknowledgments for this append before leader change."));
     }
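The comments fixed here describe the purgatory pattern: pending fetch and append requests are parked as futures, and on leader change every parked future is failed at once so callers can retry against the new leader. Kafka's purgatory classes are internal; the following is only a minimal sketch of the idea, with invented names:

    import java.util.Queue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentLinkedQueue;

    public class SimplePurgatory<T> {
        private final Queue<CompletableFuture<T>> pending = new ConcurrentLinkedQueue<>();

        // Park a request until it is completed or failed.
        public CompletableFuture<T> await() {
            CompletableFuture<T> future = new CompletableFuture<>();
            pending.add(future);
            return future;
        }

        // Fail every parked request with the same cause, e.g. on leader change.
        public void completeAllExceptionally(Exception cause) {
            CompletableFuture<T> future;
            while ((future = pending.poll()) != null) {
                future.completeExceptionally(cause);
            }
        }

        public static void main(String[] args) {
            SimplePurgatory<String> fetchPurgatory = new SimplePurgatory<>();
            CompletableFuture<String> parkedFetch = fetchPurgatory.await();

            fetchPurgatory.completeAllExceptionally(
                new IllegalStateException("node is no longer the leader"));

            System.out.println("failed: " + parkedFetch.isCompletedExceptionally());
        }
    }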
@@ -552,7 +545,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
         }
 
         if (!hasValidTopicPartition(request, log.topicPartition())) {
-            // Until we support multi-raft, we treat topic partition mismatches as invalid requests
+            // Until we support multi-raft, we treat individual topic partition mismatches as invalid requests
             return new VoteResponseData().setErrorCode(Errors.INVALID_REQUEST.code());
         }
 
@@ -638,7 +631,6 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
                         binaryExponentialElectionBackoffMs(state.retries())
                     );
                 }
             }
-
         } else {
             logger.debug("Ignoring vote response {} since we are no longer a candidate in epoch {}",
@@ -1072,7 +1064,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
         FetchResponseData.EpochEndOffset divergingEpoch = partitionResponse.divergingEpoch();
         if (divergingEpoch.epoch() >= 0) {
             // The leader is asking us to truncate before continuing
-            OffsetAndEpoch divergingOffsetAndEpoch = new OffsetAndEpoch(
+            final OffsetAndEpoch divergingOffsetAndEpoch = new OffsetAndEpoch(
                 divergingEpoch.endOffset(), divergingEpoch.epoch());
 
             state.highWatermark().ifPresent(highWatermark -> {
@@ -1104,7 +1096,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
                 );
                 return false;
             } else {
-                OffsetAndEpoch snapshotId = new OffsetAndEpoch(
+                final OffsetAndEpoch snapshotId = new OffsetAndEpoch(
                     partitionResponse.snapshotId().endOffset(),
                     partitionResponse.snapshotId().epoch()
                 );
@@ -1193,7 +1185,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
      */
     private FetchSnapshotResponseData handleFetchSnapshotRequest(
         RaftRequest.Inbound requestMetadata
-    ) throws IOException {
+    ) {
         FetchSnapshotRequestData data = (FetchSnapshotRequestData) requestMetadata.data;
 
         if (!hasValidClusterId(data.clusterId())) {
@@ -2038,7 +2030,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
         }
     }
 
-    private long maybeSendFetchOrFetchSnapshot(FollowerState state, long currentTimeMs) throws IOException {
+    private long maybeSendFetchOrFetchSnapshot(FollowerState state, long currentTimeMs) {
         final Supplier<ApiMessage> requestSupplier;
 
         if (state.fetchingSnapshot().isPresent()) {
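This hunk and the handleFetchSnapshotRequest one above drop a throws IOException clause from methods whose bodies can no longer throw it. Removing an unthrown checked exception simplifies every call site; a small sketch of the effect, with hypothetical method names:

    import java.io.IOException;

    public class CheckedThrowsCleanup {
        // Before: the declared IOException forces callers to catch or
        // propagate it, even though the body can never throw it.
        static long pollBefore() throws IOException {
            return 0L;
        }

        // After: with the unused clause removed, callers need no handling.
        static long pollAfter() {
            return 0L;
        }

        public static void main(String[] args) {
            try {
                pollBefore(); // caller pays for a phantom exception
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            pollAfter(); // no ceremony
            System.out.println("done");
        }
    }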
@@ -2465,7 +2457,5 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
                 wakeup();
             }
         }
-
     }
-
 }