Refactoring

This commit is contained in:
Kirk True 2025-09-29 12:26:11 -07:00
parent bfcd7ec0f8
commit f45b70e688
3 changed files with 65 additions and 60 deletions

View File

@ -226,10 +226,35 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
} }
} }
private void process(final PollEvent event) { private void process(final PollEvent event) {
processPollEvent(event.pollTimeMs()); // Trigger a reconciliation that can safely commit offsets if needed to rebalance,
// as we're processing before any new fetching starts in the app thread
requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
consumerMembershipManager.maybeReconcile(true));
if (requestManagers.commitRequestManager.isPresent()) {
CommitRequestManager commitRequestManager = requestManagers.commitRequestManager.get();
commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs());
// all commit request generation points have been passed,
// so it's safe to notify the app thread could proceed and start fetching
event.markReconcileAndAutoCommitComplete(); event.markReconcileAndAutoCommitComplete();
requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
hrm.membershipManager().onConsumerPoll();
hrm.resetPollTimer(event.pollTimeMs());
});
requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm -> {
hrm.membershipManager().onConsumerPoll();
hrm.resetPollTimer(event.pollTimeMs());
});
} else {
// safe to unblock - no auto-commit risk here:
// 1. commitRequestManager is not present
// 2. shareConsumer has no auto-commit mechanism
event.markReconcileAndAutoCommitComplete();
requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
hrm.membershipManager().onConsumerPoll();
hrm.resetPollTimer(event.pollTimeMs());
});
}
} }
private void process(final CreateFetchRequestsEvent event) { private void process(final CreateFetchRequestsEvent event) {
@ -393,7 +418,12 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
* This will make the consumer send the updated subscription on the next poll. * This will make the consumer send the updated subscription on the next poll.
*/ */
private void process(final UpdatePatternSubscriptionEvent event) { private void process(final UpdatePatternSubscriptionEvent event) {
processUpdatePatternSubscriptionEvent(); if (subscriptions.hasPatternSubscription()) {
if (this.metadataVersionSnapshot < metadata.updateVersion()) {
this.metadataVersionSnapshot = metadata.updateVersion();
updatePatternSubscription(metadata.fetch());
}
}
event.future().complete(null); event.future().complete(null);
} }
@ -435,7 +465,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
* them to update positions in the subscription state. * them to update positions in the subscription state.
*/ */
private void process(final CheckAndUpdatePositionsEvent event) { private void process(final CheckAndUpdatePositionsEvent event) {
CompletableFuture<Boolean> future = processCheckAndUpdatePositionsEvent(event.deadlineMs()); CompletableFuture<Boolean> future = requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
future.whenComplete(complete(event.future())); future.whenComplete(complete(event.future()));
} }
@ -714,16 +744,34 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
if (nextEventType == ApplicationEvent.Type.POLL) { if (nextEventType == ApplicationEvent.Type.POLL) {
log.debug("Processing {} logic for {}", nextEventType, event); log.debug("Processing {} logic for {}", nextEventType, event);
processPollEvent(event.pollTimeMs()); long pollTimeMs = event.pollTimeMs();
nextEventType = ApplicationEvent.Type.UPDATE_SUBSCRIPTION_METADATA;
if (context.maybeFailCompositePoll(event) || context.maybePauseCompositePoll(event, nextEventType)) // Trigger a reconciliation that can safely commit offsets if needed to rebalance,
return; // as we're processing before any new fetching starts in the app thread
requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
consumerMembershipManager.maybeReconcile(true));
if (requestManagers.commitRequestManager.isPresent()) {
CommitRequestManager commitRequestManager = requestManagers.commitRequestManager.get();
commitRequestManager.updateTimerAndMaybeCommit(pollTimeMs);
// all commit request generation points have been passed,
// so it's safe to notify the app thread could proceed and start fetching
requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
hrm.membershipManager().onConsumerPoll();
hrm.resetPollTimer(pollTimeMs);
});
requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm -> {
hrm.membershipManager().onConsumerPoll();
hrm.resetPollTimer(pollTimeMs);
});
} else {
// safe to unblock - no auto-commit risk here:
// 1. commitRequestManager is not present
// 2. shareConsumer has no auto-commit mechanism
requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
hrm.membershipManager().onConsumerPoll();
hrm.resetPollTimer(pollTimeMs);
});
} }
if (nextEventType == ApplicationEvent.Type.UPDATE_SUBSCRIPTION_METADATA) {
log.debug("Processing {} logic for {}", nextEventType, event);
processUpdatePatternSubscriptionEvent();
nextEventType = ApplicationEvent.Type.CHECK_AND_UPDATE_POSITIONS; nextEventType = ApplicationEvent.Type.CHECK_AND_UPDATE_POSITIONS;
if (context.maybeFailCompositePoll(event) || context.maybePauseCompositePoll(event, nextEventType)) if (context.maybeFailCompositePoll(event) || context.maybePauseCompositePoll(event, nextEventType))
@ -732,8 +780,8 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
if (nextEventType == ApplicationEvent.Type.CHECK_AND_UPDATE_POSITIONS) { if (nextEventType == ApplicationEvent.Type.CHECK_AND_UPDATE_POSITIONS) {
log.debug("Processing {} logic for {}", nextEventType, event); log.debug("Processing {} logic for {}", nextEventType, event);
CompletableFuture<Boolean> updatePositionsFuture = processCheckAndUpdatePositionsEvent(event.deadlineMs()); CompletableFuture<Boolean> updatePositionsFuture = requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
context.trackExpirableEvent(updatePositionsFuture, event.deadlineMs()); context.trackCheckAndUpdatePositionsForTimeout(updatePositionsFuture, event.deadlineMs());
updatePositionsFuture.whenComplete((__, updatePositionsError) -> { updatePositionsFuture.whenComplete((__, updatePositionsError) -> {
if (context.maybeFailCompositePoll(event, updatePositionsError)) if (context.maybeFailCompositePoll(event, updatePositionsError))
@ -837,46 +885,4 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
int metadataVersionSnapshot() { int metadataVersionSnapshot() {
return metadataVersionSnapshot; return metadataVersionSnapshot;
} }
private void processPollEvent(final long pollTimeMs) {
// Trigger a reconciliation that can safely commit offsets if needed to rebalance,
// as we're processing before any new fetching starts in the app thread
requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
consumerMembershipManager.maybeReconcile(true));
if (requestManagers.commitRequestManager.isPresent()) {
CommitRequestManager commitRequestManager = requestManagers.commitRequestManager.get();
commitRequestManager.updateTimerAndMaybeCommit(pollTimeMs);
// all commit request generation points have been passed,
// so it's safe to notify the app thread could proceed and start fetching
requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
hrm.membershipManager().onConsumerPoll();
hrm.resetPollTimer(pollTimeMs);
});
requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm -> {
hrm.membershipManager().onConsumerPoll();
hrm.resetPollTimer(pollTimeMs);
});
} else {
// safe to unblock - no auto-commit risk here:
// 1. commitRequestManager is not present
// 2. shareConsumer has no auto-commit mechanism
requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
hrm.membershipManager().onConsumerPoll();
hrm.resetPollTimer(pollTimeMs);
});
}
}
private void processUpdatePatternSubscriptionEvent() {
if (subscriptions.hasPatternSubscription()) {
if (this.metadataVersionSnapshot < metadata.updateVersion()) {
this.metadataVersionSnapshot = metadata.updateVersion();
updatePatternSubscription(metadata.fetch());
}
}
}
private CompletableFuture<Boolean> processCheckAndUpdatePositionsEvent(final long deadlineMs) {
return requestManagers.offsetsRequestManager.updateFetchPositions(deadlineMs);
}
} }

View File

@ -71,7 +71,7 @@ public class CompositePollEventProcessorContext {
}; };
}; };
public <T> void trackExpirableEvent(CompletableFuture<T> future, long deadlineMs) { public <T> void trackCheckAndUpdatePositionsForTimeout(CompletableFuture<T> future, long deadlineMs) {
CompletableEvent<T> event = new CompletableEvent<>() { CompletableEvent<T> event = new CompletableEvent<>() {
@Override @Override
public CompletableFuture<T> future() { public CompletableFuture<T> future() {

View File

@ -568,7 +568,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
try { try {
consumer.assign(util.Set.of(tp)) consumer.assign(util.Set.of(tp))
consumer.seekToBeginning(util.Set.of(tp)) consumer.seekToBeginning(util.Set.of(tp))
TestUtils.waitUntilTrue(() => { TestUtils.waitUntilTrue(() => {
val records = consumer.poll(time.Duration.ofSeconds(3)) val records = consumer.poll(time.Duration.ofSeconds(3))
expectedNumber == records.count() expectedNumber == records.count()