From f45b70e6886eef97ca5d0a838ef39231d354ffc0 Mon Sep 17 00:00:00 2001 From: Kirk True Date: Mon, 29 Sep 2025 12:26:11 -0700 Subject: [PATCH] Refactoring --- .../events/ApplicationEventProcessor.java | 122 +++++++++--------- .../CompositePollEventProcessorContext.java | 2 +- .../api/PlaintextAdminIntegrationTest.scala | 1 - 3 files changed, 65 insertions(+), 60 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index db93b82c767..6314f762b14 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -226,10 +226,35 @@ public class ApplicationEventProcessor implements EventProcessor + consumerMembershipManager.maybeReconcile(true)); + if (requestManagers.commitRequestManager.isPresent()) { + CommitRequestManager commitRequestManager = requestManagers.commitRequestManager.get(); + commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs()); + // all commit request generation points have been passed, + // so it's safe to notify the app thread could proceed and start fetching + event.markReconcileAndAutoCommitComplete(); + requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> { + hrm.membershipManager().onConsumerPoll(); + hrm.resetPollTimer(event.pollTimeMs()); + }); + requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm -> { + hrm.membershipManager().onConsumerPoll(); + hrm.resetPollTimer(event.pollTimeMs()); + }); + } else { + // safe to unblock - no auto-commit risk here: + // 1. commitRequestManager is not present + // 2. shareConsumer has no auto-commit mechanism + event.markReconcileAndAutoCommitComplete(); + requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> { + hrm.membershipManager().onConsumerPoll(); + hrm.resetPollTimer(event.pollTimeMs()); + }); + } } private void process(final CreateFetchRequestsEvent event) { @@ -393,7 +418,12 @@ public class ApplicationEventProcessor implements EventProcessor future = processCheckAndUpdatePositionsEvent(event.deadlineMs()); + CompletableFuture future = requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs()); future.whenComplete(complete(event.future())); } @@ -714,16 +744,34 @@ public class ApplicationEventProcessor implements EventProcessor + consumerMembershipManager.maybeReconcile(true)); + if (requestManagers.commitRequestManager.isPresent()) { + CommitRequestManager commitRequestManager = requestManagers.commitRequestManager.get(); + commitRequestManager.updateTimerAndMaybeCommit(pollTimeMs); + // all commit request generation points have been passed, + // so it's safe to notify the app thread could proceed and start fetching + requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> { + hrm.membershipManager().onConsumerPoll(); + hrm.resetPollTimer(pollTimeMs); + }); + requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm -> { + hrm.membershipManager().onConsumerPoll(); + hrm.resetPollTimer(pollTimeMs); + }); + } else { + // safe to unblock - no auto-commit risk here: + // 1. commitRequestManager is not present + // 2. shareConsumer has no auto-commit mechanism + requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> { + hrm.membershipManager().onConsumerPoll(); + hrm.resetPollTimer(pollTimeMs); + }); + } nextEventType = ApplicationEvent.Type.CHECK_AND_UPDATE_POSITIONS; if (context.maybeFailCompositePoll(event) || context.maybePauseCompositePoll(event, nextEventType)) @@ -732,8 +780,8 @@ public class ApplicationEventProcessor implements EventProcessor updatePositionsFuture = processCheckAndUpdatePositionsEvent(event.deadlineMs()); - context.trackExpirableEvent(updatePositionsFuture, event.deadlineMs()); + CompletableFuture updatePositionsFuture = requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs()); + context.trackCheckAndUpdatePositionsForTimeout(updatePositionsFuture, event.deadlineMs()); updatePositionsFuture.whenComplete((__, updatePositionsError) -> { if (context.maybeFailCompositePoll(event, updatePositionsError)) @@ -837,46 +885,4 @@ public class ApplicationEventProcessor implements EventProcessor - consumerMembershipManager.maybeReconcile(true)); - if (requestManagers.commitRequestManager.isPresent()) { - CommitRequestManager commitRequestManager = requestManagers.commitRequestManager.get(); - commitRequestManager.updateTimerAndMaybeCommit(pollTimeMs); - // all commit request generation points have been passed, - // so it's safe to notify the app thread could proceed and start fetching - requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> { - hrm.membershipManager().onConsumerPoll(); - hrm.resetPollTimer(pollTimeMs); - }); - requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm -> { - hrm.membershipManager().onConsumerPoll(); - hrm.resetPollTimer(pollTimeMs); - }); - } else { - // safe to unblock - no auto-commit risk here: - // 1. commitRequestManager is not present - // 2. shareConsumer has no auto-commit mechanism - requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> { - hrm.membershipManager().onConsumerPoll(); - hrm.resetPollTimer(pollTimeMs); - }); - } - } - - private void processUpdatePatternSubscriptionEvent() { - if (subscriptions.hasPatternSubscription()) { - if (this.metadataVersionSnapshot < metadata.updateVersion()) { - this.metadataVersionSnapshot = metadata.updateVersion(); - updatePatternSubscription(metadata.fetch()); - } - } - } - - private CompletableFuture processCheckAndUpdatePositionsEvent(final long deadlineMs) { - return requestManagers.offsetsRequestManager.updateFetchPositions(deadlineMs); - } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompositePollEventProcessorContext.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompositePollEventProcessorContext.java index 7ce050a7bbc..646bca4e8e1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompositePollEventProcessorContext.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompositePollEventProcessorContext.java @@ -71,7 +71,7 @@ public class CompositePollEventProcessorContext { }; }; - public void trackExpirableEvent(CompletableFuture future, long deadlineMs) { + public void trackCheckAndUpdatePositionsForTimeout(CompletableFuture future, long deadlineMs) { CompletableEvent event = new CompletableEvent<>() { @Override public CompletableFuture future() { diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 631e9f8be46..b7d33d08857 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -568,7 +568,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { try { consumer.assign(util.Set.of(tp)) consumer.seekToBeginning(util.Set.of(tp)) - TestUtils.waitUntilTrue(() => { val records = consumer.poll(time.Duration.ofSeconds(3)) expectedNumber == records.count()