From 3f4c25fe1d800abafa9df81ccaa42b1b3921c824 Mon Sep 17 00:00:00 2001
From: PoAn Yang
Date: Thu, 12 Sep 2024 02:08:33 +0800
Subject: [PATCH] KAFKA-17448: New consumer seek should update positions in
 background thread (#17075)

Reviewers: Lianet Magrans, Kirk True
---
 .../internals/AsyncKafkaConsumer.java         | 22 ++++---
 .../internals/events/ApplicationEvent.java    |  3 +-
 .../events/ApplicationEventProcessor.java     | 18 ++++++
 .../events/SeekUnvalidatedEvent.java          | 59 +++++++++++++++++++
 .../internals/AsyncKafkaConsumerTest.java     | 25 ++++++++
 .../events/ApplicationEventProcessorTest.java | 33 +++++++++++
 6 files changed, 147 insertions(+), 13 deletions(-)
 create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SeekUnvalidatedEvent.java

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index e988afa10c7..06ee3bc3616 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -21,7 +21,6 @@ import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.clients.KafkaClient;
-import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
@@ -56,6 +55,7 @@ import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
 import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
 import org.apache.kafka.clients.consumer.internals.events.PollEvent;
 import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent;
+import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
 import org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEvent;
 import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
 import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent;
@@ -790,11 +790,10 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         acquireAndEnsureOpen();
         try {
             log.info("Seeking to offset {} for partition {}", offset, partition);
-            SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
-                offset,
-                Optional.empty(), // This will ensure we skip validation
-                metadata.currentLeader(partition));
-            subscriptions.seekUnvalidated(partition, newPosition);
+            Timer timer = time.timer(defaultApiTimeoutMs);
+            SeekUnvalidatedEvent seekUnvalidatedEventEvent = new SeekUnvalidatedEvent(
+                calculateDeadlineMs(timer), partition, offset, Optional.empty());
+            applicationEventHandler.addAndGet(seekUnvalidatedEventEvent);
         } finally {
             release();
         }
@@ -815,13 +814,12 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             } else {
                 log.info("Seeking to offset {} for partition {}", offset, partition);
             }
-            Metadata.LeaderAndEpoch currentLeaderAndEpoch = metadata.currentLeader(partition);
-            SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
-                    offsetAndMetadata.offset(),
-                    offsetAndMetadata.leaderEpoch(),
-                    currentLeaderAndEpoch);
             updateLastSeenEpochIfNewer(partition, offsetAndMetadata);
-            subscriptions.seekUnvalidated(partition, newPosition);
+
+            Timer timer = time.timer(defaultApiTimeoutMs);
+            SeekUnvalidatedEvent seekUnvalidatedEventEvent = new SeekUnvalidatedEvent(
+                calculateDeadlineMs(timer), partition, offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch());
+            applicationEventHandler.addAndGet(seekUnvalidatedEventEvent);
         } finally {
             release();
         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
index a31f458e5e0..4b0584b5bb8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
@@ -35,7 +35,8 @@ public abstract class ApplicationEvent {
         COMMIT_ON_CLOSE, SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
         SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE,
-        SHARE_ACKNOWLEDGE_ON_CLOSE
+        SHARE_ACKNOWLEDGE_ON_CLOSE,
+        SEEK_UNVALIDATED,
     }

     private final Type type;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index b28750ac7fb..6ce8737c78a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -147,6 +147,10 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEvent> {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SeekUnvalidatedEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SeekUnvalidatedEvent.java
new file mode 100644
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SeekUnvalidatedEvent.java
+    private final TopicPartition partition;
+    private final long offset;
+    private final Optional<Integer> offsetEpoch;
+
+    public SeekUnvalidatedEvent(long deadlineMs, TopicPartition partition, long offset, Optional<Integer> offsetEpoch) {
+        super(Type.SEEK_UNVALIDATED, deadlineMs);
+        this.partition = partition;
+        this.offset = offset;
+        this.offsetEpoch = offsetEpoch;
+    }
+
+    public TopicPartition partition() {
+        return partition;
+    }
+
+    public long offset() {
+        return offset;
+    }
+
+    public Optional<Integer> offsetEpoch() {
+        return offsetEpoch;
+    }
+
+    @Override
+    protected String toStringBase() {
+        return super.toStringBase()
+                + ", partition=" + partition
+                + ", offset=" + offset
+                + offsetEpoch.map(integer -> ", offsetEpoch=" + integer).orElse("");
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index ae694c12a09..2526b6447a4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -45,6 +45,7 @@ import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
 import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
 import org.apache.kafka.clients.consumer.internals.events.PollEvent;
 import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent;
+import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
 import org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEvent;
 import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
 import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
@@ -769,6 +770,7 @@ public class AsyncKafkaConsumerTest {
                 new Node(1, "host", 9000)), Optional.of(1)));
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(Arrays.asList(t0, t1));
+        completeSeekUnvalidatedEventSuccessfully();
         consumer.seek(t0, 10);
         consumer.seek(t1, 20);

@@ -804,6 +806,7 @@ public class AsyncKafkaConsumerTest {
         final TopicPartition tp = new TopicPartition("foo", 0);
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(Collections.singleton(tp));
+        completeSeekUnvalidatedEventSuccessfully();
         consumer.seek(tp, 20);
         assertDoesNotThrow(() -> consumer.commitAsync());

@@ -827,6 +830,7 @@ public class AsyncKafkaConsumerTest {
         final TopicPartition tp = new TopicPartition("foo", 0);
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(Collections.singleton(tp));
+        completeSeekUnvalidatedEventSuccessfully();
         consumer.seek(tp, 20);
         assertDoesNotThrow(() -> consumer.commitAsync());

@@ -894,6 +898,7 @@ public class AsyncKafkaConsumerTest {
         doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(Collections.singleton(tp));
+        completeSeekUnvalidatedEventSuccessfully();
         consumer.seek(tp, 20);
         consumer.commitAsync();

@@ -925,6 +930,7 @@ public class AsyncKafkaConsumerTest {
         final TopicPartition tp = new TopicPartition("foo", 0);
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(Collections.singleton(tp));
+        completeSeekUnvalidatedEventSuccessfully();
         consumer.seek(tp, 20);
         assertDoesNotThrow(() -> consumer.commitAsync());

@@ -1018,6 +1024,7 @@ public class AsyncKafkaConsumerTest {
             "client-id");
         consumer.subscribe(singleton("topic"), mock(ConsumerRebalanceListener.class));
         subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0)));
+        completeSeekUnvalidatedEventSuccessfully();
         subscriptions.seek(new TopicPartition("topic", 0), 100);
         consumer.commitSyncAllConsumed(time.timer(100));
         verify(applicationEventHandler).add(any(SyncCommitEvent.class));
@@ -1035,6 +1042,7 @@ public class AsyncKafkaConsumerTest {
             "client-id");
         consumer.subscribe(singleton("topic"), mock(ConsumerRebalanceListener.class));
         subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0)));
+        completeSeekUnvalidatedEventSuccessfully();
         subscriptions.seek(new TopicPartition("topic", 0), 100);
         verify(applicationEventHandler, never()).add(any(SyncCommitEvent.class));
     }
@@ -1293,6 +1301,7 @@ public class AsyncKafkaConsumerTest {
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(Collections.singleton(tp));
         doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+        completeSeekUnvalidatedEventSuccessfully();
         consumer.seek(tp, 10);
         consumer.wakeup();

@@ -1322,6 +1331,7 @@ public class AsyncKafkaConsumerTest {
         final TopicPartition tp = new TopicPartition("foo", 0);
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(Collections.singleton(tp));
+        completeSeekUnvalidatedEventSuccessfully();
         consumer.seek(tp, 20);
         consumer.commitAsync();

@@ -1340,6 +1350,7 @@ public class AsyncKafkaConsumerTest {
         final TopicPartition tp = new TopicPartition("foo", 0);
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(Collections.singleton(tp));
+        completeSeekUnvalidatedEventSuccessfully();
         consumer.seek(tp, 20);
         completeCommitAsyncApplicationEventSuccessfully();
         consumer.commitAsync(cb);
@@ -2187,6 +2198,20 @@ public class AsyncKafkaConsumerTest {
         }).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(AssignmentChangeEvent.class));
     }

+    private void completeSeekUnvalidatedEventSuccessfully() {
+        doAnswer(invocation -> {
+            SeekUnvalidatedEvent event = invocation.getArgument(0);
+            SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
+                event.offset(),
+                event.offsetEpoch(),
+                metadata.currentLeader(event.partition())
+            );
+            consumer.subscriptions().seekUnvalidated(event.partition(), newPosition);
+            event.future().complete(null);
+            return null;
+        }).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(SeekUnvalidatedEvent.class));
+    }
+
     private void forceCommitCallbackInvocation() {
         // Invokes callback
         consumer.commitAsync();
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index df90c43bf4a..84a7ac84d1c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;

+import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
 import org.apache.kafka.clients.consumer.internals.ConsumerHeartbeatRequestManager;
 import org.apache.kafka.clients.consumer.internals.ConsumerMembershipManager;
@@ -53,6 +54,8 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -183,6 +186,36 @@ public class ApplicationEventProcessorTest {
         assertInstanceOf(IllegalStateException.class, e.getCause());
     }

+    @Test
+    public void testSeekUnvalidatedEvent() {
+        TopicPartition tp = new TopicPartition("topic", 0);
+        SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(
+            0, Optional.empty(), Metadata.LeaderAndEpoch.noLeaderOrEpoch());
+        SeekUnvalidatedEvent event = new SeekUnvalidatedEvent(12345, tp, 0, Optional.empty());
+
+        setupProcessor(false);
+        doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(tp);
+        doNothing().when(subscriptionState).seekUnvalidated(eq(tp), any());
+        processor.process(event);
+        verify(metadata).currentLeader(tp);
+        verify(subscriptionState).seekUnvalidated(tp, position);
+        assertDoesNotThrow(() -> event.future().get());
+    }
+
+    @Test
+    public void testSeekUnvalidatedEventWithException() {
+        TopicPartition tp = new TopicPartition("topic", 0);
+        SeekUnvalidatedEvent event = new SeekUnvalidatedEvent(12345, tp, 0, Optional.empty());
+
+        setupProcessor(false);
+        doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(tp);
+        doThrow(new IllegalStateException()).when(subscriptionState).seekUnvalidated(eq(tp), any());
+        processor.process(event);
+
+        ExecutionException e = assertThrows(ExecutionException.class, () -> event.future().get());
+        assertInstanceOf(IllegalStateException.class, e.getCause());
+    }
+
     private List<NetworkClientDelegate.UnsentRequest> mockCommitResults() {
         return Collections.singletonList(mock(NetworkClientDelegate.UnsentRequest.class));
     }
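
A rough sketch of the background-thread handling of the new SEEK_UNVALIDATED type (illustrative only, not the verbatim ApplicationEventProcessor hunk; it assumes the processor's existing metadata and subscriptions fields and its Type-based dispatch, which is what the new ApplicationEventProcessorTest cases above exercise):

    // Hypothetical handler shape for SeekUnvalidatedEvent inside ApplicationEventProcessor.
    private void process(final SeekUnvalidatedEvent event) {
        try {
            // Resolve the current leader on the background thread and build an
            // unvalidated fetch position from the event's offset and epoch.
            SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
                event.offset(),
                event.offsetEpoch(),
                metadata.currentLeader(event.partition()));
            subscriptions.seekUnvalidated(event.partition(), newPosition);
            event.future().complete(null);
        } catch (Exception e) {
            // Surface failures to the application thread, which blocks on
            // applicationEventHandler.addAndGet(...) inside seek().
            event.future().completeExceptionally(e);
        }
    }

Moving the position update onto the background thread keeps SubscriptionState mutations on a single thread; seek() simply enqueues the event and waits for its future to complete, bounded by the default API timeout.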