KAFKA-7273: Extend Connect Converter to support headers (#6362)

Implemented KIP-440 to allow Connect converters to use record headers when serializing or deserializing keys and values. This change is backward compatible: the new methods default to calling the older existing methods, so existing Converter implementations need not be changed. WorkerSinkTask and WorkerSourceTask now use the new converter methods, but Connect's existing Converter implementations and the use of converters for internal topics are intentionally not modified. Added unit tests.
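For illustration, here is a minimal sketch of a converter that takes advantage of the new header-aware overloads. The class name, package, and the "charset" header it reads are hypothetical and not part of this commit; callers that still invoke the two-argument methods simply fall back to UTF-8.

package com.example; // hypothetical package, illustration only

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;

public class CharsetAwareStringConverter implements Converter {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    // Legacy two-argument methods keep working for callers that do not pass headers.
    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        return value == null ? null : value.toString().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        return new SchemaAndValue(Schema.OPTIONAL_STRING_SCHEMA,
                value == null ? null : new String(value, StandardCharsets.UTF_8));
    }

    // New header-aware overloads (KIP-440): pick the charset from a "charset" header when present.
    @Override
    public byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value) {
        return value == null ? null : value.toString().getBytes(charsetFrom(headers));
    }

    @Override
    public SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) {
        return new SchemaAndValue(Schema.OPTIONAL_STRING_SCHEMA,
                value == null ? null : new String(value, charsetFrom(headers)));
    }

    private Charset charsetFrom(Headers headers) {
        Header header = headers == null ? null : headers.lastHeader("charset");
        return header == null
                ? StandardCharsets.UTF_8
                : Charset.forName(new String(header.value(), StandardCharsets.UTF_8));
    }
}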

Author: Yaroslav Tkachenko <sapiensy@gmail.com>
Reviewers: Ryanne Dolan <ryannedolan@gmail.com>, Ewen Cheslack-Postava <me@ewencp.org>, Randall Hauch <rhauch@gmail.com>
Yaroslav Tkachenko 2019-09-25 09:23:03 -07:00 committed by Randall Hauch
parent 92688ef82c
commit 70d1bb40d9
8 changed files with 384 additions and 24 deletions


@@ -141,6 +141,9 @@
<suppress checks="ClassDataAbstractionCoupling"
files="(DistributedHerder|KafkaBasedLog)Test.java"/>
<suppress checks="ClassFanOutComplexity"
files="(WorkerSinkTask|WorkerSourceTask)Test.java"/>
<!-- Streams -->
<suppress checks="ClassFanOutComplexity"
files="(TopologyBuilder|KafkaStreams|KStreamImpl|KTableImpl|StreamThread|StreamTask).java"/>


@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.storage;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
@@ -44,6 +45,23 @@ public interface Converter {
*/
byte[] fromConnectData(String topic, Schema schema, Object value);
/**
* Convert a Kafka Connect data object to a native object for serialization,
* potentially using the supplied topic and headers in the record as necessary.
*
* <p>Connect uses this method directly, and for backward compatibility reasons this method
* by default will call the {@link #fromConnectData(String, Schema, Object)} method.
* Override this method to make use of the supplied headers.</p>
* @param topic the topic associated with the data
* @param headers the headers associated with the data
* @param schema the schema for the value
* @param value the value to convert
* @return the serialized value
*/
default byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value) {
return fromConnectData(topic, schema, value);
}
/**
* Convert a native object to a Kafka Connect data object.
* @param topic the topic associated with the data
@@ -51,4 +69,20 @@ public interface Converter {
* @return an object containing the {@link Schema} and the converted value
*/
SchemaAndValue toConnectData(String topic, byte[] value);
/**
* Convert a native object to a Kafka Connect data object,
* potentially using the supplied topic and headers in the record as necessary.
*
* <p>Connect uses this method directly, and for backward compatibility reasons this method
* by default will call the {@link #toConnectData(String, byte[])} method.
* Override this method to make use of the supplied headers.</p>
* @param topic the topic associated with the data
* @param headers the headers associated with the data
* @param value the value to convert
* @return an object containing the {@link Schema} and the converted value
*/
default SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) {
return toConnectData(topic, value);
}
}
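Because the new overloads default to delegating to the existing two-argument methods, converters written before KIP-440 keep working unchanged once the framework starts passing headers. A hedged illustration using Connect's StringConverter (the wrapper class, topic, and header values below are hypothetical):

import java.util.Collections;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.StringConverter;

public class LegacyConverterFallbackExample {
    public static void main(String[] args) {
        // StringConverter does not override the header-aware overloads.
        Converter converter = new StringConverter();
        converter.configure(Collections.emptyMap(), false);

        RecordHeaders headers = new RecordHeaders();
        headers.add("encoding", "utf-8".getBytes());

        // The headers argument is accepted but ignored: the interface default
        // delegates to toConnectData(topic, value), so behavior is unchanged.
        SchemaAndValue result = converter.toConnectData("my-topic", headers, "hello".getBytes());
        System.out.println(result.value()); // prints "hello"
    }
}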


@@ -481,10 +481,10 @@ class WorkerSinkTask extends WorkerTask {
}
private SinkRecord convertAndTransformRecord(final ConsumerRecord<byte[], byte[]> msg) {
SchemaAndValue keyAndSchema = retryWithToleranceOperator.execute(() -> keyConverter.toConnectData(msg.topic(), msg.key()),
SchemaAndValue keyAndSchema = retryWithToleranceOperator.execute(() -> keyConverter.toConnectData(msg.topic(), msg.headers(), msg.key()),
Stage.KEY_CONVERTER, keyConverter.getClass());
SchemaAndValue valueAndSchema = retryWithToleranceOperator.execute(() -> valueConverter.toConnectData(msg.topic(), msg.value()),
SchemaAndValue valueAndSchema = retryWithToleranceOperator.execute(() -> valueConverter.toConnectData(msg.topic(), msg.headers(), msg.value()),
Stage.VALUE_CONVERTER, valueConverter.getClass());
Headers headers = retryWithToleranceOperator.execute(() -> convertHeadersFor(msg), Stage.HEADER_CONVERTER, headerConverter.getClass());


@@ -278,10 +278,10 @@ class WorkerSourceTask extends WorkerTask {
RecordHeaders headers = retryWithToleranceOperator.execute(() -> convertHeaderFor(record), Stage.HEADER_CONVERTER, headerConverter.getClass());
byte[] key = retryWithToleranceOperator.execute(() -> keyConverter.fromConnectData(record.topic(), record.keySchema(), record.key()),
byte[] key = retryWithToleranceOperator.execute(() -> keyConverter.fromConnectData(record.topic(), headers, record.keySchema(), record.key()),
Stage.KEY_CONVERTER, keyConverter.getClass());
byte[] value = retryWithToleranceOperator.execute(() -> valueConverter.fromConnectData(record.topic(), record.valueSchema(), record.value()),
byte[] value = retryWithToleranceOperator.execute(() -> valueConverter.fromConnectData(record.topic(), headers, record.valueSchema(), record.value()),
Stage.VALUE_CONVERTER, valueConverter.getClass());
if (retryWithToleranceOperator.failed()) {


@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime;
import java.io.UnsupportedEncodingException;
import java.util.Map;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.storage.Converter;
/**
* A simple Converter implementation that uses the "encoding" header to encode and decode strings with the charset named in that header
*/
public class TestConverterWithHeaders implements Converter {
private static final String HEADER_ENCODING = "encoding";
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) {
String encoding = extractEncoding(headers);
try {
return new SchemaAndValue(Schema.STRING_SCHEMA, new String(value, encoding));
} catch (UnsupportedEncodingException e) {
throw new DataException("Unsupported encoding: " + encoding, e);
}
}
@Override
public byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value) {
String encoding = extractEncoding(headers);
try {
return ((String) value).getBytes(encoding);
} catch (UnsupportedEncodingException e) {
throw new DataException("Unsupported encoding: " + encoding, e);
}
}
private String extractEncoding(Headers headers) {
Header header = headers.lastHeader(HEADER_ENCODING);
if (header == null) {
throw new DataException("Header '" + HEADER_ENCODING + "' is required!");
}
return new String(header.value());
}
@Override
public SchemaAndValue toConnectData(String topic, byte[] value) {
throw new DataException("Headers are required for this converter!");
}
@Override
public byte[] fromConnectData(String topic, Schema schema, Object value) {
throw new DataException("Headers are required for this converter!");
}
}
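A hypothetical round trip through the converter above, showing that the charset travels in the record's "encoding" header rather than in the converter configuration (the example class and values are illustrative only, and it assumes the same package as TestConverterWithHeaders):

package org.apache.kafka.connect.runtime; // same package as the test converter above

import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;

public class TestConverterWithHeadersExample {
    public static void main(String[] args) {
        TestConverterWithHeaders converter = new TestConverterWithHeaders();

        RecordHeaders headers = new RecordHeaders();
        headers.add("encoding", "latin2".getBytes());

        // Serialize using the charset named in the header ...
        byte[] bytes = converter.fromConnectData("topic", headers, Schema.STRING_SCHEMA, "Árvíztűrő tükörfúrógép");

        // ... and deserialize on the other side with the same header.
        SchemaAndValue roundTrip = converter.toConnectData("topic", headers, bytes);
        System.out.println(roundTrip.value()); // prints the original string
    }
}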


@@ -16,6 +16,8 @@
*/
package org.apache.kafka.connect.runtime;
import java.util.Arrays;
import java.util.Iterator;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -25,6 +27,9 @@ import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.MockTime;
@@ -42,6 +47,7 @@ import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.easymock.Capture;
import org.easymock.CaptureType;
@@ -162,6 +168,10 @@ public class WorkerSinkTaskTest {
}
private void createTask(TargetState initialState) {
createTask(initialState, keyConverter, valueConverter, headerConverter);
}
private void createTask(TargetState initialState, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter) {
workerTask = new WorkerSinkTask(
taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics,
keyConverter, valueConverter, headerConverter,
@@ -1264,6 +1274,85 @@ public class WorkerSinkTaskTest {
assertEquals(30, metrics.currentMetricValueAsDouble(group1.metricGroup(), "put-batch-max-time-ms"), 0.001d);
}
@Test
public void testHeaders() throws Exception {
Headers headers = new RecordHeaders();
headers.add("header_key", "header_value".getBytes());
createTask(initialState);
expectInitializeTask();
expectPollInitialAssignment();
expectConsumerPoll(1, headers);
expectConversionAndTransformation(1, null, headers);
sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
EasyMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration(); // iter 1 -- initial assignment
workerTask.iteration(); // iter 2 -- deliver 1 record
PowerMock.verifyAll();
}
@Test
public void testHeadersWithCustomConverter() throws Exception {
StringConverter stringConverter = new StringConverter();
TestConverterWithHeaders testConverter = new TestConverterWithHeaders();
createTask(initialState, stringConverter, testConverter, stringConverter);
expectInitializeTask();
expectPollInitialAssignment();
String keyA = "a";
String valueA = "Árvíztűrő tükörfúrógép";
Headers headersA = new RecordHeaders();
String encodingA = "latin2";
headersA.add("encoding", encodingA.getBytes());
String keyB = "b";
String valueB = "Тестовое сообщение";
Headers headersB = new RecordHeaders();
String encodingB = "koi8_r";
headersB.add("encoding", encodingB.getBytes());
expectConsumerPoll(Arrays.asList(
new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + 1, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,
0L, 0, 0, keyA.getBytes(), valueA.getBytes(encodingA), headersA),
new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + 2, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,
0L, 0, 0, keyB.getBytes(), valueB.getBytes(encodingB), headersB)
));
expectTransformation(2, null);
Capture<Collection<SinkRecord>> records = EasyMock.newCapture(CaptureType.ALL);
sinkTask.put(EasyMock.capture(records));
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration(); // iter 1 -- initial assignment
workerTask.iteration(); // iter 2 -- deliver 2 records
Iterator<SinkRecord> iterator = records.getValue().iterator();
SinkRecord recordA = iterator.next();
assertEquals(keyA, recordA.key());
assertEquals(valueA, (String) recordA.value());
SinkRecord recordB = iterator.next();
assertEquals(keyB, recordB.key());
assertEquals(valueB, (String) recordB.value());
PowerMock.verifyAll();
}
private void expectInitializeTask() throws Exception {
consumer.subscribe(EasyMock.eq(asList(TOPIC)), EasyMock.capture(rebalanceListener));
PowerMock.expectLastCall();
@@ -1346,17 +1435,25 @@ public class WorkerSinkTaskTest {
}
private void expectConsumerPoll(final int numMessages) {
expectConsumerPoll(numMessages, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE);
expectConsumerPoll(numMessages, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, emptyHeaders());
}
private void expectConsumerPoll(final int numMessages, Headers headers) {
expectConsumerPoll(numMessages, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, headers);
}
private void expectConsumerPoll(final int numMessages, final long timestamp, final TimestampType timestampType) {
expectConsumerPoll(numMessages, timestamp, timestampType, emptyHeaders());
}
private void expectConsumerPoll(final int numMessages, final long timestamp, final TimestampType timestampType, Headers headers) {
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
new IAnswer<ConsumerRecords<byte[], byte[]>>() {
@Override
public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
for (int i = 0; i < numMessages; i++)
records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + i, timestamp, timestampType, 0L, 0, 0, RAW_KEY, RAW_VALUE));
records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + i, timestamp, timestampType, 0L, 0, 0, RAW_KEY, RAW_VALUE, headers));
recordsReturnedTp1 += numMessages;
return new ConsumerRecords<>(
numMessages > 0 ?
@@ -1367,14 +1464,40 @@ public class WorkerSinkTaskTest {
});
}
private void expectConsumerPoll(List<ConsumerRecord<byte[], byte[]>> records) {
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
new IAnswer<ConsumerRecords<byte[], byte[]>>() {
@Override
public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
return new ConsumerRecords<>(
records.isEmpty() ?
Collections.<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>emptyMap() :
Collections.singletonMap(new TopicPartition(TOPIC, PARTITION), records)
);
}
});
}
private void expectConversionAndTransformation(final int numMessages) {
expectConversionAndTransformation(numMessages, null);
}
private void expectConversionAndTransformation(final int numMessages, final String topicPrefix) {
EasyMock.expect(keyConverter.toConnectData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).times(numMessages);
EasyMock.expect(valueConverter.toConnectData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).times(numMessages);
expectConversionAndTransformation(numMessages, topicPrefix, emptyHeaders());
}
private void expectConversionAndTransformation(final int numMessages, final String topicPrefix, final Headers headers) {
EasyMock.expect(keyConverter.toConnectData(TOPIC, headers, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).times(numMessages);
EasyMock.expect(valueConverter.toConnectData(TOPIC, headers, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).times(numMessages);
for (Header header : headers) {
EasyMock.expect(headerConverter.toConnectHeader(TOPIC, header.key(), header.value())).andReturn(new SchemaAndValue(VALUE_SCHEMA, new String(header.value()))).times(1);
}
expectTransformation(numMessages, topicPrefix);
}
private void expectTransformation(final int numMessages, final String topicPrefix) {
final Capture<SinkRecord> recordCapture = EasyMock.newCapture();
EasyMock.expect(transformationChain.apply(EasyMock.capture(recordCapture)))
.andAnswer(new IAnswer<SinkRecord>() {
@@ -1389,7 +1512,8 @@ public class WorkerSinkTaskTest {
origRecord.key(),
origRecord.valueSchema(),
origRecord.value(),
origRecord.timestamp()
origRecord.timestamp(),
origRecord.headers()
)
: origRecord;
}
@@ -1472,6 +1596,10 @@ public class WorkerSinkTaskTest {
double sendTotal = metrics.currentMetricValueAsDouble(sinkTaskGroup, "sink-record-send-total");
}
private RecordHeaders emptyHeaders() {
return new RecordHeaders();
}
private abstract static class TestSinkTask extends SinkTask {
}
}


@@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
@@ -572,8 +573,8 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
return records;
}
});
EasyMock.expect(keyConverter.toConnectData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).anyTimes();
EasyMock.expect(valueConverter.toConnectData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).anyTimes();
EasyMock.expect(keyConverter.toConnectData(TOPIC, emptyHeaders(), RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).anyTimes();
EasyMock.expect(valueConverter.toConnectData(TOPIC, emptyHeaders(), RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).anyTimes();
final Capture<SinkRecord> recordCapture = EasyMock.newCapture();
EasyMock.expect(transformationChain.apply(EasyMock.capture(recordCapture))).andAnswer(
@@ -606,8 +607,8 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
return records;
}
});
EasyMock.expect(keyConverter.toConnectData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY));
EasyMock.expect(valueConverter.toConnectData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE));
EasyMock.expect(keyConverter.toConnectData(TOPIC, emptyHeaders(), RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY));
EasyMock.expect(valueConverter.toConnectData(TOPIC, emptyHeaders(), RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE));
sinkTask.put(EasyMock.anyObject(Collection.class));
return EasyMock.expectLastCall();
}
@@ -651,8 +652,8 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
consumer.seek(TOPIC_PARTITION, startOffset);
EasyMock.expectLastCall();
EasyMock.expect(keyConverter.toConnectData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY));
EasyMock.expect(valueConverter.toConnectData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE));
EasyMock.expect(keyConverter.toConnectData(TOPIC, emptyHeaders(), RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY));
EasyMock.expect(valueConverter.toConnectData(TOPIC, emptyHeaders(), RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE));
sinkTask.put(EasyMock.anyObject(Collection.class));
return EasyMock.expectLastCall();
}
@@ -694,6 +695,10 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
return capturedCallback;
}
private RecordHeaders emptyHeaders() {
return new RecordHeaders();
}
private static abstract class TestSinkTask extends SinkTask {
}
}


@@ -16,15 +16,21 @@
*/
package org.apache.kafka.connect.runtime;
import java.nio.ByteBuffer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.WorkerSourceTask.SourceTaskMetricsGroup;
@@ -39,6 +45,7 @@ import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.ThreadedTest;
@@ -152,6 +159,10 @@ public class WorkerSourceTaskTest extends ThreadedTest {
}
private void createWorkerTask(TargetState initialState) {
createWorkerTask(initialState, keyConverter, valueConverter, headerConverter);
}
private void createWorkerTask(TargetState initialState, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter) {
workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, headerConverter,
transformationChain, producer, offsetReader, offsetWriter, config, clusterConfigState, metrics, plugins.delegatingLoader(), Time.SYSTEM,
RetryWithToleranceOperatorTest.NOOP_OPERATOR);
@@ -682,6 +693,80 @@ public class WorkerSourceTaskTest extends ThreadedTest {
assertEquals(1800.0, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-active-count"), 0.001d);
}
@Test
public void testHeaders() throws Exception {
Headers headers = new RecordHeaders();
headers.add("header_key", "header_value".getBytes());
org.apache.kafka.connect.header.Headers connectHeaders = new ConnectHeaders();
connectHeaders.add("header_key", new SchemaAndValue(Schema.STRING_SCHEMA, "header_value"));
createWorkerTask();
List<SourceRecord> records = new ArrayList<>();
records.add(new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, null, connectHeaders));
Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecord(true, false, true, true, true, headers);
PowerMock.replayAll();
Whitebox.setInternalState(workerTask, "toSend", records);
Whitebox.invokeMethod(workerTask, "sendRecords");
assertEquals(SERIALIZED_KEY, sent.getValue().key());
assertEquals(SERIALIZED_RECORD, sent.getValue().value());
assertEquals(headers, sent.getValue().headers());
PowerMock.verifyAll();
}
@Test
public void testHeadersWithCustomConverter() throws Exception {
StringConverter stringConverter = new StringConverter();
TestConverterWithHeaders testConverter = new TestConverterWithHeaders();
createWorkerTask(TargetState.STARTED, stringConverter, testConverter, stringConverter);
List<SourceRecord> records = new ArrayList<>();
String stringA = "Árvíztűrő tükörfúrógép";
org.apache.kafka.connect.header.Headers headersA = new ConnectHeaders();
String encodingA = "latin2";
headersA.addString("encoding", encodingA);
records.add(new SourceRecord(PARTITION, OFFSET, "topic", null, Schema.STRING_SCHEMA, "a", Schema.STRING_SCHEMA, stringA, null, headersA));
String stringB = "Тестовое сообщение";
org.apache.kafka.connect.header.Headers headersB = new ConnectHeaders();
String encodingB = "koi8_r";
headersB.addString("encoding", encodingB);
records.add(new SourceRecord(PARTITION, OFFSET, "topic", null, Schema.STRING_SCHEMA, "b", Schema.STRING_SCHEMA, stringB, null, headersB));
Capture<ProducerRecord<byte[], byte[]>> sentRecordA = expectSendRecord(false, false, true, true, false, null);
Capture<ProducerRecord<byte[], byte[]>> sentRecordB = expectSendRecord(false, false, true, true, false, null);
PowerMock.replayAll();
Whitebox.setInternalState(workerTask, "toSend", records);
Whitebox.invokeMethod(workerTask, "sendRecords");
assertEquals(ByteBuffer.wrap("a".getBytes()), ByteBuffer.wrap(sentRecordA.getValue().key()));
assertEquals(
ByteBuffer.wrap(stringA.getBytes(encodingA)),
ByteBuffer.wrap(sentRecordA.getValue().value())
);
assertEquals(encodingA, new String(sentRecordA.getValue().headers().lastHeader("encoding").value()));
assertEquals(ByteBuffer.wrap("b".getBytes()), ByteBuffer.wrap(sentRecordB.getValue().key()));
assertEquals(
ByteBuffer.wrap(stringB.getBytes(encodingB)),
ByteBuffer.wrap(sentRecordB.getValue().value())
);
assertEquals(encodingB, new String(sentRecordB.getValue().headers().lastHeader("encoding").value()));
PowerMock.verifyAll();
}
private CountDownLatch expectPolls(int minimum, final AtomicInteger count) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(minimum);
// Note that we stub these to allow any number of calls because the thread will continue to
@@ -708,7 +793,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
@SuppressWarnings("unchecked")
private void expectSendRecordSyncFailure(Throwable error) throws InterruptedException {
expectConvertKeyValue(false);
expectConvertHeadersAndKeyValue(false);
expectApplyTransformationChain(false);
offsetWriter.offset(PARTITION, OFFSET);
@@ -729,24 +814,34 @@ public class WorkerSourceTaskTest extends ThreadedTest {
}
private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordProducerCallbackFail() throws InterruptedException {
return expectSendRecord(false, false, false, false);
return expectSendRecord(false, false, false, false, true, emptyHeaders());
}
private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordSucceed(boolean anyTimes, boolean isRetry) throws InterruptedException {
return expectSendRecord(anyTimes, isRetry, true, true);
return expectSendRecord(anyTimes, isRetry, true, true, true, emptyHeaders());
}
private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordFail(boolean anyTimes, boolean isRetry) throws InterruptedException {
return expectSendRecord(anyTimes, isRetry, true, false);
return expectSendRecord(anyTimes, isRetry, true, false, true, emptyHeaders());
}
private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(boolean anyTimes, boolean isRetry, boolean succeed) throws InterruptedException {
return expectSendRecord(anyTimes, isRetry, succeed, true, true, emptyHeaders());
}
@SuppressWarnings("unchecked")
private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(
boolean anyTimes,
boolean isRetry,
boolean sendSuccess,
boolean commitSuccess
boolean commitSuccess,
boolean isMockedConverters,
Headers headers
) throws InterruptedException {
expectConvertKeyValue(anyTimes);
if (isMockedConverters) {
expectConvertHeadersAndKeyValue(anyTimes, headers);
}
expectApplyTransformationChain(anyTimes);
Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture();
@@ -794,13 +889,24 @@ public class WorkerSourceTaskTest extends ThreadedTest {
return sent;
}
private void expectConvertKeyValue(boolean anyTimes) {
IExpectationSetters<byte[]> convertKeyExpect = EasyMock.expect(keyConverter.fromConnectData(TOPIC, KEY_SCHEMA, KEY));
private void expectConvertHeadersAndKeyValue(boolean anyTimes) {
expectConvertHeadersAndKeyValue(anyTimes, emptyHeaders());
}
private void expectConvertHeadersAndKeyValue(boolean anyTimes, Headers headers) {
for (Header header : headers) {
IExpectationSetters<byte[]> convertHeaderExpect = EasyMock.expect(headerConverter.fromConnectHeader(TOPIC, header.key(), Schema.STRING_SCHEMA, new String(header.value())));
if (anyTimes)
convertHeaderExpect.andStubReturn(header.value());
else
convertHeaderExpect.andReturn(header.value());
}
IExpectationSetters<byte[]> convertKeyExpect = EasyMock.expect(keyConverter.fromConnectData(TOPIC, headers, KEY_SCHEMA, KEY));
if (anyTimes)
convertKeyExpect.andStubReturn(SERIALIZED_KEY);
else
convertKeyExpect.andReturn(SERIALIZED_KEY);
IExpectationSetters<byte[]> convertValueExpect = EasyMock.expect(valueConverter.fromConnectData(TOPIC, RECORD_SCHEMA, RECORD));
IExpectationSetters<byte[]> convertValueExpect = EasyMock.expect(valueConverter.fromConnectData(TOPIC, headers, RECORD_SCHEMA, RECORD));
if (anyTimes)
convertValueExpect.andStubReturn(SERIALIZED_RECORD);
else
@@ -902,6 +1008,10 @@ public class WorkerSourceTaskTest extends ThreadedTest {
}
}
private RecordHeaders emptyHeaders() {
return new RecordHeaders();
}
private abstract static class TestSourceTask extends SourceTask {
}