KAFKA-14683 Cleanup WorkerSinkTaskTest (#15506)

1) Rename WorkerSinkTaskMockitoTest back to WorkerSinkTaskTest
2) Tidy up the code a bit
3) Replace "fail" with "assertThrows" (pattern sketched below)

Reviewers: Omnia Ibrahim <o.g.h.ibrahim@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Author: Hector Geraldino, 2024-03-14 15:50:57 -04:00 (committed by GitHub)
parent 37212bb242
commit 178761eb36
3 changed files with 26 additions and 56 deletions
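
Most of the test-file changes below follow item 3 of the commit message: try/fail/catch blocks are replaced with JUnit 4's assertThrows, which fails automatically when nothing is thrown and returns the exception for further assertions. A minimal before/after sketch, assuming JUnit 4.13+ on the classpath; the class and method names here are illustrative, not taken from the Kafka test:

    import static org.junit.Assert.assertEquals;
    import static org.junit.Assert.assertThrows;
    import static org.junit.Assert.fail;

    import org.junit.Test;

    public class AssertThrowsPatternSketch {

        // Stand-in for workerTask.iteration() raising a rebalance error.
        private void poll() {
            throw new IllegalStateException("rebalance failed");
        }

        // Old style: if fail() is forgotten or misplaced, a test that throws
        // nothing still passes silently.
        @Test
        public void oldStyle() {
            try {
                poll();
                fail("Poll should have raised the rebalance exception");
            } catch (RuntimeException e) {
                assertEquals("rebalance failed", e.getMessage());
            }
        }

        // New style: assertThrows fails the test when no exception is thrown
        // and returns the thrown exception so it can be inspected directly.
        @Test
        public void newStyle() {
            RuntimeException thrown = assertThrows(RuntimeException.class, this::poll);
            assertEquals("rebalance failed", thrown.getMessage());
        }
    }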

build.gradle

@@ -420,8 +420,7 @@ subprojects {
     if (JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_16)) {
       testsToExclude.addAll([
         // connect tests
-        "**/KafkaConfigBackingStoreTest.*",
-        "**/WorkerSinkTaskTest.*"
+        "**/KafkaConfigBackingStoreTest.*"
       ])
     }

checkstyle/suppressions.xml

@@ -96,7 +96,7 @@
               files="(AbstractFetch|ClientTelemetryReporter|ConsumerCoordinator|CommitRequestManager|FetchCollector|OffsetFetcherUtils|KafkaProducer|Sender|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords|FetchSessionHandler|MockAdminClient).java"/>
     <suppress checks="JavaNCSS"
-              files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|WorkerSinkTaskMockitoTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest|KafkaRaftClientTest).java"/>
+              files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest|KafkaRaftClientTest).java"/>
     <suppress checks="NPathComplexity"
               files="(ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|KafkaRaftClient|Authorizer|FetchSessionHandler|RecordAccumulator).java"/>

WorkerSinkTaskMockitoTest.java → WorkerSinkTaskTest.java (renamed)

@@ -22,9 +22,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyMap;
@@ -111,7 +110,7 @@ import org.mockito.stubbing.Answer;
 import org.mockito.stubbing.OngoingStubbing;
 
 @RunWith(MockitoJUnitRunner.StrictStubs.class)
-public class WorkerSinkTaskMockitoTest {
+public class WorkerSinkTaskTest {
     // These are fixed to keep this code simpler. In this example we assume byte[] raw values
     // with mix of integer/string in Connect
     private static final String TOPIC = "test";
@@ -316,9 +315,7 @@ public class WorkerSinkTaskMockitoTest {
         // And unpause
         verify(statusListener).onResume(taskId);
         verify(consumer, times(2)).wakeup();
-        INITIAL_ASSIGNMENT.forEach(tp -> {
-            verify(consumer).resume(singleton(tp));
-        });
+        INITIAL_ASSIGNMENT.forEach(tp -> verify(consumer).resume(singleton(tp)));
 
         verify(sinkTask, times(4)).put(anyList());
     }
@@ -419,9 +416,7 @@ public class WorkerSinkTaskMockitoTest {
         time.sleep(30000L);
         verify(sinkTask, times(3)).put(anyList());
 
-        INITIAL_ASSIGNMENT.forEach(tp -> {
-            verify(consumer).resume(Collections.singleton(tp));
-        });
+        INITIAL_ASSIGNMENT.forEach(tp -> verify(consumer).resume(Collections.singleton(tp)));
 
         assertSinkMetricValue("sink-record-read-total", 1.0);
         assertSinkMetricValue("sink-record-send-total", 1.0);
@@ -528,9 +523,7 @@ public class WorkerSinkTaskMockitoTest {
         verify(sinkTask).close(INITIAL_ASSIGNMENT);
 
         // All partitions are resumed, as all previously paused-for-redelivery partitions were revoked
-        newAssignment.forEach(tp -> {
-            verify(consumer).resume(Collections.singleton(tp));
-        });
+        newAssignment.forEach(tp -> verify(consumer).resume(Collections.singleton(tp)));
     }
 
     @Test
@@ -553,12 +546,8 @@ public class WorkerSinkTaskMockitoTest {
         workerTask.iteration();
         verifyPollInitialAssignment();
 
-        try {
-            workerTask.iteration();
-            fail("Poll should have raised the rebalance exception");
-        } catch (RuntimeException e) {
-            assertEquals(exception, e);
-        }
+        RuntimeException thrownException = assertThrows(RuntimeException.class, () -> workerTask.iteration());
+        assertEquals(exception, thrownException);
     }
 
     @Test
@@ -580,12 +569,9 @@ public class WorkerSinkTaskMockitoTest {
         workerTask.iteration();
         verifyPollInitialAssignment();
 
-        try {
-            workerTask.iteration();
-            fail("Poll should have raised the rebalance exception");
-        } catch (RuntimeException e) {
-            assertEquals(exception, e);
-        }
+
+        RuntimeException thrownException = assertThrows(RuntimeException.class, () -> workerTask.iteration());
+        assertEquals(exception, thrownException);
     }
 
     @Test
@@ -608,11 +594,10 @@ public class WorkerSinkTaskMockitoTest {
         verifyPollInitialAssignment();
 
         expectRebalanceAssignmentError(exception);
+
         try {
-            workerTask.iteration();
-            fail("Poll should have raised the rebalance exception");
-        } catch (RuntimeException e) {
-            assertEquals(exception, e);
+            RuntimeException thrownException = assertThrows(RuntimeException.class, () -> workerTask.iteration());
+            assertEquals(exception, thrownException);
         } finally {
             verify(sinkTask).close(INITIAL_ASSIGNMENT);
         }
@@ -798,7 +783,7 @@ public class WorkerSinkTaskMockitoTest {
         doThrow(new WakeupException())
                 // and succeed the second time
                 .doNothing()
-                .when(consumer).commitSync(eq(offsets));
+                .when(consumer).commitSync(offsets);
 
         workerTask.iteration(); // first record delivered
@@ -808,9 +793,7 @@ public class WorkerSinkTaskMockitoTest {
         verify(sinkTask).close(INITIAL_ASSIGNMENT);
         verify(sinkTask, times(2)).open(INITIAL_ASSIGNMENT);
 
-        INITIAL_ASSIGNMENT.forEach(tp -> {
-            verify(consumer).resume(Collections.singleton(tp));
-        });
+        INITIAL_ASSIGNMENT.forEach(tp -> verify(consumer).resume(Collections.singleton(tp)));
 
         verify(statusListener).onResume(taskId);
@@ -862,7 +845,7 @@ public class WorkerSinkTaskMockitoTest {
         doThrow(new WakeupException())
                 // and succeed the second time
                 .doNothing()
-                .when(consumer).commitSync(eq(offsets));
+                .when(consumer).commitSync(offsets);
 
         workerTask.execute();
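
The two commitSync hunks above drop the eq(...) wrapper around the sole argument. In Mockito, a raw value passed in a stubbing is matched with equals() automatically, so eq(offsets) adds nothing here; matchers are only required once any other argument of the same call uses one. A self-contained sketch of the same doThrow/doNothing chain, assuming kafka-clients and Mockito on the classpath; the consumer mock and offsets map are stand-ins rather than the test's real fixtures:

    import static org.mockito.Mockito.doThrow;
    import static org.mockito.Mockito.mock;

    import java.util.Collections;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.WakeupException;

    public class CommitSyncStubbingSketch {
        @SuppressWarnings("unchecked")
        public static void main(String[] args) {
            Consumer<byte[], byte[]> consumer = mock(Consumer.class);
            Map<TopicPartition, OffsetAndMetadata> offsets =
                    Collections.singletonMap(new TopicPartition("test", 0), new OffsetAndMetadata(1L));

            // Raw argument: Mockito wraps it in an equals() matcher, so this is
            // equivalent to .commitSync(eq(offsets)) but reads more plainly.
            doThrow(new WakeupException())   // first call raises a wakeup
                    .doNothing()             // second call succeeds
                    .when(consumer).commitSync(offsets);

            try {
                consumer.commitSync(offsets);
            } catch (WakeupException e) {
                System.out.println("first commit interrupted by wakeup");
            }
            consumer.commitSync(offsets);
            System.out.println("second commit succeeded");
        }
    }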
@@ -1091,7 +1074,7 @@ public class WorkerSinkTaskMockitoTest {
     // Test that the commitTimeoutMs timestamp is correctly computed and checked in WorkerSinkTask.iteration()
     // when there is a long running commit in process. See KAFKA-4942 for more information.
     @Test
-    public void testLongRunningCommitWithoutTimeout() throws InterruptedException {
+    public void testLongRunningCommitWithoutTimeout() {
         createTask(initialState);
 
         workerTask.initialize(TASK_CONFIG);
@@ -1172,12 +1155,8 @@ public class WorkerSinkTaskMockitoTest {
         // Throw another exception while closing the task's assignment
         doThrow(closeException).when(sinkTask).close(any(Collection.class));
 
-        try {
-            workerTask.execute();
-            fail("workerTask.execute should have thrown an exception");
-        } catch (RuntimeException e) {
-            assertSame("Exception from close should propagate as-is", closeException, e);
-        }
+        RuntimeException thrownException = assertThrows(RuntimeException.class, () -> workerTask.execute());
+        assertEquals(closeException, thrownException);
 
         verify(consumer).wakeup();
     }
@@ -1215,17 +1194,13 @@ public class WorkerSinkTaskMockitoTest {
         workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
 
-        try {
-            workerTask.execute();
-            fail("workerTask.execute should have thrown an exception");
-        } catch (ConnectException e) {
-            assertSame("Exception from put should be the cause", putException, e.getCause());
-            assertTrue("Exception from close should be suppressed", e.getSuppressed().length > 0);
-            assertSame(closeException, e.getSuppressed()[0]);
-        }
+        RuntimeException thrownException = assertThrows(ConnectException.class, () -> workerTask.execute());
+
+        assertEquals("Exception from put should be the cause", putException, thrownException.getCause());
+        assertTrue("Exception from close should be suppressed", thrownException.getSuppressed().length > 0);
+        assertEquals(closeException, thrownException.getSuppressed()[0]);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void testTaskCancelPreventsFinalOffsetCommit() {
         createTask(initialState);
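
The assertions in the hunk above rely on standard Java suppressed-exception propagation: the failure from put() becomes the cause of the exception the task rethrows, and the later failure from close() is attached with Throwable.addSuppressed instead of replacing it. A standalone sketch of that flow, using plain RuntimeExceptions and hypothetical doPut/doClose helpers in place of the actual WorkerSinkTask internals:

    public class SuppressedExceptionSketch {

        // Hypothetical stand-ins for the task's put() and close() calls.
        static void doPut() {
            throw new RuntimeException("put failed");
        }

        static void doClose() {
            throw new RuntimeException("close failed");
        }

        // Mirrors the behaviour the test asserts: the put failure is kept as the
        // cause, and the close failure rides along as a suppressed exception.
        static void runTask() {
            RuntimeException wrapped = null;
            try {
                doPut();
            } catch (RuntimeException putException) {
                wrapped = new RuntimeException("task failed", putException);
            }
            try {
                doClose();
            } catch (RuntimeException closeException) {
                if (wrapped != null) {
                    wrapped.addSuppressed(closeException);
                } else {
                    wrapped = closeException;
                }
            }
            if (wrapped != null) {
                throw wrapped;
            }
        }

        public static void main(String[] args) {
            try {
                runTask();
            } catch (RuntimeException e) {
                System.out.println("cause: " + e.getCause().getMessage());              // put failed
                System.out.println("suppressed: " + e.getSuppressed()[0].getMessage()); // close failed
            }
        }
    }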
@@ -1287,10 +1262,6 @@ public class WorkerSinkTaskMockitoTest {
         expectTaskGetTopic();
         expectConversionAndTransformation(null, new RecordHeaders());
 
-        final Map<TopicPartition, OffsetAndMetadata> workerStartingOffsets = new HashMap<>();
-        workerStartingOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET));
-        workerStartingOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
-
         final Map<TopicPartition, OffsetAndMetadata> workerCurrentOffsets = new HashMap<>();
         workerCurrentOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
         workerCurrentOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));