MINOR: update flaky CustomHandlerIntegrationTest (#16710)

This PR reduces the MAX_BLOCK_MS config which defaults to 60sec to
10sec, to avoid a race condition with the 60sec test timeout.

Reviewers: Bill Bejeck <bill@confluent.io>
This commit is contained in:
Matthias J. Sax 2024-07-30 08:13:59 -07:00 committed by GitHub
parent 901c656a1d
commit 3c580e25bf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 35 additions and 22 deletions

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.integration;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.serialization.IntegerSerializer;
@ -58,7 +59,6 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@Tag("integration")
public class CustomHandlerIntegrationTest {
private static final int NUM_BROKERS = 1;
private static final int NUM_THREADS = 2;
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS,
Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", "false")));
@ -72,26 +72,25 @@ public class CustomHandlerIntegrationTest {
CLUSTER.stop();
}
private final long timeout = 60000;
private final long timeoutMs = 60_000;
// topic name
private static final String STREAM_INPUT = "STREAM_INPUT";
private static final String NON_EXISTING_TOPIC = "non_existing_topic";
private final AtomicReference<Throwable> caughtException = new AtomicReference<>();
private KafkaStreams kafkaStreams;
AtomicReference<Throwable> caughtException;
Topology topology;
private Topology topology;
private String appId;
@BeforeEach
public void before(final TestInfo testInfo) throws InterruptedException {
final StreamsBuilder builder = new StreamsBuilder();
CLUSTER.createTopics(STREAM_INPUT);
caughtException = new AtomicReference<>();
final String safeTestName = safeUniqueTestName(testInfo);
appId = "app-" + safeTestName;
builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()))
.to(NON_EXISTING_TOPIC, Produced.with(Serdes.Integer(), Serdes.String()));
produceRecords();
@ -127,7 +126,7 @@ public class CustomHandlerIntegrationTest {
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
streamsConfiguration.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10_000);
return streamsConfiguration;
}
@ -148,19 +147,33 @@ public class CustomHandlerIntegrationTest {
kafkaStreams.start();
TestUtils.waitForCondition(
() -> kafkaStreams.state() == State.RUNNING,
timeout,
() -> "Kafka Streams application did not reach state RUNNING in " + timeout + " ms");
while (true) {
if (caughtException.get() != null) {
final Throwable throwable = caughtException.get();
assertInstanceOf(StreamsException.class, throwable);
assertInstanceOf(TimeoutException.class, throwable.getCause());
assertInstanceOf(UnknownTopicOrPartitionException.class, throwable.getCause().getCause());
timeoutMs,
() -> "Kafka Streams application did not reach state RUNNING in " + timeoutMs + " ms"
);
TestUtils.waitForCondition(
this::receivedUnknownTopicOrPartitionException,
timeoutMs,
() -> "Did not receive UnknownTopicOrPartitionException"
);
TestUtils.waitForCondition(
() -> kafkaStreams.state() == State.ERROR,
timeoutMs,
() -> "Kafka Streams application did not reach state ERROR in " + timeoutMs + " ms"
);
closeApplication(streamsConfiguration);
break;
} else {
Thread.sleep(100);
}
}
}
private boolean receivedUnknownTopicOrPartitionException() {
if (caughtException.get() == null) {
return false;
}
assertInstanceOf(StreamsException.class, caughtException.get());
assertInstanceOf(TimeoutException.class, caughtException.get().getCause());
assertInstanceOf(UnknownTopicOrPartitionException.class, caughtException.get().getCause().getCause());
return true;
}
}