Fixed streams tests in PlaintextAdminIntegrationTest

Surfaced isPendingCallbacks() at the SubscriptionState API so that partitions that are pending callback invocation are not returned. This is also in agreement with the JavaDoc which states:

"This will give the set of topic partitions currently assigned to the consumer (which may be none if . . . the partitions are in the process of getting reassigned)."
This commit is contained in:
Kirk True 2025-09-30 13:20:05 -07:00
parent 0aed4aff89
commit ab42fca4f4
3 changed files with 16 additions and 8 deletions

View File

@ -1689,7 +1689,9 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
public Set<TopicPartition> assignment() {
acquireAndEnsureOpen();
try {
return Collections.unmodifiableSet(subscriptions.assignedPartitions());
return subscriptions.assignedPartitions().stream()
.filter(tp -> !subscriptions.isPendingCallbacks(tp))
.collect(Collectors.toUnmodifiableSet());
} finally {
release();
}

View File

@ -896,6 +896,11 @@ public class SubscriptionState {
return tps != null && isFetchableAndSubscribed(tp, tps);
}
public synchronized boolean isPendingCallbacks(TopicPartition tp) {
TopicPartitionState assignedOrNull = assignedStateOrNull(tp);
return assignedOrNull != null && assignedOrNull.isPendingCallbacks();
}
public synchronized boolean hasValidPosition(TopicPartition tp) {
TopicPartitionState assignedOrNull = assignedStateOrNull(tp);
return assignedOrNull != null && assignedOrNull.hasValidPosition();
@ -1212,7 +1217,11 @@ public class SubscriptionState {
}
private boolean isFetchable() {
return !paused && !pendingRevocation && !pendingOnAssignedCallback && hasValidPosition();
return !paused && !isPendingCallbacks() && hasValidPosition();
}
private boolean isPendingCallbacks() {
return pendingRevocation || pendingOnAssignedCallback;
}
private void highWatermark(Long highWatermark) {

View File

@ -4618,10 +4618,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
try {
TestUtils.waitUntilTrue(() => {
streams.poll(JDuration.ofMillis(100L))
!streams.assignment().isEmpty
streams.assignment().size() == testNumPartitions
}, "Consumer not assigned to partitions")
streams.poll(JDuration.ofMillis(1000L))
streams.commitSync()
TestUtils.waitUntilTrue(() => {
@ -4678,10 +4677,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
try {
TestUtils.waitUntilTrue(() => {
streams.poll(JDuration.ofMillis(100L))
!streams.assignment().isEmpty
streams.assignment().size() == testNumPartitions
}, "Consumer not assigned to partitions")
streams.poll(JDuration.ofMillis(1000L))
streams.commitSync()
// List streams group offsets
@ -4755,10 +4753,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
try {
TestUtils.waitUntilTrue(() => {
streams.poll(JDuration.ofMillis(100L))
!streams.assignment().isEmpty
streams.assignment().size() == testNumPartitions
}, "Consumer not assigned to partitions")
streams.poll(JDuration.ofMillis(1000L))
streams.commitSync()
// List streams group offsets