KAFKA-16712 Fix race in TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest (#15962)

TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest has a race when it sets RemoteLogMetadataTopicPartitioner using the setter.

This change fixes the race condition by passing the RemoteLogMetadataTopicPartitioner instance in a Function<Integer, RemoteLogMetaedataTopicPartitioner> which is used in configure() in TopicBasedRemoteLogMetadataManager.

It also improves the waitingFor condition by spying on RemotePartitionMetadataStore and awaiting on Phasers to ensure ConsumerManager makes progress before performing assertions.

Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Gaurav Narula 2024-05-16 07:56:06 +01:00 committed by GitHub
parent ffb31e172a
commit a1c2c68db1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 59 additions and 34 deletions

View File

@ -55,6 +55,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -91,21 +92,23 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
private final Set<TopicIdPartition> pendingAssignPartitions = Collections.synchronizedSet(new HashSet<>()); private final Set<TopicIdPartition> pendingAssignPartitions = Collections.synchronizedSet(new HashSet<>());
private volatile boolean initializationFailed; private volatile boolean initializationFailed;
private final Supplier<RemotePartitionMetadataStore> remoteLogMetadataManagerSupplier; private final Supplier<RemotePartitionMetadataStore> remoteLogMetadataManagerSupplier;
private final Function<Integer, RemoteLogMetadataTopicPartitioner> remoteLogMetadataTopicPartitionerFunction;
/** /**
* The default constructor delegates to the internal one, starting the consumer thread and * The default constructor delegates to the internal one, starting the consumer thread and
* supplying an instance of RemotePartitionMetadataStore by default. * supplying an instance of RemoteLogMetadataTopicPartitioner and RemotePartitionMetadataStore by default.
*/ */
public TopicBasedRemoteLogMetadataManager() { public TopicBasedRemoteLogMetadataManager() {
this(true, RemotePartitionMetadataStore::new); this(true, RemoteLogMetadataTopicPartitioner::new, RemotePartitionMetadataStore::new);
} }
/** /**
* Used in tests to dynamically configure the instance. * Used in tests to dynamically configure the instance.
*/ */
TopicBasedRemoteLogMetadataManager(boolean startConsumerThread, Supplier<RemotePartitionMetadataStore> remoteLogMetadataManagerSupplier) { TopicBasedRemoteLogMetadataManager(boolean startConsumerThread, Function<Integer, RemoteLogMetadataTopicPartitioner> remoteLogMetadataTopicPartitionerFunction, Supplier<RemotePartitionMetadataStore> remoteLogMetadataManagerSupplier) {
this.startConsumerThread = startConsumerThread; this.startConsumerThread = startConsumerThread;
this.remoteLogMetadataManagerSupplier = remoteLogMetadataManagerSupplier; this.remoteLogMetadataManagerSupplier = remoteLogMetadataManagerSupplier;
this.remoteLogMetadataTopicPartitionerFunction = remoteLogMetadataTopicPartitionerFunction;
} }
@Override @Override
@ -366,7 +369,7 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
log.info("Started configuring topic-based RLMM with configs: {}", configs); log.info("Started configuring topic-based RLMM with configs: {}", configs);
rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(configs); rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(configs);
rlmTopicPartitioner = new RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount()); rlmTopicPartitioner = remoteLogMetadataTopicPartitionerFunction.apply(rlmmConfig.metadataTopicPartitionsCount());
remotePartitionMetadataStore = remoteLogMetadataManagerSupplier.get(); remotePartitionMetadataStore = remoteLogMetadataManagerSupplier.get();
configured = true; configured = true;
log.info("Successfully configured topic-based RLMM with config: {}", rlmmConfig); log.info("Successfully configured topic-based RLMM with config: {}", rlmmConfig);
@ -559,11 +562,6 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
return rlmmConfig; return rlmmConfig;
} }
// Visible for testing.
void setRlmTopicPartitioner(RemoteLogMetadataTopicPartitioner rlmTopicPartitioner) {
this.rlmTopicPartitioner = Objects.requireNonNull(rlmTopicPartitioner);
}
@Override @Override
public void close() throws IOException { public void close() throws IOException {
// Close all the resources. // Close all the resources.

View File

@ -34,6 +34,7 @@ import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier; import java.util.function.Supplier;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.BROKER_ID; import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.BROKER_ID;
@ -70,21 +71,21 @@ public class TopicBasedRemoteLogMetadataManagerHarness extends IntegrationTestHa
// Call setup to start the cluster. // Call setup to start the cluster.
super.setUp(new EmptyTestInfo()); super.setUp(new EmptyTestInfo());
initializeRemoteLogMetadataManager(topicIdPartitions, startConsumerThread, null, remotePartitionMetadataStoreSupplier); initializeRemoteLogMetadataManager(topicIdPartitions, startConsumerThread, RemoteLogMetadataTopicPartitioner::new, remotePartitionMetadataStoreSupplier);
} }
public void initializeRemoteLogMetadataManager(Set<TopicIdPartition> topicIdPartitions, public void initializeRemoteLogMetadataManager(Set<TopicIdPartition> topicIdPartitions,
boolean startConsumerThread, boolean startConsumerThread,
RemoteLogMetadataTopicPartitioner remoteLogMetadataTopicPartitioner) { Function<Integer, RemoteLogMetadataTopicPartitioner> remoteLogMetadataTopicPartitioner) {
initializeRemoteLogMetadataManager(topicIdPartitions, startConsumerThread, remoteLogMetadataTopicPartitioner, RemotePartitionMetadataStore::new); initializeRemoteLogMetadataManager(topicIdPartitions, startConsumerThread, remoteLogMetadataTopicPartitioner, RemotePartitionMetadataStore::new);
} }
public void initializeRemoteLogMetadataManager(Set<TopicIdPartition> topicIdPartitions, public void initializeRemoteLogMetadataManager(Set<TopicIdPartition> topicIdPartitions,
boolean startConsumerThread, boolean startConsumerThread,
RemoteLogMetadataTopicPartitioner remoteLogMetadataTopicPartitioner, Function<Integer, RemoteLogMetadataTopicPartitioner> remoteLogMetadataTopicPartitioner,
Supplier<RemotePartitionMetadataStore> remotePartitionMetadataStoreSupplier) { Supplier<RemotePartitionMetadataStore> remotePartitionMetadataStoreSupplier) {
String logDir = TestUtils.tempDirectory("rlmm_segs_").getAbsolutePath(); String logDir = TestUtils.tempDirectory("rlmm_segs_").getAbsolutePath();
topicBasedRemoteLogMetadataManager = new TopicBasedRemoteLogMetadataManager(startConsumerThread, remotePartitionMetadataStoreSupplier) { topicBasedRemoteLogMetadataManager = new TopicBasedRemoteLogMetadataManager(startConsumerThread, remoteLogMetadataTopicPartitioner, remotePartitionMetadataStoreSupplier) {
@Override @Override
public void onPartitionLeadershipChanges(Set<TopicIdPartition> leaderPartitions, public void onPartitionLeadershipChanges(Set<TopicIdPartition> leaderPartitions,
Set<TopicIdPartition> followerPartitions) { Set<TopicIdPartition> followerPartitions) {
@ -119,9 +120,6 @@ public class TopicBasedRemoteLogMetadataManagerHarness extends IntegrationTestHa
log.debug("TopicBasedRemoteLogMetadataManager configs after adding overridden properties: {}", configs); log.debug("TopicBasedRemoteLogMetadataManager configs after adding overridden properties: {}", configs);
topicBasedRemoteLogMetadataManager.configure(configs); topicBasedRemoteLogMetadataManager.configure(configs);
if (remoteLogMetadataTopicPartitioner != null) {
topicBasedRemoteLogMetadataManager.setRlmTopicPartitioner(remoteLogMetadataTopicPartitioner);
}
try { try {
waitUntilInitialized(60_000); waitUntilInitialized(60_000);
} catch (TimeoutException e) { } catch (TimeoutException e) {

View File

@ -21,13 +21,11 @@ import kafka.utils.EmptyTestInfo;
import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid; import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException; import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
@ -41,6 +39,14 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters @SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters
public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest { public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
@ -108,7 +114,27 @@ public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
final TopicIdPartition followerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0)); final TopicIdPartition followerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0));
final TopicIdPartition emptyTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(topicWithNoMessages, 0)); final TopicIdPartition emptyTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(topicWithNoMessages, 0));
RemoteLogMetadataTopicPartitioner partitioner = new RemoteLogMetadataTopicPartitioner(10) { final RemotePartitionMetadataStore spyRemotePartitionMetadataStore = spy(new RemotePartitionMetadataStore());
// Think of a Phaser as a CountdownLatch which provides a "countUp" operation in addition to a countDown.
// The "parties" in a phaser are analogous to the "count". The awaiting semantics of Phaser
// however differ slightly compared to a CountdownLatch, which requires us to account for
// the test thread as well while initialising the Phaser.
Phaser initializationPhaser = new Phaser(2); // 1 to register test thread, 1 to register leaderTopicIdPartition
doAnswer(invocationOnMock -> {
Object result = invocationOnMock.callRealMethod();
initializationPhaser.arriveAndDeregister(); // similar to CountdownLatch::countDown
return result;
}).when(spyRemotePartitionMetadataStore).markInitialized(any());
Phaser handleRemoteLogSegmentMetadataPhaser = new Phaser(2); // 1 to register test thread, 1 to register leaderTopicIdPartition
doAnswer(invocationOnMock -> {
Object result = invocationOnMock.callRealMethod();
handleRemoteLogSegmentMetadataPhaser.arriveAndDeregister(); // similar to CountdownLatch::countDown
return result;
}).when(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(any());
remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(), true, numMetadataTopicPartitions -> new RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions) {
@Override @Override
public int metadataPartition(TopicIdPartition topicIdPartition) { public int metadataPartition(TopicIdPartition topicIdPartition) {
// Always return partition 0 except for noMessagesTopicIdPartition. So that, any new user // Always return partition 0 except for noMessagesTopicIdPartition. So that, any new user
@ -120,9 +146,7 @@ public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
return 0; return 0;
} }
} }
}; }, () -> spyRemotePartitionMetadataStore);
remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(), true, partitioner);
// Add segments for these partitions but an exception is received as they have not yet been subscribed. // Add segments for these partitions but an exception is received as they have not yet been subscribed.
// These messages would have been published to the respective metadata topic partitions but the ConsumerManager // These messages would have been published to the respective metadata topic partitions but the ConsumerManager
@ -150,26 +174,31 @@ public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
// RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start // RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start
// fetching those events and build the cache. // fetching those events and build the cache.
waitUntilConsumerCatchesUp(30_000L); initializationPhaser.awaitAdvanceInterruptibly(initializationPhaser.arrive(), 30_000, TimeUnit.MILLISECONDS); // similar to CountdownLatch::await
handleRemoteLogSegmentMetadataPhaser.awaitAdvanceInterruptibly(handleRemoteLogSegmentMetadataPhaser.arrive(), 30_000, TimeUnit.MILLISECONDS);
verify(spyRemotePartitionMetadataStore).markInitialized(leaderTopicIdPartition);
verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(leaderSegmentMetadata);
clearInvocations(spyRemotePartitionMetadataStore);
// leader partitions would have received as it is registered, but follower partition is not yet registered, // leader partitions would have received as it is registered, but follower partition is not yet registered,
// hence it throws an exception. // hence it throws an exception.
Assertions.assertTrue(rlmm().listRemoteLogSegments(leaderTopicIdPartition).hasNext()); Assertions.assertTrue(rlmm().listRemoteLogSegments(leaderTopicIdPartition).hasNext());
Assertions.assertThrows(RemoteStorageException.class, () -> rlmm().listRemoteLogSegments(followerTopicIdPartition)); Assertions.assertThrows(RemoteStorageException.class, () -> rlmm().listRemoteLogSegments(followerTopicIdPartition));
// Register follower partition // Register follower partition
// Phaser::bulkRegister and Phaser::register provide the "countUp" feature
initializationPhaser.bulkRegister(2); // 1 for emptyTopicIdPartition and 1 for followerTopicIdPartition
handleRemoteLogSegmentMetadataPhaser.register(); // 1 for followerTopicIdPartition, emptyTopicIdPartition doesn't have a RemoteLogSegmentMetadata event
rlmm().onPartitionLeadershipChanges(Collections.singleton(emptyTopicIdPartition), rlmm().onPartitionLeadershipChanges(Collections.singleton(emptyTopicIdPartition),
Collections.singleton(followerTopicIdPartition)); Collections.singleton(followerTopicIdPartition));
// In this state, all the metadata should be available in RLMM for both leader and follower partitions. initializationPhaser.awaitAdvanceInterruptibly(initializationPhaser.arrive(), 30_000, TimeUnit.MILLISECONDS);
TestUtils.waitForCondition(() -> rlmm().listRemoteLogSegments(leaderTopicIdPartition).hasNext(), "No segments found"); handleRemoteLogSegmentMetadataPhaser.awaitAdvanceInterruptibly(handleRemoteLogSegmentMetadataPhaser.arrive(), 30_000, TimeUnit.MILLISECONDS);
TestUtils.waitForCondition(() -> rlmm().listRemoteLogSegments(followerTopicIdPartition).hasNext(), "No segments found");
}
private void waitUntilConsumerCatchesUp(long timeoutMs) throws TimeoutException, InterruptedException { verify(spyRemotePartitionMetadataStore).markInitialized(followerTopicIdPartition);
TestUtils.waitForCondition(() -> { verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(followerSegmentMetadata);
// If both the leader and follower partitions are mapped to the same metadata partition which is 0, it // In this state, all the metadata should be available in RLMM for both leader and follower partitions.
// should have at least 2 messages. That means, read offset should be >= 1 (including duplicate messages if any). Assertions.assertTrue(rlmm().listRemoteLogSegments(leaderTopicIdPartition).hasNext(), "No segments found");
return rlmm().readOffsetForPartition(0).orElse(-1L) >= 1; Assertions.assertTrue(rlmm().listRemoteLogSegments(followerTopicIdPartition).hasNext(), "No segments found");
}, timeoutMs, "Consumer did not catch up");
} }
} }

View File

@ -65,7 +65,7 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest {
} }
private void startTopicBasedRemoteLogMetadataManagerHarness(boolean startConsumerThread) { private void startTopicBasedRemoteLogMetadataManagerHarness(boolean startConsumerThread) {
remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(), startConsumerThread, null); remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(), startConsumerThread, RemoteLogMetadataTopicPartitioner::new);
} }
@AfterEach @AfterEach