KAFKA-18129 Simplifying share partition maybeInitialize code (#18093)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Apoorv Mittal 2024-12-08 19:02:10 +00:00 committed by GitHub
parent 104fa57933
commit ee4264439d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 51 additions and 105 deletions

View File

@ -47,7 +47,7 @@
<suppress checks="JavaNCSS"
files="(RemoteLogManagerTest|SharePartitionTest).java"/>
<suppress checks="ClassDataAbstractionCoupling|ClassFanOutComplexity" files="SharePartitionManagerTest"/>
<suppress checks="CyclomaticComplexity|ClassDataAbstractionCoupling" files="SharePartition.java"/>
<suppress checks="CyclomaticComplexity" files="SharePartition.java"/>
<!-- server tests -->
<suppress checks="MethodLength|JavaNCSS|NPath" files="DescribeTopicPartitionsRequestHandlerTest.java"/>

View File

@ -71,7 +71,6 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -371,52 +370,25 @@ public class SharePartition {
*/
public CompletableFuture<Void> maybeInitialize() {
log.debug("Maybe initialize share partition: {}-{}", groupId, topicIdPartition);
CompletableFuture<Void> future = new CompletableFuture<>();
AtomicReference<Optional<Throwable>> futureException = new AtomicReference<>(Optional.empty());
// Check if the share partition is already initialized.
InitializationResult initializationResult = checkInitializationCompletion();
if (initializationResult.isComplete()) {
if (initializationResult.throwable() != null) {
future.completeExceptionally(initializationResult.throwable());
} else {
future.complete(null);
}
return future;
}
// All the pending requests should wait to get completed before the share partition is initialized.
// Attain lock to avoid any concurrent requests to be processed.
lock.writeLock().lock();
boolean shouldFutureBeCompleted = false;
try {
// Re-check the state to verify if previous requests has already initialized the share partition.
initializationResult = checkInitializationCompletion();
if (initializationResult.isComplete()) {
if (initializationResult.throwable() != null) {
futureException.set(Optional.of(initializationResult.throwable()));
}
shouldFutureBeCompleted = true;
return future;
}
// Update state to initializing to avoid any concurrent requests to be processed.
partitionState = SharePartitionState.INITIALIZING;
if (initializedOrThrowException()) return CompletableFuture.completedFuture(null);
} catch (Exception e) {
log.error("Failed to initialize the share partition: {}-{}", groupId, topicIdPartition, e);
completeInitializationWithException();
futureException.set(Optional.of(e));
shouldFutureBeCompleted = true;
return future;
} finally {
lock.writeLock().unlock();
if (shouldFutureBeCompleted) {
if (futureException.get().isPresent()) {
future.completeExceptionally(futureException.get().get());
} else {
future.complete(null);
}
}
return CompletableFuture.failedFuture(e);
}
// If code reaches here then the share partition is not initialized. Initialize the share partition.
// All the pending requests should wait to get completed before the share partition is initialized.
// Attain lock while updating the state to avoid any concurrent requests to be processed.
try {
if (!emptyToInitialState()) return CompletableFuture.completedFuture(null);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
// The share partition is not initialized, hence try to initialize it. There shall be only one
// request trying to initialize the share partition.
CompletableFuture<Void> future = new CompletableFuture<>();
// Initialize the share partition by reading the state from the persister.
persister.readState(new ReadShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
@ -426,20 +398,19 @@ public class SharePartition {
.build())
.build()
).whenComplete((result, exception) -> {
Throwable throwable = null;
lock.writeLock().lock();
try {
if (exception != null) {
log.error("Failed to initialize the share partition: {}-{}", groupId, topicIdPartition, exception);
completeInitializationWithException();
futureException.set(Optional.of(exception));
throwable = exception;
return;
}
if (result == null || result.topicsData() == null || result.topicsData().size() != 1) {
log.error("Failed to initialize the share partition: {}-{}. Invalid state found: {}.",
groupId, topicIdPartition, result);
completeInitializationWithException();
futureException.set(Optional.of(new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition))));
throwable = new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition));
return;
}
@ -447,8 +418,7 @@ public class SharePartition {
if (state.topicId() != topicIdPartition.topicId() || state.partitions().size() != 1) {
log.error("Failed to initialize the share partition: {}-{}. Invalid topic partition response: {}.",
groupId, topicIdPartition, result);
completeInitializationWithException();
futureException.set(Optional.of(new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition))));
throwable = new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition));
return;
}
@ -456,8 +426,7 @@ public class SharePartition {
if (partitionData.partition() != topicIdPartition.partition()) {
log.error("Failed to initialize the share partition: {}-{}. Invalid partition response: {}.",
groupId, topicIdPartition, partitionData);
completeInitializationWithException();
futureException.set(Optional.of(new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition))));
throwable = new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition));
return;
}
@ -465,18 +434,11 @@ public class SharePartition {
KafkaException ex = fetchPersisterError(partitionData.errorCode(), partitionData.errorMessage());
log.error("Failed to initialize the share partition: {}-{}. Exception occurred: {}.",
groupId, topicIdPartition, partitionData);
completeInitializationWithException();
futureException.set(Optional.of(ex));
throwable = ex;
return;
}
try {
startOffset = startOffsetDuringInitialization(partitionData.startOffset());
} catch (Exception e) {
completeInitializationWithException();
futureException.set(Optional.of(e));
return;
}
startOffset = startOffsetDuringInitialization(partitionData.startOffset());
stateEpoch = partitionData.stateEpoch();
List<PersisterStateBatch> stateBatches = partitionData.stateBatches();
@ -485,8 +447,7 @@ public class SharePartition {
log.error("Invalid state batch found for the share partition: {}-{}. The base offset: {}"
+ " is less than the start offset: {}.", groupId, topicIdPartition,
stateBatch.firstOffset(), startOffset);
completeInitializationWithException();
futureException.set(Optional.of(new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition))));
throwable = new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition));
return;
}
InFlightBatch inFlightBatch = new InFlightBatch(EMPTY_MEMBER_ID, stateBatch.firstOffset(),
@ -507,10 +468,18 @@ public class SharePartition {
}
// Set the partition state to Active and complete the future.
partitionState = SharePartitionState.ACTIVE;
} catch (Exception e) {
throwable = e;
} finally {
boolean isFailed = throwable != null;
if (isFailed) {
partitionState = SharePartitionState.FAILED;
}
// Release the lock.
lock.writeLock().unlock();
if (futureException.get().isPresent()) {
future.completeExceptionally(futureException.get().get());
// Complete the future.
if (isFailed) {
future.completeExceptionally(throwable);
} else {
future.complete(null);
}
@ -1178,32 +1147,31 @@ public class SharePartition {
return partitionState() != SharePartitionState.ACTIVE;
}
private void completeInitializationWithException() {
private boolean emptyToInitialState() {
lock.writeLock().lock();
try {
partitionState = SharePartitionState.FAILED;
if (initializedOrThrowException()) return false;
partitionState = SharePartitionState.INITIALIZING;
return true;
} finally {
lock.writeLock().unlock();
}
}
private InitializationResult checkInitializationCompletion() {
private boolean initializedOrThrowException() {
SharePartitionState currentState = partitionState();
switch (currentState) {
case ACTIVE:
return new InitializationResult(true);
case FAILED:
return new InitializationResult(true, new IllegalStateException(String.format("Share partition failed to load %s-%s", groupId, topicIdPartition)));
case INITIALIZING:
return new InitializationResult(true, new LeaderNotAvailableException(String.format("Share partition is already initializing %s-%s", groupId, topicIdPartition)));
case FENCED:
return new InitializationResult(true, new FencedStateEpochException(String.format("Share partition is fenced %s-%s", groupId, topicIdPartition)));
case EMPTY:
// Do not complete the future as the share partition is not yet initialized.
return new InitializationResult(false);
default:
throw new IllegalStateException("Unknown share partition state: " + currentState);
}
return switch (currentState) {
case ACTIVE -> true;
case FAILED -> throw new IllegalStateException(
String.format("Share partition failed to load %s-%s", groupId, topicIdPartition));
case INITIALIZING -> throw new LeaderNotAvailableException(
String.format("Share partition is already initializing %s-%s", groupId, topicIdPartition));
case FENCED -> throw new FencedStateEpochException(
String.format("Share partition is fenced %s-%s", groupId, topicIdPartition));
case EMPTY ->
// The share partition is not yet initialized.
false;
};
}
private AcquiredRecords acquireNewBatchRecords(
@ -2517,26 +2485,4 @@ public class SharePartition {
this.offsetMetadata = offsetMetadata;
}
}
static final class InitializationResult {
private final boolean isComplete;
private final Throwable throwable;
private InitializationResult(boolean isComplete) {
this(isComplete, null);
}
private InitializationResult(boolean isComplete, Throwable throwable) {
this.isComplete = isComplete;
this.throwable = throwable;
}
private boolean isComplete() {
return isComplete;
}
private Throwable throwable() {
return throwable;
}
}
}