KAFKA-14683: Migrate WorkerSinkTaskTest to Mockito (2/3) (#15313)

Reviewers: Greg Harris <greg.harris@aiven.io>
Hector Geraldino, 2024-02-12 14:15:27 -05:00 (committed by GitHub)
parent c6f4c604d8
commit 794c52802c
3 changed files with 428 additions and 470 deletions

WorkerSinkTask.java

@@ -60,6 +60,7 @@ import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -343,11 +344,16 @@ class WorkerSinkTask extends WorkerTask<ConsumerRecord<byte[], byte[]>, SinkRecord>
deliverMessages();
}
// Visible for testing
// VisibleForTesting
boolean isCommitting() {
return committing;
}
//VisibleForTesting
Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets() {
return Collections.unmodifiableMap(lastCommittedOffsets);
}
private void doCommitSync(Map<TopicPartition, OffsetAndMetadata> offsets, int seqno) {
log.debug("{} Committing offsets synchronously using sequence number {}: {}", this, seqno, offsets);
try {

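The new package-private accessor above is what lets the migrated Mockito test assert on committed offsets directly (see testDeliveryWithMutatingTransform below), where the EasyMock version had to read the task's private state reflectively via PowerMock's Whitebox. A minimal before/after fragment, assuming the test-class context (workerTask, offsets) that appears later in this diff:

// Before (EasyMock/PowerMock): reach into private state reflectively
Map<TopicPartition, OffsetAndMetadata> committed =
        Whitebox.getInternalState(workerTask, "lastCommittedOffsets");
assertEquals(offsets, committed);

// After (Mockito migration): use the accessor, which returns an unmodifiable view
assertEquals(offsets, workerTask.lastCommittedOffsets());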
WorkerSinkTaskMockitoTest.java

@@ -20,7 +20,10 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
@@ -33,6 +36,7 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.WorkerSinkTask.SinkTaskMetricsGroup;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
@@ -49,6 +53,7 @@ import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.junit.After;
import org.junit.Before;
@@ -67,19 +72,25 @@ import org.mockito.stubbing.OngoingStubbing;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
@@ -300,7 +311,7 @@ public class WorkerSinkTaskMockitoTest {
verify(statusListener).onResume(taskId);
verify(consumer, times(2)).wakeup();
INITIAL_ASSIGNMENT.forEach(tp -> {
verify(consumer).resume(Collections.singleton(tp));
verify(consumer).resume(singleton(tp));
});
verify(sinkTask, times(4)).put(anyList());
}
@@ -343,6 +354,99 @@ public class WorkerSinkTaskMockitoTest {
verify(headerConverter).close();
}
@Test
public void testPollRedelivery() {
createTask(initialState);
expectTaskGetTopic();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
verifyInitializeTask();
expectPollInitialAssignment()
// If a retriable exception is thrown, we should redeliver the same batch, pausing the consumer in the meantime
.thenAnswer(expectConsumerPoll(1))
// Retry delivery should succeed
.thenAnswer(expectConsumerPoll(0))
.thenAnswer(expectConsumerPoll(0));
expectConversionAndTransformation(null, new RecordHeaders());
doAnswer(invocation -> null)
.doThrow(new RetriableException("retry"))
.doAnswer(invocation -> null)
.when(sinkTask).put(anyList());
workerTask.iteration();
time.sleep(10000L);
verifyPollInitialAssignment();
verify(sinkTask).put(anyList());
assertSinkMetricValue("partition-count", 2);
assertSinkMetricValue("sink-record-read-total", 0.0);
assertSinkMetricValue("sink-record-send-total", 0.0);
assertSinkMetricValue("sink-record-active-count", 0.0);
assertSinkMetricValue("sink-record-active-count-max", 0.0);
assertSinkMetricValue("sink-record-active-count-avg", 0.0);
assertSinkMetricValue("offset-commit-seq-no", 0.0);
assertSinkMetricValue("offset-commit-completion-rate", 0.0);
assertSinkMetricValue("offset-commit-completion-total", 0.0);
assertSinkMetricValue("offset-commit-skip-rate", 0.0);
assertSinkMetricValue("offset-commit-skip-total", 0.0);
assertTaskMetricValue("status", "running");
assertTaskMetricValue("running-ratio", 1.0);
assertTaskMetricValue("pause-ratio", 0.0);
assertTaskMetricValue("batch-size-max", 0.0);
assertTaskMetricValue("batch-size-avg", 0.0);
assertTaskMetricValue("offset-commit-max-time-ms", Double.NaN);
assertTaskMetricValue("offset-commit-failure-percentage", 0.0);
assertTaskMetricValue("offset-commit-success-percentage", 0.0);
// Pause
workerTask.iteration();
verify(consumer, times(3)).assignment();
verify(consumer).pause(INITIAL_ASSIGNMENT);
// Retry delivery should succeed
workerTask.iteration();
time.sleep(30000L);
verify(sinkTask, times(3)).put(anyList());
INITIAL_ASSIGNMENT.forEach(tp -> {
verify(consumer).resume(Collections.singleton(tp));
});
assertSinkMetricValue("sink-record-read-total", 1.0);
assertSinkMetricValue("sink-record-send-total", 1.0);
assertSinkMetricValue("sink-record-active-count", 1.0);
assertSinkMetricValue("sink-record-active-count-max", 1.0);
assertSinkMetricValue("sink-record-active-count-avg", 0.5);
assertTaskMetricValue("status", "running");
assertTaskMetricValue("running-ratio", 1.0);
assertTaskMetricValue("batch-size-max", 1.0);
assertTaskMetricValue("batch-size-avg", 0.5);
// Expect commit
final Map<TopicPartition, OffsetAndMetadata> workerCurrentOffsets = new HashMap<>();
// Commit advance by one
workerCurrentOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
// Nothing polled for this partition
workerCurrentOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
when(sinkTask.preCommit(workerCurrentOffsets)).thenReturn(workerCurrentOffsets);
sinkTaskContext.getValue().requestCommit();
time.sleep(10000L);
workerTask.iteration();
final ArgumentCaptor<OffsetCommitCallback> callback = ArgumentCaptor.forClass(OffsetCommitCallback.class);
verify(consumer).commitAsync(eq(workerCurrentOffsets), callback.capture());
callback.getValue().onComplete(workerCurrentOffsets, null);
verify(sinkTask, times(4)).put(anyList());
assertSinkMetricValue("offset-commit-completion-total", 1.0);
}
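The doAnswer(...).doThrow(...).doAnswer(...) chain above is Mockito's consecutive stubbing for void methods: each stubbed action is consumed by exactly one successive call to put, which is how a single stub models "succeed, fail with a retriable error, succeed on redelivery". A minimal, self-contained sketch of the pattern (class and variable names are illustrative, not from this diff):

import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;

import java.util.Collections;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkTask;

public class ConsecutiveStubbingSketch {
    public static void main(String[] args) {
        SinkTask task = mock(SinkTask.class);

        // One action per successive call: succeed, throw, succeed.
        doNothing()
                .doThrow(new RetriableException("retry"))
                .doNothing()
                .when(task).put(anyList());

        task.put(Collections.emptyList());     // first call: no-op
        try {
            task.put(Collections.emptyList()); // second call: throws RetriableException
        } catch (RetriableException e) {
            // WorkerSinkTask would pause the consumer and redeliver the same batch here
        }
        task.put(Collections.emptyList());     // third call: the retry succeeds
    }
}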
@Test
public void testErrorInRebalancePartitionLoss() {
RuntimeException exception = new RuntimeException("Revocation error");
@@ -455,18 +559,18 @@ public class WorkerSinkTaskMockitoTest {
return ConsumerRecords.empty();
})
.thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> {
rebalanceListener.getValue().onPartitionsRevoked(Collections.singleton(TOPIC_PARTITION));
rebalanceListener.getValue().onPartitionsRevoked(singleton(TOPIC_PARTITION));
rebalanceListener.getValue().onPartitionsAssigned(Collections.emptySet());
return ConsumerRecords.empty();
})
.thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> {
rebalanceListener.getValue().onPartitionsRevoked(Collections.emptySet());
rebalanceListener.getValue().onPartitionsAssigned(Collections.singleton(TOPIC_PARTITION3));
rebalanceListener.getValue().onPartitionsAssigned(singleton(TOPIC_PARTITION3));
return ConsumerRecords.empty();
})
.thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> {
rebalanceListener.getValue().onPartitionsLost(Collections.singleton(TOPIC_PARTITION3));
rebalanceListener.getValue().onPartitionsAssigned(Collections.singleton(TOPIC_PARTITION));
rebalanceListener.getValue().onPartitionsLost(singleton(TOPIC_PARTITION3));
rebalanceListener.getValue().onPartitionsAssigned(singleton(TOPIC_PARTITION));
return ConsumerRecords.empty();
});
@@ -482,21 +586,188 @@ public class WorkerSinkTaskMockitoTest {
// Second iteration--second call to poll, partial consumer revocation
workerTask.iteration();
verify(sinkTask).close(Collections.singleton(TOPIC_PARTITION));
verify(sinkTask).close(singleton(TOPIC_PARTITION));
verify(sinkTask, times(2)).put(Collections.emptyList());
// Third iteration--third call to poll, partial consumer assignment
workerTask.iteration();
verify(sinkTask).open(Collections.singleton(TOPIC_PARTITION3));
verify(sinkTask).open(singleton(TOPIC_PARTITION3));
verify(sinkTask, times(3)).put(Collections.emptyList());
// Fourth iteration--fourth call to poll, one partition lost; can't commit offsets for it, one new partition assigned
workerTask.iteration();
verify(sinkTask).close(Collections.singleton(TOPIC_PARTITION3));
verify(sinkTask).open(Collections.singleton(TOPIC_PARTITION));
verify(sinkTask).close(singleton(TOPIC_PARTITION3));
verify(sinkTask).open(singleton(TOPIC_PARTITION));
verify(sinkTask, times(4)).put(Collections.emptyList());
}
@SuppressWarnings("unchecked")
@Test
public void testTaskCancelPreventsFinalOffsetCommit() {
createTask(initialState);
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
verifyInitializeTask();
expectTaskGetTopic();
expectPollInitialAssignment()
// Put one message through the task to get some offsets to commit
.thenAnswer(expectConsumerPoll(1))
// the second put will return after the task is stopped and cancelled (asynchronously)
.thenAnswer(expectConsumerPoll(1));
expectConversionAndTransformation(null, new RecordHeaders());
doAnswer(invocation -> null)
.doAnswer(invocation -> null)
.doAnswer(invocation -> {
workerTask.stop();
workerTask.cancel();
return null;
})
.when(sinkTask).put(anyList());
// task performs normal steps in advance of committing offsets
final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 2));
offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
when(sinkTask.preCommit(offsets)).thenReturn(offsets);
workerTask.execute();
// stop wakes up the consumer
verify(consumer).wakeup();
verify(sinkTask).close(any());
}
@Test
public void testDeliveryWithMutatingTransform() {
createTask(initialState);
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
verifyInitializeTask();
expectTaskGetTopic();
expectPollInitialAssignment()
.thenAnswer(expectConsumerPoll(1))
.thenAnswer(expectConsumerPoll(0));
expectConversionAndTransformation("newtopic_", new RecordHeaders());
workerTask.iteration(); // initial assignment
workerTask.iteration(); // first record delivered
final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
when(sinkTask.preCommit(offsets)).thenReturn(offsets);
sinkTaskContext.getValue().requestCommit();
assertTrue(sinkTaskContext.getValue().isCommitRequested());
assertNotEquals(offsets, workerTask.lastCommittedOffsets());
workerTask.iteration(); // triggers the commit
ArgumentCaptor<OffsetCommitCallback> callback = ArgumentCaptor.forClass(OffsetCommitCallback.class);
verify(consumer).commitAsync(eq(offsets), callback.capture());
callback.getValue().onComplete(offsets, null);
assertFalse(sinkTaskContext.getValue().isCommitRequested()); // should have been cleared
assertEquals(offsets, workerTask.lastCommittedOffsets());
assertEquals(0, workerTask.commitFailures());
assertEquals(1.0, metrics.currentMetricValueAsDouble(workerTask.taskMetricsGroup().metricGroup(), "batch-size-max"), 0.0001);
}
@Test
public void testMissingTimestampPropagation() {
createTask(initialState);
expectTaskGetTopic();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
verifyInitializeTask();
expectPollInitialAssignment()
.thenAnswer(expectConsumerPoll(1, RecordBatch.NO_TIMESTAMP, TimestampType.CREATE_TIME, new RecordHeaders()));
expectConversionAndTransformation(null, new RecordHeaders());
workerTask.iteration(); // iter 1 -- initial assignment
workerTask.iteration(); // iter 2 -- deliver 1 record
@SuppressWarnings("unchecked")
ArgumentCaptor<Collection<SinkRecord>> records = ArgumentCaptor.forClass(Collection.class);
verify(sinkTask, times(2)).put(records.capture());
SinkRecord record = records.getValue().iterator().next();
// we expect null for a missing timestamp; the sentinel value Record.NO_TIMESTAMP is Kafka's API
assertNull(record.timestamp());
assertEquals(TimestampType.CREATE_TIME, record.timestampType());
}
@Test
public void testTimestampPropagation() {
final Long timestamp = System.currentTimeMillis();
final TimestampType timestampType = TimestampType.CREATE_TIME;
createTask(initialState);
expectTaskGetTopic();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
verifyInitializeTask();
expectPollInitialAssignment()
.thenAnswer(expectConsumerPoll(1, timestamp, timestampType, new RecordHeaders()));
expectConversionAndTransformation(null, new RecordHeaders());
workerTask.iteration(); // iter 1 -- initial assignment
workerTask.iteration(); // iter 2 -- deliver 1 record
@SuppressWarnings("unchecked")
ArgumentCaptor<Collection<SinkRecord>> records = ArgumentCaptor.forClass(Collection.class);
verify(sinkTask, times(2)).put(records.capture());
SinkRecord record = records.getValue().iterator().next();
assertEquals(timestamp, record.timestamp());
assertEquals(timestampType, record.timestampType());
}
@Test
public void testTopicsRegex() {
Map<String, String> props = new HashMap<>(TASK_PROPS);
props.remove("topics");
props.put("topics.regex", "te.*");
TaskConfig taskConfig = new TaskConfig(props);
createTask(TargetState.PAUSED);
workerTask.initialize(taskConfig);
workerTask.initializeAndStart();
ArgumentCaptor<Pattern> topicsRegex = ArgumentCaptor.forClass(Pattern.class);
verify(consumer).subscribe(topicsRegex.capture(), rebalanceListener.capture());
assertEquals("te.*", topicsRegex.getValue().pattern());
verify(sinkTask).initialize(sinkTaskContext.capture());
verify(sinkTask).start(props);
expectPollInitialAssignment();
workerTask.iteration();
time.sleep(10000L);
verify(consumer).pause(INITIAL_ASSIGNMENT);
}
@Test
public void testMetricsGroup() {
SinkTaskMetricsGroup group = new SinkTaskMetricsGroup(taskId, metrics);
@@ -558,6 +829,143 @@ public class WorkerSinkTaskMockitoTest {
assertEquals(30, metrics.currentMetricValueAsDouble(group1.metricGroup(), "put-batch-max-time-ms"), 0.001d);
}
@Test
public void testHeaders() {
createTask(initialState);
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
verifyInitializeTask();
Headers headers = new RecordHeaders();
headers.add("header_key", "header_value".getBytes());
expectPollInitialAssignment()
.thenAnswer(expectConsumerPoll(1, headers));
expectConversionAndTransformation(null, headers);
workerTask.iteration(); // iter 1 -- initial assignment
workerTask.iteration(); // iter 2 -- deliver 1 record
@SuppressWarnings("unchecked")
ArgumentCaptor<Collection<SinkRecord>> recordCapture = ArgumentCaptor.forClass(Collection.class);
verify(sinkTask, times(2)).put(recordCapture.capture());
assertEquals(1, recordCapture.getValue().size());
SinkRecord record = recordCapture.getValue().iterator().next();
assertEquals("header_value", record.headers().lastWithName("header_key").value());
}
@Test
public void testHeadersWithCustomConverter() {
StringConverter stringConverter = new StringConverter();
SampleConverterWithHeaders testConverter = new SampleConverterWithHeaders();
createTask(initialState, stringConverter, testConverter, stringConverter);
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
verifyInitializeTask();
String keyA = "a";
String valueA = "Árvíztűrő tükörfúrógép";
Headers headersA = new RecordHeaders();
String encodingA = "latin2";
headersA.add("encoding", encodingA.getBytes());
String keyB = "b";
String valueB = "Тестовое сообщение";
Headers headersB = new RecordHeaders();
String encodingB = "koi8_r";
headersB.add("encoding", encodingB.getBytes());
expectPollInitialAssignment()
.thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> {
List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + 1, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,
0, 0, keyA.getBytes(), valueA.getBytes(encodingA), headersA, Optional.empty()),
new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + 2, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,
0, 0, keyB.getBytes(), valueB.getBytes(encodingB), headersB, Optional.empty())
);
return new ConsumerRecords<>(Collections.singletonMap(new TopicPartition(TOPIC, PARTITION), records));
});
expectTransformation(null);
workerTask.iteration(); // iter 1 -- initial assignment
workerTask.iteration(); // iter 2 -- deliver records
@SuppressWarnings("unchecked")
ArgumentCaptor<Collection<SinkRecord>> records = ArgumentCaptor.forClass(Collection.class);
verify(sinkTask, times(2)).put(records.capture());
Iterator<SinkRecord> iterator = records.getValue().iterator();
SinkRecord recordA = iterator.next();
assertEquals(keyA, recordA.key());
assertEquals(valueA, recordA.value());
SinkRecord recordB = iterator.next();
assertEquals(keyB, recordB.key());
assertEquals(valueB, recordB.value());
}
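SampleConverterWithHeaders itself is not shown in this diff; the test feeds it values encoded as latin2 and koi8_r, with the charset name carried in an "encoding" record header. A plausible sketch of that header-aware decode step, under the assumption that the converter simply honors the header (class name and UTF-8 fallback are illustrative):

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;

public class HeaderAwareStringDecoder {
    // Decode the value bytes with the charset named in the "encoding" header,
    // mirroring what the per-record headersA/headersB setup above exercises.
    public SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) {
        Header encodingHeader = headers.lastHeader("encoding");
        String encoding = encodingHeader == null
                ? StandardCharsets.UTF_8.name()
                : new String(encodingHeader.value(), StandardCharsets.UTF_8);
        try {
            return new SchemaAndValue(Schema.STRING_SCHEMA, new String(value, encoding));
        } catch (UnsupportedEncodingException e) {
            throw new DataException("Unsupported encoding: " + encoding, e);
        }
    }
}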
@Test
public void testOriginalTopicWithTopicMutatingTransformations() {
createTask(initialState);
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
verifyInitializeTask();
expectPollInitialAssignment()
.thenAnswer(expectConsumerPoll(1));
expectConversionAndTransformation("newtopic_", new RecordHeaders());
workerTask.iteration(); // initial assignment
workerTask.iteration(); // first record delivered
@SuppressWarnings("unchecked")
ArgumentCaptor<Collection<SinkRecord>> recordCapture = ArgumentCaptor.forClass(Collection.class);
verify(sinkTask, times(2)).put(recordCapture.capture());
assertEquals(1, recordCapture.getValue().size());
SinkRecord record = recordCapture.getValue().iterator().next();
assertEquals(TOPIC, record.originalTopic());
assertEquals("newtopic_" + TOPIC, record.topic());
}
@Test
public void testPartitionCountInCaseOfPartitionRevocation() {
MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
// Setting up Worker Sink Task to check metrics
workerTask = new WorkerSinkTask(
taskId, sinkTask, statusListener, TargetState.PAUSED, workerConfig, ClusterConfigState.EMPTY, metrics,
keyConverter, valueConverter, errorHandlingMetrics, headerConverter,
transformationChain, mockConsumer, pluginLoader, time,
RetryWithToleranceOperatorTest.noopOperator(), null, statusBackingStore, Collections::emptyList);
mockConsumer.updateBeginningOffsets(
new HashMap<TopicPartition, Long>() {{
put(TOPIC_PARTITION, 0L);
put(TOPIC_PARTITION2, 0L);
}}
);
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
// Initial rebalance assigns INITIAL_ASSIGNMENT, i.e. TOPIC_PARTITION and TOPIC_PARTITION2
mockConsumer.rebalance(INITIAL_ASSIGNMENT);
assertSinkMetricValue("partition-count", 2);
// Revoke TOPIC_PARTITION; the second rebalance leaves only TOPIC_PARTITION2
mockConsumer.rebalance(Collections.singleton(TOPIC_PARTITION2));
assertSinkMetricValue("partition-count", 1);
// Closing the worker sink task updates the partition count to 0
workerTask.close();
assertSinkMetricValue("partition-count", 0);
}
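Unlike the Mockito-mocked KafkaConsumer used by the other tests in this class, MockConsumer is Kafka's stateful test double: it keeps real subscription, assignment, and position state, which is what lets the partition-count metric assertions above observe rebalances. A minimal standalone sketch of its core API (topic name and record contents are illustrative):

import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

public class MockConsumerSketch {
    public static void main(String[] args) {
        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        TopicPartition tp = new TopicPartition("test-topic", 0);

        // Real assignment state, not stubbed return values
        consumer.assign(Collections.singleton(tp));
        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));

        // Queue a record, then poll it back as a real consumer would
        consumer.addRecord(new ConsumerRecord<>("test-topic", 0, 0L, new byte[0], new byte[0]));
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(1));

        System.out.println(records.count());       // 1
        System.out.println(consumer.assignment()); // [test-topic-0]
    }
}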
private void expectRebalanceRevocationError(RuntimeException e) {
when(sinkTask.preCommit(anyMap())).thenReturn(Collections.emptyMap());
doThrow(e).when(sinkTask).close(INITIAL_ASSIGNMENT);
@@ -599,6 +1007,10 @@ public class WorkerSinkTaskMockitoTest {
return expectConsumerPoll(numMessages, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, new RecordHeaders());
}
private Answer<ConsumerRecords<byte[], byte[]>> expectConsumerPoll(final int numMessages, Headers headers) {
return expectConsumerPoll(numMessages, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, headers);
}
private Answer<ConsumerRecords<byte[], byte[]>> expectConsumerPoll(final int numMessages, final long timestamp, final TimestampType timestampType, Headers headers) {
return invocation -> {
List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();

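These Answer-returning overloads pair with an expectPollInitialAssignment() helper that returns Mockito's OngoingStubbing (note the import near the top of this file), so each test can chain further poll results onto the initial-assignment stub with .thenAnswer(expectConsumerPoll(n)). The helper's body falls outside this hunk; a plausible reconstruction, inferred from the call sites:

// Hypothetical reconstruction -- the real method is not part of this hunk.
private OngoingStubbing<ConsumerRecords<byte[], byte[]>> expectPollInitialAssignment() {
    when(consumer.assignment()).thenReturn(INITIAL_ASSIGNMENT);
    INITIAL_ASSIGNMENT.forEach(tp -> when(consumer.position(tp)).thenReturn(FIRST_OFFSET));

    // The first poll fires the captured rebalance listener with the initial
    // assignment; callers chain .thenAnswer(expectConsumerPoll(n)) for later polls.
    return when(consumer.poll(any(Duration.class))).thenAnswer(invocation -> {
        rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT);
        return ConsumerRecords.empty();
    });
}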
WorkerSinkTaskTest.java

@@ -35,7 +35,6 @@ import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.ErrorReporter;
@@ -50,13 +49,10 @@ import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.easymock.IExpectationSetters;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -71,11 +67,9 @@ import org.powermock.reflect.Whitebox;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -88,15 +82,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -134,7 +125,6 @@ public class WorkerSinkTaskTest {
private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
private ConnectorTaskId taskId1 = new ConnectorTaskId("job", 1);
private TargetState initialState = TargetState.STARTED;
private MockTime time;
private WorkerSinkTask workerTask;
@@ -162,7 +152,6 @@ public class WorkerSinkTaskTest {
@Mock
private ErrorHandlingMetrics errorHandlingMetrics;
private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture();
private Capture<Pattern> topicsRegex = EasyMock.newCapture();
private long recordsReturnedTp1;
private long recordsReturnedTp3;
@@ -203,103 +192,6 @@ public class WorkerSinkTaskTest {
if (metrics != null) metrics.stop();
}
@Test
public void testPollRedelivery() throws Exception {
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
expectPollInitialAssignment();
// If a retriable exception is thrown, we should redeliver the same batch, pausing the consumer in the meantime
expectConsumerPoll(1);
expectConversionAndTransformation(1);
Capture<Collection<SinkRecord>> records = EasyMock.newCapture(CaptureType.ALL);
sinkTask.put(EasyMock.capture(records));
EasyMock.expectLastCall().andThrow(new RetriableException("retry"));
// Pause
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT);
consumer.pause(INITIAL_ASSIGNMENT);
PowerMock.expectLastCall();
// Retry delivery should succeed
expectConsumerPoll(0);
sinkTask.put(EasyMock.capture(records));
EasyMock.expectLastCall();
// And unpause
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT);
INITIAL_ASSIGNMENT.forEach(tp -> {
consumer.resume(singleton(tp));
PowerMock.expectLastCall();
});
// Expect commit
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(2);
final Map<TopicPartition, OffsetAndMetadata> workerCurrentOffsets = new HashMap<>();
// Commit advance by one
workerCurrentOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
// Nothing polled for this partition
workerCurrentOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
EasyMock.expect(sinkTask.preCommit(workerCurrentOffsets)).andReturn(workerCurrentOffsets);
final Capture<OffsetCommitCallback> callback = EasyMock.newCapture();
consumer.commitAsync(EasyMock.eq(workerCurrentOffsets), EasyMock.capture(callback));
EasyMock.expectLastCall().andAnswer(() -> {
callback.getValue().onComplete(workerCurrentOffsets, null);
return null;
});
expectConsumerPoll(0);
sinkTask.put(EasyMock.eq(Collections.emptyList()));
EasyMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration();
time.sleep(10000L);
assertSinkMetricValue("partition-count", 2);
assertSinkMetricValue("sink-record-read-total", 0.0);
assertSinkMetricValue("sink-record-send-total", 0.0);
assertSinkMetricValue("sink-record-active-count", 0.0);
assertSinkMetricValue("sink-record-active-count-max", 0.0);
assertSinkMetricValue("sink-record-active-count-avg", 0.0);
assertSinkMetricValue("offset-commit-seq-no", 0.0);
assertSinkMetricValue("offset-commit-completion-rate", 0.0);
assertSinkMetricValue("offset-commit-completion-total", 0.0);
assertSinkMetricValue("offset-commit-skip-rate", 0.0);
assertSinkMetricValue("offset-commit-skip-total", 0.0);
assertTaskMetricValue("status", "running");
assertTaskMetricValue("running-ratio", 1.0);
assertTaskMetricValue("pause-ratio", 0.0);
assertTaskMetricValue("batch-size-max", 0.0);
assertTaskMetricValue("batch-size-avg", 0.0);
assertTaskMetricValue("offset-commit-max-time-ms", Double.NaN);
assertTaskMetricValue("offset-commit-failure-percentage", 0.0);
assertTaskMetricValue("offset-commit-success-percentage", 0.0);
workerTask.iteration();
workerTask.iteration();
time.sleep(30000L);
assertSinkMetricValue("sink-record-read-total", 1.0);
assertSinkMetricValue("sink-record-send-total", 1.0);
assertSinkMetricValue("sink-record-active-count", 1.0);
assertSinkMetricValue("sink-record-active-count-max", 1.0);
assertSinkMetricValue("sink-record-active-count-avg", 0.5);
assertTaskMetricValue("status", "running");
assertTaskMetricValue("running-ratio", 1.0);
assertTaskMetricValue("batch-size-max", 1.0);
assertTaskMetricValue("batch-size-avg", 0.5);
sinkTaskContext.getValue().requestCommit();
time.sleep(10000L);
workerTask.iteration();
assertSinkMetricValue("offset-commit-completion-total", 1.0);
PowerMock.verifyAll();
}
@Test
public void testPollRedeliveryWithConsumerRebalance() throws Exception {
createTask(initialState);
@@ -1054,53 +946,6 @@ public class WorkerSinkTaskTest {
}
}
@Test
public void testTaskCancelPreventsFinalOffsetCommit() throws Exception {
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
expectPollInitialAssignment();
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(1);
// Put one message through the task to get some offsets to commit
expectConsumerPoll(1);
expectConversionAndTransformation(1);
sinkTask.put(EasyMock.anyObject());
PowerMock.expectLastCall();
// the second put will return after the task is stopped and cancelled (asynchronously)
expectConsumerPoll(1);
expectConversionAndTransformation(1);
sinkTask.put(EasyMock.anyObject());
PowerMock.expectLastCall().andAnswer(() -> {
workerTask.stop();
workerTask.cancel();
return null;
});
// stop wakes up the consumer
consumer.wakeup();
EasyMock.expectLastCall();
// task performs normal steps in advance of committing offsets
final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 2));
offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
sinkTask.preCommit(offsets);
EasyMock.expectLastCall().andReturn(offsets);
sinkTask.close(EasyMock.anyObject());
PowerMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.execute();
PowerMock.verifyAll();
}
// Verify that when commitAsync is called but the supplied callback is not called by the consumer before a
// rebalance occurs, the async callback does not reset the last committed offset from the rebalance.
// See KAFKA-5731 for more information.
@@ -1318,294 +1163,6 @@ public class WorkerSinkTaskTest {
PowerMock.verifyAll();
}
@Test
public void testDeliveryWithMutatingTransform() throws Exception {
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
expectPollInitialAssignment();
expectConsumerPoll(1);
expectConversionAndTransformation(1, "newtopic_");
sinkTask.put(EasyMock.anyObject());
EasyMock.expectLastCall();
final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
sinkTask.preCommit(offsets);
EasyMock.expectLastCall().andReturn(offsets);
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(2);
final Capture<OffsetCommitCallback> callback = EasyMock.newCapture();
consumer.commitAsync(EasyMock.eq(offsets), EasyMock.capture(callback));
EasyMock.expectLastCall().andAnswer(() -> {
callback.getValue().onComplete(offsets, null);
return null;
});
expectConsumerPoll(0);
sinkTask.put(Collections.emptyList());
EasyMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration(); // initial assignment
workerTask.iteration(); // first record delivered
sinkTaskContext.getValue().requestCommit();
assertTrue(sinkTaskContext.getValue().isCommitRequested());
assertNotEquals(offsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
workerTask.iteration(); // triggers the commit
assertFalse(sinkTaskContext.getValue().isCommitRequested()); // should have been cleared
assertEquals(offsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
assertEquals(0, workerTask.commitFailures());
assertEquals(1.0, metrics.currentMetricValueAsDouble(workerTask.taskMetricsGroup().metricGroup(), "batch-size-max"), 0.0001);
PowerMock.verifyAll();
}
@Test
public void testMissingTimestampPropagation() throws Exception {
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
expectPollInitialAssignment();
expectConsumerPoll(1, RecordBatch.NO_TIMESTAMP, TimestampType.CREATE_TIME);
expectConversionAndTransformation(1);
Capture<Collection<SinkRecord>> records = EasyMock.newCapture(CaptureType.ALL);
sinkTask.put(EasyMock.capture(records));
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration(); // iter 1 -- initial assignment
workerTask.iteration(); // iter 2 -- deliver 1 record
SinkRecord record = records.getValue().iterator().next();
// we expect null for missing timestamp, the sentinel value of Record.NO_TIMESTAMP is Kafka's API
assertNull(record.timestamp());
assertEquals(TimestampType.CREATE_TIME, record.timestampType());
PowerMock.verifyAll();
}
@Test
public void testTimestampPropagation() throws Exception {
final Long timestamp = System.currentTimeMillis();
final TimestampType timestampType = TimestampType.CREATE_TIME;
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
expectPollInitialAssignment();
expectConsumerPoll(1, timestamp, timestampType);
expectConversionAndTransformation(1);
Capture<Collection<SinkRecord>> records = EasyMock.newCapture(CaptureType.ALL);
sinkTask.put(EasyMock.capture(records));
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration(); // iter 1 -- initial assignment
workerTask.iteration(); // iter 2 -- deliver 1 record
SinkRecord record = records.getValue().iterator().next();
assertEquals(timestamp, record.timestamp());
assertEquals(timestampType, record.timestampType());
PowerMock.verifyAll();
}
@Test
public void testTopicsRegex() {
Map<String, String> props = new HashMap<>(TASK_PROPS);
props.remove("topics");
props.put("topics.regex", "te.*");
TaskConfig taskConfig = new TaskConfig(props);
createTask(TargetState.PAUSED);
consumer.subscribe(EasyMock.capture(topicsRegex), EasyMock.capture(rebalanceListener));
PowerMock.expectLastCall();
sinkTask.initialize(EasyMock.capture(sinkTaskContext));
PowerMock.expectLastCall();
sinkTask.start(props);
PowerMock.expectLastCall();
expectPollInitialAssignment();
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT);
consumer.pause(INITIAL_ASSIGNMENT);
PowerMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(taskConfig);
workerTask.initializeAndStart();
workerTask.iteration();
time.sleep(10000L);
PowerMock.verifyAll();
}
@Test
public void testHeaders() throws Exception {
Headers headers = new RecordHeaders();
headers.add("header_key", "header_value".getBytes());
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
expectPollInitialAssignment();
expectConsumerPoll(1, headers);
expectConversionAndTransformation(1, null, headers);
sinkTask.put(EasyMock.anyObject());
EasyMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration(); // iter 1 -- initial assignment
workerTask.iteration(); // iter 2 -- deliver 1 record
PowerMock.verifyAll();
}
@Test
public void testHeadersWithCustomConverter() throws Exception {
StringConverter stringConverter = new StringConverter();
SampleConverterWithHeaders testConverter = new SampleConverterWithHeaders();
createTask(initialState, stringConverter, testConverter, stringConverter);
expectInitializeTask();
expectTaskGetTopic(true);
expectPollInitialAssignment();
String keyA = "a";
String valueA = "Árvíztűrő tükörfúrógép";
Headers headersA = new RecordHeaders();
String encodingA = "latin2";
headersA.add("encoding", encodingA.getBytes());
String keyB = "b";
String valueB = "Тестовое сообщение";
Headers headersB = new RecordHeaders();
String encodingB = "koi8_r";
headersB.add("encoding", encodingB.getBytes());
expectConsumerPoll(Arrays.asList(
new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + 1, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,
0, 0, keyA.getBytes(), valueA.getBytes(encodingA), headersA, Optional.empty()),
new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + 2, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,
0, 0, keyB.getBytes(), valueB.getBytes(encodingB), headersB, Optional.empty())
));
expectTransformation(2, null);
Capture<Collection<SinkRecord>> records = EasyMock.newCapture(CaptureType.ALL);
sinkTask.put(EasyMock.capture(records));
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration(); // iter 1 -- initial assignment
workerTask.iteration(); // iter 2 -- deliver 1 record
Iterator<SinkRecord> iterator = records.getValue().iterator();
SinkRecord recordA = iterator.next();
assertEquals(keyA, recordA.key());
assertEquals(valueA, recordA.value());
SinkRecord recordB = iterator.next();
assertEquals(keyB, recordB.key());
assertEquals(valueB, recordB.value());
PowerMock.verifyAll();
}
@Test
public void testOriginalTopicWithTopicMutatingTransformations() {
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
expectPollInitialAssignment();
expectConsumerPoll(1);
expectConversionAndTransformation(1, "newtopic_");
Capture<Collection<SinkRecord>> recordCapture = EasyMock.newCapture();
sinkTask.put(EasyMock.capture(recordCapture));
EasyMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration(); // initial assignment
workerTask.iteration(); // first record delivered
assertTrue(recordCapture.hasCaptured());
assertEquals(1, recordCapture.getValue().size());
SinkRecord record = recordCapture.getValue().iterator().next();
assertEquals(TOPIC, record.originalTopic());
assertEquals("newtopic_" + TOPIC, record.topic());
PowerMock.verifyAll();
}
@Test
public void testPartitionCountInCaseOfPartitionRevocation() {
MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
// Setting up Worker Sink Task to check metrics
workerTask = new WorkerSinkTask(
taskId, sinkTask, statusListener, TargetState.PAUSED, workerConfig, ClusterConfigState.EMPTY, metrics,
keyConverter, valueConverter, errorHandlingMetrics, headerConverter,
transformationChain, mockConsumer, pluginLoader, time,
RetryWithToleranceOperatorTest.noopOperator(), null, statusBackingStore, Collections::emptyList);
mockConsumer.updateBeginningOffsets(new HashMap<TopicPartition, Long>() {{
put(TOPIC_PARTITION, 0 * 1L);
put(TOPIC_PARTITION2, 0 * 1L);
}});
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
// Initial Re-balance to assign INITIAL_ASSIGNMENT which is "TOPIC_PARTITION" and "TOPIC_PARTITION2"
mockConsumer.rebalance(INITIAL_ASSIGNMENT);
assertSinkMetricValue("partition-count", 2);
// Revoked "TOPIC_PARTITION" and second re-balance with "TOPIC_PARTITION2"
mockConsumer.rebalance(Collections.singleton(TOPIC_PARTITION2));
assertSinkMetricValue("partition-count", 1);
// Closing the Worker Sink Task which will update the partition count as 0.
workerTask.close();
assertSinkMetricValue("partition-count", 0);
}
private void expectInitializeTask() {
consumer.subscribe(EasyMock.eq(asList(TOPIC)), EasyMock.capture(rebalanceListener));
PowerMock.expectLastCall();
@@ -1636,14 +1193,6 @@ public class WorkerSinkTaskTest {
expectConsumerPoll(numMessages, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, emptyHeaders());
}
private void expectConsumerPoll(final int numMessages, Headers headers) {
expectConsumerPoll(numMessages, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, headers);
}
private void expectConsumerPoll(final int numMessages, final long timestamp, final TimestampType timestampType) {
expectConsumerPoll(numMessages, timestamp, timestampType, emptyHeaders());
}
private void expectConsumerPoll(final int numMessages, final long timestamp, final TimestampType timestampType, Headers headers) {
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
() -> {
@@ -1660,15 +1209,6 @@ public class WorkerSinkTaskTest {
});
}
private void expectConsumerPoll(List<ConsumerRecord<byte[], byte[]>> records) {
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
() -> new ConsumerRecords<>(
records.isEmpty() ?
Collections.emptyMap() :
Collections.singletonMap(new TopicPartition(TOPIC, PARTITION), records)
));
}
private void expectConversionAndTransformation(final int numMessages) {
expectConversionAndTransformation(numMessages, null);
}