KAFKA-19205: inconsistent result of beginningOffsets/endoffset between classic and async consumer with 0 timeout (#19578)
CI / build (push) Waiting to run Details

In the return results of the methods beginningOffsets and endOffset, if
timeout == 0, then an empty Map should be returned uniformly instead of
in the form of <TopicPartition, null>

Reviewers: Ken Huang <s7133700@gmail.com>, PoAn Yang
 <payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>, Lianet
 Magrans <lmagrans@confluent.io>
This commit is contained in:
xijiu 2025-05-04 01:12:20 +08:00 committed by GitHub
parent 93e65c4539
commit b5cceb43e5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 27 additions and 6 deletions

View File

@ -1634,7 +1634,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @param partitions the partitions to get the earliest offsets * @param partitions the partitions to get the earliest offsets
* @param timeout The maximum amount of time to await retrieval of the beginning offsets * @param timeout The maximum amount of time to await retrieval of the beginning offsets
* *
* @return The earliest available offsets for the given partitions * @return The earliest available offsets for the given partitions, and it will return empty map if zero timeout is provided
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
* @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
@ -1684,7 +1684,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @param partitions the partitions to get the end offsets. * @param partitions the partitions to get the end offsets.
* @param timeout The maximum amount of time to await retrieval of the end offsets * @param timeout The maximum amount of time to await retrieval of the end offsets
* *
* @return The end offsets for the given partitions. * @return The end offsets for the given partitions, and it will return empty map if zero timeout is provided
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
* @throws org.apache.kafka.common.errors.TimeoutException if the offsets could not be fetched before * @throws org.apache.kafka.common.errors.TimeoutException if the offsets could not be fetched before

View File

@ -1306,7 +1306,10 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
// and throw timeout exception if it cannot complete in time. // and throw timeout exception if it cannot complete in time.
if (timeout.isZero()) { if (timeout.isZero()) {
applicationEventHandler.add(listOffsetsEvent); applicationEventHandler.add(listOffsetsEvent);
return listOffsetsEvent.emptyResults(); // It is used to align with classic consumer.
// When the "timeout == 0", the classic consumer will return an empty map.
// Therefore, the AsyncKafkaConsumer needs to be consistent with it.
return new HashMap<>();
} }
Map<TopicPartition, OffsetAndTimestampInternal> offsetAndTimestampMap; Map<TopicPartition, OffsetAndTimestampInternal> offsetAndTimestampMap;

View File

@ -993,9 +993,8 @@ public class AsyncKafkaConsumerTest {
TopicPartition tp = new TopicPartition("topic1", 0); TopicPartition tp = new TopicPartition("topic1", 0);
Map<TopicPartition, Long> result = Map<TopicPartition, Long> result =
assertDoesNotThrow(() -> consumer.beginningOffsets(Collections.singletonList(tp), Duration.ZERO)); assertDoesNotThrow(() -> consumer.beginningOffsets(Collections.singletonList(tp), Duration.ZERO));
// The result should be {tp=null} assertNotNull(result);
assertTrue(result.containsKey(tp)); assertEquals(0, result.size());
assertNull(result.get(tp));
verify(applicationEventHandler).add(ArgumentMatchers.isA(ListOffsetsEvent.class)); verify(applicationEventHandler).add(ArgumentMatchers.isA(ListOffsetsEvent.class));
} }

View File

@ -14,6 +14,7 @@ package kafka.api
import kafka.api.BaseConsumerTest.{DeserializerImpl, SerializerImpl} import kafka.api.BaseConsumerTest.{DeserializerImpl, SerializerImpl}
import java.lang.{Long => JLong}
import java.time.Duration import java.time.Duration
import java.util import java.util
import java.util.Arrays.asList import java.util.Arrays.asList
@ -873,4 +874,22 @@ class PlaintextConsumerTest extends BaseConsumerTest {
waitTimeMs=leaveGroupTimeoutMs waitTimeMs=leaveGroupTimeoutMs
) )
} }
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
def testOffsetRelatedWhenTimeoutZero(groupProtocol: String): Unit = {
val consumer = createConsumer()
val result1 = consumer.beginningOffsets(util.List.of(tp), Duration.ZERO)
assertNotNull(result1)
assertEquals(0, result1.size())
val result2 = consumer.endOffsets(util.List.of(tp), Duration.ZERO)
assertNotNull(result2)
assertEquals(0, result2.size())
val result3 = consumer.offsetsForTimes(Map[TopicPartition, JLong]((tp, 0)).asJava, Duration.ZERO)
assertNotNull(result3)
assertEquals(1, result3.size())
assertNull(result3.get(tp))
}
} }