KAFKA-18084 Added write locks in SharePartition where locks were async calls were being made (#17957)

Reviewers: Andrew Schofield <aschofield@confluent.io>, poorv Mittal <apoorvmittal10@gmail.com>, Sushant Mahajan <sushant.mahajan88@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Abhinav Dixit 2024-12-03 23:16:29 +05:30 committed by GitHub
parent c76fb5cb9b
commit 180112a4a9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 169 additions and 142 deletions

View File

@ -380,6 +380,13 @@ public class SharePartition {
// Update state to initializing to avoid any concurrent requests to be processed.
partitionState = SharePartitionState.INITIALIZING;
} catch (Exception e) {
log.error("Failed to initialize the share partition: {}-{}", groupId, topicIdPartition, e);
completeInitializationWithException(future, e);
return future;
} finally {
lock.writeLock().unlock();
}
// Initialize the share partition by reading the state from the persister.
persister.readState(new ReadShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
@ -389,6 +396,8 @@ public class SharePartition {
.build())
.build()
).whenComplete((result, exception) -> {
lock.writeLock().lock();
try {
if (exception != null) {
log.error("Failed to initialize the share partition: {}-{}", groupId, topicIdPartition, exception);
completeInitializationWithException(future, exception);
@ -462,13 +471,10 @@ public class SharePartition {
// Set the partition state to Active and complete the future.
partitionState = SharePartitionState.ACTIVE;
future.complete(null);
});
} catch (Exception e) {
log.error("Failed to initialize the share partition: {}-{}", groupId, topicIdPartition, e);
completeInitializationWithException(future, e);
} finally {
lock.writeLock().unlock();
}
});
return future;
}
@ -1645,8 +1651,13 @@ public class SharePartition {
future.complete(null);
return;
}
} finally {
lock.writeLock().unlock();
}
writeShareGroupState(stateBatches).whenComplete((result, exception) -> {
lock.writeLock().lock();
try {
if (exception != null) {
log.error("Failed to write state to persister for the share partition: {}-{}",
groupId, topicIdPartition, exception);
@ -1665,10 +1676,10 @@ public class SharePartition {
// Update the cached state and start and end offsets after acknowledging/releasing the acquired records.
maybeUpdateCachedStateAndOffsets();
future.complete(null);
});
} finally {
lock.writeLock().unlock();
}
});
}
private void maybeUpdateCachedStateAndOffsets() {

View File

@ -490,11 +490,8 @@ public class SharePartitionTest {
if (!executorService.awaitTermination(30, TimeUnit.MILLISECONDS))
executorService.shutdown();
}
for (CompletableFuture<Void> result : results) {
assertTrue(result.isDone());
assertFalse(result.isCompletedExceptionally());
}
assertTrue(results.stream().allMatch(CompletableFuture::isDone));
assertFalse(results.stream().allMatch(CompletableFuture::isCompletedExceptionally));
assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState());
// Verify the persister read state is called only once.
@ -771,24 +768,20 @@ public class SharePartitionTest {
Persister persister = Mockito.mock(Persister.class);
// Complete the future exceptionally for read state.
Mockito.when(persister.readState(Mockito.any())).thenReturn(FutureUtils.failedFuture(new RuntimeException("Read exception")));
SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build();
SharePartition sharePartition1 = SharePartitionBuilder.builder().withPersister(persister).build();
CompletableFuture<Void> result = sharePartition.maybeInitialize();
CompletableFuture<Void> result = sharePartition1.maybeInitialize();
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
assertFutureThrows(result, RuntimeException.class);
assertEquals(SharePartitionState.FAILED, sharePartition.partitionState());
assertEquals(SharePartitionState.FAILED, sharePartition1.partitionState());
persister = Mockito.mock(Persister.class);
// Throw exception for read state.
Mockito.when(persister.readState(Mockito.any())).thenThrow(new RuntimeException("Read exception"));
sharePartition = SharePartitionBuilder.builder().withPersister(persister).build();
SharePartition sharePartition2 = SharePartitionBuilder.builder().withPersister(persister).build();
result = sharePartition.maybeInitialize();
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
assertFutureThrows(result, RuntimeException.class);
assertEquals(SharePartitionState.FAILED, sharePartition.partitionState());
assertThrows(RuntimeException.class, sharePartition2::maybeInitialize);
}
@Test
@ -4453,12 +4446,20 @@ public class SharePartitionTest {
public void testWriteShareGroupStateWithWriteException() {
Persister persister = Mockito.mock(Persister.class);
mockPersisterReadStateMethod(persister);
SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build();
SharePartition sharePartition1 = SharePartitionBuilder.builder().withPersister(persister).build();
Mockito.when(persister.writeState(Mockito.any())).thenReturn(FutureUtils.failedFuture(new RuntimeException("Write exception")));
CompletableFuture<Void> writeResult = sharePartition.writeShareGroupState(anyList());
CompletableFuture<Void> writeResult = sharePartition1.writeShareGroupState(anyList());
assertTrue(writeResult.isCompletedExceptionally());
assertFutureThrows(writeResult, IllegalStateException.class);
persister = Mockito.mock(Persister.class);
// Throw exception for write state.
mockPersisterReadStateMethod(persister);
SharePartition sharePartition2 = SharePartitionBuilder.builder().withPersister(persister).build();
Mockito.when(persister.writeState(Mockito.any())).thenThrow(new RuntimeException("Write exception"));
assertThrows(RuntimeException.class, () -> sharePartition2.writeShareGroupState(anyList()));
}
@Test

View File

@ -78,8 +78,13 @@ public class DefaultStatePersister implements Persister {
* @param request WriteShareGroupStateParameters
* @return A completable future of WriteShareGroupStateResult
*/
public CompletableFuture<WriteShareGroupStateResult> writeState(WriteShareGroupStateParameters request) throws IllegalArgumentException {
public CompletableFuture<WriteShareGroupStateResult> writeState(WriteShareGroupStateParameters request) {
try {
validate(request);
} catch (Exception e) {
log.error("Unable to validate write state request", e);
return CompletableFuture.failedFuture(e);
}
GroupTopicPartitionData<PartitionStateBatchData> gtp = request.groupTopicPartitionData();
String groupId = gtp.groupId();
@ -169,8 +174,13 @@ public class DefaultStatePersister implements Persister {
* @param request ReadShareGroupStateParameters
* @return A completable future of ReadShareGroupStateResult
*/
public CompletableFuture<ReadShareGroupStateResult> readState(ReadShareGroupStateParameters request) throws IllegalArgumentException {
public CompletableFuture<ReadShareGroupStateResult> readState(ReadShareGroupStateParameters request) {
try {
validate(request);
} catch (Exception e) {
log.error("Unable to validate read state request", e);
return CompletableFuture.failedFuture(e);
}
GroupTopicPartitionData<PartitionIdLeaderEpochData> gtp = request.groupTopicPartitionData();
String groupId = gtp.groupId();
Map<Uuid, Map<Integer, CompletableFuture<ReadShareGroupStateResponse>>> futureMap = new HashMap<>();

View File

@ -40,7 +40,7 @@ public interface Persister {
* @param request Request parameters
* @return A {@link CompletableFuture} that completes with the result.
*/
CompletableFuture<ReadShareGroupStateResult> readState(ReadShareGroupStateParameters request) throws IllegalArgumentException;
CompletableFuture<ReadShareGroupStateResult> readState(ReadShareGroupStateParameters request);
/**
* Write share-partition state.
@ -48,7 +48,7 @@ public interface Persister {
* @param request Request parameters
* @return A {@link CompletableFuture} that completes with the result.
*/
CompletableFuture<WriteShareGroupStateResult> writeState(WriteShareGroupStateParameters request) throws IllegalArgumentException;
CompletableFuture<WriteShareGroupStateResult> writeState(WriteShareGroupStateParameters request);
/**
* Delete share-partition state.

View File

@ -50,8 +50,8 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static org.apache.kafka.test.TestUtils.assertFutureThrows;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
@ -132,68 +132,71 @@ class DefaultStatePersisterTest {
int incorrectPartition = -1;
// Request Parameters are null
assertThrows(IllegalArgumentException.class, () -> {
DefaultStatePersister defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
defaultStatePersister.writeState(null);
});
CompletableFuture<WriteShareGroupStateResult> result = defaultStatePersister.writeState(null);
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
assertFutureThrows(result, IllegalArgumentException.class);
// groupTopicPartitionData is null
assertThrows(IllegalArgumentException.class, () -> {
DefaultStatePersister defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
defaultStatePersister.writeState(new WriteShareGroupStateParameters.Builder().setGroupTopicPartitionData(null).build());
});
defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
result = defaultStatePersister.writeState(new WriteShareGroupStateParameters.Builder().setGroupTopicPartitionData(null).build());
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
assertFutureThrows(result, IllegalArgumentException.class);
// groupId is null
assertThrows(IllegalArgumentException.class, () -> {
DefaultStatePersister defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
defaultStatePersister.writeState(new WriteShareGroupStateParameters.Builder()
defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
result = defaultStatePersister.writeState(new WriteShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionStateBatchData>()
.setGroupId(null).build()).build());
});
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
assertFutureThrows(result, IllegalArgumentException.class);
// topicsData is empty
assertThrows(IllegalArgumentException.class, () -> {
DefaultStatePersister defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
defaultStatePersister.writeState(new WriteShareGroupStateParameters.Builder()
defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
result = defaultStatePersister.writeState(new WriteShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionStateBatchData>()
.setGroupId(groupId)
.setTopicsData(Collections.emptyList()).build()).build());
});
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
assertFutureThrows(result, IllegalArgumentException.class);
// topicId is null
assertThrows(IllegalArgumentException.class, () -> {
DefaultStatePersister defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
defaultStatePersister.writeState(new WriteShareGroupStateParameters.Builder()
defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
result = defaultStatePersister.writeState(new WriteShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionStateBatchData>()
.setGroupId(groupId)
.setTopicsData(Collections.singletonList(new TopicData<>(null,
Collections.singletonList(PartitionFactory.newPartitionStateBatchData(
partition, 1, 0, 0, null))))
).build()).build());
});
partition, 1, 0, 0, null))))).build()).build());
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
assertFutureThrows(result, IllegalArgumentException.class);
// partitionData is empty
assertThrows(IllegalArgumentException.class, () -> {
DefaultStatePersister defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
defaultStatePersister.writeState(new WriteShareGroupStateParameters.Builder()
defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
result = defaultStatePersister.writeState(new WriteShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionStateBatchData>()
.setGroupId(groupId)
.setTopicsData(Collections.singletonList(new TopicData<>(topicId,
Collections.emptyList()))
).build()).build());
});
.setTopicsData(Collections.singletonList(new TopicData<>(topicId, Collections.emptyList()))).build()).build());
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
assertFutureThrows(result, IllegalArgumentException.class);
// partition value is incorrect
assertThrows(IllegalArgumentException.class, () -> {
DefaultStatePersister defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
defaultStatePersister.writeState(new WriteShareGroupStateParameters.Builder()
defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
result = defaultStatePersister.writeState(new WriteShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionStateBatchData>()
.setGroupId(groupId)
.setTopicsData(Collections.singletonList(new TopicData<>(topicId,
Collections.singletonList(PartitionFactory.newPartitionStateBatchData(
incorrectPartition, 1, 0, 0, null))))
).build()).build());
});
incorrectPartition, 1, 0, 0, null))))).build()).build());
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
assertFutureThrows(result, IllegalArgumentException.class);
}
@Test
@ -205,68 +208,70 @@ class DefaultStatePersisterTest {
int incorrectPartition = -1;
// Request Parameters are null
assertThrows(IllegalArgumentException.class, () -> {
DefaultStatePersister defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
defaultStatePersister.readState(null);
});
CompletableFuture<ReadShareGroupStateResult> result = defaultStatePersister.readState(null);
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
assertFutureThrows(result, IllegalArgumentException.class);
// groupTopicPartitionData is null
assertThrows(IllegalArgumentException.class, () -> {
DefaultStatePersister defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
defaultStatePersister.readState(new ReadShareGroupStateParameters.Builder().setGroupTopicPartitionData(null).build());
});
defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
result = defaultStatePersister.readState(new ReadShareGroupStateParameters.Builder().setGroupTopicPartitionData(null).build());
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
assertFutureThrows(result, IllegalArgumentException.class);
// groupId is null
assertThrows(IllegalArgumentException.class, () -> {
DefaultStatePersister defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
defaultStatePersister.readState(new ReadShareGroupStateParameters.Builder()
defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
result = defaultStatePersister.readState(new ReadShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
.setGroupId(null).build()).build());
});
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
assertFutureThrows(result, IllegalArgumentException.class);
// topicsData is empty
assertThrows(IllegalArgumentException.class, () -> {
DefaultStatePersister defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
defaultStatePersister.readState(new ReadShareGroupStateParameters.Builder()
defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
result = defaultStatePersister.readState(new ReadShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
.setGroupId(groupId)
.setTopicsData(Collections.emptyList()).build()).build());
});
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
assertFutureThrows(result, IllegalArgumentException.class);
// topicId is null
assertThrows(IllegalArgumentException.class, () -> {
DefaultStatePersister defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
defaultStatePersister.readState(new ReadShareGroupStateParameters.Builder()
defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
result = defaultStatePersister.readState(new ReadShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
.setGroupId(groupId)
.setTopicsData(Collections.singletonList(new TopicData<>(null,
Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(
partition, 1))))
Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(partition, 1))))
).build()).build());
});
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
assertFutureThrows(result, IllegalArgumentException.class);
// partitionData is empty
assertThrows(IllegalArgumentException.class, () -> {
DefaultStatePersister defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
defaultStatePersister.readState(new ReadShareGroupStateParameters.Builder()
defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
result = defaultStatePersister.readState(new ReadShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
.setGroupId(groupId)
.setTopicsData(Collections.singletonList(new TopicData<>(topicId,
Collections.emptyList()))
).build()).build());
});
.setTopicsData(Collections.singletonList(new TopicData<>(topicId, Collections.emptyList()))).build()).build());
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
assertFutureThrows(result, IllegalArgumentException.class);
// partition value is incorrect
assertThrows(IllegalArgumentException.class, () -> {
DefaultStatePersister defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
defaultStatePersister.readState(new ReadShareGroupStateParameters.Builder()
defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
result = defaultStatePersister.readState(new ReadShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
.setGroupId(groupId)
.setTopicsData(Collections.singletonList(new TopicData<>(topicId,
Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(
incorrectPartition, 1))))
).build()).build());
});
Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(incorrectPartition, 1))))).build()).build());
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
assertFutureThrows(result, IllegalArgumentException.class);
}
@Test