diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 5619819dde7..121c6154de1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
+import java.util.stream.Collectors;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
@@ -237,6 +238,20 @@ public final class ProducerBatch {
         return done(ProduceResponse.INVALID_OFFSET, RecordBatch.NO_TIMESTAMP, topLevelException, recordExceptions);
     }
 
+    /**
+     * Get all record futures for this batch.
+     * This is used by flush() to wait on individual records rather than the batch-level future.
+     * When batches are split, individual record futures are chained to the new batches,
+     * ensuring that flush() waits for all split batches to complete.
+     *
+     * @return List of FutureRecordMetadata for all records in this batch
+     */
+    public List<FutureRecordMetadata> recordFutures() {
+        return thunks.stream()
+            .map(thunk -> thunk.future)
+            .collect(Collectors.toList());
+    }
+
     /**
      * Finalize the state of a batch. Final state, once set, is immutable. This function may be called
      * once or twice on a batch. It may be called twice if
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index f0c2719db96..25801b0d75d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
+import java.util.concurrent.ExecutionException;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.MetadataSnapshot;
 import org.apache.kafka.clients.producer.Callback;
@@ -1072,12 +1073,25 @@ public class RecordAccumulator {
      */
     public void awaitFlushCompletion() throws InterruptedException {
         try {
-            // Obtain a copy of all of the incomplete ProduceRequestResult(s) at the time of the flush.
-            // We must be careful not to hold a reference to the ProduceBatch(s) so that garbage
-            // collection can occur on the contents.
-            // The sender will remove ProducerBatch(s) from the original incomplete collection.
-            for (ProduceRequestResult result : this.incomplete.requestResults())
-                result.await();
+            // Obtain a snapshot of all record futures at the time of the flush.
+            // We wait on individual record futures rather than batch-level futures: when a batch
+            // is split, each record future is chained to its new batch, so waiting on record
+            // futures guarantees that flush() blocks until every split batch completes.
+            //
+            // We first collect all futures into a list to avoid holding references to
+            // ProducerBatch objects, allowing them to be garbage collected after completion.
+            List<FutureRecordMetadata> futures = new ArrayList<>();
+            for (ProducerBatch batch : this.incomplete.copyAll()) {
+                futures.addAll(batch.recordFutures());
+            }
+
+            for (FutureRecordMetadata future : futures) {
+                try {
+                    future.get();
+                } catch (ExecutionException e) {
+                    log.trace("Completed future with exception during flush", e);
+                }
+            }
         } finally {
             this.flushesInProgress.decrementAndGet();
         }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 750440d2595..e9a0cef29d8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -1066,6 +1066,41 @@ public class RecordAccumulatorTest {
         assertEquals(1, future2.get().offset());
     }
 
+    // Verifies hasRoomFor() behaviour:
+    // the first record is accepted regardless of its size,
+    // but subsequent records are rejected once the buffer is full.
+    @Test
+    public void testHasRoomForAllowsOversizedFirstRecordButRejectsSubsequentRecords() {
+        long now = time.milliseconds();
+        int smallBatchSize = 1024;
+
+        // Create a large record that exceeds the batch size limit
+        byte[] largeValue = new byte[4 * 1024]; // 4KB > 1KB
+
+        // Create a small buffer that cannot fit the large record
+        ByteBuffer buffer = ByteBuffer.allocate(smallBatchSize);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE, TimestampType.CREATE_TIME, 0L);
+
+        // Existing behaviour under test:
+        // hasRoomFor() should return true for the first record regardless of size
+        boolean hasRoomForFirst = builder.hasRoomFor(now, ByteBuffer.wrap(key), ByteBuffer.wrap(largeValue), Record.EMPTY_HEADERS);
+        assertTrue(hasRoomForFirst, "hasRoomFor() should return true for first record regardless of size when numRecords == 0");
+
+        // Appending the first oversized record should succeed
+        builder.append(now, ByteBuffer.wrap(key), ByteBuffer.wrap(largeValue), Record.EMPTY_HEADERS);
+        assertEquals(1, builder.numRecords(), "Should have successfully appended the first oversized record");
+
+        // Another large record is rejected now that numRecords > 0
+        boolean hasRoomForSecond = builder.hasRoomFor(now, ByteBuffer.wrap(key), ByteBuffer.wrap(largeValue), Record.EMPTY_HEADERS);
+        assertFalse(hasRoomForSecond, "hasRoomFor() should return false for oversized record when numRecords > 0");
+
+        // Even a small record that would normally fit is rejected, because the
+        // oversized first record has already exhausted the buffer
+        byte[] smallValue = new byte[100];
+        boolean hasRoomForSmall = builder.hasRoomFor(now, ByteBuffer.wrap(key), ByteBuffer.wrap(smallValue), Record.EMPTY_HEADERS);
+        assertFalse(hasRoomForSmall, "hasRoomFor() should return false for any record when buffer is full from oversized first record");
+    }
+
     @Test
     public void testSplitBatchOffAccumulator() throws InterruptedException {
         long seed = System.currentTimeMillis();
@@ -1790,4 +1825,56 @@ public class RecordAccumulatorTest {
         // Verify all original records are accounted for (no data loss)
         assertEquals(100, keyFoundMap.size(), "All original 100 records should be present after splitting");
     }
+
+    @Test
+    public void testFlushPerformanceWithManyRecords() throws Exception {
+        int numRecords = 5000;
+        int batchSize = 1024;
+        long totalSize = 10 * 1024 * 1024;
+        RecordAccumulator accum = createTestRecordAccumulator(
+            batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD,
+            totalSize,
+            Compression.NONE,
+            Integer.MAX_VALUE);
+
+        List<Future<RecordMetadata>> futures = new ArrayList<>();
+        for (int i = 0; i < numRecords; i++) {
+            RecordAccumulator.RecordAppendResult result = accum.append(
+                topic,
+                partition1,
+                0L,
+                key,
+                value,
+                Record.EMPTY_HEADERS,
+                null,
+                maxBlockTimeMs,
+                time.milliseconds(),
+                cluster);
+            if (result.future != null) {
+                futures.add(result.future);
+            }
+        }
+
+        accum.beginFlush();
+
+        // Need to complete all batches to mimic successful sends for awaitFlushCompletion()
+        List<ProducerBatch> batches = new ArrayList<>(accum.getDeque(tp1));
+        for (ProducerBatch batch : batches) {
+            batch.complete(0L, time.milliseconds());
+        }
+
+        // Measure time
+        long startNanos = System.nanoTime();
+        accum.awaitFlushCompletion();
+        long durationNanos = System.nanoTime() - startNanos;
+
+        double durationMs = durationNanos / 1_000_000.0;
+        System.out.printf("flush() with %d records took: %.3f ms%n", numRecords, durationMs);
+
+        for (ProducerBatch batch : batches) {
+            accum.deallocate(batch);
+        }
+
+        assertFalse(accum.flushInProgress());
+    }
 }
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java
new file mode 100644
index 00000000000..ff5e0634d81
--- /dev/null
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.internals.Sender;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.DEFAULT_TIMEOUT;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
+import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
+import static org.apache.kafka.test.TestUtils.consumerConfig;
+import static org.apache.kafka.test.TestUtils.producerConfig;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@Timeout(600)
+@Tag("integration")
+public class AtLeastOnceDeliveryMessageLossIntegrationTest {
+    private static final Logger log = LoggerFactory.getLogger(
+        AtLeastOnceDeliveryMessageLossIntegrationTest.class);
+
+    private static final int NUM_BROKERS = 1;
+    private static final int LARGE_RECORD_COUNT = 50000;
+    private static final int SMALL_RECORD_COUNT = 40000;
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+    @BeforeAll
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterAll
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    private String applicationId;
+    private String inputTopic;
+    private String outputTopic;
+    private Properties streamsConfiguration;
+    private KafkaStreams kafkaStreams;
+
+    @BeforeEach
+    public void setUp(final TestInfo testInfo) throws Exception {
+        final String testId = safeUniqueTestName(testInfo);
+        applicationId = "app-" + testId;
+        inputTopic = "input-" + testId;
+        outputTopic = "output-" + testId;
+
+        cleanStateBeforeTest(CLUSTER, inputTopic, outputTopic);
+        CLUSTER.createTopics(inputTopic, outputTopic);
+
+        setupStreamsConfiguration();
+    }
+
+    @AfterEach
+    public void cleanUp() throws Exception {
+        if (kafkaStreams != null) {
+            kafkaStreams.close();
+        }
+        if (streamsConfiguration != null) {
+            purgeLocalStreamsState(streamsConfiguration);
+        }
+    }
+
+    // Currently failing: reproduces the at-least-once message-loss bug.
+    @Test
+    public void shouldNotCommitOffsetsAndNotProduceOutputRecordsWhenProducerFailsWithMessageTooLarge() throws Exception {
+
+        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(Sender.class)) {
+            produceInputData(LARGE_RECORD_COUNT);
+
+            kafkaStreams = createStreamsApplication();
+            startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), Duration.ofMillis(DEFAULT_TIMEOUT));
+
+            waitForProcessingAndCommit();
+
+            // With this bug:
+            // input offsets are committed even though
+            // no records are produced to the output topic, while the producer
+            // repeatedly hits MESSAGE_TOO_LARGE and keeps splitting and retrying.
+
+            assertTrue(appender.getMessages().stream()
+                .anyMatch(msg -> msg.contains("MESSAGE_TOO_LARGE") && msg.contains("splitting and retrying")),
+                "Should log MESSAGE_TOO_LARGE and splitting retry messages");
+
+            final int outputRecordCount = verifyOutputRecords(0); // should not produce records
+            final boolean offsetsCommitted = verifyConsumerOffsetsCommitted(0); // should not commit offsets unless records are produced
+
+            assertEquals(0, outputRecordCount, "Output topic should not have any records");
+            assertTrue(offsetsCommitted, "Consumer offsets should not be committed");
+        }
+    }
+
+    @Test
+    public void shouldCommitOffsetsAndProduceMessagesNormallyForSmallerRecordCount() throws Exception {
+        produceInputData(SMALL_RECORD_COUNT);
+
+        try (final KafkaStreams kafkaStreams = createStreamsApplication()) {
+            startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), Duration.ofMillis(DEFAULT_TIMEOUT));
+
+            waitForProcessingAndCommit();
+
+            // Normal behaviour for the smaller record count
+            final int outputRecordCount = verifyOutputRecords(SMALL_RECORD_COUNT); // should produce records
+            final boolean offsetsCommitted = verifyConsumerOffsetsCommitted(SMALL_RECORD_COUNT); // should commit offsets
+
+            assertEquals(SMALL_RECORD_COUNT, outputRecordCount, "Output topic should have " + SMALL_RECORD_COUNT + " records");
+            assertTrue(offsetsCommitted, "Consumer offsets should be committed");
+        }
+    }
+
+
+    private void setupStreamsConfiguration() {
+        streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+
+        // AT_LEAST_ONCE processing guarantee
+        streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE);
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000L);
+
+        // Producer configuration that can trigger MESSAGE_TOO_LARGE errors
+        streamsConfiguration.put(ProducerConfig.LINGER_MS_CONFIG, 300000);
+        streamsConfiguration.put(ProducerConfig.BATCH_SIZE_CONFIG, 33554432);
+    }
+
+    private void produceInputData(final int recordCount) {
+        final List<KeyValue<String, String>> inputRecords = new ArrayList<>();
+        for (int i = 1; i <= recordCount; i++) {
+            inputRecords.add(new KeyValue<>(String.valueOf(i), "item-" + i));
+        }
+
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+            inputTopic,
+            inputRecords,
+            producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class),
+            CLUSTER.time
+        );
+    }
+
+    private void waitForProcessingAndCommit() throws Exception {
+        // Wait slightly longer than the commit interval to ensure processing and offset commits
+        waitForCondition(
+            () -> {
+                try (final Admin adminClient = Admin.create(mkMap(
+                    mkEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())))) {
+                    final TopicPartition topicPartition = new TopicPartition(inputTopic, 0);
+                    return adminClient
+                        .listConsumerGroupOffsets(applicationId)
+                        .partitionsToOffsetAndMetadata()
+                        .get()
+                        .containsKey(topicPartition);
+                } catch (final Exception e) {
+                    return false;
+                }
+            },
+            35000L,
+            "Waiting for consumer offsets to be committed"
+        );
+    }
+
+    private boolean verifyConsumerOffsetsCommitted(final int expectedOffset) throws Exception {
+        try (final Admin adminClient = Admin.create(mkMap(
+            mkEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())))) {
+
+            final TopicPartition topicPartition = new TopicPartition(inputTopic, 0);
+
+            final long committedOffset = adminClient
+                .listConsumerGroupOffsets(applicationId)
+                .partitionsToOffsetAndMetadata()
+                .get()
+                .get(topicPartition)
+                .offset();
+
+            log.info("Consumer group {} committed offset: {} (expected: {})", applicationId, committedOffset, expectedOffset);
+            return committedOffset == expectedOffset;
+        }
+    }
+
+    private int verifyOutputRecords(final int expectedRecordCount) {
+        try {
+            final List<KeyValue<String, String>> outputRecords =
+                waitUntilMinKeyValueRecordsReceived(
+                    consumerConfig(
+                        CLUSTER.bootstrapServers(),
+                        applicationId + "-test-consumer-" + System.currentTimeMillis(),
+                        StringDeserializer.class,
+                        StringDeserializer.class
+                    ),
+                    outputTopic,
+                    expectedRecordCount,
+                    30000L
+                );
+            log.info("Output topic {} contains {} records", outputTopic, outputRecords.size());
+            return outputRecords.size();
+        } catch (final Exception e) {
+            log.info("Exception while reading output records: {}", e.getMessage());
+            return 0;
+        }
+    }
+
+    private KafkaStreams createStreamsApplication() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<String, String> input = builder.stream(inputTopic);
+        input.peek((key, value) -> {
+            if (Integer.parseInt(key) % 1000 == 0) {
+                log.debug("Processing record {}: {} -> {}", key, key, value);
+            }
+        }).to(outputTopic);
+
+        return new KafkaStreams(builder.build(), streamsConfiguration);
+    }
+}
\ No newline at end of file
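
Note (not part of the patch above): the user-visible contract that the RecordAccumulator change protects can be sketched with the public producer API alone. The following is a minimal, self-contained sketch, not a definitive reproduction: the class name FlushContractSketch, the broker address localhost:9092, and the topic name "input" are placeholders, and the record count and producer settings simply mirror the integration test above. The point it illustrates is that after flush() returns, every previously sent record's future should already be complete, even when the producer had to split an oversized batch after a MESSAGE_TOO_LARGE response.

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class FlushContractSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // A long linger plus a very large batch size (as in the integration test) makes it
        // likely that a single batch grows past the broker's message.max.bytes, so the
        // broker answers MESSAGE_TOO_LARGE and the producer splits and retries the batch.
        props.put(ProducerConfig.LINGER_MS_CONFIG, 300000);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024 * 1024);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            List<Future<RecordMetadata>> futures = new ArrayList<>();
            for (int i = 1; i <= 50_000; i++) {
                // "input" is a placeholder topic that must already exist
                futures.add(producer.send(new ProducerRecord<>("input", String.valueOf(i), "item-" + i)));
            }

            // flush() is what forces the lingering batches out. With the fix it waits on
            // every record future, so no future can still be pending afterwards, even for
            // records that were moved into split batches.
            producer.flush();

            long pending = futures.stream().filter(f -> !f.isDone()).count();
            System.out.println("Records still pending after flush(): " + pending);
        }
    }
}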