KAFKA-14134: Replace EasyMock with Mockito for WorkerConnectorTest (#12472)

Reviewers: Divij Vaidya <diviv@amazon.com>, Chris Egerton <fearthecellos@gmail.com>
This commit is contained in:
Yash Mayya 2022-08-09 19:40:47 +05:30 committed by GitHub
parent 36bfc3a254
commit 465d8fa94f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 168 additions and 370 deletions

View File

@ -26,28 +26,36 @@ import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceConnectorContext; import org.apache.kafka.connect.source.SourceConnectorContext;
import org.apache.kafka.connect.storage.CloseableOffsetStorageReader; import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore; import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
import org.easymock.Capture;
import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.Callback;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.EasyMockSupport;
import org.easymock.Mock;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import static org.easymock.EasyMock.expectLastCall;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@RunWith(EasyMockRunner.class) @RunWith(MockitoJUnitRunner.StrictStubs.class)
public class WorkerConnectorTest extends EasyMockSupport { public class WorkerConnectorTest {
private static final String VERSION = "1.1"; private static final String VERSION = "1.1";
public static final String CONNECTOR = "connector"; public static final String CONNECTOR = "connector";
@ -60,15 +68,15 @@ public class WorkerConnectorTest extends EasyMockSupport {
public ConnectorConfig connectorConfig; public ConnectorConfig connectorConfig;
public MockConnectMetrics metrics; public MockConnectMetrics metrics;
@Mock Plugins plugins; @Mock private Plugins plugins;
@Mock SourceConnector sourceConnector; @Mock private SourceConnector sourceConnector;
@Mock SinkConnector sinkConnector; @Mock private SinkConnector sinkConnector;
@Mock Connector connector; @Mock private CloseableConnectorContext ctx;
@Mock CloseableConnectorContext ctx; @Mock private ConnectorStatus.Listener listener;
@Mock ConnectorStatus.Listener listener; @Mock private CloseableOffsetStorageReader offsetStorageReader;
@Mock CloseableOffsetStorageReader offsetStorageReader; @Mock private ConnectorOffsetBackingStore offsetStore;
@Mock ConnectorOffsetBackingStore offsetStore; @Mock private ClassLoader classLoader;
@Mock ClassLoader classLoader; private Connector connector;
@Before @Before
public void setup() { public void setup() {
@ -86,31 +94,8 @@ public class WorkerConnectorTest extends EasyMockSupport {
RuntimeException exception = new RuntimeException(); RuntimeException exception = new RuntimeException();
connector = sourceConnector; connector = sourceConnector;
connector.version(); when(connector.version()).thenReturn(VERSION);
expectLastCall().andReturn(VERSION); doThrow(exception).when(connector).initialize(any());
offsetStore.start();
expectLastCall();
connector.initialize(EasyMock.notNull(SourceConnectorContext.class));
expectLastCall().andThrow(exception);
listener.onFailure(CONNECTOR, exception);
expectLastCall();
listener.onShutdown(CONNECTOR);
expectLastCall();
ctx.close();
expectLastCall();
offsetStorageReader.close();
expectLastCall();
offsetStore.stop();
expectLastCall();
replayAll();
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
@ -120,7 +105,9 @@ public class WorkerConnectorTest extends EasyMockSupport {
workerConnector.doShutdown(); workerConnector.doShutdown();
assertStoppedMetric(workerConnector); assertStoppedMetric(workerConnector);
verifyAll(); verifyInitialize();
verify(listener).onFailure(CONNECTOR, exception);
verifyCleanShutdown(false);
} }
@Test @Test
@ -128,36 +115,11 @@ public class WorkerConnectorTest extends EasyMockSupport {
RuntimeException exception = new RuntimeException(); RuntimeException exception = new RuntimeException();
connector = sinkConnector; connector = sinkConnector;
connector.version(); when(connector.version()).thenReturn(VERSION);
expectLastCall().andReturn(VERSION); doThrow(exception).when(connector).initialize(any());
connector.initialize(EasyMock.notNull(SinkConnectorContext.class)); Callback<TargetState> onStateChange = mockCallback();
expectLastCall().andThrow(exception); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, null, null, classLoader);
listener.onFailure(CONNECTOR, exception);
expectLastCall();
// expect no call to onStartup() after failure
listener.onShutdown(CONNECTOR);
expectLastCall();
ctx.close();
expectLastCall();
offsetStorageReader.close();
expectLastCall();
offsetStore.stop();
expectLastCall();
Callback<TargetState> onStateChange = createStrictMock(Callback.class);
onStateChange.onCompletion(EasyMock.anyObject(Exception.class), EasyMock.isNull());
expectLastCall();
replayAll();
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize(); workerConnector.initialize();
assertFailedMetric(workerConnector); assertFailedMetric(workerConnector);
@ -167,48 +129,22 @@ public class WorkerConnectorTest extends EasyMockSupport {
workerConnector.doShutdown(); workerConnector.doShutdown();
assertStoppedMetric(workerConnector); assertStoppedMetric(workerConnector);
verifyAll(); verifyInitialize();
verify(listener).onFailure(CONNECTOR, exception);
// expect no call to onStartup() after failure
verifyCleanShutdown(false);
verify(onStateChange).onCompletion(any(Exception.class), isNull());
verifyNoMoreInteractions(onStateChange);
} }
@Test @Test
public void testStartupAndShutdown() { public void testStartupAndShutdown() {
connector = sourceConnector; connector = sourceConnector;
connector.version();
expectLastCall().andReturn(VERSION);
offsetStore.start(); when(connector.version()).thenReturn(VERSION);
expectLastCall();
connector.initialize(EasyMock.notNull(SourceConnectorContext.class));
expectLastCall();
connector.start(CONFIG);
expectLastCall();
listener.onStartup(CONNECTOR);
expectLastCall();
connector.stop();
expectLastCall();
listener.onShutdown(CONNECTOR);
expectLastCall();
ctx.close();
expectLastCall();
offsetStorageReader.close();
expectLastCall();
offsetStore.stop();
expectLastCall();
Callback<TargetState> onStateChange = createStrictMock(Callback.class);
onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED));
expectLastCall();
replayAll();
Callback<TargetState> onStateChange = mockCallback();
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize(); workerConnector.initialize();
@ -219,54 +155,26 @@ public class WorkerConnectorTest extends EasyMockSupport {
workerConnector.doShutdown(); workerConnector.doShutdown();
assertStoppedMetric(workerConnector); assertStoppedMetric(workerConnector);
verifyAll(); verifyInitialize();
verify(connector).start(CONFIG);
verify(listener).onStartup(CONNECTOR);
verifyCleanShutdown(true);
verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED));
verifyNoMoreInteractions(onStateChange);
} }
@Test @Test
public void testStartupAndPause() { public void testStartupAndPause() {
connector = sinkConnector; connector = sinkConnector;
connector.version(); when(connector.version()).thenReturn(VERSION);
expectLastCall().andReturn(VERSION);
connector.initialize(EasyMock.notNull(SinkConnectorContext.class)); Callback<TargetState> onStateChange = mockCallback();
expectLastCall(); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, null, null, classLoader);
connector.start(CONFIG);
expectLastCall();
listener.onStartup(CONNECTOR);
expectLastCall();
connector.stop();
expectLastCall();
listener.onPause(CONNECTOR);
expectLastCall();
listener.onShutdown(CONNECTOR);
expectLastCall();
ctx.close();
expectLastCall();
offsetStorageReader.close();
expectLastCall();
offsetStore.stop();
expectLastCall();
Callback<TargetState> onStateChange = createStrictMock(Callback.class);
onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED));
expectLastCall();
onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.PAUSED));
expectLastCall();
replayAll();
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize(); workerConnector.initialize();
assertInitializedSinkMetric(workerConnector); assertInitializedSinkMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STARTED, onStateChange); workerConnector.doTransitionTo(TargetState.STARTED, onStateChange);
assertRunningMetric(workerConnector); assertRunningMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.PAUSED, onStateChange); workerConnector.doTransitionTo(TargetState.PAUSED, onStateChange);
@ -275,52 +183,25 @@ public class WorkerConnectorTest extends EasyMockSupport {
workerConnector.doShutdown(); workerConnector.doShutdown();
assertStoppedMetric(workerConnector); assertStoppedMetric(workerConnector);
verifyAll(); verifyInitialize();
verify(connector).start(CONFIG);
verify(listener).onStartup(CONNECTOR);
verify(listener).onPause(CONNECTOR);
verifyCleanShutdown(true);
InOrder inOrder = inOrder(onStateChange);
inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED));
inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.PAUSED));
verifyNoMoreInteractions(onStateChange);
} }
@Test @Test
public void testOnResume() { public void testOnResume() {
connector = sourceConnector; connector = sourceConnector;
connector.version();
expectLastCall().andReturn(VERSION);
connector.initialize(EasyMock.notNull(SourceConnectorContext.class)); when(connector.version()).thenReturn(VERSION);
expectLastCall();
offsetStore.start(); Callback<TargetState> onStateChange = mockCallback();
expectLastCall();
listener.onPause(CONNECTOR);
expectLastCall();
connector.start(CONFIG);
expectLastCall();
listener.onResume(CONNECTOR);
expectLastCall();
connector.stop();
expectLastCall();
listener.onShutdown(CONNECTOR);
expectLastCall();
ctx.close();
expectLastCall();
offsetStorageReader.close();
expectLastCall();
offsetStore.stop();
expectLastCall();
Callback<TargetState> onStateChange = createStrictMock(Callback.class);
onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.PAUSED));
expectLastCall();
onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED));
expectLastCall();
replayAll();
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
@ -334,42 +215,25 @@ public class WorkerConnectorTest extends EasyMockSupport {
workerConnector.doShutdown(); workerConnector.doShutdown();
assertStoppedMetric(workerConnector); assertStoppedMetric(workerConnector);
verifyAll(); verifyInitialize();
verify(listener).onPause(CONNECTOR);
verify(connector).start(CONFIG);
verify(listener).onResume(CONNECTOR);
verifyCleanShutdown(true);
InOrder inOrder = inOrder(onStateChange);
inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.PAUSED));
inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED));
verifyNoMoreInteractions(onStateChange);
} }
@Test @Test
public void testStartupPaused() { public void testStartupPaused() {
connector = sinkConnector; connector = sinkConnector;
connector.version(); when(connector.version()).thenReturn(VERSION);
expectLastCall().andReturn(VERSION);
connector.initialize(EasyMock.notNull(SinkConnectorContext.class)); Callback<TargetState> onStateChange = mockCallback();
expectLastCall(); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, null, null, classLoader);
// connector never gets started
listener.onPause(CONNECTOR);
expectLastCall();
listener.onShutdown(CONNECTOR);
expectLastCall();
ctx.close();
expectLastCall();
offsetStorageReader.close();
expectLastCall();
offsetStore.stop();
expectLastCall();
Callback<TargetState> onStateChange = createStrictMock(Callback.class);
onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.PAUSED));
expectLastCall();
replayAll();
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize(); workerConnector.initialize();
assertInitializedSinkMetric(workerConnector); assertInitializedSinkMetric(workerConnector);
@ -379,45 +243,25 @@ public class WorkerConnectorTest extends EasyMockSupport {
workerConnector.doShutdown(); workerConnector.doShutdown();
assertStoppedMetric(workerConnector); assertStoppedMetric(workerConnector);
verifyAll(); verifyInitialize();
// connector never gets started
verify(listener).onPause(CONNECTOR);
verifyCleanShutdown(false);
verify(onStateChange).onCompletion(isNull(), eq(TargetState.PAUSED));
verifyNoMoreInteractions(onStateChange);
} }
@Test @Test
public void testStartupFailure() { public void testStartupFailure() {
RuntimeException exception = new RuntimeException(); RuntimeException exception = new RuntimeException();
connector = sinkConnector; connector = sinkConnector;
connector.version();
expectLastCall().andReturn(VERSION);
connector.initialize(EasyMock.notNull(SinkConnectorContext.class)); when(connector.version()).thenReturn(VERSION);
expectLastCall(); doThrow(exception).when(connector).start(CONFIG);
connector.start(CONFIG); Callback<TargetState> onStateChange = mockCallback();
expectLastCall().andThrow(exception); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, null, null, classLoader);
listener.onFailure(CONNECTOR, exception);
expectLastCall();
listener.onShutdown(CONNECTOR);
expectLastCall();
ctx.close();
expectLastCall();
offsetStorageReader.close();
expectLastCall();
offsetStore.stop();
expectLastCall();
Callback<TargetState> onStateChange = createStrictMock(Callback.class);
onStateChange.onCompletion(EasyMock.anyObject(Exception.class), EasyMock.isNull());
expectLastCall();
replayAll();
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize(); workerConnector.initialize();
assertInitializedSinkMetric(workerConnector); assertInitializedSinkMetric(workerConnector);
@ -427,7 +271,13 @@ public class WorkerConnectorTest extends EasyMockSupport {
workerConnector.doShutdown(); workerConnector.doShutdown();
assertStoppedMetric(workerConnector); assertStoppedMetric(workerConnector);
verifyAll(); verifyInitialize();
verify(connector).start(CONFIG);
verify(listener).onFailure(CONNECTOR, exception);
verifyCleanShutdown(false);
verify(onStateChange).onCompletion(any(Exception.class), isNull());
verifyNoMoreInteractions(onStateChange);
} }
@Test @Test
@ -435,42 +285,11 @@ public class WorkerConnectorTest extends EasyMockSupport {
RuntimeException exception = new RuntimeException(); RuntimeException exception = new RuntimeException();
connector = sourceConnector; connector = sourceConnector;
connector.version(); when(connector.version()).thenReturn(VERSION);
expectLastCall().andReturn(VERSION);
offsetStore.start(); doThrow(exception).when(connector).stop();
expectLastCall();
connector.initialize(EasyMock.notNull(SourceConnectorContext.class));
expectLastCall();
connector.start(CONFIG);
expectLastCall();
listener.onStartup(CONNECTOR);
expectLastCall();
connector.stop();
expectLastCall().andThrow(exception);
Callback<TargetState> onStateChange = createStrictMock(Callback.class);
onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED));
expectLastCall();
listener.onFailure(CONNECTOR, exception);
expectLastCall();
ctx.close();
expectLastCall();
offsetStorageReader.close();
expectLastCall();
offsetStore.stop();
expectLastCall();
replayAll();
Callback<TargetState> onStateChange = mockCallback();
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize(); workerConnector.initialize();
@ -481,48 +300,22 @@ public class WorkerConnectorTest extends EasyMockSupport {
workerConnector.doShutdown(); workerConnector.doShutdown();
assertFailedMetric(workerConnector); assertFailedMetric(workerConnector);
verifyAll(); verifyInitialize();
verify(connector).start(CONFIG);
verify(listener).onStartup(CONNECTOR);
verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED));
verifyNoMoreInteractions(onStateChange);
verify(listener).onFailure(CONNECTOR, exception);
verifyShutdown(false, true);
} }
@Test @Test
public void testTransitionStartedToStarted() { public void testTransitionStartedToStarted() {
connector = sourceConnector; connector = sourceConnector;
connector.version();
expectLastCall().andReturn(VERSION);
offsetStore.start(); when(connector.version()).thenReturn(VERSION);
expectLastCall();
connector.initialize(EasyMock.notNull(SourceConnectorContext.class)); Callback<TargetState> onStateChange = mockCallback();
expectLastCall();
connector.start(CONFIG);
expectLastCall();
// expect only one call to onStartup()
listener.onStartup(CONNECTOR);
expectLastCall();
connector.stop();
expectLastCall();
listener.onShutdown(CONNECTOR);
expectLastCall();
ctx.close();
expectLastCall();
offsetStorageReader.close();
expectLastCall();
offsetStore.stop();
expectLastCall();
Callback<TargetState> onStateChange = createStrictMock(Callback.class);
onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED));
expectLastCall().times(2);
replayAll();
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
@ -536,53 +329,21 @@ public class WorkerConnectorTest extends EasyMockSupport {
workerConnector.doShutdown(); workerConnector.doShutdown();
assertStoppedMetric(workerConnector); assertStoppedMetric(workerConnector);
verifyAll(); verifyInitialize();
verify(connector).start(CONFIG);
// expect only one call to onStartup()
verify(listener).onStartup(CONNECTOR);
verifyCleanShutdown(true);
verify(onStateChange, times(2)).onCompletion(isNull(), eq(TargetState.STARTED));
verifyNoMoreInteractions(onStateChange);
} }
@Test @Test
public void testTransitionPausedToPaused() { public void testTransitionPausedToPaused() {
connector = sourceConnector; connector = sourceConnector;
connector.version(); when(connector.version()).thenReturn(VERSION);
expectLastCall().andReturn(VERSION);
offsetStore.start();
expectLastCall();
connector.initialize(EasyMock.notNull(SourceConnectorContext.class));
expectLastCall();
connector.start(CONFIG);
expectLastCall();
listener.onStartup(CONNECTOR);
expectLastCall();
connector.stop();
expectLastCall();
listener.onPause(CONNECTOR);
expectLastCall();
listener.onShutdown(CONNECTOR);
expectLastCall();
ctx.close();
expectLastCall();
offsetStorageReader.close();
expectLastCall();
offsetStore.stop();
expectLastCall();
Callback<TargetState> onStateChange = createStrictMock(Callback.class);
onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED));
expectLastCall();
onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.PAUSED));
expectLastCall().times(2);
replayAll();
Callback<TargetState> onStateChange = mockCallback();
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize(); workerConnector.initialize();
@ -597,28 +358,32 @@ public class WorkerConnectorTest extends EasyMockSupport {
workerConnector.doShutdown(); workerConnector.doShutdown();
assertStoppedMetric(workerConnector); assertStoppedMetric(workerConnector);
verifyAll(); verifyInitialize();
verify(connector).start(CONFIG);
verify(listener).onStartup(CONNECTOR);
verify(listener).onPause(CONNECTOR);
verifyCleanShutdown(true);
InOrder inOrder = inOrder(onStateChange);
inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED));
inOrder.verify(onStateChange, times(2)).onCompletion(isNull(), eq(TargetState.PAUSED));
verifyNoMoreInteractions(onStateChange);
} }
@Test @Test
public void testFailConnectorThatIsNeitherSourceNorSink() { public void testFailConnectorThatIsNeitherSourceNorSink() {
connector.version(); connector = mock(Connector.class);
expectLastCall().andReturn(VERSION); when(connector.version()).thenReturn(VERSION);
Capture<Throwable> exceptionCapture = Capture.newInstance();
listener.onFailure(EasyMock.eq(CONNECTOR), EasyMock.capture(exceptionCapture));
expectLastCall();
replayAll();
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize(); workerConnector.initialize();
verify(connector).version();
ArgumentCaptor<Throwable> exceptionCapture = ArgumentCaptor.forClass(Throwable.class);
verify(listener).onFailure(eq(CONNECTOR), exceptionCapture.capture());
Throwable e = exceptionCapture.getValue(); Throwable e = exceptionCapture.getValue();
assertTrue(e instanceof ConnectException); assertTrue(e instanceof ConnectException);
assertTrue(e.getMessage().contains("must be a subclass of")); assertTrue(e.getMessage().contains("must be a subclass of"));
verifyAll();
} }
protected void assertFailedMetric(WorkerConnector workerConnector) { protected void assertFailedMetric(WorkerConnector workerConnector) {
@ -672,6 +437,39 @@ public class WorkerConnectorTest extends EasyMockSupport {
assertEquals(VERSION, version); assertEquals(VERSION, version);
} }
@SuppressWarnings("unchecked")
private Callback<TargetState> mockCallback() {
return mock(Callback.class);
}
private void verifyInitialize() {
verify(connector).version();
if (connector instanceof SourceConnector) {
verify(offsetStore).start();
verify(connector).initialize(any(SourceConnectorContext.class));
} else {
verify(connector).initialize(any(SinkConnectorContext.class));
}
}
private void verifyCleanShutdown(boolean started) {
verifyShutdown(true, started);
}
private void verifyShutdown(boolean clean, boolean started) {
verify(ctx).close();
if (connector instanceof SourceConnector) {
verify(offsetStorageReader).close();
verify(offsetStore).stop();
}
if (clean) {
verify(listener).onShutdown(CONNECTOR);
}
if (started) {
verify(connector).stop();
}
}
private static abstract class TestConnector extends Connector { private static abstract class TestConnector extends Connector {
} }
} }