KAFKA-18073: Prevent dropped records from failed retriable exceptions (#18146)

Reviewers: Greg Harris <greg.harris@aiven.io>
This commit is contained in:
Thomas Thornton 2025-01-09 13:13:11 -05:00 committed by GitHub
parent 5acbd42dd7
commit b35c29401a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 340 additions and 31 deletions

View File

@ -214,8 +214,7 @@ public class RetryWithToleranceOperator<T> implements AutoCloseable {
errorHandlingMetrics.recordRetry(); errorHandlingMetrics.recordRetry();
} else { } else {
log.trace("Can't retry. start={}, attempt={}, deadline={}", startTime, attempt, deadline); log.trace("Can't retry. start={}, attempt={}, deadline={}", startTime, attempt, deadline);
context.error(e); throw e;
return null;
} }
if (stopping) { if (stopping) {
log.trace("Shutdown has been scheduled. Marking operation as failed."); log.trace("Shutdown has been scheduled. Marking operation as failed.");

View File

@ -80,6 +80,7 @@ import java.util.Set;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG; import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
@ -362,8 +363,8 @@ public class AbstractWorkerSourceTaskTest {
StringConverter stringConverter = new StringConverter(); StringConverter stringConverter = new StringConverter();
SampleConverterWithHeaders testConverter = new SampleConverterWithHeaders(); SampleConverterWithHeaders testConverter = new SampleConverterWithHeaders();
createWorkerTask(stringConverter, testConverter, stringConverter, RetryWithToleranceOperatorTest.noopOperator(), createWorkerTask(stringConverter, testConverter, stringConverter, RetryWithToleranceOperatorTest.noneOperator(),
Collections::emptyList); Collections::emptyList, transformationChain);
expectSendRecord(null); expectSendRecord(null);
expectApplyTransformationChain(); expectApplyTransformationChain();
@ -706,6 +707,118 @@ public class AbstractWorkerSourceTaskTest {
verify(transformationChain, times(2)).apply(any(), eq(record3)); verify(transformationChain, times(2)).apply(any(), eq(record3));
} }
@Test
public void testSendRecordsFailedTransformationErrorToleranceNone() {
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
RetryWithToleranceOperator<RetriableException> retryWithToleranceOperator = RetryWithToleranceOperatorTest.noneOperator();
TransformationChain<RetriableException, SourceRecord> transformationChainRetriableException =
WorkerTestUtils.getTransformationChain(retryWithToleranceOperator, List.of(new RetriableException("Test"), record1));
createWorkerTask(transformationChainRetriableException, retryWithToleranceOperator);
expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC);
TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, null, Collections.emptyList(), Collections.emptyList());
TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo));
when(admin.describeTopics(TOPIC)).thenReturn(Collections.singletonMap(TOPIC, topicDesc));
workerTask.toSend = Arrays.asList(record1);
// The transformation errored out so the error should be re-raised by sendRecords with error tolerance None
Exception exception = assertThrows(ConnectException.class, workerTask::sendRecords);
assertTrue(exception.getMessage().contains("Tolerance exceeded"));
// Ensure the transformation was called
verify(transformationChainRetriableException, times(1)).apply(any(), eq(record1));
// The second transform call will succeed, batch should succeed at sending the one record (none were skipped)
assertTrue(workerTask.sendRecords());
verifySendRecord(1);
}
@Test
public void testSendRecordsFailedTransformationErrorToleranceAll() {
RetryWithToleranceOperator<RetriableException> retryWithToleranceOperator = RetryWithToleranceOperatorTest.allOperator();
TransformationChain<RetriableException, SourceRecord> transformationChainRetriableException = WorkerTestUtils.getTransformationChain(
retryWithToleranceOperator,
List.of(new RetriableException("Test")));
createWorkerTask(transformationChainRetriableException, retryWithToleranceOperator);
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC);
TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, null, Collections.emptyList(), Collections.emptyList());
TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo));
workerTask.toSend = Arrays.asList(record1);
// The transformation errored out so the error should be ignored & the record skipped with error tolerance all
assertTrue(workerTask.sendRecords());
// Ensure the transformation was called
verify(transformationChainRetriableException, times(1)).apply(any(), eq(record1));
}
@Test
public void testSendRecordsConversionExceptionErrorToleranceNone() {
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
RetryWithToleranceOperator<RetriableException> retryWithToleranceOperator = RetryWithToleranceOperatorTest.noneOperator();
List<Object> results = Stream.of(record1, record2, record3)
.collect(Collectors.toList());
TransformationChain<RetriableException, SourceRecord> chain = WorkerTestUtils.getTransformationChain(
retryWithToleranceOperator,
results);
createWorkerTask(chain, retryWithToleranceOperator);
// When we try to convert the key/value of each record, throw an exception
throwExceptionWhenConvertKey(emptyHeaders(), TOPIC);
TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, null, Collections.emptyList(), Collections.emptyList());
TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo));
when(admin.describeTopics(TOPIC)).thenReturn(Collections.singletonMap(TOPIC, topicDesc));
workerTask.toSend = Arrays.asList(record1, record2, record3);
// Send records should fail when errors.tolerance is none and the conversion call fails
Exception exception = assertThrows(ConnectException.class, workerTask::sendRecords);
assertTrue(exception.getMessage().contains("Tolerance exceeded"));
assertThrows(ConnectException.class, workerTask::sendRecords);
assertThrows(ConnectException.class, workerTask::sendRecords);
// Set the conversion call to succeed, batch should succeed at sending all three records (none were skipped)
expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC);
assertTrue(workerTask.sendRecords());
verifySendRecord(3);
}
@Test
public void testSendRecordsConversionExceptionErrorToleranceAll() {
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
RetryWithToleranceOperator<RetriableException> retryWithToleranceOperator = RetryWithToleranceOperatorTest.allOperator();
List<Object> results = Stream.of(record1, record2, record3)
.collect(Collectors.toList());
TransformationChain<RetriableException, SourceRecord> chain = WorkerTestUtils.getTransformationChain(
retryWithToleranceOperator,
results);
createWorkerTask(chain, retryWithToleranceOperator);
// When we try to convert the key/value of each record, throw an exception
throwExceptionWhenConvertKey(emptyHeaders(), TOPIC);
workerTask.toSend = Arrays.asList(record1, record2, record3);
// With errors.tolerance to all, the faiiled conversion should simply skip the record, and record successful batch
assertTrue(workerTask.sendRecords());
}
private void expectSendRecord(Headers headers) { private void expectSendRecord(Headers headers) {
if (headers != null) if (headers != null)
expectConvertHeadersAndKeyValue(headers, TOPIC); expectConvertHeadersAndKeyValue(headers, TOPIC);
@ -806,6 +919,20 @@ public class AbstractWorkerSourceTaskTest {
assertEquals(valueConverter.fromConnectData(topic, headers, RECORD_SCHEMA, RECORD), SERIALIZED_RECORD); assertEquals(valueConverter.fromConnectData(topic, headers, RECORD_SCHEMA, RECORD), SERIALIZED_RECORD);
} }
private void throwExceptionWhenConvertKey(Headers headers, String topic) {
if (headers.iterator().hasNext()) {
when(headerConverter.fromConnectHeader(anyString(), anyString(), eq(Schema.STRING_SCHEMA),
anyString()))
.thenAnswer((Answer<byte[]>) invocation -> {
String headerValue = invocation.getArgument(3, String.class);
return headerValue.getBytes(StandardCharsets.UTF_8);
});
}
when(keyConverter.fromConnectData(eq(topic), any(Headers.class), eq(KEY_SCHEMA), eq(KEY)))
.thenThrow(new RetriableException("Failed to convert key"));
}
private void expectApplyTransformationChain() { private void expectApplyTransformationChain() {
when(transformationChain.apply(any(), any(SourceRecord.class))) when(transformationChain.apply(any(), any(SourceRecord.class)))
.thenAnswer(AdditionalAnswers.returnsSecondArg()); .thenAnswer(AdditionalAnswers.returnsSecondArg());
@ -817,12 +944,19 @@ public class AbstractWorkerSourceTaskTest {
return new RecordHeaders(); return new RecordHeaders();
} }
private void createWorkerTask(TransformationChain transformationChain, RetryWithToleranceOperator toleranceOperator) {
createWorkerTask(keyConverter, valueConverter, headerConverter, toleranceOperator, Collections::emptyList,
transformationChain);
}
private void createWorkerTask() { private void createWorkerTask() {
createWorkerTask(keyConverter, valueConverter, headerConverter, RetryWithToleranceOperatorTest.noopOperator(), Collections::emptyList); createWorkerTask(
keyConverter, valueConverter, headerConverter, RetryWithToleranceOperatorTest.noneOperator(), Collections::emptyList, transformationChain);
} }
private void createWorkerTask(Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter, private void createWorkerTask(Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter,
RetryWithToleranceOperator<SourceRecord> retryWithToleranceOperator, Supplier<List<ErrorReporter<SourceRecord>>> errorReportersSupplier) { RetryWithToleranceOperator<SourceRecord> retryWithToleranceOperator, Supplier<List<ErrorReporter<SourceRecord>>> errorReportersSupplier,
TransformationChain transformationChain) {
workerTask = new AbstractWorkerSourceTask( workerTask = new AbstractWorkerSourceTask(
taskId, sourceTask, statusListener, TargetState.STARTED, keyConverter, valueConverter, headerConverter, transformationChain, taskId, sourceTask, statusListener, TargetState.STARTED, keyConverter, valueConverter, headerConverter, transformationChain,
sourceTaskContext, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter, offsetStore, sourceTaskContext, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter, offsetStore,

View File

@ -278,7 +278,7 @@ public class ExactlyOnceWorkerSourceTaskTest {
private void createWorkerTask(TargetState initialState, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter) { private void createWorkerTask(TargetState initialState, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter) {
workerTask = new ExactlyOnceWorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, headerConverter, workerTask = new ExactlyOnceWorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, headerConverter,
transformationChain, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter, offsetStore, transformationChain, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter, offsetStore,
config, clusterConfigState, metrics, errorHandlingMetrics, plugins.delegatingLoader(), time, RetryWithToleranceOperatorTest.noopOperator(), statusBackingStore, config, clusterConfigState, metrics, errorHandlingMetrics, plugins.delegatingLoader(), time, RetryWithToleranceOperatorTest.noneOperator(), statusBackingStore,
sourceConfig, Runnable::run, preProducerCheck, postProducerCheck, Collections::emptyList); sourceConfig, Runnable::run, preProducerCheck, postProducerCheck, Collections::emptyList);
} }

View File

@ -90,6 +90,7 @@ import java.util.stream.Collectors;
import static java.util.Arrays.asList; import static java.util.Arrays.asList;
import static java.util.Collections.singleton; import static java.util.Collections.singleton;
import static org.apache.kafka.connect.runtime.WorkerTestUtils.getTransformationChain;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals;
@ -192,13 +193,18 @@ public class WorkerSinkTaskTest {
createTask(initialState, keyConverter, valueConverter, headerConverter); createTask(initialState, keyConverter, valueConverter, headerConverter);
} }
private void createTask(TargetState initialState, TransformationChain transformationChain, RetryWithToleranceOperator toleranceOperator) {
createTask(initialState, keyConverter, valueConverter, headerConverter, toleranceOperator, Collections::emptyList, transformationChain);
}
private void createTask(TargetState initialState, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter) { private void createTask(TargetState initialState, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter) {
createTask(initialState, keyConverter, valueConverter, headerConverter, RetryWithToleranceOperatorTest.noopOperator(), Collections::emptyList); createTask(initialState, keyConverter, valueConverter, headerConverter, RetryWithToleranceOperatorTest.noneOperator(), Collections::emptyList, transformationChain);
} }
private void createTask(TargetState initialState, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter, private void createTask(TargetState initialState, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter,
RetryWithToleranceOperator<ConsumerRecord<byte[], byte[]>> retryWithToleranceOperator, RetryWithToleranceOperator<ConsumerRecord<byte[], byte[]>> retryWithToleranceOperator,
Supplier<List<ErrorReporter<ConsumerRecord<byte[], byte[]>>>> errorReportersSupplier) { Supplier<List<ErrorReporter<ConsumerRecord<byte[], byte[]>>>> errorReportersSupplier,
TransformationChain transformationChain) {
workerTask = new WorkerSinkTask( workerTask = new WorkerSinkTask(
taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics, taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics,
keyConverter, valueConverter, errorHandlingMetrics, headerConverter, keyConverter, valueConverter, errorHandlingMetrics, headerConverter,
@ -854,6 +860,103 @@ public class WorkerSinkTaskTest {
verify(sinkTask).close(any(Collection.class)); verify(sinkTask).close(any(Collection.class));
} }
@Test
public void testRaisesFailedRetriableExceptionFromConvert() {
createTask(initialState);
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
verifyInitializeTask();
expectPollInitialAssignment()
.thenAnswer(expectConsumerPoll(1))
.thenAnswer(invocation -> {
// stop the task during its second iteration
workerTask.stop();
return new ConsumerRecords<>(Map.of(), Map.of());
});
throwExceptionOnConversion(null, new RecordHeaders());
workerTask.iteration();
assertThrows(ConnectException.class, workerTask::execute);
}
@Test
public void testSkipsFailedRetriableExceptionFromConvert() {
createTask(initialState, keyConverter, valueConverter, headerConverter,
RetryWithToleranceOperatorTest.allOperator(), Collections::emptyList, transformationChain);
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
verifyInitializeTask();
expectPollInitialAssignment()
.thenAnswer(expectConsumerPoll(1))
.thenAnswer(invocation -> {
// stop the task during its second iteration
workerTask.stop();
return new ConsumerRecords<>(Map.of(), Map.of());
});
throwExceptionOnConversion(null, new RecordHeaders());
workerTask.iteration();
workerTask.execute();
verify(sinkTask, times(3)).put(Collections.emptyList());
}
@Test
public void testRaisesFailedRetriableExceptionFromTransform() {
RetryWithToleranceOperator<RetriableException> retryWithToleranceOperator = RetryWithToleranceOperatorTest.noneOperator();
TransformationChain<RetriableException, SinkRecord> transformationChainRetriableException = getTransformationChain(
retryWithToleranceOperator, List.of(new RetriableException("Test")));
createTask(initialState, transformationChainRetriableException, retryWithToleranceOperator);
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
verifyInitializeTask();
expectPollInitialAssignment()
.thenAnswer(expectConsumerPoll(1))
.thenAnswer(invocation -> {
// stop the task during its second iteration
workerTask.stop();
return new ConsumerRecords<>(Map.of(), Map.of());
});
expectConversion(null, new RecordHeaders());
workerTask.iteration();
assertThrows(ConnectException.class, workerTask::execute);
}
@Test
public void testSkipsFailedRetriableExceptionFromTransform() {
RetryWithToleranceOperator<RetriableException> retryWithToleranceOperator = RetryWithToleranceOperatorTest.allOperator();
TransformationChain<RetriableException, SinkRecord> transformationChainRetriableException = getTransformationChain(
retryWithToleranceOperator, List.of(new RetriableException("Test")));
createTask(initialState, transformationChainRetriableException, retryWithToleranceOperator);
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
verifyInitializeTask();
expectPollInitialAssignment()
.thenAnswer(expectConsumerPoll(1))
.thenAnswer(invocation -> {
// stop the task during its second iteration
workerTask.stop();
return new ConsumerRecords<>(Map.of(), Map.of());
});
expectConversion(null, new RecordHeaders());
workerTask.iteration();
workerTask.execute();
verify(sinkTask, times(3)).put(Collections.emptyList());
}
@Test @Test
public void testRequestCommit() { public void testRequestCommit() {
createTask(initialState); createTask(initialState);
@ -1758,7 +1861,7 @@ public class WorkerSinkTaskTest {
taskId, sinkTask, statusListener, TargetState.PAUSED, workerConfig, ClusterConfigState.EMPTY, metrics, taskId, sinkTask, statusListener, TargetState.PAUSED, workerConfig, ClusterConfigState.EMPTY, metrics,
keyConverter, valueConverter, errorHandlingMetrics, headerConverter, keyConverter, valueConverter, errorHandlingMetrics, headerConverter,
transformationChain, mockConsumer, pluginLoader, time, transformationChain, mockConsumer, pluginLoader, time,
RetryWithToleranceOperatorTest.noopOperator(), null, statusBackingStore, Collections::emptyList); RetryWithToleranceOperatorTest.noneOperator(), null, statusBackingStore, Collections::emptyList);
mockConsumer.updateBeginningOffsets( mockConsumer.updateBeginningOffsets(
new HashMap<>() {{ new HashMap<>() {{
put(TOPIC_PARTITION, 0L); put(TOPIC_PARTITION, 0L);
@ -1852,6 +1955,19 @@ public class WorkerSinkTaskTest {
expectTransformation(topicPrefix); expectTransformation(topicPrefix);
} }
private void expectConversion(final String topicPrefix, final Headers headers) {
when(keyConverter.toConnectData(TOPIC, headers, RAW_KEY)).thenReturn(new SchemaAndValue(KEY_SCHEMA, KEY));
when(valueConverter.toConnectData(TOPIC, headers, RAW_VALUE)).thenReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE));
for (Header header : headers) {
when(headerConverter.toConnectHeader(TOPIC, header.key(), header.value())).thenReturn(new SchemaAndValue(VALUE_SCHEMA, new String(header.value())));
}
}
private void throwExceptionOnConversion(final String topicPrefix, final Headers headers) {
when(keyConverter.toConnectData(TOPIC, headers, RAW_KEY)).thenThrow(new RetriableException("Failed to convert"));
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void expectTransformation(final String topicPrefix) { private void expectTransformation(final String topicPrefix) {
when(transformationChain.apply(any(ProcessingContext.class), any(SinkRecord.class))).thenAnswer((Answer<SinkRecord>) when(transformationChain.apply(any(ProcessingContext.class), any(SinkRecord.class))).thenAnswer((Answer<SinkRecord>)

View File

@ -177,7 +177,7 @@ public class WorkerSinkTaskThreadedTest {
workerTask = new WorkerSinkTask( workerTask = new WorkerSinkTask(
taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics, keyConverter, taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics, keyConverter,
valueConverter, errorHandlingMetrics, headerConverter, transformationChain, valueConverter, errorHandlingMetrics, headerConverter, transformationChain,
consumer, pluginLoader, time, RetryWithToleranceOperatorTest.noopOperator(), null, statusBackingStore, consumer, pluginLoader, time, RetryWithToleranceOperatorTest.noneOperator(), null, statusBackingStore,
Collections::emptyList); Collections::emptyList);
recordsReturned = 0; recordsReturned = 0;
} }

View File

@ -231,7 +231,7 @@ public class WorkerSourceTaskTest {
} }
private void createWorkerTask() { private void createWorkerTask() {
createWorkerTask(TargetState.STARTED, RetryWithToleranceOperatorTest.noopOperator()); createWorkerTask(TargetState.STARTED, RetryWithToleranceOperatorTest.noneOperator());
} }
private void createWorkerTaskWithErrorToleration() { private void createWorkerTaskWithErrorToleration() {
@ -239,7 +239,7 @@ public class WorkerSourceTaskTest {
} }
private void createWorkerTask(TargetState initialState) { private void createWorkerTask(TargetState initialState) {
createWorkerTask(initialState, RetryWithToleranceOperatorTest.noopOperator()); createWorkerTask(initialState, RetryWithToleranceOperatorTest.noneOperator());
} }
private void createWorkerTask(TargetState initialState, RetryWithToleranceOperator<SourceRecord> retryWithToleranceOperator) { private void createWorkerTask(TargetState initialState, RetryWithToleranceOperator<SourceRecord> retryWithToleranceOperator) {

View File

@ -16,11 +16,18 @@
*/ */
package org.apache.kafka.connect.runtime; package org.apache.kafka.connect.runtime;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.runtime.distributed.ExtendedAssignment; import org.apache.kafka.connect.runtime.distributed.ExtendedAssignment;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.storage.AppliedConnectorConfig; import org.apache.kafka.connect.storage.AppliedConnectorConfig;
import org.apache.kafka.connect.storage.ClusterConfigState; import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.ConnectorTaskId;
import org.mockito.Mockito;
import org.mockito.stubbing.OngoingStubbing;
import java.util.AbstractMap.SimpleEntry; import java.util.AbstractMap.SimpleEntry;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -31,6 +38,9 @@ import java.util.stream.IntStream;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class WorkerTestUtils { public class WorkerTestUtils {
@ -155,4 +165,33 @@ public class WorkerTestUtils {
assertEquals(expectedDelay, assignment.delay(), assertEquals(expectedDelay, assignment.delay(),
"Wrong rebalance delay in " + assignment); "Wrong rebalance delay in " + assignment);
} }
public static <T, R extends ConnectRecord<R>> TransformationChain<T, R> getTransformationChain(
RetryWithToleranceOperator<T> toleranceOperator,
List<Object> results) {
Transformation<R> transformation = mock(Transformation.class);
OngoingStubbing<R> stub = when(transformation.apply(any()));
for (Object result: results) {
if (result instanceof Exception) {
stub = stub.thenThrow((Exception) result);
} else {
stub = stub.thenReturn((R) result);
}
}
return buildTransformationChain(transformation, toleranceOperator);
}
public static <T, R extends ConnectRecord<R>> TransformationChain<T, R> buildTransformationChain(
Transformation<R> transformation,
RetryWithToleranceOperator<T> toleranceOperator) {
Predicate<R> predicate = mock(Predicate.class);
when(predicate.test(any())).thenReturn(true);
TransformationStage<R> stage = new TransformationStage(
predicate,
false,
transformation);
TransformationChain<T, R> realTransformationChainRetriableException = new TransformationChain(List.of(stage), toleranceOperator);
TransformationChain<T, R> transformationChainRetriableException = Mockito.spy(realTransformationChainRetriableException);
return transformationChainRetriableException;
}
} }

View File

@ -97,10 +97,10 @@ public class RetryWithToleranceOperatorTest {
put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, TestConverter.class.getName()); put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
}}; }};
public static <T> RetryWithToleranceOperator<T> noopOperator() { public static <T> RetryWithToleranceOperator<T> noneOperator() {
return genericOperator(ERRORS_RETRY_TIMEOUT_DEFAULT, NONE, new ErrorHandlingMetrics( return genericOperator(ERRORS_RETRY_TIMEOUT_DEFAULT, NONE, new ErrorHandlingMetrics(
new ConnectorTaskId("noop-connector", -1), new ConnectorTaskId("errors-none-tolerate-connector", -1),
new ConnectMetrics("noop-worker", new TestableWorkerConfig(PROPERTIES), new ConnectMetrics("errors-none-tolerate-worker", new TestableWorkerConfig(PROPERTIES),
Time.SYSTEM, "test-cluster"))); Time.SYSTEM, "test-cluster")));
} }
@ -147,56 +147,77 @@ public class RetryWithToleranceOperatorTest {
@Test @Test
public void testHandleExceptionInTransformations() { public void testHandleExceptionInTransformations() {
testHandleExceptionInStage(Stage.TRANSFORMATION, new Exception()); testHandleExceptionInStage(Stage.TRANSFORMATION, new Exception(), ALL);
} }
@Test
public void testHandleRetriableExceptionInTransformationsToleranceNone() {
assertThrows(ConnectException.class, () -> testHandleExceptionInStage(Stage.TRANSFORMATION, new RetriableException("Test"), NONE));
}
@Test @Test
public void testHandleExceptionInHeaderConverter() { public void testHandleExceptionInHeaderConverter() {
testHandleExceptionInStage(Stage.HEADER_CONVERTER, new Exception()); testHandleExceptionInStage(Stage.HEADER_CONVERTER, new Exception(), ALL);
}
@Test
public void testHandleRetriableExceptionInHeaderConverterToleranceNone() {
assertThrows(ConnectException.class, () -> testHandleExceptionInStage(Stage.HEADER_CONVERTER, new RetriableException("Test"), NONE));
} }
@Test @Test
public void testHandleExceptionInValueConverter() { public void testHandleExceptionInValueConverter() {
testHandleExceptionInStage(Stage.VALUE_CONVERTER, new Exception()); testHandleExceptionInStage(Stage.VALUE_CONVERTER, new Exception(), ALL);
}
@Test
public void testHandleRetriableExceptionInValueConverterToleranceNone() {
assertThrows(ConnectException.class, () -> testHandleExceptionInStage(Stage.VALUE_CONVERTER, new RetriableException("Test"), NONE));
} }
@Test @Test
public void testHandleExceptionInKeyConverter() { public void testHandleExceptionInKeyConverter() {
testHandleExceptionInStage(Stage.KEY_CONVERTER, new Exception()); testHandleExceptionInStage(Stage.KEY_CONVERTER, new Exception(), ALL);
}
@Test
public void testHandleRetriableExceptionInKeyConverterToleranceNone() {
assertThrows(ConnectException.class, () -> testHandleExceptionInStage(Stage.KEY_CONVERTER, new RetriableException("Test"), NONE));
} }
@Test @Test
public void testHandleExceptionInTaskPut() { public void testHandleExceptionInTaskPut() {
testHandleExceptionInStage(Stage.TASK_PUT, new org.apache.kafka.connect.errors.RetriableException("Test")); testHandleExceptionInStage(Stage.TASK_PUT, new org.apache.kafka.connect.errors.RetriableException("Test"), ALL);
} }
@Test @Test
public void testHandleExceptionInTaskPoll() { public void testHandleExceptionInTaskPoll() {
testHandleExceptionInStage(Stage.TASK_POLL, new org.apache.kafka.connect.errors.RetriableException("Test")); testHandleExceptionInStage(Stage.TASK_POLL, new org.apache.kafka.connect.errors.RetriableException("Test"), ALL);
} }
@Test @Test
public void testThrowExceptionInTaskPut() { public void testThrowExceptionInTaskPut() {
assertThrows(ConnectException.class, () -> testHandleExceptionInStage(Stage.TASK_PUT, new Exception())); assertThrows(ConnectException.class, () -> testHandleExceptionInStage(Stage.TASK_PUT, new Exception(), ALL));
} }
@Test @Test
public void testThrowExceptionInTaskPoll() { public void testThrowExceptionInTaskPoll() {
assertThrows(ConnectException.class, () -> testHandleExceptionInStage(Stage.TASK_POLL, new Exception())); assertThrows(ConnectException.class, () -> testHandleExceptionInStage(Stage.TASK_POLL, new Exception(), ALL));
} }
@Test @Test
public void testThrowExceptionInKafkaConsume() { public void testThrowExceptionInKafkaConsume() {
assertThrows(ConnectException.class, () -> testHandleExceptionInStage(Stage.KAFKA_CONSUME, new Exception())); assertThrows(ConnectException.class, () -> testHandleExceptionInStage(Stage.KAFKA_CONSUME, new Exception(), ALL));
} }
@Test @Test
public void testThrowExceptionInKafkaProduce() { public void testThrowExceptionInKafkaProduce() {
assertThrows(ConnectException.class, () -> testHandleExceptionInStage(Stage.KAFKA_PRODUCE, new Exception())); assertThrows(ConnectException.class, () -> testHandleExceptionInStage(Stage.KAFKA_PRODUCE, new Exception(), ALL));
} }
private void testHandleExceptionInStage(Stage type, Exception ex) { private void testHandleExceptionInStage(Stage type, Exception ex, ToleranceType toleranceType) {
RetryWithToleranceOperator<ConsumerRecord<byte[], byte[]>> retryWithToleranceOperator = setupExecutor(); RetryWithToleranceOperator<ConsumerRecord<byte[], byte[]>> retryWithToleranceOperator = setupExecutor(toleranceType);
ProcessingContext<ConsumerRecord<byte[], byte[]>> context = new ProcessingContext<>(consumerRecord); ProcessingContext<ConsumerRecord<byte[], byte[]>> context = new ProcessingContext<>(consumerRecord);
Operation<?> exceptionThrower = () -> { Operation<?> exceptionThrower = () -> {
throw ex; throw ex;
@ -205,8 +226,8 @@ public class RetryWithToleranceOperatorTest {
assertTrue(context.failed()); assertTrue(context.failed());
} }
private <T> RetryWithToleranceOperator<T> setupExecutor() { private <T> RetryWithToleranceOperator<T> setupExecutor(ToleranceType toleranceType) {
return genericOperator(0, ALL, errorHandlingMetrics); return genericOperator(0, toleranceType, errorHandlingMetrics);
} }
@Test @Test