Shashank 2025-10-07 17:24:13 +00:00 committed by GitHub
commit cca1bc49a2
4 changed files with 395 additions and 6 deletions

ProducerBatch.java

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.producer.internals;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
@@ -237,6 +238,20 @@ public final class ProducerBatch {
return done(ProduceResponse.INVALID_OFFSET, RecordBatch.NO_TIMESTAMP, topLevelException, recordExceptions);
}
/**
* Get all record futures for this batch.
* This is used by flush() to wait on individual records rather than the batch-level future.
* When batches are split, individual record futures are chained to the new batches,
* ensuring that flush() waits for all split batches to complete.
*
* @return List of FutureRecordMetadata for all records in this batch
*/
public List<FutureRecordMetadata> recordFutures() {
return thunks.stream()
.map(thunk -> thunk.future)
.collect(Collectors.toList());
}
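// When a batch is split after a MESSAGE_TOO_LARGE error, each record is re-appended to a
// smaller batch and its FutureRecordMetadata is chained to that batch's record future, so the
// futures returned above also track records that end up in split batches.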
/**
* Finalize the state of a batch. Final state, once set, is immutable. This function may be called
* once or twice on a batch. It may be called twice if

RecordAccumulator.java

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.producer.internals;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.MetadataSnapshot;
import org.apache.kafka.clients.producer.Callback;
@@ -1072,12 +1073,25 @@ public class RecordAccumulator {
*/
public void awaitFlushCompletion() throws InterruptedException {
try {
// Obtain a copy of all of the incomplete ProduceRequestResult(s) at the time of the flush.
// We must be careful not to hold a reference to the ProduceBatch(s) so that garbage
// collection can occur on the contents.
// The sender will remove ProducerBatch(s) from the original incomplete collection.
for (ProduceRequestResult result : this.incomplete.requestResults())
result.await();
// Obtain a snapshot of all record futures at the time of the flush.
// We wait on individual record futures rather than batch-level futures because record
// futures are chained to the new batches when a batch is split, which ensures flush()
// blocks until all split batches complete.
//
// We first collect all futures into a list to avoid holding references to
// ProducerBatch objects, allowing them to be garbage collected after completion.
List<FutureRecordMetadata> futures = new ArrayList<>();
for (ProducerBatch batch : this.incomplete.copyAll()) {
futures.addAll(batch.recordFutures());
}
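// Record-level failures are deliberately ignored in the loop below: flush() only
// guarantees that every in-flight record has completed (successfully or not); callers
// observe errors through the Future returned by send() or through their callbacks.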
for (FutureRecordMetadata future : futures) {
try {
future.get();
} catch (ExecutionException e) {
log.trace("Completed future with exception during flush", e);
}
}
} finally {
this.flushesInProgress.decrementAndGet();
}

RecordAccumulatorTest.java

@@ -1066,6 +1066,41 @@ public class RecordAccumulatorTest {
assertEquals(1, future2.get().offset());
}
// Tests hasRoomFor() behaviour: the first record is accepted regardless of its size,
// but once that oversized record fills the buffer, no further records are accepted.
@Test
public void testHasRoomForAllowsOversizedFirstRecordButRejectsSubsequentRecords() {
long now = time.milliseconds();
int smallBatchSize = 1024;
// Create a large record that exceeds batch size limit
byte[] largeValue = new byte[4 * 1024]; // 4KB > 1KB
// Create a small buffer that cannot fit the large record
ByteBuffer buffer = ByteBuffer.allocate(smallBatchSize);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE, TimestampType.CREATE_TIME, 0L);
// Verify existing behaviour: hasRoomFor() returns true for the first record regardless of its size
boolean hasRoomForFirst = builder.hasRoomFor(now, ByteBuffer.wrap(key), ByteBuffer.wrap(largeValue), Record.EMPTY_HEADERS);
assertTrue(hasRoomForFirst, "hasRoomFor() should return true for first record regardless of size when numRecords == 0");
// append the first oversized record - should succeed
builder.append(now, ByteBuffer.wrap(key), ByteBuffer.wrap(largeValue), Record.EMPTY_HEADERS);
assertEquals(1, builder.numRecords(), "Should have successfully appended the first oversized record");
// Check another large record now that numRecords > 0; it should be rejected
boolean hasRoomForSecond = builder.hasRoomFor(now, ByteBuffer.wrap(key), ByteBuffer.wrap(largeValue), Record.EMPTY_HEADERS);
assertFalse(hasRoomForSecond, "hasRoomFor() should return false for oversized record when numRecords > 0");
// Check a smaller record that would normally fit; it too should be rejected
// because the oversized first record has already consumed the buffer
byte[] smallValue = new byte[100]; // Small record
boolean hasRoomForSmall = builder.hasRoomFor(now, ByteBuffer.wrap(key), ByteBuffer.wrap(smallValue), Record.EMPTY_HEADERS);
assertFalse(hasRoomForSmall, "hasRoomFor() should return false for any record when buffer is full from oversized first record");
}
@Test
public void testSplitBatchOffAccumulator() throws InterruptedException {
long seed = System.currentTimeMillis();
@@ -1790,4 +1825,56 @@ public class RecordAccumulatorTest {
// Verify all original records are accounted for (no data loss)
assertEquals(100, keyFoundMap.size(), "All original 100 records should be present after splitting");
}
@Test
public void testFlushPerformanceWithManyRecords() throws Exception {
int numRecords = 5000;
int batchSize = 1024;
long totalSize = 10 * 1024 * 1024;
RecordAccumulator accum = createTestRecordAccumulator(
batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD,
totalSize,
Compression.NONE,
Integer.MAX_VALUE);
List<Future<RecordMetadata>> futures = new ArrayList<>();
for (int i = 0; i < numRecords; i++) {
RecordAccumulator.RecordAppendResult result = accum.append(
topic,
partition1,
0L,
key,
value,
Record.EMPTY_HEADERS,
null,
maxBlockTimeMs,
time.milliseconds(),
cluster);
if (result.future != null) {
futures.add(result.future);
}
}
accum.beginFlush();
// Need to complete all batches to mimic successful sends for awaitFlushCompletion()
List<ProducerBatch> batches = new ArrayList<>(accum.getDeque(tp1));
for (ProducerBatch batch : batches) {
batch.complete(0L, time.milliseconds());
}
// Measure how long awaitFlushCompletion() takes when it must wait on every record future
long startNanos = System.nanoTime();
accum.awaitFlushCompletion();
long durationNanos = System.nanoTime() - startNanos;
double durationMs = durationNanos / 1_000_000.0;
System.out.printf("flush() with %d records took: %.3f ms%n", numRecords, durationMs);
for (ProducerBatch batch : batches) {
accum.deallocate(batch);
}
assertFalse(accum.flushInProgress());
}
}

AtLeastOnceDeliveryMessageLossIntegrationTest.java

@@ -0,0 +1,273 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.integration;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.DEFAULT_TIMEOUT;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.apache.kafka.test.TestUtils.consumerConfig;
import static org.apache.kafka.test.TestUtils.producerConfig;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(600)
@Tag("integration")
public class AtLeastOnceDeliveryMessageLossIntegrationTest {
private static final Logger log = LoggerFactory.getLogger(
AtLeastOnceDeliveryMessageLossIntegrationTest.class);
private static final int NUM_BROKERS = 1;
private static final int LARGE_RECORD_COUNT = 50000;
private static final int SMALL_RECORD_COUNT = 40000;
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@BeforeAll
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
private String applicationId;
private String inputTopic;
private String outputTopic;
private Properties streamsConfiguration;
private KafkaStreams kafkaStreams;
@BeforeEach
public void setUp(final TestInfo testInfo) throws Exception {
final String testId = safeUniqueTestName(testInfo);
applicationId = "app-" + testId;
inputTopic = "input-" + testId;
outputTopic = "output-" + testId;
cleanStateBeforeTest(CLUSTER, inputTopic, outputTopic);
CLUSTER.createTopics(inputTopic, outputTopic);
setupStreamsConfiguration();
}
@AfterEach
public void cleanUp() throws Exception {
if (kafkaStreams != null) {
kafkaStreams.close();
}
if (streamsConfiguration != null) {
purgeLocalStreamsState(streamsConfiguration);
}
}
// Failing test: reproduces the message-loss bug described below
@Test
public void shouldNotCommitOffsetsAndNotProduceOutputRecordsWhenProducerFailsWithMessageTooLarge() throws Exception {
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(Sender.class)) {
produceInputData(LARGE_RECORD_COUNT);
kafkaStreams = createStreamsApplication();
startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), Duration.ofMillis(DEFAULT_TIMEOUT));
waitForProcessingAndCommit();
// With this bug: offsets are committed first, no messages are ever produced to the
// output topic, and the producer repeatedly retries with MESSAGE_TOO_LARGE errors
assertTrue(appender.getMessages().stream()
.anyMatch(msg -> msg.contains("MESSAGE_TOO_LARGE") && msg.contains("splitting and retrying")),
"Should log MESSAGE_TOO_LARGE and splitting retry messages");
final int outputRecordCount = verifyOutputRecords(0); // should not produce records
final boolean offsetsCommitted = verifyConsumerOffsetsCommitted(0); // should not commit offset unless records are produced
assertEquals(0, outputRecordCount, "Output topic should not have any records");
assertTrue(offsetsCommitted, "Committed offset should remain 0 since no records were produced");
}
}
@Test
public void shouldCommitOffsetsAndProduceMessagesNormallyForSmallerRecordCount() throws Exception {
produceInputData(SMALL_RECORD_COUNT);
try (final KafkaStreams kafkaStreams = createStreamsApplication()) {
startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), Duration.ofMillis(DEFAULT_TIMEOUT));
waitForProcessingAndCommit();
// Normal behaviour: records are produced and offsets are committed
final int outputRecordCount = verifyOutputRecords(SMALL_RECORD_COUNT); //should produce records
final boolean offsetsCommitted = verifyConsumerOffsetsCommitted(SMALL_RECORD_COUNT); // should commit offsets
assertEquals(SMALL_RECORD_COUNT, outputRecordCount, "Output topic should have " + SMALL_RECORD_COUNT + " records");
assertTrue(offsetsCommitted, "Consumer offsets should be committed");
}
}
private void setupStreamsConfiguration() {
streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// AT_LEAST_ONCE processing guarantee
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000L);
// Producer configuration that can trigger MESSAGE_TOO_LARGE errors
streamsConfiguration.put(ProducerConfig.LINGER_MS_CONFIG, 300000);
streamsConfiguration.put(ProducerConfig.BATCH_SIZE_CONFIG, 33554432);
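// These settings are chosen to provoke the failure: with a 5-minute linger and a 32 MB
// batch.size, the producer can accumulate a single batch far larger than the broker's
// default message.max.bytes (about 1 MB), so the produce request is rejected with
// MESSAGE_TOO_LARGE and the batch must be split and retried.
// (Assumption: the embedded cluster leaves message.max.bytes at its default.)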
}
private void produceInputData(final int recordCount) {
final List<KeyValue<String, String>> inputRecords = new ArrayList<>();
for (int i = 1; i <= recordCount; i++) {
inputRecords.add(new KeyValue<>(String.valueOf(i), "item-" + i));
}
IntegrationTestUtils.produceKeyValuesSynchronously(
inputTopic,
inputRecords,
producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class),
CLUSTER.time
);
}
private void waitForProcessingAndCommit() throws Exception {
// Wait slightly longer than commit interval to ensure processing and offset commits
waitForCondition(
() -> {
try (final Admin adminClient = Admin.create(mkMap(
mkEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())))) {
final TopicPartition topicPartition = new TopicPartition(inputTopic, 0);
return adminClient
.listConsumerGroupOffsets(applicationId)
.partitionsToOffsetAndMetadata()
.get()
.containsKey(topicPartition);
} catch (final Exception e) {
return false;
}
},
35000L,
"Waiting for consumer offsets to be committed"
);
}
private boolean verifyConsumerOffsetsCommitted(final int expectedOffset) throws Exception {
try (final Admin adminClient = Admin.create(mkMap(
mkEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())))) {
final TopicPartition topicPartition = new TopicPartition(inputTopic, 0);
final long committedOffset = adminClient
.listConsumerGroupOffsets(applicationId)
.partitionsToOffsetAndMetadata()
.get()
.get(topicPartition)
.offset();
log.info("Consumer group {} committed offset: {} (expected: {})", applicationId, committedOffset, expectedOffset);
return committedOffset == expectedOffset;
}
}
private int verifyOutputRecords(final int expectedRecordCount) {
try {
final List<KeyValue<String, String>> outputRecords =
waitUntilMinKeyValueRecordsReceived(
consumerConfig(
CLUSTER.bootstrapServers(),
applicationId + "-test-consumer-" + System.currentTimeMillis(),
StringDeserializer.class,
StringDeserializer.class
),
outputTopic,
expectedRecordCount,
30000L
);
log.info("Output topic {} contains {} records", outputTopic, outputRecords.size());
return outputRecords.size();
} catch (final Exception e) {
log.info("Exception while reading output records: {}", e.getMessage());
return 0;
}
}
private KafkaStreams createStreamsApplication() {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> input = builder.stream(inputTopic);
input.peek((key, value) -> {
if (Integer.parseInt(key) % 1000 == 0) {
log.debug("Processing record {}: {} -> {}", key, key, value);
}
}).to(outputTopic);
return new KafkaStreams(builder.build(), streamsConfiguration);
}
}