Compare commits

...

9 Commits

Author SHA1 Message Date
Shashank cca1bc49a2
Merge ae54bc2d69 into 4a5aa37169 2025-10-07 17:24:13 +00:00
Chang-Chi Hsu 4a5aa37169
MINOR: Move ReconfigurableQuorumIntegrationTest from core module to server module (#20636)
It moves the `ReconfigurableQuorumIntegrationTest` class to the
`org.apache.kafka.server` package and consolidates two related tests,
`RemoveAndAddVoterWithValidClusterId` and
`RemoveAndAddVoterWithInconsistentClusterId`, into a single file. This
improves code organization and reduces redundancy.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2025-10-08 01:10:58 +08:00
Shashank Hosahalli Shivamurthy ae54bc2d69 add perf test 2025-10-06 19:06:10 -07:00
Shashank Hosahalli Shivamurthy 4762a9975d remove useless tests 2025-10-06 18:43:22 -07:00
Shashank Hosahalli Shivamurthy 616e5feab8 fix typos 2025-10-06 18:41:38 -07:00
Shashank Hosahalli Shivamurthy 38f9b9f1a6 remove exception 2025-10-06 18:39:16 -07:00
Shashank Hosahalli Shivamurthy a51d19b800 patch to check record level futures 2025-10-05 18:31:43 -07:00
Shashank Hosahalli Shivamurthy 306117a919 test: added 3 passing tests 2025-10-05 18:31:43 -07:00
Shashank Hosahalli Shivamurthy 73b02c991e add helper functions and result for tests 2025-10-05 18:31:43 -07:00
6 changed files with 500 additions and 146 deletions

View File

@@ -1,132 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InconsistentClusterIdException;
import org.apache.kafka.common.test.KafkaClusterTestKit;
import org.apache.kafka.common.test.TestKitNodes;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
@Tag("integration")
public class ReconfigurableQuorumIntegrationTest {
static Map<Integer, Uuid> descVoterDirs(Admin admin) throws ExecutionException, InterruptedException {
var quorumInfo = admin.describeMetadataQuorum().quorumInfo().get();
return quorumInfo.voters().stream().collect(Collectors.toMap(QuorumInfo.ReplicaState::replicaId, QuorumInfo.ReplicaState::replicaDirectoryId));
}
@Test
public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
final var nodes = new TestKitNodes.Builder()
.setClusterId("test-cluster")
.setNumBrokerNodes(1)
.setNumControllerNodes(3)
.build();
final Map<Integer, Uuid> initialVoters = new HashMap<>();
for (final var controllerNode : nodes.controllerNodes().values()) {
initialVoters.put(
controllerNode.id(),
controllerNode.metadataDirectoryId()
);
}
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
TestUtils.waitForCondition(() -> {
Map<Integer, Uuid> voters = descVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID));
}, "Initial quorum voters should be {3000, 3001, 3002} and all should have non-zero directory IDs");
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
admin.removeRaftVoter(
3000,
dirId,
new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
).all().get();
TestUtils.waitForCondition(() -> {
Map<Integer, Uuid> voters = descVoterDirs(admin);
assertEquals(Set.of(3001, 3002), voters.keySet());
return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID));
}, "After removing voter 3000, remaining voters should be {3001, 3002} with non-zero directory IDs");
admin.addRaftVoter(
3000,
dirId,
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
).all().get();
}
}
}
@Test
public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception {
final var nodes = new TestKitNodes.Builder()
.setClusterId("test-cluster")
.setNumBrokerNodes(1)
.setNumControllerNodes(3)
.build();
final Map<Integer, Uuid> initialVoters = new HashMap<>();
for (final var controllerNode : nodes.controllerNodes().values()) {
initialVoters.put(
controllerNode.id(),
controllerNode.metadataDirectoryId()
);
}
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
var removeFuture = admin.removeRaftVoter(
3000,
dirId,
new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
).all();
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, removeFuture);
var addFuture = admin.addRaftVoter(
3000,
dirId,
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
).all();
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
}
}
}
}

View File

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.producer.internals;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
@@ -237,6 +238,20 @@ public final class ProducerBatch {
return done(ProduceResponse.INVALID_OFFSET, RecordBatch.NO_TIMESTAMP, topLevelException, recordExceptions);
}
/**
* Get all record futures for this batch.
* This is used by flush() to wait on individual records rather than the batch-level future.
* When batches are split, individual record futures are chained to the new batches,
* ensuring that flush() waits for all split batches to complete.
*
* @return List of FutureRecordMetadata for all records in this batch
*/
public List<FutureRecordMetadata> recordFutures() {
return thunks.stream()
.map(thunk -> thunk.future)
.collect(Collectors.toList());
}
/**
* Finalize the state of a batch. Final state, once set, is immutable. This function may be called
* once or twice on a batch. It may be called twice if
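A minimal sketch of the idea behind the recordFutures() addition above: per-record futures can be re-chained when a batch is split, so futures collected before the split still complete only when the replacement batches complete. The names SplitChainDemo and MiniBatch below are hypothetical, and the model uses plain CompletableFuture rather than the Kafka ProducerBatch/FutureRecordMetadata classes.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class SplitChainDemo {
    // Hypothetical stand-in for a producer batch: one future per appended record.
    static final class MiniBatch {
        final List<CompletableFuture<Long>> recordFutures = new ArrayList<>();

        CompletableFuture<Long> append() {
            CompletableFuture<Long> f = new CompletableFuture<>();
            recordFutures.add(f);
            return f;
        }

        // Split into two halves; each original record future is chained to the
        // corresponding future in the batch that now carries its record.
        List<MiniBatch> split() {
            MiniBatch left = new MiniBatch();
            MiniBatch right = new MiniBatch();
            for (int i = 0; i < recordFutures.size(); i++) {
                CompletableFuture<Long> original = recordFutures.get(i);
                MiniBatch target = i < recordFutures.size() / 2 ? left : right;
                target.append().whenComplete((offset, error) -> {
                    if (error != null) original.completeExceptionally(error);
                    else original.complete(offset);
                });
            }
            return List.of(left, right);
        }

        void complete(long baseOffset) {
            for (int i = 0; i < recordFutures.size(); i++) {
                recordFutures.get(i).complete(baseOffset + i);
            }
        }
    }

    public static void main(String[] args) {
        MiniBatch big = new MiniBatch();
        // Futures a caller might collect before any split happens (what a flush would snapshot).
        List<CompletableFuture<Long>> snapshot = List.of(big.append(), big.append());
        List<MiniBatch> halves = big.split();  // the original batch is replaced by two new ones
        halves.get(0).complete(0L);
        halves.get(1).complete(5L);
        // Both futures collected before the split resolve only once the new batches complete.
        System.out.println(snapshot.get(0).join() + " " + snapshot.get(1).join());  // prints: 0 5
    }
}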

View File

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.producer.internals;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.MetadataSnapshot;
import org.apache.kafka.clients.producer.Callback;
@@ -1072,12 +1073,25 @@ public class RecordAccumulator {
*/
public void awaitFlushCompletion() throws InterruptedException {
try {
// Obtain a copy of all of the incomplete ProduceRequestResult(s) at the time of the flush.
// We must be careful not to hold a reference to the ProduceBatch(s) so that garbage
// collection can occur on the contents.
// The sender will remove ProducerBatch(s) from the original incomplete collection.
for (ProduceRequestResult result : this.incomplete.requestResults())
result.await();
// Obtain a snapshot of all record futures at the time of the flush.
// We wait on individual record futures rather than batch-level futures:
// when a batch is split, its record futures are chained to the new batches,
// so flush() still blocks until every split batch completes.
//
// We first collect all futures into a list to avoid holding references to
// ProducerBatch objects, allowing them to be garbage collected after completion.
List<FutureRecordMetadata> futures = new ArrayList<>();
for (ProducerBatch batch : this.incomplete.copyAll()) {
futures.addAll(batch.recordFutures());
}
for (FutureRecordMetadata future : futures) {
try {
future.get();
} catch (ExecutionException e) {
log.trace("Completed future with exception during flush", e);
}
}
} finally {
this.flushesInProgress.decrementAndGet();
}
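As a companion to the new wait loop above, a minimal self-contained sketch of the pattern (hypothetical FlushWaitSketch class, plain CompletableFuture rather than FutureRecordMetadata): snapshot the futures first, then block on each one, treating exceptional completion as "flushed", since per-record errors are reported through the record's own future or callback.

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class FlushWaitSketch {
    public static void main(String[] args) throws InterruptedException {
        // Two per-record futures: one succeeded, one failed on the broker side.
        CompletableFuture<Long> succeeded = CompletableFuture.completedFuture(42L);
        CompletableFuture<Long> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("record rejected"));

        // Snapshot first, then block on each future; exceptional completion still
        // counts as flushed, mirroring the trace-and-continue handling above.
        List<CompletableFuture<Long>> snapshot = List.of(succeeded, failed);
        for (CompletableFuture<Long> future : snapshot) {
            try {
                future.get();  // wait for completion, success or failure
            } catch (ExecutionException e) {
                // ignored here; the error surfaces via the record's own future/callback
            }
        }
        System.out.println("all records reached a final state");
    }
}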

View File

@@ -1066,6 +1066,41 @@
assertEquals(1, future2.get().offset());
}
// Tests hasRoomFor() behaviour: the first record is always accepted,
// regardless of its size, but subsequent records are rejected.
@Test
public void testHasRoomForAllowsOversizedFirstRecordButRejectsSubsequentRecords() {
long now = time.milliseconds();
int smallBatchSize = 1024;
// Create a large record that exceeds batch size limit
byte[] largeValue = new byte[4 * 1024]; // 4KB > 1KB
// Create a small buffer that cannot fit the large record
ByteBuffer buffer = ByteBuffer.allocate(smallBatchSize);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE, TimestampType.CREATE_TIME, 0L);
// Existing behavior: hasRoomFor() should return true for the first record regardless of size
boolean hasRoomForFirst = builder.hasRoomFor(now, ByteBuffer.wrap(key), ByteBuffer.wrap(largeValue), Record.EMPTY_HEADERS);
assertTrue(hasRoomForFirst, "hasRoomFor() should return true for first record regardless of size when numRecords == 0");
// append the first oversized record - should succeed
builder.append(now, ByteBuffer.wrap(key), ByteBuffer.wrap(largeValue), Record.EMPTY_HEADERS);
assertEquals(1, builder.numRecords(), "Should have successfully appended the first oversized record");
// check room for another large record now that numRecords > 0
boolean hasRoomForSecond = builder.hasRoomFor(now, ByteBuffer.wrap(key), ByteBuffer.wrap(largeValue), Record.EMPTY_HEADERS);
assertFalse(hasRoomForSecond, "hasRoomFor() should return false for oversized record when numRecords > 0");
// Even a smaller record that would normally fit is rejected,
// because the oversized first record has already filled the buffer
byte[] smallValue = new byte[100]; // Small record
boolean hasRoomForSmall = builder.hasRoomFor(now, ByteBuffer.wrap(key), ByteBuffer.wrap(smallValue), Record.EMPTY_HEADERS);
assertFalse(hasRoomForSmall, "hasRoomFor() should return false for any record when buffer is full from oversized first record");
}
@Test
public void testSplitBatchOffAccumulator() throws InterruptedException {
long seed = System.currentTimeMillis();
@@ -1790,4 +1825,56 @@
// Verify all original records are accounted for (no data loss)
assertEquals(100, keyFoundMap.size(), "All original 100 records should be present after splitting");
}
@Test
public void testFlushPerformanceWithManyRecords() throws Exception {
int numRecords = 5000;
int batchSize = 1024;
long totalSize = 10 * 1024 * 1024;
RecordAccumulator accum = createTestRecordAccumulator(
batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD,
totalSize,
Compression.NONE,
Integer.MAX_VALUE);
List<Future<RecordMetadata>> futures = new ArrayList<>();
for (int i = 0; i < numRecords; i++) {
RecordAccumulator.RecordAppendResult result = accum.append(
topic,
partition1,
0L,
key,
value,
Record.EMPTY_HEADERS,
null,
maxBlockTimeMs,
time.milliseconds(),
cluster);
if (result.future != null) {
futures.add(result.future);
}
}
accum.beginFlush();
// Need to complete all batches to mimic successful sends for awaitFlushCompletion()
List<ProducerBatch> batches = new ArrayList<>(accum.getDeque(tp1));
for (ProducerBatch batch : batches) {
batch.complete(0L, time.milliseconds());
}
// Measure time
long startNanos = System.nanoTime();
accum.awaitFlushCompletion();
long durationNanos = System.nanoTime() - startNanos;
double durationMs = durationNanos / 1_000_000.0;
System.out.printf("flush() with %d records took: %.3f ms%n", numRecords, durationMs);
for (ProducerBatch batch : batches) {
accum.deallocate(batch);
}
assertFalse(accum.flushInProgress());
}
}

View File

@@ -15,13 +15,16 @@
* limitations under the License.
*/
package kafka.server;
package org.apache.kafka.server;
import org.apache.kafka.clients.admin.AddRaftVoterOptions;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.FeatureMetadata;
import org.apache.kafka.clients.admin.QuorumInfo;
import org.apache.kafka.clients.admin.RaftVoterEndpoint;
import org.apache.kafka.clients.admin.RemoveRaftVoterOptions;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InconsistentClusterIdException;
import org.apache.kafka.common.test.KafkaClusterTestKit;
import org.apache.kafka.common.test.TestKitNodes;
import org.apache.kafka.common.test.api.TestKitDefaults;
@@ -29,10 +32,12 @@ import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
@@ -41,6 +46,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Tag("integration")
public class ReconfigurableQuorumIntegrationTest {
static void checkKRaftVersions(Admin admin, short finalized) throws Exception {
FeatureMetadata featureMetadata = admin.describeFeatures().featureMetadata().get();
@@ -70,7 +76,7 @@
).build()) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_0.featureLevel());
});
@@ -88,7 +94,7 @@
).setStandalone(true).build()) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_1.featureLevel());
});
@@ -126,7 +132,7 @@
) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@@ -161,7 +167,7 @@
) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002, 3003), voters.keySet());
@@ -200,7 +206,7 @@
) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@@ -238,7 +244,7 @@
) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@@ -249,4 +255,95 @@
}
}
}
@Test
public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
final var nodes = new TestKitNodes.Builder()
.setClusterId("test-cluster")
.setNumBrokerNodes(1)
.setNumControllerNodes(3)
.build();
final Map<Integer, Uuid> initialVoters = new HashMap<>();
for (final var controllerNode : nodes.controllerNodes().values()) {
initialVoters.put(
controllerNode.id(),
controllerNode.metadataDirectoryId()
);
}
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
cluster.format();
cluster.startup();
try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
for (int replicaId : new int[] {3000, 3001, 3002}) {
assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
}
});
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
admin.removeRaftVoter(
3000,
dirId,
new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
).all().get();
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3001, 3002), voters.keySet());
for (int replicaId : new int[] {3001, 3002}) {
assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
}
});
admin.addRaftVoter(
3000,
dirId,
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
).all().get();
}
}
}
@Test
public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception {
final var nodes = new TestKitNodes.Builder()
.setClusterId("test-cluster")
.setNumBrokerNodes(1)
.setNumControllerNodes(3)
.build();
final Map<Integer, Uuid> initialVoters = new HashMap<>();
for (final var controllerNode : nodes.controllerNodes().values()) {
initialVoters.put(
controllerNode.id(),
controllerNode.metadataDirectoryId()
);
}
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
cluster.format();
cluster.startup();
try (var admin = Admin.create(cluster.clientProperties())) {
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
var removeFuture = admin.removeRaftVoter(
3000,
dirId,
new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
).all();
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, removeFuture);
var addFuture = admin.addRaftVoter(
3000,
dirId,
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
).all();
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
}
}
}
}

View File

@@ -0,0 +1,273 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.integration;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.DEFAULT_TIMEOUT;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.apache.kafka.test.TestUtils.consumerConfig;
import static org.apache.kafka.test.TestUtils.producerConfig;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(600)
@Tag("integration")
public class AtLeastOnceDeliveryMessageLossIntegrationTest {
private static final Logger log = LoggerFactory.getLogger(
AtLeastOnceDeliveryMessageLossIntegrationTest.class);
private static final int NUM_BROKERS = 1;
private static final int LARGE_RECORD_COUNT = 50000;
private static final int SMALL_RECORD_COUNT = 40000;
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@BeforeAll
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
private String applicationId;
private String inputTopic;
private String outputTopic;
private Properties streamsConfiguration;
private KafkaStreams kafkaStreams;
@BeforeEach
public void setUp(final TestInfo testInfo) throws Exception {
final String testId = safeUniqueTestName(testInfo);
applicationId = "app-" + testId;
inputTopic = "input-" + testId;
outputTopic = "output-" + testId;
cleanStateBeforeTest(CLUSTER, inputTopic, outputTopic);
CLUSTER.createTopics(inputTopic, outputTopic);
setupStreamsConfiguration();
}
@AfterEach
public void cleanUp() throws Exception {
if (kafkaStreams != null) {
kafkaStreams.close();
}
if (streamsConfiguration != null) {
purgeLocalStreamsState(streamsConfiguration);
}
}
// This test currently fails; it demonstrates the message-loss bug described below.
@Test
public void shouldNotCommitOffsetsAndNotProduceOutputRecordsWhenProducerFailsWithMessageTooLarge() throws Exception {
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(Sender.class)) {
produceInputData(LARGE_RECORD_COUNT);
kafkaStreams = createStreamsApplication();
startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), Duration.ofMillis(DEFAULT_TIMEOUT));
waitForProcessingAndCommit();
// With this bug: offsets are committed first, no messages reach the output
// topic, and the producer repeatedly retries with MESSAGE_TOO_LARGE errors.
assertTrue(appender.getMessages().stream()
.anyMatch(msg -> msg.contains("MESSAGE_TOO_LARGE") && msg.contains("splitting and retrying")),
"Should log MESSAGE_TOO_LARGE and splitting retry messages");
final int outputRecordCount = verifyOutputRecords(0); // no output records expected
final boolean committedOffsetIsZero = verifyConsumerOffsetsCommitted(0); // committed offset should stay at 0 when nothing is produced
assertEquals(0, outputRecordCount, "Output topic should not have any records");
assertTrue(committedOffsetIsZero, "Consumer offsets should not advance past 0");
}
}
@Test
public void shouldCommitOffsetsAndProduceMessagesNormallyForSmallerRecordCount() throws Exception {
produceInputData(SMALL_RECORD_COUNT);
try (final KafkaStreams kafkaStreams = createStreamsApplication()) {
startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), Duration.ofMillis(DEFAULT_TIMEOUT));
waitForProcessingAndCommit();
// normal behavior
final int outputRecordCount = verifyOutputRecords(SMALL_RECORD_COUNT); // should produce records
final boolean offsetsCommitted = verifyConsumerOffsetsCommitted(SMALL_RECORD_COUNT); // should commit offsets
assertEquals(SMALL_RECORD_COUNT, outputRecordCount, "Output topic should have " + SMALL_RECORD_COUNT + " records");
assertTrue(offsetsCommitted, "Consumer offsets should be committed");
}
}
private void setupStreamsConfiguration() {
streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// AT_LEAST_ONCE processing guarantee
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000L);
// Producer configuration that can trigger MESSAGE_TOO_LARGE errors
streamsConfiguration.put(ProducerConfig.LINGER_MS_CONFIG, 300000); // 5 minutes, so batches keep accumulating
streamsConfiguration.put(ProducerConfig.BATCH_SIZE_CONFIG, 33554432); // 32 MB, far above the broker's default message.max.bytes
}
private void produceInputData(final int recordCount) {
final List<KeyValue<String, String>> inputRecords = new ArrayList<>();
for (int i = 1; i <= recordCount; i++) {
inputRecords.add(new KeyValue<>(String.valueOf(i), "item-" + i));
}
IntegrationTestUtils.produceKeyValuesSynchronously(
inputTopic,
inputRecords,
producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class),
CLUSTER.time
);
}
private void waitForProcessingAndCommit() throws Exception {
// Wait slightly longer than commit interval to ensure processing and offset commits
waitForCondition(
() -> {
try (final Admin adminClient = Admin.create(mkMap(
mkEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())))) {
final TopicPartition topicPartition = new TopicPartition(inputTopic, 0);
return adminClient
.listConsumerGroupOffsets(applicationId)
.partitionsToOffsetAndMetadata()
.get()
.containsKey(topicPartition);
} catch (final Exception e) {
return false;
}
},
35000L,
"Waiting for consumer offsets to be committed"
);
}
private boolean verifyConsumerOffsetsCommitted(final int expectedOffset) throws Exception {
try (final Admin adminClient = Admin.create(mkMap(
mkEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())))) {
final TopicPartition topicPartition = new TopicPartition(inputTopic, 0);
final long committedOffset = adminClient
.listConsumerGroupOffsets(applicationId)
.partitionsToOffsetAndMetadata()
.get()
.get(topicPartition)
.offset();
log.info("Consumer group {} committed offset: {} (expected: {})", applicationId, committedOffset, expectedOffset);
return committedOffset == expectedOffset;
}
}
private int verifyOutputRecords(final int expectedRecordCount) {
try {
final List<KeyValue<String, String>> outputRecords =
waitUntilMinKeyValueRecordsReceived(
consumerConfig(
CLUSTER.bootstrapServers(),
applicationId + "-test-consumer-" + System.currentTimeMillis(),
StringDeserializer.class,
StringDeserializer.class
),
outputTopic,
expectedRecordCount,
30000L
);
log.info("Output topic {} contains {} records", outputTopic, outputRecords.size());
return outputRecords.size();
} catch (final Exception e) {
log.info("Exception while reading output records: {}", e.getMessage());
return 0;
}
}
private KafkaStreams createStreamsApplication() {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> input = builder.stream(inputTopic);
input.peek((key, value) -> {
if (Integer.parseInt(key) % 1000 == 0) {
log.debug("Processing record {}: {} -> {}", key, key, value);
}
}).to(outputTopic);
return new KafkaStreams(builder.build(), streamsConfiguration);
}
}
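As a rough illustration of why the producer configuration in setupStreamsConfiguration() above can trigger MESSAGE_TOO_LARGE, here is a hedged sketch with a plain KafkaProducer outside Streams. It assumes a broker at localhost:9092 with default message.max.bytes and an existing topic named input-topic (both assumptions, not part of the test above): a 32 MB client-side batch combined with a long linger lets far more data accumulate than the broker accepts in one produce request, so the batch is rejected and the producer splits and retries, which is the log pattern the failing test asserts on.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class OversizedBatchSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024 * 1024); // 32 MB client batches
        props.put(ProducerConfig.LINGER_MS_CONFIG, 300_000);           // keep accumulating for 5 minutes

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 1; i <= 50_000; i++) {
                producer.send(new ProducerRecord<>("input-topic", String.valueOf(i), "item-" + i)); // assumed topic
            }
            // The accumulated batch can exceed the broker's message.max.bytes, so the broker
            // rejects it with MESSAGE_TOO_LARGE and the producer splits and retries; flush()
            // is where the per-record vs batch-level waiting behavior discussed above matters.
            producer.flush();
        }
    }
}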