KAFKA-19271: allow intercepting internal method call (#19832)

To allow intercepting the internal subscribe call to the async consumer,
we need to extend the ConsumerWrapper interface accordingly, instead of
returning the wrapped async consumer back to the KS runtime.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
Author: Matthias J. Sax, 2025-06-09 07:28:13 -07:00 (committed by GitHub)
parent 948a91dfdf
commit 0adc6fa3e1
3 changed files with 28 additions and 50 deletions
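
To make the change concrete: once ConsumerWrapper exposes the Streams-internal subscribe overload (added below), a wrapper implementation can intercept that call itself, instead of the runtime unwrapping the delegate and bypassing the wrapper as the old StreamThread code did. A minimal sketch under that assumption; the subclass name, field, and accessor are illustrative and not part of this commit:

import java.util.Collection;

import org.apache.kafka.clients.consumer.internals.StreamsRebalanceListener;
import org.apache.kafka.streams.internals.ConsumerWrapper;

// Illustrative wrapper: records the topics of the internal subscribe call before
// delegating to the wrapped async consumer through the base class.
public class SubscribeInterceptingWrapper extends ConsumerWrapper {

    private volatile Collection<String> lastSubscribedTopics;

    @Override
    public void subscribe(final Collection<String> topics, final StreamsRebalanceListener streamsRebalanceListener) {
        lastSubscribedTopics = topics; // interception point
        super.subscribe(topics, streamsRebalanceListener);
    }

    public Collection<String> lastSubscribedTopics() {
        return lastSubscribedTopics;
    }
}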

ConsumerWrapper.java

@@ -27,6 +27,7 @@ import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.consumer.SubscriptionPattern;
 import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
 import org.apache.kafka.clients.consumer.internals.StreamsRebalanceData;
+import org.apache.kafka.clients.consumer.internals.StreamsRebalanceListener;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
@@ -54,10 +55,6 @@ public abstract class ConsumerWrapper implements Consumer<byte[], byte[]> {
         this.delegate = delegate;
     }
 
-    public AsyncKafkaConsumer<byte[], byte[]> consumer() {
-        return delegate;
-    }
-
     @Override
     public Set<TopicPartition> assignment() {
         return delegate.assignment();
@@ -78,6 +75,10 @@ public abstract class ConsumerWrapper implements Consumer<byte[], byte[]> {
         delegate.subscribe(topics, callback);
     }
 
+    public void subscribe(final Collection<String> topics, final StreamsRebalanceListener streamsRebalanceListener) {
+        delegate.subscribe(topics, streamsRebalanceListener);
+    }
+
     @Override
     public void assign(final Collection<TopicPartition> partitions) {
         delegate.assign(partitions);

StreamThread.java

@@ -1138,19 +1138,29 @@ public class StreamThread extends Thread implements ProcessingThread {
             mainConsumer.subscribe(topologyMetadata.sourceTopicPattern(), rebalanceListener);
         } else {
             if (streamsRebalanceData.isPresent()) {
-                final AsyncKafkaConsumer<byte[], byte[]> consumer = mainConsumer instanceof ConsumerWrapper
-                    ? ((ConsumerWrapper) mainConsumer).consumer()
-                    : (AsyncKafkaConsumer<byte[], byte[]>) mainConsumer;
-                consumer.subscribe(
-                    topologyMetadata.allFullSourceTopicNames(),
-                    new DefaultStreamsRebalanceListener(
-                        log,
-                        time,
-                        streamsRebalanceData.get(),
-                        this,
-                        taskManager
-                    )
-                );
+                if (mainConsumer instanceof ConsumerWrapper) {
+                    ((ConsumerWrapper) mainConsumer).subscribe(
+                        topologyMetadata.allFullSourceTopicNames(),
+                        new DefaultStreamsRebalanceListener(
+                            log,
+                            time,
+                            streamsRebalanceData.get(),
+                            this,
+                            taskManager
+                        )
+                    );
+                } else {
+                    ((AsyncKafkaConsumer<byte[], byte[]>) mainConsumer).subscribe(
+                        topologyMetadata.allFullSourceTopicNames(),
+                        new DefaultStreamsRebalanceListener(
+                            log,
+                            time,
+                            streamsRebalanceData.get(),
+                            this,
+                            taskManager
+                        )
+                    );
+                }
             } else {
                 mainConsumer.subscribe(topologyMetadata.allFullSourceTopicNames(), rebalanceListener);
             }

StreamThreadTest.java

@@ -69,7 +69,6 @@ import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
-import org.apache.kafka.streams.internals.ConsumerWrapper;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
@@ -3952,38 +3951,6 @@ public class StreamThreadTest {
         );
     }
 
-    @ParameterizedTest
-    @MethodSource("data")
-    public void shouldWrapMainConsumerFromClassConfig(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
-        final Properties streamsConfigProps = configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
-        streamsConfigProps.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
-        streamsConfigProps.put(InternalConfig.INTERNAL_CONSUMER_WRAPPER, TestWrapper.class);
-        thread = createStreamThread("clientId", new StreamsConfig(streamsConfigProps));
-        assertInstanceOf(
-            AsyncKafkaConsumer.class,
-            assertInstanceOf(TestWrapper.class, thread.mainConsumer()).consumer()
-        );
-    }
-
-    @ParameterizedTest
-    @MethodSource("data")
-    public void shouldWrapMainConsumerFromStringConfig(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
-        final Properties streamsConfigProps = configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
-        streamsConfigProps.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
-        streamsConfigProps.put(InternalConfig.INTERNAL_CONSUMER_WRAPPER, TestWrapper.class.getName());
-        thread = createStreamThread("clientId", new StreamsConfig(streamsConfigProps));
-        assertInstanceOf(
-            AsyncKafkaConsumer.class,
-            assertInstanceOf(TestWrapper.class, thread.mainConsumer()).consumer()
-        );
-    }
-
-    public static final class TestWrapper extends ConsumerWrapper { }
-
     private StreamThread setUpThread(final Properties streamsConfigProps) {
         final StreamsConfig config = new StreamsConfig(streamsConfigProps);
         final ConsumerGroupMetadata consumerGroupMetadata = Mockito.mock(ConsumerGroupMetadata.class);
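
The deleted tests asserted on the wrapped AsyncKafkaConsumer through the now-removed consumer() accessor. A test written against the new surface could keep the same wiring (configProps, createStreamThread, and InternalConfig.INTERNAL_CONSUMER_WRAPPER, all visible in the removed code above) and assert on the wrapper itself; a rough sketch, assuming the hypothetical SubscribeInterceptingWrapper from the earlier example:

    @ParameterizedTest
    @MethodSource("data")
    public void shouldInterceptInternalSubscribeViaWrapper(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
        final Properties streamsConfigProps = configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
        streamsConfigProps.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
        // install the intercepting wrapper via the same internal config the removed tests used
        streamsConfigProps.put(InternalConfig.INTERNAL_CONSUMER_WRAPPER, SubscribeInterceptingWrapper.class);
        thread = createStreamThread("clientId", new StreamsConfig(streamsConfigProps));

        // the thread's main consumer is the wrapper itself, so the intercepted subscribe call
        // can be verified on the wrapper directly instead of unwrapping the delegate
        assertInstanceOf(SubscribeInterceptingWrapper.class, thread.mainConsumer());
    }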