MINOR: Replace `singleton` with `Set.of` in the SubscriptionStateTest (#19993)

Replace `singleton` with `Set.of` in the SubscriptionStateTest.

Reviewers: Ken Huang <s7133700@gmail.com>, PoAn Yang
<payang@apache.org>, Yung <yungyung7654321@gmail.com>, TengYao Chi
<frankvicky@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Lan Ding 2025-06-20 15:33:41 +08:00 committed by GitHub
parent d59d39a229
commit 659ace836c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 113 additions and 115 deletions

View File

@ -35,7 +35,6 @@ import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
@ -44,7 +43,6 @@ import java.util.function.LongSupplier;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import static java.util.Collections.singleton;
import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH;
import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -66,14 +64,14 @@ public class SubscriptionStateTest {
@Test
public void partitionAssignment() {
state.assignFromUser(singleton(tp0));
assertEquals(singleton(tp0), state.assignedPartitions());
state.assignFromUser(Set.of(tp0));
assertEquals(Set.of(tp0), state.assignedPartitions());
assertEquals(1, state.numAssignedPartitions());
assertFalse(state.hasAllFetchPositions());
state.seek(tp0, 1);
assertTrue(state.isFetchable(tp0));
assertEquals(1L, state.position(tp0).offset);
state.assignFromUser(Collections.emptySet());
state.assignFromUser(Set.of());
assertTrue(state.assignedPartitions().isEmpty());
assertEquals(0, state.numAssignedPartitions());
assertFalse(state.isAssigned(tp0));
@ -94,20 +92,20 @@ public class SubscriptionStateTest {
assertTrue(state.assignedPartitions().isEmpty());
assertEquals(0, state.numAssignedPartitions());
state.subscribe(singleton(topic1), Optional.of(rebalanceListener));
state.subscribe(Set.of(topic1), Optional.of(rebalanceListener));
// assigned partitions should remain unchanged
assertTrue(state.assignedPartitions().isEmpty());
assertEquals(0, state.numAssignedPartitions());
assertTrue(state.checkAssignmentMatchedSubscription(singleton(t1p0)));
state.assignFromSubscribed(singleton(t1p0));
assertTrue(state.checkAssignmentMatchedSubscription(Set.of(t1p0)));
state.assignFromSubscribed(Set.of(t1p0));
// assigned partitions should immediately change
assertEquals(singleton(t1p0), state.assignedPartitions());
assertEquals(Set.of(t1p0), state.assignedPartitions());
assertEquals(1, state.numAssignedPartitions());
state.subscribe(singleton(topic), Optional.of(rebalanceListener));
state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
// assigned partitions should remain unchanged
assertEquals(singleton(t1p0), state.assignedPartitions());
assertEquals(Set.of(t1p0), state.assignedPartitions());
assertEquals(1, state.numAssignedPartitions());
state.unsubscribe();
@ -166,24 +164,24 @@ public class SubscriptionStateTest {
@Test
public void testGroupSubscribe() {
state.subscribe(singleton(topic1), Optional.of(rebalanceListener));
assertEquals(singleton(topic1), state.metadataTopics());
state.subscribe(Set.of(topic1), Optional.of(rebalanceListener));
assertEquals(Set.of(topic1), state.metadataTopics());
assertFalse(state.groupSubscribe(singleton(topic1)));
assertEquals(singleton(topic1), state.metadataTopics());
assertFalse(state.groupSubscribe(Set.of(topic1)));
assertEquals(Set.of(topic1), state.metadataTopics());
assertTrue(state.groupSubscribe(Set.of(topic, topic1)));
assertEquals(Set.of(topic, topic1), state.metadataTopics());
// `groupSubscribe` does not accumulate
assertFalse(state.groupSubscribe(singleton(topic1)));
assertEquals(singleton(topic1), state.metadataTopics());
assertFalse(state.groupSubscribe(Set.of(topic1)));
assertEquals(Set.of(topic1), state.metadataTopics());
state.subscribe(singleton("anotherTopic"), Optional.of(rebalanceListener));
state.subscribe(Set.of("anotherTopic"), Optional.of(rebalanceListener));
assertEquals(Set.of(topic1, "anotherTopic"), state.metadataTopics());
assertFalse(state.groupSubscribe(singleton("anotherTopic")));
assertEquals(singleton("anotherTopic"), state.metadataTopics());
assertFalse(state.groupSubscribe(Set.of("anotherTopic")));
assertEquals(Set.of("anotherTopic"), state.metadataTopics());
}
@Test
@ -193,44 +191,44 @@ public class SubscriptionStateTest {
assertTrue(state.assignedPartitions().isEmpty());
assertEquals(0, state.numAssignedPartitions());
state.subscribeFromPattern(Collections.singleton(topic));
state.subscribeFromPattern(Set.of(topic));
// assigned partitions should remain unchanged
assertTrue(state.assignedPartitions().isEmpty());
assertEquals(0, state.numAssignedPartitions());
assertTrue(state.checkAssignmentMatchedSubscription(singleton(tp1)));
state.assignFromSubscribed(singleton(tp1));
assertTrue(state.checkAssignmentMatchedSubscription(Set.of(tp1)));
state.assignFromSubscribed(Set.of(tp1));
// assigned partitions should immediately change
assertEquals(singleton(tp1), state.assignedPartitions());
assertEquals(Set.of(tp1), state.assignedPartitions());
assertEquals(1, state.numAssignedPartitions());
assertEquals(singleton(topic), state.subscription());
assertEquals(Set.of(topic), state.subscription());
assertTrue(state.checkAssignmentMatchedSubscription(singleton(t1p0)));
state.assignFromSubscribed(singleton(t1p0));
assertTrue(state.checkAssignmentMatchedSubscription(Set.of(t1p0)));
state.assignFromSubscribed(Set.of(t1p0));
// assigned partitions should immediately change
assertEquals(singleton(t1p0), state.assignedPartitions());
assertEquals(Set.of(t1p0), state.assignedPartitions());
assertEquals(1, state.numAssignedPartitions());
assertEquals(singleton(topic), state.subscription());
assertEquals(Set.of(topic), state.subscription());
state.subscribe(Pattern.compile(".*t"), Optional.of(rebalanceListener));
// assigned partitions should remain unchanged
assertEquals(singleton(t1p0), state.assignedPartitions());
assertEquals(Set.of(t1p0), state.assignedPartitions());
assertEquals(1, state.numAssignedPartitions());
state.subscribeFromPattern(singleton(topic));
state.subscribeFromPattern(Set.of(topic));
// assigned partitions should remain unchanged
assertEquals(singleton(t1p0), state.assignedPartitions());
assertEquals(Set.of(t1p0), state.assignedPartitions());
assertEquals(1, state.numAssignedPartitions());
assertTrue(state.checkAssignmentMatchedSubscription(singleton(tp0)));
state.assignFromSubscribed(singleton(tp0));
assertTrue(state.checkAssignmentMatchedSubscription(Set.of(tp0)));
state.assignFromSubscribed(Set.of(tp0));
// assigned partitions should immediately change
assertEquals(singleton(tp0), state.assignedPartitions());
assertEquals(Set.of(tp0), state.assignedPartitions());
assertEquals(1, state.numAssignedPartitions());
assertEquals(singleton(topic), state.subscription());
assertEquals(Set.of(topic), state.subscription());
state.unsubscribe();
// assigned partitions should immediately change
@ -248,10 +246,10 @@ public class SubscriptionStateTest {
state.unsubscribe();
assertEquals(2, state.assignmentId());
assertEquals(Collections.emptySet(), state.assignedPartitions());
assertEquals(Set.of(), state.assignedPartitions());
Set<TopicPartition> autoAssignment = Set.of(t1p0);
state.subscribe(singleton(topic1), Optional.of(rebalanceListener));
state.subscribe(Set.of(topic1), Optional.of(rebalanceListener));
assertTrue(state.checkAssignmentMatchedSubscription(autoAssignment));
state.assignFromSubscribed(autoAssignment);
assertEquals(3, state.assignmentId());
@ -260,7 +258,7 @@ public class SubscriptionStateTest {
@Test
public void partitionReset() {
state.assignFromUser(singleton(tp0));
state.assignFromUser(Set.of(tp0));
state.seek(tp0, 5);
assertEquals(5L, state.position(tp0).offset);
state.requestOffsetReset(tp0);
@ -276,29 +274,29 @@ public class SubscriptionStateTest {
@Test
public void topicSubscription() {
state.subscribe(singleton(topic), Optional.of(rebalanceListener));
state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
assertEquals(1, state.subscription().size());
assertTrue(state.assignedPartitions().isEmpty());
assertEquals(0, state.numAssignedPartitions());
assertTrue(state.hasAutoAssignedPartitions());
assertTrue(state.checkAssignmentMatchedSubscription(singleton(tp0)));
state.assignFromSubscribed(singleton(tp0));
assertTrue(state.checkAssignmentMatchedSubscription(Set.of(tp0)));
state.assignFromSubscribed(Set.of(tp0));
state.seek(tp0, 1);
assertEquals(1L, state.position(tp0).offset);
assertTrue(state.checkAssignmentMatchedSubscription(singleton(tp1)));
state.assignFromSubscribed(singleton(tp1));
assertTrue(state.checkAssignmentMatchedSubscription(Set.of(tp1)));
state.assignFromSubscribed(Set.of(tp1));
assertTrue(state.isAssigned(tp1));
assertFalse(state.isAssigned(tp0));
assertFalse(state.isFetchable(tp1));
assertEquals(singleton(tp1), state.assignedPartitions());
assertEquals(Set.of(tp1), state.assignedPartitions());
assertEquals(1, state.numAssignedPartitions());
}
@Test
public void partitionPause() {
state.assignFromUser(singleton(tp0));
state.assignFromUser(Set.of(tp0));
state.seek(tp0, 100);
assertTrue(state.isFetchable(tp0));
state.pause(tp0);
@ -309,10 +307,10 @@ public class SubscriptionStateTest {
@Test
public void testMarkingPartitionPending() {
state.assignFromUser(singleton(tp0));
state.assignFromUser(Set.of(tp0));
state.seek(tp0, 100);
assertTrue(state.isFetchable(tp0));
state.markPendingRevocation(singleton(tp0));
state.markPendingRevocation(Set.of(tp0));
assertFalse(state.isFetchable(tp0));
assertFalse(state.isPaused(tp0));
}
@ -320,17 +318,17 @@ public class SubscriptionStateTest {
@Test
public void testAssignedPartitionsAwaitingCallbackKeepPositionDefinedInCallback() {
// New partition assigned. Should not be fetchable or initializing positions.
state.subscribe(singleton(topic), Optional.of(rebalanceListener));
state.assignFromSubscribedAwaitingCallback(singleton(tp0), singleton(tp0));
state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
state.assignFromSubscribedAwaitingCallback(Set.of(tp0), Set.of(tp0));
assertAssignmentAppliedAwaitingCallback(tp0);
assertEquals(singleton(tp0.topic()), state.subscription());
assertEquals(Set.of(tp0.topic()), state.subscription());
// Simulate callback setting position to start fetching from
state.seek(tp0, 100);
// Callback completed. Partition should be fetchable, and should not require
// initializing positions (position already defined in the callback)
state.enablePartitionsAwaitingCallback(singleton(tp0));
state.enablePartitionsAwaitingCallback(Set.of(tp0));
assertEquals(0, state.initializingPartitions().size());
assertTrue(state.isFetchable(tp0));
assertTrue(state.hasAllFetchPositions());
@ -340,14 +338,14 @@ public class SubscriptionStateTest {
@Test
public void testAssignedPartitionsAwaitingCallbackInitializePositionsWhenCallbackCompletes() {
// New partition assigned. Should not be fetchable or initializing positions.
state.subscribe(singleton(topic), Optional.of(rebalanceListener));
state.assignFromSubscribedAwaitingCallback(singleton(tp0), singleton(tp0));
state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
state.assignFromSubscribedAwaitingCallback(Set.of(tp0), Set.of(tp0));
assertAssignmentAppliedAwaitingCallback(tp0);
assertEquals(singleton(tp0.topic()), state.subscription());
assertEquals(Set.of(tp0.topic()), state.subscription());
// Callback completed (without updating positions). Partition should require initializing
// positions, and start fetching once a valid position is set.
state.enablePartitionsAwaitingCallback(singleton(tp0));
state.enablePartitionsAwaitingCallback(Set.of(tp0));
assertEquals(1, state.initializingPartitions().size());
state.seek(tp0, 100);
assertTrue(state.isFetchable(tp0));
@ -358,23 +356,23 @@ public class SubscriptionStateTest {
@Test
public void testAssignedPartitionsAwaitingCallbackDoesNotAffectPreviouslyOwnedPartitions() {
// First partition assigned and callback completes.
state.subscribe(singleton(topic), Optional.of(rebalanceListener));
state.assignFromSubscribedAwaitingCallback(singleton(tp0), singleton(tp0));
state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
state.assignFromSubscribedAwaitingCallback(Set.of(tp0), Set.of(tp0));
assertAssignmentAppliedAwaitingCallback(tp0);
assertEquals(singleton(tp0.topic()), state.subscription());
state.enablePartitionsAwaitingCallback(singleton(tp0));
assertEquals(Set.of(tp0.topic()), state.subscription());
state.enablePartitionsAwaitingCallback(Set.of(tp0));
state.seek(tp0, 100);
assertTrue(state.isFetchable(tp0));
// New partition added to the assignment. Owned partitions should continue to be
// fetchable, while the newly added should not be fetchable until callback completes.
state.assignFromSubscribedAwaitingCallback(Set.of(tp0, tp1), singleton(tp1));
state.assignFromSubscribedAwaitingCallback(Set.of(tp0, tp1), Set.of(tp1));
assertTrue(state.isFetchable(tp0));
assertFalse(state.isFetchable(tp1));
assertEquals(1, state.initializingPartitions().size());
// Callback completed. Added partition be initializing positions and become fetchable when it gets one.
state.enablePartitionsAwaitingCallback(singleton(tp1));
state.enablePartitionsAwaitingCallback(Set.of(tp1));
assertEquals(1, state.initializingPartitions().size());
assertEquals(tp1, state.initializingPartitions().iterator().next());
state.seek(tp1, 200);
@ -382,7 +380,7 @@ public class SubscriptionStateTest {
}
private void assertAssignmentAppliedAwaitingCallback(TopicPartition topicPartition) {
assertEquals(singleton(topicPartition), state.assignedPartitions());
assertEquals(Set.of(topicPartition), state.assignedPartitions());
assertEquals(1, state.numAssignedPartitions());
assertFalse(state.isFetchable(topicPartition));
@ -392,9 +390,9 @@ public class SubscriptionStateTest {
@Test
public void invalidPositionUpdate() {
state.subscribe(singleton(topic), Optional.of(rebalanceListener));
assertTrue(state.checkAssignmentMatchedSubscription(singleton(tp0)));
state.assignFromSubscribed(singleton(tp0));
state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
assertTrue(state.checkAssignmentMatchedSubscription(Set.of(tp0)));
state.assignFromSubscribed(Set.of(tp0));
assertThrows(IllegalStateException.class, () -> state.position(tp0,
new SubscriptionState.FetchPosition(0, Optional.empty(), leaderAndEpoch)));
@ -402,15 +400,15 @@ public class SubscriptionStateTest {
@Test
public void cantAssignPartitionForUnsubscribedTopics() {
state.subscribe(singleton(topic), Optional.of(rebalanceListener));
assertFalse(state.checkAssignmentMatchedSubscription(Collections.singletonList(t1p0)));
state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
assertFalse(state.checkAssignmentMatchedSubscription(List.of(t1p0)));
}
@Test
public void cantAssignPartitionForUnmatchedPattern() {
state.subscribe(Pattern.compile(".*t"), Optional.of(rebalanceListener));
state.subscribeFromPattern(Collections.singleton(topic));
assertFalse(state.checkAssignmentMatchedSubscription(Collections.singletonList(t1p0)));
state.subscribeFromPattern(Set.of(topic));
assertFalse(state.checkAssignmentMatchedSubscription(List.of(t1p0)));
}
@Test
@ -421,26 +419,26 @@ public class SubscriptionStateTest {
@Test
public void cantSubscribeTopicAndPattern() {
state.subscribe(singleton(topic), Optional.of(rebalanceListener));
state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
assertThrows(IllegalStateException.class, () -> state.subscribe(Pattern.compile(".*"), Optional.of(rebalanceListener)));
}
@Test
public void cantSubscribePartitionAndPattern() {
state.assignFromUser(singleton(tp0));
state.assignFromUser(Set.of(tp0));
assertThrows(IllegalStateException.class, () -> state.subscribe(Pattern.compile(".*"), Optional.of(rebalanceListener)));
}
@Test
public void cantSubscribePatternAndTopic() {
state.subscribe(Pattern.compile(".*"), Optional.of(rebalanceListener));
assertThrows(IllegalStateException.class, () -> state.subscribe(singleton(topic), Optional.of(rebalanceListener)));
assertThrows(IllegalStateException.class, () -> state.subscribe(Set.of(topic), Optional.of(rebalanceListener)));
}
@Test
public void cantSubscribePatternAndPartition() {
state.subscribe(Pattern.compile(".*"), Optional.of(rebalanceListener));
assertThrows(IllegalStateException.class, () -> state.assignFromUser(singleton(tp0)));
assertThrows(IllegalStateException.class, () -> state.assignFromUser(Set.of(tp0)));
}
@Test
@ -485,14 +483,14 @@ public class SubscriptionStateTest {
TopicIdPartitionSet reconciledAssignmentFromRegex = new TopicIdPartitionSet();
reconciledAssignmentFromRegex.addAll(Uuid.randomUuid(), topic, Set.of(0));
state.assignFromSubscribedAwaitingCallback(singleton(tp0), singleton(tp0));
state.assignFromSubscribedAwaitingCallback(Set.of(tp0), Set.of(tp0));
assertAssignmentAppliedAwaitingCallback(tp0);
// Simulate callback setting position to start fetching from
state.seek(tp0, 100);
// Callback completed. Partition should be fetchable, from the position previously defined
state.enablePartitionsAwaitingCallback(singleton(tp0));
state.enablePartitionsAwaitingCallback(Set.of(tp0));
assertEquals(0, state.initializingPartitions().size());
assertTrue(state.isFetchable(tp0));
assertTrue(state.hasAllFetchPositions());
@ -513,7 +511,7 @@ public class SubscriptionStateTest {
state.setAssignedTopicIds(Set.of(firstAssignedUuid, secondAssignedUuid));
// First reconciliation completes and updates the subscription state
state.assignFromSubscribedAwaitingCallback(singleton(tp0), singleton(tp0));
state.assignFromSubscribedAwaitingCallback(Set.of(tp0), Set.of(tp0));
// First assignment should have been applied
assertAssignmentAppliedAwaitingCallback(tp0);
@ -557,16 +555,16 @@ public class SubscriptionStateTest {
public void unsubscribeUserAssignment() {
state.assignFromUser(Set.of(tp0, tp1));
state.unsubscribe();
state.subscribe(singleton(topic), Optional.of(rebalanceListener));
assertEquals(singleton(topic), state.subscription());
state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
assertEquals(Set.of(topic), state.subscription());
}
@Test
public void unsubscribeUserSubscribe() {
state.subscribe(singleton(topic), Optional.of(rebalanceListener));
state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
state.unsubscribe();
state.assignFromUser(singleton(tp0));
assertEquals(singleton(tp0), state.assignedPartitions());
state.assignFromUser(Set.of(tp0));
assertEquals(Set.of(tp0), state.assignedPartitions());
assertEquals(1, state.numAssignedPartitions());
}
@ -574,10 +572,10 @@ public class SubscriptionStateTest {
public void unsubscription() {
state.subscribe(Pattern.compile(".*"), Optional.of(rebalanceListener));
state.subscribeFromPattern(Set.of(topic, topic1));
assertTrue(state.checkAssignmentMatchedSubscription(singleton(tp1)));
state.assignFromSubscribed(singleton(tp1));
assertTrue(state.checkAssignmentMatchedSubscription(Set.of(tp1)));
state.assignFromSubscribed(Set.of(tp1));
assertEquals(singleton(tp1), state.assignedPartitions());
assertEquals(Set.of(tp1), state.assignedPartitions());
assertEquals(1, state.numAssignedPartitions());
state.unsubscribe();
@ -585,8 +583,8 @@ public class SubscriptionStateTest {
assertTrue(state.assignedPartitions().isEmpty());
assertEquals(0, state.numAssignedPartitions());
state.assignFromUser(singleton(tp0));
assertEquals(singleton(tp0), state.assignedPartitions());
state.assignFromUser(Set.of(tp0));
assertEquals(Set.of(tp0), state.assignedPartitions());
assertEquals(1, state.numAssignedPartitions());
state.unsubscribe();
@ -597,15 +595,15 @@ public class SubscriptionStateTest {
@Test
public void testPreferredReadReplicaLease() {
state.assignFromUser(Collections.singleton(tp0));
state.assignFromUser(Set.of(tp0));
// Default state
assertFalse(state.preferredReadReplica(tp0, 0L).isPresent());
// Set the preferred replica with lease
state.updatePreferredReadReplica(tp0, 42, () -> 10L);
TestUtils.assertOptional(state.preferredReadReplica(tp0, 9L), value -> assertEquals(value.intValue(), 42));
TestUtils.assertOptional(state.preferredReadReplica(tp0, 10L), value -> assertEquals(value.intValue(), 42));
TestUtils.assertOptional(state.preferredReadReplica(tp0, 9L), value -> assertEquals(42, value.intValue()));
TestUtils.assertOptional(state.preferredReadReplica(tp0, 10L), value -> assertEquals(42, value.intValue()));
assertFalse(state.preferredReadReplica(tp0, 11L).isPresent());
// Unset the preferred replica
@ -615,20 +613,20 @@ public class SubscriptionStateTest {
// Set to new preferred replica with lease
state.updatePreferredReadReplica(tp0, 43, () -> 20L);
TestUtils.assertOptional(state.preferredReadReplica(tp0, 11L), value -> assertEquals(value.intValue(), 43));
TestUtils.assertOptional(state.preferredReadReplica(tp0, 20L), value -> assertEquals(value.intValue(), 43));
TestUtils.assertOptional(state.preferredReadReplica(tp0, 11L), value -> assertEquals(43, value.intValue()));
TestUtils.assertOptional(state.preferredReadReplica(tp0, 20L), value -> assertEquals(43, value.intValue()));
assertFalse(state.preferredReadReplica(tp0, 21L).isPresent());
// Set to new preferred replica without clearing first
state.updatePreferredReadReplica(tp0, 44, () -> 30L);
TestUtils.assertOptional(state.preferredReadReplica(tp0, 30L), value -> assertEquals(value.intValue(), 44));
TestUtils.assertOptional(state.preferredReadReplica(tp0, 30L), value -> assertEquals(44, value.intValue()));
assertFalse(state.preferredReadReplica(tp0, 31L).isPresent());
}
@Test
public void testSeekUnvalidatedWithNoOffsetEpoch() {
Node broker1 = new Node(1, "localhost", 9092);
state.assignFromUser(Collections.singleton(tp0));
state.assignFromUser(Set.of(tp0));
// Seek with no offset epoch requires no validation no matter what the current leader is
state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L, Optional.empty(),
@ -652,7 +650,7 @@ public class SubscriptionStateTest {
@Test
public void testSeekUnvalidatedWithNoEpochClearsAwaitingValidation() {
Node broker1 = new Node(1, "localhost", 9092);
state.assignFromUser(Collections.singleton(tp0));
state.assignFromUser(Set.of(tp0));
// Seek with no offset epoch requires no validation no matter what the current leader is
state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L, Optional.of(2),
@ -672,7 +670,7 @@ public class SubscriptionStateTest {
ApiVersions apiVersions = new ApiVersions();
apiVersions.update(broker1.idString(), NodeApiVersions.create());
state.assignFromUser(Collections.singleton(tp0));
state.assignFromUser(Set.of(tp0));
state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L, Optional.of(2),
new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(5))));
@ -701,7 +699,7 @@ public class SubscriptionStateTest {
@Test
public void testSeekValidatedShouldClearAwaitingValidation() {
Node broker1 = new Node(1, "localhost", 9092);
state.assignFromUser(Collections.singleton(tp0));
state.assignFromUser(Set.of(tp0));
state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L, Optional.of(5),
new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(10))));
@ -719,7 +717,7 @@ public class SubscriptionStateTest {
@Test
public void testCompleteValidationShouldClearAwaitingValidation() {
Node broker1 = new Node(1, "localhost", 9092);
state.assignFromUser(Collections.singleton(tp0));
state.assignFromUser(Set.of(tp0));
state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L, Optional.of(5),
new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(10))));
@ -736,7 +734,7 @@ public class SubscriptionStateTest {
@Test
public void testOffsetResetWhileAwaitingValidation() {
Node broker1 = new Node(1, "localhost", 9092);
state.assignFromUser(Collections.singleton(tp0));
state.assignFromUser(Set.of(tp0));
state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L, Optional.of(5),
new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(10))));
@ -750,7 +748,7 @@ public class SubscriptionStateTest {
@Test
public void testMaybeCompleteValidation() {
Node broker1 = new Node(1, "localhost", 9092);
state.assignFromUser(Collections.singleton(tp0));
state.assignFromUser(Set.of(tp0));
int currentEpoch = 10;
long initialOffset = 10L;
@ -777,7 +775,7 @@ public class SubscriptionStateTest {
apiVersions.update("1", oldApis);
Node broker1 = new Node(1, "localhost", 9092);
state.assignFromUser(Collections.singleton(tp0));
state.assignFromUser(Set.of(tp0));
state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L, Optional.of(5),
new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(10))));
@ -806,7 +804,7 @@ public class SubscriptionStateTest {
@Test
public void testMaybeCompleteValidationAfterPositionChange() {
Node broker1 = new Node(1, "localhost", 9092);
state.assignFromUser(Collections.singleton(tp0));
state.assignFromUser(Set.of(tp0));
int currentEpoch = 10;
long initialOffset = 10L;
@ -835,7 +833,7 @@ public class SubscriptionStateTest {
@Test
public void testMaybeCompleteValidationAfterOffsetReset() {
Node broker1 = new Node(1, "localhost", 9092);
state.assignFromUser(Collections.singleton(tp0));
state.assignFromUser(Set.of(tp0));
int currentEpoch = 10;
long initialOffset = 10L;
@ -861,7 +859,7 @@ public class SubscriptionStateTest {
@Test
public void testTruncationDetectionWithResetPolicy() {
Node broker1 = new Node(1, "localhost", 9092);
state.assignFromUser(Collections.singleton(tp0));
state.assignFromUser(Set.of(tp0));
int currentEpoch = 10;
long initialOffset = 10L;
@ -890,7 +888,7 @@ public class SubscriptionStateTest {
public void testTruncationDetectionWithoutResetPolicy() {
Node broker1 = new Node(1, "localhost", 9092);
state = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
state.assignFromUser(Collections.singleton(tp0));
state.assignFromUser(Set.of(tp0));
int currentEpoch = 10;
long initialOffset = 10L;
@ -920,7 +918,7 @@ public class SubscriptionStateTest {
public void testTruncationDetectionUnknownDivergentOffsetWithResetPolicy() {
Node broker1 = new Node(1, "localhost", 9092);
state = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.EARLIEST);
state.assignFromUser(Collections.singleton(tp0));
state.assignFromUser(Set.of(tp0));
int currentEpoch = 10;
long initialOffset = 10L;
@ -945,7 +943,7 @@ public class SubscriptionStateTest {
public void testTruncationDetectionUnknownDivergentOffsetWithoutResetPolicy() {
Node broker1 = new Node(1, "localhost", 9092);
state = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
state.assignFromUser(Collections.singleton(tp0));
state.assignFromUser(Set.of(tp0));
int currentEpoch = 10;
long initialOffset = 10L;
@ -993,7 +991,7 @@ public class SubscriptionStateTest {
// Check that offset reset works when we can't validate offsets (older brokers)
Node broker1 = new Node(1, "localhost", 9092);
state.assignFromUser(Collections.singleton(tp0));
state.assignFromUser(Set.of(tp0));
// Reset offsets
state.requestOffsetReset(tp0, AutoOffsetResetStrategy.EARLIEST);
@ -1039,7 +1037,7 @@ public class SubscriptionStateTest {
@Test
public void nullPositionLagOnNoPosition() {
state.assignFromUser(Collections.singleton(tp0));
state.assignFromUser(Set.of(tp0));
assertNull(state.partitionLag(tp0, IsolationLevel.READ_UNCOMMITTED));
assertNull(state.partitionLag(tp0, IsolationLevel.READ_COMMITTED));
@ -1053,7 +1051,7 @@ public class SubscriptionStateTest {
@Test
public void testPositionOrNull() {
state.assignFromUser(Collections.singleton(tp0));
state.assignFromUser(Set.of(tp0));
final TopicPartition unassignedPartition = new TopicPartition("unassigned", 0);
state.seek(tp0, 5);
@ -1063,7 +1061,7 @@ public class SubscriptionStateTest {
@Test
public void testTryUpdatingHighWatermark() {
state.assignFromUser(Collections.singleton(tp0));
state.assignFromUser(Set.of(tp0));
final TopicPartition unassignedPartition = new TopicPartition("unassigned", 0);
final long highWatermark = 10L;
@ -1074,7 +1072,7 @@ public class SubscriptionStateTest {
@Test
public void testTryUpdatingLogStartOffset() {
state.assignFromUser(Collections.singleton(tp0));
state.assignFromUser(Set.of(tp0));
final TopicPartition unassignedPartition = new TopicPartition("unassigned", 0);
final long position = 25;
state.seek(tp0, position);
@ -1087,7 +1085,7 @@ public class SubscriptionStateTest {
@Test
public void testTryUpdatingLastStableOffset() {
state.assignFromUser(Collections.singleton(tp0));
state.assignFromUser(Set.of(tp0));
final TopicPartition unassignedPartition = new TopicPartition("unassigned", 0);
final long lastStableOffset = 10L;
@ -1098,7 +1096,7 @@ public class SubscriptionStateTest {
@Test
public void testTryUpdatingPreferredReadReplica() {
state.assignFromUser(Collections.singleton(tp0));
state.assignFromUser(Set.of(tp0));
final TopicPartition unassignedPartition = new TopicPartition("unassigned", 0);
final int preferredReadReplicaId = 10;
@ -1111,7 +1109,7 @@ public class SubscriptionStateTest {
@Test
public void testRequestOffsetResetIfPartitionAssigned() {
state.assignFromUser(Collections.singleton(tp0));
state.assignFromUser(Set.of(tp0));
final TopicPartition unassignedPartition = new TopicPartition("unassigned", 0);
state.requestOffsetResetIfPartitionAssigned(tp0);