KAFKA-10070: parameterize Connect unit tests to remove code duplication (#10299)

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Konstantine Karantasis <k.karantasis@gmail.com>
This commit is contained in:
Lev Zemlyanov 2021-03-19 07:03:36 -07:00 committed by GitHub
parent 367eca083b
commit 1fb8bd9c44
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 572 additions and 3686 deletions

View File

@ -137,7 +137,7 @@
<!-- connect tests--> <!-- connect tests-->
<suppress checks="ClassDataAbstractionCoupling" <suppress checks="ClassDataAbstractionCoupling"
files="(DistributedHerder|KafkaBasedLog|WorkerSourceTaskWithTopicCreation)Test.java"/> files="(DistributedHerder|KafkaBasedLog|WorkerSourceTaskWithTopicCreation|WorkerSourceTask)Test.java"/>
<suppress checks="ClassFanOutComplexity" <suppress checks="ClassFanOutComplexity"
files="(WorkerSink|WorkerSource|ErrorHandling)Task(|WithTopicCreation)Test.java"/> files="(WorkerSink|WorkerSource|ErrorHandling)Task(|WithTopicCreation)Test.java"/>

View File

@ -39,6 +39,7 @@ import org.apache.kafka.connect.runtime.errors.ErrorReporter;
import org.apache.kafka.connect.runtime.errors.LogReporter; import org.apache.kafka.connect.runtime.errors.LogReporter;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.ToleranceType; import org.apache.kafka.connect.runtime.errors.ToleranceType;
import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
@ -56,7 +57,9 @@ import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SimpleConfig; import org.apache.kafka.connect.transforms.util.SimpleConfig;
import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.ParameterizedTest;
import org.apache.kafka.connect.util.TopicAdmin; import org.apache.kafka.connect.util.TopicAdmin;
import org.apache.kafka.connect.util.TopicCreationGroup;
import org.easymock.Capture; import org.easymock.Capture;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.easymock.IExpectationSetters; import org.easymock.IExpectationSetters;
@ -69,14 +72,17 @@ import org.powermock.api.easymock.annotation.Mock;
import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner; import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.time.Duration; import java.time.Duration;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
@ -87,10 +93,16 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_C
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_GROUPS_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG; import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@RunWith(PowerMockRunner.class) @RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(ParameterizedTest.class)
@PrepareForTest({WorkerSinkTask.class, WorkerSourceTask.class}) @PrepareForTest({WorkerSinkTask.class, WorkerSourceTask.class})
@PowerMockIgnore("javax.management.*") @PowerMockIgnore("javax.management.*")
public class ErrorHandlingTaskTest { public class ErrorHandlingTaskTest {
@ -156,10 +168,21 @@ public class ErrorHandlingTaskTest {
@SuppressWarnings("unused") @SuppressWarnings("unused")
@Mock private StatusBackingStore statusBackingStore; @Mock private StatusBackingStore statusBackingStore;
@Mock
private WorkerErrantRecordReporter workerErrantRecordReporter;
private ErrorHandlingMetrics errorHandlingMetrics; private ErrorHandlingMetrics errorHandlingMetrics;
// when this test becomes parameterized, this variable will be a test parameter private boolean enableTopicCreation;
public boolean enableTopicCreation = false;
@ParameterizedTest.Parameters
public static Collection<Boolean> parameters() {
return Arrays.asList(false, true);
}
public ErrorHandlingTaskTest(boolean enableTopicCreation) {
this.enableTopicCreation = enableTopicCreation;
}
@Before @Before
public void setup() { public void setup() {
@ -189,6 +212,10 @@ public class ErrorHandlingTaskTest {
props.put(TOPIC_CONFIG, topic); props.put(TOPIC_CONFIG, topic);
props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(TOPIC_CREATION_GROUPS_CONFIG, String.join(",", "foo", "bar"));
props.put(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG, String.valueOf(1));
props.put(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG, String.valueOf(1));
props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "foo" + "." + INCLUDE_REGEX_CONFIG, topic);
return props; return props;
} }
@ -514,9 +541,16 @@ public class ErrorHandlingTaskTest {
private void expectTopicCreation(String topic) { private void expectTopicCreation(String topic) {
if (workerConfig.topicCreationEnable()) { if (workerConfig.topicCreationEnable()) {
EasyMock.expect(admin.describeTopics(topic)).andReturn(Collections.emptyMap()); EasyMock.expect(admin.describeTopics(topic)).andReturn(Collections.emptyMap());
Capture<NewTopic> newTopicCapture = EasyMock.newCapture(); Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture))).andReturn(true);
if (enableTopicCreation) {
Set<String> created = Collections.singleton(topic);
Set<String> existing = Collections.emptySet();
TopicAdmin.TopicCreationResponse response = new TopicAdmin.TopicCreationResponse(created, existing);
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(response);
} else {
EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture))).andReturn(true);
}
} }
} }
@ -533,7 +567,7 @@ public class ErrorHandlingTaskTest {
taskId, sinkTask, statusListener, initialState, workerConfig, taskId, sinkTask, statusListener, initialState, workerConfig,
ClusterConfigState.EMPTY, metrics, converter, converter, ClusterConfigState.EMPTY, metrics, converter, converter,
headerConverter, sinkTransforms, consumer, pluginLoader, time, headerConverter, sinkTransforms, consumer, pluginLoader, time,
retryWithToleranceOperator, null, statusBackingStore); retryWithToleranceOperator, workerErrantRecordReporter, statusBackingStore);
} }
private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator) { private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator) {
@ -559,12 +593,12 @@ public class ErrorHandlingTaskTest {
TransformationChain<SourceRecord> sourceTransforms = new TransformationChain<>(singletonList(new FaultyPassthrough<SourceRecord>()), retryWithToleranceOperator); TransformationChain<SourceRecord> sourceTransforms = new TransformationChain<>(singletonList(new FaultyPassthrough<SourceRecord>()), retryWithToleranceOperator);
workerSourceTask = PowerMock.createPartialMock( workerSourceTask = PowerMock.createPartialMock(
WorkerSourceTask.class, new String[]{"commitOffsets", "isStopping"}, WorkerSourceTask.class, new String[]{"commitOffsets", "isStopping"},
taskId, sourceTask, statusListener, initialState, converter, converter, headerConverter, sourceTransforms, taskId, sourceTask, statusListener, initialState, converter, converter, headerConverter, sourceTransforms,
producer, admin, null, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig),
offsetReader, offsetWriter, workerConfig, offsetReader, offsetWriter, workerConfig,
ClusterConfigState.EMPTY, metrics, pluginLoader, time, retryWithToleranceOperator, ClusterConfigState.EMPTY, metrics, pluginLoader, time, retryWithToleranceOperator,
statusBackingStore, (Executor) Runnable::run); statusBackingStore, (Executor) Runnable::run);
} }
private ConsumerRecords<byte[], byte[]> records(ConsumerRecord<byte[], byte[]> record) { private ConsumerRecords<byte[], byte[]> records(ConsumerRecord<byte[], byte[]> record) {

View File

@ -1,658 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.integration.MonitorableSourceConnector;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.ErrorReporter;
import org.apache.kafka.connect.runtime.errors.LogReporter;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.ToleranceType;
import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.TopicAdmin;
import org.apache.kafka.connect.util.TopicCreationGroup;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IExpectationSetters;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static org.apache.kafka.common.utils.Time.SYSTEM;
import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_GROUPS_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG;
import static org.junit.Assert.assertEquals;
@RunWith(PowerMockRunner.class)
@PrepareForTest({WorkerSinkTask.class, WorkerSourceTask.class})
@PowerMockIgnore("javax.management.*")
public class ErrorHandlingTaskWithTopicCreationTest {
private static final String TOPIC = "test";
private static final int PARTITION1 = 12;
private static final int PARTITION2 = 13;
private static final long FIRST_OFFSET = 45;
@Mock Plugins plugins;
private static final Map<String, String> TASK_PROPS = new HashMap<>();
static {
TASK_PROPS.put(SinkConnector.TOPICS_CONFIG, TOPIC);
TASK_PROPS.put(TaskConfig.TASK_CLASS_CONFIG, TestSinkTask.class.getName());
}
public static final long OPERATOR_RETRY_TIMEOUT_MILLIS = 60000;
public static final long OPERATOR_RETRY_MAX_DELAY_MILLIS = 5000;
public static final ToleranceType OPERATOR_TOLERANCE_TYPE = ToleranceType.ALL;
private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
private TargetState initialState = TargetState.STARTED;
private Time time;
private MockConnectMetrics metrics;
@SuppressWarnings("unused")
@Mock
private SinkTask sinkTask;
@SuppressWarnings("unused")
@Mock
private SourceTask sourceTask;
private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture();
private WorkerConfig workerConfig;
private SourceConnectorConfig sourceConfig;
@Mock
private PluginClassLoader pluginLoader;
@SuppressWarnings("unused")
@Mock
private HeaderConverter headerConverter;
private WorkerSinkTask workerSinkTask;
private WorkerSourceTask workerSourceTask;
@SuppressWarnings("unused")
@Mock
private KafkaConsumer<byte[], byte[]> consumer;
@SuppressWarnings("unused")
@Mock
private KafkaProducer<byte[], byte[]> producer;
@SuppressWarnings("unused")
@Mock private TopicAdmin admin;
@Mock
OffsetStorageReaderImpl offsetReader;
@Mock
OffsetStorageWriter offsetWriter;
private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture();
@SuppressWarnings("unused")
@Mock
private TaskStatus.Listener statusListener;
@SuppressWarnings("unused")
@Mock private StatusBackingStore statusBackingStore;
@Mock
private WorkerErrantRecordReporter workerErrantRecordReporter;
private ErrorHandlingMetrics errorHandlingMetrics;
// when this test becomes parameterized, this variable will be a test parameter
public boolean enableTopicCreation = true;
@Before
public void setup() {
time = new MockTime(0, 0, 0);
metrics = new MockConnectMetrics();
Map<String, String> workerProps = new HashMap<>();
workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("internal.key.converter.schemas.enable", "false");
workerProps.put("internal.value.converter.schemas.enable", "false");
workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
workerProps.put(TOPIC_CREATION_ENABLE_CONFIG, String.valueOf(enableTopicCreation));
pluginLoader = PowerMock.createMock(PluginClassLoader.class);
workerConfig = new StandaloneConfig(workerProps);
sourceConfig = new SourceConnectorConfig(plugins, sourceConnectorPropsWithGroups(TOPIC), true);
errorHandlingMetrics = new ErrorHandlingMetrics(taskId, metrics);
}
private Map<String, String> sourceConnectorPropsWithGroups(String topic) {
// setup up props for the source connector
Map<String, String> props = new HashMap<>();
props.put("name", "foo-connector");
props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
props.put(TASKS_MAX_CONFIG, String.valueOf(1));
props.put(TOPIC_CONFIG, topic);
props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(TOPIC_CREATION_GROUPS_CONFIG, String.join(",", "foo", "bar"));
props.put(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG, String.valueOf(1));
props.put(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG, String.valueOf(1));
props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "foo" + "." + INCLUDE_REGEX_CONFIG, topic);
return props;
}
@After
public void tearDown() {
if (metrics != null) {
metrics.stop();
}
}
@Test
public void testSinkTasksCloseErrorReporters() throws Exception {
ErrorReporter reporter = EasyMock.mock(ErrorReporter.class);
RetryWithToleranceOperator retryWithToleranceOperator = operator();
retryWithToleranceOperator.metrics(errorHandlingMetrics);
retryWithToleranceOperator.reporters(singletonList(reporter));
createSinkTask(initialState, retryWithToleranceOperator);
expectInitializeTask();
reporter.close();
EasyMock.expectLastCall();
sinkTask.stop();
EasyMock.expectLastCall();
consumer.close();
EasyMock.expectLastCall();
PowerMock.replayAll();
workerSinkTask.initialize(TASK_CONFIG);
workerSinkTask.initializeAndStart();
workerSinkTask.close();
PowerMock.verifyAll();
}
@Test
public void testSourceTasksCloseErrorReporters() {
ErrorReporter reporter = EasyMock.mock(ErrorReporter.class);
RetryWithToleranceOperator retryWithToleranceOperator = operator();
retryWithToleranceOperator.metrics(errorHandlingMetrics);
retryWithToleranceOperator.reporters(singletonList(reporter));
createSourceTask(initialState, retryWithToleranceOperator);
expectClose();
reporter.close();
EasyMock.expectLastCall();
PowerMock.replayAll();
workerSourceTask.initialize(TASK_CONFIG);
workerSourceTask.close();
PowerMock.verifyAll();
}
@Test
public void testCloseErrorReportersExceptionPropagation() {
ErrorReporter reporterA = EasyMock.mock(ErrorReporter.class);
ErrorReporter reporterB = EasyMock.mock(ErrorReporter.class);
RetryWithToleranceOperator retryWithToleranceOperator = operator();
retryWithToleranceOperator.metrics(errorHandlingMetrics);
retryWithToleranceOperator.reporters(Arrays.asList(reporterA, reporterB));
createSourceTask(initialState, retryWithToleranceOperator);
expectClose();
// Even though the reporters throw exceptions, they should both still be closed.
reporterA.close();
EasyMock.expectLastCall().andThrow(new RuntimeException());
reporterB.close();
EasyMock.expectLastCall().andThrow(new RuntimeException());
PowerMock.replayAll();
workerSourceTask.initialize(TASK_CONFIG);
workerSourceTask.close();
PowerMock.verifyAll();
}
@Test
public void testErrorHandlingInSinkTasks() throws Exception {
Map<String, String> reportProps = new HashMap<>();
reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true");
reportProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true");
LogReporter reporter = new LogReporter(taskId, connConfig(reportProps), errorHandlingMetrics);
RetryWithToleranceOperator retryWithToleranceOperator = operator();
retryWithToleranceOperator.metrics(errorHandlingMetrics);
retryWithToleranceOperator.reporters(singletonList(reporter));
createSinkTask(initialState, retryWithToleranceOperator);
expectInitializeTask();
expectTaskGetTopic(true);
// valid json
ConsumerRecord<byte[], byte[]> record1 = new ConsumerRecord<>(TOPIC, PARTITION1, FIRST_OFFSET, null, "{\"a\": 10}".getBytes());
// bad json
ConsumerRecord<byte[], byte[]> record2 = new ConsumerRecord<>(TOPIC, PARTITION2, FIRST_OFFSET, null, "{\"a\" 10}".getBytes());
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andReturn(records(record1));
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andReturn(records(record2));
sinkTask.put(EasyMock.anyObject());
EasyMock.expectLastCall().times(2);
PowerMock.replayAll();
workerSinkTask.initialize(TASK_CONFIG);
workerSinkTask.initializeAndStart();
workerSinkTask.iteration();
workerSinkTask.iteration();
// two records were consumed from Kafka
assertSinkMetricValue("sink-record-read-total", 2.0);
// only one was written to the task
assertSinkMetricValue("sink-record-send-total", 1.0);
// one record completely failed (converter issues)
assertErrorHandlingMetricValue("total-record-errors", 1.0);
// 2 failures in the transformation, and 1 in the converter
assertErrorHandlingMetricValue("total-record-failures", 3.0);
// one record completely failed (converter issues), and thus was skipped
assertErrorHandlingMetricValue("total-records-skipped", 1.0);
PowerMock.verifyAll();
}
private RetryWithToleranceOperator operator() {
return new RetryWithToleranceOperator(OPERATOR_RETRY_TIMEOUT_MILLIS, OPERATOR_RETRY_MAX_DELAY_MILLIS, OPERATOR_TOLERANCE_TYPE, SYSTEM);
}
@Test
public void testErrorHandlingInSourceTasks() throws Exception {
Map<String, String> reportProps = new HashMap<>();
reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true");
reportProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true");
LogReporter reporter = new LogReporter(taskId, connConfig(reportProps), errorHandlingMetrics);
RetryWithToleranceOperator retryWithToleranceOperator = operator();
retryWithToleranceOperator.metrics(errorHandlingMetrics);
retryWithToleranceOperator.reporters(singletonList(reporter));
createSourceTask(initialState, retryWithToleranceOperator);
// valid json
Schema valSchema = SchemaBuilder.struct().field("val", Schema.INT32_SCHEMA).build();
Struct struct1 = new Struct(valSchema).put("val", 1234);
SourceRecord record1 = new SourceRecord(emptyMap(), emptyMap(), TOPIC, PARTITION1, valSchema, struct1);
Struct struct2 = new Struct(valSchema).put("val", 6789);
SourceRecord record2 = new SourceRecord(emptyMap(), emptyMap(), TOPIC, PARTITION1, valSchema, struct2);
EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
EasyMock.expect(workerSourceTask.isStopping()).andReturn(true);
EasyMock.expect(workerSourceTask.commitOffsets()).andReturn(true);
offsetWriter.offset(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().times(2);
sourceTask.initialize(EasyMock.anyObject());
EasyMock.expectLastCall();
sourceTask.start(EasyMock.anyObject());
EasyMock.expectLastCall();
EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record1));
EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record2));
expectTopicDoesNotExist(TOPIC);
EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(null).times(2);
PowerMock.replayAll();
workerSourceTask.initialize(TASK_CONFIG);
workerSourceTask.execute();
// two records were consumed from Kafka
assertSourceMetricValue("source-record-poll-total", 2.0);
// only one was written to the task
assertSourceMetricValue("source-record-write-total", 0.0);
// one record completely failed (converter issues)
assertErrorHandlingMetricValue("total-record-errors", 0.0);
// 2 failures in the transformation, and 1 in the converter
assertErrorHandlingMetricValue("total-record-failures", 4.0);
// one record completely failed (converter issues), and thus was skipped
assertErrorHandlingMetricValue("total-records-skipped", 0.0);
PowerMock.verifyAll();
}
private ConnectorConfig connConfig(Map<String, String> connProps) {
Map<String, String> props = new HashMap<>();
props.put(ConnectorConfig.NAME_CONFIG, "test");
props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SinkTask.class.getName());
props.putAll(connProps);
return new ConnectorConfig(plugins, props);
}
@Test
public void testErrorHandlingInSourceTasksWthBadConverter() throws Exception {
Map<String, String> reportProps = new HashMap<>();
reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true");
reportProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true");
LogReporter reporter = new LogReporter(taskId, connConfig(reportProps), errorHandlingMetrics);
RetryWithToleranceOperator retryWithToleranceOperator = operator();
retryWithToleranceOperator.metrics(errorHandlingMetrics);
retryWithToleranceOperator.reporters(singletonList(reporter));
createSourceTask(initialState, retryWithToleranceOperator, badConverter());
// valid json
Schema valSchema = SchemaBuilder.struct().field("val", Schema.INT32_SCHEMA).build();
Struct struct1 = new Struct(valSchema).put("val", 1234);
SourceRecord record1 = new SourceRecord(emptyMap(), emptyMap(), TOPIC, PARTITION1, valSchema, struct1);
Struct struct2 = new Struct(valSchema).put("val", 6789);
SourceRecord record2 = new SourceRecord(emptyMap(), emptyMap(), TOPIC, PARTITION1, valSchema, struct2);
EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
EasyMock.expect(workerSourceTask.isStopping()).andReturn(true);
EasyMock.expect(workerSourceTask.commitOffsets()).andReturn(true);
offsetWriter.offset(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().times(2);
sourceTask.initialize(EasyMock.anyObject());
EasyMock.expectLastCall();
sourceTask.start(EasyMock.anyObject());
EasyMock.expectLastCall();
EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record1));
EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record2));
expectTopicDoesNotExist(TOPIC);
EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(null).times(2);
PowerMock.replayAll();
workerSourceTask.initialize(TASK_CONFIG);
workerSourceTask.execute();
// two records were consumed from Kafka
assertSourceMetricValue("source-record-poll-total", 2.0);
// only one was written to the task
assertSourceMetricValue("source-record-write-total", 0.0);
// one record completely failed (converter issues)
assertErrorHandlingMetricValue("total-record-errors", 0.0);
// 2 failures in the transformation, and 1 in the converter
assertErrorHandlingMetricValue("total-record-failures", 8.0);
// one record completely failed (converter issues), and thus was skipped
assertErrorHandlingMetricValue("total-records-skipped", 0.0);
PowerMock.verifyAll();
}
private void assertSinkMetricValue(String name, double expected) {
ConnectMetrics.MetricGroup sinkTaskGroup = workerSinkTask.sinkTaskMetricsGroup().metricGroup();
double measured = metrics.currentMetricValueAsDouble(sinkTaskGroup, name);
assertEquals(expected, measured, 0.001d);
}
private void assertSourceMetricValue(String name, double expected) {
ConnectMetrics.MetricGroup sinkTaskGroup = workerSourceTask.sourceTaskMetricsGroup().metricGroup();
double measured = metrics.currentMetricValueAsDouble(sinkTaskGroup, name);
assertEquals(expected, measured, 0.001d);
}
private void assertErrorHandlingMetricValue(String name, double expected) {
ConnectMetrics.MetricGroup sinkTaskGroup = errorHandlingMetrics.metricGroup();
double measured = metrics.currentMetricValueAsDouble(sinkTaskGroup, name);
assertEquals(expected, measured, 0.001d);
}
private void expectInitializeTask() throws Exception {
consumer.subscribe(EasyMock.eq(singletonList(TOPIC)), EasyMock.capture(rebalanceListener));
PowerMock.expectLastCall();
sinkTask.initialize(EasyMock.capture(sinkTaskContext));
PowerMock.expectLastCall();
sinkTask.start(TASK_PROPS);
PowerMock.expectLastCall();
}
private void expectTaskGetTopic(boolean anyTimes) {
final Capture<String> connectorCapture = EasyMock.newCapture();
final Capture<String> topicCapture = EasyMock.newCapture();
IExpectationSetters<TopicStatus> expect = EasyMock.expect(statusBackingStore.getTopic(
EasyMock.capture(connectorCapture),
EasyMock.capture(topicCapture)));
if (anyTimes) {
expect.andStubAnswer(() -> new TopicStatus(
topicCapture.getValue(),
new ConnectorTaskId(connectorCapture.getValue(), 0),
Time.SYSTEM.milliseconds()));
} else {
expect.andAnswer(() -> new TopicStatus(
topicCapture.getValue(),
new ConnectorTaskId(connectorCapture.getValue(), 0),
Time.SYSTEM.milliseconds()));
}
if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) {
assertEquals("job", connectorCapture.getValue());
assertEquals(TOPIC, topicCapture.getValue());
}
}
private void expectClose() {
producer.close(EasyMock.anyObject(Duration.class));
EasyMock.expectLastCall();
admin.close(EasyMock.anyObject(Duration.class));
EasyMock.expectLastCall();
}
private void expectTopicDoesNotExist(String topic) {
if (workerConfig.topicCreationEnable()) {
EasyMock.expect(admin.describeTopics(topic)).andReturn(Collections.emptyMap());
Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
Set<String> created = Collections.singleton(topic);
Set<String> existing = Collections.emptySet();
TopicAdmin.TopicCreationResponse response = new TopicAdmin.TopicCreationResponse(created, existing);
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(response);
}
}
private void createSinkTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator) {
JsonConverter converter = new JsonConverter();
Map<String, Object> oo = workerConfig.originalsWithPrefix("value.converter.");
oo.put("converter.type", "value");
oo.put("schemas.enable", "false");
converter.configure(oo);
TransformationChain<SinkRecord> sinkTransforms = new TransformationChain<>(singletonList(new FaultyPassthrough<SinkRecord>()), retryWithToleranceOperator);
workerSinkTask = new WorkerSinkTask(
taskId, sinkTask, statusListener, initialState, workerConfig,
ClusterConfigState.EMPTY, metrics, converter, converter,
headerConverter, sinkTransforms, consumer, pluginLoader, time,
retryWithToleranceOperator, workerErrantRecordReporter, statusBackingStore);
}
private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator) {
JsonConverter converter = new JsonConverter();
Map<String, Object> oo = workerConfig.originalsWithPrefix("value.converter.");
oo.put("converter.type", "value");
oo.put("schemas.enable", "false");
converter.configure(oo);
createSourceTask(initialState, retryWithToleranceOperator, converter);
}
private Converter badConverter() {
FaultyConverter converter = new FaultyConverter();
Map<String, Object> oo = workerConfig.originalsWithPrefix("value.converter.");
oo.put("converter.type", "value");
oo.put("schemas.enable", "false");
converter.configure(oo);
return converter;
}
private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator, Converter converter) {
TransformationChain<SourceRecord> sourceTransforms = new TransformationChain<>(singletonList(new FaultyPassthrough<SourceRecord>()), retryWithToleranceOperator);
workerSourceTask = PowerMock.createPartialMock(
WorkerSourceTask.class, new String[]{"commitOffsets", "isStopping"},
taskId, sourceTask, statusListener, initialState, converter, converter, headerConverter, sourceTransforms,
producer, admin, TopicCreationGroup.configuredGroups(sourceConfig),
offsetReader, offsetWriter, workerConfig,
ClusterConfigState.EMPTY, metrics, pluginLoader, time, retryWithToleranceOperator,
statusBackingStore, (Executor) Runnable::run);
}
private ConsumerRecords<byte[], byte[]> records(ConsumerRecord<byte[], byte[]> record) {
return new ConsumerRecords<>(Collections.singletonMap(
new TopicPartition(record.topic(), record.partition()), singletonList(record)));
}
private abstract static class TestSinkTask extends SinkTask {
}
static class FaultyConverter extends JsonConverter {
private static final Logger log = LoggerFactory.getLogger(FaultyConverter.class);
private int invocations = 0;
public byte[] fromConnectData(String topic, Schema schema, Object value) {
if (value == null) {
return super.fromConnectData(topic, schema, null);
}
invocations++;
if (invocations % 3 == 0) {
log.debug("Succeeding record: {} where invocations={}", value, invocations);
return super.fromConnectData(topic, schema, value);
} else {
log.debug("Failing record: {} at invocations={}", value, invocations);
throw new RetriableException("Bad invocations " + invocations + " for mod 3");
}
}
}
static class FaultyPassthrough<R extends ConnectRecord<R>> implements Transformation<R> {
private static final Logger log = LoggerFactory.getLogger(FaultyPassthrough.class);
private static final String MOD_CONFIG = "mod";
private static final int MOD_CONFIG_DEFAULT = 3;
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(MOD_CONFIG, ConfigDef.Type.INT, MOD_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM, "Pass records without failure only if timestamp % mod == 0");
private int mod = MOD_CONFIG_DEFAULT;
private int invocations = 0;
@Override
public R apply(R record) {
invocations++;
if (invocations % mod == 0) {
log.debug("Succeeding record: {} where invocations={}", record, invocations);
return record;
} else {
log.debug("Failing record: {} at invocations={}", record, invocations);
throw new RetriableException("Bad invocations " + invocations + " for mod " + mod);
}
}
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
@Override
public void close() {
log.info("Shutting down transform");
}
@Override
public void configure(Map<String, ?> configs) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs);
mod = Math.max(config.getInt(MOD_CONFIG), 2);
log.info("Configuring {}. Setting mod to {}", this.getClass(), mod);
}
}
}

View File

@ -16,13 +16,18 @@
*/ */
package org.apache.kafka.connect.runtime; package org.apache.kafka.connect.runtime;
import java.util.Collection;
import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName; import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.Headers;
@ -31,6 +36,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.header.ConnectHeaders; import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.integration.MonitorableSourceConnector; import org.apache.kafka.connect.integration.MonitorableSourceConnector;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
@ -50,8 +56,10 @@ import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.ParameterizedTest;
import org.apache.kafka.connect.util.ThreadedTest; import org.apache.kafka.connect.util.ThreadedTest;
import org.apache.kafka.connect.util.TopicAdmin; import org.apache.kafka.connect.util.TopicAdmin;
import org.apache.kafka.connect.util.TopicCreationGroup;
import org.easymock.Capture; import org.easymock.Capture;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.easymock.IAnswer; import org.easymock.IAnswer;
@ -64,6 +72,7 @@ import org.powermock.api.easymock.annotation.Mock;
import org.powermock.api.easymock.annotation.MockStrict; import org.powermock.api.easymock.annotation.MockStrict;
import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.modules.junit4.PowerMockRunner; import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import org.powermock.reflect.Whitebox; import org.powermock.reflect.Whitebox;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -74,6 +83,7 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -87,6 +97,12 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_C
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_GROUPS_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.EXCLUDE_REGEX_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG; import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
@ -97,8 +113,10 @@ import static org.junit.Assert.assertTrue;
@PowerMockIgnore({"javax.management.*", @PowerMockIgnore({"javax.management.*",
"org.apache.log4j.*"}) "org.apache.log4j.*"})
@RunWith(PowerMockRunner.class) @RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(ParameterizedTest.class)
public class WorkerSourceTaskTest extends ThreadedTest { public class WorkerSourceTaskTest extends ThreadedTest {
private static final String TOPIC = "topic"; private static final String TOPIC = "topic";
private static final String OTHER_TOPIC = "other-topic";
private static final Map<String, byte[]> PARTITION = Collections.singletonMap("key", "partition".getBytes()); private static final Map<String, byte[]> PARTITION = Collections.singletonMap("key", "partition".getBytes());
private static final Map<String, Integer> OFFSET = Collections.singletonMap("key", 12); private static final Map<String, Integer> OFFSET = Collections.singletonMap("key", 12);
@ -146,29 +164,42 @@ public class WorkerSourceTaskTest extends ThreadedTest {
new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD) new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD)
); );
// when this test becomes parameterized, this variable will be a test parameter private boolean enableTopicCreation;
public boolean enableTopicCreation = false;
@ParameterizedTest.Parameters
public static Collection<Boolean> parameters() {
return Arrays.asList(false, true);
}
public WorkerSourceTaskTest(boolean enableTopicCreation) {
this.enableTopicCreation = enableTopicCreation;
}
@Override @Override
public void setup() { public void setup() {
super.setup(); super.setup();
Map<String, String> workerProps = new HashMap<>(); Map<String, String> workerProps = workerProps();
workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("internal.key.converter.schemas.enable", "false");
workerProps.put("internal.value.converter.schemas.enable", "false");
workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
workerProps.put(TOPIC_CREATION_ENABLE_CONFIG, String.valueOf(enableTopicCreation));
plugins = new Plugins(workerProps); plugins = new Plugins(workerProps);
config = new StandaloneConfig(workerProps); config = new StandaloneConfig(workerProps);
sourceConfig = new SourceConnectorConfig(plugins, sourceConnectorProps(TOPIC), true); sourceConfig = new SourceConnectorConfig(plugins, sourceConnectorPropsWithGroups(TOPIC), true);
producerCallbacks = EasyMock.newCapture(); producerCallbacks = EasyMock.newCapture();
metrics = new MockConnectMetrics(); metrics = new MockConnectMetrics();
} }
private Map<String, String> sourceConnectorProps(String topic) { private Map<String, String> workerProps() {
Map<String, String> props = new HashMap<>();
props.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
props.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
props.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
props.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
props.put("internal.key.converter.schemas.enable", "false");
props.put("internal.value.converter.schemas.enable", "false");
props.put("offset.storage.file.filename", "/tmp/connect.offsets");
props.put(TOPIC_CREATION_ENABLE_CONFIG, String.valueOf(enableTopicCreation));
return props;
}
private Map<String, String> sourceConnectorPropsWithGroups(String topic) {
// setup up props for the source connector // setup up props for the source connector
Map<String, String> props = new HashMap<>(); Map<String, String> props = new HashMap<>();
props.put("name", "foo-connector"); props.put("name", "foo-connector");
@ -177,6 +208,12 @@ public class WorkerSourceTaskTest extends ThreadedTest {
props.put(TOPIC_CONFIG, topic); props.put(TOPIC_CONFIG, topic);
props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(TOPIC_CREATION_GROUPS_CONFIG, String.join(",", "foo", "bar"));
props.put(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG, String.valueOf(1));
props.put(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG, String.valueOf(1));
props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "foo" + "." + INCLUDE_REGEX_CONFIG, topic);
props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "bar" + "." + INCLUDE_REGEX_CONFIG, ".*");
props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "bar" + "." + EXCLUDE_REGEX_CONFIG, topic);
return props; return props;
} }
@ -195,9 +232,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
private void createWorkerTask(TargetState initialState, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter) { private void createWorkerTask(TargetState initialState, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter) {
workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, headerConverter, workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, headerConverter,
transformationChain, producer, admin, null, transformationChain, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig),
offsetReader, offsetWriter, config, clusterConfigState, metrics, plugins.delegatingLoader(), Time.SYSTEM, offsetReader, offsetWriter, config, clusterConfigState, metrics, plugins.delegatingLoader(), Time.SYSTEM,
RetryWithToleranceOperatorTest.NOOP_OPERATOR, statusBackingStore, Runnable::run); RetryWithToleranceOperatorTest.NOOP_OPERATOR, statusBackingStore, Runnable::run);
} }
@Test @Test
@ -631,6 +668,29 @@ public class WorkerSourceTaskTest extends ThreadedTest {
assertThrows(ConnectException.class, () -> Whitebox.invokeMethod(workerTask, "sendRecords")); assertThrows(ConnectException.class, () -> Whitebox.invokeMethod(workerTask, "sendRecords"));
} }
@Test
public void testSendRecordsProducerSendFailsImmediately() {
if (!enableTopicCreation)
// should only test with topic creation enabled
return;
createWorkerTask();
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
expectPreliminaryCalls();
expectTopicCreation(TOPIC);
EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject()))
.andThrow(new KafkaException("Producer closed while send in progress", new InvalidTopicException(TOPIC)));
PowerMock.replayAll();
Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
assertThrows(ConnectException.class, () -> Whitebox.invokeMethod(workerTask, "sendRecords"));
}
@Test @Test
public void testSendRecordsTaskCommitRecordFail() throws Exception { public void testSendRecordsTaskCommitRecordFail() throws Exception {
createWorkerTask(); createWorkerTask();
@ -777,11 +837,11 @@ public class WorkerSourceTaskTest extends ThreadedTest {
createWorkerTask(); createWorkerTask();
List<SourceRecord> records = new ArrayList<>(); List<SourceRecord> records = new ArrayList<>();
records.add(new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, null, connectHeaders)); records.add(new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, null, connectHeaders));
expectTopicCreation(TOPIC); expectTopicCreation(TOPIC);
Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecord(true, false, true, true, true, headers); Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecord(TOPIC, true, false, true, true, true, headers);
PowerMock.replayAll(); PowerMock.replayAll();
@ -819,8 +879,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
expectTopicCreation(TOPIC); expectTopicCreation(TOPIC);
Capture<ProducerRecord<byte[], byte[]>> sentRecordA = expectSendRecord(false, false, true, true, false, null); Capture<ProducerRecord<byte[], byte[]>> sentRecordA = expectSendRecord(TOPIC, false, false, true, true, false, null);
Capture<ProducerRecord<byte[], byte[]>> sentRecordB = expectSendRecord(false, false, true, true, false, null); Capture<ProducerRecord<byte[], byte[]>> sentRecordB = expectSendRecord(TOPIC, false, false, true, true, false, null);
PowerMock.replayAll(); PowerMock.replayAll();
@ -844,6 +904,335 @@ public class WorkerSourceTaskTest extends ThreadedTest {
PowerMock.verifyAll(); PowerMock.verifyAll();
} }
@Test
public void testTopicCreateWhenTopicExists() throws Exception {
if (!enableTopicCreation)
// should only test with topic creation enabled
return;
createWorkerTask();
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
expectPreliminaryCalls();
TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, null, Collections.emptyList(), Collections.emptyList());
TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo));
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.singletonMap(TOPIC, topicDesc));
expectSendRecordTaskCommitRecordSucceed(false, false);
expectSendRecordTaskCommitRecordSucceed(false, false);
PowerMock.replayAll();
Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
Whitebox.invokeMethod(workerTask, "sendRecords");
}
@Test
public void testSendRecordsTopicDescribeRetries() throws Exception {
if (!enableTopicCreation)
// should only test with topic creation enabled
return;
createWorkerTask();
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
expectPreliminaryCalls();
// First round - call to describe the topic times out
EasyMock.expect(admin.describeTopics(TOPIC))
.andThrow(new RetriableException(new TimeoutException("timeout")));
// Second round - calls to describe and create succeed
expectTopicCreation(TOPIC);
// Exactly two records are sent
expectSendRecordTaskCommitRecordSucceed(false, false);
expectSendRecordTaskCommitRecordSucceed(false, false);
PowerMock.replayAll();
Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
Whitebox.invokeMethod(workerTask, "sendRecords");
assertEquals(true, Whitebox.getInternalState(workerTask, "lastSendFailed"));
assertEquals(Arrays.asList(record1, record2), Whitebox.getInternalState(workerTask, "toSend"));
// Next they all succeed
Whitebox.invokeMethod(workerTask, "sendRecords");
assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed"));
assertNull(Whitebox.getInternalState(workerTask, "toSend"));
}
@Test
public void testSendRecordsTopicCreateRetries() throws Exception {
if (!enableTopicCreation)
// should only test with topic creation enabled
return;
createWorkerTask();
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
// First call to describe the topic times out
expectPreliminaryCalls();
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture)))
.andThrow(new RetriableException(new TimeoutException("timeout")));
// Second round
expectTopicCreation(TOPIC);
expectSendRecordTaskCommitRecordSucceed(false, false);
expectSendRecordTaskCommitRecordSucceed(false, false);
PowerMock.replayAll();
Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
Whitebox.invokeMethod(workerTask, "sendRecords");
assertEquals(true, Whitebox.getInternalState(workerTask, "lastSendFailed"));
assertEquals(Arrays.asList(record1, record2), Whitebox.getInternalState(workerTask, "toSend"));
// Next they all succeed
Whitebox.invokeMethod(workerTask, "sendRecords");
assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed"));
assertNull(Whitebox.getInternalState(workerTask, "toSend"));
}
@Test
public void testSendRecordsTopicDescribeRetriesMidway() throws Exception {
if (!enableTopicCreation)
// should only test with topic creation enabled
return;
createWorkerTask();
// Differentiate only by Kafka partition so we can reuse conversion expectations
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, OTHER_TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
// First round
expectPreliminaryCalls(OTHER_TOPIC);
expectTopicCreation(TOPIC);
expectSendRecordTaskCommitRecordSucceed(false, false);
expectSendRecordTaskCommitRecordSucceed(false, false);
// First call to describe the topic times out
EasyMock.expect(admin.describeTopics(OTHER_TOPIC))
.andThrow(new RetriableException(new TimeoutException("timeout")));
// Second round
expectTopicCreation(OTHER_TOPIC);
expectSendRecord(OTHER_TOPIC, false, true, true, true, true, emptyHeaders());
PowerMock.replayAll();
// Try to send 3, make first pass, second fail. Should save last two
Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2, record3));
Whitebox.invokeMethod(workerTask, "sendRecords");
assertEquals(true, Whitebox.getInternalState(workerTask, "lastSendFailed"));
assertEquals(Arrays.asList(record3), Whitebox.getInternalState(workerTask, "toSend"));
// Next they all succeed
Whitebox.invokeMethod(workerTask, "sendRecords");
assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed"));
assertNull(Whitebox.getInternalState(workerTask, "toSend"));
PowerMock.verifyAll();
}
@Test
public void testSendRecordsTopicCreateRetriesMidway() throws Exception {
if (!enableTopicCreation)
// should only test with topic creation enabled
return;
createWorkerTask();
// Differentiate only by Kafka partition so we can reuse conversion expectations
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, OTHER_TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
// First round
expectPreliminaryCalls(OTHER_TOPIC);
expectTopicCreation(TOPIC);
expectSendRecordTaskCommitRecordSucceed(false, false);
expectSendRecordTaskCommitRecordSucceed(false, false);
EasyMock.expect(admin.describeTopics(OTHER_TOPIC)).andReturn(Collections.emptyMap());
// First call to create the topic times out
Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture)))
.andThrow(new RetriableException(new TimeoutException("timeout")));
// Second round
expectTopicCreation(OTHER_TOPIC);
expectSendRecord(OTHER_TOPIC, false, true, true, true, true, emptyHeaders());
PowerMock.replayAll();
// Try to send 3, make first pass, second fail. Should save last two
Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2, record3));
Whitebox.invokeMethod(workerTask, "sendRecords");
assertEquals(true, Whitebox.getInternalState(workerTask, "lastSendFailed"));
assertEquals(Arrays.asList(record3), Whitebox.getInternalState(workerTask, "toSend"));
// Next they all succeed
Whitebox.invokeMethod(workerTask, "sendRecords");
assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed"));
assertNull(Whitebox.getInternalState(workerTask, "toSend"));
PowerMock.verifyAll();
}
@Test
public void testTopicDescribeFails() {
if (!enableTopicCreation)
// should only test with topic creation enabled
return;
createWorkerTask();
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
expectPreliminaryCalls();
EasyMock.expect(admin.describeTopics(TOPIC))
.andThrow(new ConnectException(new TopicAuthorizationException("unauthorized")));
PowerMock.replayAll();
Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
assertThrows(ConnectException.class, () -> Whitebox.invokeMethod(workerTask, "sendRecords"));
}
@Test
public void testTopicCreateFails() throws Exception {
if (!enableTopicCreation)
// should only test with topic creation enabled
return;
createWorkerTask();
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
expectPreliminaryCalls();
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture)))
.andThrow(new ConnectException(new TopicAuthorizationException("unauthorized")));
PowerMock.replayAll();
Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
assertThrows(ConnectException.class, () -> Whitebox.invokeMethod(workerTask, "sendRecords"));
assertTrue(newTopicCapture.hasCaptured());
}
@Test
public void testTopicCreateFailsWithExceptionWhenCreateReturnsTopicNotCreatedOrFound() throws Exception {
if (!enableTopicCreation)
// should only test with topic creation enabled
return;
createWorkerTask();
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
expectPreliminaryCalls();
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(TopicAdmin.EMPTY_CREATION);
PowerMock.replayAll();
Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
assertThrows(ConnectException.class, () -> Whitebox.invokeMethod(workerTask, "sendRecords"));
assertTrue(newTopicCapture.hasCaptured());
}
@Test
public void testTopicCreateSucceedsWhenCreateReturnsExistingTopicFound() throws Exception {
if (!enableTopicCreation)
// should only test with topic creation enabled
return;
createWorkerTask();
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
expectPreliminaryCalls();
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(foundTopic(TOPIC));
expectSendRecordTaskCommitRecordSucceed(false, false);
expectSendRecordTaskCommitRecordSucceed(false, false);
PowerMock.replayAll();
Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
Whitebox.invokeMethod(workerTask, "sendRecords");
}
@Test
public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() throws Exception {
if (!enableTopicCreation)
// should only test with topic creation enabled
return;
createWorkerTask();
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
expectPreliminaryCalls();
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(TOPIC));
expectSendRecordTaskCommitRecordSucceed(false, false);
expectSendRecordTaskCommitRecordSucceed(false, false);
PowerMock.replayAll();
Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
Whitebox.invokeMethod(workerTask, "sendRecords");
}
private TopicAdmin.TopicCreationResponse createdTopic(String topic) {
Set<String> created = Collections.singleton(topic);
Set<String> existing = Collections.emptySet();
return new TopicAdmin.TopicCreationResponse(created, existing);
}
private TopicAdmin.TopicCreationResponse foundTopic(String topic) {
Set<String> created = Collections.emptySet();
Set<String> existing = Collections.singleton(topic);
return new TopicAdmin.TopicCreationResponse(created, existing);
}
private void expectPreliminaryCalls() {
expectPreliminaryCalls(TOPIC);
}
private void expectPreliminaryCalls(String topic) {
expectConvertHeadersAndKeyValue(topic, true, emptyHeaders());
expectApplyTransformationChain(false);
offsetWriter.offset(PARTITION, OFFSET);
PowerMock.expectLastCall();
}
private CountDownLatch expectEmptyPolls(int minimum, final AtomicInteger count) throws InterruptedException { private CountDownLatch expectEmptyPolls(int minimum, final AtomicInteger count) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(minimum); final CountDownLatch latch = new CountDownLatch(minimum);
// Note that we stub these to allow any number of calls because the thread will continue to // Note that we stub these to allow any number of calls because the thread will continue to
@ -889,9 +1278,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
PowerMock.expectLastCall(); PowerMock.expectLastCall();
EasyMock.expect( EasyMock.expect(
producer.send(EasyMock.anyObject(ProducerRecord.class), producer.send(EasyMock.anyObject(ProducerRecord.class),
EasyMock.anyObject(org.apache.kafka.clients.producer.Callback.class))) EasyMock.anyObject(org.apache.kafka.clients.producer.Callback.class)))
.andThrow(error); .andThrow(error);
} }
private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() throws InterruptedException { private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() throws InterruptedException {
@ -903,22 +1292,23 @@ public class WorkerSourceTaskTest extends ThreadedTest {
} }
private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordProducerCallbackFail() throws InterruptedException { private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordProducerCallbackFail() throws InterruptedException {
return expectSendRecord(false, false, false, false, true, emptyHeaders()); return expectSendRecord(TOPIC, false, false, false, false, true, emptyHeaders());
} }
private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordSucceed(boolean anyTimes, boolean isRetry) throws InterruptedException { private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordSucceed(boolean anyTimes, boolean isRetry) throws InterruptedException {
return expectSendRecord(anyTimes, isRetry, true, true, true, emptyHeaders()); return expectSendRecord(TOPIC, anyTimes, isRetry, true, true, true, emptyHeaders());
} }
private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordFail(boolean anyTimes, boolean isRetry) throws InterruptedException { private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordFail(boolean anyTimes, boolean isRetry) throws InterruptedException {
return expectSendRecord(anyTimes, isRetry, true, false, true, emptyHeaders()); return expectSendRecord(TOPIC, anyTimes, isRetry, true, false, true, emptyHeaders());
} }
private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(boolean anyTimes, boolean isRetry, boolean succeed) throws InterruptedException { private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(boolean anyTimes, boolean isRetry, boolean succeed) throws InterruptedException {
return expectSendRecord(anyTimes, isRetry, succeed, true, true, emptyHeaders()); return expectSendRecord(TOPIC, anyTimes, isRetry, succeed, true, true, emptyHeaders());
} }
private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord( private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(
String topic,
boolean anyTimes, boolean anyTimes,
boolean isRetry, boolean isRetry,
boolean sendSuccess, boolean sendSuccess,
@ -927,7 +1317,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
Headers headers Headers headers
) throws InterruptedException { ) throws InterruptedException {
if (isMockedConverters) { if (isMockedConverters) {
expectConvertHeadersAndKeyValue(anyTimes, headers); expectConvertHeadersAndKeyValue(topic, anyTimes, headers);
} }
expectApplyTransformationChain(anyTimes); expectApplyTransformationChain(anyTimes);
@ -976,23 +1366,23 @@ public class WorkerSourceTaskTest extends ThreadedTest {
} }
private void expectConvertHeadersAndKeyValue(boolean anyTimes) { private void expectConvertHeadersAndKeyValue(boolean anyTimes) {
expectConvertHeadersAndKeyValue(anyTimes, emptyHeaders()); expectConvertHeadersAndKeyValue(TOPIC, anyTimes, emptyHeaders());
} }
private void expectConvertHeadersAndKeyValue(boolean anyTimes, Headers headers) { private void expectConvertHeadersAndKeyValue(String topic, boolean anyTimes, Headers headers) {
for (Header header : headers) { for (Header header : headers) {
IExpectationSetters<byte[]> convertHeaderExpect = EasyMock.expect(headerConverter.fromConnectHeader(TOPIC, header.key(), Schema.STRING_SCHEMA, new String(header.value()))); IExpectationSetters<byte[]> convertHeaderExpect = EasyMock.expect(headerConverter.fromConnectHeader(topic, header.key(), Schema.STRING_SCHEMA, new String(header.value())));
if (anyTimes) if (anyTimes)
convertHeaderExpect.andStubReturn(header.value()); convertHeaderExpect.andStubReturn(header.value());
else else
convertHeaderExpect.andReturn(header.value()); convertHeaderExpect.andReturn(header.value());
} }
IExpectationSetters<byte[]> convertKeyExpect = EasyMock.expect(keyConverter.fromConnectData(TOPIC, headers, KEY_SCHEMA, KEY)); IExpectationSetters<byte[]> convertKeyExpect = EasyMock.expect(keyConverter.fromConnectData(topic, headers, KEY_SCHEMA, KEY));
if (anyTimes) if (anyTimes)
convertKeyExpect.andStubReturn(SERIALIZED_KEY); convertKeyExpect.andStubReturn(SERIALIZED_KEY);
else else
convertKeyExpect.andReturn(SERIALIZED_KEY); convertKeyExpect.andReturn(SERIALIZED_KEY);
IExpectationSetters<byte[]> convertValueExpect = EasyMock.expect(valueConverter.fromConnectData(TOPIC, headers, RECORD_SCHEMA, RECORD)); IExpectationSetters<byte[]> convertValueExpect = EasyMock.expect(valueConverter.fromConnectData(topic, headers, RECORD_SCHEMA, RECORD));
if (anyTimes) if (anyTimes)
convertValueExpect.andStubReturn(SERIALIZED_RECORD); convertValueExpect.andStubReturn(SERIALIZED_RECORD);
else else
@ -1128,9 +1518,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
private void expectTopicCreation(String topic) { private void expectTopicCreation(String topic) {
if (config.topicCreationEnable()) { if (config.topicCreationEnable()) {
EasyMock.expect(admin.describeTopics(topic)).andReturn(Collections.emptyMap()); EasyMock.expect(admin.describeTopics(topic)).andReturn(Collections.emptyMap());
Capture<NewTopic> newTopicCapture = EasyMock.newCapture(); Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture))).andReturn(true); EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(topic));
} }
} }
} }

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.kafka.connect.runtime; package org.apache.kafka.connect.runtime;
import java.util.Collection;
import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
@ -64,6 +65,7 @@ import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.FutureCallback; import org.apache.kafka.connect.util.FutureCallback;
import org.apache.kafka.connect.util.ParameterizedTest;
import org.apache.kafka.connect.util.ThreadedTest; import org.apache.kafka.connect.util.ThreadedTest;
import org.apache.kafka.connect.util.TopicAdmin; import org.apache.kafka.connect.util.TopicAdmin;
import org.apache.kafka.connect.util.TopicCreationGroup; import org.apache.kafka.connect.util.TopicCreationGroup;
@ -97,7 +99,11 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG; import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG;
import static org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest.NOOP_OPERATOR; import static org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest.NOOP_OPERATOR;
import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.anyObject;
@ -113,6 +119,7 @@ import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@RunWith(PowerMockRunner.class) @RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(ParameterizedTest.class)
@PrepareForTest({Worker.class, Plugins.class, ConnectUtils.class}) @PrepareForTest({Worker.class, Plugins.class, ConnectUtils.class})
@PowerMockIgnore("javax.management.*") @PowerMockIgnore("javax.management.*")
public class WorkerTest extends ThreadedTest { public class WorkerTest extends ThreadedTest {
@ -161,8 +168,16 @@ public class WorkerTest extends ThreadedTest {
private String mockFileProviderTestId; private String mockFileProviderTestId;
private Map<String, String> connectorProps; private Map<String, String> connectorProps;
// when this test becomes parameterized, this variable will be a test parameter private boolean enableTopicCreation;
public boolean enableTopicCreation = false;
@ParameterizedTest.Parameters
public static Collection<Boolean> parameters() {
return Arrays.asList(false, true);
}
public WorkerTest(boolean enableTopicCreation) {
this.enableTopicCreation = enableTopicCreation;
}
@Before @Before
public void setup() { public void setup() {
@ -1453,6 +1468,8 @@ public class WorkerTest extends ThreadedTest {
props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName()); props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName());
props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1"); props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
props.put(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG, String.valueOf(1));
props.put(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG, String.valueOf(1));
return props; return props;
} }

View File

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.util;
import java.lang.annotation.Annotation;
import org.junit.runner.Description;
import org.junit.runner.manipulation.Filter;
import org.junit.runner.manipulation.NoTestsRemainException;
import org.junit.runners.Parameterized;
/**
* Running a single parameterized test causes issue as explained in
* http://youtrack.jetbrains.com/issue/IDEA-65966 and
* https://stackoverflow.com/questions/12798079/initializationerror-with-eclipse-and-junit4-when-executing-a-single-test/18438718#18438718
*
* As a workaround, the original filter needs to be wrapped and then pass it a deparameterized
* description which removes the parameter part (See deparametrizeName)
*/
public class ParameterizedTest extends Parameterized {
public ParameterizedTest(Class<?> klass) throws Throwable {
super(klass);
}
@Override
public void filter(Filter filter) throws NoTestsRemainException {
super.filter(new FilterDecorator(filter));
}
private static String deparametrizeName(String name) {
//Each parameter is named as [0], [1] etc
if (name.startsWith("[")) {
return name;
}
//Convert methodName[index](className) to methodName(className)
int indexOfOpenBracket = name.indexOf('[');
int indexOfCloseBracket = name.indexOf(']') + 1;
return name.substring(0, indexOfOpenBracket).concat(name.substring(indexOfCloseBracket));
}
private static Description wrap(Description description) {
String fixedName = deparametrizeName(description.getDisplayName());
Description clonedDescription = Description.createSuiteDescription(
fixedName,
description.getAnnotations().toArray(new Annotation[0])
);
description.getChildren().forEach(child -> clonedDescription.addChild(wrap(child)));
return clonedDescription;
}
private static class FilterDecorator extends Filter {
private final Filter delegate;
private FilterDecorator(Filter delegate) {
this.delegate = delegate;
}
@Override
public boolean shouldRun(Description description) {
return delegate.shouldRun(wrap(description));
}
@Override
public String describe() {
return delegate.describe();
}
}
}