KAFKA-17448: New consumer seek should update positions in background thread (#17075)

Reviewers: Lianet Magrans <lmagrans@confluent.io>, Kirk True <ktrue@confluent.io>
Author: PoAn Yang, 2024-09-12 02:08:33 +08:00 (committed by GitHub)
parent 0c4ffc682c
commit 3f4c25fe1d
6 changed files with 147 additions and 13 deletions
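
For context on the behavior change: the public seek() API is unchanged, but with this commit the new consumer applies the position on its background thread instead of mutating SubscriptionState directly from the caller's thread. A minimal caller-side sketch, assuming the new consumer is selected via group.protocol=consumer and that the broker address and topic name below are placeholders:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public final class SeekDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "seek-demo");                 // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");            // use AsyncKafkaConsumer

        TopicPartition tp = new TopicPartition("demo-topic", 0);                // placeholder
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singleton(tp));
            // After this commit, seek() enqueues a SeekUnvalidatedEvent and blocks
            // until the background thread has updated the fetch position.
            consumer.seek(tp, 42L);
            System.out.println("next poll starts at " + consumer.position(tp));
        }
    }
}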

AsyncKafkaConsumer.java

@@ -21,7 +21,6 @@ import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
@@ -56,6 +55,7 @@ import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent;
import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
import org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent;
@@ -790,11 +790,10 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
acquireAndEnsureOpen();
try {
log.info("Seeking to offset {} for partition {}", offset, partition);
SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
offset,
Optional.empty(), // This will ensure we skip validation
metadata.currentLeader(partition));
subscriptions.seekUnvalidated(partition, newPosition);
Timer timer = time.timer(defaultApiTimeoutMs);
SeekUnvalidatedEvent seekUnvalidatedEventEvent = new SeekUnvalidatedEvent(
calculateDeadlineMs(timer), partition, offset, Optional.empty());
applicationEventHandler.addAndGet(seekUnvalidatedEventEvent);
} finally {
release();
}
@@ -815,13 +814,12 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
} else {
log.info("Seeking to offset {} for partition {}", offset, partition);
}
Metadata.LeaderAndEpoch currentLeaderAndEpoch = metadata.currentLeader(partition);
SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
offsetAndMetadata.offset(),
offsetAndMetadata.leaderEpoch(),
currentLeaderAndEpoch);
updateLastSeenEpochIfNewer(partition, offsetAndMetadata);
subscriptions.seekUnvalidated(partition, newPosition);
Timer timer = time.timer(defaultApiTimeoutMs);
SeekUnvalidatedEvent seekUnvalidatedEventEvent = new SeekUnvalidatedEvent(
calculateDeadlineMs(timer), partition, offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch());
applicationEventHandler.addAndGet(seekUnvalidatedEventEvent);
} finally {
release();
}
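
Both overloads now hand the position change to the background thread and block on it: addAndGet enqueues the event and waits, bounded by a deadline derived from default.api.timeout.ms, for the event's future to complete. A self-contained sketch of that wait pattern, with a plain CompletableFuture standing in for the event's future (none of these names are Kafka classes):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public final class AddAndGetSketch {
    public static void main(String[] args) throws Exception {
        ScheduledExecutorService background = Executors.newSingleThreadScheduledExecutor();
        CompletableFuture<Void> seekApplied = new CompletableFuture<>();

        // The background thread applies the new position shortly after the
        // event is enqueued, then completes the future.
        background.schedule(() -> seekApplied.complete(null), 10, TimeUnit.MILLISECONDS);

        try {
            // The application thread blocks until the future completes or the
            // deadline passes, mirroring applicationEventHandler.addAndGet(...).
            seekApplied.get(30_000, TimeUnit.MILLISECONDS);
            System.out.println("seek applied by background thread");
        } catch (TimeoutException e) {
            System.out.println("background thread did not apply the seek in time");
        } finally {
            background.shutdown();
        }
    }
}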

ApplicationEvent.java

@@ -35,7 +35,8 @@ public abstract class ApplicationEvent {
COMMIT_ON_CLOSE,
SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE,
SHARE_ACKNOWLEDGE_ON_CLOSE
SHARE_ACKNOWLEDGE_ON_CLOSE,
SEEK_UNVALIDATED,
}
private final Type type;

ApplicationEventProcessor.java

@@ -147,6 +147,10 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEvent> {
process((ShareAcknowledgeOnCloseEvent) event);
return;
case SEEK_UNVALIDATED:
process((SeekUnvalidatedEvent) event);
return;
default:
log.warn("Application event type {} was not expected", event.type());
}
@@ -409,4 +413,18 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEvent> {
}
};
}
private void process(final SeekUnvalidatedEvent event) {
try {
SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
event.offset(),
event.offsetEpoch(),
metadata.currentLeader(event.partition())
);
subscriptions.seekUnvalidated(event.partition(), newPosition);
event.future().complete(null);
} catch (Exception e) {
event.future().completeExceptionally(e);
}
}
}
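
One detail worth noting in process(): any exception from seekUnvalidated (for instance, seeking a partition that is not currently assigned) completes the future exceptionally, so the failure travels back to the application thread blocked in seek(). A minimal illustration of how such a cause surfaces through a CompletableFuture (the message text is illustrative):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public final class FailurePropagationSketch {
    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<Void> future = new CompletableFuture<>();
        // The background thread hits an error while applying the position...
        future.completeExceptionally(
                new IllegalStateException("No current assignment for partition topic-0"));
        try {
            // ...and the waiting application thread sees it as the cause.
            future.get();
        } catch (ExecutionException e) {
            System.out.println("seek failed: " + e.getCause());
        }
    }
}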

SeekUnvalidatedEvent.java (new file)

@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.TopicPartition;
import java.util.Optional;
/**
* Event to perform {@link SubscriptionState#seekUnvalidated(TopicPartition, SubscriptionState.FetchPosition)}
* in the background thread. This can avoid race conditions when subscription state is updated.
*/
public class SeekUnvalidatedEvent extends CompletableApplicationEvent<Void> {
private final TopicPartition partition;
private final long offset;
private final Optional<Integer> offsetEpoch;
public SeekUnvalidatedEvent(long deadlineMs, TopicPartition partition, long offset, Optional<Integer> offsetEpoch) {
super(Type.SEEK_UNVALIDATED, deadlineMs);
this.partition = partition;
this.offset = offset;
this.offsetEpoch = offsetEpoch;
}
public TopicPartition partition() {
return partition;
}
public long offset() {
return offset;
}
public Optional<Integer> offsetEpoch() {
return offsetEpoch;
}
@Override
protected String toStringBase() {
return super.toStringBase()
+ ", partition=" + partition
+ ", offset=" + offset
+ offsetEpoch.map(integer -> ", offsetEpoch=" + integer).orElse("");
}
}
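
The Javadoc above captures the rationale: with every seek funneled through an event, the background thread is the only writer of SubscriptionState, so concurrent seek() calls cannot race with position updates made while fetching. A toy sketch of that single-writer discipline (queue, map, and thread names are hypothetical, not Kafka code):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public final class SingleWriterSketch {
    // Mutable positions, touched only by the owner thread below; no locks needed.
    private static final Map<String, Long> POSITIONS = new HashMap<>();
    private static final BlockingQueue<Runnable> EVENTS = new LinkedBlockingQueue<>();

    public static void main(String[] args) throws InterruptedException {
        Thread owner = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    EVENTS.take().run(); // apply mutations in arrival order
                }
            } catch (InterruptedException ignored) { }
        });
        owner.start();

        // Any number of application threads may enqueue seeks concurrently;
        // the map itself is never touched off the owner thread.
        EVENTS.put(() -> POSITIONS.put("topic-0", 42L));
        EVENTS.put(() -> POSITIONS.put("topic-0", 7L));
        EVENTS.put(() -> System.out.println("position: " + POSITIONS.get("topic-0")));

        Thread.sleep(100);
        owner.interrupt();
    }
}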

AsyncKafkaConsumerTest.java

@@ -45,6 +45,7 @@ import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent;
import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
import org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
@@ -769,6 +770,7 @@ public class AsyncKafkaConsumerTest {
new Node(1, "host", 9000)), Optional.of(1)));
completeAssignmentChangeEventSuccessfully();
consumer.assign(Arrays.asList(t0, t1));
completeSeekUnvalidatedEventSuccessfully();
consumer.seek(t0, 10);
consumer.seek(t1, 20);
@@ -804,6 +806,7 @@
final TopicPartition tp = new TopicPartition("foo", 0);
completeAssignmentChangeEventSuccessfully();
consumer.assign(Collections.singleton(tp));
completeSeekUnvalidatedEventSuccessfully();
consumer.seek(tp, 20);
assertDoesNotThrow(() -> consumer.commitAsync());
@@ -827,6 +830,7 @@
final TopicPartition tp = new TopicPartition("foo", 0);
completeAssignmentChangeEventSuccessfully();
consumer.assign(Collections.singleton(tp));
completeSeekUnvalidatedEventSuccessfully();
consumer.seek(tp, 20);
assertDoesNotThrow(() -> consumer.commitAsync());
@@ -894,6 +898,7 @@
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
completeAssignmentChangeEventSuccessfully();
consumer.assign(Collections.singleton(tp));
completeSeekUnvalidatedEventSuccessfully();
consumer.seek(tp, 20);
consumer.commitAsync();
@@ -925,6 +930,7 @@
final TopicPartition tp = new TopicPartition("foo", 0);
completeAssignmentChangeEventSuccessfully();
consumer.assign(Collections.singleton(tp));
completeSeekUnvalidatedEventSuccessfully();
consumer.seek(tp, 20);
assertDoesNotThrow(() -> consumer.commitAsync());
@@ -1018,6 +1024,7 @@
"client-id");
consumer.subscribe(singleton("topic"), mock(ConsumerRebalanceListener.class));
subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0)));
completeSeekUnvalidatedEventSuccessfully();
subscriptions.seek(new TopicPartition("topic", 0), 100);
consumer.commitSyncAllConsumed(time.timer(100));
verify(applicationEventHandler).add(any(SyncCommitEvent.class));
@@ -1035,6 +1042,7 @@
"client-id");
consumer.subscribe(singleton("topic"), mock(ConsumerRebalanceListener.class));
subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0)));
completeSeekUnvalidatedEventSuccessfully();
subscriptions.seek(new TopicPartition("topic", 0), 100);
verify(applicationEventHandler, never()).add(any(SyncCommitEvent.class));
}
@@ -1293,6 +1301,7 @@
completeAssignmentChangeEventSuccessfully();
consumer.assign(Collections.singleton(tp));
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
completeSeekUnvalidatedEventSuccessfully();
consumer.seek(tp, 10);
consumer.wakeup();
@@ -1322,6 +1331,7 @@
final TopicPartition tp = new TopicPartition("foo", 0);
completeAssignmentChangeEventSuccessfully();
consumer.assign(Collections.singleton(tp));
completeSeekUnvalidatedEventSuccessfully();
consumer.seek(tp, 20);
consumer.commitAsync();
@@ -1340,6 +1350,7 @@
final TopicPartition tp = new TopicPartition("foo", 0);
completeAssignmentChangeEventSuccessfully();
consumer.assign(Collections.singleton(tp));
completeSeekUnvalidatedEventSuccessfully();
consumer.seek(tp, 20);
completeCommitAsyncApplicationEventSuccessfully();
consumer.commitAsync(cb);
@@ -2187,6 +2198,20 @@
}).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(AssignmentChangeEvent.class));
}
private void completeSeekUnvalidatedEventSuccessfully() {
doAnswer(invocation -> {
SeekUnvalidatedEvent event = invocation.getArgument(0);
SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
event.offset(),
event.offsetEpoch(),
metadata.currentLeader(event.partition())
);
consumer.subscriptions().seekUnvalidated(event.partition(), newPosition);
event.future().complete(null);
return null;
}).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(SeekUnvalidatedEvent.class));
}
private void forceCommitCallbackInvocation() {
// Invokes callback
consumer.commitAsync();

ApplicationEventProcessorTest.java

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerHeartbeatRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerMembershipManager;
@@ -53,6 +54,8 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@@ -183,6 +186,36 @@ public class ApplicationEventProcessorTest {
assertInstanceOf(IllegalStateException.class, e.getCause());
}
@Test
public void testSeekUnvalidatedEvent() {
TopicPartition tp = new TopicPartition("topic", 0);
SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(
0, Optional.empty(), Metadata.LeaderAndEpoch.noLeaderOrEpoch());
SeekUnvalidatedEvent event = new SeekUnvalidatedEvent(12345, tp, 0, Optional.empty());
setupProcessor(false);
doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(tp);
doNothing().when(subscriptionState).seekUnvalidated(eq(tp), any());
processor.process(event);
verify(metadata).currentLeader(tp);
verify(subscriptionState).seekUnvalidated(tp, position);
assertDoesNotThrow(() -> event.future().get());
}
@Test
public void testSeekUnvalidatedEventWithException() {
TopicPartition tp = new TopicPartition("topic", 0);
SeekUnvalidatedEvent event = new SeekUnvalidatedEvent(12345, tp, 0, Optional.empty());
setupProcessor(false);
doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(tp);
doThrow(new IllegalStateException()).when(subscriptionState).seekUnvalidated(eq(tp), any());
processor.process(event);
ExecutionException e = assertThrows(ExecutionException.class, () -> event.future().get());
assertInstanceOf(IllegalStateException.class, e.getCause());
}
private List<NetworkClientDelegate.UnsentRequest> mockCommitResults() {
return Collections.singletonList(mock(NetworkClientDelegate.UnsentRequest.class));
}