KAFKA-14132: Replace EasyMock and PowerMock with Mockito in connect/runtime/ErrorHandlingTaskTest (#12735)

Reviewers: Divij Vaidya <divijvaidya13@gmail.com>, Chris Egerton <chrise@aiven.io>
Author: Shekhar Rajak, 2022-12-21 22:11:03 +05:30 (committed by GitHub)
parent aad5b0a463
commit 2dcf306ef8
2 changed files with 125 additions and 190 deletions
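The change applies the usual EasyMock/PowerMock-to-Mockito translation: recorded expectations followed by replayAll()/verifyAll() become up-front stubbing with when()/doThrow() and targeted verify() calls after the code under test has run. A minimal sketch of that pattern (illustrative only, not part of the diff), using the ErrorReporter interaction that appears in the hunks below:

    // Before: EasyMock/PowerMock record-replay-verify
    ErrorReporter reporter = EasyMock.mock(ErrorReporter.class);
    reporter.close();
    EasyMock.expectLastCall();
    PowerMock.replayAll();
    // ... exercise the task lifecycle ...
    PowerMock.verifyAll();

    // After: Mockito stubs up front and verifies interactions afterwards; no replay phase
    ErrorReporter reporter = mock(ErrorReporter.class);
    // ... exercise the task lifecycle ...
    verify(reporter).close();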

build.gradle

@@ -412,7 +412,7 @@ subprojects {
// connect tests
"**/ConnectorPluginsResourceTest.*",
"**/DistributedHerderTest.*", "**/FileOffsetBakingStoreTest.*",
"**/ErrorHandlingTaskTest.*", "**/KafkaConfigBackingStoreTest.*",
"**/KafkaConfigBackingStoreTest.*",
"**/KafkaBasedLogTest.*", "**/OffsetStorageWriterTest.*", "**/StandaloneHerderTest.*",
"**/SourceTaskOffsetCommitterTest.*",
"**/WorkerSinkTaskTest.*", "**/WorkerSinkTaskThreadedTest.*",

ErrorHandlingTaskTest.java

@@ -58,33 +58,29 @@ import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.ParameterizedTest;
import org.apache.kafka.connect.util.TopicAdmin;
import org.apache.kafka.connect.util.TopicCreationGroup;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IExpectationSetters;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import org.junit.runners.Parameterized;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.quality.Strictness;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Arrays;
import java.util.Set;
import java.util.Collections;
import java.util.Collection;
import java.util.concurrent.Executor;
import static java.util.Collections.emptyMap;
@@ -103,18 +99,29 @@ import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_F
import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG;
import static org.junit.Assert.assertEquals;
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(ParameterizedTest.class)
@PrepareForTest({WorkerSinkTask.class, WorkerSourceTask.class})
@PowerMockIgnore("javax.management.*")
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.doReturn;
@RunWith(Parameterized.class)
public class ErrorHandlingTaskTest {
@Rule
public MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
private static final String TOPIC = "test";
private static final int PARTITION1 = 12;
private static final int PARTITION2 = 13;
private static final long FIRST_OFFSET = 45;
@Mock Plugins plugins;
@Mock
Plugins plugins;
private static final Map<String, String> TASK_PROPS = new HashMap<>();
@@ -139,7 +146,6 @@ public class ErrorHandlingTaskTest {
@SuppressWarnings("unused")
@Mock
private SourceTask sourceTask;
private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture();
private WorkerConfig workerConfig;
private SourceConnectorConfig sourceConfig;
@Mock
@@ -164,8 +170,6 @@ public class ErrorHandlingTaskTest {
OffsetStorageWriter offsetWriter;
@Mock
private ConnectorOffsetBackingStore offsetStore;
private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture();
@SuppressWarnings("unused")
@Mock
private TaskStatus.Listener statusListener;
@@ -179,7 +183,7 @@ public class ErrorHandlingTaskTest {
private boolean enableTopicCreation;
@ParameterizedTest.Parameters
@Parameterized.Parameters
public static Collection<Boolean> parameters() {
return Arrays.asList(false, true);
}
@@ -197,7 +201,6 @@ public class ErrorHandlingTaskTest {
workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
workerProps.put(TOPIC_CREATION_ENABLE_CONFIG, String.valueOf(enableTopicCreation));
pluginLoader = PowerMock.createMock(PluginClassLoader.class);
workerConfig = new StandaloneConfig(workerProps);
sourceConfig = new SourceConnectorConfig(plugins, sourceConnectorProps(TOPIC), true);
errorHandlingMetrics = new ErrorHandlingMetrics(taskId, metrics);
@@ -228,81 +231,58 @@ public class ErrorHandlingTaskTest {
@Test
public void testSinkTasksCloseErrorReporters() throws Exception {
ErrorReporter reporter = EasyMock.mock(ErrorReporter.class);
ErrorReporter reporter = mock(ErrorReporter.class);
RetryWithToleranceOperator retryWithToleranceOperator = operator();
retryWithToleranceOperator.reporters(singletonList(reporter));
createSinkTask(initialState, retryWithToleranceOperator);
expectInitializeTask();
reporter.close();
EasyMock.expectLastCall();
sinkTask.stop();
EasyMock.expectLastCall();
consumer.close();
EasyMock.expectLastCall();
headerConverter.close();
EasyMock.expectLastCall();
PowerMock.replayAll();
workerSinkTask.initialize(TASK_CONFIG);
workerSinkTask.initializeAndStart();
workerSinkTask.close();
PowerMock.verifyAll();
// verify each of these invocations happened exactly once
verifyInitializeSink();
verify(reporter).close();
verify(sinkTask).stop();
verify(consumer).close();
verify(headerConverter).close();
}
@Test
public void testSourceTasksCloseErrorReporters() {
ErrorReporter reporter = EasyMock.mock(ErrorReporter.class);
public void testSourceTasksCloseErrorReporters() throws IOException {
ErrorReporter reporter = mock(ErrorReporter.class);
RetryWithToleranceOperator retryWithToleranceOperator = operator();
retryWithToleranceOperator.reporters(singletonList(reporter));
createSourceTask(initialState, retryWithToleranceOperator);
expectClose();
reporter.close();
EasyMock.expectLastCall();
PowerMock.replayAll();
workerSourceTask.initialize(TASK_CONFIG);
workerSourceTask.close();
PowerMock.verifyAll();
verifyCloseSource();
verify(reporter).close();
}
@Test
public void testCloseErrorReportersExceptionPropagation() {
ErrorReporter reporterA = EasyMock.mock(ErrorReporter.class);
ErrorReporter reporterB = EasyMock.mock(ErrorReporter.class);
public void testCloseErrorReportersExceptionPropagation() throws IOException {
ErrorReporter reporterA = mock(ErrorReporter.class);
ErrorReporter reporterB = mock(ErrorReporter.class);
RetryWithToleranceOperator retryWithToleranceOperator = operator();
retryWithToleranceOperator.reporters(Arrays.asList(reporterA, reporterB));
createSourceTask(initialState, retryWithToleranceOperator);
expectClose();
// Even though the reporters throw exceptions, they should both still be closed.
reporterA.close();
EasyMock.expectLastCall().andThrow(new RuntimeException());
reporterB.close();
EasyMock.expectLastCall().andThrow(new RuntimeException());
PowerMock.replayAll();
doThrow(new RuntimeException()).when(reporterA).close();
doThrow(new RuntimeException()).when(reporterB).close();
workerSourceTask.initialize(TASK_CONFIG);
workerSourceTask.close();
PowerMock.verifyAll();
verify(reporterA).close();
verify(reporterB).close();
verifyCloseSource();
}
@Test
@@ -316,21 +296,19 @@ public class ErrorHandlingTaskTest {
retryWithToleranceOperator.reporters(singletonList(reporter));
createSinkTask(initialState, retryWithToleranceOperator);
expectInitializeTask();
expectTaskGetTopic(true);
// valid json
ConsumerRecord<byte[], byte[]> record1 = new ConsumerRecord<>(TOPIC, PARTITION1, FIRST_OFFSET, null, "{\"a\": 10}".getBytes());
ConsumerRecord<byte[], byte[]> record1 = new ConsumerRecord<>(
TOPIC, PARTITION1, FIRST_OFFSET,
null, "{\"a\": 10}".getBytes());
// bad json
ConsumerRecord<byte[], byte[]> record2 = new ConsumerRecord<>(TOPIC, PARTITION2, FIRST_OFFSET, null, "{\"a\" 10}".getBytes());
ConsumerRecord<byte[], byte[]> record2 = new ConsumerRecord<>(
TOPIC, PARTITION2, FIRST_OFFSET,
null, "{\"a\" 10}".getBytes());
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andReturn(records(record1));
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andReturn(records(record2));
sinkTask.put(EasyMock.anyObject());
EasyMock.expectLastCall().times(2);
PowerMock.replayAll();
when(consumer.poll(any()))
.thenReturn(records(record1))
.thenReturn(records(record2));
workerSinkTask.initialize(TASK_CONFIG);
workerSinkTask.initializeAndStart();
@@ -338,6 +316,9 @@ public class ErrorHandlingTaskTest {
workerSinkTask.iteration();
verifyInitializeSink();
verify(sinkTask, times(2)).put(any());
// two records were consumed from Kafka
assertSinkMetricValue("sink-record-read-total", 2.0);
// only one was written to the task
@@ -348,12 +329,12 @@ public class ErrorHandlingTaskTest {
assertErrorHandlingMetricValue("total-record-failures", 3.0);
// one record completely failed (converter issues), and thus was skipped
assertErrorHandlingMetricValue("total-records-skipped", 1.0);
PowerMock.verifyAll();
}
private RetryWithToleranceOperator operator() {
return new RetryWithToleranceOperator(OPERATOR_RETRY_TIMEOUT_MILLIS, OPERATOR_RETRY_MAX_DELAY_MILLIS, OPERATOR_TOLERANCE_TYPE, SYSTEM, errorHandlingMetrics);
return new RetryWithToleranceOperator(OPERATOR_RETRY_TIMEOUT_MILLIS,
OPERATOR_RETRY_MAX_DELAY_MILLIS, OPERATOR_TOLERANCE_TYPE,
SYSTEM, errorHandlingMetrics);
}
@Test
@@ -374,30 +355,29 @@ public class ErrorHandlingTaskTest {
Struct struct2 = new Struct(valSchema).put("val", 6789);
SourceRecord record2 = new SourceRecord(emptyMap(), emptyMap(), TOPIC, PARTITION1, valSchema, struct2);
EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
EasyMock.expect(workerSourceTask.isStopping()).andReturn(true);
when(workerSourceTask.isStopping())
.thenReturn(false)
.thenReturn(false)
.thenReturn(true);
EasyMock.expect(workerSourceTask.commitOffsets()).andReturn(true);
doReturn(true).when(workerSourceTask).commitOffsets();
offsetStore.start();
EasyMock.expectLastCall();
sourceTask.initialize(EasyMock.anyObject());
EasyMock.expectLastCall();
sourceTask.start(EasyMock.anyObject());
EasyMock.expectLastCall();
when(sourceTask.poll())
.thenReturn(singletonList(record1))
.thenReturn(singletonList(record2));
EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record1));
EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record2));
expectTopicCreation(TOPIC);
EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(null).times(2);
PowerMock.replayAll();
workerSourceTask.initialize(TASK_CONFIG);
workerSourceTask.initializeAndStart();
workerSourceTask.execute();
verify(workerSourceTask, times(3)).isStopping();
verify(workerSourceTask).commitOffsets();
verify(offsetStore).start();
verify(sourceTask).initialize(any());
verify(sourceTask).start(any());
verify(sourceTask, times(2)).poll();
verify(producer, times(2)).send(any(), any());
// two records were consumed from Kafka
assertSourceMetricValue("source-record-poll-total", 2.0);
// only one was written to the task
@@ -408,8 +388,6 @@ public class ErrorHandlingTaskTest {
assertErrorHandlingMetricValue("total-record-failures", 4.0);
// one record completely failed (converter issues), and thus was skipped
assertErrorHandlingMetricValue("total-records-skipped", 0.0);
PowerMock.verifyAll();
}
private ConnectorConfig connConfig(Map<String, String> connProps) {
@@ -438,30 +416,20 @@ public class ErrorHandlingTaskTest {
Struct struct2 = new Struct(valSchema).put("val", 6789);
SourceRecord record2 = new SourceRecord(emptyMap(), emptyMap(), TOPIC, PARTITION1, valSchema, struct2);
EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
EasyMock.expect(workerSourceTask.isStopping()).andReturn(true);
when(workerSourceTask.isStopping())
.thenReturn(false)
.thenReturn(false)
.thenReturn(true);
EasyMock.expect(workerSourceTask.commitOffsets()).andReturn(true);
doReturn(true).when(workerSourceTask).commitOffsets();
offsetStore.start();
EasyMock.expectLastCall();
sourceTask.initialize(EasyMock.anyObject());
EasyMock.expectLastCall();
sourceTask.start(EasyMock.anyObject());
EasyMock.expectLastCall();
EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record1));
EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record2));
when(sourceTask.poll())
.thenReturn(singletonList(record1))
.thenReturn(singletonList(record2));
expectTopicCreation(TOPIC);
EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(null).times(2);
PowerMock.replayAll();
workerSourceTask.initialize(TASK_CONFIG);
workerSourceTask.initializeAndStart();
workerSourceTask.execute();
// two records were consumed from Kafka
assertSourceMetricValue("source-record-poll-total", 2.0);
// only one was written to the task
@@ -473,7 +441,13 @@ public class ErrorHandlingTaskTest {
// one record completely failed (converter issues), and thus was skipped
assertErrorHandlingMetricValue("total-records-skipped", 0.0);
PowerMock.verifyAll();
verify(workerSourceTask, times(3)).isStopping();
verify(workerSourceTask).commitOffsets();
verify(offsetStore).start();
verify(sourceTask).initialize(any());
verify(sourceTask).start(any());
verify(sourceTask, times(2)).poll();
verify(producer, times(2)).send(any(), any());
}
private void assertSinkMetricValue(String name, double expected) {
@@ -482,6 +456,13 @@ public class ErrorHandlingTaskTest {
assertEquals(expected, measured, 0.001d);
}
private void verifyInitializeSink() {
verify(sinkTask).start(TASK_PROPS);
verify(sinkTask).initialize(any(WorkerSinkTaskContext.class));
verify(consumer).subscribe(eq(singletonList(TOPIC)),
any(ConsumerRebalanceListener.class));
}
private void assertSourceMetricValue(String name, double expected) {
ConnectMetrics.MetricGroup sinkTaskGroup = workerSourceTask.sourceTaskMetricsGroup().metricGroup();
double measured = metrics.currentMetricValueAsDouble(sinkTaskGroup, name);
@@ -494,73 +475,22 @@ public class ErrorHandlingTaskTest {
assertEquals(expected, measured, 0.001d);
}
private void expectInitializeTask() {
consumer.subscribe(EasyMock.eq(singletonList(TOPIC)), EasyMock.capture(rebalanceListener));
PowerMock.expectLastCall();
sinkTask.initialize(EasyMock.capture(sinkTaskContext));
PowerMock.expectLastCall();
sinkTask.start(TASK_PROPS);
PowerMock.expectLastCall();
}
private void expectTaskGetTopic(boolean anyTimes) {
final Capture<String> connectorCapture = EasyMock.newCapture();
final Capture<String> topicCapture = EasyMock.newCapture();
IExpectationSetters<TopicStatus> expect = EasyMock.expect(statusBackingStore.getTopic(
EasyMock.capture(connectorCapture),
EasyMock.capture(topicCapture)));
if (anyTimes) {
expect.andStubAnswer(() -> new TopicStatus(
topicCapture.getValue(),
new ConnectorTaskId(connectorCapture.getValue(), 0),
Time.SYSTEM.milliseconds()));
} else {
expect.andAnswer(() -> new TopicStatus(
topicCapture.getValue(),
new ConnectorTaskId(connectorCapture.getValue(), 0),
Time.SYSTEM.milliseconds()));
}
if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) {
assertEquals("job", connectorCapture.getValue());
assertEquals(TOPIC, topicCapture.getValue());
}
}
private void expectClose() {
producer.close(EasyMock.anyObject(Duration.class));
EasyMock.expectLastCall();
admin.close(EasyMock.anyObject(Duration.class));
EasyMock.expectLastCall();
offsetReader.close();
EasyMock.expectLastCall();
offsetStore.stop();
EasyMock.expectLastCall();
try {
headerConverter.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
EasyMock.expectLastCall();
private void verifyCloseSource() throws IOException {
verify(producer).close(any(Duration.class));
verify(admin).close(any(Duration.class));
verify(offsetReader).close();
verify(offsetStore).stop();
// headerConverter.close() can throw IOException
verify(headerConverter).close();
}
private void expectTopicCreation(String topic) {
if (workerConfig.topicCreationEnable()) {
EasyMock.expect(admin.describeTopics(topic)).andReturn(Collections.emptyMap());
Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
if (enableTopicCreation) {
Set<String> created = Collections.singleton(topic);
Set<String> existing = Collections.emptySet();
TopicAdmin.TopicCreationResponse response = new TopicAdmin.TopicCreationResponse(created, existing);
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(response);
} else {
EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture))).andReturn(true);
}
if (enableTopicCreation) {
when(admin.describeTopics(topic)).thenReturn(Collections.emptyMap());
Set<String> created = Collections.singleton(topic);
Set<String> existing = Collections.emptySet();
TopicAdmin.TopicCreationResponse response = new TopicAdmin.TopicCreationResponse(created, existing);
when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(response);
}
}
@@ -571,13 +501,15 @@ public class ErrorHandlingTaskTest {
oo.put("schemas.enable", "false");
converter.configure(oo);
TransformationChain<SinkRecord> sinkTransforms = new TransformationChain<>(singletonList(new FaultyPassthrough<SinkRecord>()), retryWithToleranceOperator);
TransformationChain<SinkRecord> sinkTransforms =
new TransformationChain<>(singletonList(new FaultyPassthrough<SinkRecord>()), retryWithToleranceOperator);
workerSinkTask = new WorkerSinkTask(
taskId, sinkTask, statusListener, initialState, workerConfig,
ClusterConfigState.EMPTY, metrics, converter, converter, errorHandlingMetrics,
headerConverter, sinkTransforms, consumer, pluginLoader, time,
retryWithToleranceOperator, workerErrantRecordReporter, statusBackingStore);
retryWithToleranceOperator, workerErrantRecordReporter,
statusBackingStore);
}
private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator) {
@@ -602,13 +534,16 @@ public class ErrorHandlingTaskTest {
private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator, Converter converter) {
TransformationChain<SourceRecord> sourceTransforms = new TransformationChain<>(singletonList(new FaultyPassthrough<SourceRecord>()), retryWithToleranceOperator);
workerSourceTask = PowerMock.createPartialMock(
WorkerSourceTask.class, new String[]{"commitOffsets", "isStopping"},
taskId, sourceTask, statusListener, initialState, converter, converter, errorHandlingMetrics, headerConverter, sourceTransforms,
producer, admin, TopicCreationGroup.configuredGroups(sourceConfig),
offsetReader, offsetWriter, offsetStore, workerConfig,
ClusterConfigState.EMPTY, metrics, pluginLoader, time, retryWithToleranceOperator,
statusBackingStore, (Executor) Runnable::run);
workerSourceTask = spy(new WorkerSourceTask(
taskId, sourceTask, statusListener, initialState, converter,
converter, errorHandlingMetrics, headerConverter,
sourceTransforms, producer, admin,
TopicCreationGroup.configuredGroups(sourceConfig),
offsetReader, offsetWriter, offsetStore, workerConfig,
ClusterConfigState.EMPTY, metrics, pluginLoader, time,
retryWithToleranceOperator,
statusBackingStore, (Executor) Runnable::run));
}
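The one structural change in the setup is here: instead of a PowerMock partial mock of WorkerSourceTask with only commitOffsets and isStopping faked, the test now wraps a fully constructed WorkerSourceTask in Mockito's spy() and stubs just those two methods in the individual tests, as the source-task tests above do. The stubbing style on the spy, restated for reference:

    // doReturn(...).when(...) stubs without invoking the real commitOffsets() on the spy
    doReturn(true).when(workerSourceTask).commitOffsets();
    // consecutive return values: report "running" for two iterations, then "stopping"
    when(workerSourceTask.isStopping()).thenReturn(false).thenReturn(false).thenReturn(true);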
private ConsumerRecords<byte[], byte[]> records(ConsumerRecord<byte[], byte[]> record) {