diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 5624b5c70ef..2f8ff2d84c2 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -47,7 +47,7 @@
-
+
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java
index 00ec974a722..3dee1e35f3e 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -71,7 +71,6 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -371,52 +370,25 @@ public class SharePartition {
*/
public CompletableFuture maybeInitialize() {
log.debug("Maybe initialize share partition: {}-{}", groupId, topicIdPartition);
- CompletableFuture future = new CompletableFuture<>();
- AtomicReference> futureException = new AtomicReference<>(Optional.empty());
// Check if the share partition is already initialized.
- InitializationResult initializationResult = checkInitializationCompletion();
- if (initializationResult.isComplete()) {
- if (initializationResult.throwable() != null) {
- future.completeExceptionally(initializationResult.throwable());
- } else {
- future.complete(null);
- }
- return future;
- }
-
- // All the pending requests should wait to get completed before the share partition is initialized.
- // Attain lock to avoid any concurrent requests to be processed.
- lock.writeLock().lock();
- boolean shouldFutureBeCompleted = false;
try {
- // Re-check the state to verify if previous requests has already initialized the share partition.
- initializationResult = checkInitializationCompletion();
- if (initializationResult.isComplete()) {
- if (initializationResult.throwable() != null) {
- futureException.set(Optional.of(initializationResult.throwable()));
- }
- shouldFutureBeCompleted = true;
- return future;
- }
-
- // Update state to initializing to avoid any concurrent requests to be processed.
- partitionState = SharePartitionState.INITIALIZING;
+ if (initializedOrThrowException()) return CompletableFuture.completedFuture(null);
} catch (Exception e) {
- log.error("Failed to initialize the share partition: {}-{}", groupId, topicIdPartition, e);
- completeInitializationWithException();
- futureException.set(Optional.of(e));
- shouldFutureBeCompleted = true;
- return future;
- } finally {
- lock.writeLock().unlock();
- if (shouldFutureBeCompleted) {
- if (futureException.get().isPresent()) {
- future.completeExceptionally(futureException.get().get());
- } else {
- future.complete(null);
- }
- }
+ return CompletableFuture.failedFuture(e);
}
+
+ // If code reaches here then the share partition is not initialized. Initialize the share partition.
+ // All the pending requests should wait to get completed before the share partition is initialized.
+ // Attain lock while updating the state to avoid any concurrent requests to be processed.
+ try {
+ if (!emptyToInitialState()) return CompletableFuture.completedFuture(null);
+ } catch (Exception e) {
+ return CompletableFuture.failedFuture(e);
+ }
+
+ // The share partition is not initialized, hence try to initialize it. There shall be only one
+ // request trying to initialize the share partition.
+ CompletableFuture future = new CompletableFuture<>();
// Initialize the share partition by reading the state from the persister.
persister.readState(new ReadShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder()
@@ -426,20 +398,19 @@ public class SharePartition {
.build())
.build()
).whenComplete((result, exception) -> {
+ Throwable throwable = null;
lock.writeLock().lock();
try {
if (exception != null) {
log.error("Failed to initialize the share partition: {}-{}", groupId, topicIdPartition, exception);
- completeInitializationWithException();
- futureException.set(Optional.of(exception));
+ throwable = exception;
return;
}
if (result == null || result.topicsData() == null || result.topicsData().size() != 1) {
log.error("Failed to initialize the share partition: {}-{}. Invalid state found: {}.",
groupId, topicIdPartition, result);
- completeInitializationWithException();
- futureException.set(Optional.of(new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition))));
+ throwable = new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition));
return;
}
@@ -447,8 +418,7 @@ public class SharePartition {
if (state.topicId() != topicIdPartition.topicId() || state.partitions().size() != 1) {
log.error("Failed to initialize the share partition: {}-{}. Invalid topic partition response: {}.",
groupId, topicIdPartition, result);
- completeInitializationWithException();
- futureException.set(Optional.of(new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition))));
+ throwable = new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition));
return;
}
@@ -456,8 +426,7 @@ public class SharePartition {
if (partitionData.partition() != topicIdPartition.partition()) {
log.error("Failed to initialize the share partition: {}-{}. Invalid partition response: {}.",
groupId, topicIdPartition, partitionData);
- completeInitializationWithException();
- futureException.set(Optional.of(new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition))));
+ throwable = new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition));
return;
}
@@ -465,18 +434,11 @@ public class SharePartition {
KafkaException ex = fetchPersisterError(partitionData.errorCode(), partitionData.errorMessage());
log.error("Failed to initialize the share partition: {}-{}. Exception occurred: {}.",
groupId, topicIdPartition, partitionData);
- completeInitializationWithException();
- futureException.set(Optional.of(ex));
+ throwable = ex;
return;
}
- try {
- startOffset = startOffsetDuringInitialization(partitionData.startOffset());
- } catch (Exception e) {
- completeInitializationWithException();
- futureException.set(Optional.of(e));
- return;
- }
+ startOffset = startOffsetDuringInitialization(partitionData.startOffset());
stateEpoch = partitionData.stateEpoch();
List stateBatches = partitionData.stateBatches();
@@ -485,8 +447,7 @@ public class SharePartition {
log.error("Invalid state batch found for the share partition: {}-{}. The base offset: {}"
+ " is less than the start offset: {}.", groupId, topicIdPartition,
stateBatch.firstOffset(), startOffset);
- completeInitializationWithException();
- futureException.set(Optional.of(new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition))));
+ throwable = new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition));
return;
}
InFlightBatch inFlightBatch = new InFlightBatch(EMPTY_MEMBER_ID, stateBatch.firstOffset(),
@@ -507,10 +468,18 @@ public class SharePartition {
}
// Set the partition state to Active and complete the future.
partitionState = SharePartitionState.ACTIVE;
+ } catch (Exception e) {
+ throwable = e;
} finally {
+ boolean isFailed = throwable != null;
+ if (isFailed) {
+ partitionState = SharePartitionState.FAILED;
+ }
+ // Release the lock.
lock.writeLock().unlock();
- if (futureException.get().isPresent()) {
- future.completeExceptionally(futureException.get().get());
+ // Complete the future.
+ if (isFailed) {
+ future.completeExceptionally(throwable);
} else {
future.complete(null);
}
@@ -1178,32 +1147,31 @@ public class SharePartition {
return partitionState() != SharePartitionState.ACTIVE;
}
- private void completeInitializationWithException() {
+ private boolean emptyToInitialState() {
lock.writeLock().lock();
try {
- partitionState = SharePartitionState.FAILED;
+ if (initializedOrThrowException()) return false;
+ partitionState = SharePartitionState.INITIALIZING;
+ return true;
} finally {
lock.writeLock().unlock();
}
}
- private InitializationResult checkInitializationCompletion() {
+ private boolean initializedOrThrowException() {
SharePartitionState currentState = partitionState();
- switch (currentState) {
- case ACTIVE:
- return new InitializationResult(true);
- case FAILED:
- return new InitializationResult(true, new IllegalStateException(String.format("Share partition failed to load %s-%s", groupId, topicIdPartition)));
- case INITIALIZING:
- return new InitializationResult(true, new LeaderNotAvailableException(String.format("Share partition is already initializing %s-%s", groupId, topicIdPartition)));
- case FENCED:
- return new InitializationResult(true, new FencedStateEpochException(String.format("Share partition is fenced %s-%s", groupId, topicIdPartition)));
- case EMPTY:
- // Do not complete the future as the share partition is not yet initialized.
- return new InitializationResult(false);
- default:
- throw new IllegalStateException("Unknown share partition state: " + currentState);
- }
+ return switch (currentState) {
+ case ACTIVE -> true;
+ case FAILED -> throw new IllegalStateException(
+ String.format("Share partition failed to load %s-%s", groupId, topicIdPartition));
+ case INITIALIZING -> throw new LeaderNotAvailableException(
+ String.format("Share partition is already initializing %s-%s", groupId, topicIdPartition));
+ case FENCED -> throw new FencedStateEpochException(
+ String.format("Share partition is fenced %s-%s", groupId, topicIdPartition));
+ case EMPTY ->
+ // The share partition is not yet initialized.
+ false;
+ };
}
private AcquiredRecords acquireNewBatchRecords(
@@ -2517,26 +2485,4 @@ public class SharePartition {
this.offsetMetadata = offsetMetadata;
}
}
-
- static final class InitializationResult {
- private final boolean isComplete;
- private final Throwable throwable;
-
- private InitializationResult(boolean isComplete) {
- this(isComplete, null);
- }
-
- private InitializationResult(boolean isComplete, Throwable throwable) {
- this.isComplete = isComplete;
- this.throwable = throwable;
- }
-
- private boolean isComplete() {
- return isComplete;
- }
-
- private Throwable throwable() {
- return throwable;
- }
- }
}