From aea6090ce479a06ea5489a54aeecf9b40233a3a1 Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Thu, 16 Feb 2023 15:51:34 -0800 Subject: [PATCH] KAFKA-14727: Enable periodic offset commits for EOS source tasks (#13262) Reviewers: Chris Egerton --- .../runtime/AbstractWorkerSourceTask.java | 4 +- .../runtime/ExactlyOnceWorkerSourceTask.java | 20 ++++++-- .../connect/storage/OffsetStorageWriter.java | 7 --- .../ExactlyOnceWorkerSourceTaskTest.java | 51 ++++++++----------- 4 files changed, 39 insertions(+), 43 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java index ff15f631a73..fb3c04be6cf 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java @@ -353,8 +353,10 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask { recordPollReturned(toSend.size(), time.milliseconds() - start); } } - if (toSend == null) + if (toSend == null) { + batchDispatched(); continue; + } log.trace("{} About to send {} records to Kafka", this, toSend.size()); if (sendRecords()) { batchDispatched(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java index d4ef5ba8106..8b4a8d3c9cc 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java @@ -255,10 +255,6 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask { long started = time.milliseconds(); - // We might have just aborted a transaction, in which case we'll have to begin a new one - // in order to 
commit offsets - maybeBeginTransaction(); - AtomicReference flushError = new AtomicReference<>(); boolean shouldFlush = false; try { @@ -269,6 +265,20 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask { } catch (Throwable e) { flushError.compareAndSet(null, e); } + if (flushError.get() == null && !transactionOpen && !shouldFlush) { + // There is nothing on the framework side to commit, so skip the offset flush and producer commit + long durationMillis = time.milliseconds() - started; + recordCommitSuccess(durationMillis); + log.debug("{} Finished commitOffsets successfully in {} ms", this, durationMillis); + + commitSourceTask(); + return; + } + + // We might have just aborted a transaction, in which case we'll have to begin a new one + // in order to commit offsets + maybeBeginTransaction(); + if (shouldFlush) { // Now we can actually write the offsets to the internal topic. // No need to track the flush future here since it's guaranteed to complete by the time @@ -393,7 +403,7 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask { } private void maybeCommitTransaction(boolean shouldCommit) { - if (shouldCommit && (transactionOpen || offsetWriter.willFlush())) { + if (shouldCommit) { try (LoggingContext loggingContext = LoggingContext.forOffsets(id)) { commitTransaction(); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java index d3141d4758e..89e7824a65b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java @@ -150,13 +150,6 @@ public class OffsetStorageWriter { } } - /** - * @return whether there's anything to flush right now. 
- */ - public synchronized boolean willFlush() { - return !data.isEmpty(); - } - /** * Flush the current offsets and clear them from this writer. This is non-blocking: it * moves the current set of offsets out of the way, serializes the data, and asynchronously diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java index 1257834252e..9011d5c0a88 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java @@ -216,7 +216,6 @@ public class ExactlyOnceWorkerSourceTaskTest { verify(sourceTask, MockitoUtils.anyTimes()).commitRecord(any(), any()); verify(offsetWriter, MockitoUtils.anyTimes()).offset(PARTITION, OFFSET); - verify(offsetWriter, MockitoUtils.anyTimes()).willFlush(); verify(offsetWriter, MockitoUtils.anyTimes()).beginFlush(); verify(offsetWriter, MockitoUtils.anyTimes()).doFlush(any()); @@ -297,9 +296,6 @@ public class ExactlyOnceWorkerSourceTaskTest { return null; }).when(statusListener).onPause(eq(taskId)); - // The task checks to see if there are offsets to commit before pausing - when(offsetWriter.willFlush()).thenReturn(false); - startTaskThread(); assertTrue(pauseLatch.await(5, TimeUnit.SECONDS)); @@ -319,7 +315,6 @@ public class ExactlyOnceWorkerSourceTaskTest { expectSuccessfulSends(); expectSuccessfulFlushes(); - when(offsetWriter.willFlush()).thenReturn(true); final CountDownLatch pauseLatch = new CountDownLatch(1); doAnswer(invocation -> { @@ -342,7 +337,7 @@ public class ExactlyOnceWorkerSourceTaskTest { verify(statusListener).onPause(taskId); // Task should have flushed offsets for every record poll, once on pause, and once for end-of-life offset commit - verifyTransactions(pollCount() + 2); + verifyTransactions(pollCount() + 2, 
pollCount() + 2); verifySends(); verifyPossibleTopicCreation(); // make sure we didn't poll again after triggering shutdown @@ -421,7 +416,6 @@ public class ExactlyOnceWorkerSourceTaskTest { expectSuccessfulSends(); expectSuccessfulFlushes(); - when(offsetWriter.willFlush()).thenReturn(true); startTaskThread(); @@ -430,7 +424,7 @@ public class ExactlyOnceWorkerSourceTaskTest { awaitShutdown(); // Task should have flushed offsets for every record poll and for end-of-life offset commit - verifyTransactions(pollCount() + 1); + verifyTransactions(pollCount() + 1, pollCount() + 1); verifySends(); verifyPossibleTopicCreation(); @@ -498,7 +492,6 @@ public class ExactlyOnceWorkerSourceTaskTest { final CountDownLatch workerStopLatch = new CountDownLatch(1); final RuntimeException exception = new RuntimeException(); // We check if there are offsets that need to be committed before shutting down the task - when(offsetWriter.willFlush()).thenReturn(false); when(sourceTask.poll()).thenAnswer(invocation -> { pollLatch.countDown(); ConcurrencyUtils.awaitLatch(workerStopLatch, "task was not stopped in time"); @@ -512,7 +505,7 @@ public class ExactlyOnceWorkerSourceTaskTest { workerStopLatch.countDown(); awaitShutdown(false); - verifyTransactions(0); + verifyTransactions(0, 0); verifyPreflight(); verifyStartup(); @@ -525,7 +518,7 @@ public class ExactlyOnceWorkerSourceTaskTest { // Test that the task handles an empty list of records createWorkerTask(); - when(offsetWriter.willFlush()).thenReturn(false); + when(offsetWriter.beginFlush()).thenReturn(false); startTaskThread(); @@ -533,7 +526,8 @@ public class ExactlyOnceWorkerSourceTaskTest { awaitShutdown(); - verifyTransactions(0); + // task commits for each empty poll, plus the final commit + verifyTransactions(pollCount() + 1, 0); verifyPreflight(); verifyStartup(); @@ -550,7 +544,6 @@ public class ExactlyOnceWorkerSourceTaskTest { expectSuccessfulSends(); expectSuccessfulFlushes(); - 
when(offsetWriter.willFlush()).thenReturn(true); startTaskThread(); @@ -559,7 +552,7 @@ public class ExactlyOnceWorkerSourceTaskTest { awaitShutdown(); // Task should have flushed offsets for every record poll, and for end-of-life offset commit - verifyTransactions(pollCount() + 1); + verifyTransactions(pollCount() + 1, pollCount() + 1); verifySends(); verifyPossibleTopicCreation(); @@ -583,7 +576,6 @@ public class ExactlyOnceWorkerSourceTaskTest { expectSuccessfulSends(); expectSuccessfulFlushes(); - when(offsetWriter.willFlush()).thenReturn(true); startTaskThread(); @@ -601,7 +593,7 @@ public class ExactlyOnceWorkerSourceTaskTest { awaitShutdown(); // Task should have flushed offsets twice based on offset commit interval, and performed final end-of-life offset commit - verifyTransactions(3); + verifyTransactions(3, 3); verifySends(); verifyPossibleTopicCreation(); @@ -639,7 +631,6 @@ public class ExactlyOnceWorkerSourceTaskTest { expectSuccessfulSends(); expectSuccessfulFlushes(); - when(offsetWriter.willFlush()).thenReturn(true); TransactionContext transactionContext = workerTask.sourceTaskContext.transactionContext(); @@ -681,7 +672,6 @@ public class ExactlyOnceWorkerSourceTaskTest { @Test public void testCommitFlushSyncCallbackFailure() throws Exception { Exception failure = new RecordTooLargeException(); - when(offsetWriter.willFlush()).thenReturn(true); when(offsetWriter.beginFlush()).thenReturn(true); when(offsetWriter.doFlush(any())).thenAnswer(invocation -> { Callback callback = invocation.getArgument(0); @@ -694,7 +684,6 @@ public class ExactlyOnceWorkerSourceTaskTest { @Test public void testCommitFlushAsyncCallbackFailure() throws Exception { Exception failure = new RecordTooLargeException(); - when(offsetWriter.willFlush()).thenReturn(true); when(offsetWriter.beginFlush()).thenReturn(true); // doFlush delegates its callback to the producer, // which delays completing the callback until commitTransaction @@ -713,7 +702,6 @@ public class 
ExactlyOnceWorkerSourceTaskTest { @Test public void testCommitTransactionFailure() throws Exception { Exception failure = new RecordTooLargeException(); - when(offsetWriter.willFlush()).thenReturn(true); when(offsetWriter.beginFlush()).thenReturn(true); doThrow(failure).when(producer).commitTransaction(); testCommitFailure(failure, true); @@ -829,8 +817,7 @@ public class ExactlyOnceWorkerSourceTaskTest { return null; }).when(sourceTask).start(eq(TASK_PROPS)); - when(offsetWriter.willFlush()).thenReturn(false); - + when(offsetWriter.beginFlush()).thenReturn(false); startTaskThread(); // Stopping immediately while the other thread has work to do should result in no polling, no offset commits, @@ -842,7 +829,8 @@ public class ExactlyOnceWorkerSourceTaskTest { awaitShutdown(false); verify(sourceTask, never()).poll(); - verifyTransactions(0); + // task commit called on shutdown + verifyTransactions(1, 0); verifySends(0); verifyPreflight(); @@ -1037,13 +1025,16 @@ public class ExactlyOnceWorkerSourceTaskTest { verify(producer, times(count)).send(any(), any()); } - private void verifyTransactions(int numBatches) throws InterruptedException, TimeoutException { - VerificationMode mode = times(numBatches); - verify(producer, mode).beginTransaction(); - verify(producer, mode).commitTransaction(); - verify(offsetWriter, mode).beginFlush(); - verify(offsetWriter, mode).doFlush(any()); - verify(sourceTask, mode).commit(); + private void verifyTransactions(int numTaskCommits, int numProducerCommits) throws InterruptedException { + // these operations happen on every commit opportunity + VerificationMode commitOpportunities = times(numTaskCommits); + verify(offsetWriter, commitOpportunities).beginFlush(); + verify(sourceTask, commitOpportunities).commit(); + // these operations only happen on non-empty commits + VerificationMode commits = times(numProducerCommits); + verify(producer, commits).beginTransaction(); + verify(producer, commits).commitTransaction(); + 
verify(offsetWriter, commits).doFlush(any()); } private void assertTransactionMetrics(int minimumMaxSizeExpected) {