KAFKA-5756: Wait for concurrent source task offset flush to complete before starting next flush (#13208)

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chris Egerton <chrise@aiven.io>
Greg Harris 2023-02-15 18:29:20 -08:00 committed by Chris Egerton
parent b8e01e2406
commit bc50bb7c6a
9 changed files with 149 additions and 34 deletions
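For context: before this change, `OffsetStorageWriter.beginFlush()` threw a `ConnectException` whenever a previous flush was still in progress; after it, callers may wait (with a timeout) for the prior flush to finish. A minimal standalone sketch of the gating pattern, with hypothetical names (`FlushGate`, `completeFlush`) that are not part of this commit:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: a binary semaphore is held from beginFlush() until the
// flush completes or is cancelled, so a concurrent flush waits instead of failing.
class FlushGate {
    private final Semaphore flushInProgress = new Semaphore(1);
    private Map<String, Object> data = new HashMap<>();
    private Map<String, Object> toFlush = null;

    boolean beginFlush(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        if (!flushInProgress.tryAcquire(Math.max(0, timeout), unit))
            throw new TimeoutException("Timed out waiting for previous flush to finish");
        synchronized (this) {
            if (data.isEmpty()) {
                flushInProgress.release(); // nothing to snapshot; unblock the next caller
                return false;
            }
            toFlush = data;
            data = new HashMap<>();
            return true;
        }
    }

    // Must be called once the snapshot has been written (or the flush cancelled),
    // otherwise every later beginFlush() times out.
    synchronized void completeFlush() {
        toFlush = null;
        flushInProgress.release();
    }
}
```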

checkstyle/suppressions.xml

@@ -155,7 +155,7 @@
               files="(KafkaConfigBackingStore|Values|ConnectMetricsRegistry).java"/>
     <suppress checks="NPathComplexity"
-              files="(DistributedHerder|RestClient|RestServer|JsonConverter|KafkaConfigBackingStore|FileStreamSourceTask|TopicAdmin).java"/>
+              files="(DistributedHerder|RestClient|RestServer|JsonConverter|KafkaConfigBackingStore|FileStreamSourceTask|WorkerSourceTask|TopicAdmin).java"/>

     <!-- connect tests-->
     <suppress checks="ClassDataAbstractionCoupling"

AbstractWorkerSourceTask.java

@@ -361,6 +361,10 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
         } catch (InterruptedException e) {
             // Ignore and allow to exit.
         } catch (RuntimeException e) {
+            if (isCancelled()) {
+                log.debug("Skipping final offset commit as task has been cancelled");
+                throw e;
+            }
             try {
                 finalOffsetCommit(true);
             } catch (Exception offsetException) {

ExactlyOnceWorkerSourceTask.java

@@ -217,9 +217,6 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
         if (failed) {
             log.debug("Skipping final offset commit as task has failed");
             return;
-        } else if (isCancelled()) {
-            log.debug("Skipping final offset commit as task has been cancelled");
-            return;
         }

         // It should be safe to commit here even if we were in the middle of retrying on RetriableExceptions in the
@@ -261,7 +258,16 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
         maybeBeginTransaction();
         AtomicReference<Throwable> flushError = new AtomicReference<>();
-        if (offsetWriter.beginFlush()) {
+        boolean shouldFlush = false;
+        try {
+            // Begin the flush without waiting, as there should not be any concurrent flushes.
+            // This is because commitTransaction is always called on the same thread, and should always block until
+            // the flush is complete, or cancel the flush if an error occurs.
+            shouldFlush = offsetWriter.beginFlush();
+        } catch (Throwable e) {
+            flushError.compareAndSet(null, e);
+        }
+        if (shouldFlush) {
             // Now we can actually write the offsets to the internal topic.
             // No need to track the flush future here since it's guaranteed to complete by the time
             // Producer::commitTransaction completes

WorkerSinkTask.java

@@ -363,6 +363,10 @@ class WorkerSinkTask extends WorkerTask {
      * the write commit.
      **/
     private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean closing, int seqno) {
+        if (isCancelled()) {
+            log.debug("Skipping final offset commit as task has been cancelled");
+            return;
+        }
         if (closing) {
             doCommitSync(offsets, seqno);
         } else {

WorkerSourceTask.java

@@ -239,7 +239,19 @@ class WorkerSourceTask extends AbstractWorkerSourceTask {
         // though we may update them here with newer offsets for acked records.
         offsetsToCommit.offsets().forEach(offsetWriter::offset);

-        if (!offsetWriter.beginFlush()) {
+        boolean shouldFlush;
+        try {
+            shouldFlush = offsetWriter.beginFlush(timeout - time.milliseconds(), TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            log.warn("{} Interrupted while waiting for previous offset flush to complete, cancelling", this);
+            recordCommitFailure(time.milliseconds() - started, e);
+            return false;
+        } catch (TimeoutException e) {
+            log.warn("{} Timed out while waiting for previous offset flush to complete, cancelling", this);
+            recordCommitFailure(time.milliseconds() - started, e);
+            return false;
+        }
+        if (!shouldFlush) {
             // There was nothing in the offsets to process, but we still mark a successful offset commit.
             long durationMillis = time.milliseconds() - started;
             recordCommitSuccess(durationMillis);
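The wait budget handed to `beginFlush` above is whatever remains of the commit timeout: `timeout` is an absolute wall-clock deadline in epoch milliseconds, so `timeout - time.milliseconds()` shrinks as earlier steps of the commit consume time. A small self-contained illustration of that arithmetic, with hypothetical names and an assumed 5-second flush timeout:

```java
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of the deadline arithmetic used above: because the
// deadline is absolute, the wait budget for beginFlush() shrinks over time, and
// beginFlush() clamps a negative budget via Math.max(0, timeout) into what is
// effectively a non-blocking tryAcquire.
class DeadlineMath {
    public static void main(String[] args) throws InterruptedException {
        long started = System.currentTimeMillis();
        long deadline = started + 5_000L;  // assumed offset.flush.timeout.ms = 5000
        Thread.sleep(150);                 // time spent snapshotting offsets, etc.
        long remaining = deadline - System.currentTimeMillis();
        System.out.printf("waiting up to %d %s for the previous flush%n",
                Math.max(0, remaining), TimeUnit.MILLISECONDS);
    }
}
```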

OffsetStorageWriter.java

@@ -26,6 +26,9 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;

 /**
  * <p>
@@ -73,6 +76,7 @@ public class OffsetStorageWriter {
     private Map<Map<String, Object>, Map<String, Object>> data = new HashMap<>();
     private Map<Map<String, Object>, Map<String, Object>> toFlush = null;
+    private final Semaphore flushInProgress = new Semaphore(1);

     // Unique ID for each flush request to handle callbacks after timeouts
     private long currentFlushId = 0;
@@ -100,23 +104,50 @@ public class OffsetStorageWriter {
     /**
      * Performs the first step of a flush operation, snapshotting the current state. This does not
-     * actually initiate the flush with the underlying storage.
+     * actually initiate the flush with the underlying storage. Ensures that any previous flush operations
+     * have finished before beginning a new flush.
      *
      * @return true if a flush was initiated, false if no data was available
+     * @throws ConnectException if the previous flush is not complete before this method is called
      */
-    public synchronized boolean beginFlush() {
-        if (flushing()) {
-            log.error("Invalid call to OffsetStorageWriter flush() while already flushing, the "
+    public boolean beginFlush() {
+        try {
+            return beginFlush(0, TimeUnit.NANOSECONDS);
+        } catch (InterruptedException | TimeoutException e) {
+            log.error("Invalid call to OffsetStorageWriter beginFlush() while already flushing, the "
                     + "framework should not allow this");
             throw new ConnectException("OffsetStorageWriter is already flushing");
         }
+    }

-        if (data.isEmpty())
-            return false;
-
-        toFlush = data;
-        data = new HashMap<>();
-        return true;
+    /**
+     * Performs the first step of a flush operation, snapshotting the current state. This does not
+     * actually initiate the flush with the underlying storage. Ensures that any previous flush operations
+     * have finished before beginning a new flush.
+     *
+     * <p>If and only if this method returns true, the caller must call {@link #doFlush(Callback)}
+     * or {@link #cancelFlush()} to finish the flush operation and allow later calls to complete.
+     *
+     * @param timeout A maximum duration to wait for previous flushes to finish before giving up on waiting
+     * @param timeUnit Units of the timeout argument
+     * @return true if a flush was initiated, false if no data was available
+     * @throws InterruptedException if this thread was interrupted while waiting for the previous flush to complete
+     * @throws TimeoutException if the {@code timeout} elapses before previous flushes are complete.
+     */
+    public boolean beginFlush(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
+        if (flushInProgress.tryAcquire(Math.max(0, timeout), timeUnit)) {
+            synchronized (this) {
+                if (data.isEmpty()) {
+                    flushInProgress.release();
+                    return false;
+                } else {
+                    toFlush = data;
+                    data = new HashMap<>();
+                    return true;
+                }
+            }
+        } else {
+            throw new TimeoutException("Timed out waiting for previous flush to finish");
+        }
     }
@@ -193,6 +224,7 @@ public class OffsetStorageWriter {
             toFlush.putAll(data);
             data = toFlush;
             currentFlushId++;
+            flushInProgress.release();
             toFlush = null;
         }
     }
@@ -211,6 +243,7 @@ public class OffsetStorageWriter {
             cancelFlush();
         } else {
             currentFlushId++;
+            flushInProgress.release();
             toFlush = null;
         }
         return true;
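Taken together, the writer's new contract looks roughly like this from a caller's point of view. This is an illustrative sketch only: the `FlushCaller` class and the 5-second budget are assumptions, not code from this commit.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.connect.storage.OffsetStorageWriter;

class FlushCaller {
    // Illustrative caller-side flow; names and the 5-second budget are assumptions.
    static void commitOffsets(OffsetStorageWriter writer) {
        try {
            if (writer.beginFlush(5_000L, TimeUnit.MILLISECONDS)) {
                // true means a snapshot was taken: doFlush() or cancelFlush() must
                // follow, or the semaphore is never released and later flushes time out
                Future<Void> flushFuture = writer.doFlush((error, result) -> {
                    // on error the writer cancels the flush itself, rolling the
                    // snapshot back into its pending data (see cancelFlush above)
                });
                if (flushFuture != null)  // doFlush may fail fast before contacting the store
                    flushFuture.get(5_000L, TimeUnit.MILLISECONDS);
            }
            // a false return means there was nothing to flush, and the writer has
            // already released its semaphore
        } catch (InterruptedException | TimeoutException e) {
            // a previous flush was still in flight, or this one stalled; skip this
            // attempt and retry on the next commit cycle
        } catch (ExecutionException e) {
            // the underlying store write failed
        }
    }
}
```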

WorkerSinkTaskTest.java

@@ -1335,6 +1335,53 @@ public class WorkerSinkTaskTest {
         }
     }

+    @Test
+    public void testTaskCancelPreventsFinalOffsetCommit() throws Exception {
+        createTask(initialState);
+
+        expectInitializeTask();
+        expectTaskGetTopic(true);
+        expectPollInitialAssignment();
+        EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(2);
+
+        // Put one message through the task to get some offsets to commit
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall();
+
+        // the second put will return after the task is stopped and cancelled (asynchronously)
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall().andAnswer(() -> {
+            workerTask.stop();
+            workerTask.cancel();
+            return null;
+        });
+
+        // stop wakes up the consumer
+        consumer.wakeup();
+        EasyMock.expectLastCall();
+
+        // task performs normal steps in advance of committing offsets
+        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 2));
+        offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+        sinkTask.preCommit(offsets);
+        EasyMock.expectLastCall().andReturn(offsets);
+
+        sinkTask.close(EasyMock.anyObject());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        workerTask.execute();
+
+        PowerMock.verifyAll();
+    }
+
     // Verify that when commitAsync is called but the supplied callback is not called by the consumer before a
     // rebalance occurs, the async callback does not reset the last committed offset from the rebalance.
     // See KAFKA-5731 for more information.

WorkerSourceTaskTest.java

@@ -412,7 +412,6 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         sourceTask.stop();
         EasyMock.expectLastCall();
-        expectOffsetFlush(true);

         expectClose();
@@ -1004,7 +1003,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     @SuppressWarnings("unchecked")
     private void expectOffsetFlush(boolean succeed) throws Exception {
-        EasyMock.expect(offsetWriter.beginFlush()).andReturn(true);
+        EasyMock.expect(offsetWriter.beginFlush(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(true);
         Future<Void> flushFuture = PowerMock.createMock(Future.class);
         EasyMock.expect(offsetWriter.doFlush(EasyMock.anyObject(Callback.class))).andReturn(flushFuture);
         // Should throw for failure
@@ -1021,6 +1020,12 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         }
     }

+    private void expectEmptyOffsetFlush() throws Exception {
+        EasyMock.expect(offsetWriter.beginFlush(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(false);
+        sourceTask.commit();
+        EasyMock.expectLastCall();
+    }
+
     private void assertPollMetrics(int minimumPollCountExpected) {
         MetricGroup sourceTaskGroup = workerTask.sourceTaskMetricsGroup().metricGroup();
         MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup();

OffsetStorageWriterTest.java

@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.connect.storage;

-import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.util.Callback;
 import org.junit.After;
 import org.junit.Before;
@@ -33,6 +32,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;

 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.isNull;
@@ -82,7 +82,7 @@ public class OffsetStorageWriterTest {
         writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         verify(callback).onCompletion(isNull(), isNull());
     }
@@ -96,7 +96,7 @@ public class OffsetStorageWriterTest {
         writer.offset(OFFSET_KEY, null);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         verify(callback).onCompletion(isNull(), isNull());
     }
@@ -111,14 +111,15 @@ public class OffsetStorageWriterTest {
         writer.offset(null, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         verify(callback).onCompletion(isNull(), isNull());
     }

     @Test
-    public void testNoOffsetsToFlush() {
-        assertFalse(writer.beginFlush());
+    public void testNoOffsetsToFlush() throws InterruptedException, TimeoutException {
+        assertFalse(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
+        assertFalse(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));

         // If no offsets are flushed, we should finish immediately and not have made any calls to the
         // underlying storage layer
@@ -135,22 +136,22 @@ public class OffsetStorageWriterTest {
         // First time the write fails
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, true, null);
         writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         verify(callback).onCompletion(eq(EXCEPTION), isNull());

         // Second time it succeeds
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, null);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         verify(callback).onCompletion(isNull(), isNull());

         // Third time it has no data to flush so we won't get past beginFlush()
-        assertFalse(writer.beginFlush());
+        assertFalse(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
     }

     @Test
-    public void testAlreadyFlushing() {
+    public void testAlreadyFlushing() throws InterruptedException, TimeoutException {
         @SuppressWarnings("unchecked")
         final Callback<Void> callback = mock(Callback.class);
         // Trigger the send, but don't invoke the callback so we'll still be mid-flush
@@ -158,15 +159,18 @@ public class OffsetStorageWriterTest {
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, allowStoreCompleteCountdown);
         writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
+        assertThrows(TimeoutException.class, () -> writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback);
-        assertThrows(ConnectException.class, writer::beginFlush);
+        assertThrows(TimeoutException.class, () -> writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
+        allowStoreCompleteCountdown.countDown();
+        assertFalse(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
     }

     @Test
-    public void testCancelBeforeAwaitFlush() {
+    public void testCancelBeforeAwaitFlush() throws InterruptedException, TimeoutException {
         writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.cancelFlush();
     }
@@ -180,7 +184,7 @@ public class OffsetStorageWriterTest {
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, allowStoreCompleteCountdown);
         writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         // Start the flush, then immediately cancel before allowing the mocked store request to finish
         Future<Void> flushFuture = writer.doFlush(callback);
         writer.cancelFlush();
@@ -214,7 +218,7 @@ public class OffsetStorageWriterTest {
                 keySerialized == null ? null : ByteBuffer.wrap(keySerialized),
                 valueSerialized == null ? null : ByteBuffer.wrap(valueSerialized));
         when(store.set(eq(offsetsSerialized), storeCallback.capture())).thenAnswer(invocation -> {
-            final Callback<Void> cb = storeCallback.getValue();
+            final Callback<Void> cb = invocation.getArgument(1);
             return service.submit(() -> {
                 if (waitForCompletion != null)
                     assertTrue(waitForCompletion.await(10000, TimeUnit.MILLISECONDS));