diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index 5c2bd3e39ca..78a61e755f4 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -60,6 +60,7 @@ import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -343,11 +344,16 @@ class WorkerSinkTask extends WorkerTask, SinkReco deliverMessages(); } - // Visible for testing + // VisibleForTesting boolean isCommitting() { return committing; } + //VisibleForTesting + Map lastCommittedOffsets() { + return Collections.unmodifiableMap(lastCommittedOffsets); + } + private void doCommitSync(Map offsets, int seqno) { log.debug("{} Committing offsets synchronously using sequence number {}: {}", this, seqno, offsets); try { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java index b41f571d635..f39dce66460 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java @@ -20,7 +20,10 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; @@ -33,6 +36,7 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.WorkerSinkTask.SinkTaskMetricsGroup; import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics; @@ -49,6 +53,7 @@ import org.apache.kafka.connect.storage.ClusterConfigState; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.storage.StatusBackingStore; +import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.util.ConnectorTaskId; import org.junit.After; import org.junit.Before; @@ -67,19 +72,25 @@ import org.mockito.stubbing.OngoingStubbing; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.function.Supplier; +import java.util.regex.Pattern; import static java.util.Arrays.asList; +import static java.util.Collections.singleton; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyList; @@ -300,7 +311,7 @@ public class WorkerSinkTaskMockitoTest { verify(statusListener).onResume(taskId); verify(consumer, times(2)).wakeup(); INITIAL_ASSIGNMENT.forEach(tp -> { - verify(consumer).resume(Collections.singleton(tp)); + verify(consumer).resume(singleton(tp)); }); verify(sinkTask, times(4)).put(anyList()); } @@ -343,6 +354,99 @@ public class WorkerSinkTaskMockitoTest { verify(headerConverter).close(); } + @Test + public void testPollRedelivery() { + createTask(initialState); + expectTaskGetTopic(); + + workerTask.initialize(TASK_CONFIG); + workerTask.initializeAndStart(); + verifyInitializeTask(); + + expectPollInitialAssignment() + // If a retriable exception is thrown, we should redeliver the same batch, pausing the consumer in the meantime + .thenAnswer(expectConsumerPoll(1)) + // Retry delivery should succeed + .thenAnswer(expectConsumerPoll(0)) + .thenAnswer(expectConsumerPoll(0)); + expectConversionAndTransformation(null, new RecordHeaders()); + + doAnswer(invocation -> null) + .doThrow(new RetriableException("retry")) + .doAnswer(invocation -> null) + .when(sinkTask).put(anyList()); + + workerTask.iteration(); + time.sleep(10000L); + + verifyPollInitialAssignment(); + verify(sinkTask).put(anyList()); + + assertSinkMetricValue("partition-count", 2); + assertSinkMetricValue("sink-record-read-total", 0.0); + assertSinkMetricValue("sink-record-send-total", 0.0); + assertSinkMetricValue("sink-record-active-count", 0.0); + assertSinkMetricValue("sink-record-active-count-max", 0.0); + assertSinkMetricValue("sink-record-active-count-avg", 0.0); + assertSinkMetricValue("offset-commit-seq-no", 0.0); + assertSinkMetricValue("offset-commit-completion-rate", 0.0); + assertSinkMetricValue("offset-commit-completion-total", 0.0); + assertSinkMetricValue("offset-commit-skip-rate", 0.0); + assertSinkMetricValue("offset-commit-skip-total", 0.0); + assertTaskMetricValue("status", "running"); + assertTaskMetricValue("running-ratio", 1.0); + assertTaskMetricValue("pause-ratio", 0.0); + assertTaskMetricValue("batch-size-max", 0.0); + assertTaskMetricValue("batch-size-avg", 0.0); + assertTaskMetricValue("offset-commit-max-time-ms", Double.NaN); + assertTaskMetricValue("offset-commit-failure-percentage", 0.0); + assertTaskMetricValue("offset-commit-success-percentage", 0.0); + + // Pause + workerTask.iteration(); + + verify(consumer, times(3)).assignment(); + verify(consumer).pause(INITIAL_ASSIGNMENT); + + // Retry delivery should succeed + workerTask.iteration(); + time.sleep(30000L); + + verify(sinkTask, times(3)).put(anyList()); + INITIAL_ASSIGNMENT.forEach(tp -> { + verify(consumer).resume(Collections.singleton(tp)); + }); + + assertSinkMetricValue("sink-record-read-total", 1.0); + assertSinkMetricValue("sink-record-send-total", 1.0); + assertSinkMetricValue("sink-record-active-count", 1.0); + assertSinkMetricValue("sink-record-active-count-max", 1.0); + assertSinkMetricValue("sink-record-active-count-avg", 0.5); + assertTaskMetricValue("status", "running"); + assertTaskMetricValue("running-ratio", 1.0); + assertTaskMetricValue("batch-size-max", 1.0); + assertTaskMetricValue("batch-size-avg", 0.5); + + // Expect commit + final Map workerCurrentOffsets = new HashMap<>(); + // Commit advance by one + workerCurrentOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1)); + // Nothing polled for this partition + workerCurrentOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET)); + when(sinkTask.preCommit(workerCurrentOffsets)).thenReturn(workerCurrentOffsets); + + sinkTaskContext.getValue().requestCommit(); + time.sleep(10000L); + workerTask.iteration(); + + final ArgumentCaptor callback = ArgumentCaptor.forClass(OffsetCommitCallback.class); + verify(consumer).commitAsync(eq(workerCurrentOffsets), callback.capture()); + callback.getValue().onComplete(workerCurrentOffsets, null); + + verify(sinkTask, times(4)).put(anyList()); + assertSinkMetricValue("offset-commit-completion-total", 1.0); + } + @Test public void testErrorInRebalancePartitionLoss() { RuntimeException exception = new RuntimeException("Revocation error"); @@ -455,18 +559,18 @@ public class WorkerSinkTaskMockitoTest { return ConsumerRecords.empty(); }) .thenAnswer((Answer>) invocation -> { - rebalanceListener.getValue().onPartitionsRevoked(Collections.singleton(TOPIC_PARTITION)); + rebalanceListener.getValue().onPartitionsRevoked(singleton(TOPIC_PARTITION)); rebalanceListener.getValue().onPartitionsAssigned(Collections.emptySet()); return ConsumerRecords.empty(); }) .thenAnswer((Answer>) invocation -> { rebalanceListener.getValue().onPartitionsRevoked(Collections.emptySet()); - rebalanceListener.getValue().onPartitionsAssigned(Collections.singleton(TOPIC_PARTITION3)); + rebalanceListener.getValue().onPartitionsAssigned(singleton(TOPIC_PARTITION3)); return ConsumerRecords.empty(); }) .thenAnswer((Answer>) invocation -> { - rebalanceListener.getValue().onPartitionsLost(Collections.singleton(TOPIC_PARTITION3)); - rebalanceListener.getValue().onPartitionsAssigned(Collections.singleton(TOPIC_PARTITION)); + rebalanceListener.getValue().onPartitionsLost(singleton(TOPIC_PARTITION3)); + rebalanceListener.getValue().onPartitionsAssigned(singleton(TOPIC_PARTITION)); return ConsumerRecords.empty(); }); @@ -482,21 +586,188 @@ public class WorkerSinkTaskMockitoTest { // Second iteration--second call to poll, partial consumer revocation workerTask.iteration(); - verify(sinkTask).close(Collections.singleton(TOPIC_PARTITION)); + verify(sinkTask).close(singleton(TOPIC_PARTITION)); verify(sinkTask, times(2)).put(Collections.emptyList()); // Third iteration--third call to poll, partial consumer assignment workerTask.iteration(); - verify(sinkTask).open(Collections.singleton(TOPIC_PARTITION3)); + verify(sinkTask).open(singleton(TOPIC_PARTITION3)); verify(sinkTask, times(3)).put(Collections.emptyList()); // Fourth iteration--fourth call to poll, one partition lost; can't commit offsets for it, one new partition assigned workerTask.iteration(); - verify(sinkTask).close(Collections.singleton(TOPIC_PARTITION3)); - verify(sinkTask).open(Collections.singleton(TOPIC_PARTITION)); + verify(sinkTask).close(singleton(TOPIC_PARTITION3)); + verify(sinkTask).open(singleton(TOPIC_PARTITION)); verify(sinkTask, times(4)).put(Collections.emptyList()); } + @SuppressWarnings("unchecked") + @Test + public void testTaskCancelPreventsFinalOffsetCommit() { + createTask(initialState); + + workerTask.initialize(TASK_CONFIG); + workerTask.initializeAndStart(); + verifyInitializeTask(); + + expectTaskGetTopic(); + expectPollInitialAssignment() + // Put one message through the task to get some offsets to commit + .thenAnswer(expectConsumerPoll(1)) + // the second put will return after the task is stopped and cancelled (asynchronously) + .thenAnswer(expectConsumerPoll(1)); + + expectConversionAndTransformation(null, new RecordHeaders()); + + doAnswer(invocation -> null) + .doAnswer(invocation -> null) + .doAnswer(invocation -> { + workerTask.stop(); + workerTask.cancel(); + return null; + }) + .when(sinkTask).put(anyList()); + + // task performs normal steps in advance of committing offsets + final Map offsets = new HashMap<>(); + offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 2)); + offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET)); + when(sinkTask.preCommit(offsets)).thenReturn(offsets); + + workerTask.execute(); + + // stop wakes up the consumer + verify(consumer).wakeup(); + + verify(sinkTask).close(any()); + } + + @Test + public void testDeliveryWithMutatingTransform() { + createTask(initialState); + + workerTask.initialize(TASK_CONFIG); + workerTask.initializeAndStart(); + verifyInitializeTask(); + + expectTaskGetTopic(); + expectPollInitialAssignment() + .thenAnswer(expectConsumerPoll(1)) + .thenAnswer(expectConsumerPoll(0)); + + expectConversionAndTransformation("newtopic_", new RecordHeaders()); + + workerTask.iteration(); // initial assignment + + workerTask.iteration(); // first record delivered + + final Map offsets = new HashMap<>(); + offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1)); + offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET)); + when(sinkTask.preCommit(offsets)).thenReturn(offsets); + + sinkTaskContext.getValue().requestCommit(); + assertTrue(sinkTaskContext.getValue().isCommitRequested()); + + assertNotEquals(offsets, workerTask.lastCommittedOffsets()); + workerTask.iteration(); // triggers the commit + + ArgumentCaptor callback = ArgumentCaptor.forClass(OffsetCommitCallback.class); + verify(consumer).commitAsync(eq(offsets), callback.capture()); + + callback.getValue().onComplete(offsets, null); + + assertFalse(sinkTaskContext.getValue().isCommitRequested()); // should have been cleared + assertEquals(offsets, workerTask.lastCommittedOffsets()); + assertEquals(0, workerTask.commitFailures()); + assertEquals(1.0, metrics.currentMetricValueAsDouble(workerTask.taskMetricsGroup().metricGroup(), "batch-size-max"), 0.0001); + } + + @Test + public void testMissingTimestampPropagation() { + createTask(initialState); + expectTaskGetTopic(); + + workerTask.initialize(TASK_CONFIG); + workerTask.initializeAndStart(); + verifyInitializeTask(); + + expectPollInitialAssignment() + .thenAnswer(expectConsumerPoll(1, RecordBatch.NO_TIMESTAMP, TimestampType.CREATE_TIME, new RecordHeaders())); + + expectConversionAndTransformation(null, new RecordHeaders()); + + workerTask.iteration(); // iter 1 -- initial assignment + workerTask.iteration(); // iter 2 -- deliver 1 record + + @SuppressWarnings("unchecked") + ArgumentCaptor> records = ArgumentCaptor.forClass(Collection.class); + verify(sinkTask, times(2)).put(records.capture()); + + SinkRecord record = records.getValue().iterator().next(); + + // we expect null for missing timestamp, the sentinel value of Record.NO_TIMESTAMP is Kafka's API + assertNull(record.timestamp()); + assertEquals(TimestampType.CREATE_TIME, record.timestampType()); + } + + @Test + public void testTimestampPropagation() { + final Long timestamp = System.currentTimeMillis(); + final TimestampType timestampType = TimestampType.CREATE_TIME; + + createTask(initialState); + expectTaskGetTopic(); + + workerTask.initialize(TASK_CONFIG); + workerTask.initializeAndStart(); + verifyInitializeTask(); + + expectPollInitialAssignment() + .thenAnswer(expectConsumerPoll(1, timestamp, timestampType, new RecordHeaders())); + + expectConversionAndTransformation(null, new RecordHeaders()); + + workerTask.iteration(); // iter 1 -- initial assignment + workerTask.iteration(); // iter 2 -- deliver 1 record + + @SuppressWarnings("unchecked") + ArgumentCaptor> records = ArgumentCaptor.forClass(Collection.class); + verify(sinkTask, times(2)).put(records.capture()); + + SinkRecord record = records.getValue().iterator().next(); + + assertEquals(timestamp, record.timestamp()); + assertEquals(timestampType, record.timestampType()); + } + + @Test + public void testTopicsRegex() { + Map props = new HashMap<>(TASK_PROPS); + props.remove("topics"); + props.put("topics.regex", "te.*"); + TaskConfig taskConfig = new TaskConfig(props); + + createTask(TargetState.PAUSED); + + workerTask.initialize(taskConfig); + workerTask.initializeAndStart(); + + ArgumentCaptor topicsRegex = ArgumentCaptor.forClass(Pattern.class); + + verify(consumer).subscribe(topicsRegex.capture(), rebalanceListener.capture()); + assertEquals("te.*", topicsRegex.getValue().pattern()); + verify(sinkTask).initialize(sinkTaskContext.capture()); + verify(sinkTask).start(props); + + expectPollInitialAssignment(); + + workerTask.iteration(); + time.sleep(10000L); + + verify(consumer).pause(INITIAL_ASSIGNMENT); + } + @Test public void testMetricsGroup() { SinkTaskMetricsGroup group = new SinkTaskMetricsGroup(taskId, metrics); @@ -558,6 +829,143 @@ public class WorkerSinkTaskMockitoTest { assertEquals(30, metrics.currentMetricValueAsDouble(group1.metricGroup(), "put-batch-max-time-ms"), 0.001d); } + @Test + public void testHeaders() { + createTask(initialState); + + workerTask.initialize(TASK_CONFIG); + workerTask.initializeAndStart(); + verifyInitializeTask(); + + Headers headers = new RecordHeaders(); + headers.add("header_key", "header_value".getBytes()); + + expectPollInitialAssignment() + .thenAnswer(expectConsumerPoll(1, headers)); + + expectConversionAndTransformation(null, headers); + + workerTask.iteration(); // iter 1 -- initial assignment + workerTask.iteration(); // iter 2 -- deliver 1 record + + @SuppressWarnings("unchecked") + ArgumentCaptor> recordCapture = ArgumentCaptor.forClass(Collection.class); + verify(sinkTask, times(2)).put(recordCapture.capture()); + + assertEquals(1, recordCapture.getValue().size()); + SinkRecord record = recordCapture.getValue().iterator().next(); + + assertEquals("header_value", record.headers().lastWithName("header_key").value()); + } + + @Test + public void testHeadersWithCustomConverter() { + StringConverter stringConverter = new StringConverter(); + SampleConverterWithHeaders testConverter = new SampleConverterWithHeaders(); + + createTask(initialState, stringConverter, testConverter, stringConverter); + + workerTask.initialize(TASK_CONFIG); + workerTask.initializeAndStart(); + verifyInitializeTask(); + + String keyA = "a"; + String valueA = "Árvíztűrő tükörfúrógép"; + Headers headersA = new RecordHeaders(); + String encodingA = "latin2"; + headersA.add("encoding", encodingA.getBytes()); + + String keyB = "b"; + String valueB = "Тестовое сообщение"; + Headers headersB = new RecordHeaders(); + String encodingB = "koi8_r"; + headersB.add("encoding", encodingB.getBytes()); + + expectPollInitialAssignment() + .thenAnswer((Answer>) invocation -> { + List> records = Arrays.asList( + new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + 1, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, + 0, 0, keyA.getBytes(), valueA.getBytes(encodingA), headersA, Optional.empty()), + new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + 2, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, + 0, 0, keyB.getBytes(), valueB.getBytes(encodingB), headersB, Optional.empty()) + ); + return new ConsumerRecords<>(Collections.singletonMap(new TopicPartition(TOPIC, PARTITION), records)); + }); + + expectTransformation(null); + + workerTask.iteration(); // iter 1 -- initial assignment + workerTask.iteration(); // iter 2 -- deliver records + + @SuppressWarnings("unchecked") + ArgumentCaptor> records = ArgumentCaptor.forClass(Collection.class); + verify(sinkTask, times(2)).put(records.capture()); + + Iterator iterator = records.getValue().iterator(); + + SinkRecord recordA = iterator.next(); + assertEquals(keyA, recordA.key()); + assertEquals(valueA, recordA.value()); + + SinkRecord recordB = iterator.next(); + assertEquals(keyB, recordB.key()); + assertEquals(valueB, recordB.value()); + } + + @Test + public void testOriginalTopicWithTopicMutatingTransformations() { + createTask(initialState); + + workerTask.initialize(TASK_CONFIG); + workerTask.initializeAndStart(); + verifyInitializeTask(); + + expectPollInitialAssignment() + .thenAnswer(expectConsumerPoll(1)); + + expectConversionAndTransformation("newtopic_", new RecordHeaders()); + + workerTask.iteration(); // initial assignment + workerTask.iteration(); // first record delivered + + @SuppressWarnings("unchecked") + ArgumentCaptor> recordCapture = ArgumentCaptor.forClass(Collection.class); + verify(sinkTask, times(2)).put(recordCapture.capture()); + + assertEquals(1, recordCapture.getValue().size()); + SinkRecord record = recordCapture.getValue().iterator().next(); + assertEquals(TOPIC, record.originalTopic()); + assertEquals("newtopic_" + TOPIC, record.topic()); + } + + @Test + public void testPartitionCountInCaseOfPartitionRevocation() { + MockConsumer mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + // Setting up Worker Sink Task to check metrics + workerTask = new WorkerSinkTask( + taskId, sinkTask, statusListener, TargetState.PAUSED, workerConfig, ClusterConfigState.EMPTY, metrics, + keyConverter, valueConverter, errorHandlingMetrics, headerConverter, + transformationChain, mockConsumer, pluginLoader, time, + RetryWithToleranceOperatorTest.noopOperator(), null, statusBackingStore, Collections::emptyList); + mockConsumer.updateBeginningOffsets( + new HashMap() {{ + put(TOPIC_PARTITION, 0L); + put(TOPIC_PARTITION2, 0L); + }} + ); + workerTask.initialize(TASK_CONFIG); + workerTask.initializeAndStart(); + // Initial Re-balance to assign INITIAL_ASSIGNMENT which is "TOPIC_PARTITION" and "TOPIC_PARTITION2" + mockConsumer.rebalance(INITIAL_ASSIGNMENT); + assertSinkMetricValue("partition-count", 2); + // Revoked "TOPIC_PARTITION" and second re-balance with "TOPIC_PARTITION2" + mockConsumer.rebalance(Collections.singleton(TOPIC_PARTITION2)); + assertSinkMetricValue("partition-count", 1); + // Closing the Worker Sink Task which will update the partition count as 0. + workerTask.close(); + assertSinkMetricValue("partition-count", 0); + } + private void expectRebalanceRevocationError(RuntimeException e) { when(sinkTask.preCommit(anyMap())).thenReturn(Collections.emptyMap()); doThrow(e).when(sinkTask).close(INITIAL_ASSIGNMENT); @@ -599,6 +1007,10 @@ public class WorkerSinkTaskMockitoTest { return expectConsumerPoll(numMessages, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, new RecordHeaders()); } + private Answer> expectConsumerPoll(final int numMessages, Headers headers) { + return expectConsumerPoll(numMessages, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, headers); + } + private Answer> expectConsumerPoll(final int numMessages, final long timestamp, final TimestampType timestampType, Headers headers) { return invocation -> { List> records = new ArrayList<>(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index 66edf6a0717..e103c30157b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -35,7 +35,6 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.RetriableException; -import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics; import org.apache.kafka.connect.runtime.errors.ErrorReporter; @@ -50,13 +49,10 @@ import org.apache.kafka.connect.storage.ClusterConfigState; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.storage.StatusBackingStore; -import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.util.ConnectorTaskId; import org.easymock.Capture; -import org.easymock.CaptureType; import org.easymock.EasyMock; import org.easymock.IExpectationSetters; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -71,11 +67,9 @@ import org.powermock.reflect.Whitebox; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -88,15 +82,12 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; -import java.util.regex.Pattern; import java.util.stream.Collectors; import static java.util.Arrays.asList; -import static java.util.Collections.singleton; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -134,7 +125,6 @@ public class WorkerSinkTaskTest { private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS); private ConnectorTaskId taskId = new ConnectorTaskId("job", 0); - private ConnectorTaskId taskId1 = new ConnectorTaskId("job", 1); private TargetState initialState = TargetState.STARTED; private MockTime time; private WorkerSinkTask workerTask; @@ -162,7 +152,6 @@ public class WorkerSinkTaskTest { @Mock private ErrorHandlingMetrics errorHandlingMetrics; private Capture rebalanceListener = EasyMock.newCapture(); - private Capture topicsRegex = EasyMock.newCapture(); private long recordsReturnedTp1; private long recordsReturnedTp3; @@ -203,103 +192,6 @@ public class WorkerSinkTaskTest { if (metrics != null) metrics.stop(); } - @Test - public void testPollRedelivery() throws Exception { - createTask(initialState); - - expectInitializeTask(); - expectTaskGetTopic(true); - expectPollInitialAssignment(); - - // If a retriable exception is thrown, we should redeliver the same batch, pausing the consumer in the meantime - expectConsumerPoll(1); - expectConversionAndTransformation(1); - Capture> records = EasyMock.newCapture(CaptureType.ALL); - sinkTask.put(EasyMock.capture(records)); - EasyMock.expectLastCall().andThrow(new RetriableException("retry")); - // Pause - EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT); - consumer.pause(INITIAL_ASSIGNMENT); - PowerMock.expectLastCall(); - - // Retry delivery should succeed - expectConsumerPoll(0); - sinkTask.put(EasyMock.capture(records)); - EasyMock.expectLastCall(); - // And unpause - EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT); - INITIAL_ASSIGNMENT.forEach(tp -> { - consumer.resume(singleton(tp)); - PowerMock.expectLastCall(); - }); - - // Expect commit - EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(2); - final Map workerCurrentOffsets = new HashMap<>(); - // Commit advance by one - workerCurrentOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1)); - // Nothing polled for this partition - workerCurrentOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET)); - EasyMock.expect(sinkTask.preCommit(workerCurrentOffsets)).andReturn(workerCurrentOffsets); - final Capture callback = EasyMock.newCapture(); - consumer.commitAsync(EasyMock.eq(workerCurrentOffsets), EasyMock.capture(callback)); - EasyMock.expectLastCall().andAnswer(() -> { - callback.getValue().onComplete(workerCurrentOffsets, null); - return null; - }); - expectConsumerPoll(0); - sinkTask.put(EasyMock.eq(Collections.emptyList())); - EasyMock.expectLastCall(); - - PowerMock.replayAll(); - - workerTask.initialize(TASK_CONFIG); - workerTask.initializeAndStart(); - workerTask.iteration(); - time.sleep(10000L); - - assertSinkMetricValue("partition-count", 2); - assertSinkMetricValue("sink-record-read-total", 0.0); - assertSinkMetricValue("sink-record-send-total", 0.0); - assertSinkMetricValue("sink-record-active-count", 0.0); - assertSinkMetricValue("sink-record-active-count-max", 0.0); - assertSinkMetricValue("sink-record-active-count-avg", 0.0); - assertSinkMetricValue("offset-commit-seq-no", 0.0); - assertSinkMetricValue("offset-commit-completion-rate", 0.0); - assertSinkMetricValue("offset-commit-completion-total", 0.0); - assertSinkMetricValue("offset-commit-skip-rate", 0.0); - assertSinkMetricValue("offset-commit-skip-total", 0.0); - assertTaskMetricValue("status", "running"); - assertTaskMetricValue("running-ratio", 1.0); - assertTaskMetricValue("pause-ratio", 0.0); - assertTaskMetricValue("batch-size-max", 0.0); - assertTaskMetricValue("batch-size-avg", 0.0); - assertTaskMetricValue("offset-commit-max-time-ms", Double.NaN); - assertTaskMetricValue("offset-commit-failure-percentage", 0.0); - assertTaskMetricValue("offset-commit-success-percentage", 0.0); - - workerTask.iteration(); - workerTask.iteration(); - time.sleep(30000L); - - assertSinkMetricValue("sink-record-read-total", 1.0); - assertSinkMetricValue("sink-record-send-total", 1.0); - assertSinkMetricValue("sink-record-active-count", 1.0); - assertSinkMetricValue("sink-record-active-count-max", 1.0); - assertSinkMetricValue("sink-record-active-count-avg", 0.5); - assertTaskMetricValue("status", "running"); - assertTaskMetricValue("running-ratio", 1.0); - assertTaskMetricValue("batch-size-max", 1.0); - assertTaskMetricValue("batch-size-avg", 0.5); - - sinkTaskContext.getValue().requestCommit(); - time.sleep(10000L); - workerTask.iteration(); - assertSinkMetricValue("offset-commit-completion-total", 1.0); - - PowerMock.verifyAll(); - } - @Test public void testPollRedeliveryWithConsumerRebalance() throws Exception { createTask(initialState); @@ -1054,53 +946,6 @@ public class WorkerSinkTaskTest { } } - @Test - public void testTaskCancelPreventsFinalOffsetCommit() throws Exception { - createTask(initialState); - expectInitializeTask(); - expectTaskGetTopic(true); - - expectPollInitialAssignment(); - EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(1); - - // Put one message through the task to get some offsets to commit - expectConsumerPoll(1); - expectConversionAndTransformation(1); - sinkTask.put(EasyMock.anyObject()); - PowerMock.expectLastCall(); - - // the second put will return after the task is stopped and cancelled (asynchronously) - expectConsumerPoll(1); - expectConversionAndTransformation(1); - sinkTask.put(EasyMock.anyObject()); - PowerMock.expectLastCall().andAnswer(() -> { - workerTask.stop(); - workerTask.cancel(); - return null; - }); - - // stop wakes up the consumer - consumer.wakeup(); - EasyMock.expectLastCall(); - - // task performs normal steps in advance of committing offsets - final Map offsets = new HashMap<>(); - offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 2)); - offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET)); - sinkTask.preCommit(offsets); - EasyMock.expectLastCall().andReturn(offsets); - sinkTask.close(EasyMock.anyObject()); - PowerMock.expectLastCall(); - - PowerMock.replayAll(); - - workerTask.initialize(TASK_CONFIG); - workerTask.initializeAndStart(); - workerTask.execute(); - - PowerMock.verifyAll(); - } - // Verify that when commitAsync is called but the supplied callback is not called by the consumer before a // rebalance occurs, the async callback does not reset the last committed offset from the rebalance. // See KAFKA-5731 for more information. @@ -1318,294 +1163,6 @@ public class WorkerSinkTaskTest { PowerMock.verifyAll(); } - @Test - public void testDeliveryWithMutatingTransform() throws Exception { - createTask(initialState); - - expectInitializeTask(); - expectTaskGetTopic(true); - - expectPollInitialAssignment(); - - expectConsumerPoll(1); - expectConversionAndTransformation(1, "newtopic_"); - sinkTask.put(EasyMock.anyObject()); - EasyMock.expectLastCall(); - - final Map offsets = new HashMap<>(); - offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1)); - offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET)); - sinkTask.preCommit(offsets); - EasyMock.expectLastCall().andReturn(offsets); - - EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(2); - - final Capture callback = EasyMock.newCapture(); - consumer.commitAsync(EasyMock.eq(offsets), EasyMock.capture(callback)); - EasyMock.expectLastCall().andAnswer(() -> { - callback.getValue().onComplete(offsets, null); - return null; - }); - - expectConsumerPoll(0); - sinkTask.put(Collections.emptyList()); - EasyMock.expectLastCall(); - - PowerMock.replayAll(); - - workerTask.initialize(TASK_CONFIG); - workerTask.initializeAndStart(); - - workerTask.iteration(); // initial assignment - - workerTask.iteration(); // first record delivered - - sinkTaskContext.getValue().requestCommit(); - assertTrue(sinkTaskContext.getValue().isCommitRequested()); - assertNotEquals(offsets, Whitebox.>getInternalState(workerTask, "lastCommittedOffsets")); - workerTask.iteration(); // triggers the commit - assertFalse(sinkTaskContext.getValue().isCommitRequested()); // should have been cleared - assertEquals(offsets, Whitebox.>getInternalState(workerTask, "lastCommittedOffsets")); - assertEquals(0, workerTask.commitFailures()); - assertEquals(1.0, metrics.currentMetricValueAsDouble(workerTask.taskMetricsGroup().metricGroup(), "batch-size-max"), 0.0001); - - PowerMock.verifyAll(); - } - - @Test - public void testMissingTimestampPropagation() throws Exception { - createTask(initialState); - - expectInitializeTask(); - expectTaskGetTopic(true); - expectPollInitialAssignment(); - expectConsumerPoll(1, RecordBatch.NO_TIMESTAMP, TimestampType.CREATE_TIME); - expectConversionAndTransformation(1); - - Capture> records = EasyMock.newCapture(CaptureType.ALL); - - sinkTask.put(EasyMock.capture(records)); - - PowerMock.replayAll(); - - workerTask.initialize(TASK_CONFIG); - workerTask.initializeAndStart(); - workerTask.iteration(); // iter 1 -- initial assignment - workerTask.iteration(); // iter 2 -- deliver 1 record - - SinkRecord record = records.getValue().iterator().next(); - - // we expect null for missing timestamp, the sentinel value of Record.NO_TIMESTAMP is Kafka's API - assertNull(record.timestamp()); - assertEquals(TimestampType.CREATE_TIME, record.timestampType()); - - PowerMock.verifyAll(); - } - - @Test - public void testTimestampPropagation() throws Exception { - final Long timestamp = System.currentTimeMillis(); - final TimestampType timestampType = TimestampType.CREATE_TIME; - - createTask(initialState); - - expectInitializeTask(); - expectTaskGetTopic(true); - expectPollInitialAssignment(); - expectConsumerPoll(1, timestamp, timestampType); - expectConversionAndTransformation(1); - - Capture> records = EasyMock.newCapture(CaptureType.ALL); - sinkTask.put(EasyMock.capture(records)); - - PowerMock.replayAll(); - - workerTask.initialize(TASK_CONFIG); - workerTask.initializeAndStart(); - workerTask.iteration(); // iter 1 -- initial assignment - workerTask.iteration(); // iter 2 -- deliver 1 record - - SinkRecord record = records.getValue().iterator().next(); - - assertEquals(timestamp, record.timestamp()); - assertEquals(timestampType, record.timestampType()); - - PowerMock.verifyAll(); - } - - @Test - public void testTopicsRegex() { - Map props = new HashMap<>(TASK_PROPS); - props.remove("topics"); - props.put("topics.regex", "te.*"); - TaskConfig taskConfig = new TaskConfig(props); - - createTask(TargetState.PAUSED); - - consumer.subscribe(EasyMock.capture(topicsRegex), EasyMock.capture(rebalanceListener)); - PowerMock.expectLastCall(); - - sinkTask.initialize(EasyMock.capture(sinkTaskContext)); - PowerMock.expectLastCall(); - sinkTask.start(props); - PowerMock.expectLastCall(); - - expectPollInitialAssignment(); - - EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT); - consumer.pause(INITIAL_ASSIGNMENT); - PowerMock.expectLastCall(); - - PowerMock.replayAll(); - - workerTask.initialize(taskConfig); - workerTask.initializeAndStart(); - workerTask.iteration(); - time.sleep(10000L); - - PowerMock.verifyAll(); - } - - @Test - public void testHeaders() throws Exception { - Headers headers = new RecordHeaders(); - headers.add("header_key", "header_value".getBytes()); - - createTask(initialState); - - expectInitializeTask(); - expectTaskGetTopic(true); - expectPollInitialAssignment(); - - expectConsumerPoll(1, headers); - expectConversionAndTransformation(1, null, headers); - sinkTask.put(EasyMock.anyObject()); - EasyMock.expectLastCall(); - - PowerMock.replayAll(); - - workerTask.initialize(TASK_CONFIG); - workerTask.initializeAndStart(); - workerTask.iteration(); // iter 1 -- initial assignment - workerTask.iteration(); // iter 2 -- deliver 1 record - - PowerMock.verifyAll(); - } - - @Test - public void testHeadersWithCustomConverter() throws Exception { - StringConverter stringConverter = new StringConverter(); - SampleConverterWithHeaders testConverter = new SampleConverterWithHeaders(); - - createTask(initialState, stringConverter, testConverter, stringConverter); - - expectInitializeTask(); - expectTaskGetTopic(true); - expectPollInitialAssignment(); - - String keyA = "a"; - String valueA = "Árvíztűrő tükörfúrógép"; - Headers headersA = new RecordHeaders(); - String encodingA = "latin2"; - headersA.add("encoding", encodingA.getBytes()); - - String keyB = "b"; - String valueB = "Тестовое сообщение"; - Headers headersB = new RecordHeaders(); - String encodingB = "koi8_r"; - headersB.add("encoding", encodingB.getBytes()); - - expectConsumerPoll(Arrays.asList( - new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + 1, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, - 0, 0, keyA.getBytes(), valueA.getBytes(encodingA), headersA, Optional.empty()), - new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + 2, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, - 0, 0, keyB.getBytes(), valueB.getBytes(encodingB), headersB, Optional.empty()) - )); - - expectTransformation(2, null); - - Capture> records = EasyMock.newCapture(CaptureType.ALL); - sinkTask.put(EasyMock.capture(records)); - - PowerMock.replayAll(); - - workerTask.initialize(TASK_CONFIG); - workerTask.initializeAndStart(); - workerTask.iteration(); // iter 1 -- initial assignment - workerTask.iteration(); // iter 2 -- deliver 1 record - - Iterator iterator = records.getValue().iterator(); - - SinkRecord recordA = iterator.next(); - assertEquals(keyA, recordA.key()); - assertEquals(valueA, recordA.value()); - - SinkRecord recordB = iterator.next(); - assertEquals(keyB, recordB.key()); - assertEquals(valueB, recordB.value()); - - PowerMock.verifyAll(); - } - - @Test - public void testOriginalTopicWithTopicMutatingTransformations() { - createTask(initialState); - - expectInitializeTask(); - expectTaskGetTopic(true); - - expectPollInitialAssignment(); - - expectConsumerPoll(1); - expectConversionAndTransformation(1, "newtopic_"); - Capture> recordCapture = EasyMock.newCapture(); - sinkTask.put(EasyMock.capture(recordCapture)); - EasyMock.expectLastCall(); - - PowerMock.replayAll(); - - workerTask.initialize(TASK_CONFIG); - workerTask.initializeAndStart(); - - workerTask.iteration(); // initial assignment - - workerTask.iteration(); // first record delivered - - assertTrue(recordCapture.hasCaptured()); - assertEquals(1, recordCapture.getValue().size()); - SinkRecord record = recordCapture.getValue().iterator().next(); - assertEquals(TOPIC, record.originalTopic()); - assertEquals("newtopic_" + TOPIC, record.topic()); - - PowerMock.verifyAll(); - } - - @Test - public void testPartitionCountInCaseOfPartitionRevocation() { - MockConsumer mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); - // Setting up Worker Sink Task to check metrics - workerTask = new WorkerSinkTask( - taskId, sinkTask, statusListener, TargetState.PAUSED, workerConfig, ClusterConfigState.EMPTY, metrics, - keyConverter, valueConverter, errorHandlingMetrics, headerConverter, - transformationChain, mockConsumer, pluginLoader, time, - RetryWithToleranceOperatorTest.noopOperator(), null, statusBackingStore, Collections::emptyList); - mockConsumer.updateBeginningOffsets(new HashMap() {{ - put(TOPIC_PARTITION, 0 * 1L); - put(TOPIC_PARTITION2, 0 * 1L); - }}); - workerTask.initialize(TASK_CONFIG); - workerTask.initializeAndStart(); - // Initial Re-balance to assign INITIAL_ASSIGNMENT which is "TOPIC_PARTITION" and "TOPIC_PARTITION2" - mockConsumer.rebalance(INITIAL_ASSIGNMENT); - assertSinkMetricValue("partition-count", 2); - // Revoked "TOPIC_PARTITION" and second re-balance with "TOPIC_PARTITION2" - mockConsumer.rebalance(Collections.singleton(TOPIC_PARTITION2)); - assertSinkMetricValue("partition-count", 1); - // Closing the Worker Sink Task which will update the partition count as 0. - workerTask.close(); - assertSinkMetricValue("partition-count", 0); - } - private void expectInitializeTask() { consumer.subscribe(EasyMock.eq(asList(TOPIC)), EasyMock.capture(rebalanceListener)); PowerMock.expectLastCall(); @@ -1636,14 +1193,6 @@ public class WorkerSinkTaskTest { expectConsumerPoll(numMessages, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, emptyHeaders()); } - private void expectConsumerPoll(final int numMessages, Headers headers) { - expectConsumerPoll(numMessages, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, headers); - } - - private void expectConsumerPoll(final int numMessages, final long timestamp, final TimestampType timestampType) { - expectConsumerPoll(numMessages, timestamp, timestampType, emptyHeaders()); - } - private void expectConsumerPoll(final int numMessages, final long timestamp, final TimestampType timestampType, Headers headers) { EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer( () -> { @@ -1660,15 +1209,6 @@ public class WorkerSinkTaskTest { }); } - private void expectConsumerPoll(List> records) { - EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer( - () -> new ConsumerRecords<>( - records.isEmpty() ? - Collections.emptyMap() : - Collections.singletonMap(new TopicPartition(TOPIC, PARTITION), records) - )); - } - private void expectConversionAndTransformation(final int numMessages) { expectConversionAndTransformation(numMessages, null); }