KAFKA-14727: Enable periodic offset commits for EOS source tasks (#13262)

Reviewers: Chris Egerton <chrise@aiven.io>
This commit is contained in:
Greg Harris 2023-02-16 15:51:34 -08:00 committed by GitHub
parent f3dc3f0dad
commit aea6090ce4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 39 additions and 43 deletions

View File

@ -353,8 +353,10 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
recordPollReturned(toSend.size(), time.milliseconds() - start); recordPollReturned(toSend.size(), time.milliseconds() - start);
} }
} }
if (toSend == null) if (toSend == null) {
batchDispatched();
continue; continue;
}
log.trace("{} About to send {} records to Kafka", this, toSend.size()); log.trace("{} About to send {} records to Kafka", this, toSend.size());
if (sendRecords()) { if (sendRecords()) {
batchDispatched(); batchDispatched();

View File

@ -255,10 +255,6 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
long started = time.milliseconds(); long started = time.milliseconds();
// We might have just aborted a transaction, in which case we'll have to begin a new one
// in order to commit offsets
maybeBeginTransaction();
AtomicReference<Throwable> flushError = new AtomicReference<>(); AtomicReference<Throwable> flushError = new AtomicReference<>();
boolean shouldFlush = false; boolean shouldFlush = false;
try { try {
@ -269,6 +265,20 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
} catch (Throwable e) { } catch (Throwable e) {
flushError.compareAndSet(null, e); flushError.compareAndSet(null, e);
} }
if (flushError.get() == null && !transactionOpen && !shouldFlush) {
// There are no contents on the framework side to commit, so skip the offset flush and producer commit
long durationMillis = time.milliseconds() - started;
recordCommitSuccess(durationMillis);
log.debug("{} Finished commitOffsets successfully in {} ms", this, durationMillis);
commitSourceTask();
return;
}
// We might have just aborted a transaction, in which case we'll have to begin a new one
// in order to commit offsets
maybeBeginTransaction();
if (shouldFlush) { if (shouldFlush) {
// Now we can actually write the offsets to the internal topic. // Now we can actually write the offsets to the internal topic.
// No need to track the flush future here since it's guaranteed to complete by the time // No need to track the flush future here since it's guaranteed to complete by the time
@ -393,7 +403,7 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
} }
private void maybeCommitTransaction(boolean shouldCommit) { private void maybeCommitTransaction(boolean shouldCommit) {
if (shouldCommit && (transactionOpen || offsetWriter.willFlush())) { if (shouldCommit) {
try (LoggingContext loggingContext = LoggingContext.forOffsets(id)) { try (LoggingContext loggingContext = LoggingContext.forOffsets(id)) {
commitTransaction(); commitTransaction();
} }

View File

@ -150,13 +150,6 @@ public class OffsetStorageWriter {
} }
} }
/**
* @return whether there's anything to flush right now.
*/
public synchronized boolean willFlush() {
return !data.isEmpty();
}
/** /**
* Flush the current offsets and clear them from this writer. This is non-blocking: it * Flush the current offsets and clear them from this writer. This is non-blocking: it
* moves the current set of offsets out of the way, serializes the data, and asynchronously * moves the current set of offsets out of the way, serializes the data, and asynchronously

View File

@ -216,7 +216,6 @@ public class ExactlyOnceWorkerSourceTaskTest {
verify(sourceTask, MockitoUtils.anyTimes()).commitRecord(any(), any()); verify(sourceTask, MockitoUtils.anyTimes()).commitRecord(any(), any());
verify(offsetWriter, MockitoUtils.anyTimes()).offset(PARTITION, OFFSET); verify(offsetWriter, MockitoUtils.anyTimes()).offset(PARTITION, OFFSET);
verify(offsetWriter, MockitoUtils.anyTimes()).willFlush();
verify(offsetWriter, MockitoUtils.anyTimes()).beginFlush(); verify(offsetWriter, MockitoUtils.anyTimes()).beginFlush();
verify(offsetWriter, MockitoUtils.anyTimes()).doFlush(any()); verify(offsetWriter, MockitoUtils.anyTimes()).doFlush(any());
@ -297,9 +296,6 @@ public class ExactlyOnceWorkerSourceTaskTest {
return null; return null;
}).when(statusListener).onPause(eq(taskId)); }).when(statusListener).onPause(eq(taskId));
// The task checks to see if there are offsets to commit before pausing
when(offsetWriter.willFlush()).thenReturn(false);
startTaskThread(); startTaskThread();
assertTrue(pauseLatch.await(5, TimeUnit.SECONDS)); assertTrue(pauseLatch.await(5, TimeUnit.SECONDS));
@ -319,7 +315,6 @@ public class ExactlyOnceWorkerSourceTaskTest {
expectSuccessfulSends(); expectSuccessfulSends();
expectSuccessfulFlushes(); expectSuccessfulFlushes();
when(offsetWriter.willFlush()).thenReturn(true);
final CountDownLatch pauseLatch = new CountDownLatch(1); final CountDownLatch pauseLatch = new CountDownLatch(1);
doAnswer(invocation -> { doAnswer(invocation -> {
@ -342,7 +337,7 @@ public class ExactlyOnceWorkerSourceTaskTest {
verify(statusListener).onPause(taskId); verify(statusListener).onPause(taskId);
// Task should have flushed offsets for every record poll, once on pause, and once for end-of-life offset commit // Task should have flushed offsets for every record poll, once on pause, and once for end-of-life offset commit
verifyTransactions(pollCount() + 2); verifyTransactions(pollCount() + 2, pollCount() + 2);
verifySends(); verifySends();
verifyPossibleTopicCreation(); verifyPossibleTopicCreation();
// make sure we didn't poll again after triggering shutdown // make sure we didn't poll again after triggering shutdown
@ -421,7 +416,6 @@ public class ExactlyOnceWorkerSourceTaskTest {
expectSuccessfulSends(); expectSuccessfulSends();
expectSuccessfulFlushes(); expectSuccessfulFlushes();
when(offsetWriter.willFlush()).thenReturn(true);
startTaskThread(); startTaskThread();
@ -430,7 +424,7 @@ public class ExactlyOnceWorkerSourceTaskTest {
awaitShutdown(); awaitShutdown();
// Task should have flushed offsets for every record poll and for end-of-life offset commit // Task should have flushed offsets for every record poll and for end-of-life offset commit
verifyTransactions(pollCount() + 1); verifyTransactions(pollCount() + 1, pollCount() + 1);
verifySends(); verifySends();
verifyPossibleTopicCreation(); verifyPossibleTopicCreation();
@ -498,7 +492,6 @@ public class ExactlyOnceWorkerSourceTaskTest {
final CountDownLatch workerStopLatch = new CountDownLatch(1); final CountDownLatch workerStopLatch = new CountDownLatch(1);
final RuntimeException exception = new RuntimeException(); final RuntimeException exception = new RuntimeException();
// We check if there are offsets that need to be committed before shutting down the task // We check if there are offsets that need to be committed before shutting down the task
when(offsetWriter.willFlush()).thenReturn(false);
when(sourceTask.poll()).thenAnswer(invocation -> { when(sourceTask.poll()).thenAnswer(invocation -> {
pollLatch.countDown(); pollLatch.countDown();
ConcurrencyUtils.awaitLatch(workerStopLatch, "task was not stopped in time"); ConcurrencyUtils.awaitLatch(workerStopLatch, "task was not stopped in time");
@ -512,7 +505,7 @@ public class ExactlyOnceWorkerSourceTaskTest {
workerStopLatch.countDown(); workerStopLatch.countDown();
awaitShutdown(false); awaitShutdown(false);
verifyTransactions(0); verifyTransactions(0, 0);
verifyPreflight(); verifyPreflight();
verifyStartup(); verifyStartup();
@ -525,7 +518,7 @@ public class ExactlyOnceWorkerSourceTaskTest {
// Test that the task handles an empty list of records // Test that the task handles an empty list of records
createWorkerTask(); createWorkerTask();
when(offsetWriter.willFlush()).thenReturn(false); when(offsetWriter.beginFlush()).thenReturn(false);
startTaskThread(); startTaskThread();
@ -533,7 +526,8 @@ public class ExactlyOnceWorkerSourceTaskTest {
awaitShutdown(); awaitShutdown();
verifyTransactions(0); // task commits for each empty poll, plus the final commit
verifyTransactions(pollCount() + 1, 0);
verifyPreflight(); verifyPreflight();
verifyStartup(); verifyStartup();
@ -550,7 +544,6 @@ public class ExactlyOnceWorkerSourceTaskTest {
expectSuccessfulSends(); expectSuccessfulSends();
expectSuccessfulFlushes(); expectSuccessfulFlushes();
when(offsetWriter.willFlush()).thenReturn(true);
startTaskThread(); startTaskThread();
@ -559,7 +552,7 @@ public class ExactlyOnceWorkerSourceTaskTest {
awaitShutdown(); awaitShutdown();
// Task should have flushed offsets for every record poll, and for end-of-life offset commit // Task should have flushed offsets for every record poll, and for end-of-life offset commit
verifyTransactions(pollCount() + 1); verifyTransactions(pollCount() + 1, pollCount() + 1);
verifySends(); verifySends();
verifyPossibleTopicCreation(); verifyPossibleTopicCreation();
@ -583,7 +576,6 @@ public class ExactlyOnceWorkerSourceTaskTest {
expectSuccessfulSends(); expectSuccessfulSends();
expectSuccessfulFlushes(); expectSuccessfulFlushes();
when(offsetWriter.willFlush()).thenReturn(true);
startTaskThread(); startTaskThread();
@ -601,7 +593,7 @@ public class ExactlyOnceWorkerSourceTaskTest {
awaitShutdown(); awaitShutdown();
// Task should have flushed offsets twice based on offset commit interval, and performed final end-of-life offset commit // Task should have flushed offsets twice based on offset commit interval, and performed final end-of-life offset commit
verifyTransactions(3); verifyTransactions(3, 3);
verifySends(); verifySends();
verifyPossibleTopicCreation(); verifyPossibleTopicCreation();
@ -639,7 +631,6 @@ public class ExactlyOnceWorkerSourceTaskTest {
expectSuccessfulSends(); expectSuccessfulSends();
expectSuccessfulFlushes(); expectSuccessfulFlushes();
when(offsetWriter.willFlush()).thenReturn(true);
TransactionContext transactionContext = workerTask.sourceTaskContext.transactionContext(); TransactionContext transactionContext = workerTask.sourceTaskContext.transactionContext();
@ -681,7 +672,6 @@ public class ExactlyOnceWorkerSourceTaskTest {
@Test @Test
public void testCommitFlushSyncCallbackFailure() throws Exception { public void testCommitFlushSyncCallbackFailure() throws Exception {
Exception failure = new RecordTooLargeException(); Exception failure = new RecordTooLargeException();
when(offsetWriter.willFlush()).thenReturn(true);
when(offsetWriter.beginFlush()).thenReturn(true); when(offsetWriter.beginFlush()).thenReturn(true);
when(offsetWriter.doFlush(any())).thenAnswer(invocation -> { when(offsetWriter.doFlush(any())).thenAnswer(invocation -> {
Callback<Void> callback = invocation.getArgument(0); Callback<Void> callback = invocation.getArgument(0);
@ -694,7 +684,6 @@ public class ExactlyOnceWorkerSourceTaskTest {
@Test @Test
public void testCommitFlushAsyncCallbackFailure() throws Exception { public void testCommitFlushAsyncCallbackFailure() throws Exception {
Exception failure = new RecordTooLargeException(); Exception failure = new RecordTooLargeException();
when(offsetWriter.willFlush()).thenReturn(true);
when(offsetWriter.beginFlush()).thenReturn(true); when(offsetWriter.beginFlush()).thenReturn(true);
// doFlush delegates its callback to the producer, // doFlush delegates its callback to the producer,
// which delays completing the callback until commitTransaction // which delays completing the callback until commitTransaction
@ -713,7 +702,6 @@ public class ExactlyOnceWorkerSourceTaskTest {
@Test @Test
public void testCommitTransactionFailure() throws Exception { public void testCommitTransactionFailure() throws Exception {
Exception failure = new RecordTooLargeException(); Exception failure = new RecordTooLargeException();
when(offsetWriter.willFlush()).thenReturn(true);
when(offsetWriter.beginFlush()).thenReturn(true); when(offsetWriter.beginFlush()).thenReturn(true);
doThrow(failure).when(producer).commitTransaction(); doThrow(failure).when(producer).commitTransaction();
testCommitFailure(failure, true); testCommitFailure(failure, true);
@ -829,8 +817,7 @@ public class ExactlyOnceWorkerSourceTaskTest {
return null; return null;
}).when(sourceTask).start(eq(TASK_PROPS)); }).when(sourceTask).start(eq(TASK_PROPS));
when(offsetWriter.willFlush()).thenReturn(false); when(offsetWriter.beginFlush()).thenReturn(false);
startTaskThread(); startTaskThread();
// Stopping immediately while the other thread has work to do should result in no polling, no offset commits, // Stopping immediately while the other thread has work to do should result in no polling, no offset commits,
@ -842,7 +829,8 @@ public class ExactlyOnceWorkerSourceTaskTest {
awaitShutdown(false); awaitShutdown(false);
verify(sourceTask, never()).poll(); verify(sourceTask, never()).poll();
verifyTransactions(0); // task commit called on shutdown
verifyTransactions(1, 0);
verifySends(0); verifySends(0);
verifyPreflight(); verifyPreflight();
@ -1037,13 +1025,16 @@ public class ExactlyOnceWorkerSourceTaskTest {
verify(producer, times(count)).send(any(), any()); verify(producer, times(count)).send(any(), any());
} }
private void verifyTransactions(int numBatches) throws InterruptedException, TimeoutException { private void verifyTransactions(int numTaskCommits, int numProducerCommits) throws InterruptedException {
VerificationMode mode = times(numBatches); // these operations happen on every commit opportunity
verify(producer, mode).beginTransaction(); VerificationMode commitOpportunities = times(numTaskCommits);
verify(producer, mode).commitTransaction(); verify(offsetWriter, commitOpportunities).beginFlush();
verify(offsetWriter, mode).beginFlush(); verify(sourceTask, commitOpportunities).commit();
verify(offsetWriter, mode).doFlush(any()); // these operations only happen on non-empty commits
verify(sourceTask, mode).commit(); VerificationMode commits = times(numProducerCommits);
verify(producer, commits).beginTransaction();
verify(producer, commits).commitTransaction();
verify(offsetWriter, commits).doFlush(any());
} }
private void assertTransactionMetrics(int minimumMaxSizeExpected) { private void assertTransactionMetrics(int minimumMaxSizeExpected) {