KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest (#13191)

Reviewers: Christo Lolov <christololov@gmail.com>, Chris Egerton <chrise@aiven.io>
This commit is contained in:
Hector Geraldino 2023-02-27 09:25:21 -05:00 committed by GitHub
parent 8d7d563231
commit 5f9d01668c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 282 additions and 308 deletions

View File

@ -409,7 +409,7 @@ subprojects {
"**/KafkaConfigBackingStoreTest.*",
"**/KafkaBasedLogTest.*", "**/StandaloneHerderTest.*",
"**/WorkerSinkTaskTest.*", "**/WorkerSinkTaskThreadedTest.*",
"**/WorkerSourceTaskTest.*", "**/AbstractWorkerSourceTaskTest.*"
"**/WorkerSourceTaskTest.*"
])
}

View File

@ -16,20 +16,21 @@
*/
package org.apache.kafka.connect.runtime;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.TopicDescription;
import java.util.Arrays;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
@ -53,30 +54,24 @@ import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.TopicAdmin;
import org.apache.kafka.connect.util.TopicCreationGroup;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.easymock.IExpectationSetters;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.api.easymock.annotation.MockStrict;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.modules.junit4.PowerMockRunner;
import org.mockito.AdditionalAnswers;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
@ -95,12 +90,19 @@ import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@PowerMockIgnore({"javax.management.*",
"org.apache.log4j.*"})
@RunWith(PowerMockRunner.class)
@SuppressWarnings("unchecked")
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class AbstractWorkerSourceTaskTest {
private static final String TOPIC = "topic";
@ -118,7 +120,8 @@ public class AbstractWorkerSourceTaskTest {
private static final byte[] SERIALIZED_KEY = "converted-key".getBytes();
private static final byte[] SERIALIZED_RECORD = "converted-record".getBytes();
@Mock private SourceTask sourceTask;
@Mock
private SourceTask sourceTask;
@Mock private TopicAdmin admin;
@Mock private KafkaProducer<byte[], byte[]> producer;
@Mock private Converter keyConverter;
@ -130,7 +133,7 @@ public class AbstractWorkerSourceTaskTest {
@Mock private ConnectorOffsetBackingStore offsetStore;
@Mock private StatusBackingStore statusBackingStore;
@Mock private WorkerSourceTaskContext sourceTaskContext;
@MockStrict private TaskStatus.Listener statusListener;
@Mock private TaskStatus.Listener statusListener;
private final ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
private final ConnectorTaskId taskId1 = new ConnectorTaskId("job", 1);
@ -140,7 +143,6 @@ public class AbstractWorkerSourceTaskTest {
private SourceConnectorConfig sourceConfig;
private MockConnectMetrics metrics = new MockConnectMetrics();
@Mock private ErrorHandlingMetrics errorHandlingMetrics;
private Capture<Callback> producerCallbacks;
private AbstractWorkerSourceTask workerTask;
@ -149,8 +151,7 @@ public class AbstractWorkerSourceTaskTest {
Map<String, String> workerProps = workerProps();
plugins = new Plugins(workerProps);
config = new StandaloneConfig(workerProps);
sourceConfig = new SourceConnectorConfig(plugins, sourceConnectorPropsWithGroups(TOPIC), true);
producerCallbacks = EasyMock.newCapture();
sourceConfig = new SourceConnectorConfig(plugins, sourceConnectorPropsWithGroups(), true);
metrics = new MockConnectMetrics();
}
@ -163,27 +164,28 @@ public class AbstractWorkerSourceTaskTest {
return props;
}
private Map<String, String> sourceConnectorPropsWithGroups(String topic) {
private Map<String, String> sourceConnectorPropsWithGroups() {
// setup up props for the source connector
Map<String, String> props = new HashMap<>();
props.put("name", "foo-connector");
props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
props.put(TASKS_MAX_CONFIG, String.valueOf(1));
props.put(TOPIC_CONFIG, topic);
props.put(TOPIC_CONFIG, TOPIC);
props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(TOPIC_CREATION_GROUPS_CONFIG, String.join(",", "foo", "bar"));
props.put(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG, String.valueOf(1));
props.put(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG, String.valueOf(1));
props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "foo" + "." + INCLUDE_REGEX_CONFIG, topic);
props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "foo" + "." + INCLUDE_REGEX_CONFIG, TOPIC);
props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "bar" + "." + INCLUDE_REGEX_CONFIG, ".*");
props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "bar" + "." + EXCLUDE_REGEX_CONFIG, topic);
props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "bar" + "." + EXCLUDE_REGEX_CONFIG, TOPIC);
return props;
}
@After
public void tearDown() {
if (metrics != null) metrics.stop();
verifyNoMoreInteractions(statusListener);
}
@Test
@ -236,45 +238,44 @@ public class AbstractWorkerSourceTaskTest {
public void testSendRecordsConvertsData() {
createWorkerTask();
List<SourceRecord> records = new ArrayList<>();
// Can just use the same record for key and value
records.add(new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD));
Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();
List<SourceRecord> records = Collections.singletonList(
new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD)
);
expectSendRecord(emptyHeaders());
expectTopicCreation(TOPIC);
PowerMock.replayAll();
workerTask.toSend = records;
workerTask.sendRecords();
ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = verifySendRecord();
assertArrayEquals(SERIALIZED_KEY, sent.getValue().key());
assertArrayEquals(SERIALIZED_RECORD, sent.getValue().value());
PowerMock.verifyAll();
verifyTaskGetTopic();
verifyTopicCreation();
}
@Test
public void testSendRecordsPropagatesTimestamp() {
final Long timestamp = System.currentTimeMillis();
createWorkerTask();
List<SourceRecord> records = Collections.singletonList(
new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp)
);
Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();
expectSendRecord(emptyHeaders());
expectTopicCreation(TOPIC);
PowerMock.replayAll();
workerTask.toSend = records;
workerTask.toSend = Collections.singletonList(
new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp)
);
workerTask.sendRecords();
ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = verifySendRecord();
assertEquals(timestamp, sent.getValue().timestamp());
PowerMock.verifyAll();
verifyTaskGetTopic();
verifyTopicCreation();
}
@Test
@ -282,19 +283,14 @@ public class AbstractWorkerSourceTaskTest {
final Long timestamp = -3L;
createWorkerTask();
List<SourceRecord> records = Collections.singletonList(
expectSendRecord(emptyHeaders());
workerTask.toSend = Collections.singletonList(
new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp)
);
Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();
PowerMock.replayAll();
workerTask.toSend = records;
assertThrows(InvalidRecordException.class, workerTask::sendRecords);
assertFalse(sent.hasCaptured());
PowerMock.verifyAll();
verifyNoInteractions(producer);
verifyNoInteractions(admin);
}
@Test
@ -302,49 +298,48 @@ public class AbstractWorkerSourceTaskTest {
final Long timestamp = -1L;
createWorkerTask();
List<SourceRecord> records = Collections.singletonList(
new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp)
);
Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();
expectSendRecord(emptyHeaders());
expectTopicCreation(TOPIC);
PowerMock.replayAll();
workerTask.toSend = records;
workerTask.toSend = Collections.singletonList(
new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp)
);
workerTask.sendRecords();
ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = verifySendRecord();
assertNull(sent.getValue().timestamp());
PowerMock.verifyAll();
verifyTaskGetTopic();
verifyTopicCreation();
}
@Test
public void testHeaders() {
Headers headers = new RecordHeaders();
headers.add("header_key", "header_value".getBytes());
Headers headers = new RecordHeaders()
.add("header_key", "header_value".getBytes());
org.apache.kafka.connect.header.Headers connectHeaders = new ConnectHeaders();
connectHeaders.add("header_key", new SchemaAndValue(Schema.STRING_SCHEMA, "header_value"));
org.apache.kafka.connect.header.Headers connectHeaders = new ConnectHeaders()
.add("header_key", new SchemaAndValue(Schema.STRING_SCHEMA, "header_value"));
createWorkerTask();
List<SourceRecord> records = new ArrayList<>();
records.add(new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, null, connectHeaders));
expectSendRecord(headers);
expectTopicCreation(TOPIC);
Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecord(TOPIC, true, headers);
PowerMock.replayAll();
workerTask.toSend = records;
workerTask.toSend = Collections.singletonList(
new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD,
null, connectHeaders)
);
workerTask.sendRecords();
ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = verifySendRecord();
assertArrayEquals(SERIALIZED_KEY, sent.getValue().key());
assertArrayEquals(SERIALIZED_RECORD, sent.getValue().value());
assertEquals(headers, sent.getValue().headers());
PowerMock.verifyAll();
verifyTaskGetTopic();
verifyTopicCreation();
}
@Test
@ -354,47 +349,51 @@ public class AbstractWorkerSourceTaskTest {
createWorkerTask(stringConverter, testConverter, stringConverter);
List<SourceRecord> records = new ArrayList<>();
String stringA = "Árvíztűrő tükörfúrógép";
org.apache.kafka.connect.header.Headers headersA = new ConnectHeaders();
String encodingA = "latin2";
headersA.addString("encoding", encodingA);
records.add(new SourceRecord(PARTITION, OFFSET, "topic", null, Schema.STRING_SCHEMA, "a", Schema.STRING_SCHEMA, stringA, null, headersA));
String stringB = "Тестовое сообщение";
org.apache.kafka.connect.header.Headers headersB = new ConnectHeaders();
String encodingB = "koi8_r";
headersB.addString("encoding", encodingB);
records.add(new SourceRecord(PARTITION, OFFSET, "topic", null, Schema.STRING_SCHEMA, "b", Schema.STRING_SCHEMA, stringB, null, headersB));
expectSendRecord(null);
expectTopicCreation(TOPIC);
Capture<ProducerRecord<byte[], byte[]>> sentRecordA = expectSendRecord(TOPIC, false, null);
Capture<ProducerRecord<byte[], byte[]>> sentRecordB = expectSendRecord(TOPIC, false, null);
String stringA = "Árvíztűrő tükörfúrógép";
String encodingA = "latin2";
String stringB = "Тестовое сообщение";
String encodingB = "koi8_r";
PowerMock.replayAll();
org.apache.kafka.connect.header.Headers headersA = new ConnectHeaders()
.addString("encoding", encodingA);
org.apache.kafka.connect.header.Headers headersB = new ConnectHeaders()
.addString("encoding", encodingB);
workerTask.toSend = records;
workerTask.toSend = Arrays.asList(
new SourceRecord(PARTITION, OFFSET, "topic", null, Schema.STRING_SCHEMA, "a",
Schema.STRING_SCHEMA, stringA, null, headersA),
new SourceRecord(PARTITION, OFFSET, "topic", null, Schema.STRING_SCHEMA, "b",
Schema.STRING_SCHEMA, stringB, null, headersB)
);
workerTask.sendRecords();
assertEquals(ByteBuffer.wrap("a".getBytes()), ByteBuffer.wrap(sentRecordA.getValue().key()));
assertEquals(
ByteBuffer.wrap(stringA.getBytes(encodingA)),
ByteBuffer.wrap(sentRecordA.getValue().value())
);
assertEquals(encodingA, new String(sentRecordA.getValue().headers().lastHeader("encoding").value()));
ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = verifySendRecord(2);
assertEquals(ByteBuffer.wrap("b".getBytes()), ByteBuffer.wrap(sentRecordB.getValue().key()));
assertEquals(
ByteBuffer.wrap(stringB.getBytes(encodingB)),
ByteBuffer.wrap(sentRecordB.getValue().value())
);
assertEquals(encodingB, new String(sentRecordB.getValue().headers().lastHeader("encoding").value()));
List<ProducerRecord<byte[], byte[]>> capturedValues = sent.getAllValues();
assertEquals(2, capturedValues.size());
PowerMock.verifyAll();
ProducerRecord<byte[], byte[]> sentRecordA = capturedValues.get(0);
ProducerRecord<byte[], byte[]> sentRecordB = capturedValues.get(1);
assertEquals(ByteBuffer.wrap("a".getBytes()), ByteBuffer.wrap(sentRecordA.key()));
assertEquals(
ByteBuffer.wrap(stringA.getBytes(encodingA)),
ByteBuffer.wrap(sentRecordA.value())
);
assertEquals(encodingA, new String(sentRecordA.headers().lastHeader("encoding").value()));
assertEquals(ByteBuffer.wrap("b".getBytes()), ByteBuffer.wrap(sentRecordB.key()));
assertEquals(
ByteBuffer.wrap(stringB.getBytes(encodingB)),
ByteBuffer.wrap(sentRecordB.value())
);
assertEquals(encodingB, new String(sentRecordB.headers().lastHeader("encoding").value()));
verifyTaskGetTopic(2);
verifyTopicCreation();
}
@Test
@ -404,18 +403,21 @@ public class AbstractWorkerSourceTaskTest {
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
expectPreliminaryCalls();
expectPreliminaryCalls(TOPIC);
TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, null, Collections.emptyList(), Collections.emptyList());
TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo));
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.singletonMap(TOPIC, topicDesc));
when(admin.describeTopics(TOPIC)).thenReturn(Collections.singletonMap(TOPIC, topicDesc));
expectSendRecord();
expectSendRecord();
PowerMock.replayAll();
expectSendRecord(emptyHeaders());
workerTask.toSend = Arrays.asList(record1, record2);
workerTask.sendRecords();
verifySendRecord(2);
verify(admin, never()).createOrFindTopics(any(NewTopic.class));
// Make sure we didn't try to create the topic after finding out it already existed
verifyNoMoreInteractions(admin);
}
@Test
@ -425,26 +427,24 @@ public class AbstractWorkerSourceTaskTest {
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
expectPreliminaryCalls();
// First round - call to describe the topic times out
EasyMock.expect(admin.describeTopics(TOPIC))
.andThrow(new RetriableException(new TimeoutException("timeout")));
expectPreliminaryCalls(TOPIC);
// Second round - calls to describe and create succeed
expectTopicCreation(TOPIC);
// Exactly two records are sent
expectSendRecord();
expectSendRecord();
PowerMock.replayAll();
when(admin.describeTopics(TOPIC))
.thenThrow(new RetriableException(new TimeoutException("timeout")))
.thenReturn(Collections.emptyMap());
workerTask.toSend = Arrays.asList(record1, record2);
workerTask.sendRecords();
assertEquals(Arrays.asList(record1, record2), workerTask.toSend);
verify(admin, never()).createOrFindTopics(any(NewTopic.class));
verifyNoMoreInteractions(admin);
// Next they all succeed
// Second round - calls to describe and create succeed
expectTopicCreation(TOPIC);
workerTask.sendRecords();
assertNull(workerTask.toSend);
verifyTopicCreation();
}
@Test
@ -454,19 +454,14 @@ public class AbstractWorkerSourceTaskTest {
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
// First call to describe the topic times out
expectPreliminaryCalls();
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture)))
.andThrow(new RetriableException(new TimeoutException("timeout")));
expectPreliminaryCalls(TOPIC);
// Second round
expectTopicCreation(TOPIC);
expectSendRecord();
expectSendRecord();
PowerMock.replayAll();
when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap());
when(admin.createOrFindTopics(any(NewTopic.class)))
// First call to create the topic times out
.thenThrow(new RetriableException(new TimeoutException("timeout")))
// Next attempt succeeds
.thenReturn(createdTopic(TOPIC));
workerTask.toSend = Arrays.asList(record1, record2);
workerTask.sendRecords();
@ -475,6 +470,9 @@ public class AbstractWorkerSourceTaskTest {
// Next they all succeed
workerTask.sendRecords();
assertNull(workerTask.toSend);
// First attempt failed, second succeeded
verifyTopicCreation(2, TOPIC, TOPIC);
}
@Test
@ -486,32 +484,37 @@ public class AbstractWorkerSourceTaskTest {
SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, OTHER_TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
// First round
expectPreliminaryCalls(TOPIC);
expectPreliminaryCalls(OTHER_TOPIC);
expectTopicCreation(TOPIC);
expectSendRecord();
expectSendRecord();
// First call to describe the topic times out
EasyMock.expect(admin.describeTopics(OTHER_TOPIC))
.andThrow(new RetriableException(new TimeoutException("timeout")));
when(admin.describeTopics(anyString()))
.thenReturn(Collections.emptyMap())
.thenThrow(new RetriableException(new TimeoutException("timeout")))
.thenReturn(Collections.emptyMap());
when(admin.createOrFindTopics(any(NewTopic.class))).thenAnswer(
(Answer<TopicAdmin.TopicCreationResponse>) invocation -> {
NewTopic newTopic = invocation.getArgument(0);
return createdTopic(newTopic.name());
});
// Second round
expectTopicCreation(OTHER_TOPIC);
expectSendRecord(OTHER_TOPIC, false, emptyHeaders());
PowerMock.replayAll();
// Try to send 3, make first pass, second fail. Should save last two
// Try to send 3, make first pass, second fail. Should save last record
workerTask.toSend = Arrays.asList(record1, record2, record3);
workerTask.sendRecords();
assertEquals(Arrays.asList(record3), workerTask.toSend);
assertEquals(Collections.singletonList(record3), workerTask.toSend);
// Next they all succeed
workerTask.sendRecords();
assertNull(workerTask.toSend);
PowerMock.verifyAll();
verify(admin, times(3)).describeTopics(anyString());
ArgumentCaptor<NewTopic> newTopicCaptor = ArgumentCaptor.forClass(NewTopic.class);
verify(admin, times(2)).createOrFindTopics(newTopicCaptor.capture());
assertEquals(Arrays.asList(TOPIC, OTHER_TOPIC), newTopicCaptor.getAllValues()
.stream()
.map(NewTopic::name)
.collect(Collectors.toList()));
}
@Test
@ -523,34 +526,26 @@ public class AbstractWorkerSourceTaskTest {
SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, OTHER_TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
// First round
expectPreliminaryCalls(TOPIC);
expectPreliminaryCalls(OTHER_TOPIC);
expectTopicCreation(TOPIC);
expectSendRecord();
expectSendRecord();
EasyMock.expect(admin.describeTopics(OTHER_TOPIC)).andReturn(Collections.emptyMap());
// First call to create the topic times out
Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture)))
.andThrow(new RetriableException(new TimeoutException("timeout")));
when(admin.describeTopics(anyString())).thenReturn(Collections.emptyMap());
when(admin.createOrFindTopics(any(NewTopic.class)))
.thenReturn(createdTopic(TOPIC))
.thenThrow(new RetriableException(new TimeoutException("timeout")))
.thenReturn(createdTopic(OTHER_TOPIC));
// Second round
expectTopicCreation(OTHER_TOPIC);
expectSendRecord(OTHER_TOPIC, false, emptyHeaders());
PowerMock.replayAll();
// Try to send 3, make first pass, second fail. Should save last two
// Try to send 3, make first pass, second fail. Should save last record
workerTask.toSend = Arrays.asList(record1, record2, record3);
workerTask.sendRecords();
assertEquals(Arrays.asList(record3), workerTask.toSend);
assertEquals(Collections.singletonList(record3), workerTask.toSend);
verifyTopicCreation(2, TOPIC, OTHER_TOPIC); // Second call to createOrFindTopics will throw
// Next they all succeed
workerTask.sendRecords();
assertNull(workerTask.toSend);
PowerMock.verifyAll();
verifyTopicCreation(3, TOPIC, OTHER_TOPIC, OTHER_TOPIC);
}
@Test
@ -560,11 +555,10 @@ public class AbstractWorkerSourceTaskTest {
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
expectPreliminaryCalls();
EasyMock.expect(admin.describeTopics(TOPIC))
.andThrow(new ConnectException(new TopicAuthorizationException("unauthorized")));
PowerMock.replayAll();
expectPreliminaryCalls(TOPIC);
when(admin.describeTopics(TOPIC)).thenThrow(
new ConnectException(new TopicAuthorizationException("unauthorized"))
);
workerTask.toSend = Arrays.asList(record1, record2);
assertThrows(ConnectException.class, workerTask::sendRecords);
@ -577,18 +571,17 @@ public class AbstractWorkerSourceTaskTest {
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
expectPreliminaryCalls();
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture)))
.andThrow(new ConnectException(new TopicAuthorizationException("unauthorized")));
PowerMock.replayAll();
expectPreliminaryCalls(TOPIC);
when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap());
when(admin.createOrFindTopics(any(NewTopic.class))).thenThrow(
new ConnectException(new TopicAuthorizationException("unauthorized"))
);
workerTask.toSend = Arrays.asList(record1, record2);
assertThrows(ConnectException.class, workerTask::sendRecords);
assertTrue(newTopicCapture.hasCaptured());
verify(admin).createOrFindTopics(any());
verifyTopicCreation();
}
@Test
@ -598,17 +591,16 @@ public class AbstractWorkerSourceTaskTest {
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
expectPreliminaryCalls();
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
expectPreliminaryCalls(TOPIC);
Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(TopicAdmin.EMPTY_CREATION);
PowerMock.replayAll();
when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap());
when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(TopicAdmin.EMPTY_CREATION);
workerTask.toSend = Arrays.asList(record1, record2);
assertThrows(ConnectException.class, workerTask::sendRecords);
assertTrue(newTopicCapture.hasCaptured());
verify(admin).createOrFindTopics(any());
verifyTopicCreation();
}
@Test
@ -618,19 +610,21 @@ public class AbstractWorkerSourceTaskTest {
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
expectPreliminaryCalls();
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
expectSendRecord(emptyHeaders());
Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(foundTopic(TOPIC));
expectSendRecord();
expectSendRecord();
PowerMock.replayAll();
when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap());
when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(foundTopic(TOPIC));
workerTask.toSend = Arrays.asList(record1, record2);
workerTask.sendRecords();
ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = verifySendRecord(2);
List<ProducerRecord<byte[], byte[]>> capturedValues = sent.getAllValues();
assertEquals(2, capturedValues.size());
verifyTaskGetTopic(2);
verifyTopicCreation();
}
@Test
@ -640,144 +634,126 @@ public class AbstractWorkerSourceTaskTest {
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
expectPreliminaryCalls();
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
expectSendRecord(emptyHeaders());
Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(TOPIC));
expectSendRecord();
expectSendRecord();
PowerMock.replayAll();
when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap());
when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(createdTopic(TOPIC));
workerTask.toSend = Arrays.asList(record1, record2);
workerTask.sendRecords();
ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = verifySendRecord(2);
List<ProducerRecord<byte[], byte[]>> capturedValues = sent.getAllValues();
assertEquals(2, capturedValues.size());
verifyTaskGetTopic(2);
verifyTopicCreation();
}
private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(
String topic,
boolean anyTimes,
Headers headers
) {
private void expectSendRecord(Headers headers) {
if (headers != null)
expectConvertHeadersAndKeyValue(topic, anyTimes, headers);
expectConvertHeadersAndKeyValue(headers, TOPIC);
expectApplyTransformationChain(anyTimes);
expectApplyTransformationChain();
Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture();
expectTaskGetTopic();
}
IExpectationSetters<Future<RecordMetadata>> expect = EasyMock.expect(
producer.send(EasyMock.capture(sent), EasyMock.capture(producerCallbacks)));
private ArgumentCaptor<ProducerRecord<byte[], byte[]>> verifySendRecord() {
return verifySendRecord(1);
}
IAnswer<Future<RecordMetadata>> expectResponse = () -> {
synchronized (producerCallbacks) {
for (Callback cb : producerCallbacks.getValues()) {
cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, 0L, 0, 0), null);
}
producerCallbacks.reset();
}
return null;
};
private ArgumentCaptor<ProducerRecord<byte[], byte[]>> verifySendRecord(int times) {
ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = ArgumentCaptor.forClass(ProducerRecord.class);
ArgumentCaptor<Callback> producerCallbacks = ArgumentCaptor.forClass(Callback.class);
verify(producer, times(times)).send(sent.capture(), producerCallbacks.capture());
if (anyTimes)
expect.andStubAnswer(expectResponse);
else
expect.andAnswer(expectResponse);
expectTaskGetTopic(anyTimes);
for (Callback cb : producerCallbacks.getAllValues()) {
cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, 0L, 0, 0),
null);
}
return sent;
}
private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() {
return expectSendRecord(TOPIC, true, emptyHeaders());
private void expectTaskGetTopic() {
when(statusBackingStore.getTopic(anyString(), anyString())).thenAnswer((Answer<TopicStatus>) invocation -> {
String connector = invocation.getArgument(0, String.class);
String topic = invocation.getArgument(1, String.class);
return new TopicStatus(topic, new ConnectorTaskId(connector, 0), Time.SYSTEM.milliseconds());
});
}
private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord() {
return expectSendRecord(TOPIC, false, emptyHeaders());
}
private void expectTaskGetTopic(boolean anyTimes) {
final Capture<String> connectorCapture = EasyMock.newCapture();
final Capture<String> topicCapture = EasyMock.newCapture();
IExpectationSetters<TopicStatus> expect = EasyMock.expect(statusBackingStore.getTopic(
EasyMock.capture(connectorCapture),
EasyMock.capture(topicCapture)));
if (anyTimes) {
expect.andStubAnswer(() -> new TopicStatus(
topicCapture.getValue(),
new ConnectorTaskId(connectorCapture.getValue(), 0),
Time.SYSTEM.milliseconds()));
} else {
expect.andAnswer(() -> new TopicStatus(
topicCapture.getValue(),
new ConnectorTaskId(connectorCapture.getValue(), 0),
Time.SYSTEM.milliseconds()));
}
if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) {
assertEquals("job", connectorCapture.getValue());
assertEquals(TOPIC, topicCapture.getValue());
}
private void verifyTaskGetTopic() {
verifyTaskGetTopic(1);
}
private void verifyTaskGetTopic(int times) {
ArgumentCaptor<String> connectorCapture = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<String> topicCapture = ArgumentCaptor.forClass(String.class);
verify(statusBackingStore, times(times)).getTopic(connectorCapture.capture(), topicCapture.capture());
assertEquals("job", connectorCapture.getValue());
assertEquals(TOPIC, topicCapture.getValue());
}
@SuppressWarnings("SameParameterValue")
private void expectTopicCreation(String topic) {
if (config.topicCreationEnable()) {
EasyMock.expect(admin.describeTopics(topic)).andReturn(Collections.emptyMap());
Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(topic));
}
when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(createdTopic(topic));
}
private void verifyTopicCreation() {
verifyTopicCreation(1, TOPIC);
}
private void verifyTopicCreation(int times, String... topics) {
ArgumentCaptor<NewTopic> newTopicCapture = ArgumentCaptor.forClass(NewTopic.class);
verify(admin, times(times)).createOrFindTopics(newTopicCapture.capture());
assertArrayEquals(topics, newTopicCapture.getAllValues()
.stream()
.map(NewTopic::name)
.toArray(String[]::new));
}
@SuppressWarnings("SameParameterValue")
private TopicAdmin.TopicCreationResponse createdTopic(String topic) {
Set<String> created = Collections.singleton(topic);
Set<String> existing = Collections.emptySet();
return new TopicAdmin.TopicCreationResponse(created, existing);
}
@SuppressWarnings("SameParameterValue")
private TopicAdmin.TopicCreationResponse foundTopic(String topic) {
Set<String> created = Collections.emptySet();
Set<String> existing = Collections.singleton(topic);
return new TopicAdmin.TopicCreationResponse(created, existing);
}
private void expectPreliminaryCalls() {
expectPreliminaryCalls(TOPIC);
}
private void expectPreliminaryCalls(String topic) {
expectConvertHeadersAndKeyValue(topic, true, emptyHeaders());
expectApplyTransformationChain(false);
PowerMock.expectLastCall();
expectConvertHeadersAndKeyValue(emptyHeaders(), topic);
expectApplyTransformationChain();
}
private void expectConvertHeadersAndKeyValue(String topic, boolean anyTimes, Headers headers) {
for (Header header : headers) {
IExpectationSetters<byte[]> convertHeaderExpect = EasyMock.expect(headerConverter.fromConnectHeader(topic, header.key(), Schema.STRING_SCHEMA, new String(header.value())));
if (anyTimes)
convertHeaderExpect.andStubReturn(header.value());
else
convertHeaderExpect.andReturn(header.value());
private void expectConvertHeadersAndKeyValue(Headers headers, String topic) {
if (headers.iterator().hasNext()) {
when(headerConverter.fromConnectHeader(anyString(), anyString(), eq(Schema.STRING_SCHEMA),
anyString()))
.thenAnswer((Answer<byte[]>) invocation -> {
String headerValue = invocation.getArgument(3, String.class);
return headerValue.getBytes(StandardCharsets.UTF_8);
});
}
IExpectationSetters<byte[]> convertKeyExpect = EasyMock.expect(keyConverter.fromConnectData(topic, headers, KEY_SCHEMA, KEY));
if (anyTimes)
convertKeyExpect.andStubReturn(SERIALIZED_KEY);
else
convertKeyExpect.andReturn(SERIALIZED_KEY);
IExpectationSetters<byte[]> convertValueExpect = EasyMock.expect(valueConverter.fromConnectData(topic, headers, RECORD_SCHEMA, RECORD));
if (anyTimes)
convertValueExpect.andStubReturn(SERIALIZED_RECORD);
else
convertValueExpect.andReturn(SERIALIZED_RECORD);
when(keyConverter.fromConnectData(eq(topic), any(Headers.class), eq(KEY_SCHEMA), eq(KEY)))
.thenReturn(SERIALIZED_KEY);
when(valueConverter.fromConnectData(eq(topic), any(Headers.class), eq(RECORD_SCHEMA),
eq(RECORD)))
.thenReturn(SERIALIZED_RECORD);
}
private void expectApplyTransformationChain(boolean anyTimes) {
final Capture<SourceRecord> recordCapture = EasyMock.newCapture();
IExpectationSetters<SourceRecord> convertKeyExpect = EasyMock.expect(transformationChain.apply(EasyMock.capture(recordCapture)));
if (anyTimes)
convertKeyExpect.andStubAnswer(recordCapture::getValue);
else
convertKeyExpect.andAnswer(recordCapture::getValue);
private void expectApplyTransformationChain() {
when(transformationChain.apply(any(SourceRecord.class)))
.thenAnswer(AdditionalAnswers.returnsFirstArg());
}
private RecordHeaders emptyHeaders() {
@ -839,7 +815,5 @@ public class AbstractWorkerSourceTaskTest {
protected void finalOffsetCommit(boolean failed) {
}
};
}
}