MINOR: Throw exceptions if source topic is missing (#20123)

In the old protocol, Kafka Streams threw a `MissingSourceTopicException` when
a source topic was missing. In the new protocol, it no longer does so; it only
logs the status returned from the broker, which indicates that a source topic
is missing.

This change:
1. Throws a `MissingSourceTopicException` when a source topic is missing (a client-side sketch follows this list)
2. Adds unit tests
3. Modifies integration tests to cover both the old and new protocols
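For context, here is a minimal application-side sketch of how the new behavior can surface to users. The topic names, application id, and bootstrap servers are placeholders, and the handler logic is illustrative, not part of this change; it simply shows that, with the new `streams` group protocol enabled, a missing source topic now fails the stream thread with a `MissingSourceTopicException` that a registered `StreamsUncaughtExceptionHandler` can react to instead of the condition only being logged.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.MissingSourceTopicException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Locale;
import java.util.Properties;

public class MissingSourceTopicExample {

    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic", Consumed.with(Serdes.Integer(), Serdes.String()))
            .to("output-topic", Produced.with(Serdes.Integer(), Serdes.String()));

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "missing-source-topic-example"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");            // placeholder
        // Opt into the new streams rebalance protocol, as the integration tests below do.
        props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // With this change, a missing source topic fails the stream thread with a
        // MissingSourceTopicException instead of only being logged; the handler can
        // then decide how the client should react.
        streams.setUncaughtExceptionHandler(exception -> {
            if (exception instanceof MissingSourceTopicException
                    || exception.getCause() instanceof MissingSourceTopicException) {
                return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
            }
            return StreamThreadExceptionResponse.REPLACE_THREAD;
        });

        streams.start();
    }
}
```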

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
Author: Jinhe Zhang, 2025-07-09 15:19:12 -04:00, committed by GitHub
Parent: 7b8a594a22
Commit: c625b44d8c
4 changed files with 144 additions and 6 deletions

HandlingSourceTopicDeletionIntegrationTest.java

@@ -17,6 +17,7 @@
package org.apache.kafka.streams.integration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.StreamsBuilder;
@@ -33,10 +34,12 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -75,8 +78,9 @@ public class HandlingSourceTopicDeletionIntegrationTest {
CLUSTER.deleteTopics(INPUT_TOPIC, OUTPUT_TOPIC);
}
@Test
public void shouldThrowErrorAfterSourceTopicDeleted(final TestInfo testName) throws InterruptedException {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void shouldThrowErrorAfterSourceTopicDeleted(final boolean useNewProtocol, final TestInfo testName) throws InterruptedException {
final StreamsBuilder builder = new StreamsBuilder();
builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.String()))
.to(OUTPUT_TOPIC, Produced.with(Serdes.Integer(), Serdes.String()));
@@ -91,6 +95,10 @@ public class HandlingSourceTopicDeletionIntegrationTest {
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
streamsConfiguration.put(StreamsConfig.METADATA_MAX_AGE_CONFIG, 2000);
if (useNewProtocol) {
streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
}
final Topology topology = builder.build();
final KafkaStreams kafkaStreams1 = new KafkaStreams(topology, streamsConfiguration);

JoinWithIncompleteMetadataIntegrationTest.java

@@ -18,6 +18,7 @@ package org.apache.kafka.streams.integration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
@@ -32,10 +33,12 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.Locale;
import java.util.Properties;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -87,10 +90,15 @@ public class JoinWithIncompleteMetadataIntegrationTest {
IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
}
@Test
public void testShouldAutoShutdownOnJoinWithIncompleteMetadata() throws InterruptedException {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testShouldAutoShutdownOnJoinWithIncompleteMetadata(final boolean useNewProtocol) throws InterruptedException {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
if (useNewProtocol) {
STREAMS_CONFIG.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
}
final KStream<Long, String> notExistStream = builder.stream(NON_EXISTENT_INPUT_TOPIC_LEFT);

StreamThread.java

@@ -52,6 +52,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.TaskMetadata;
import org.apache.kafka.streams.ThreadMetadata;
import org.apache.kafka.streams.errors.MissingSourceTopicException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
@@ -1536,6 +1537,10 @@ public class StreamThread extends Thread implements ProcessingThread {
for (final StreamsGroupHeartbeatResponseData.Status status : streamsRebalanceData.get().statuses()) {
if (status.statusCode() == StreamsGroupHeartbeatResponse.Status.SHUTDOWN_APPLICATION.code()) {
shutdownErrorHook.run();
} else if (status.statusCode() == StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code()) {
final String errorMsg = String.format("Missing source topics: %s", status.statusDetail());
log.error(errorMsg);
throw new MissingSourceTopicException(errorMsg);
}
}

StreamThreadTest.java

@@ -66,6 +66,7 @@ import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.ThreadMetadata;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.MissingSourceTopicException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
@@ -3875,6 +3876,64 @@ public class StreamThreadTest {
verify(shutdownErrorHook).run();
}
@Test
public void testStreamsProtocolRunOnceWithoutProcessingThreadsMissingSourceTopic() {
final ConsumerGroupMetadata consumerGroupMetadata = Mockito.mock(ConsumerGroupMetadata.class);
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
when(mainConsumer.poll(Mockito.any(Duration.class))).thenReturn(new ConsumerRecords<>(Map.of(), Map.of()));
when(mainConsumer.groupMetadata()).thenReturn(consumerGroupMetadata);
final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(
UUID.randomUUID(),
Optional.empty(),
Map.of(),
Map.of()
);
final Runnable shutdownErrorHook = mock(Runnable.class);
final Properties props = configProps(false, false, false);
final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(
new TopologyMetadata(internalTopologyBuilder, new StreamsConfig(props)),
StreamsMetadataState.UNKNOWN_HOST,
new LogContext(String.format("stream-client [%s] ", CLIENT_ID))
);
final StreamsConfig config = new StreamsConfig(props);
thread = new StreamThread(
new MockTime(1),
config,
null,
mainConsumer,
consumer,
changelogReader,
null,
mock(TaskManager.class),
null,
new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime),
new TopologyMetadata(internalTopologyBuilder, config),
PROCESS_ID,
CLIENT_ID,
new LogContext(""),
null,
new AtomicLong(Long.MAX_VALUE),
new LinkedList<>(),
shutdownErrorHook,
HANDLER,
null,
Optional.of(streamsRebalanceData),
streamsMetadataState
).updateThreadMetadata(adminClientId(CLIENT_ID));
thread.setState(State.STARTING);
thread.runOnceWithoutProcessingThreads();
streamsRebalanceData.setStatuses(List.of(
new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code())
.setStatusDetail("Missing source topics")
));
final MissingSourceTopicException exception = assertThrows(MissingSourceTopicException.class, () -> thread.runOnceWithoutProcessingThreads());
assertTrue(exception.getMessage().startsWith("Missing source topics"));
}
@Test
public void testStreamsProtocolRunOnceWithProcessingThreads() {
final ConsumerGroupMetadata consumerGroupMetadata = Mockito.mock(ConsumerGroupMetadata.class);
@@ -3934,6 +3993,64 @@
verify(shutdownErrorHook).run();
}
@Test
public void testStreamsProtocolRunOnceWithProcessingThreadsMissingSourceTopic() {
final ConsumerGroupMetadata consumerGroupMetadata = Mockito.mock(ConsumerGroupMetadata.class);
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
when(mainConsumer.poll(Mockito.any(Duration.class))).thenReturn(new ConsumerRecords<>(Map.of(), Map.of()));
when(mainConsumer.groupMetadata()).thenReturn(consumerGroupMetadata);
final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(
UUID.randomUUID(),
Optional.empty(),
Map.of(),
Map.of()
);
final Properties props = configProps(false, false, false);
final Runnable shutdownErrorHook = mock(Runnable.class);
final StreamsConfig config = new StreamsConfig(props);
final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(
new TopologyMetadata(internalTopologyBuilder, config),
StreamsMetadataState.UNKNOWN_HOST,
new LogContext(String.format("stream-client [%s] ", CLIENT_ID))
);
thread = new StreamThread(
new MockTime(1),
config,
null,
mainConsumer,
consumer,
changelogReader,
null,
mock(TaskManager.class),
null,
new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime),
new TopologyMetadata(internalTopologyBuilder, config),
PROCESS_ID,
CLIENT_ID,
new LogContext(""),
null,
new AtomicLong(Long.MAX_VALUE),
new LinkedList<>(),
shutdownErrorHook,
HANDLER,
null,
Optional.of(streamsRebalanceData),
streamsMetadataState
).updateThreadMetadata(adminClientId(CLIENT_ID));
thread.setState(State.STARTING);
thread.runOnceWithProcessingThreads();
streamsRebalanceData.setStatuses(List.of(
new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code())
.setStatusDetail("Missing source topics")
));
final MissingSourceTopicException exception = assertThrows(MissingSourceTopicException.class, () -> thread.runOnceWithProcessingThreads());
assertTrue(exception.getMessage().startsWith("Missing source topics"));
}
@Test
public void testGetTopicPartitionInfo() {
assertEquals(