KAFKA-12773; Use UncheckedIOException when wrapping IOException (#10749)

The raft module may not be fully consistent on this but in general in that module we have decided to not throw the checked IOException. We have been avoiding checked IOException exceptions by wrapping them in RuntimeException. The raft module should instead wrap IOException in UncheckedIOException. 

Reviewers: Luke Chen <showuon@gmail.com>, David Arthur <mumrah@gmail.com>, José Armando García Sancio <jsancio@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
This commit is contained in:
loboya~ 2021-06-16 01:22:48 +08:00 committed by GitHub
parent c16711cb8e
commit 4b7ad7b14d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 159 additions and 124 deletions

View File

@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.BufferedWriter; import java.io.BufferedWriter;
import java.io.UncheckedIOException;
import java.io.EOFException; import java.io.EOFException;
import java.io.File; import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
@ -67,7 +68,7 @@ public class FileBasedStateStore implements QuorumStateStore {
this.stateFile = stateFile; this.stateFile = stateFile;
} }
private QuorumStateData readStateFromFile(File file) throws IOException { private QuorumStateData readStateFromFile(File file) {
try (final BufferedReader reader = Files.newBufferedReader(file.toPath())) { try (final BufferedReader reader = Files.newBufferedReader(file.toPath())) {
final String line = reader.readLine(); final String line = reader.readLine();
if (line == null) { if (line == null) {
@ -91,6 +92,9 @@ public class FileBasedStateStore implements QuorumStateStore {
final short dataVersion = dataVersionNode.shortValue(); final short dataVersion = dataVersionNode.shortValue();
return QuorumStateDataJsonConverter.read(dataObject, dataVersion); return QuorumStateDataJsonConverter.read(dataObject, dataVersion);
} catch (IOException e) {
throw new UncheckedIOException(
String.format("Error while reading the Quorum status from the file %s", file), e);
} }
} }
@ -98,7 +102,7 @@ public class FileBasedStateStore implements QuorumStateStore {
* Reads the election state from local file. * Reads the election state from local file.
*/ */
@Override @Override
public ElectionState readElectionState() throws IOException { public ElectionState readElectionState() {
if (!stateFile.exists()) { if (!stateFile.exists()) {
return null; return null;
} }
@ -115,7 +119,7 @@ public class FileBasedStateStore implements QuorumStateStore {
} }
@Override @Override
public void writeElectionState(ElectionState latest) throws IOException { public void writeElectionState(ElectionState latest) {
QuorumStateData data = new QuorumStateData() QuorumStateData data = new QuorumStateData()
.setLeaderEpoch(latest.epoch) .setLeaderEpoch(latest.epoch)
.setVotedId(latest.hasVoted() ? latest.votedId() : NOT_VOTED) .setVotedId(latest.hasVoted() ? latest.votedId() : NOT_VOTED)
@ -129,9 +133,9 @@ public class FileBasedStateStore implements QuorumStateStore {
voterId -> new Voter().setVoterId(voterId)).collect(Collectors.toList()); voterId -> new Voter().setVoterId(voterId)).collect(Collectors.toList());
} }
private void writeElectionStateToFile(final File stateFile, QuorumStateData state) throws IOException { private void writeElectionStateToFile(final File stateFile, QuorumStateData state) {
final File temp = new File(stateFile.getAbsolutePath() + ".tmp"); final File temp = new File(stateFile.getAbsolutePath() + ".tmp");
Files.deleteIfExists(temp.toPath()); deleteFileIfExists(temp);
log.trace("Writing tmp quorum state {}", temp.getAbsolutePath()); log.trace("Writing tmp quorum state {}", temp.getAbsolutePath());
@ -146,25 +150,36 @@ public class FileBasedStateStore implements QuorumStateStore {
writer.flush(); writer.flush();
fileOutputStream.getFD().sync(); fileOutputStream.getFD().sync();
Utils.atomicMoveWithFallback(temp.toPath(), stateFile.toPath()); Utils.atomicMoveWithFallback(temp.toPath(), stateFile.toPath());
} catch (IOException e) {
throw new UncheckedIOException(
String.format("Error while writing the Quorum status from the file %s",
stateFile.getAbsolutePath()), e);
} finally { } finally {
// cleanup the temp file when the write finishes (either success or fail). // cleanup the temp file when the write finishes (either success or fail).
Files.deleteIfExists(temp.toPath()); deleteFileIfExists(temp);
} }
} }
/** /**
* Clear state store by deleting the local quorum state file * Clear state store by deleting the local quorum state file
*
* @throws IOException if there is any IO exception during delete
*/ */
@Override @Override
public void clear() throws IOException { public void clear() {
Files.deleteIfExists(stateFile.toPath()); deleteFileIfExists(stateFile);
Files.deleteIfExists(new File(stateFile.getAbsolutePath() + ".tmp").toPath()); deleteFileIfExists(new File(stateFile.getAbsolutePath() + ".tmp"));
} }
@Override @Override
public String toString() { public String toString() {
return "Quorum state filepath: " + stateFile.getAbsolutePath(); return "Quorum state filepath: " + stateFile.getAbsolutePath();
} }
private void deleteFileIfExists(File file) {
try {
Files.deleteIfExists(file.toPath());
} catch (IOException e) {
throw new UncheckedIOException(
String.format("Error while deleting file %s", file.getAbsoluteFile()), e);
}
}
} }

View File

@ -22,7 +22,6 @@ import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.snapshot.RawSnapshotWriter; import org.apache.kafka.snapshot.RawSnapshotWriter;
import org.slf4j.Logger; import org.slf4j.Logger;
import java.io.IOException;
import java.util.Optional; import java.util.Optional;
import java.util.OptionalInt; import java.util.OptionalInt;
import java.util.OptionalLong; import java.util.OptionalLong;
@ -138,7 +137,7 @@ public class FollowerState implements EpochState {
return fetchingSnapshot; return fetchingSnapshot;
} }
public void setFetchingSnapshot(Optional<RawSnapshotWriter> fetchingSnapshot) throws IOException { public void setFetchingSnapshot(Optional<RawSnapshotWriter> fetchingSnapshot) {
if (fetchingSnapshot.isPresent()) { if (fetchingSnapshot.isPresent()) {
fetchingSnapshot.get().close(); fetchingSnapshot.get().close();
} }
@ -165,7 +164,7 @@ public class FollowerState implements EpochState {
} }
@Override @Override
public void close() throws IOException { public void close() {
if (fetchingSnapshot.isPresent()) { if (fetchingSnapshot.isPresent()) {
fetchingSnapshot.get().close(); fetchingSnapshot.get().close();
} }

View File

@ -75,7 +75,6 @@ import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.SnapshotWriter; import org.apache.kafka.snapshot.SnapshotWriter;
import org.slf4j.Logger; import org.slf4j.Logger;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
@ -366,7 +365,6 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
@Override @Override
public void initialize() { public void initialize() {
try {
quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch())); quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch()));
long currentTimeMs = time.milliseconds(); long currentTimeMs = time.milliseconds();
@ -382,11 +380,9 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
if (quorum.isVoter() if (quorum.isVoter()
&& quorum.remoteVoters().isEmpty() && quorum.remoteVoters().isEmpty()
&& !quorum.isCandidate()) { && !quorum.isCandidate()) {
transitionToCandidate(currentTimeMs); transitionToCandidate(currentTimeMs);
} }
} catch (IOException e) {
throw new RuntimeException(e);
}
} }
@Override @Override
@ -413,7 +409,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
requestManager.resetAll(); requestManager.resetAll();
} }
private void onBecomeLeader(long currentTimeMs) throws IOException { private void onBecomeLeader(long currentTimeMs) {
long endOffset = log.endOffset().offset; long endOffset = log.endOffset().offset;
BatchAccumulator<T> accumulator = new BatchAccumulator<>( BatchAccumulator<T> accumulator = new BatchAccumulator<>(
@ -447,7 +443,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
log.flush(); log.flush();
} }
private boolean maybeTransitionToLeader(CandidateState state, long currentTimeMs) throws IOException { private boolean maybeTransitionToLeader(CandidateState state, long currentTimeMs) {
if (state.isVoteGranted()) { if (state.isVoteGranted()) {
onBecomeLeader(currentTimeMs); onBecomeLeader(currentTimeMs);
return true; return true;
@ -456,7 +452,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
} }
} }
private void onBecomeCandidate(long currentTimeMs) throws IOException { private void onBecomeCandidate(long currentTimeMs) {
CandidateState state = quorum.candidateStateOrThrow(); CandidateState state = quorum.candidateStateOrThrow();
if (!maybeTransitionToLeader(state, currentTimeMs)) { if (!maybeTransitionToLeader(state, currentTimeMs)) {
resetConnections(); resetConnections();
@ -464,13 +460,13 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
} }
} }
private void transitionToCandidate(long currentTimeMs) throws IOException { private void transitionToCandidate(long currentTimeMs) {
quorum.transitionToCandidate(); quorum.transitionToCandidate();
maybeFireLeaderChange(); maybeFireLeaderChange();
onBecomeCandidate(currentTimeMs); onBecomeCandidate(currentTimeMs);
} }
private void transitionToUnattached(int epoch) throws IOException { private void transitionToUnattached(int epoch) {
quorum.transitionToUnattached(epoch); quorum.transitionToUnattached(epoch);
maybeFireLeaderChange(); maybeFireLeaderChange();
resetConnections(); resetConnections();
@ -483,7 +479,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
resetConnections(); resetConnections();
} }
private void transitionToVoted(int candidateId, int epoch) throws IOException { private void transitionToVoted(int candidateId, int epoch) {
quorum.transitionToVoted(epoch, candidateId); quorum.transitionToVoted(epoch, candidateId);
maybeFireLeaderChange(); maybeFireLeaderChange();
resetConnections(); resetConnections();
@ -508,7 +504,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
int epoch, int epoch,
int leaderId, int leaderId,
long currentTimeMs long currentTimeMs
) throws IOException { ) {
quorum.transitionToFollower(epoch, leaderId); quorum.transitionToFollower(epoch, leaderId);
maybeFireLeaderChange(); maybeFireLeaderChange();
onBecomeFollower(currentTimeMs); onBecomeFollower(currentTimeMs);
@ -537,7 +533,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
*/ */
private VoteResponseData handleVoteRequest( private VoteResponseData handleVoteRequest(
RaftRequest.Inbound requestMetadata RaftRequest.Inbound requestMetadata
) throws IOException { ) {
VoteRequestData request = (VoteRequestData) requestMetadata.data; VoteRequestData request = (VoteRequestData) requestMetadata.data;
if (!hasValidClusterId(request.clusterId())) { if (!hasValidClusterId(request.clusterId())) {
@ -584,7 +580,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
private boolean handleVoteResponse( private boolean handleVoteResponse(
RaftResponse.Inbound responseMetadata, RaftResponse.Inbound responseMetadata,
long currentTimeMs long currentTimeMs
) throws IOException { ) {
int remoteNodeId = responseMetadata.sourceId(); int remoteNodeId = responseMetadata.sourceId();
VoteResponseData response = (VoteResponseData) responseMetadata.data; VoteResponseData response = (VoteResponseData) responseMetadata.data;
Errors topLevelError = Errors.forCode(response.errorCode()); Errors topLevelError = Errors.forCode(response.errorCode());
@ -683,7 +679,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
private BeginQuorumEpochResponseData handleBeginQuorumEpochRequest( private BeginQuorumEpochResponseData handleBeginQuorumEpochRequest(
RaftRequest.Inbound requestMetadata, RaftRequest.Inbound requestMetadata,
long currentTimeMs long currentTimeMs
) throws IOException { ) {
BeginQuorumEpochRequestData request = (BeginQuorumEpochRequestData) requestMetadata.data; BeginQuorumEpochRequestData request = (BeginQuorumEpochRequestData) requestMetadata.data;
if (!hasValidClusterId(request.clusterId())) { if (!hasValidClusterId(request.clusterId())) {
@ -713,7 +709,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
private boolean handleBeginQuorumEpochResponse( private boolean handleBeginQuorumEpochResponse(
RaftResponse.Inbound responseMetadata, RaftResponse.Inbound responseMetadata,
long currentTimeMs long currentTimeMs
) throws IOException { ) {
int remoteNodeId = responseMetadata.sourceId(); int remoteNodeId = responseMetadata.sourceId();
BeginQuorumEpochResponseData response = (BeginQuorumEpochResponseData) responseMetadata.data; BeginQuorumEpochResponseData response = (BeginQuorumEpochResponseData) responseMetadata.data;
Errors topLevelError = Errors.forCode(response.errorCode()); Errors topLevelError = Errors.forCode(response.errorCode());
@ -772,7 +768,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
private EndQuorumEpochResponseData handleEndQuorumEpochRequest( private EndQuorumEpochResponseData handleEndQuorumEpochRequest(
RaftRequest.Inbound requestMetadata, RaftRequest.Inbound requestMetadata,
long currentTimeMs long currentTimeMs
) throws IOException { ) {
EndQuorumEpochRequestData request = (EndQuorumEpochRequestData) requestMetadata.data; EndQuorumEpochRequestData request = (EndQuorumEpochRequestData) requestMetadata.data;
if (!hasValidClusterId(request.clusterId())) { if (!hasValidClusterId(request.clusterId())) {
@ -826,7 +822,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
private boolean handleEndQuorumEpochResponse( private boolean handleEndQuorumEpochResponse(
RaftResponse.Inbound responseMetadata, RaftResponse.Inbound responseMetadata,
long currentTimeMs long currentTimeMs
) throws IOException { ) {
EndQuorumEpochResponseData response = (EndQuorumEpochResponseData) responseMetadata.data; EndQuorumEpochResponseData response = (EndQuorumEpochResponseData) responseMetadata.data;
Errors topLevelError = Errors.forCode(response.errorCode()); Errors topLevelError = Errors.forCode(response.errorCode());
if (topLevelError != Errors.NONE) { if (topLevelError != Errors.NONE) {
@ -1034,7 +1030,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
private boolean handleFetchResponse( private boolean handleFetchResponse(
RaftResponse.Inbound responseMetadata, RaftResponse.Inbound responseMetadata,
long currentTimeMs long currentTimeMs
) throws IOException { ) {
FetchResponseData response = (FetchResponseData) responseMetadata.data; FetchResponseData response = (FetchResponseData) responseMetadata.data;
Errors topLevelError = Errors.forCode(response.errorCode()); Errors topLevelError = Errors.forCode(response.errorCode());
if (topLevelError != Errors.NONE) { if (topLevelError != Errors.NONE) {
@ -1286,7 +1282,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
private boolean handleFetchSnapshotResponse( private boolean handleFetchSnapshotResponse(
RaftResponse.Inbound responseMetadata, RaftResponse.Inbound responseMetadata,
long currentTimeMs long currentTimeMs
) throws IOException { ) {
FetchSnapshotResponseData data = (FetchSnapshotResponseData) responseMetadata.data; FetchSnapshotResponseData data = (FetchSnapshotResponseData) responseMetadata.data;
Errors topLevelError = Errors.forCode(data.errorCode()); Errors topLevelError = Errors.forCode(data.errorCode());
if (topLevelError != Errors.NONE) { if (topLevelError != Errors.NONE) {
@ -1446,7 +1442,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
OptionalInt leaderId, OptionalInt leaderId,
int epoch, int epoch,
long currentTimeMs long currentTimeMs
) throws IOException { ) {
if (epoch < quorum.epoch() || error == Errors.UNKNOWN_LEADER_EPOCH) { if (epoch < quorum.epoch() || error == Errors.UNKNOWN_LEADER_EPOCH) {
// We have a larger epoch, so the response is no longer relevant // We have a larger epoch, so the response is no longer relevant
return Optional.of(true); return Optional.of(true);
@ -1492,7 +1488,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
OptionalInt leaderId, OptionalInt leaderId,
int epoch, int epoch,
long currentTimeMs long currentTimeMs
) throws IOException { ) {
if (!hasConsistentLeader(epoch, leaderId)) { if (!hasConsistentLeader(epoch, leaderId)) {
throw new IllegalStateException("Received request or response with leader " + leaderId + throw new IllegalStateException("Received request or response with leader " + leaderId +
" and epoch " + epoch + " which is inconsistent with current leader " + " and epoch " + epoch + " which is inconsistent with current leader " +
@ -1526,7 +1522,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
return false; return false;
} }
private void handleResponse(RaftResponse.Inbound response, long currentTimeMs) throws IOException { private void handleResponse(RaftResponse.Inbound response, long currentTimeMs) {
// The response epoch matches the local epoch, so we can handle the response // The response epoch matches the local epoch, so we can handle the response
ApiKeys apiKey = ApiKeys.forId(response.data.apiKey()); ApiKeys apiKey = ApiKeys.forId(response.data.apiKey());
final boolean handledSuccessfully; final boolean handledSuccessfully;
@ -1602,7 +1598,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
} }
} }
private void handleRequest(RaftRequest.Inbound request, long currentTimeMs) throws IOException { private void handleRequest(RaftRequest.Inbound request, long currentTimeMs) {
ApiKeys apiKey = ApiKeys.forId(request.data.apiKey()); ApiKeys apiKey = ApiKeys.forId(request.data.apiKey());
final CompletableFuture<? extends ApiMessage> responseFuture; final CompletableFuture<? extends ApiMessage> responseFuture;
@ -1649,7 +1645,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
}); });
} }
private void handleInboundMessage(RaftMessage message, long currentTimeMs) throws IOException { private void handleInboundMessage(RaftMessage message, long currentTimeMs) {
logger.trace("Received inbound message {}", message); logger.trace("Received inbound message {}", message);
if (message instanceof RaftRequest.Inbound) { if (message instanceof RaftRequest.Inbound) {
@ -1887,7 +1883,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
return timeUntilDrain; return timeUntilDrain;
} }
private long pollResigned(long currentTimeMs) throws IOException { private long pollResigned(long currentTimeMs) {
ResignedState state = quorum.resignedStateOrThrow(); ResignedState state = quorum.resignedStateOrThrow();
long endQuorumBackoffMs = maybeSendRequests( long endQuorumBackoffMs = maybeSendRequests(
currentTimeMs, currentTimeMs,
@ -1950,7 +1946,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
return Long.MAX_VALUE; return Long.MAX_VALUE;
} }
private long pollCandidate(long currentTimeMs) throws IOException { private long pollCandidate(long currentTimeMs) {
CandidateState state = quorum.candidateStateOrThrow(); CandidateState state = quorum.candidateStateOrThrow();
GracefulShutdown shutdown = this.shutdown.get(); GracefulShutdown shutdown = this.shutdown.get();
@ -1981,7 +1977,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
} }
} }
private long pollFollower(long currentTimeMs) throws IOException { private long pollFollower(long currentTimeMs) {
FollowerState state = quorum.followerStateOrThrow(); FollowerState state = quorum.followerStateOrThrow();
if (quorum.isVoter()) { if (quorum.isVoter()) {
return pollFollowerAsVoter(state, currentTimeMs); return pollFollowerAsVoter(state, currentTimeMs);
@ -1990,7 +1986,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
} }
} }
private long pollFollowerAsVoter(FollowerState state, long currentTimeMs) throws IOException { private long pollFollowerAsVoter(FollowerState state, long currentTimeMs) {
GracefulShutdown shutdown = this.shutdown.get(); GracefulShutdown shutdown = this.shutdown.get();
if (shutdown != null) { if (shutdown != null) {
// If we are a follower, then we can shutdown immediately. We want to // If we are a follower, then we can shutdown immediately. We want to
@ -2007,7 +2003,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
} }
} }
private long pollFollowerAsObserver(FollowerState state, long currentTimeMs) throws IOException { private long pollFollowerAsObserver(FollowerState state, long currentTimeMs) {
if (state.hasFetchTimeoutExpired(currentTimeMs)) { if (state.hasFetchTimeoutExpired(currentTimeMs)) {
return maybeSendAnyVoterFetch(currentTimeMs); return maybeSendAnyVoterFetch(currentTimeMs);
} else { } else {
@ -2045,7 +2041,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
return maybeSendRequest(currentTimeMs, state.leaderId(), requestSupplier); return maybeSendRequest(currentTimeMs, state.leaderId(), requestSupplier);
} }
private long pollVoted(long currentTimeMs) throws IOException { private long pollVoted(long currentTimeMs) {
VotedState state = quorum.votedStateOrThrow(); VotedState state = quorum.votedStateOrThrow();
GracefulShutdown shutdown = this.shutdown.get(); GracefulShutdown shutdown = this.shutdown.get();
@ -2061,7 +2057,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
} }
} }
private long pollUnattached(long currentTimeMs) throws IOException { private long pollUnattached(long currentTimeMs) {
UnattachedState state = quorum.unattachedStateOrThrow(); UnattachedState state = quorum.unattachedStateOrThrow();
if (quorum.isVoter()) { if (quorum.isVoter()) {
return pollUnattachedAsVoter(state, currentTimeMs); return pollUnattachedAsVoter(state, currentTimeMs);
@ -2070,7 +2066,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
} }
} }
private long pollUnattachedAsVoter(UnattachedState state, long currentTimeMs) throws IOException { private long pollUnattachedAsVoter(UnattachedState state, long currentTimeMs) {
GracefulShutdown shutdown = this.shutdown.get(); GracefulShutdown shutdown = this.shutdown.get();
if (shutdown != null) { if (shutdown != null) {
// If shutting down, then remain in this state until either the // If shutting down, then remain in this state until either the
@ -2089,7 +2085,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
return Math.min(fetchBackoffMs, state.remainingElectionTimeMs(currentTimeMs)); return Math.min(fetchBackoffMs, state.remainingElectionTimeMs(currentTimeMs));
} }
private long pollCurrentState(long currentTimeMs) throws IOException { private long pollCurrentState(long currentTimeMs) {
maybeDeleteBeforeSnapshot(); maybeDeleteBeforeSnapshot();
if (quorum.isLeader()) { if (quorum.isLeader()) {
@ -2181,10 +2177,8 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
/** /**
* Poll for new events. This allows the client to handle inbound * Poll for new events. This allows the client to handle inbound
* requests and send any needed outbound requests. * requests and send any needed outbound requests.
*
* @throws IOException for any IO errors encountered
*/ */
public void poll() throws IOException { public void poll() {
pollListeners(); pollListeners();
long currentTimeMs = time.milliseconds(); long currentTimeMs = time.milliseconds();

View File

@ -22,6 +22,7 @@ import org.apache.kafka.raft.internals.BatchAccumulator;
import org.slf4j.Logger; import org.slf4j.Logger;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -105,7 +106,7 @@ public class QuorumState {
this.logContext = logContext; this.logContext = logContext;
} }
public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IOException, IllegalStateException { public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IllegalStateException {
// We initialize in whatever state we were in on shutdown. If we were a leader // We initialize in whatever state we were in on shutdown. If we were a leader
// or candidate, probably an election was held, but we will find out about it // or candidate, probably an election was held, but we will find out about it
// when we send Vote or BeginEpoch requests. // when we send Vote or BeginEpoch requests.
@ -116,7 +117,7 @@ public class QuorumState {
if (election == null) { if (election == null) {
election = ElectionState.withUnknownLeader(0, voters); election = ElectionState.withUnknownLeader(0, voters);
} }
} catch (final IOException e) { } catch (final UncheckedIOException e) {
// For exceptions during state file loading (missing or not readable), // For exceptions during state file loading (missing or not readable),
// we could assume the file is corrupted already and should be cleaned up. // we could assume the file is corrupted already and should be cleaned up.
log.warn("Clearing local quorum state store after error loading state {}", log.warn("Clearing local quorum state store after error loading state {}",
@ -292,7 +293,7 @@ public class QuorumState {
* Transition to the "unattached" state. This means we have found an epoch greater than * Transition to the "unattached" state. This means we have found an epoch greater than
* or equal to the current epoch, but wo do not yet know of the elected leader. * or equal to the current epoch, but wo do not yet know of the elected leader.
*/ */
public void transitionToUnattached(int epoch) throws IOException { public void transitionToUnattached(int epoch) {
int currentEpoch = state.epoch(); int currentEpoch = state.epoch();
if (epoch <= currentEpoch) { if (epoch <= currentEpoch) {
throw new IllegalStateException("Cannot transition to Unattached with epoch= " + epoch + throw new IllegalStateException("Cannot transition to Unattached with epoch= " + epoch +
@ -331,7 +332,7 @@ public class QuorumState {
public void transitionToVoted( public void transitionToVoted(
int epoch, int epoch,
int candidateId int candidateId
) throws IOException { ) {
if (localId.isPresent() && candidateId == localId.getAsInt()) { if (localId.isPresent() && candidateId == localId.getAsInt()) {
throw new IllegalStateException("Cannot transition to Voted with votedId=" + candidateId + throw new IllegalStateException("Cannot transition to Voted with votedId=" + candidateId +
" and epoch=" + epoch + " since it matches the local broker.id"); " and epoch=" + epoch + " since it matches the local broker.id");
@ -372,7 +373,7 @@ public class QuorumState {
public void transitionToFollower( public void transitionToFollower(
int epoch, int epoch,
int leaderId int leaderId
) throws IOException { ) {
if (localId.isPresent() && leaderId == localId.getAsInt()) { if (localId.isPresent() && leaderId == localId.getAsInt()) {
throw new IllegalStateException("Cannot transition to Follower with leaderId=" + leaderId + throw new IllegalStateException("Cannot transition to Follower with leaderId=" + leaderId +
" and epoch=" + epoch + " since it matches the local broker.id=" + localId); " and epoch=" + epoch + " since it matches the local broker.id=" + localId);
@ -402,7 +403,7 @@ public class QuorumState {
)); ));
} }
public void transitionToCandidate() throws IOException { public void transitionToCandidate() {
if (isObserver()) { if (isObserver()) {
throw new IllegalStateException("Cannot transition to Candidate since the local broker.id=" + localId + throw new IllegalStateException("Cannot transition to Candidate since the local broker.id=" + localId +
" is not one of the voters " + voters); " is not one of the voters " + voters);
@ -427,7 +428,7 @@ public class QuorumState {
)); ));
} }
public <T> LeaderState<T> transitionToLeader(long epochStartOffset, BatchAccumulator<T> accumulator) throws IOException { public <T> LeaderState<T> transitionToLeader(long epochStartOffset, BatchAccumulator<T> accumulator) {
if (isObserver()) { if (isObserver()) {
throw new IllegalStateException("Cannot transition to Leader since the local broker.id=" + localId + throw new IllegalStateException("Cannot transition to Leader since the local broker.id=" + localId +
" is not one of the voters " + voters); " is not one of the voters " + voters);
@ -463,9 +464,14 @@ public class QuorumState {
return state; return state;
} }
private void transitionTo(EpochState state) throws IOException { private void transitionTo(EpochState state) {
if (this.state != null) { if (this.state != null) {
try {
this.state.close(); this.state.close();
} catch (IOException e) {
throw new UncheckedIOException(
"Failed to transition from " + this.state.name() + " to " + state.name(), e);
}
} }
this.store.writeElectionState(state.election()); this.store.writeElectionState(state.election());

View File

@ -16,8 +16,6 @@
*/ */
package org.apache.kafka.raft; package org.apache.kafka.raft;
import java.io.IOException;
/** /**
* Maintain the save and retrieval of quorum state information, so far only supports * Maintain the save and retrieval of quorum state information, so far only supports
* read and write of election states. * read and write of election states.
@ -31,22 +29,18 @@ public interface QuorumStateStore {
* Read the latest election state. * Read the latest election state.
* *
* @return The latest written election state or `null` if there is none * @return The latest written election state or `null` if there is none
* @throws IOException For any error encountered reading from the storage
*/ */
ElectionState readElectionState() throws IOException; ElectionState readElectionState();
/** /**
* Persist the updated election state. This must be atomic, both writing the full updated state * Persist the updated election state. This must be atomic, both writing the full updated state
* and replacing the old state. * and replacing the old state.
* @param latest The latest election state * @param latest The latest election state
* @throws IOException For any error encountered while writing the updated state
*/ */
void writeElectionState(ElectionState latest) throws IOException; void writeElectionState(ElectionState latest);
/** /**
* Clear any state associated to the store for a fresh start * Clear any state associated to the store for a fresh start
*
* @throws IOException For any error encountered while cleaning up the state
*/ */
void clear() throws IOException; void clear();
} }

View File

@ -17,6 +17,7 @@
package org.apache.kafka.raft.internals; package org.apache.kafka.raft.internals;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@ -102,7 +103,7 @@ public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseab
try { try {
fileRecords.readInto(buffer, bytesRead); fileRecords.readInto(buffer, bytesRead);
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException("Failed to read records into memory", e); throw new UncheckedIOException("Failed to read records into memory", e);
} }
bytesRead += buffer.limit() - start; bytesRead += buffer.limit() - start;

View File

@ -60,11 +60,7 @@ public final class FileRawSnapshotReader implements RawSnapshotReader, AutoClose
fileRecords.close(); fileRecords.close();
} catch (IOException e) { } catch (IOException e) {
throw new UncheckedIOException( throw new UncheckedIOException(
String.format( String.format("Unable to close snapshot reader %s at %s", snapshotId, fileRecords),
"Unable to close snapshot reader %s at %s",
snapshotId,
fileRecords
),
e e
); );
} }
@ -75,17 +71,23 @@ public final class FileRawSnapshotReader implements RawSnapshotReader, AutoClose
* *
* @param logDir the directory for the topic partition * @param logDir the directory for the topic partition
* @param snapshotId the end offset and epoch for the snapshotId * @param snapshotId the end offset and epoch for the snapshotId
* @throws java.nio.file.NoSuchFileException if the snapshot doesn't exist
* @throws IOException for any IO error while opening the snapshot
*/ */
public static FileRawSnapshotReader open(Path logDir, OffsetAndEpoch snapshotId) throws IOException { public static FileRawSnapshotReader open(Path logDir, OffsetAndEpoch snapshotId) {
FileRecords fileRecords = FileRecords.open( FileRecords fileRecords;
Snapshots.snapshotPath(logDir, snapshotId).toFile(), Path filePath = Snapshots.snapshotPath(logDir, snapshotId);
try {
fileRecords = FileRecords.open(
filePath.toFile(),
false, // mutable false, // mutable
true, // fileAlreadyExists true, // fileAlreadyExists
0, // initFileSize 0, // initFileSize
false // preallocate false // preallocate
); );
} catch (IOException e) {
throw new UncheckedIOException(
String.format("Unable to Opens a snapshot file %s", filePath.toAbsolutePath()), e
);
}
return new FileRawSnapshotReader(fileRecords, snapshotId); return new FileRawSnapshotReader(fileRecords, snapshotId);
} }

View File

@ -23,6 +23,7 @@ import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.ReplicatedLog; import org.apache.kafka.raft.ReplicatedLog;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
@ -58,7 +59,9 @@ public final class FileRawSnapshotWriter implements RawSnapshotWriter {
try { try {
return channel.size(); return channel.size();
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new UncheckedIOException(
String.format("Error calculating snapshot size.temp path = %s, snapshotId = %s.",
this.tempSnapshotPath, this.snapshotId), e);
} }
} }
@ -68,7 +71,11 @@ public final class FileRawSnapshotWriter implements RawSnapshotWriter {
checkIfFrozen("Append"); checkIfFrozen("Append");
Utils.writeFully(channel, records.buffer()); Utils.writeFully(channel, records.buffer());
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new UncheckedIOException(
String.format("Error writing file snapshot, " +
"temp path = %s, snapshotId = %s.", this.tempSnapshotPath, this.snapshotId),
e
);
} }
} }
@ -78,7 +85,11 @@ public final class FileRawSnapshotWriter implements RawSnapshotWriter {
checkIfFrozen("Append"); checkIfFrozen("Append");
Utils.writeFully(channel, records.buffer()); Utils.writeFully(channel, records.buffer());
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new UncheckedIOException(
String.format("Error writing file snapshot, " +
"temp path = %s, snapshotId = %s.", this.tempSnapshotPath, this.snapshotId),
e
);
} }
} }
@ -104,7 +115,11 @@ public final class FileRawSnapshotWriter implements RawSnapshotWriter {
replicatedLog.ifPresent(log -> log.onSnapshotFrozen(snapshotId)); replicatedLog.ifPresent(log -> log.onSnapshotFrozen(snapshotId));
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new UncheckedIOException(
String.format("Error freezing file snapshot, " +
"temp path = %s, snapshotId = %s.", this.tempSnapshotPath, this.snapshotId),
e
);
} }
} }
@ -115,7 +130,11 @@ public final class FileRawSnapshotWriter implements RawSnapshotWriter {
// This is a noop if freeze was called before calling close // This is a noop if freeze was called before calling close
Files.deleteIfExists(tempSnapshotPath); Files.deleteIfExists(tempSnapshotPath);
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new UncheckedIOException(
String.format("Error closing snapshot writer, " +
"temp path = %s, snapshotId %s.", this.tempSnapshotPath, this.snapshotId),
e
);
} }
} }
@ -147,16 +166,15 @@ public final class FileRawSnapshotWriter implements RawSnapshotWriter {
* *
* @param logDir the directory for the topic partition * @param logDir the directory for the topic partition
* @param snapshotId the end offset and epoch for the snapshotId * @param snapshotId the end offset and epoch for the snapshotId
* @throws IOException for any IO error while creating the snapshot
*/ */
public static FileRawSnapshotWriter create( public static FileRawSnapshotWriter create(
Path logDir, Path logDir,
OffsetAndEpoch snapshotId, OffsetAndEpoch snapshotId,
Optional<ReplicatedLog> replicatedLog Optional<ReplicatedLog> replicatedLog
) { ) {
try {
Path path = Snapshots.createTempFile(logDir, snapshotId); Path path = Snapshots.createTempFile(logDir, snapshotId);
try {
return new FileRawSnapshotWriter( return new FileRawSnapshotWriter(
path, path,
FileChannel.open(path, Utils.mkSet(StandardOpenOption.WRITE, StandardOpenOption.APPEND)), FileChannel.open(path, Utils.mkSet(StandardOpenOption.WRITE, StandardOpenOption.APPEND)),
@ -164,7 +182,11 @@ public final class FileRawSnapshotWriter implements RawSnapshotWriter {
replicatedLog replicatedLog
); );
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new UncheckedIOException(
String.format("Error creating snapshot writer, " +
"temp path = %s, snapshotId %s.", path, snapshotId),
e
);
} }
} }
} }

View File

@ -68,15 +68,19 @@ public final class Snapshots {
return snapshotDir(logDir).resolve(filenameFromSnapshotId(snapshotId) + SUFFIX); return snapshotDir(logDir).resolve(filenameFromSnapshotId(snapshotId) + SUFFIX);
} }
public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws IOException { public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) {
Path dir = snapshotDir(logDir); Path dir = snapshotDir(logDir);
try {
// Create the snapshot directory if it doesn't exists // Create the snapshot directory if it doesn't exists
Files.createDirectories(dir); Files.createDirectories(dir);
String prefix = String.format("%s-", filenameFromSnapshotId(snapshotId)); String prefix = String.format("%s-", filenameFromSnapshotId(snapshotId));
return Files.createTempFile(dir, prefix, PARTIAL_SUFFIX); return Files.createTempFile(dir, prefix, PARTIAL_SUFFIX);
} catch (IOException e) {
throw new UncheckedIOException(
String.format("Error creating temporary file, logDir = %s, snapshotId = %s.",
dir.toAbsolutePath(), snapshotId), e);
}
} }
public static Optional<SnapshotPath> parse(Path path) { public static Optional<SnapshotPath> parse(Path path) {
@ -130,7 +134,7 @@ public final class Snapshots {
} catch (IOException e) { } catch (IOException e) {
throw new UncheckedIOException( throw new UncheckedIOException(
String.format( String.format(
"Error renaming snapshot file from %s to %s", "Error renaming snapshot file from %s to %s.",
immutablePath, immutablePath,
deletedPath deletedPath
), ),

View File

@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections; import java.util.Collections;
import java.util.Optional; import java.util.Optional;
import java.util.OptionalInt; import java.util.OptionalInt;
@ -945,9 +946,10 @@ public class QuorumStateTest {
} }
@Test @Test
public void testInitializeWithCorruptedStore() throws IOException { public void testInitializeWithCorruptedStore() {
QuorumStateStore stateStore = Mockito.mock(QuorumStateStore.class); QuorumStateStore stateStore = Mockito.mock(QuorumStateStore.class);
Mockito.doThrow(IOException.class).when(stateStore).readElectionState(); Mockito.doThrow(UncheckedIOException.class).when(stateStore).readElectionState();
QuorumState state = buildQuorumState(Utils.mkSet(localId)); QuorumState state = buildQuorumState(Utils.mkSet(localId));
int epoch = 2; int epoch = 2;

View File

@ -373,12 +373,8 @@ public final class RaftClientTestContext {
} }
LeaderAndEpoch currentLeaderAndEpoch() { LeaderAndEpoch currentLeaderAndEpoch() {
try {
ElectionState election = quorumStateStore.readElectionState(); ElectionState election = quorumStateStore.readElectionState();
return new LeaderAndEpoch(election.leaderIdOpt, election.epoch); return new LeaderAndEpoch(election.leaderIdOpt, election.epoch);
} catch (IOException e) {
throw new RuntimeException(e);
}
} }
void expectAndGrantVotes( void expectAndGrantVotes(