MINOR: Enable streams rebalance protocol in EosIntegrationTest (#20592)
CI / build (push) Waiting to run Details

Remove stalling instance in EOSIntegrationTest, since it doesn’t matter
what it thinks what the assignment is but blocks the test with streams
group protocol

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
Jinhe Zhang 2025-10-05 09:55:59 -04:00 committed by GitHub
parent d76442e5a6
commit 611f4128b4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 77 additions and 51 deletions

View File

@ -72,9 +72,10 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -169,6 +170,15 @@ public class EosIntegrationTest {
private String stateTmpDir;
private static java.util.stream.Stream<Arguments> groupProtocolAndProcessingThreadsParameters() {
return java.util.stream.Stream.of(
Arguments.of("classic", true),
Arguments.of("classic", false),
Arguments.of("streams", true),
Arguments.of("streams", false)
);
}
@BeforeEach
public void createTopics() throws Exception {
applicationId = "appId-" + TEST_NUMBER.getAndIncrement();
@ -181,16 +191,19 @@ public class EosIntegrationTest {
CLUSTER.createTopic(MULTI_PARTITION_INPUT_TOPIC, NUM_TOPIC_PARTITIONS, 1);
CLUSTER.createTopic(MULTI_PARTITION_THROUGH_TOPIC, NUM_TOPIC_PARTITIONS, 1);
CLUSTER.createTopic(MULTI_PARTITION_OUTPUT_TOPIC, NUM_TOPIC_PARTITIONS, 1);
CLUSTER.setGroupStandbyReplicas(applicationId, 1);
}
@Test
public void shouldBeAbleToRunWithEosEnabled() throws Exception {
runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false);
@ParameterizedTest
@ValueSource(strings = {"classic", "streams"})
public void shouldBeAbleToRunWithEosEnabled(final String groupProtocol) throws Exception {
runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false, groupProtocol);
}
@Test
public void shouldCommitCorrectOffsetIfInputTopicIsTransactional() throws Exception {
runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, true);
@ParameterizedTest
@ValueSource(strings = {"classic", "streams"})
public void shouldCommitCorrectOffsetIfInputTopicIsTransactional(final String groupProtocol) throws Exception {
runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, true, groupProtocol);
try (final Admin adminClient = Admin.create(mkMap(mkEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())));
final Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(mkMap(
@ -215,36 +228,42 @@ public class EosIntegrationTest {
}
}
@Test
public void shouldBeAbleToRestartAfterClose() throws Exception {
runSimpleCopyTest(2, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false);
@ParameterizedTest
@ValueSource(strings = {"classic", "streams"})
public void shouldBeAbleToRestartAfterClose(final String groupProtocol) throws Exception {
runSimpleCopyTest(2, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false, groupProtocol);
}
@Test
public void shouldBeAbleToCommitToMultiplePartitions() throws Exception {
runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, MULTI_PARTITION_OUTPUT_TOPIC, false);
@ParameterizedTest
@ValueSource(strings = {"classic", "streams"})
public void shouldBeAbleToCommitToMultiplePartitions(final String groupProtocol) throws Exception {
runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, MULTI_PARTITION_OUTPUT_TOPIC, false, groupProtocol);
}
@Test
public void shouldBeAbleToCommitMultiplePartitionOffsets() throws Exception {
runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false);
@ParameterizedTest
@ValueSource(strings = {"classic", "streams"})
public void shouldBeAbleToCommitMultiplePartitionOffsets(final String groupProtocol) throws Exception {
runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false, groupProtocol);
}
@Test
public void shouldBeAbleToRunWithTwoSubtopologies() throws Exception {
runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_THROUGH_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC, false);
@ParameterizedTest
@ValueSource(strings = {"classic", "streams"})
public void shouldBeAbleToRunWithTwoSubtopologies(final String groupProtocol) throws Exception {
runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_THROUGH_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC, false, groupProtocol);
}
@Test
public void shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions() throws Exception {
runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, MULTI_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC, false);
@ParameterizedTest
@ValueSource(strings = {"classic", "streams"})
public void shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions(final String groupProtocol) throws Exception {
runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, MULTI_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC, false, groupProtocol);
}
private void runSimpleCopyTest(final int numberOfRestarts,
final String inputTopic,
final String throughTopic,
final String outputTopic,
final boolean inputTopicTransactional) throws Exception {
final boolean inputTopicTransactional,
final String groupProtocol) throws Exception {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Long, Long> input = builder.stream(inputTopic);
KStream<Long, Long> output = input;
@ -263,6 +282,7 @@ public class EosIntegrationTest {
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), MAX_POLL_INTERVAL_MS - 1);
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS);
properties.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
for (int i = 0; i < numberOfRestarts; ++i) {
final Properties config = StreamsTestUtils.getStreamsConfig(
@ -326,8 +346,9 @@ public class EosIntegrationTest {
return recordsPerKey;
}
@Test
public void shouldBeAbleToPerformMultipleTransactions() throws Exception {
@ParameterizedTest
@ValueSource(strings = {"classic", "streams"})
public void shouldBeAbleToPerformMultipleTransactions(final String groupProtocol) throws Exception {
final StreamsBuilder builder = new StreamsBuilder();
builder.stream(SINGLE_PARTITION_INPUT_TOPIC).to(SINGLE_PARTITION_OUTPUT_TOPIC);
@ -337,6 +358,7 @@ public class EosIntegrationTest {
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
final Properties config = StreamsTestUtils.getStreamsConfig(
applicationId,
@ -374,8 +396,8 @@ public class EosIntegrationTest {
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void shouldNotViolateEosIfOneTaskFails(final boolean processingThreadsEnabled) throws Exception {
@MethodSource("groupProtocolAndProcessingThreadsParameters")
public void shouldNotViolateEosIfOneTaskFails(final String groupProtocol, final boolean processingThreadsEnabled) throws Exception {
// this test writes 10 + 5 + 5 records per partition (running with 2 partitions)
// the app is supposed to copy all 40 records into the output topic
@ -386,7 +408,7 @@ public class EosIntegrationTest {
// -> the failure only kills one thread
// after fail over, we should read 40 committed records (even if 50 record got written)
try (final KafkaStreams streams = getKafkaStreams("dummy", false, "appDir", 2, processingThreadsEnabled)) {
try (final KafkaStreams streams = getKafkaStreams("dummy", false, "appDir", 2, groupProtocol, processingThreadsEnabled)) {
startApplicationAndWaitUntilRunning(streams);
final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
@ -476,8 +498,8 @@ public class EosIntegrationTest {
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void shouldNotViolateEosIfOneTaskFailsWithState(final boolean processingThreadsEnabled) throws Exception {
@MethodSource("groupProtocolAndProcessingThreadsParameters")
public void shouldNotViolateEosIfOneTaskFailsWithState(final String groupProtocol, final boolean processingThreadsEnabled) throws Exception {
// this test updates a store with 10 + 5 + 5 records per partition (running with 2 partitions)
// the app is supposed to emit all 40 update records into the output topic
@ -493,7 +515,7 @@ public class EosIntegrationTest {
// We need more processing time under "with state" situation, so increasing the max.poll.interval.ms
// to avoid unexpected rebalance during test, which will cause unexpected fail over triggered
try (final KafkaStreams streams = getKafkaStreams("dummy", true, "appDir", 2, processingThreadsEnabled)) {
try (final KafkaStreams streams = getKafkaStreams("dummy", true, "appDir", 2, groupProtocol, processingThreadsEnabled)) {
startApplicationAndWaitUntilRunning(streams);
final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
@ -594,8 +616,8 @@ public class EosIntegrationTest {
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances(final boolean processingThreadsEnabled) throws Exception {
@MethodSource("groupProtocolAndProcessingThreadsParameters")
public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances(final String groupProtocol, final boolean processingThreadsEnabled) throws Exception {
// this test writes 10 + 5 + 5 + 10 records per partition (running with 2 partitions)
// the app is supposed to copy all 60 records into the output topic
//
@ -607,10 +629,9 @@ public class EosIntegrationTest {
//
// afterward, the "stalling" thread resumes, and another rebalance should get triggered
// we write the remaining 20 records and verify to read 60 result records
try (
final KafkaStreams streams1 = getKafkaStreams("streams1", false, "appDir1", 1, processingThreadsEnabled);
final KafkaStreams streams2 = getKafkaStreams("streams2", false, "appDir2", 1, processingThreadsEnabled)
final KafkaStreams streams1 = getKafkaStreams("streams1", false, "appDir1", 1, groupProtocol, processingThreadsEnabled);
final KafkaStreams streams2 = getKafkaStreams("streams2", false, "appDir2", 1, groupProtocol, processingThreadsEnabled)
) {
startApplicationAndWaitUntilRunning(streams1);
startApplicationAndWaitUntilRunning(streams2);
@ -667,13 +688,10 @@ public class EosIntegrationTest {
"Expected a host to start stalling"
);
final String observedStallingHost = stallingHost.get();
final KafkaStreams stallingInstance;
final KafkaStreams remainingInstance;
if ("streams1".equals(observedStallingHost)) {
stallingInstance = streams1;
remainingInstance = streams2;
} else if ("streams2".equals(observedStallingHost)) {
stallingInstance = streams2;
remainingInstance = streams1;
} else {
throw new IllegalArgumentException("unexpected host name: " + observedStallingHost);
@ -683,8 +701,7 @@ public class EosIntegrationTest {
// the assignment is. We only really care that the remaining instance only sees one host
// that owns both partitions.
waitForCondition(
() -> stallingInstance.metadataForAllStreamsClients().size() == 2
&& remainingInstance.metadataForAllStreamsClients().size() == 1
() -> remainingInstance.metadataForAllStreamsClients().size() == 1
&& remainingInstance.metadataForAllStreamsClients().iterator().next().topicPartitions().size() == 2,
MAX_WAIT_TIME_MS,
() -> "Should have rebalanced.\n" +
@ -755,12 +772,12 @@ public class EosIntegrationTest {
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void shouldWriteLatestOffsetsToCheckpointOnShutdown(final boolean processingThreadsEnabled) throws Exception {
@MethodSource("groupProtocolAndProcessingThreadsParameters")
public void shouldWriteLatestOffsetsToCheckpointOnShutdown(final String groupProtocol, final boolean processingThreadsEnabled) throws Exception {
final List<KeyValue<Long, Long>> writtenData = prepareData(0L, 10, 0L, 1L);
final List<KeyValue<Long, Long>> expectedResult = computeExpectedResult(writtenData);
try (final KafkaStreams streams = getKafkaStreams("streams", true, "appDir", 1, processingThreadsEnabled)) {
try (final KafkaStreams streams = getKafkaStreams("streams", true, "appDir", 1, groupProtocol, processingThreadsEnabled)) {
writeInputData(writtenData);
startApplicationAndWaitUntilRunning(streams);
@ -787,9 +804,9 @@ public class EosIntegrationTest {
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
@MethodSource("groupProtocolAndProcessingThreadsParameters")
public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(
final boolean processingThreadsEnabled) throws Exception {
final String groupProtocol, final boolean processingThreadsEnabled) throws Exception {
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
@ -801,6 +818,7 @@ public class EosIntegrationTest {
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
streamsConfiguration.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled);
streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
streamsConfiguration.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 100);
final String stateStoreName = "stateStore";
@ -934,8 +952,13 @@ public class EosIntegrationTest {
static final AtomicReference<TaskId> TASK_WITH_DATA = new AtomicReference<>();
static final AtomicBoolean DID_REVOKE_IDLE_TASK = new AtomicBoolean(false);
@Test
public void shouldNotCommitActiveTasksWithPendingInputIfRevokedTaskDidNotMakeProgress() throws Exception {
@ParameterizedTest
@ValueSource(strings = {"classic", "streams"})
public void shouldNotCommitActiveTasksWithPendingInputIfRevokedTaskDidNotMakeProgress(final String groupProtocol) throws Exception {
// Reset static variables to ensure test isolation
TASK_WITH_DATA.set(null);
DID_REVOKE_IDLE_TASK.set(false);
final AtomicBoolean requestCommit = new AtomicBoolean(false);
final StreamsBuilder builder = new StreamsBuilder();
@ -970,6 +993,7 @@ public class EosIntegrationTest {
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS);
properties.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), Integer.MAX_VALUE);
properties.put(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG, TestTaskAssignor.class.getName());
properties.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
final Properties config = StreamsTestUtils.getStreamsConfig(
applicationId,
@ -1003,9 +1027,9 @@ public class EosIntegrationTest {
// add second thread, to trigger rebalance
// expect idle task to get revoked -- this should not trigger a TX commit
streams.addStreamThread();
waitForCondition(DID_REVOKE_IDLE_TASK::get, "Idle Task was not revoked as expected.");
if (groupProtocol.equals("classic")) {
waitForCondition(DID_REVOKE_IDLE_TASK::get, "Idle Task was not revoked as expected.");
}
// best-effort sanity check (might pass and not detect issue in slow environments)
try {
readResult(SINGLE_PARTITION_OUTPUT_TOPIC, 1, "consumer", 10_000L);
@ -1104,6 +1128,7 @@ public class EosIntegrationTest {
final boolean withState,
final String appDir,
final int numberOfStreamsThreads,
final String groupProtocol,
final boolean processingThreadsEnabled) {
commitRequested = new AtomicInteger(0);
errorInjected = new AtomicBoolean(false);
@ -1212,6 +1237,7 @@ public class EosIntegrationTest {
properties.put(StreamsConfig.STATE_DIR_CONFIG, stateTmpDir + appDir);
properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, dummyHostName + ":2142");
properties.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled);
properties.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
final Properties config = StreamsTestUtils.getStreamsConfig(
applicationId,