diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 7c32223961b..659613d80e0 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -155,7 +155,7 @@
               files="(KafkaConfigBackingStore|Values|ConnectMetricsRegistry).java"/>
-              files="(DistributedHerder|RestClient|RestServer|JsonConverter|KafkaConfigBackingStore|FileStreamSourceTask|TopicAdmin).java"/>
+              files="(DistributedHerder|RestClient|RestServer|JsonConverter|KafkaConfigBackingStore|FileStreamSourceTask|WorkerSourceTask|TopicAdmin).java"/>
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
@@ ... @@
         AtomicReference<Throwable> flushError = new AtomicReference<>();
-        if (offsetWriter.beginFlush()) {
+        boolean shouldFlush = false;
+        try {
+            // Begin the flush without waiting, as there should not be any concurrent flushes.
+            // This is because commitTransaction is always called on the same thread, and should always block until
+            // the flush is complete, or cancel the flush if an error occurs.
+            shouldFlush = offsetWriter.beginFlush();
+        } catch (Throwable e) {
+            flushError.compareAndSet(null, e);
+        }
+        if (shouldFlush) {
             // Now we can actually write the offsets to the internal topic.
             // No need to track the flush future here since it's guaranteed to complete by the time
             // Producer::commitTransaction completes
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 01303e308d7..3fbdc339f0f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -363,6 +363,10 @@ class WorkerSinkTask extends WorkerTask {
      * the write commit.
      **/
     private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean closing, int seqno) {
+        if (isCancelled()) {
+            log.debug("Skipping final offset commit as task has been cancelled");
+            return;
+        }
         if (closing) {
             doCommitSync(offsets, seqno);
         } else {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 37d93a3fe86..a8fc73d7bba 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -239,7 +239,19 @@ class WorkerSourceTask extends AbstractWorkerSourceTask {
         // though we may update them here with newer offsets for acked records.
         offsetsToCommit.offsets().forEach(offsetWriter::offset);

-        if (!offsetWriter.beginFlush()) {
+        boolean shouldFlush;
+        try {
+            shouldFlush = offsetWriter.beginFlush(timeout - time.milliseconds(), TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            log.warn("{} Interrupted while waiting for previous offset flush to complete, cancelling", this);
+            recordCommitFailure(time.milliseconds() - started, e);
+            return false;
+        } catch (TimeoutException e) {
+            log.warn("{} Timed out while waiting for previous offset flush to complete, cancelling", this);
+            recordCommitFailure(time.milliseconds() - started, e);
+            return false;
+        }
+        if (!shouldFlush) {
             // There was nothing in the offsets to process, but we still mark a successful offset commit.
            long durationMillis = time.milliseconds() - started;
            recordCommitSuccess(durationMillis);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
index b67e3d7b1b4..692669e7544 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
@@ -26,6 +26,9 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;

 /**
  * <p>
@@ -73,6 +76,7 @@ public class OffsetStorageWriter {
     private Map<Map<String, Object>, Map<String, Object>> data = new HashMap<>();
     private Map<Map<String, Object>, Map<String, Object>> toFlush = null;
+    private final Semaphore flushInProgress = new Semaphore(1);

     // Unique ID for each flush request to handle callbacks after timeouts
     private long currentFlushId = 0;
@@ -100,23 +104,50 @@ public class OffsetStorageWriter {
     /**
      * Performs the first step of a flush operation, snapshotting the current state. This does not
-     * actually initiate the flush with the underlying storage.
+     * actually initiate the flush with the underlying storage. Ensures that any previous flush operations
+     * have finished before beginning a new flush.
      *
      * @return true if a flush was initiated, false if no data was available
+     * @throws ConnectException if the previous flush is not complete before this method is called
      */
-    public synchronized boolean beginFlush() {
-        if (flushing()) {
-            log.error("Invalid call to OffsetStorageWriter flush() while already flushing, the "
+    public boolean beginFlush() {
+        try {
+            return beginFlush(0, TimeUnit.NANOSECONDS);
+        } catch (InterruptedException | TimeoutException e) {
+            log.error("Invalid call to OffsetStorageWriter beginFlush() while already flushing, the "
                     + "framework should not allow this");
             throw new ConnectException("OffsetStorageWriter is already flushing");
         }
+    }

-        if (data.isEmpty())
-            return false;
-
-        toFlush = data;
-        data = new HashMap<>();
-        return true;
+    /**
+     * Performs the first step of a flush operation, snapshotting the current state. This does not
+     * actually initiate the flush with the underlying storage. Ensures that any previous flush operations
+     * have finished before beginning a new flush.
+     *
+     * <p>If and only if this method returns true, the caller must call {@link #doFlush(Callback)}
+     * or {@link #cancelFlush()} to finish the flush operation and allow later calls to complete.
+     *
+     * @param timeout A maximum duration to wait for previous flushes to finish before giving up on waiting
+     * @param timeUnit Units of the timeout argument
+     * @return true if a flush was initiated, false if no data was available
+     * @throws InterruptedException if this thread was interrupted while waiting for the previous flush to complete
+     * @throws TimeoutException if the {@code timeout} elapses before previous flushes are complete.
+     */
+    public boolean beginFlush(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
+        if (flushInProgress.tryAcquire(Math.max(0, timeout), timeUnit)) {
+            synchronized (this) {
+                if (data.isEmpty()) {
+                    flushInProgress.release();
+                    return false;
+                } else {
+                    toFlush = data;
+                    data = new HashMap<>();
+                    return true;
+                }
+            }
+        } else {
+            throw new TimeoutException("Timed out waiting for previous flush to finish");
+        }
     }

     /**
@@ -193,6 +224,7 @@ public class OffsetStorageWriter {
             toFlush.putAll(data);
             data = toFlush;
             currentFlushId++;
+            flushInProgress.release();
             toFlush = null;
         }
     }
@@ -211,6 +243,7 @@ public class OffsetStorageWriter {
             cancelFlush();
         } else {
             currentFlushId++;
+            flushInProgress.release();
             toFlush = null;
         }
         return true;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 41d4de5e0a8..33424776cd8 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -1335,6 +1335,53 @@ public class WorkerSinkTaskTest {
         }
     }

+    @Test
+    public void testTaskCancelPreventsFinalOffsetCommit() throws Exception {
+        createTask(initialState);
+        expectInitializeTask();
+        expectTaskGetTopic(true);
+
+        expectPollInitialAssignment();
+        EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(2);
+
+        // Put one message through the task to get some offsets to commit
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall();
+
+        // the second put will return after the task is stopped and cancelled (asynchronously)
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall().andAnswer(() -> {
+            workerTask.stop();
+            workerTask.cancel();
+            return null;
+        });
+
+        // stop wakes up the consumer
+        consumer.wakeup();
+        EasyMock.expectLastCall();
+
+        // task performs normal steps in advance of committing offsets
+        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 2));
+        offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+        sinkTask.preCommit(offsets);
+        EasyMock.expectLastCall().andReturn(offsets);
+        sinkTask.close(EasyMock.anyObject());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        workerTask.execute();
+
+        PowerMock.verifyAll();
+    }
+
     // Verify that when commitAsync is called but the supplied callback is not called by the consumer before a
     // rebalance occurs, the async callback does not reset the last committed offset from the rebalance.
     // See KAFKA-5731 for more information.
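The OffsetStorageWriter change above is the heart of the patch: a single-permit Semaphore serializes flushes, so a second beginFlush(timeout, timeUnit) call now waits (up to the timeout) for the in-flight flush instead of throwing immediately, while offset(...) writers keep accumulating into a fresh map. As a standalone illustration of that pattern, here is a minimal sketch; the SnapshotFlusher class and its String-to-Long offset map are hypothetical simplifications for this example, not Connect APIs:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for OffsetStorageWriter, reduced to the flush-gating pattern.
public class SnapshotFlusher {
    private final Semaphore flushInProgress = new Semaphore(1); // at most one flush at a time
    private Map<String, Long> data = new HashMap<>();           // offsets still accumulating
    private Map<String, Long> toFlush = null;                   // snapshot currently being flushed

    public synchronized void offset(String partition, long offset) {
        data.put(partition, offset);
    }

    // True means the caller owns the flush and must later call completeFlush() or cancelFlush().
    public boolean beginFlush(long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        if (!flushInProgress.tryAcquire(Math.max(0, timeout), unit))
            throw new TimeoutException("Timed out waiting for previous flush to finish");
        synchronized (this) {
            if (data.isEmpty()) {
                flushInProgress.release(); // nothing to flush: hand the permit straight back
                return false;
            }
            toFlush = data;         // freeze the current offsets ...
            data = new HashMap<>(); // ... while writers keep accumulating into a new map
            return true;
        }
    }

    public synchronized void completeFlush() {
        toFlush = null;
        flushInProgress.release(); // allow the next flush to begin
    }

    public synchronized void cancelFlush() {
        if (toFlush == null)
            return;
        toFlush.putAll(data); // newer offsets win over the abandoned snapshot
        data = toFlush;
        toFlush = null;
        flushInProgress.release();
    }
}
```

Releasing the permit inside the data.isEmpty() branch mirrors the patch: a no-op flush must not leave the permit held, or every later commit would time out, which is also why both completion paths (doFlush's callback and cancelFlush in the real diff) release it.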
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 2d2cd00cf53..ea81cd62a13 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -412,7 +412,6 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         sourceTask.stop();
         EasyMock.expectLastCall();
-        expectOffsetFlush(true);

         expectClose();
@@ -1004,7 +1003,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     @SuppressWarnings("unchecked")
     private void expectOffsetFlush(boolean succeed) throws Exception {
-        EasyMock.expect(offsetWriter.beginFlush()).andReturn(true);
+        EasyMock.expect(offsetWriter.beginFlush(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(true);
         Future<Void> flushFuture = PowerMock.createMock(Future.class);
         EasyMock.expect(offsetWriter.doFlush(EasyMock.anyObject(Callback.class))).andReturn(flushFuture);
         // Should throw for failure
@@ -1021,6 +1020,12 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         }
     }

+    private void expectEmptyOffsetFlush() throws Exception {
+        EasyMock.expect(offsetWriter.beginFlush(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(false);
+        sourceTask.commit();
+        EasyMock.expectLastCall();
+    }
+
     private void assertPollMetrics(int minimumPollCountExpected) {
         MetricGroup sourceTaskGroup = workerTask.sourceTaskMetricsGroup().metricGroup();
         MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java
index b6eb0f6a487..8fa34ac95a3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.connect.storage;

-import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.util.Callback;
 import org.junit.After;
 import org.junit.Before;
@@ -33,6 +32,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;

 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.isNull;
@@ -82,7 +82,7 @@ public class OffsetStorageWriterTest {

         writer.offset(OFFSET_KEY, OFFSET_VALUE);

-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         verify(callback).onCompletion(isNull(), isNull());
     }
@@ -96,7 +96,7 @@ public class OffsetStorageWriterTest {

         writer.offset(OFFSET_KEY, null);

-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         verify(callback).onCompletion(isNull(), isNull());
     }
@@ -111,14 +111,15 @@ public class OffsetStorageWriterTest {

         writer.offset(null, OFFSET_VALUE);

-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         verify(callback).onCompletion(isNull(), isNull());
     }

     @Test
-    public void testNoOffsetsToFlush() {
-        assertFalse(writer.beginFlush());
+    public void testNoOffsetsToFlush() throws InterruptedException, TimeoutException {
+        assertFalse(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
+        assertFalse(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));

         // If no offsets are flushed, we should finish immediately and not have made any calls to the
         // underlying storage layer
@@ -135,22 +136,22 @@ public class OffsetStorageWriterTest {
         // First time the write fails
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, true, null);
         writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         verify(callback).onCompletion(eq(EXCEPTION), isNull());

         // Second time it succeeds
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, null);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         verify(callback).onCompletion(isNull(), isNull());

         // Third time it has no data to flush so we won't get past beginFlush()
-        assertFalse(writer.beginFlush());
+        assertFalse(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
     }

     @Test
-    public void testAlreadyFlushing() {
+    public void testAlreadyFlushing() throws InterruptedException, TimeoutException {
         @SuppressWarnings("unchecked")
         final Callback<Void> callback = mock(Callback.class);
         // Trigger the send, but don't invoke the callback so we'll still be mid-flush
@@ -158,15 +159,18 @@
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, allowStoreCompleteCountdown);
         writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
+        assertThrows(TimeoutException.class, () -> writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback);
-        assertThrows(ConnectException.class, writer::beginFlush);
+        assertThrows(TimeoutException.class, () -> writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
+        allowStoreCompleteCountdown.countDown();
+        assertFalse(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
     }

     @Test
-    public void testCancelBeforeAwaitFlush() {
+    public void testCancelBeforeAwaitFlush() throws InterruptedException, TimeoutException {
         writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.cancelFlush();
     }

@@ -180,7 +184,7 @@
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, allowStoreCompleteCountdown);
         writer.offset(OFFSET_KEY, OFFSET_VALUE);

-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         // Start the flush, then immediately cancel before allowing the mocked store request to finish
         Future<Void> flushFuture = writer.doFlush(callback);
         writer.cancelFlush();
@@ -214,7 +218,7 @@
                 keySerialized == null ? null : ByteBuffer.wrap(keySerialized),
                 valueSerialized == null ? null : ByteBuffer.wrap(valueSerialized));
         when(store.set(eq(offsetsSerialized), storeCallback.capture())).thenAnswer(invocation -> {
-            final Callback<Void> cb = storeCallback.getValue();
+            final Callback<Void> cb = invocation.getArgument(1);
             return service.submit(() -> {
                 if (waitForCompletion != null)
                     assertTrue(waitForCompletion.await(10000, TimeUnit.MILLISECONDS));
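The last hunk swaps storeCallback.getValue() for invocation.getArgument(1) inside the mock's answer. An ArgumentCaptor's getValue() returns the most recently captured value, so an answer that runs asynchronously (here, on the test's executor) can observe a callback captured by a later set(...) call; reading the argument off the InvocationOnMock binds the answer to its own invocation. A minimal sketch of the safe pattern under stated assumptions: the KVStore interface and "offsets" key are invented for illustration, and only the Mockito calls (mock, when, thenAnswer, getArgument) are real APIs.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class AsyncAnswerExample {
    // Hypothetical asynchronous store, standing in for OffsetBackingStore.set(...)
    interface KVStore {
        Future<?> set(String key, Consumer<Void> callback);
    }

    public static void main(String[] args) {
        ExecutorService service = Executors.newSingleThreadExecutor();
        KVStore store = mock(KVStore.class);

        when(store.set(eq("offsets"), any())).thenAnswer(invocation -> {
            // Read the callback from this specific invocation. A captor's getValue()
            // would return the last captured value, which may belong to a later
            // set(...) call by the time this asynchronous task actually runs.
            Consumer<Void> cb = invocation.getArgument(1);
            return service.submit(() -> cb.accept(null));
        });

        store.set("offsets", v -> System.out.println("flush completed"));
        service.shutdown();
    }
}
```

That distinction only matters once flushes can overlap in time, which is exactly what the new testAlreadyFlushing exercises with its countdown latch.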