MINOR: retry upon missing source topic (#20284)

Implements a timeout mechanism (using maxPollTimeMs) that waits for
missing source topics to be created before failing, instead of
immediately  throwing exceptions in the new Streams protocol.
Additionally, throw  TopologyException when partition count mismatch is
detected.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Alieh Saeedi
 <asaeedi@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Jinhe Zhang 2025-08-06 18:32:48 -04:00 committed by GitHub
parent 1b588afb96
commit 03190e4c22
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 278 additions and 47 deletions

View File

@ -28,6 +28,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
@ -50,7 +51,8 @@ import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.File;
import java.io.IOException;
@ -59,6 +61,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
@ -71,6 +74,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.apache.kafka.streams.KafkaStreams.State.ERROR;
import static org.apache.kafka.streams.KafkaStreams.State.REBALANCING;
@ -121,7 +125,7 @@ public class KStreamRepartitionIntegrationTest {
CLUSTER.createTopic(outputTopic, 1, 1);
}
private Properties createStreamsConfig(final String topologyOptimization) {
private Properties createStreamsConfig(final String topologyOptimization, final boolean useNewProtocol) {
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
@ -131,9 +135,23 @@ public class KStreamRepartitionIntegrationTest {
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, topologyOptimization);
if (useNewProtocol) {
streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
}
return streamsConfiguration;
}
private static Stream<Arguments> protocolAndOptimizationParameters() {
return Stream.of(
Arguments.of(StreamsConfig.OPTIMIZE, false), // OPTIMIZE with CLASSIC protocol
Arguments.of(StreamsConfig.OPTIMIZE, true), // OPTIMIZE with STREAMS protocol
Arguments.of(StreamsConfig.NO_OPTIMIZATION, false), // NO_OPTIMIZATION with CLASSIC protocol
Arguments.of(StreamsConfig.NO_OPTIMIZATION, true) // NO_OPTIMIZATION with STREAMS protocol
);
}
@AfterEach
public void whenShuttingDown() throws IOException {
kafkaStreamsInstances.stream()
@ -144,8 +162,8 @@ public class KStreamRepartitionIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = {StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION})
public void shouldThrowAnExceptionWhenNumberOfPartitionsOfRepartitionOperationDoNotMatchSourceTopicWhenJoining(final String topologyOptimization) throws InterruptedException {
@MethodSource("protocolAndOptimizationParameters")
public void shouldThrowAnExceptionWhenNumberOfPartitionsOfRepartitionOperationDoNotMatchSourceTopicWhenJoining(final String topologyOptimization, final boolean useNewProtocol) throws InterruptedException {
final int topicBNumberOfPartitions = 6;
final String inputTopicRepartitionName = "join-repartition-test";
final AtomicReference<Throwable> expectedThrowable = new AtomicReference<>();
@ -167,10 +185,12 @@ public class KStreamRepartitionIntegrationTest {
.join(topicBStream, (value1, value2) -> value2, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(10)))
.to(outputTopic);
final Properties streamsConfiguration = createStreamsConfig(topologyOptimization);
final Properties streamsConfiguration = createStreamsConfig(topologyOptimization, useNewProtocol);
try (final KafkaStreams ks = new KafkaStreams(builder.build(streamsConfiguration), streamsConfiguration)) {
ks.setUncaughtExceptionHandler(exception -> {
expectedThrowable.set(exception);
System.out.println(String.format("[%s Protocol] Exception caught: %s",
useNewProtocol ? "STREAMS" : "CLASSIC", exception.getMessage()));
return SHUTDOWN_CLIENT;
});
ks.start();
@ -186,8 +206,8 @@ public class KStreamRepartitionIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = {StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION})
public void shouldDeductNumberOfPartitionsFromRepartitionOperation(final String topologyOptimization) throws Exception {
@MethodSource("protocolAndOptimizationParameters")
public void shouldDeductNumberOfPartitionsFromRepartitionOperation(final String topologyOptimization, final boolean useNewProtocol) throws Exception {
final String topicBMapperName = "topic-b-mapper";
final int topicBNumberOfPartitions = 6;
final String inputTopicRepartitionName = "join-repartition-test";
@ -220,7 +240,7 @@ public class KStreamRepartitionIntegrationTest {
.join(topicBStream, (value1, value2) -> value2, JoinWindows.of(Duration.ofSeconds(10)))
.to(outputTopic);
final Properties streamsConfiguration = createStreamsConfig(topologyOptimization);
final Properties streamsConfiguration = createStreamsConfig(topologyOptimization, useNewProtocol);
builder.build(streamsConfiguration);
startStreams(builder, streamsConfiguration);
@ -239,8 +259,8 @@ public class KStreamRepartitionIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = {StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION})
public void shouldDoProperJoiningWhenNumberOfPartitionsAreValidWhenUsingRepartitionOperation(final String topologyOptimization) throws Exception {
@MethodSource("protocolAndOptimizationParameters")
public void shouldDoProperJoiningWhenNumberOfPartitionsAreValidWhenUsingRepartitionOperation(final String topologyOptimization, final boolean useNewProtocol) throws Exception {
final String topicBRepartitionedName = "topic-b-scale-up";
final String inputTopicRepartitionedName = "input-topic-scale-up";
@ -278,7 +298,7 @@ public class KStreamRepartitionIntegrationTest {
.join(topicBStream, (value1, value2) -> value2, JoinWindows.of(Duration.ofSeconds(10)))
.to(outputTopic);
startStreams(builder, createStreamsConfig(topologyOptimization));
startStreams(builder, createStreamsConfig(topologyOptimization, useNewProtocol));
assertEquals(4, getNumberOfPartitionsForTopic(toRepartitionTopicName(topicBRepartitionedName)));
assertEquals(4, getNumberOfPartitionsForTopic(toRepartitionTopicName(inputTopicRepartitionedName)));
@ -291,8 +311,8 @@ public class KStreamRepartitionIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = {StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION})
public void shouldRepartitionToMultiplePartitions(final String topologyOptimization) throws Exception {
@MethodSource("protocolAndOptimizationParameters")
public void shouldRepartitionToMultiplePartitions(final String topologyOptimization, final boolean useNewProtocol) throws Exception {
final String repartitionName = "broadcasting-partitioner-test";
final long timestamp = System.currentTimeMillis();
final AtomicInteger partitionerInvocation = new AtomicInteger(0);
@ -334,7 +354,7 @@ public class KStreamRepartitionIntegrationTest {
.repartition(repartitioned)
.to(broadcastingOutputTopic);
startStreams(builder, createStreamsConfig(topologyOptimization));
startStreams(builder, createStreamsConfig(topologyOptimization, useNewProtocol));
final String topic = toRepartitionTopicName(repartitionName);
@ -360,8 +380,8 @@ public class KStreamRepartitionIntegrationTest {
@ParameterizedTest
@ValueSource(strings = {StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION})
public void shouldUseStreamPartitionerForRepartitionOperation(final String topologyOptimization) throws Exception {
@MethodSource("protocolAndOptimizationParameters")
public void shouldUseStreamPartitionerForRepartitionOperation(final String topologyOptimization, final boolean useNewProtocol) throws Exception {
final int partition = 1;
final String repartitionName = "partitioner-test";
final long timestamp = System.currentTimeMillis();
@ -387,7 +407,7 @@ public class KStreamRepartitionIntegrationTest {
.repartition(repartitioned)
.to(outputTopic);
startStreams(builder, createStreamsConfig(topologyOptimization));
startStreams(builder, createStreamsConfig(topologyOptimization, useNewProtocol));
final String topic = toRepartitionTopicName(repartitionName);
@ -402,8 +422,8 @@ public class KStreamRepartitionIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = {StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION})
public void shouldPerformSelectKeyWithRepartitionOperation(final String topologyOptimization) throws Exception {
@MethodSource("protocolAndOptimizationParameters")
public void shouldPerformSelectKeyWithRepartitionOperation(final String topologyOptimization, final boolean useNewProtocol) throws Exception {
final long timestamp = System.currentTimeMillis();
sendEvents(
@ -421,7 +441,7 @@ public class KStreamRepartitionIntegrationTest {
.repartition()
.to(outputTopic);
startStreams(builder, createStreamsConfig(topologyOptimization));
startStreams(builder, createStreamsConfig(topologyOptimization, useNewProtocol));
validateReceivedMessages(
new IntegerDeserializer(),
@ -438,8 +458,8 @@ public class KStreamRepartitionIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = {StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION})
public void shouldCreateRepartitionTopicIfKeyChangingOperationWasNotPerformed(final String topologyOptimization) throws Exception {
@MethodSource("protocolAndOptimizationParameters")
public void shouldCreateRepartitionTopicIfKeyChangingOperationWasNotPerformed(final String topologyOptimization, final boolean useNewProtocol) throws Exception {
final String repartitionName = "dummy";
final long timestamp = System.currentTimeMillis();
@ -457,7 +477,7 @@ public class KStreamRepartitionIntegrationTest {
.repartition(Repartitioned.as(repartitionName))
.to(outputTopic);
startStreams(builder, createStreamsConfig(topologyOptimization));
startStreams(builder, createStreamsConfig(topologyOptimization, useNewProtocol));
validateReceivedMessages(
new IntegerDeserializer(),
@ -475,8 +495,8 @@ public class KStreamRepartitionIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = {StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION})
public void shouldPerformKeySelectOperationWhenRepartitionOperationIsUsedWithKeySelector(final String topologyOptimization) throws Exception {
@MethodSource("protocolAndOptimizationParameters")
public void shouldPerformKeySelectOperationWhenRepartitionOperationIsUsedWithKeySelector(final String topologyOptimization, final boolean useNewProtocol) throws Exception {
final String repartitionedName = "new-key";
final long timestamp = System.currentTimeMillis();
@ -501,7 +521,7 @@ public class KStreamRepartitionIntegrationTest {
.toStream()
.to(outputTopic);
startStreams(builder, createStreamsConfig(topologyOptimization));
startStreams(builder, createStreamsConfig(topologyOptimization, useNewProtocol));
validateReceivedMessages(
new StringDeserializer(),
@ -521,8 +541,8 @@ public class KStreamRepartitionIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = {StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION})
public void shouldCreateRepartitionTopicWithSpecifiedNumberOfPartitions(final String topologyOptimization) throws Exception {
@MethodSource("protocolAndOptimizationParameters")
public void shouldCreateRepartitionTopicWithSpecifiedNumberOfPartitions(final String topologyOptimization, final boolean useNewProtocol) throws Exception {
final String repartitionName = "new-partitions";
final long timestamp = System.currentTimeMillis();
@ -543,7 +563,7 @@ public class KStreamRepartitionIntegrationTest {
.toStream()
.to(outputTopic);
startStreams(builder, createStreamsConfig(topologyOptimization));
startStreams(builder, createStreamsConfig(topologyOptimization, useNewProtocol));
validateReceivedMessages(
new IntegerDeserializer(),
@ -561,8 +581,8 @@ public class KStreamRepartitionIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = {StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION})
public void shouldInheritRepartitionTopicPartitionNumberFromUpstreamTopicWhenNumberOfPartitionsIsNotSpecified(final String topologyOptimization) throws Exception {
@MethodSource("protocolAndOptimizationParameters")
public void shouldInheritRepartitionTopicPartitionNumberFromUpstreamTopicWhenNumberOfPartitionsIsNotSpecified(final String topologyOptimization, final boolean useNewProtocol) throws Exception {
final String repartitionName = "new-topic";
final long timestamp = System.currentTimeMillis();
@ -583,7 +603,7 @@ public class KStreamRepartitionIntegrationTest {
.toStream()
.to(outputTopic);
startStreams(builder, createStreamsConfig(topologyOptimization));
startStreams(builder, createStreamsConfig(topologyOptimization, useNewProtocol));
validateReceivedMessages(
new IntegerDeserializer(),
@ -601,8 +621,8 @@ public class KStreamRepartitionIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = {StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION})
public void shouldCreateOnlyOneRepartitionTopicWhenRepartitionIsFollowedByGroupByKey(final String topologyOptimization) throws Exception {
@MethodSource("protocolAndOptimizationParameters")
public void shouldCreateOnlyOneRepartitionTopicWhenRepartitionIsFollowedByGroupByKey(final String topologyOptimization, final boolean useNewProtocol) throws Exception {
final String repartitionName = "new-partitions";
final long timestamp = System.currentTimeMillis();
@ -629,7 +649,7 @@ public class KStreamRepartitionIntegrationTest {
.toStream()
.to(outputTopic);
startStreams(builder, createStreamsConfig(topologyOptimization));
startStreams(builder, createStreamsConfig(topologyOptimization, useNewProtocol));
final String topology = builder.build().describe().toString();
@ -647,8 +667,8 @@ public class KStreamRepartitionIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = {StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION})
public void shouldGenerateRepartitionTopicWhenNameIsNotSpecified(final String topologyOptimization) throws Exception {
@MethodSource("protocolAndOptimizationParameters")
public void shouldGenerateRepartitionTopicWhenNameIsNotSpecified(final String topologyOptimization, final boolean useNewProtocol) throws Exception {
final long timestamp = System.currentTimeMillis();
sendEvents(
@ -666,7 +686,7 @@ public class KStreamRepartitionIntegrationTest {
.repartition(Repartitioned.with(Serdes.String(), Serdes.String()))
.to(outputTopic);
startStreams(builder, createStreamsConfig(topologyOptimization));
startStreams(builder, createStreamsConfig(topologyOptimization, useNewProtocol));
validateReceivedMessages(
new StringDeserializer(),
@ -683,8 +703,8 @@ public class KStreamRepartitionIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = {StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION})
public void shouldGoThroughRebalancingCorrectly(final String topologyOptimization) throws Exception {
@MethodSource("protocolAndOptimizationParameters")
public void shouldGoThroughRebalancingCorrectly(final String topologyOptimization, final boolean useNewProtocol) throws Exception {
final String repartitionName = "rebalancing-test";
final long timestamp = System.currentTimeMillis();
@ -711,7 +731,7 @@ public class KStreamRepartitionIntegrationTest {
.toStream()
.to(outputTopic);
final Properties streamsConfiguration = createStreamsConfig(topologyOptimization);
final Properties streamsConfiguration = createStreamsConfig(topologyOptimization, useNewProtocol);
startStreams(builder, streamsConfiguration);
final Properties streamsToCloseConfigs = new Properties();
streamsToCloseConfigs.putAll(streamsConfiguration);

View File

@ -45,6 +45,7 @@ import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.KafkaClientSupplier;
@ -56,6 +57,7 @@ import org.apache.kafka.streams.errors.MissingSourceTopicException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.internals.ConsumerWrapper;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.internals.metrics.StreamsThreadMetricsDelegatingReporter;
@ -372,6 +374,8 @@ public class StreamThread extends Thread implements ProcessingThread {
private volatile KafkaFutureImpl<Uuid> restoreConsumerInstanceIdFuture = new KafkaFutureImpl<>();
private volatile KafkaFutureImpl<Uuid> producerInstanceIdFuture = new KafkaFutureImpl<>();
private Timer topicsReadyTimer;
public static StreamThread create(final TopologyMetadata topologyMetadata,
final StreamsConfig config,
final KafkaClientSupplier clientSupplier,
@ -1536,16 +1540,29 @@ public class StreamThread extends Thread implements ProcessingThread {
public void handleStreamsRebalanceData() {
if (streamsRebalanceData.isPresent()) {
boolean hasMissingSourceTopics = false;
String missingTopicsDetail = null;
for (final StreamsGroupHeartbeatResponseData.Status status : streamsRebalanceData.get().statuses()) {
if (status.statusCode() == StreamsGroupHeartbeatResponse.Status.SHUTDOWN_APPLICATION.code()) {
shutdownErrorHook.run();
} else if (status.statusCode() == StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code()) {
final String errorMsg = String.format("Missing source topics: %s", status.statusDetail());
hasMissingSourceTopics = true;
missingTopicsDetail = status.statusDetail();
} else if (status.statusCode() == StreamsGroupHeartbeatResponse.Status.INCORRECTLY_PARTITIONED_TOPICS.code()) {
final String errorMsg = status.statusDetail();
log.error(errorMsg);
throw new MissingSourceTopicException(errorMsg);
throw new TopologyException(errorMsg);
}
}
if (hasMissingSourceTopics) {
handleMissingSourceTopicsWithTimeout(missingTopicsDetail);
} else {
// Reset timeout tracking when no missing source topics are reported
topicsReadyTimer = null;
}
final Map<StreamsRebalanceData.HostInfo, StreamsRebalanceData.EndpointPartitions> partitionsByEndpoint =
streamsRebalanceData.get().partitionsByHost();
final Map<HostInfo, Set<TopicPartition>> activeHostInfoMap = new HashMap<>();
@ -1563,6 +1580,30 @@ public class StreamThread extends Thread implements ProcessingThread {
}
}
private void handleMissingSourceTopicsWithTimeout(final String missingTopicsDetail) {
// Start timeout tracking on first encounter with missing topics
if (topicsReadyTimer == null) {
topicsReadyTimer = time.timer(maxPollTimeMs);
log.info("Missing source topics detected: {}. Will wait up to {}ms before failing.",
missingTopicsDetail, maxPollTimeMs);
} else {
topicsReadyTimer.update();
}
if (topicsReadyTimer.isExpired()) {
final long elapsedTime = topicsReadyTimer.elapsedMs();
final String errorMsg = String.format("Missing source topics: %s. Timeout exceeded after %dms.",
missingTopicsDetail, elapsedTime);
log.error(errorMsg);
throw new MissingSourceTopicException(errorMsg);
} else {
log.debug("Missing source topics: {}. Elapsed time: {}ms, timeout in: {}ms",
missingTopicsDetail, topicsReadyTimer.elapsedMs(), topicsReadyTimer.remainingMs());
}
}
static Map<TopicPartition, PartitionInfo> getTopicPartitionInfo(final Map<HostInfo, Set<TopicPartition>> partitionsByHost) {
final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<>();
for (final Set<TopicPartition> value : partitionsByHost.values()) {

View File

@ -70,6 +70,7 @@ import org.apache.kafka.streams.errors.MissingSourceTopicException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
@ -162,6 +163,7 @@ import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isA;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@ -3896,8 +3898,9 @@ public class StreamThreadTest {
new LogContext(String.format("stream-client [%s] ", CLIENT_ID))
);
final StreamsConfig config = new StreamsConfig(props);
final MockTime mockTime = new MockTime(1);
thread = new StreamThread(
new MockTime(1),
mockTime,
config,
null,
mainConsumer,
@ -3929,8 +3932,77 @@ public class StreamThreadTest {
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code())
.setStatusDetail("Missing source topics")
));
// First call should not throw exception (within timeout)
thread.runOnceWithoutProcessingThreads();
// Advance time beyond max.poll.interval.ms (default is 300000ms) to trigger timeout
mockTime.sleep(300001);
final MissingSourceTopicException exception = assertThrows(MissingSourceTopicException.class, () -> thread.runOnceWithoutProcessingThreads());
assertTrue(exception.getMessage().startsWith("Missing source topics"));
assertTrue(exception.getMessage().contains("Missing source topics"));
assertTrue(exception.getMessage().contains("Timeout exceeded"));
}
@Test
public void testStreamsProtocolIncorrectlyPartitionedTopics() {
final ConsumerGroupMetadata consumerGroupMetadata = Mockito.mock(ConsumerGroupMetadata.class);
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
when(mainConsumer.poll(Mockito.any(Duration.class))).thenReturn(new ConsumerRecords<>(Map.of(), Map.of()));
when(mainConsumer.groupMetadata()).thenReturn(consumerGroupMetadata);
final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(
UUID.randomUUID(),
Optional.empty(),
Map.of(),
Map.of()
);
final Runnable shutdownErrorHook = mock(Runnable.class);
final Properties props = configProps(false, false, false);
final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(
new TopologyMetadata(internalTopologyBuilder, new StreamsConfig(props)),
StreamsMetadataState.UNKNOWN_HOST,
new LogContext(String.format("stream-client [%s] ", CLIENT_ID))
);
final StreamsConfig config = new StreamsConfig(props);
final MockTime mockTime = new MockTime(1);
thread = new StreamThread(
mockTime,
config,
null,
mainConsumer,
consumer,
changelogReader,
null,
mock(TaskManager.class),
null,
new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime),
new TopologyMetadata(internalTopologyBuilder, config),
PROCESS_ID,
CLIENT_ID,
new LogContext(""),
null,
new AtomicLong(Long.MAX_VALUE),
new LinkedList<>(),
shutdownErrorHook,
HANDLER,
null,
Optional.of(streamsRebalanceData),
streamsMetadataState
).updateThreadMetadata(adminClientId(CLIENT_ID));
thread.setState(State.STARTING);
streamsRebalanceData.setStatuses(List.of(
new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(StreamsGroupHeartbeatResponse.Status.INCORRECTLY_PARTITIONED_TOPICS.code())
.setStatusDetail("Topics are incorrectly partitioned")
));
// Should immediately throw TopologyException (no timeout like MISSING_SOURCE_TOPICS)
final TopologyException exception = assertThrows(TopologyException.class,
() -> thread.runOnceWithoutProcessingThreads());
assertTrue(exception.getMessage().contains("Topics are incorrectly partitioned"));
}
@Test
@ -4013,8 +4085,9 @@ public class StreamThreadTest {
StreamsMetadataState.UNKNOWN_HOST,
new LogContext(String.format("stream-client [%s] ", CLIENT_ID))
);
final MockTime mockTime = new MockTime(1);
thread = new StreamThread(
new MockTime(1),
mockTime,
config,
null,
mainConsumer,
@ -4046,8 +4119,105 @@ public class StreamThreadTest {
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code())
.setStatusDetail("Missing source topics")
));
// First call should not throw exception (within timeout)
thread.runOnceWithProcessingThreads();
// Advance time beyond max.poll.interval.ms (default is 300000ms) to trigger timeout
mockTime.sleep(300001);
final MissingSourceTopicException exception = assertThrows(MissingSourceTopicException.class, () -> thread.runOnceWithProcessingThreads());
assertTrue(exception.getMessage().startsWith("Missing source topics"));
assertTrue(exception.getMessage().contains("Missing source topics"));
assertTrue(exception.getMessage().contains("Timeout exceeded"));
}
@Test
public void testStreamsProtocolMissingSourceTopicRecovery() {
final ConsumerGroupMetadata consumerGroupMetadata = Mockito.mock(ConsumerGroupMetadata.class);
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
when(mainConsumer.poll(Mockito.any(Duration.class))).thenReturn(new ConsumerRecords<>(Map.of(), Map.of()));
when(mainConsumer.groupMetadata()).thenReturn(consumerGroupMetadata);
final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(
UUID.randomUUID(),
Optional.empty(),
Map.of(),
Map.of()
);
final Properties props = configProps(false, false, false);
final Runnable shutdownErrorHook = mock(Runnable.class);
final StreamsConfig config = new StreamsConfig(props);
final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(
new TopologyMetadata(internalTopologyBuilder, config),
StreamsMetadataState.UNKNOWN_HOST,
new LogContext(String.format("stream-client [%s] ", CLIENT_ID))
);
final MockTime mockTime = new MockTime(1);
thread = new StreamThread(
mockTime,
config,
null,
mainConsumer,
consumer,
changelogReader,
null,
mock(TaskManager.class),
null,
new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime),
new TopologyMetadata(internalTopologyBuilder, config),
PROCESS_ID,
CLIENT_ID,
new LogContext(""),
null,
new AtomicLong(Long.MAX_VALUE),
new LinkedList<>(),
shutdownErrorHook,
HANDLER,
null,
Optional.of(streamsRebalanceData),
streamsMetadataState
).updateThreadMetadata(adminClientId(CLIENT_ID));
thread.setState(State.STARTING);
thread.runOnceWithoutProcessingThreads();
// Set missing source topics status
streamsRebalanceData.setStatuses(List.of(
new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code())
.setStatusDetail("Missing source topics")
));
// First call should not throw exception (within timeout)
thread.runOnceWithoutProcessingThreads();
// Advance time but not beyond timeout
mockTime.sleep(150000); // Half of max.poll.interval.ms
// Should still not throw exception
thread.runOnceWithoutProcessingThreads();
// Clear the missing source topics (simulate recovery)
streamsRebalanceData.setStatuses(List.of());
// Should complete without exception (recovery successful)
assertDoesNotThrow(() -> thread.runOnceWithoutProcessingThreads());
// Set missing topics again - should reset the timeout
streamsRebalanceData.setStatuses(List.of(
new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code())
.setStatusDetail("Different missing topics")
));
// Advance time by 250 seconds to test if timer was reset
// Total time from beginning: 150000 + 250000 = 400000ms (400s)
// If timer was NOT reset: elapsed time = 400s > 300s should throw
// If timer WAS reset: elapsed time = 250s < 300s should NOT throw
mockTime.sleep(250000); // Advance by 250 seconds
// Should not throw because timer was reset - only 250s elapsed from reset point
assertDoesNotThrow(() -> thread.runOnceWithoutProcessingThreads());
}
@Test