KAFKA-5142: Add Connect support for message headers (KIP-145)

**[KIP-145](https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect) has been accepted, and this PR implements it except for the SMTs.**

Changed the Connect API and runtime to support message headers as described in [KIP-145](https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect).

The new `Header` interface defines an immutable representation of a Kafka header (key-value pair) with support for the Connect value types and schemas. This interface provides methods for easily converting between many of the built-in primitive, structured, and logical data types.
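As a rough sketch of that contract (assuming only the `Header` methods added in this PR; the header key below is invented), code holding a `Header` can read its key, schema, and value, and derive modified copies without mutating the original:

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.Header;

public class HeaderSketch {
    // 'header' would typically come from a record's Headers collection.
    static void inspect(Header header) {
        String key = header.key();        // never null
        Schema schema = header.schema();  // may be null if the value has no schema
        Object value = header.value();    // may be null

        // Header is immutable: with() and rename() return new Header instances.
        Header renamed = header.rename("renamed-key");
        Header asString = header.with(Schema.OPTIONAL_STRING_SCHEMA, String.valueOf(value));
        System.out.println(key + " / " + schema + " -> " + renamed.key() + " = " + asString.value());
    }
}
```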

The new `Headers` interface defines an ordered collection of headers and tracks all headers associated with a `ConnectRecord` (and thus with `SourceRecord` and `SinkRecord`). Multiple headers with the same key are allowed. The `Headers` interface provides methods for adding, removing, finding, and modifying headers, and its convenience methods make it easy for connectors and transforms to use and modify a record's headers.
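To make this concrete, here is a minimal, hypothetical sketch (topic name, source partition/offset maps, and header keys are all invented) that builds headers with `ConnectHeaders`, attaches them to a `SourceRecord`, and reads them back:

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.Collections;

public class HeadersUsageSketch {
    public static void main(String[] args) {
        Headers headers = new ConnectHeaders()
                .addString("source.system", "orders-db")  // duplicate keys are allowed,
                .addInt("retry.count", 0)                  // and insertion order is kept
                .addInt("retry.count", 1);

        SourceRecord record = new SourceRecord(
                Collections.singletonMap("table", "orders"),  // source partition
                Collections.singletonMap("offset", 42L),      // source offset
                "orders-topic", null,
                null, null,
                Schema.STRING_SCHEMA, "payload",
                System.currentTimeMillis(), headers);

        // lastWithName returns the most recently added header with that key.
        Header last = record.headers().lastWithName("retry.count");
        System.out.println(last.key() + " = " + last.value());   // retry.count = 1

        for (Header h : record.headers()) {                      // ordered iteration
            System.out.println(h);
        }
    }
}
```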

A new `HeaderConverter` interface is also defined so that the Connect runtime framework can serialize and deserialize headers between the in-memory representation and Kafka's `byte[]` representation. A new `SimpleHeaderConverter` implementation serializes header values to strings and deserializes them by inferring the schemas (`Struct` header values are serialized without their schemas, so they can only be deserialized as `Map` instances without a schema). The `StringConverter`, `JsonConverter`, and `ByteArrayConverter` have all been extended to also implement `HeaderConverter`. Each connector can be configured with a different header converter; by default the `SimpleHeaderConverter` is used, which serializes header values as strings without schemas.
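For illustration, a round trip through the new `SimpleHeaderConverter` might look like the sketch below; the topic and header names are invented, and in a real deployment the `header.converter` worker/connector property introduced by KIP-145 is what normally selects the implementation:

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.SimpleHeaderConverter;

import java.util.Collections;

public class SimpleHeaderConverterSketch {
    public static void main(String[] args) {
        SimpleHeaderConverter converter = new SimpleHeaderConverter();
        converter.configure(Collections.emptyMap());

        // In-memory header value -> the byte[] stored in the Kafka record header.
        byte[] raw = converter.fromConnectHeader("orders-topic", "retry.count", Schema.INT32_SCHEMA, 3);

        // byte[] -> SchemaAndValue; the schema is inferred from the serialized string,
        // so the numeric type that comes back may be narrower than the original.
        SchemaAndValue parsed = converter.toConnectHeader("orders-topic", "retry.count", raw);
        System.out.println(parsed.schema().type() + " -> " + parsed.value());
    }
}
```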

Unit and integration tests are added for `ConnectHeader` and `ConnectHeaders`, the two implementation classes for headers, and additional test methods cover the methods added to the `Converter` implementations. Because the `ConnectRecord` object is already exercised heavily, only a few new tests are needed; many of the existing tests already cover the changes.

Author: Randall Hauch <rhauch@gmail.com>

Reviewers: Arjun Satish <arjun@confluent.io>, Ted Yu <yuzhihong@gmail.com>, Magesh Nandakumar <magesh.n.kumar@gmail.com>, Konstantine Karantasis <konstantine@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #4319 from rhauch/kafka-5142-b
Authored by Randall Hauch on 2018-01-31 10:40:24 -08:00; committed by Ewen Cheslack-Postava
parent 530bc59de2
commit 4c48942f9d
43 changed files with 4481 additions and 160 deletions

import-control.xml (checkstyle)

@ -277,6 +277,7 @@
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.apache.kafka.connect.data" />
<allow pkg="org.apache.kafka.connect.errors" />
<allow pkg="org.apache.kafka.connect.header" />
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.test"/>

suppressions.xml (checkstyle)

@ -57,7 +57,7 @@
files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java"/>
<suppress checks="NPathComplexity"
files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|Agent).java"/>
files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|Agent|Values).java"/>
<!-- clients tests -->
<suppress checks="ClassDataAbstractionCoupling"
@ -102,9 +102,13 @@
files="DistributedHerder.java"/>
<suppress checks="CyclomaticComplexity"
files="KafkaConfigBackingStore.java"/>
<suppress checks="CyclomaticComplexity"
files="(Values|ConnectHeader|ConnectHeaders).java"/>
<suppress checks="JavaNCSS"
files="KafkaConfigBackingStore.java"/>
<suppress checks="JavaNCSS"
files="Values.java"/>
<suppress checks="NPathComplexity"
files="ConnectRecord.java"/>
@ -116,6 +120,10 @@
files="JsonConverter.java"/>
<suppress checks="NPathComplexity"
files="DistributedHerder.java"/>
<suppress checks="NPathComplexity"
files="ConnectHeaders.java"/>
<suppress checks="MethodLength"
files="Values.java"/>
<!-- connect tests-->
<suppress checks="ClassDataAbstractionCoupling"

ConnectRecord.java (org.apache.kafka.connect.connector)

@ -17,6 +17,11 @@
package org.apache.kafka.connect.connector;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import java.util.Objects;
/**
* <p>
@ -34,11 +39,19 @@ public abstract class ConnectRecord<R extends ConnectRecord<R>> {
private final Schema valueSchema;
private final Object value;
private final Long timestamp;
private final Headers headers;
public ConnectRecord(String topic, Integer kafkaPartition,
Schema keySchema, Object key,
Schema valueSchema, Object value,
Long timestamp) {
this(topic, kafkaPartition, keySchema, key, valueSchema, value, timestamp, new ConnectHeaders());
}
public ConnectRecord(String topic, Integer kafkaPartition,
Schema keySchema, Object key,
Schema valueSchema, Object value,
Long timestamp, Iterable<Header> headers) {
this.topic = topic;
this.kafkaPartition = kafkaPartition;
this.keySchema = keySchema;
@ -46,6 +59,11 @@ public abstract class ConnectRecord<R extends ConnectRecord<R>> {
this.valueSchema = valueSchema;
this.value = value;
this.timestamp = timestamp;
if (headers instanceof ConnectHeaders) {
this.headers = (ConnectHeaders) headers;
} else {
this.headers = new ConnectHeaders(headers);
}
}
public String topic() {
@ -76,9 +94,46 @@ public abstract class ConnectRecord<R extends ConnectRecord<R>> {
return timestamp;
}
/** Generate a new record of the same type as itself, with the specified parameter values. **/
/**
* Get the headers for this record.
*
* @return the headers; never null
*/
public Headers headers() {
return headers;
}
/**
* Create a new record of the same type as itself, with the specified parameter values. All other fields in this record will be copied
* over to the new record. Since the headers are mutable, the resulting record will have a copy of this record's headers.
*
* @param topic the name of the topic; may be null
* @param kafkaPartition the partition number for the Kafka topic; may be null
* @param keySchema the schema for the key; may be null
* @param key the key; may be null
* @param valueSchema the schema for the value; may be null
* @param value the value; may be null
* @param timestamp the timestamp; may be null
* @return the new record
*/
public abstract R newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp);
/**
* Create a new record of the same type as itself, with the specified parameter values. All other fields in this record will be copied
* over to the new record.
*
* @param topic the name of the topic; may be null
* @param kafkaPartition the partition number for the Kafka topic; may be null
* @param keySchema the schema for the key; may be null
* @param key the key; may be null
* @param valueSchema the schema for the value; may be null
* @param value the value; may be null
* @param timestamp the timestamp; may be null
* @param headers the headers; may be null or empty
* @return the new record
*/
public abstract R newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp, Iterable<Header> headers);
@Override
public String toString() {
return "ConnectRecord{" +
@ -87,6 +142,7 @@ public abstract class ConnectRecord<R extends ConnectRecord<R>> {
", key=" + key +
", value=" + value +
", timestamp=" + timestamp +
", headers=" + headers +
'}';
}
@ -113,6 +169,8 @@ public abstract class ConnectRecord<R extends ConnectRecord<R>> {
return false;
if (timestamp != null ? !timestamp.equals(that.timestamp) : that.timestamp != null)
return false;
if (!Objects.equals(headers, that.headers))
return false;
return true;
}
@ -126,6 +184,7 @@ public abstract class ConnectRecord<R extends ConnectRecord<R>> {
result = 31 * result + (valueSchema != null ? valueSchema.hashCode() : 0);
result = 31 * result + (value != null ? value.hashCode() : 0);
result = 31 * result + (timestamp != null ? timestamp.hashCode() : 0);
result = 31 * result + headers.hashCode();
return result;
}
}
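The new `newRecord` variant and the mutable `headers()` collection shown above make it straightforward for a transformation to carry headers through when copying a record. A hypothetical sketch (not a transform shipped in this PR):

```java
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.header.Headers;

// Hypothetical helper: copy a record while adding one header, leaving the original untouched.
public class AddHeaderSketch {
    public static <R extends ConnectRecord<R>> R withPipelineHeader(R record) {
        Headers updated = record.headers().duplicate().addString("pipeline", "demo");
        return record.newRecord(record.topic(), record.kafkaPartition(),
                                record.keySchema(), record.key(),
                                record.valueSchema(), record.value(),
                                record.timestamp(), updated);
    }
}
```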

File diff suppressed because it is too large.

ConnectHeader.java (org.apache.kafka.connect.header), new file

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.header;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import java.util.Objects;
/**
* A {@link Header} implementation.
*/
class ConnectHeader implements Header {
private static final SchemaAndValue NULL_SCHEMA_AND_VALUE = new SchemaAndValue(null, null);
private final String key;
private final SchemaAndValue schemaAndValue;
protected ConnectHeader(String key, SchemaAndValue schemaAndValue) {
Objects.requireNonNull(key, "Null header keys are not permitted");
this.key = key;
this.schemaAndValue = schemaAndValue != null ? schemaAndValue : NULL_SCHEMA_AND_VALUE;
assert this.schemaAndValue != null;
}
@Override
public String key() {
return key;
}
@Override
public Object value() {
return schemaAndValue.value();
}
@Override
public Schema schema() {
Schema schema = schemaAndValue.schema();
if (schema == null && value() instanceof Struct) {
schema = ((Struct) value()).schema();
}
return schema;
}
@Override
public Header rename(String key) {
Objects.requireNonNull(key, "Null header keys are not permitted");
if (this.key.equals(key)) {
return this;
}
return new ConnectHeader(key, schemaAndValue);
}
@Override
public Header with(Schema schema, Object value) {
return new ConnectHeader(key, new SchemaAndValue(schema, value));
}
@Override
public int hashCode() {
return Objects.hash(key, schemaAndValue);
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj instanceof Header) {
Header that = (Header) obj;
return Objects.equals(this.key, that.key()) && Objects.equals(this.schema(), that.schema()) && Objects.equals(this.value(),
that.value());
}
return false;
}
@Override
public String toString() {
return "ConnectHeader(key=" + key + ", value=" + value() + ", schema=" + schema() + ")";
}
}

ConnectHeaders.java (org.apache.kafka.connect.header), new file

@ -0,0 +1,519 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.header;
import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Schema.Type;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.errors.DataException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
/**
* A basic {@link Headers} implementation.
*/
public class ConnectHeaders implements Headers {
private static final int EMPTY_HASH = Objects.hash(new LinkedList<>());
/**
* An immutable and therefore sharable empty iterator.
*/
private static final Iterator<Header> EMPTY_ITERATOR = new Iterator<Header>() {
@Override
public boolean hasNext() {
return false;
}
@Override
public Header next() {
throw new NoSuchElementException();
}
@Override
public void remove() {
throw new IllegalStateException();
}
};
// This field is set lazily, but once set to a list it is never set back to null
private LinkedList<Header> headers;
public ConnectHeaders() {
}
public ConnectHeaders(Iterable<Header> original) {
if (original == null) {
return;
}
if (original instanceof ConnectHeaders) {
ConnectHeaders originalHeaders = (ConnectHeaders) original;
if (!originalHeaders.isEmpty()) {
headers = new LinkedList<>(originalHeaders.headers);
}
} else {
headers = new LinkedList<>();
for (Header header : original) {
headers.add(header);
}
}
}
@Override
public int size() {
return headers == null ? 0 : headers.size();
}
@Override
public boolean isEmpty() {
return headers == null ? true : headers.isEmpty();
}
@Override
public Headers clear() {
if (headers != null) {
headers.clear();
}
return this;
}
@Override
public Headers add(Header header) {
Objects.requireNonNull(header, "Unable to add a null header.");
if (headers == null) {
headers = new LinkedList<>();
}
headers.add(header);
return this;
}
protected Headers addWithoutValidating(String key, Object value, Schema schema) {
return add(new ConnectHeader(key, new SchemaAndValue(schema, value)));
}
@Override
public Headers add(String key, SchemaAndValue schemaAndValue) {
checkSchemaMatches(schemaAndValue);
return add(new ConnectHeader(key, schemaAndValue != null ? schemaAndValue : SchemaAndValue.NULL));
}
@Override
public Headers add(String key, Object value, Schema schema) {
return add(key, value != null || schema != null ? new SchemaAndValue(schema, value) : SchemaAndValue.NULL);
}
@Override
public Headers addString(String key, String value) {
return addWithoutValidating(key, value, value != null ? Schema.STRING_SCHEMA : Schema.OPTIONAL_STRING_SCHEMA);
}
@Override
public Headers addBytes(String key, byte[] value) {
return addWithoutValidating(key, value, value != null ? Schema.BYTES_SCHEMA : Schema.OPTIONAL_BYTES_SCHEMA);
}
@Override
public Headers addBoolean(String key, boolean value) {
return addWithoutValidating(key, value, Schema.BOOLEAN_SCHEMA);
}
@Override
public Headers addByte(String key, byte value) {
return addWithoutValidating(key, value, Schema.INT8_SCHEMA);
}
@Override
public Headers addShort(String key, short value) {
return addWithoutValidating(key, value, Schema.INT16_SCHEMA);
}
@Override
public Headers addInt(String key, int value) {
return addWithoutValidating(key, value, Schema.INT32_SCHEMA);
}
@Override
public Headers addLong(String key, long value) {
return addWithoutValidating(key, value, Schema.INT64_SCHEMA);
}
@Override
public Headers addFloat(String key, float value) {
return addWithoutValidating(key, value, Schema.FLOAT32_SCHEMA);
}
@Override
public Headers addDouble(String key, double value) {
return addWithoutValidating(key, value, Schema.FLOAT64_SCHEMA);
}
@Override
public Headers addList(String key, List<?> value, Schema schema) {
if (value == null) {
return add(key, null, null);
}
checkSchemaType(schema, Type.ARRAY);
return addWithoutValidating(key, value, schema);
}
@Override
public Headers addMap(String key, Map<?, ?> value, Schema schema) {
if (value == null) {
return add(key, null, null);
}
checkSchemaType(schema, Type.MAP);
return addWithoutValidating(key, value, schema);
}
@Override
public Headers addStruct(String key, Struct value) {
if (value == null) {
return add(key, null, null);
}
checkSchemaType(value.schema(), Type.STRUCT);
return addWithoutValidating(key, value, value.schema());
}
@Override
public Headers addDecimal(String key, BigDecimal value) {
if (value == null) {
return add(key, null, null);
}
// Check that this is a decimal ...
Schema schema = Decimal.schema(value.scale());
Decimal.fromLogical(schema, value);
return addWithoutValidating(key, value, schema);
}
@Override
public Headers addDate(String key, java.util.Date value) {
if (value != null) {
// Check that this is a date ...
Date.fromLogical(Date.SCHEMA, value);
}
return addWithoutValidating(key, value, Date.SCHEMA);
}
@Override
public Headers addTime(String key, java.util.Date value) {
if (value != null) {
// Check that this is a time ...
Time.fromLogical(Time.SCHEMA, value);
}
return addWithoutValidating(key, value, Time.SCHEMA);
}
@Override
public Headers addTimestamp(String key, java.util.Date value) {
if (value != null) {
// Check that this is a timestamp ...
Timestamp.fromLogical(Timestamp.SCHEMA, value);
}
return addWithoutValidating(key, value, Timestamp.SCHEMA);
}
@Override
public Header lastWithName(String key) {
checkKey(key);
if (headers != null) {
ListIterator<Header> iter = headers.listIterator(headers.size());
while (iter.hasPrevious()) {
Header header = iter.previous();
if (key.equals(header.key())) {
return header;
}
}
}
return null;
}
@Override
public Iterator<Header> allWithName(String key) {
return new FilterByKeyIterator(iterator(), key);
}
@Override
public Iterator<Header> iterator() {
if (headers != null) {
return headers.iterator();
}
return EMPTY_ITERATOR;
}
@Override
public Headers remove(String key) {
checkKey(key);
if (!headers.isEmpty()) {
Iterator<Header> iterator = iterator();
while (iterator.hasNext()) {
if (iterator.next().key().equals(key)) {
iterator.remove();
}
}
}
return this;
}
@Override
public Headers retainLatest() {
if (!headers.isEmpty()) {
Set<String> keys = new HashSet<>();
ListIterator<Header> iter = headers.listIterator(headers.size());
while (iter.hasPrevious()) {
Header header = iter.previous();
String key = header.key();
if (!keys.add(key)) {
iter.remove();
}
}
}
return this;
}
@Override
public Headers retainLatest(String key) {
checkKey(key);
if (!headers.isEmpty()) {
boolean found = false;
ListIterator<Header> iter = headers.listIterator(headers.size());
while (iter.hasPrevious()) {
String headerKey = iter.previous().key();
if (key.equals(headerKey)) {
if (found)
iter.remove();
found = true;
}
}
}
return this;
}
@Override
public Headers apply(String key, HeaderTransform transform) {
checkKey(key);
if (!headers.isEmpty()) {
ListIterator<Header> iter = headers.listIterator();
while (iter.hasNext()) {
Header orig = iter.next();
if (orig.key().equals(key)) {
Header updated = transform.apply(orig);
if (updated != null) {
iter.set(updated);
} else {
iter.remove();
}
}
}
}
return this;
}
@Override
public Headers apply(HeaderTransform transform) {
if (!headers.isEmpty()) {
ListIterator<Header> iter = headers.listIterator();
while (iter.hasNext()) {
Header orig = iter.next();
Header updated = transform.apply(orig);
if (updated != null) {
iter.set(updated);
} else {
iter.remove();
}
}
}
return this;
}
@Override
public int hashCode() {
return isEmpty() ? EMPTY_HASH : Objects.hash(headers);
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj instanceof Headers) {
Headers that = (Headers) obj;
Iterator<Header> thisIter = this.iterator();
Iterator<Header> thatIter = that.iterator();
while (thisIter.hasNext() && thatIter.hasNext()) {
if (!Objects.equals(thisIter.next(), thatIter.next()))
return false;
}
return !thisIter.hasNext() && !thatIter.hasNext();
}
return false;
}
@Override
public String toString() {
return "ConnectHeaders(headers=" + (headers != null ? headers : "") + ")";
}
@Override
public ConnectHeaders duplicate() {
return new ConnectHeaders(this);
}
/**
* Check that the key is not null
*
* @param key the key; may not be null
* @throws NullPointerException if the supplied key is null
*/
private void checkKey(String key) {
Objects.requireNonNull(key, "Header key cannot be null");
}
/**
* Check the {@link Schema#type() schema's type} matches the specified type.
*
* @param schema the schema; never null
* @param type the expected type
* @throws DataException if the schema's type does not match the expected type
*/
private void checkSchemaType(Schema schema, Type type) {
if (schema.type() != type) {
throw new DataException("Expecting " + type + " but instead found " + schema.type());
}
}
/**
* Check that the value and its schema are compatible.
*
* @param schemaAndValue the schema and value pair
* @throws DataException if the schema is not compatible with the value
*/
// visible for testing
void checkSchemaMatches(SchemaAndValue schemaAndValue) {
if (schemaAndValue != null) {
Schema schema = schemaAndValue.schema();
if (schema == null)
return;
schema = schema.schema(); // in case a SchemaBuilder is used
Object value = schemaAndValue.value();
if (value == null && !schema.isOptional()) {
throw new DataException("A null value requires an optional schema but was " + schema);
}
if (value != null) {
switch (schema.type()) {
case BYTES:
if (value instanceof ByteBuffer)
return;
if (value instanceof byte[])
return;
if (value instanceof BigDecimal && Decimal.LOGICAL_NAME.equals(schema.name()))
return;
break;
case STRING:
if (value instanceof String)
return;
break;
case BOOLEAN:
if (value instanceof Boolean)
return;
break;
case INT8:
if (value instanceof Byte)
return;
break;
case INT16:
if (value instanceof Short)
return;
break;
case INT32:
if (value instanceof Integer)
return;
if (value instanceof java.util.Date && Date.LOGICAL_NAME.equals(schema.name()))
return;
if (value instanceof java.util.Date && Time.LOGICAL_NAME.equals(schema.name()))
return;
break;
case INT64:
if (value instanceof Long)
return;
if (value instanceof java.util.Date && Timestamp.LOGICAL_NAME.equals(schema.name()))
return;
break;
case FLOAT32:
if (value instanceof Float)
return;
break;
case FLOAT64:
if (value instanceof Double)
return;
break;
case ARRAY:
if (value instanceof List)
return;
break;
case MAP:
if (value instanceof Map)
return;
break;
case STRUCT:
if (value instanceof Struct)
return;
break;
}
throw new DataException("The value " + value + " is not compatible with the schema " + schema);
}
}
}
private static final class FilterByKeyIterator extends AbstractIterator<Header> {
private final Iterator<Header> original;
private final String key;
private FilterByKeyIterator(Iterator<Header> original, String key) {
this.original = original;
this.key = key;
}
protected Header makeNext() {
while (original.hasNext()) {
Header header = original.next();
if (!header.key().equals(key)) {
continue;
}
return header;
}
return this.allDone();
}
}
}
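To illustrate the validation that `checkSchemaMatches` performs when headers are added, a small hypothetical sketch (the header keys are invented):

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Headers;

public class HeaderValidationSketch {
    public static void main(String[] args) {
        Headers headers = new ConnectHeaders();

        // A value whose type matches the schema is accepted.
        headers.add("count", new SchemaAndValue(Schema.INT32_SCHEMA, 7));

        // A value that does not match its schema is rejected up front.
        try {
            headers.add("count", new SchemaAndValue(Schema.INT32_SCHEMA, "not a number"));
        } catch (DataException e) {
            System.out.println("rejected: " + e.getMessage());
        }

        // A null value requires an optional schema.
        try {
            headers.add("missing", new SchemaAndValue(Schema.STRING_SCHEMA, null));
        } catch (DataException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```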

Header.java (org.apache.kafka.connect.header), new file

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.header;
import org.apache.kafka.connect.data.Schema;
/**
* A {@link Header} is a key-value pair, and multiple headers can be included with the key, value, and timestamp in each Kafka message.
* If the value contains schema information, then the header will have a non-null {@link #schema() schema}.
* <p>
* This is an immutable interface.
*/
public interface Header {
/**
* The header's key, which is not necessarily unique within the set of headers on a Kafka message.
*
* @return the header's key; never null
*/
String key();
/**
* Return the {@link Schema} associated with this header, if there is one. Not all headers will have schemas.
*
* @return the header's schema, or null if no schema is associated with this header
*/
Schema schema();
/**
* Get the header's value as deserialized by Connect's header converter.
*
* @return the deserialized object representation of the header's value; may be null
*/
Object value();
/**
* Return a new {@link Header} object that has the same key but with the supplied value.
*
* @param schema the schema for the new value; may be null
* @param value the new value
* @return the new {@link Header}; never null
*/
Header with(Schema schema, Object value);
/**
* Return a new {@link Header} object that has the same schema and value but with the supplied key.
*
* @param key the key for the new header; may not be null
* @return the new {@link Header}; never null
*/
Header rename(String key);
}

Headers.java (org.apache.kafka.connect.header), new file

@ -0,0 +1,308 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.header;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import java.math.BigDecimal;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
* A mutable ordered collection of {@link Header} objects. Note that multiple headers may have the same {@link Header#key() key}.
*/
public interface Headers extends Iterable<Header> {
/**
* Get the number of headers in this object.
*
* @return the number of headers; never negative
*/
int size();
/**
* Determine whether this object has no headers.
*
* @return true if there are no headers, or false if there is at least one header
*/
boolean isEmpty();
/**
* Get the collection of {@link Header} objects whose {@link Header#key() keys} all match the specified key.
*
* @param key the key; may not be null
* @return the iterator over headers with the specified key; may be null if there are no headers with the specified key
*/
Iterator<Header> allWithName(String key);
/**
* Return the last {@link Header} with the specified key.
*
* @param key the key for the header; may not be null
* @return the last Header, or null if there are no headers with the specified key
*/
Header lastWithName(String key);
/**
* Add the given {@link Header} to this collection.
*
* @param header the header; may not be null
* @return this object to facilitate chaining multiple methods; never null
*/
Headers add(Header header);
/**
* Add to this collection a {@link Header} with the given key and value.
*
* @param key the header's key; may not be null
* @param schemaAndValue the {@link SchemaAndValue} for the header; may be null
* @return this object to facilitate chaining multiple methods; never null
*/
Headers add(String key, SchemaAndValue schemaAndValue);
/**
* Add to this collection a {@link Header} with the given key and value.
*
* @param key the header's key; may not be null
* @param value the header's value; may be null
* @param schema the schema for the header's value; may not be null if the value is not null
* @return this object to facilitate chaining multiple methods; never null
*/
Headers add(String key, Object value, Schema schema);
/**
* Add to this collection a {@link Header} with the given key and value.
*
* @param key the header's key; may not be null
* @param value the header's value; may be null
* @return this object to facilitate chaining multiple methods; never null
*/
Headers addString(String key, String value);
/**
* Add to this collection a {@link Header} with the given key and value.
*
* @param key the header's key; may not be null
* @param value the header's value; may be null
* @return this object to facilitate chaining multiple methods; never null
*/
Headers addBoolean(String key, boolean value);
/**
* Add to this collection a {@link Header} with the given key and value.
*
* @param key the header's key; may not be null
* @param value the header's value; may be null
* @return this object to facilitate chaining multiple methods; never null
*/
Headers addByte(String key, byte value);
/**
* Add to this collection a {@link Header} with the given key and value.
*
* @param key the header's key; may not be null
* @param value the header's value; may be null
* @return this object to facilitate chaining multiple methods; never null
*/
Headers addShort(String key, short value);
/**
* Add to this collection a {@link Header} with the given key and value.
*
* @param key the header's key; may not be null
* @param value the header's value; may be null
* @return this object to facilitate chaining multiple methods; never null
*/
Headers addInt(String key, int value);
/**
* Add to this collection a {@link Header} with the given key and value.
*
* @param key the header's key; may not be null
* @param value the header's value; may be null
* @return this object to facilitate chaining multiple methods; never null
*/
Headers addLong(String key, long value);
/**
* Add to this collection a {@link Header} with the given key and value.
*
* @param key the header's key; may not be null
* @param value the header's value; may be null
* @return this object to facilitate chaining multiple methods; never null
*/
Headers addFloat(String key, float value);
/**
* Add to this collection a {@link Header} with the given key and value.
*
* @param key the header's key; may not be null
* @param value the header's value; may be null
* @return this object to facilitate chaining multiple methods; never null
*/
Headers addDouble(String key, double value);
/**
* Add to this collection a {@link Header} with the given key and value.
*
* @param key the header's key; may not be null
* @param value the header's value; may be null
* @return this object to facilitate chaining multiple methods; never null
*/
Headers addBytes(String key, byte[] value);
/**
* Add to this collection a {@link Header} with the given key and value.
*
* @param key the header's key; may not be null
* @param value the header's value; may be null
* @param schema the schema describing the list value; may not be null
* @return this object to facilitate chaining multiple methods; never null
* @throws DataException if the header's value is invalid
*/
Headers addList(String key, List<?> value, Schema schema);
/**
* Add to this collection a {@link Header} with the given key and value.
*
* @param key the header's key; may not be null
* @param value the header's value; may be null
* @param schema the schema describing the map value; may not be null
* @return this object to facilitate chaining multiple methods; never null
* @throws DataException if the header's value is invalid
*/
Headers addMap(String key, Map<?, ?> value, Schema schema);
/**
* Add to this collection a {@link Header} with the given key and value.
*
* @param key the header's key; may not be null
* @param value the header's value; may be null
* @return this object to facilitate chaining multiple methods; never null
* @throws DataException if the header's value is invalid
*/
Headers addStruct(String key, Struct value);
/**
* Add to this collection a {@link Header} with the given key and {@link org.apache.kafka.connect.data.Decimal} value.
*
* @param key the header's key; may not be null
* @param value the header's {@link org.apache.kafka.connect.data.Decimal} value; may be null
* @return this object to facilitate chaining multiple methods; never null
*/
Headers addDecimal(String key, BigDecimal value);
/**
* Add to this collection a {@link Header} with the given key and {@link org.apache.kafka.connect.data.Date} value.
*
* @param key the header's key; may not be null
* @param value the header's {@link org.apache.kafka.connect.data.Date} value; may be null
* @return this object to facilitate chaining multiple methods; never null
*/
Headers addDate(String key, java.util.Date value);
/**
* Add to this collection a {@link Header} with the given key and {@link org.apache.kafka.connect.data.Time} value.
*
* @param key the header's key; may not be null
* @param value the header's {@link org.apache.kafka.connect.data.Time} value; may be null
* @return this object to facilitate chaining multiple methods; never null
*/
Headers addTime(String key, java.util.Date value);
/**
* Add to this collection a {@link Header} with the given key and {@link org.apache.kafka.connect.data.Timestamp} value.
*
* @param key the header's key; may not be null
* @param value the header's {@link org.apache.kafka.connect.data.Timestamp} value; may be null
* @return this object to facilitate chaining multiple methods; never null
*/
Headers addTimestamp(String key, java.util.Date value);
/**
* Removes all {@link Header} objects whose {@link Header#key() key} matches the specified key.
*
* @param key the key; may not be null
* @return this object to facilitate chaining multiple methods; never null
*/
Headers remove(String key);
/**
* Removes all but the latest {@link Header} objects whose {@link Header#key() key} matches the specified key.
*
* @param key the key; may not be null
* @return this object to facilitate chaining multiple methods; never null
*/
Headers retainLatest(String key);
/**
* Removes all but the last {@link Header} object with each key.
*
* @return this object to facilitate chaining multiple methods; never null
*/
Headers retainLatest();
/**
* Removes all headers from this object.
*
* @return this object to facilitate chaining multiple methods; never null
*/
Headers clear();
/**
* Create a copy of this {@link Headers} object. The new copy will contain all of the same {@link Header} objects as this object.
* @return the copy; never null
*/
Headers duplicate();
/**
* Get all {@link Header}s, apply the transform to each and store the result in place of the original.
*
* @param transform the transform to apply; may not be null
* @return this object to facilitate chaining multiple methods; never null
* @throws DataException if the header's value is invalid
*/
Headers apply(HeaderTransform transform);
/**
* Get all {@link Header}s with the given key, apply the transform to each and store the result in place of the original.
*
* @param key the header's key; may not be null
* @param transform the transform to apply; may not be null
* @return this object to facilitate chaining multiple methods; never null
* @throws DataException if the header's value is invalid
*/
Headers apply(String key, HeaderTransform transform);
/**
* A function to transform the supplied {@link Header}. Implementations will likely need to use {@link Header#with(Schema, Object)}
* to create the new instance.
*/
interface HeaderTransform {
/**
* Transform the given {@link Header} and return the updated {@link Header}.
*
* @param header the input header; never null
* @return the new header, or null if the supplied {@link Header} is to be removed
*/
Header apply(Header header);
}
}
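A small hypothetical sketch of `retainLatest(String)` combined with the `HeaderTransform` callback defined above (header keys are invented):

```java
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;

public class HeaderTransformSketch {
    public static void main(String[] args) {
        Headers headers = new ConnectHeaders()
                .addString("trace-id", "abc")
                .addString("trace-id", "def")
                .addString("other", "x");

        // Keep only the last "trace-id" header, then rename it in place.
        headers.retainLatest("trace-id")
               .apply("trace-id", header -> header.rename("traceId"));

        for (Header h : headers) {
            System.out.println(h.key() + " = " + h.value());  // traceId = def, other = x
        }
    }
}
```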

SinkRecord.java (org.apache.kafka.connect.sink)

@ -19,6 +19,7 @@ package org.apache.kafka.connect.sink;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.Header;
/**
* SinkRecord is a {@link ConnectRecord} that has been read from Kafka and includes the kafkaOffset of
@ -38,7 +39,12 @@ public class SinkRecord extends ConnectRecord<SinkRecord> {
public SinkRecord(String topic, int partition, Schema keySchema, Object key, Schema valueSchema, Object value, long kafkaOffset,
Long timestamp, TimestampType timestampType) {
super(topic, partition, keySchema, key, valueSchema, value, timestamp);
this(topic, partition, keySchema, key, valueSchema, value, kafkaOffset, timestamp, timestampType, null);
}
public SinkRecord(String topic, int partition, Schema keySchema, Object key, Schema valueSchema, Object value, long kafkaOffset,
Long timestamp, TimestampType timestampType, Iterable<Header> headers) {
super(topic, partition, keySchema, key, valueSchema, value, timestamp, headers);
this.kafkaOffset = kafkaOffset;
this.timestampType = timestampType;
}
@ -53,7 +59,13 @@ public class SinkRecord extends ConnectRecord<SinkRecord> {
@Override
public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp) {
return new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, kafkaOffset(), timestamp, timestampType);
return newRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, timestamp, headers().duplicate());
}
@Override
public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value,
Long timestamp, Iterable<Header> headers) {
return new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, kafkaOffset(), timestamp, timestampType, headers);
}
@Override

SourceRecord.java (org.apache.kafka.connect.source)

@ -18,6 +18,7 @@ package org.apache.kafka.connect.source;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.Header;
import java.util.Map;
@ -69,7 +70,15 @@ public class SourceRecord extends ConnectRecord<SourceRecord> {
Schema keySchema, Object key,
Schema valueSchema, Object value,
Long timestamp) {
super(topic, partition, keySchema, key, valueSchema, value, timestamp);
this(sourcePartition, sourceOffset, topic, partition, keySchema, key, valueSchema, value, timestamp, null);
}
public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
String topic, Integer partition,
Schema keySchema, Object key,
Schema valueSchema, Object value,
Long timestamp, Iterable<Header> headers) {
super(topic, partition, keySchema, key, valueSchema, value, timestamp, headers);
this.sourcePartition = sourcePartition;
this.sourceOffset = sourceOffset;
}
@ -84,7 +93,13 @@ public class SourceRecord extends ConnectRecord<SourceRecord> {
@Override
public SourceRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp) {
return new SourceRecord(sourcePartition, sourceOffset, topic, kafkaPartition, keySchema, key, valueSchema, value, timestamp);
return newRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, timestamp, headers().duplicate());
}
@Override
public SourceRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value,
Long timestamp, Iterable<Header> headers) {
return new SourceRecord(sourcePartition, sourceOffset, topic, kafkaPartition, keySchema, key, valueSchema, value, timestamp, headers);
}
@Override

ConverterConfig.java (org.apache.kafka.connect.storage), new file

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.storage;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import java.util.Map;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
/**
* Abstract class that defines the configuration options for {@link Converter} and {@link HeaderConverter} instances.
*/
public abstract class ConverterConfig extends AbstractConfig {
public static final String TYPE_CONFIG = "converter.type";
private static final String TYPE_DOC = "How this converter will be used.";
/**
* Create a new {@link ConfigDef} instance containing the configurations defined by ConverterConfig. This can be called by subclasses.
*
* @return the ConfigDef; never null
*/
public static ConfigDef newConfigDef() {
return new ConfigDef().define(TYPE_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
in(ConverterType.KEY.getName(), ConverterType.VALUE.getName(), ConverterType.HEADER.getName()),
Importance.LOW, TYPE_DOC);
}
protected ConverterConfig(ConfigDef configDef, Map<String, ?> props) {
super(configDef, props, true);
}
/**
* Get the type of converter as defined by the {@link #TYPE_CONFIG} configuration.
* @return the converter type; never null
*/
public ConverterType type() {
return ConverterType.withName(getString(TYPE_CONFIG));
}
}

ConverterType.java (org.apache.kafka.connect.storage), new file

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.storage;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
/**
* The type of {@link Converter} and {@link HeaderConverter}.
*/
public enum ConverterType {
KEY,
VALUE,
HEADER;
private static final Map<String, ConverterType> NAME_TO_TYPE;
static {
ConverterType[] types = ConverterType.values();
Map<String, ConverterType> nameToType = new HashMap<>(types.length);
for (ConverterType type : types) {
nameToType.put(type.name, type);
}
NAME_TO_TYPE = Collections.unmodifiableMap(nameToType);
}
/**
* Find the ConverterType with the given name, using a case-insensitive match.
* @param name the name of the converter type; may be null
* @return the matching converter type, or null if the supplied name is null or does not match the name of the known types
*/
public static ConverterType withName(String name) {
if (name == null) {
return null;
}
return NAME_TO_TYPE.get(name.toLowerCase(Locale.getDefault()));
}
private String name;
ConverterType() {
this.name = this.name().toLowerCase(Locale.ROOT);
}
public String getName() {
return name;
}
}

HeaderConverter.java (org.apache.kafka.connect.storage), new file

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.storage;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.header.Header;
import java.io.Closeable;
public interface HeaderConverter extends Configurable, Closeable {
/**
* Convert the header name and byte array value into a {@link Header} object.
* @param topic the name of the topic for the record containing the header
* @param headerKey the header's key; may not be null
* @param value the header's raw value; may be null
* @return the {@link SchemaAndValue}; may not be null
*/
SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value);
/**
* Convert the {@link Header}'s {@link Header#valueAsBytes() value} into its byte array representation.
* @param topic the name of the topic for the record containing the header
* @param headerKey the header's key; may not be null
* @param schema the schema for the header's value; may be null
* @param value the header's value to convert; may be null
* @return the byte array form of the Header's value; may be null if the value is null
*/
byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value);
/**
* Configuration specification for this set of header converters.
* @return the configuration specification; may not be null
*/
ConfigDef config();
}

SimpleHeaderConverter.java (org.apache.kafka.connect.storage), new file

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.storage;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Values;
import org.apache.kafka.connect.errors.DataException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.NoSuchElementException;
/**
* A {@link HeaderConverter} that serializes header values as strings and that deserializes header values to the most appropriate
* numeric, boolean, array, or map representation. Schemas are not serialized, but are inferred upon deserialization when possible.
*/
public class SimpleHeaderConverter implements HeaderConverter {
private static final Logger LOG = LoggerFactory.getLogger(SimpleHeaderConverter.class);
private static final ConfigDef CONFIG_DEF = new ConfigDef();
private static final SchemaAndValue NULL_SCHEMA_AND_VALUE = new SchemaAndValue(null, null);
private static final Charset UTF_8 = StandardCharsets.UTF_8;
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
@Override
public void configure(Map<String, ?> configs) {
// do nothing
}
@Override
public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) {
if (value == null) {
return NULL_SCHEMA_AND_VALUE;
}
try {
String str = new String(value, UTF_8);
if (str.isEmpty()) {
return new SchemaAndValue(Schema.STRING_SCHEMA, str);
}
return Values.parseString(str);
} catch (NoSuchElementException e) {
throw new DataException("Failed to deserialize value for header '" + headerKey + "' on topic '" + topic + "'", e);
} catch (Throwable t) {
LOG.warn("Failed to deserialize value for header '{}' on topic '{}', so using byte array", headerKey, topic, t);
return new SchemaAndValue(Schema.BYTES_SCHEMA, value);
}
}
@Override
public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) {
if (value == null) {
return null;
}
return Values.convertToString(schema, value).getBytes(UTF_8);
}
@Override
public void close() throws IOException {
// do nothing
}
}

StringConverter.java (org.apache.kafka.connect.storage)

@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.storage;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
@ -27,16 +28,19 @@ import java.util.HashMap;
import java.util.Map;
/**
* {@link Converter} implementation that only supports serializing to strings. When converting Kafka Connect data to bytes,
* the schema will be ignored and {@link Object#toString()} will always be invoked to convert the data to a String.
* {@link Converter} and {@link HeaderConverter} implementation that only supports serializing to strings. When converting Kafka Connect
* data to bytes, the schema will be ignored and {@link Object#toString()} will always be invoked to convert the data to a String.
* When converting from bytes to Kafka Connect format, the converter will only ever return an optional string schema and
* a string or null.
*
* Encoding configuration is identical to {@link StringSerializer} and {@link StringDeserializer}, but for convenience
* this class can also be configured to use the same encoding for both encoding and decoding with the converter.encoding
* setting.
* this class can also be configured to use the same encoding for both encoding and decoding with the
* {@link StringConverterConfig#ENCODING_CONFIG converter.encoding} setting.
*
* This implementation currently does nothing with the topic names or header names.
*/
public class StringConverter implements Converter {
public class StringConverter implements Converter, HeaderConverter {
private final StringSerializer serializer = new StringSerializer();
private final StringDeserializer deserializer = new StringDeserializer();
@ -44,22 +48,32 @@ public class StringConverter implements Converter {
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
Map<String, Object> serializerConfigs = new HashMap<>();
serializerConfigs.putAll(configs);
Map<String, Object> deserializerConfigs = new HashMap<>();
deserializerConfigs.putAll(configs);
public ConfigDef config() {
return StringConverterConfig.configDef();
}
Object encodingValue = configs.get("converter.encoding");
if (encodingValue != null) {
serializerConfigs.put("serializer.encoding", encodingValue);
deserializerConfigs.put("deserializer.encoding", encodingValue);
}
@Override
public void configure(Map<String, ?> configs) {
StringConverterConfig conf = new StringConverterConfig(configs);
String encoding = conf.encoding();
Map<String, Object> serializerConfigs = new HashMap<>(configs);
Map<String, Object> deserializerConfigs = new HashMap<>(configs);
serializerConfigs.put("serializer.encoding", encoding);
deserializerConfigs.put("deserializer.encoding", encoding);
boolean isKey = conf.type() == ConverterType.KEY;
serializer.configure(serializerConfigs, isKey);
deserializer.configure(deserializerConfigs, isKey);
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
Map<String, Object> conf = new HashMap<>(configs);
conf.put(StringConverterConfig.TYPE_CONFIG, isKey ? ConverterType.KEY.getName() : ConverterType.VALUE.getName());
configure(conf);
}
@Override
public byte[] fromConnectData(String topic, Schema schema, Object value) {
try {
@ -77,4 +91,19 @@ public class StringConverter implements Converter {
throw new DataException("Failed to deserialize string: ", e);
}
}
@Override
public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) {
return fromConnectData(topic, schema, value);
}
@Override
public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) {
return toConnectData(topic, value);
}
@Override
public void close() {
// do nothing
}
}
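A hypothetical sketch of using `StringConverter` as a header converter via the new single-argument `configure` method; the topic and header names are invented, and `converter.type` / `converter.encoding` come from `ConverterConfig` and `StringConverterConfig`:

```java
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.StringConverter;

import java.util.HashMap;
import java.util.Map;

public class StringHeaderConverterSketch {
    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put("converter.type", "header");   // "key", "value", or "header"
        config.put("converter.encoding", "UTF-8");

        StringConverter converter = new StringConverter();
        converter.configure(config);

        byte[] raw = converter.fromConnectHeader("orders-topic", "note", null, "hello");
        SchemaAndValue parsed = converter.toConnectHeader("orders-topic", "note", raw);
        System.out.println(parsed.value());       // hello
    }
}
```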

StringConverterConfig.java (org.apache.kafka.connect.storage), new file

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.storage;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import java.util.Map;
/**
* Configuration options for {@link StringConverter} instances.
*/
public class StringConverterConfig extends ConverterConfig {
public static final String ENCODING_CONFIG = "converter.encoding";
public static final String ENCODING_DEFAULT = "UTF8";
private static final String ENCODING_DOC = "The name of the Java character set to use for encoding strings as byte arrays.";
private static final String ENCODING_DISPLAY = "Encoding";
private final static ConfigDef CONFIG;
static {
CONFIG = ConverterConfig.newConfigDef();
CONFIG.define(ENCODING_CONFIG, Type.STRING, ENCODING_DEFAULT, Importance.HIGH, ENCODING_DOC, null, -1, Width.MEDIUM,
ENCODING_DISPLAY);
}
public static ConfigDef configDef() {
return CONFIG;
}
public StringConverterConfig(Map<String, ?> props) {
super(CONFIG, props);
}
/**
* Get the string encoding.
*
* @return the encoding; never null
*/
public String encoding() {
return getString(ENCODING_CONFIG);
}
}

ValuesTest.java (org.apache.kafka.connect.data), new file

@ -0,0 +1,350 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.data;
import org.apache.kafka.connect.data.Values.Parser;
import org.junit.Test;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class ValuesTest {
private static final Map<String, String> STRING_MAP = new LinkedHashMap<>();
private static final Schema STRING_MAP_SCHEMA = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).schema();
private static final Map<String, Short> STRING_SHORT_MAP = new LinkedHashMap<>();
private static final Schema STRING_SHORT_MAP_SCHEMA = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT16_SCHEMA).schema();
private static final Map<String, Integer> STRING_INT_MAP = new LinkedHashMap<>();
private static final Schema STRING_INT_MAP_SCHEMA = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).schema();
private static final List<Integer> INT_LIST = new ArrayList<>();
private static final Schema INT_LIST_SCHEMA = SchemaBuilder.array(Schema.INT32_SCHEMA).schema();
private static final List<String> STRING_LIST = new ArrayList<>();
private static final Schema STRING_LIST_SCHEMA = SchemaBuilder.array(Schema.STRING_SCHEMA).schema();
static {
STRING_MAP.put("foo", "123");
STRING_MAP.put("bar", "baz");
STRING_SHORT_MAP.put("foo", (short) 12345);
STRING_SHORT_MAP.put("bar", (short) 0);
STRING_SHORT_MAP.put("baz", (short) -4321);
STRING_INT_MAP.put("foo", 1234567890);
STRING_INT_MAP.put("bar", 0);
STRING_INT_MAP.put("baz", -987654321);
STRING_LIST.add("foo");
STRING_LIST.add("bar");
INT_LIST.add(1234567890);
INT_LIST.add(-987654321);
}
@Test
public void shouldEscapeStringsWithEmbeddedQuotesAndBackslashes() {
String original = "three\"blind\\\"mice";
String expected = "three\\\"blind\\\\\\\"mice";
assertEquals(expected, Values.escape(original));
}
@Test
public void shouldConvertNullValue() {
assertRoundTrip(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA, null);
assertRoundTrip(Schema.OPTIONAL_STRING_SCHEMA, Schema.STRING_SCHEMA, null);
}
@Test
public void shouldConvertSimpleString() {
assertRoundTrip(Schema.STRING_SCHEMA, "simple");
}
@Test
public void shouldConvertEmptyString() {
assertRoundTrip(Schema.STRING_SCHEMA, "");
}
@Test
public void shouldConvertStringWithQuotesAndOtherDelimiterCharacters() {
assertRoundTrip(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA, "three\"blind\\\"mice");
assertRoundTrip(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA, "string with delimiters: <>?,./\\=+-!@#$%^&*(){}[]|;':");
}
@Test
public void shouldConvertMapWithStringKeys() {
assertRoundTrip(STRING_MAP_SCHEMA, STRING_MAP_SCHEMA, STRING_MAP);
}
@Test
public void shouldParseStringOfMapWithStringValuesWithoutWhitespaceAsMap() {
SchemaAndValue result = roundTrip(STRING_MAP_SCHEMA, "{\"foo\":\"123\",\"bar\":\"baz\"}");
assertEquals(STRING_MAP_SCHEMA, result.schema());
assertEquals(STRING_MAP, result.value());
}
@Test
public void shouldParseStringOfMapWithStringValuesWithWhitespaceAsMap() {
SchemaAndValue result = roundTrip(STRING_MAP_SCHEMA, "{ \"foo\" : \"123\", \n\"bar\" : \"baz\" } ");
assertEquals(STRING_MAP_SCHEMA, result.schema());
assertEquals(STRING_MAP, result.value());
}
@Test
public void shouldConvertMapWithStringKeysAndShortValues() {
assertRoundTrip(STRING_SHORT_MAP_SCHEMA, STRING_SHORT_MAP_SCHEMA, STRING_SHORT_MAP);
}
@Test
public void shouldParseStringOfMapWithShortValuesWithoutWhitespaceAsMap() {
SchemaAndValue result = roundTrip(STRING_SHORT_MAP_SCHEMA, "{\"foo\":12345,\"bar\":0,\"baz\":-4321}");
assertEquals(STRING_SHORT_MAP_SCHEMA, result.schema());
assertEquals(STRING_SHORT_MAP, result.value());
}
@Test
public void shouldParseStringOfMapWithShortValuesWithWhitespaceAsMap() {
SchemaAndValue result = roundTrip(STRING_SHORT_MAP_SCHEMA, " { \"foo\" : 12345 , \"bar\" : 0, \"baz\" : -4321 } ");
assertEquals(STRING_SHORT_MAP_SCHEMA, result.schema());
assertEquals(STRING_SHORT_MAP, result.value());
}
@Test
public void shouldConvertMapWithStringKeysAndIntegerValues() {
assertRoundTrip(STRING_INT_MAP_SCHEMA, STRING_INT_MAP_SCHEMA, STRING_INT_MAP);
}
@Test
public void shouldParseStringOfMapWithIntValuesWithoutWhitespaceAsMap() {
SchemaAndValue result = roundTrip(STRING_INT_MAP_SCHEMA, "{\"foo\":1234567890,\"bar\":0,\"baz\":-987654321}");
assertEquals(STRING_INT_MAP_SCHEMA, result.schema());
assertEquals(STRING_INT_MAP, result.value());
}
@Test
public void shouldParseStringOfMapWithIntValuesWithWhitespaceAsMap() {
SchemaAndValue result = roundTrip(STRING_INT_MAP_SCHEMA, " { \"foo\" : 1234567890 , \"bar\" : 0, \"baz\" : -987654321 } ");
assertEquals(STRING_INT_MAP_SCHEMA, result.schema());
assertEquals(STRING_INT_MAP, result.value());
}
@Test
public void shouldConvertListWithStringValues() {
assertRoundTrip(STRING_LIST_SCHEMA, STRING_LIST_SCHEMA, STRING_LIST);
}
@Test
public void shouldConvertListWithIntegerValues() {
assertRoundTrip(INT_LIST_SCHEMA, INT_LIST_SCHEMA, INT_LIST);
}
@Test
public void shouldParseStringsWithoutDelimiters() {
//assertParsed("");
assertParsed(" ");
assertParsed("simple");
assertParsed("simple string");
assertParsed("simple \n\t\bstring");
assertParsed("'simple' string");
assertParsed("si\\mple");
assertParsed("si\\\\mple");
}
@Test
public void shouldParseStringsWithEscapedDelimiters() {
assertParsed("si\\\"mple");
assertParsed("si\\{mple");
assertParsed("si\\}mple");
assertParsed("si\\]mple");
assertParsed("si\\[mple");
assertParsed("si\\:mple");
assertParsed("si\\,mple");
}
@Test
public void shouldParseStringsWithSingleDelimiter() {
assertParsed("a{b", "a", "{", "b");
assertParsed("a}b", "a", "}", "b");
assertParsed("a[b", "a", "[", "b");
assertParsed("a]b", "a", "]", "b");
assertParsed("a:b", "a", ":", "b");
assertParsed("a,b", "a", ",", "b");
assertParsed("a\"b", "a", "\"", "b");
assertParsed("{b", "{", "b");
assertParsed("}b", "}", "b");
assertParsed("[b", "[", "b");
assertParsed("]b", "]", "b");
assertParsed(":b", ":", "b");
assertParsed(",b", ",", "b");
assertParsed("\"b", "\"", "b");
assertParsed("{", "{");
assertParsed("}", "}");
assertParsed("[", "[");
assertParsed("]", "]");
assertParsed(":", ":");
assertParsed(",", ",");
assertParsed("\"", "\"");
}
@Test
public void shouldParseStringsWithMultipleDelimiters() {
assertParsed("\"simple\" string", "\"", "simple", "\"", " string");
assertParsed("a{bc}d", "a", "{", "bc", "}", "d");
assertParsed("a { b c } d", "a ", "{", " b c ", "}", " d");
assertParsed("a { b c } d", "a ", "{", " b c ", "}", " d");
}
@Test
public void canConsume() {
}
protected void assertParsed(String input) {
assertParsed(input, input);
}
protected void assertParsed(String input, String... expectedTokens) {
Parser parser = new Parser(input);
if (!parser.hasNext()) {
assertEquals(1, expectedTokens.length);
assertTrue(expectedTokens[0].isEmpty());
return;
}
for (String expectedToken : expectedTokens) {
assertTrue(parser.hasNext());
int position = parser.mark();
assertEquals(expectedToken, parser.next());
assertEquals(position + expectedToken.length(), parser.position());
assertEquals(expectedToken, parser.previous());
parser.rewindTo(position);
assertEquals(position, parser.position());
assertEquals(expectedToken, parser.next());
int newPosition = parser.mark();
assertEquals(position + expectedToken.length(), newPosition);
assertEquals(expectedToken, parser.previous());
}
assertFalse(parser.hasNext());
// Rewind and try consuming expected tokens ...
parser.rewindTo(0);
assertConsumable(parser, expectedTokens);
// Parse again and try consuming expected tokens ...
parser = new Parser(input);
assertConsumable(parser, expectedTokens);
}
protected void assertConsumable(Parser parser, String ... expectedTokens) {
for (String expectedToken : expectedTokens) {
if (!expectedToken.trim().isEmpty()) {
int position = parser.mark();
assertTrue(parser.canConsume(expectedToken.trim()));
parser.rewindTo(position);
assertTrue(parser.canConsume(expectedToken.trim(), true));
parser.rewindTo(position);
assertTrue(parser.canConsume(expectedToken, false));
}
}
}
protected SchemaAndValue roundTrip(Schema desiredSchema, String currentValue) {
return roundTrip(desiredSchema, new SchemaAndValue(Schema.STRING_SCHEMA, currentValue));
}
protected SchemaAndValue roundTrip(Schema desiredSchema, SchemaAndValue input) {
String serialized = Values.convertToString(input.schema(), input.value());
if (input.value() != null) {
assertNotNull(serialized);
}
if (desiredSchema == null) {
desiredSchema = Values.inferSchema(input);
assertNotNull(desiredSchema);
}
Object newValue = null;
Schema newSchema = null;
switch (desiredSchema.type()) {
case STRING:
newValue = Values.convertToString(Schema.STRING_SCHEMA, serialized);
break;
case INT8:
newValue = Values.convertToByte(Schema.STRING_SCHEMA, serialized);
break;
case INT16:
newValue = Values.convertToShort(Schema.STRING_SCHEMA, serialized);
break;
case INT32:
newValue = Values.convertToInteger(Schema.STRING_SCHEMA, serialized);
break;
case INT64:
newValue = Values.convertToLong(Schema.STRING_SCHEMA, serialized);
break;
case FLOAT32:
newValue = Values.convertToFloat(Schema.STRING_SCHEMA, serialized);
break;
case FLOAT64:
newValue = Values.convertToDouble(Schema.STRING_SCHEMA, serialized);
break;
case BOOLEAN:
newValue = Values.convertToBoolean(Schema.STRING_SCHEMA, serialized);
break;
case ARRAY:
newValue = Values.convertToList(Schema.STRING_SCHEMA, serialized);
break;
case MAP:
newValue = Values.convertToMap(Schema.STRING_SCHEMA, serialized);
break;
case STRUCT:
newValue = Values.convertToStruct(Schema.STRING_SCHEMA, serialized);
break;
case BYTES:
fail("unexpected schema type");
break;
}
newSchema = Values.inferSchema(newValue);
return new SchemaAndValue(newSchema, newValue);
}
protected void assertRoundTrip(Schema schema, String value) {
assertRoundTrip(schema, Schema.STRING_SCHEMA, value);
}
protected void assertRoundTrip(Schema schema, Schema currentSchema, Object value) {
SchemaAndValue result = roundTrip(schema, new SchemaAndValue(currentSchema, value));
if (value == null) {
assertNull(result.schema());
assertNull(result.value());
} else {
assertEquals(value, result.value());
assertEquals(schema, result.schema());
SchemaAndValue result2 = roundTrip(result.schema(), result);
assertEquals(schema, result2.schema());
assertEquals(value, result2.value());
assertEquals(result, result2);
}
}
}

View File

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.header;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
public class ConnectHeaderTest {
private String key;
private ConnectHeader header;
@Before
public void beforeEach() {
key = "key";
withString("value");
}
protected Header withValue(Schema schema, Object value) {
header = new ConnectHeader(key, new SchemaAndValue(schema, value));
return header;
}
protected Header withString(String value) {
return withValue(Schema.STRING_SCHEMA, value);
}
@Test
public void shouldAllowNullValues() {
withValue(Schema.OPTIONAL_STRING_SCHEMA, null);
}
@Test
public void shouldAllowNullSchema() {
withValue(null, null);
assertNull(header.schema());
assertNull(header.value());
String value = "non-null value";
withValue(null, value);
assertNull(header.schema());
assertSame(value, header.value());
}
@Test
public void shouldAllowNonNullValue() {
String value = "non-null value";
withValue(Schema.STRING_SCHEMA, value);
assertSame(Schema.STRING_SCHEMA, header.schema());
assertEquals(value, header.value());
withValue(Schema.BOOLEAN_SCHEMA, true);
assertSame(Schema.BOOLEAN_SCHEMA, header.schema());
assertEquals(true, header.value());
}
@Test
public void shouldGetSchemaFromStruct() {
Schema schema = SchemaBuilder.struct()
.field("foo", Schema.STRING_SCHEMA)
.field("bar", Schema.INT32_SCHEMA)
.build();
Struct value = new Struct(schema);
value.put("foo", "value");
value.put("bar", 100);
withValue(null, value);
assertSame(schema, header.schema());
assertSame(value, header.value());
}
@Test
public void shouldSatisfyEquals() {
String value = "non-null value";
Header h1 = withValue(Schema.STRING_SCHEMA, value);
assertSame(Schema.STRING_SCHEMA, header.schema());
assertEquals(value, header.value());
Header h2 = withValue(Schema.STRING_SCHEMA, value);
assertEquals(h1, h2);
assertEquals(h1.hashCode(), h2.hashCode());
Header h3 = withValue(Schema.INT8_SCHEMA, 100);
assertNotEquals(h3, h2);
}
}

View File

@ -0,0 +1,547 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.header;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Schema.Type;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.data.Values;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.header.Headers.HeaderTransform;
import org.junit.Before;
import org.junit.Test;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.GregorianCalendar;
import java.util.HashMap;
import java.util.Iterator;
import java.util.TimeZone;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class ConnectHeadersTest {
private static final GregorianCalendar EPOCH_PLUS_TEN_THOUSAND_DAYS;
private static final GregorianCalendar EPOCH_PLUS_TEN_THOUSAND_MILLIS;
static {
EPOCH_PLUS_TEN_THOUSAND_DAYS = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
EPOCH_PLUS_TEN_THOUSAND_DAYS.setTimeZone(TimeZone.getTimeZone("UTC"));
EPOCH_PLUS_TEN_THOUSAND_DAYS.add(Calendar.DATE, 10000);
EPOCH_PLUS_TEN_THOUSAND_MILLIS = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
EPOCH_PLUS_TEN_THOUSAND_MILLIS.setTimeZone(TimeZone.getTimeZone("UTC"));
EPOCH_PLUS_TEN_THOUSAND_MILLIS.add(Calendar.MILLISECOND, 10000);
}
private ConnectHeaders headers;
private Iterator<Header> iter;
private String key;
private String other;
@Before
public void beforeEach() {
headers = new ConnectHeaders();
key = "k1";
other = "other key";
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullKey() {
headers.add(null, "value", Schema.STRING_SCHEMA);
}
protected void populate(Headers headers) {
headers.addBoolean(key, true);
headers.addInt(key, 0);
headers.addString(other, "other value");
headers.addString(key, null);
headers.addString(key, "third");
}
@Test
public void shouldBeEquals() {
Headers other = new ConnectHeaders();
assertEquals(headers, other);
assertEquals(headers.hashCode(), other.hashCode());
populate(headers);
assertNotEquals(headers, other);
assertNotEquals(headers.hashCode(), other.hashCode());
populate(other);
assertEquals(headers, other);
assertEquals(headers.hashCode(), other.hashCode());
headers.addString("wow", "some value");
assertNotEquals(headers, other);
}
@Test
public void shouldHaveToString() {
// empty
assertNotNull(headers.toString());
// not empty
populate(headers);
assertNotNull(headers.toString());
}
@Test
public void shouldAddMultipleHeadersWithSameKeyAndRetainLatest() {
populate(headers);
Header header = headers.lastWithName(key);
assertHeader(header, key, Schema.STRING_SCHEMA, "third");
iter = headers.allWithName(key);
assertNextHeader(iter, key, Schema.BOOLEAN_SCHEMA, true);
assertNextHeader(iter, key, Schema.INT32_SCHEMA, 0);
assertNextHeader(iter, key, Schema.OPTIONAL_STRING_SCHEMA, null);
assertNextHeader(iter, key, Schema.STRING_SCHEMA, "third");
assertNoNextHeader(iter);
iter = headers.allWithName(other);
assertOnlyNextHeader(iter, other, Schema.STRING_SCHEMA, "other value");
headers.retainLatest(other);
assertOnlySingleHeader(other, Schema.STRING_SCHEMA, "other value");
headers.retainLatest(key);
assertOnlySingleHeader(key, Schema.STRING_SCHEMA, "third");
headers.retainLatest();
assertOnlySingleHeader(other, Schema.STRING_SCHEMA, "other value");
assertOnlySingleHeader(key, Schema.STRING_SCHEMA, "third");
}
@Test
public void shouldAddHeadersWithPrimitiveValues() {
String key = "k1";
headers.addBoolean(key, true);
headers.addByte(key, (byte) 0);
headers.addShort(key, (short) 0);
headers.addInt(key, 0);
headers.addLong(key, 0);
headers.addFloat(key, 1.0f);
headers.addDouble(key, 1.0d);
headers.addString(key, null);
headers.addString(key, "third");
}
@Test
public void shouldAddHeadersWithNullObjectValuesWithOptionalSchema() {
addHeader("k1", Schema.BOOLEAN_SCHEMA, true);
addHeader("k2", Schema.STRING_SCHEMA, "hello");
addHeader("k3", Schema.OPTIONAL_STRING_SCHEMA, null);
}
@Test
public void shouldNotAddHeadersWithNullObjectValuesWithNonOptionalSchema() {
attemptAndFailToAddHeader("k1", Schema.BOOLEAN_SCHEMA, null);
attemptAndFailToAddHeader("k2", Schema.STRING_SCHEMA, null);
}
@Test
public void shouldNotAddHeadersWithObjectValuesAndMismatchedSchema() {
attemptAndFailToAddHeader("k1", Schema.BOOLEAN_SCHEMA, "wrong");
attemptAndFailToAddHeader("k2", Schema.OPTIONAL_STRING_SCHEMA, 0L);
}
@Test
public void shouldRemoveAllHeadersWithSameKey() {
populate(headers);
iter = headers.allWithName(key);
assertContainsHeader(key, Schema.BOOLEAN_SCHEMA, true);
assertContainsHeader(key, Schema.INT32_SCHEMA, 0);
assertContainsHeader(key, Schema.STRING_SCHEMA, "third");
assertOnlySingleHeader(other, Schema.STRING_SCHEMA, "other value");
headers.remove(key);
assertNoHeaderWithKey(key);
assertOnlySingleHeader(other, Schema.STRING_SCHEMA, "other value");
}
@Test
public void shouldRemoveAllHeaders() {
populate(headers);
iter = headers.allWithName(key);
assertContainsHeader(key, Schema.BOOLEAN_SCHEMA, true);
assertContainsHeader(key, Schema.INT32_SCHEMA, 0);
assertContainsHeader(key, Schema.STRING_SCHEMA, "third");
assertOnlySingleHeader(other, Schema.STRING_SCHEMA, "other value");
headers.clear();
assertNoHeaderWithKey(key);
assertNoHeaderWithKey(other);
assertEquals(0, headers.size());
assertTrue(headers.isEmpty());
}
@Test
public void shouldTransformHeaders() {
populate(headers);
iter = headers.allWithName(key);
assertNextHeader(iter, key, Schema.BOOLEAN_SCHEMA, true);
assertNextHeader(iter, key, Schema.INT32_SCHEMA, 0);
assertNextHeader(iter, key, Schema.OPTIONAL_STRING_SCHEMA, null);
assertNextHeader(iter, key, Schema.STRING_SCHEMA, "third");
assertNoNextHeader(iter);
iter = headers.allWithName(other);
assertOnlyNextHeader(iter, other, Schema.STRING_SCHEMA, "other value");
// Transform the headers
assertEquals(5, headers.size());
headers.apply(appendToKey("-suffix"));
assertEquals(5, headers.size());
assertNoHeaderWithKey(key);
assertNoHeaderWithKey(other);
String altKey = key + "-suffix";
iter = headers.allWithName(altKey);
assertNextHeader(iter, altKey, Schema.BOOLEAN_SCHEMA, true);
assertNextHeader(iter, altKey, Schema.INT32_SCHEMA, 0);
assertNextHeader(iter, altKey, Schema.OPTIONAL_STRING_SCHEMA, null);
assertNextHeader(iter, altKey, Schema.STRING_SCHEMA, "third");
assertNoNextHeader(iter);
iter = headers.allWithName(other + "-suffix");
assertOnlyNextHeader(iter, other + "-suffix", Schema.STRING_SCHEMA, "other value");
}
@Test
public void shouldTransformHeadersWithKey() {
populate(headers);
iter = headers.allWithName(key);
assertNextHeader(iter, key, Schema.BOOLEAN_SCHEMA, true);
assertNextHeader(iter, key, Schema.INT32_SCHEMA, 0);
assertNextHeader(iter, key, Schema.OPTIONAL_STRING_SCHEMA, null);
assertNextHeader(iter, key, Schema.STRING_SCHEMA, "third");
assertNoNextHeader(iter);
iter = headers.allWithName(other);
assertOnlyNextHeader(iter, other, Schema.STRING_SCHEMA, "other value");
// Transform the headers
assertEquals(5, headers.size());
headers.apply(key, appendToKey("-suffix"));
assertEquals(5, headers.size());
assertNoHeaderWithKey(key);
String altKey = key + "-suffix";
iter = headers.allWithName(altKey);
assertNextHeader(iter, altKey, Schema.BOOLEAN_SCHEMA, true);
assertNextHeader(iter, altKey, Schema.INT32_SCHEMA, 0);
assertNextHeader(iter, altKey, Schema.OPTIONAL_STRING_SCHEMA, null);
assertNextHeader(iter, altKey, Schema.STRING_SCHEMA, "third");
assertNoNextHeader(iter);
iter = headers.allWithName(other);
assertOnlyNextHeader(iter, other, Schema.STRING_SCHEMA, "other value");
}
@Test
public void shouldTransformAndRemoveHeaders() {
populate(headers);
iter = headers.allWithName(key);
assertNextHeader(iter, key, Schema.BOOLEAN_SCHEMA, true);
assertNextHeader(iter, key, Schema.INT32_SCHEMA, 0);
assertNextHeader(iter, key, Schema.OPTIONAL_STRING_SCHEMA, null);
assertNextHeader(iter, key, Schema.STRING_SCHEMA, "third");
assertNoNextHeader(iter);
iter = headers.allWithName(other);
assertOnlyNextHeader(iter, other, Schema.STRING_SCHEMA, "other value");
// Transform the headers
assertEquals(5, headers.size());
headers.apply(key, removeHeadersOfType(Type.STRING));
assertEquals(3, headers.size());
iter = headers.allWithName(key);
assertNextHeader(iter, key, Schema.BOOLEAN_SCHEMA, true);
assertNextHeader(iter, key, Schema.INT32_SCHEMA, 0);
assertNoNextHeader(iter);
assertHeader(headers.lastWithName(key), key, Schema.INT32_SCHEMA, 0);
iter = headers.allWithName(other);
assertOnlyNextHeader(iter, other, Schema.STRING_SCHEMA, "other value");
// Transform the headers
assertEquals(3, headers.size());
headers.apply(removeHeadersOfType(Type.STRING));
assertEquals(2, headers.size());
assertNoHeaderWithKey(other);
iter = headers.allWithName(key);
assertNextHeader(iter, key, Schema.BOOLEAN_SCHEMA, true);
assertNextHeader(iter, key, Schema.INT32_SCHEMA, 0);
assertNoNextHeader(iter);
}
protected HeaderTransform appendToKey(final String suffix) {
return new HeaderTransform() {
@Override
public Header apply(Header header) {
return header.rename(header.key() + suffix);
}
};
}
protected HeaderTransform removeHeadersOfType(final Type type) {
return new HeaderTransform() {
@Override
public Header apply(Header header) {
Schema schema = header.schema();
if (schema != null && schema.type() == type) {
return null;
}
return header;
}
};
}
@Test
public void shouldValidateBuiltInTypes() {
assertSchemaMatches(Schema.OPTIONAL_BOOLEAN_SCHEMA, null);
assertSchemaMatches(Schema.OPTIONAL_BYTES_SCHEMA, null);
assertSchemaMatches(Schema.OPTIONAL_INT8_SCHEMA, null);
assertSchemaMatches(Schema.OPTIONAL_INT16_SCHEMA, null);
assertSchemaMatches(Schema.OPTIONAL_INT32_SCHEMA, null);
assertSchemaMatches(Schema.OPTIONAL_INT64_SCHEMA, null);
assertSchemaMatches(Schema.OPTIONAL_FLOAT32_SCHEMA, null);
assertSchemaMatches(Schema.OPTIONAL_FLOAT64_SCHEMA, null);
assertSchemaMatches(Schema.OPTIONAL_STRING_SCHEMA, null);
assertSchemaMatches(Schema.BOOLEAN_SCHEMA, true);
assertSchemaMatches(Schema.BYTES_SCHEMA, new byte[]{});
assertSchemaMatches(Schema.INT8_SCHEMA, (byte) 0);
assertSchemaMatches(Schema.INT16_SCHEMA, (short) 0);
assertSchemaMatches(Schema.INT32_SCHEMA, 0);
assertSchemaMatches(Schema.INT64_SCHEMA, 0L);
assertSchemaMatches(Schema.FLOAT32_SCHEMA, 1.0f);
assertSchemaMatches(Schema.FLOAT64_SCHEMA, 1.0d);
assertSchemaMatches(Schema.STRING_SCHEMA, "value");
assertSchemaMatches(SchemaBuilder.array(Schema.STRING_SCHEMA), new ArrayList<String>());
assertSchemaMatches(SchemaBuilder.array(Schema.STRING_SCHEMA), Collections.singletonList("value"));
assertSchemaMatches(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA), new HashMap<String, Integer>());
assertSchemaMatches(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA), Collections.singletonMap("a", 0));
Schema emptyStructSchema = SchemaBuilder.struct();
assertSchemaMatches(emptyStructSchema, new Struct(emptyStructSchema));
Schema structSchema = SchemaBuilder.struct().field("foo", Schema.OPTIONAL_BOOLEAN_SCHEMA).field("bar", Schema.STRING_SCHEMA)
.schema();
assertSchemaMatches(structSchema, new Struct(structSchema).put("foo", true).put("bar", "v"));
}
@Test
public void shouldValidateLogicalTypes() {
assertSchemaMatches(Decimal.schema(3), new BigDecimal(100.00));
assertSchemaMatches(Time.SCHEMA, new java.util.Date());
assertSchemaMatches(Date.SCHEMA, new java.util.Date());
assertSchemaMatches(Timestamp.SCHEMA, new java.util.Date());
}
@Test
public void shouldNotValidateNullValuesWithBuiltInTypes() {
assertSchemaDoesNotMatch(Schema.BOOLEAN_SCHEMA, null);
assertSchemaDoesNotMatch(Schema.BYTES_SCHEMA, null);
assertSchemaDoesNotMatch(Schema.INT8_SCHEMA, null);
assertSchemaDoesNotMatch(Schema.INT16_SCHEMA, null);
assertSchemaDoesNotMatch(Schema.INT32_SCHEMA, null);
assertSchemaDoesNotMatch(Schema.INT64_SCHEMA, null);
assertSchemaDoesNotMatch(Schema.FLOAT32_SCHEMA, null);
assertSchemaDoesNotMatch(Schema.FLOAT64_SCHEMA, null);
assertSchemaDoesNotMatch(Schema.STRING_SCHEMA, null);
assertSchemaDoesNotMatch(SchemaBuilder.array(Schema.STRING_SCHEMA), null);
assertSchemaDoesNotMatch(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA), null);
assertSchemaDoesNotMatch(SchemaBuilder.struct(), null);
}
@Test
public void shouldNotValidateMismatchedValuesWithBuiltInTypes() {
assertSchemaDoesNotMatch(Schema.BOOLEAN_SCHEMA, 0L);
assertSchemaDoesNotMatch(Schema.BYTES_SCHEMA, "oops");
assertSchemaDoesNotMatch(Schema.INT8_SCHEMA, 1.0f);
assertSchemaDoesNotMatch(Schema.INT16_SCHEMA, 1.0f);
assertSchemaDoesNotMatch(Schema.INT32_SCHEMA, 0L);
assertSchemaDoesNotMatch(Schema.INT64_SCHEMA, 1.0f);
assertSchemaDoesNotMatch(Schema.FLOAT32_SCHEMA, 1L);
assertSchemaDoesNotMatch(Schema.FLOAT64_SCHEMA, 1L);
assertSchemaDoesNotMatch(Schema.STRING_SCHEMA, true);
assertSchemaDoesNotMatch(SchemaBuilder.array(Schema.STRING_SCHEMA), "value");
assertSchemaDoesNotMatch(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA), "value");
assertSchemaDoesNotMatch(SchemaBuilder.struct(), new ArrayList<String>());
}
@Test
public void shouldAddDate() {
java.util.Date dateObj = EPOCH_PLUS_TEN_THOUSAND_DAYS.getTime();
int days = Date.fromLogical(Date.SCHEMA, dateObj);
headers.addDate(key, dateObj);
Header header = headers.lastWithName(key);
assertEquals(days, (int) Values.convertToInteger(header.schema(), header.value()));
assertSame(dateObj, Values.convertToDate(header.schema(), header.value()));
headers.addInt(other, days);
header = headers.lastWithName(other);
assertEquals(days, (int) Values.convertToInteger(header.schema(), header.value()));
assertEquals(dateObj, Values.convertToDate(header.schema(), header.value()));
}
@Test
public void shouldAddTime() {
java.util.Date dateObj = EPOCH_PLUS_TEN_THOUSAND_MILLIS.getTime();
long millis = Time.fromLogical(Time.SCHEMA, dateObj);
headers.addTime(key, dateObj);
Header header = headers.lastWithName(key);
assertEquals(millis, (long) Values.convertToLong(header.schema(), header.value()));
assertSame(dateObj, Values.convertToTime(header.schema(), header.value()));
headers.addLong(other, millis);
header = headers.lastWithName(other);
assertEquals(millis, (long) Values.convertToLong(header.schema(), header.value()));
assertEquals(dateObj, Values.convertToTime(header.schema(), header.value()));
}
@Test
public void shouldAddTimestamp() {
java.util.Date dateObj = EPOCH_PLUS_TEN_THOUSAND_MILLIS.getTime();
long millis = Timestamp.fromLogical(Timestamp.SCHEMA, dateObj);
headers.addTimestamp(key, dateObj);
Header header = headers.lastWithName(key);
assertEquals(millis, (long) Values.convertToLong(header.schema(), header.value()));
assertSame(dateObj, Values.convertToTimestamp(header.schema(), header.value()));
headers.addLong(other, millis);
header = headers.lastWithName(other);
assertEquals(millis, (long) Values.convertToLong(header.schema(), header.value()));
assertEquals(dateObj, Values.convertToTimestamp(header.schema(), header.value()));
}
@Test
public void shouldAddDecimal() {
BigDecimal value = new BigDecimal("3.038573478e+3");
headers.addDecimal(key, value);
Header header = headers.lastWithName(key);
assertEquals(value.doubleValue(), Values.convertToDouble(header.schema(), header.value()), 0.00001d);
assertEquals(value, Values.convertToDecimal(header.schema(), header.value(), value.scale()));
value = value.setScale(3, RoundingMode.DOWN);
BigDecimal decimal = Values.convertToDecimal(header.schema(), header.value(), value.scale());
assertEquals(value, decimal.setScale(value.scale(), RoundingMode.DOWN));
}
@Test
public void shouldDuplicateAndAlwaysReturnEquivalentButDifferentObject() {
assertEquals(headers, headers.duplicate());
assertNotSame(headers, headers.duplicate());
}
protected void assertSchemaMatches(Schema schema, Object value) {
headers.checkSchemaMatches(new SchemaAndValue(schema.schema(), value));
}
protected void assertSchemaDoesNotMatch(Schema schema, Object value) {
try {
assertSchemaMatches(schema, value);
fail("Should have failed to validate value '" + value + "' and schema: " + schema);
} catch (DataException e) {
// expected
}
}
protected void attemptAndFailToAddHeader(String key, Schema schema, Object value) {
try {
headers.add(key, value, schema);
fail("Should have failed to add header with key '" + key + "', value '" + value + "', and schema: " + schema);
} catch (DataException e) {
// expected
}
}
protected void addHeader(String key, Schema schema, Object value) {
headers.add(key, value, schema);
Header header = headers.lastWithName(key);
assertNotNull(header);
assertHeader(header, key, schema, value);
}
protected void assertNoHeaderWithKey(String key) {
assertNoNextHeader(headers.allWithName(key));
}
protected void assertContainsHeader(String key, Schema schema, Object value) {
Header expected = new ConnectHeader(key, new SchemaAndValue(schema, value));
Iterator<Header> iter = headers.allWithName(key);
while (iter.hasNext()) {
Header header = iter.next();
if (header.equals(expected))
return;
}
fail("Should have found header " + expected);
}
protected void assertOnlySingleHeader(String key, Schema schema, Object value) {
assertOnlyNextHeader(headers.allWithName(key), key, schema, value);
}
protected void assertOnlyNextHeader(Iterator<Header> iter, String key, Schema schema, Object value) {
assertNextHeader(iter, key, schema, value);
assertNoNextHeader(iter);
}
protected void assertNextHeader(Iterator<Header> iter, String key, Schema schema, Object value) {
Header header = iter.next();
assertHeader(header, key, schema, value);
}
protected void assertNoNextHeader(Iterator<Header> iter) {
assertFalse(iter.hasNext());
}
protected void assertHeader(Header header, String key, Schema schema, Object value) {
assertNotNull(header);
assertSame(schema, header.schema());
assertSame(value, header.value());
}
}

View File

@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.sink;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Values;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
public class SinkRecordTest {
private static final String TOPIC_NAME = "myTopic";
private static final Integer PARTITION_NUMBER = 0;
private static final long KAFKA_OFFSET = 0L;
private static final Long KAFKA_TIMESTAMP = 0L;
private static final TimestampType TS_TYPE = TimestampType.CREATE_TIME;
private SinkRecord record;
@Before
public void beforeEach() {
record = new SinkRecord(TOPIC_NAME, PARTITION_NUMBER, Schema.STRING_SCHEMA, "key", Schema.BOOLEAN_SCHEMA, false, KAFKA_OFFSET,
KAFKA_TIMESTAMP, TS_TYPE, null);
}
@Test
public void shouldCreateSinkRecordWithHeaders() {
Headers headers = new ConnectHeaders().addString("h1", "hv1").addBoolean("h2", true);
record = new SinkRecord(TOPIC_NAME, PARTITION_NUMBER, Schema.STRING_SCHEMA, "key", Schema.BOOLEAN_SCHEMA, false, KAFKA_OFFSET,
KAFKA_TIMESTAMP, TS_TYPE, headers);
assertNotNull(record.headers());
assertSame(headers, record.headers());
assertFalse(record.headers().isEmpty());
}
@Test
public void shouldCreateSinkRecordWithEmptyHeaders() {
assertEquals(TOPIC_NAME, record.topic());
assertEquals(PARTITION_NUMBER, record.kafkaPartition());
assertEquals(Schema.STRING_SCHEMA, record.keySchema());
assertEquals("key", record.key());
assertEquals(Schema.BOOLEAN_SCHEMA, record.valueSchema());
assertEquals(false, record.value());
assertEquals(KAFKA_OFFSET, record.kafkaOffset());
assertEquals(KAFKA_TIMESTAMP, record.timestamp());
assertEquals(TS_TYPE, record.timestampType());
assertNotNull(record.headers());
assertTrue(record.headers().isEmpty());
}
@Test
public void shouldDuplicateRecordAndCloneHeaders() {
SinkRecord duplicate = record.newRecord(TOPIC_NAME, PARTITION_NUMBER, Schema.STRING_SCHEMA, "key", Schema.BOOLEAN_SCHEMA, false,
KAFKA_TIMESTAMP);
assertEquals(TOPIC_NAME, duplicate.topic());
assertEquals(PARTITION_NUMBER, duplicate.kafkaPartition());
assertEquals(Schema.STRING_SCHEMA, duplicate.keySchema());
assertEquals("key", duplicate.key());
assertEquals(Schema.BOOLEAN_SCHEMA, duplicate.valueSchema());
assertEquals(false, duplicate.value());
assertEquals(KAFKA_OFFSET, duplicate.kafkaOffset());
assertEquals(KAFKA_TIMESTAMP, duplicate.timestamp());
assertEquals(TS_TYPE, duplicate.timestampType());
assertNotNull(duplicate.headers());
assertTrue(duplicate.headers().isEmpty());
assertNotSame(record.headers(), duplicate.headers());
assertEquals(record.headers(), duplicate.headers());
}
@Test
public void shouldDuplicateRecordUsingNewHeaders() {
Headers newHeaders = new ConnectHeaders().addString("h3", "hv3");
SinkRecord duplicate = record.newRecord(TOPIC_NAME, PARTITION_NUMBER, Schema.STRING_SCHEMA, "key", Schema.BOOLEAN_SCHEMA, false,
KAFKA_TIMESTAMP, newHeaders);
assertEquals(TOPIC_NAME, duplicate.topic());
assertEquals(PARTITION_NUMBER, duplicate.kafkaPartition());
assertEquals(Schema.STRING_SCHEMA, duplicate.keySchema());
assertEquals("key", duplicate.key());
assertEquals(Schema.BOOLEAN_SCHEMA, duplicate.valueSchema());
assertEquals(false, duplicate.value());
assertEquals(KAFKA_OFFSET, duplicate.kafkaOffset());
assertEquals(KAFKA_TIMESTAMP, duplicate.timestamp());
assertEquals(TS_TYPE, duplicate.timestampType());
assertNotNull(duplicate.headers());
assertEquals(newHeaders, duplicate.headers());
assertSame(newHeaders, duplicate.headers());
assertNotSame(record.headers(), duplicate.headers());
assertNotEquals(record.headers(), duplicate.headers());
}
@Test
public void shouldModifyRecordHeader() {
assertTrue(record.headers().isEmpty());
record.headers().addInt("intHeader", 100);
assertEquals(1, record.headers().size());
Header header = record.headers().lastWithName("intHeader");
assertEquals(100, (int) Values.convertToInteger(header.schema(), header.value()));
}
}

View File

@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.source;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Values;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
public class SourceRecordTest {
private static final Map<String, ?> SOURCE_PARTITION = Collections.singletonMap("src", "abc");
private static final Map<String, ?> SOURCE_OFFSET = Collections.singletonMap("offset", "1");
private static final String TOPIC_NAME = "myTopic";
private static final Integer PARTITION_NUMBER = 0;
private static final Long KAFKA_TIMESTAMP = 0L;
private SourceRecord record;
@Before
public void beforeEach() {
record = new SourceRecord(SOURCE_PARTITION, SOURCE_OFFSET, TOPIC_NAME, PARTITION_NUMBER, Schema.STRING_SCHEMA, "key",
Schema.BOOLEAN_SCHEMA, false, KAFKA_TIMESTAMP, null);
}
@Test
public void shouldCreateSourceRecordWithHeaders() {
Headers headers = new ConnectHeaders().addString("h1", "hv1").addBoolean("h2", true);
record = new SourceRecord(SOURCE_PARTITION, SOURCE_OFFSET, TOPIC_NAME, PARTITION_NUMBER, Schema.STRING_SCHEMA, "key",
Schema.BOOLEAN_SCHEMA, false, KAFKA_TIMESTAMP, headers);
assertNotNull(record.headers());
assertSame(headers, record.headers());
assertFalse(record.headers().isEmpty());
}
@Test
public void shouldCreateSourceRecordWithEmptyHeaders() {
assertEquals(SOURCE_PARTITION, record.sourcePartition());
assertEquals(SOURCE_OFFSET, record.sourceOffset());
assertEquals(TOPIC_NAME, record.topic());
assertEquals(PARTITION_NUMBER, record.kafkaPartition());
assertEquals(Schema.STRING_SCHEMA, record.keySchema());
assertEquals("key", record.key());
assertEquals(Schema.BOOLEAN_SCHEMA, record.valueSchema());
assertEquals(false, record.value());
assertEquals(KAFKA_TIMESTAMP, record.timestamp());
assertNotNull(record.headers());
assertTrue(record.headers().isEmpty());
}
@Test
public void shouldDuplicateRecordAndCloneHeaders() {
SourceRecord duplicate = record.newRecord(TOPIC_NAME, PARTITION_NUMBER, Schema.STRING_SCHEMA, "key", Schema.BOOLEAN_SCHEMA, false,
KAFKA_TIMESTAMP);
assertEquals(SOURCE_PARTITION, duplicate.sourcePartition());
assertEquals(SOURCE_OFFSET, duplicate.sourceOffset());
assertEquals(TOPIC_NAME, duplicate.topic());
assertEquals(PARTITION_NUMBER, duplicate.kafkaPartition());
assertEquals(Schema.STRING_SCHEMA, duplicate.keySchema());
assertEquals("key", duplicate.key());
assertEquals(Schema.BOOLEAN_SCHEMA, duplicate.valueSchema());
assertEquals(false, duplicate.value());
assertEquals(KAFKA_TIMESTAMP, duplicate.timestamp());
assertNotNull(duplicate.headers());
assertTrue(duplicate.headers().isEmpty());
assertNotSame(record.headers(), duplicate.headers());
assertEquals(record.headers(), duplicate.headers());
}
@Test
public void shouldDuplicateRecordUsingNewHeaders() {
Headers newHeaders = new ConnectHeaders().addString("h3", "hv3");
SourceRecord duplicate = record.newRecord(TOPIC_NAME, PARTITION_NUMBER, Schema.STRING_SCHEMA, "key", Schema.BOOLEAN_SCHEMA, false,
KAFKA_TIMESTAMP, newHeaders);
assertEquals(SOURCE_PARTITION, duplicate.sourcePartition());
assertEquals(SOURCE_OFFSET, duplicate.sourceOffset());
assertEquals(TOPIC_NAME, duplicate.topic());
assertEquals(PARTITION_NUMBER, duplicate.kafkaPartition());
assertEquals(Schema.STRING_SCHEMA, duplicate.keySchema());
assertEquals("key", duplicate.key());
assertEquals(Schema.BOOLEAN_SCHEMA, duplicate.valueSchema());
assertEquals(false, duplicate.value());
assertEquals(KAFKA_TIMESTAMP, duplicate.timestamp());
assertNotNull(duplicate.headers());
assertEquals(newHeaders, duplicate.headers());
assertSame(newHeaders, duplicate.headers());
assertNotSame(record.headers(), duplicate.headers());
assertNotEquals(record.headers(), duplicate.headers());
}
@Test
public void shouldModifyRecordHeader() {
assertTrue(record.headers().isEmpty());
record.headers().addInt("intHeader", 100);
assertEquals(1, record.headers().size());
Header header = record.headers().lastWithName("intHeader");
assertEquals(100, (int) Values.convertToInteger(header.schema(), header.value()));
}
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.storage;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class ConverterTypeTest {
@Test
public void shouldFindByName() {
for (ConverterType type : ConverterType.values()) {
assertEquals(type, ConverterType.withName(type.getName()));
}
}
}

View File

@ -0,0 +1,220 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.storage;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
public class SimpleHeaderConverterTest {
private static final String TOPIC = "topic";
private static final String HEADER = "header";
private static final Map<String, String> STRING_MAP = new LinkedHashMap<>();
private static final Schema STRING_MAP_SCHEMA = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).schema();
private static final Map<String, Short> STRING_SHORT_MAP = new LinkedHashMap<>();
private static final Schema STRING_SHORT_MAP_SCHEMA = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT16_SCHEMA).schema();
private static final Map<String, Integer> STRING_INT_MAP = new LinkedHashMap<>();
private static final Schema STRING_INT_MAP_SCHEMA = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).schema();
private static final List<Integer> INT_LIST = new ArrayList<>();
private static final Schema INT_LIST_SCHEMA = SchemaBuilder.array(Schema.INT32_SCHEMA).schema();
private static final List<String> STRING_LIST = new ArrayList<>();
private static final Schema STRING_LIST_SCHEMA = SchemaBuilder.array(Schema.STRING_SCHEMA).schema();
static {
STRING_MAP.put("foo", "123");
STRING_MAP.put("bar", "baz");
STRING_SHORT_MAP.put("foo", (short) 12345);
STRING_SHORT_MAP.put("bar", (short) 0);
STRING_SHORT_MAP.put("baz", (short) -4321);
STRING_INT_MAP.put("foo", 1234567890);
STRING_INT_MAP.put("bar", 0);
STRING_INT_MAP.put("baz", -987654321);
STRING_LIST.add("foo");
STRING_LIST.add("bar");
INT_LIST.add(1234567890);
INT_LIST.add(-987654321);
}
private SimpleHeaderConverter converter;
@Before
public void beforeEach() {
converter = new SimpleHeaderConverter();
}
@Test
public void shouldConvertNullValue() {
assertRoundTrip(Schema.STRING_SCHEMA, null);
assertRoundTrip(Schema.OPTIONAL_STRING_SCHEMA, null);
}
@Test
public void shouldConvertSimpleString() {
assertRoundTrip(Schema.STRING_SCHEMA, "simple");
}
@Test
public void shouldConvertEmptyString() {
assertRoundTrip(Schema.STRING_SCHEMA, "");
}
@Test
public void shouldConvertStringWithQuotesAndOtherDelimiterCharacters() {
assertRoundTrip(Schema.STRING_SCHEMA, "three\"blind\\\"mice");
assertRoundTrip(Schema.STRING_SCHEMA, "string with delimiters: <>?,./\\=+-!@#$%^&*(){}[]|;':");
}
@Test
public void shouldConvertMapWithStringKeys() {
assertRoundTrip(STRING_MAP_SCHEMA, STRING_MAP);
}
@Test
public void shouldParseStringOfMapWithStringValuesWithoutWhitespaceAsMap() {
SchemaAndValue result = roundTrip(Schema.STRING_SCHEMA, "{\"foo\":\"123\",\"bar\":\"baz\"}");
assertEquals(STRING_MAP_SCHEMA, result.schema());
assertEquals(STRING_MAP, result.value());
}
@Test
public void shouldParseStringOfMapWithStringValuesWithWhitespaceAsMap() {
SchemaAndValue result = roundTrip(Schema.STRING_SCHEMA, "{ \"foo\" : \"123\", \n\"bar\" : \"baz\" } ");
assertEquals(STRING_MAP_SCHEMA, result.schema());
assertEquals(STRING_MAP, result.value());
}
@Test
public void shouldConvertMapWithStringKeysAndShortValues() {
assertRoundTrip(STRING_SHORT_MAP_SCHEMA, STRING_SHORT_MAP);
}
@Test
public void shouldParseStringOfMapWithShortValuesWithoutWhitespaceAsMap() {
SchemaAndValue result = roundTrip(Schema.STRING_SCHEMA, "{\"foo\":12345,\"bar\":0,\"baz\":-4321}");
assertEquals(STRING_SHORT_MAP_SCHEMA, result.schema());
assertEquals(STRING_SHORT_MAP, result.value());
}
@Test
public void shouldParseStringOfMapWithShortValuesWithWhitespaceAsMap() {
SchemaAndValue result = roundTrip(Schema.STRING_SCHEMA, " { \"foo\" : 12345 , \"bar\" : 0, \"baz\" : -4321 } ");
assertEquals(STRING_SHORT_MAP_SCHEMA, result.schema());
assertEquals(STRING_SHORT_MAP, result.value());
}
@Test
public void shouldConvertMapWithStringKeysAndIntegerValues() {
assertRoundTrip(STRING_INT_MAP_SCHEMA, STRING_INT_MAP);
}
@Test
public void shouldParseStringOfMapWithIntValuesWithoutWhitespaceAsMap() {
SchemaAndValue result = roundTrip(Schema.STRING_SCHEMA, "{\"foo\":1234567890,\"bar\":0,\"baz\":-987654321}");
assertEquals(STRING_INT_MAP_SCHEMA, result.schema());
assertEquals(STRING_INT_MAP, result.value());
}
@Test
public void shouldParseStringOfMapWithIntValuesWithWhitespaceAsMap() {
SchemaAndValue result = roundTrip(Schema.STRING_SCHEMA, " { \"foo\" : 1234567890 , \"bar\" : 0, \"baz\" : -987654321 } ");
assertEquals(STRING_INT_MAP_SCHEMA, result.schema());
assertEquals(STRING_INT_MAP, result.value());
}
@Test
public void shouldConvertListWithStringValues() {
assertRoundTrip(STRING_LIST_SCHEMA, STRING_LIST);
}
@Test
public void shouldConvertListWithIntegerValues() {
assertRoundTrip(INT_LIST_SCHEMA, INT_LIST);
}
@Test
public void shouldConvertMapWithStringKeysAndMixedValuesToMapWithoutSchema() {
Map<String, Object> map = new LinkedHashMap<>();
map.put("foo", "bar");
map.put("baz", (short) 3456);
assertRoundTrip(null, map);
}
@Test
public void shouldConvertListWithMixedValuesToListWithoutSchema() {
List<Object> list = new ArrayList<>();
list.add("foo");
list.add((short) 13344);
assertRoundTrip(null, list);
}
@Test
public void shouldConvertEmptyMapToMapWithoutSchema() {
assertRoundTrip(null, new LinkedHashMap<>());
}
@Test
public void shouldConvertEmptyListToListWithoutSchema() {
assertRoundTrip(null, new ArrayList<>());
}
protected SchemaAndValue roundTrip(Schema schema, Object input) {
byte[] serialized = converter.fromConnectHeader(TOPIC, HEADER, schema, input);
return converter.toConnectHeader(TOPIC, HEADER, serialized);
}
protected void assertRoundTrip(Schema schema, Object value) {
byte[] serialized = converter.fromConnectHeader(TOPIC, HEADER, schema, value);
SchemaAndValue result = converter.toConnectHeader(TOPIC, HEADER, serialized);
if (value == null) {
assertNull(serialized);
assertNull(result.schema());
assertNull(result.value());
} else {
assertNotNull(serialized);
assertEquals(value, result.value());
assertEquals(schema, result.schema());
byte[] serialized2 = converter.fromConnectHeader(TOPIC, HEADER, result.schema(), result.value());
SchemaAndValue result2 = converter.toConnectHeader(TOPIC, HEADER, serialized2);
assertNotNull(serialized2);
assertEquals(schema, result2.schema());
assertEquals(value, result2.value());
assertEquals(result, result2);
assertArrayEquals(serialized, serialized2);
}
}
}

View File

@ -79,4 +79,22 @@ public class StringConverterTest {
assertEquals(Schema.OPTIONAL_STRING_SCHEMA, data.schema());
assertEquals(SAMPLE_STRING, data.value());
}
// Note: the header conversion methods delegate to the data conversion methods, which are tested above.
// The following simply verify that the delegation works.
@Test
public void testStringHeaderValueToBytes() throws UnsupportedEncodingException {
assertArrayEquals(SAMPLE_STRING.getBytes("UTF8"), converter.fromConnectHeader(TOPIC, "hdr", Schema.STRING_SCHEMA, SAMPLE_STRING));
}
@Test
public void testNonStringHeaderValueToBytes() throws UnsupportedEncodingException {
assertArrayEquals("true".getBytes("UTF8"), converter.fromConnectHeader(TOPIC, "hdr", Schema.BOOLEAN_SCHEMA, true));
}
@Test
public void testNullHeaderValueToBytes() {
assertEquals(null, converter.fromConnectHeader(TOPIC, "hdr", Schema.OPTIONAL_STRING_SCHEMA, null));
}
}

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.common.cache.Cache;
import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Schema;
@ -36,6 +37,9 @@ import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.ConverterType;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.StringConverterConfig;
import java.io.IOException;
import java.math.BigDecimal;
@ -48,13 +52,13 @@ import java.util.Iterator;
import java.util.Map;
/**
* Implementation of Converter that uses JSON to store schemas and objects.
* Implementation of Converter that uses JSON to store schemas and objects. By default this converter will serialize Connect keys, values,
* and headers with schemas, although this can be disabled with the {@link JsonConverterConfig#SCHEMAS_ENABLE_CONFIG schemas.enable}
* configuration option.
*
* This implementation currently does nothing with the topic names or header names.
*/
public class JsonConverter implements Converter {
private static final String SCHEMAS_ENABLE_CONFIG = "schemas.enable";
private static final boolean SCHEMAS_ENABLE_DEFAULT = true;
private static final String SCHEMAS_CACHE_SIZE_CONFIG = "schemas.cache.size";
private static final int SCHEMAS_CACHE_SIZE_DEFAULT = 1000;
public class JsonConverter implements Converter, HeaderConverter {
private static final Map<Schema.Type, JsonToConnectTypeConverter> TO_CONNECT_CONVERTERS = new EnumMap<>(Schema.Type.class);
@ -262,8 +266,8 @@ public class JsonConverter implements Converter {
}
private boolean enableSchemas = SCHEMAS_ENABLE_DEFAULT;
private int cacheSize = SCHEMAS_CACHE_SIZE_DEFAULT;
private boolean enableSchemas = JsonConverterConfig.SCHEMAS_ENABLE_DEFAULT;
private int cacheSize = JsonConverterConfig.SCHEMAS_CACHE_SIZE_DEFAULT;
private Cache<Schema, ObjectNode> fromConnectSchemaCache;
private Cache<JsonNode, Schema> toConnectSchemaCache;
@ -271,21 +275,46 @@ public class JsonConverter implements Converter {
private final JsonDeserializer deserializer = new JsonDeserializer();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
Object enableConfigsVal = configs.get(SCHEMAS_ENABLE_CONFIG);
if (enableConfigsVal != null)
enableSchemas = enableConfigsVal.toString().equals("true");
public ConfigDef config() {
return JsonConverterConfig.configDef();
}
@Override
public void configure(Map<String, ?> configs) {
JsonConverterConfig config = new JsonConverterConfig(configs);
enableSchemas = config.schemasEnabled();
cacheSize = config.schemaCacheSize();
boolean isKey = config.type() == ConverterType.KEY;
serializer.configure(configs, isKey);
deserializer.configure(configs, isKey);
Object cacheSizeVal = configs.get(SCHEMAS_CACHE_SIZE_CONFIG);
if (cacheSizeVal != null)
cacheSize = Integer.parseInt((String) cacheSizeVal);
fromConnectSchemaCache = new SynchronizedCache<>(new LRUCache<Schema, ObjectNode>(cacheSize));
toConnectSchemaCache = new SynchronizedCache<>(new LRUCache<JsonNode, Schema>(cacheSize));
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
Map<String, Object> conf = new HashMap<>(configs);
conf.put(StringConverterConfig.TYPE_CONFIG, isKey ? ConverterType.KEY.getName() : ConverterType.VALUE.getName());
configure(conf);
}
@Override
public void close() {
// do nothing
}
@Override
public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) {
return fromConnectData(topic, schema, value);
}
@Override
public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) {
return toConnectData(topic, value);
}
@Override
public byte[] fromConnectData(String topic, Schema schema, Object value) {
JsonNode jsonValue = enableSchemas ? convertToJsonWithEnvelope(schema, value) : convertToJsonWithoutEnvelope(schema, value);
@ -456,7 +485,7 @@ public class JsonConverter implements Converter {
break;
case JsonSchema.ARRAY_TYPE_NAME:
JsonNode elemSchema = jsonSchema.get(JsonSchema.ARRAY_ITEMS_FIELD_NAME);
if (elemSchema == null)
if (elemSchema == null || elemSchema.isNull())
throw new DataException("Array schema did not specify the element type");
builder = SchemaBuilder.array(asConnectSchema(elemSchema));
break;

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.json;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.connect.storage.ConverterConfig;
import java.util.Map;
/**
* Configuration options for {@link JsonConverter} instances.
*/
public class JsonConverterConfig extends ConverterConfig {
public static final String SCHEMAS_ENABLE_CONFIG = "schemas.enable";
public static final boolean SCHEMAS_ENABLE_DEFAULT = true;
private static final String SCHEMAS_ENABLE_DOC = "Include schemas within each of the serialized values and keys.";
private static final String SCHEMAS_ENABLE_DISPLAY = "Enable Schemas";
public static final String SCHEMAS_CACHE_SIZE_CONFIG = "schemas.cache.size";
public static final int SCHEMAS_CACHE_SIZE_DEFAULT = 1000;
private static final String SCHEMAS_CACHE_SIZE_DOC = "The maximum number of schemas that can be cached in this converter instance.";
private static final String SCHEMAS_CACHE_SIZE_DISPLAY = "Schema Cache Size";
private final static ConfigDef CONFIG;
static {
String group = "Schemas";
int orderInGroup = 0;
CONFIG = ConverterConfig.newConfigDef();
CONFIG.define(SCHEMAS_ENABLE_CONFIG, Type.BOOLEAN, SCHEMAS_ENABLE_DEFAULT, Importance.HIGH, SCHEMAS_ENABLE_DOC, group,
orderInGroup++, Width.MEDIUM, SCHEMAS_ENABLE_DISPLAY);
CONFIG.define(SCHEMAS_CACHE_SIZE_CONFIG, Type.INT, SCHEMAS_CACHE_SIZE_DEFAULT, Importance.HIGH, SCHEMAS_CACHE_SIZE_DOC, group,
orderInGroup++, Width.MEDIUM, SCHEMAS_CACHE_SIZE_DISPLAY);
}
public static ConfigDef configDef() {
return CONFIG;
}
public JsonConverterConfig(Map<String, ?> props) {
super(CONFIG, props);
}
/**
* Return whether schemas are enabled.
*
* @return true if enabled, or false otherwise
*/
public boolean schemasEnabled() {
return getBoolean(SCHEMAS_ENABLE_CONFIG);
}
/**
* Get the cache size.
*
* @return the cache size
*/
public int schemaCacheSize() {
return getInt(SCHEMAS_CACHE_SIZE_CONFIG);
}
}
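
For illustration only (not part of this patch), here is a minimal sketch of exercising the new config class on its own. It assumes the base `ConverterConfig` requires the `converter.type` property, which is why that entry is set explicitly; the class and constant names come from the code above.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.storage.ConverterConfig;
import org.apache.kafka.connect.storage.ConverterType;

public class JsonConverterConfigSketch {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        // Constants below are the ones defined in JsonConverterConfig / ConverterConfig.
        props.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName()); // assumed to be required by ConverterConfig
        props.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false);           // drop the schema envelope
        props.put(JsonConverterConfig.SCHEMAS_CACHE_SIZE_CONFIG, 16);          // shrink the schema cache

        JsonConverterConfig config = new JsonConverterConfig(props);
        System.out.println(config.schemasEnabled());   // false
        System.out.println(config.schemaCacheSize());  // 16
    }
}
```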

View File

@ -739,6 +739,23 @@ public class JsonConverterTest {
}
// Note: the header conversion methods delegate to the data conversion methods, which are tested above.
// The following simply verify that the delegation works.
@Test
public void testStringHeaderToJson() throws UnsupportedEncodingException {
JsonNode converted = parse(converter.fromConnectHeader(TOPIC, "headerName", Schema.STRING_SCHEMA, "test-string"));
validateEnvelope(converted);
assertEquals(parse("{ \"type\": \"string\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
assertEquals("test-string", converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).textValue());
}
@Test
public void stringHeaderToConnect() {
assertEquals(new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz"), converter.toConnectHeader(TOPIC, "headerName", "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }".getBytes()));
}
private JsonNode parse(byte[] json) {
try {
return objectMapper.readTree(json);

View File

@ -17,17 +17,33 @@
package org.apache.kafka.connect.converters;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.ConverterConfig;
import org.apache.kafka.connect.storage.HeaderConverter;
import java.util.Map;
/**
* Pass-through converter for raw byte data.
*
* This implementation currently does nothing with the topic names or header names.
*/
public class ByteArrayConverter implements Converter {
public class ByteArrayConverter implements Converter, HeaderConverter {
private static final ConfigDef CONFIG_DEF = ConverterConfig.newConfigDef();
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
@ -49,4 +65,18 @@ public class ByteArrayConverter implements Converter {
return new SchemaAndValue(Schema.OPTIONAL_BYTES_SCHEMA, value);
}
@Override
public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) {
return fromConnectData(topic, schema, value);
}
@Override
public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) {
return toConnectData(topic, value);
}
@Override
public void close() {
// do nothing
}
}
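
A quick usage sketch (not included in the diff) showing the pass-through behavior when this converter handles headers; it relies only on the methods visible above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;

import org.apache.kafka.connect.converters.ByteArrayConverter;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;

public class ByteArrayHeaderSketch {
    public static void main(String[] args) {
        ByteArrayConverter converter = new ByteArrayConverter();
        converter.configure(Collections.emptyMap()); // configure(Map) above is a no-op

        byte[] raw = "trace-42".getBytes(StandardCharsets.UTF_8);

        // Header bytes pass through unchanged in both directions.
        byte[] serialized = converter.fromConnectHeader("topic", "trace.id", Schema.OPTIONAL_BYTES_SCHEMA, raw);
        SchemaAndValue header = converter.toConnectHeader("topic", "trace.id", serialized);

        System.out.println(new String((byte[]) header.value(), StandardCharsets.UTF_8)); // trace-42
    }
}
```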

View File

@ -73,6 +73,11 @@ public class ConnectorConfig extends AbstractConfig {
public static final String VALUE_CONVERTER_CLASS_DOC = WorkerConfig.VALUE_CONVERTER_CLASS_DOC;
public static final String VALUE_CONVERTER_CLASS_DISPLAY = "Value converter class";
public static final String HEADER_CONVERTER_CLASS_CONFIG = WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG;
public static final String HEADER_CONVERTER_CLASS_DOC = WorkerConfig.HEADER_CONVERTER_CLASS_DOC;
public static final String HEADER_CONVERTER_CLASS_DISPLAY = "Header converter class";
public static final String HEADER_CONVERTER_CLASS_DEFAULT = WorkerConfig.HEADER_CONVERTER_CLASS_DEFAULT;
public static final String TASKS_MAX_CONFIG = "tasks.max";
private static final String TASKS_MAX_DOC = "Maximum number of tasks to use for this connector.";
public static final int TASKS_MAX_DEFAULT = 1;
@ -96,12 +101,14 @@ public class ConnectorConfig extends AbstractConfig {
}
public static ConfigDef configDef() {
int orderInGroup = 0;
return new ConfigDef()
.define(NAME_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, nonEmptyStringWithoutControlChars(), Importance.HIGH, NAME_DOC, COMMON_GROUP, 1, Width.MEDIUM, NAME_DISPLAY)
.define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, 2, Width.LONG, CONNECTOR_CLASS_DISPLAY)
.define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY)
.define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, 4, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY)
.define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, VALUE_CONVERTER_CLASS_DOC, COMMON_GROUP, 5, Width.SHORT, VALUE_CONVERTER_CLASS_DISPLAY)
.define(NAME_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, nonEmptyStringWithoutControlChars(), Importance.HIGH, NAME_DOC, COMMON_GROUP, ++orderInGroup, Width.MEDIUM, NAME_DISPLAY)
.define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.LONG, CONNECTOR_CLASS_DISPLAY)
.define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, TASK_MAX_DISPLAY)
.define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY)
.define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, VALUE_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, VALUE_CONVERTER_CLASS_DISPLAY)
.define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS, HEADER_CONVERTER_CLASS_DEFAULT, Importance.LOW, HEADER_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, HEADER_CONVERTER_CLASS_DISPLAY)
.define(TRANSFORMS_CONFIG, Type.LIST, Collections.emptyList(), ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), new ConfigDef.Validator() {
@Override
public void ensureValid(String name, Object value) {
@ -110,7 +117,7 @@ public class ConnectorConfig extends AbstractConfig {
throw new ConfigException(name, value, "Duplicate alias provided.");
}
}
}), Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, 6, Width.LONG, TRANSFORMS_DISPLAY);
}), Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, ++orderInGroup, Width.LONG, TRANSFORMS_DISPLAY);
}
public ConnectorConfig(Plugins plugins) {

View File

@ -36,6 +36,9 @@ import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.ConverterConfig;
import org.apache.kafka.connect.storage.ConverterType;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
@ -383,29 +386,29 @@ public class Worker {
// search for converters within the connector dependencies, and if not found the
// plugin class loader delegates loading to the delegating classloader.
Converter keyConverter = connConfig.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
if (keyConverter != null)
keyConverter.configure(connConfig.originalsWithPrefix("key.converter."), true);
else {
Converter defaultKeyConverter = plugins.newConverter(
config.getClass(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG).getName(),
config
);
defaultKeyConverter.configure(config.originalsWithPrefix("key.converter."), true);
keyConverter = defaultKeyConverter;
}
Converter valueConverter = connConfig.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
if (valueConverter != null)
valueConverter.configure(connConfig.originalsWithPrefix("value.converter."), false);
else {
Converter defaultValueConverter = plugins.newConverter(
config.getClass(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG).getName(),
config
);
defaultValueConverter.configure(config.originalsWithPrefix("value.converter."), false);
valueConverter = defaultValueConverter;
if (keyConverter == null) {
String className = config.getClass(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG).getName();
keyConverter = plugins.newConverter(className, config);
}
keyConverter.configure(connConfig.originalsWithPrefix("key.converter."), true);
workerTask = buildWorkerTask(connConfig, id, task, statusListener, initialState, keyConverter, valueConverter, connectorLoader);
Converter valueConverter = connConfig.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
if (valueConverter == null) {
String className = config.getClass(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG).getName();
valueConverter = plugins.newConverter(className, config);
}
valueConverter.configure(connConfig.originalsWithPrefix("value.converter."), false);
HeaderConverter headerConverter = connConfig.getConfiguredInstance(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, HeaderConverter.class);
if (headerConverter == null) {
String className = config.getClass(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG).getName();
headerConverter = plugins.newHeaderConverter(className, config);
}
Map<String, Object> converterConfig = connConfig.originalsWithPrefix("header.converter.");
converterConfig.put(ConverterConfig.TYPE_CONFIG, ConverterType.HEADER.getName());
headerConverter.configure(converterConfig);
workerTask = buildWorkerTask(connConfig, id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, connectorLoader);
workerTask.initialize(taskConfig);
Plugins.compareAndSwapLoaders(savedLoader);
} catch (Throwable t) {
@ -437,6 +440,7 @@ public class Worker {
TargetState initialState,
Converter keyConverter,
Converter valueConverter,
HeaderConverter headerConverter,
ClassLoader loader) {
// Decide which type of worker task we need based on the type of task.
if (task instanceof SourceTask) {
@ -446,12 +450,12 @@ public class Worker {
OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
internalKeyConverter, internalValueConverter);
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter,
valueConverter, transformationChain, producer, offsetReader, offsetWriter, config, metrics, loader, time);
return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter,
headerConverter, transformationChain, producer, offsetReader, offsetWriter, config, metrics, loader, time);
} else if (task instanceof SinkTask) {
TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations());
return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, metrics, keyConverter,
valueConverter, transformationChain, loader, time);
valueConverter, headerConverter, transformationChain, loader, time);
} else {
log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);
throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");

View File

@ -23,6 +23,7 @@ import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.connect.storage.SimpleHeaderConverter;
import java.util.ArrayList;
import java.util.Arrays;
@ -63,6 +64,15 @@ public class WorkerConfig extends AbstractConfig {
" independent of connectors it allows any connector to work with any serialization format." +
" Examples of common formats include JSON and Avro.";
public static final String HEADER_CONVERTER_CLASS_CONFIG = "header.converter";
public static final String HEADER_CONVERTER_CLASS_DOC =
"HeaderConverter class used to convert between Kafka Connect format and the serialized form that is written to Kafka." +
" This controls the format of the header values in messages written to or read from Kafka, and since this is" +
" independent of connectors it allows any connector to work with any serialization format." +
" Examples of common formats include JSON and Avro. By default, the SimpleHeaderConverter is used to serialize" +
" header values to strings and deserialize them by inferring the schemas.";
public static final String HEADER_CONVERTER_CLASS_DEFAULT = SimpleHeaderConverter.class.getName();
public static final String INTERNAL_KEY_CONVERTER_CLASS_CONFIG = "internal.key.converter";
public static final String INTERNAL_KEY_CONVERTER_CLASS_DOC =
"Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka." +
@ -222,7 +232,11 @@ public class WorkerConfig extends AbstractConfig {
.define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST,
"", Importance.LOW,
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
.define(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, ConfigDef.Type.STRING, "none", ConfigDef.Importance.LOW, BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC);
.define(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG,
ConfigDef.Type.STRING, "none", ConfigDef.Importance.LOW, BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC)
.define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS,
HEADER_CONVERTER_CLASS_DEFAULT,
Importance.LOW, HEADER_CONVERTER_CLASS_DOC);
}
@Override

View File

@ -38,10 +38,13 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.SinkUtils;
@ -70,6 +73,7 @@ class WorkerSinkTask extends WorkerTask {
private final Time time;
private final Converter keyConverter;
private final Converter valueConverter;
private final HeaderConverter headerConverter;
private final TransformationChain<SinkRecord> transformationChain;
private final SinkTaskMetricsGroup sinkTaskMetricsGroup;
private KafkaConsumer<byte[], byte[]> consumer;
@ -94,6 +98,7 @@ class WorkerSinkTask extends WorkerTask {
ConnectMetrics connectMetrics,
Converter keyConverter,
Converter valueConverter,
HeaderConverter headerConverter,
TransformationChain<SinkRecord> transformationChain,
ClassLoader loader,
Time time) {
@ -103,6 +108,7 @@ class WorkerSinkTask extends WorkerTask {
this.task = task;
this.keyConverter = keyConverter;
this.valueConverter = valueConverter;
this.headerConverter = headerConverter;
this.transformationChain = transformationChain;
this.time = time;
this.messageBatch = new ArrayList<>();
@ -474,13 +480,15 @@ class WorkerSinkTask extends WorkerTask {
this, msg.topic(), msg.partition(), msg.offset(), msg.timestamp());
SchemaAndValue keyAndSchema = keyConverter.toConnectData(msg.topic(), msg.key());
SchemaAndValue valueAndSchema = valueConverter.toConnectData(msg.topic(), msg.value());
Headers headers = convertHeadersFor(msg);
Long timestamp = ConnectUtils.checkAndConvertTimestamp(msg.timestamp());
SinkRecord origRecord = new SinkRecord(msg.topic(), msg.partition(),
keyAndSchema.schema(), keyAndSchema.value(),
valueAndSchema.schema(), valueAndSchema.value(),
msg.offset(),
timestamp,
msg.timestampType());
msg.timestampType(),
headers);
log.trace("{} Applying transformations to record in topic '{}' partition {} at offset {} and timestamp {} with key {} and value {}",
this, msg.topic(), msg.partition(), msg.offset(), timestamp, keyAndSchema.value(), valueAndSchema.value());
SinkRecord transRecord = transformationChain.apply(origRecord);
@ -498,6 +506,19 @@ class WorkerSinkTask extends WorkerTask {
sinkTaskMetricsGroup.recordConsumedOffsets(origOffsets);
}
private Headers convertHeadersFor(ConsumerRecord<byte[], byte[]> record) {
Headers result = new ConnectHeaders();
org.apache.kafka.common.header.Headers recordHeaders = record.headers();
if (recordHeaders != null) {
String topic = record.topic();
for (org.apache.kafka.common.header.Header recordHeader : recordHeaders) {
SchemaAndValue schemaAndValue = headerConverter.toConnectHeader(topic, recordHeader.key(), recordHeader.value());
result.add(recordHeader.key(), schemaAndValue);
}
}
return result;
}
private void resumeAll() {
for (TopicPartition tp : consumer.assignment())
if (!context.pausedPartitions().contains(tp))

View File

@ -22,6 +22,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
@ -30,10 +31,13 @@ import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.util.ConnectUtils;
@ -62,6 +66,7 @@ class WorkerSourceTask extends WorkerTask {
private final SourceTask task;
private final Converter keyConverter;
private final Converter valueConverter;
private final HeaderConverter headerConverter;
private final TransformationChain<SourceRecord> transformationChain;
private KafkaProducer<byte[], byte[]> producer;
private final OffsetStorageReader offsetReader;
@ -89,6 +94,7 @@ class WorkerSourceTask extends WorkerTask {
TargetState initialState,
Converter keyConverter,
Converter valueConverter,
HeaderConverter headerConverter,
TransformationChain<SourceRecord> transformationChain,
KafkaProducer<byte[], byte[]> producer,
OffsetStorageReader offsetReader,
@ -103,6 +109,7 @@ class WorkerSourceTask extends WorkerTask {
this.task = task;
this.keyConverter = keyConverter;
this.valueConverter = valueConverter;
this.headerConverter = headerConverter;
this.transformationChain = transformationChain;
this.producer = producer;
this.offsetReader = offsetReader;
@ -216,10 +223,11 @@ class WorkerSourceTask extends WorkerTask {
continue;
}
RecordHeaders headers = convertHeaderFor(record);
byte[] key = keyConverter.fromConnectData(record.topic(), record.keySchema(), record.key());
byte[] value = valueConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(record.topic(), record.kafkaPartition(),
ConnectUtils.checkAndConvertTimestamp(record.timestamp()), key, value);
ConnectUtils.checkAndConvertTimestamp(record.timestamp()), key, value, headers);
log.trace("{} Appending record with key {}, value {}", this, record.key(), record.value());
// We need this queued first since the callback could happen immediately (even synchronously in some cases).
// Because of this we need to be careful about handling retries -- we always save the previously attempted
@ -278,6 +286,20 @@ class WorkerSourceTask extends WorkerTask {
return true;
}
private RecordHeaders convertHeaderFor(SourceRecord record) {
Headers headers = record.headers();
RecordHeaders result = new RecordHeaders();
if (headers != null) {
String topic = record.topic();
for (Header header : headers) {
String key = header.key();
byte[] rawHeader = headerConverter.fromConnectHeader(topic, key, header.schema(), header.value());
result.add(key, rawHeader);
}
}
return result;
}
private void commitTaskRecord(SourceRecord record) {
try {
task.commitRecord(record);

View File

@ -25,6 +25,7 @@ import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -200,7 +201,26 @@ public class Plugins {
throw new ConnectException(
"Failed to find any class that implements Converter and which name matches "
+ converterClassOrAlias
+ ", available connectors are: "
+ ", available converters are: "
+ pluginNames(delegatingLoader.converters())
);
}
return config != null ? newConfiguredPlugin(config, klass) : newPlugin(klass);
}
public HeaderConverter newHeaderConverter(String converterClassOrAlias, AbstractConfig config) {
Class<? extends HeaderConverter> klass;
try {
klass = pluginClass(
delegatingLoader,
converterClassOrAlias,
HeaderConverter.class
);
} catch (ClassNotFoundException e) {
throw new ConnectException(
"Failed to find any class that implements HeaderConverter and which name matches "
+ converterClassOrAlias
+ ", available header converters are: "
+ pluginNames(delegatingLoader.converters())
);
}

View File

@ -31,9 +31,13 @@ import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.easymock.IAnswer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.MockStrict;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.Arrays;
import java.util.Collections;
@ -43,27 +47,34 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.powermock.api.easymock.PowerMock.verifyAll;
import static org.powermock.api.easymock.PowerMock.replayAll;
import static org.easymock.EasyMock.strictMock;
import static org.easymock.EasyMock.partialMockBuilder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class AbstractHerderTest extends EasyMockSupport {
private final Worker worker = strictMock(Worker.class);
@RunWith(PowerMockRunner.class)
@PrepareForTest({AbstractHerder.class})
public class AbstractHerderTest {
private final String workerId = "workerId";
private final String kafkaClusterId = "I4ZmrWqfT2e-upky_4fdPA";
private final int generation = 5;
private final String connector = "connector";
private final Plugins plugins = strictMock(Plugins.class);
private final ClassLoader classLoader = strictMock(ClassLoader.class);
@MockStrict private Worker worker;
@MockStrict private Plugins plugins;
@MockStrict private ClassLoader classLoader;
@MockStrict private ConfigBackingStore configStore;
@MockStrict private StatusBackingStore statusStore;
@Test
public void connectorStatus() {
ConnectorTaskId taskId = new ConnectorTaskId(connector, 0);
ConfigBackingStore configStore = strictMock(ConfigBackingStore.class);
StatusBackingStore statusStore = strictMock(StatusBackingStore.class);
AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
.withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class)
.withArgs(worker, workerId, kafkaClusterId, statusStore, configStore)
@ -96,7 +107,7 @@ public class AbstractHerderTest extends EasyMockSupport {
assertEquals("UNASSIGNED", taskState.state());
assertEquals(workerId, taskState.workerId());
verifyAll();
PowerMock.verifyAll();
}
@Test
@ -104,9 +115,6 @@ public class AbstractHerderTest extends EasyMockSupport {
ConnectorTaskId taskId = new ConnectorTaskId("connector", 0);
String workerId = "workerId";
ConfigBackingStore configStore = strictMock(ConfigBackingStore.class);
StatusBackingStore statusStore = strictMock(StatusBackingStore.class);
AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
.withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class)
.withArgs(worker, workerId, kafkaClusterId, statusStore, configStore)
@ -163,14 +171,14 @@ public class AbstractHerderTest extends EasyMockSupport {
assertEquals(TestSourceConnector.class.getName(), result.name());
assertEquals(Arrays.asList(ConnectorConfig.COMMON_GROUP, ConnectorConfig.TRANSFORMS_GROUP), result.groups());
assertEquals(2, result.errorCount());
// Base connector config has 6 fields, connector's configs add 2
assertEquals(8, result.values().size());
// Base connector config has 7 fields, connector's configs add 2
assertEquals(9, result.values().size());
// Missing name should generate an error
assertEquals(ConnectorConfig.NAME_CONFIG, result.values().get(0).configValue().name());
assertEquals(1, result.values().get(0).configValue().errors().size());
// "required" config from connector should generate an error
assertEquals("required", result.values().get(6).configValue().name());
assertEquals(1, result.values().get(6).configValue().errors().size());
assertEquals("required", result.values().get(7).configValue().name());
assertEquals(1, result.values().get(7).configValue().errors().size());
verifyAll();
}
@ -209,15 +217,15 @@ public class AbstractHerderTest extends EasyMockSupport {
);
assertEquals(expectedGroups, result.groups());
assertEquals(2, result.errorCount());
// Base connector config has 6 fields, connector's configs add 2, 2 type fields from the transforms, and
// Base connector config has 7 fields, connector's configs add 2, 2 type fields from the transforms, and
// 1 from the valid transformation's config
assertEquals(11, result.values().size());
assertEquals(12, result.values().size());
// Should get 2 type fields from the transforms, first adds its own config since it has a valid class
assertEquals("transforms.xformA.type", result.values().get(6).configValue().name());
assertTrue(result.values().get(6).configValue().errors().isEmpty());
assertEquals("transforms.xformA.subconfig", result.values().get(7).configValue().name());
assertEquals("transforms.xformB.type", result.values().get(8).configValue().name());
assertFalse(result.values().get(8).configValue().errors().isEmpty());
assertEquals("transforms.xformA.type", result.values().get(7).configValue().name());
assertTrue(result.values().get(7).configValue().errors().isEmpty());
assertEquals("transforms.xformA.subconfig", result.values().get(8).configValue().name());
assertEquals("transforms.xformB.type", result.values().get(9).configValue().name());
assertFalse(result.values().get(9).configValue().errors().isEmpty());
verifyAll();
}
@ -262,7 +270,7 @@ public class AbstractHerderTest extends EasyMockSupport {
@Override
public ConfigDef config() {
return new ConfigDef()
.define("subconfig", ConfigDef.Type.STRING, "default", ConfigDef.Importance.LOW, "docs");
.define("subconfig", ConfigDef.Type.STRING, "default", ConfigDef.Importance.LOW, "docs");
}
@Override

View File

@ -54,6 +54,10 @@ public class SourceTaskOffsetCommitterTest extends ThreadedTest {
private ConcurrentHashMap committers;
@Mock
private Logger mockLog;
@Mock private ScheduledFuture commitFuture;
@Mock private ScheduledFuture taskFuture;
@Mock private ConnectorTaskId taskId;
@Mock private WorkerSourceTask task;
private SourceTaskOffsetCommitter committer;
@ -81,15 +85,11 @@ public class SourceTaskOffsetCommitterTest extends ThreadedTest {
public void testSchedule() throws Exception {
Capture<Runnable> taskWrapper = EasyMock.newCapture();
ScheduledFuture commitFuture = PowerMock.createMock(ScheduledFuture.class);
EasyMock.expect(executor.scheduleWithFixedDelay(
EasyMock.capture(taskWrapper), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), eq(TimeUnit.MILLISECONDS))
).andReturn(commitFuture);
ConnectorTaskId taskId = PowerMock.createMock(ConnectorTaskId.class);
WorkerSourceTask task = PowerMock.createMock(WorkerSourceTask.class);
EasyMock.expect(committers.put(taskId, commitFuture)).andReturn(null);
PowerMock.replayAll();
@ -135,9 +135,6 @@ public class SourceTaskOffsetCommitterTest extends ThreadedTest {
@Test
public void testRemove() throws Exception {
ConnectorTaskId taskId = PowerMock.createMock(ConnectorTaskId.class);
ScheduledFuture task = PowerMock.createMock(ScheduledFuture.class);
// Try to remove a non-existing task
EasyMock.expect(committers.remove(taskId)).andReturn(null);
PowerMock.replayAll();
@ -148,10 +145,10 @@ public class SourceTaskOffsetCommitterTest extends ThreadedTest {
PowerMock.resetAll();
// Try to remove an existing task
EasyMock.expect(committers.remove(taskId)).andReturn(task);
EasyMock.expect(task.cancel(eq(false))).andReturn(false);
EasyMock.expect(task.isDone()).andReturn(false);
EasyMock.expect(task.get()).andReturn(null);
EasyMock.expect(committers.remove(taskId)).andReturn(taskFuture);
EasyMock.expect(taskFuture.cancel(eq(false))).andReturn(false);
EasyMock.expect(taskFuture.isDone()).andReturn(false);
EasyMock.expect(taskFuture.get()).andReturn(null);
PowerMock.replayAll();
committer.remove(taskId);
@ -160,10 +157,10 @@ public class SourceTaskOffsetCommitterTest extends ThreadedTest {
PowerMock.resetAll();
// Try to remove a cancelled task
EasyMock.expect(committers.remove(taskId)).andReturn(task);
EasyMock.expect(task.cancel(eq(false))).andReturn(false);
EasyMock.expect(task.isDone()).andReturn(false);
EasyMock.expect(task.get()).andThrow(new CancellationException());
EasyMock.expect(committers.remove(taskId)).andReturn(taskFuture);
EasyMock.expect(taskFuture.cancel(eq(false))).andReturn(false);
EasyMock.expect(taskFuture.isDone()).andReturn(false);
EasyMock.expect(taskFuture.get()).andThrow(new CancellationException());
mockLog.trace(EasyMock.anyString(), EasyMock.<Object>anyObject());
PowerMock.expectLastCall();
PowerMock.replayAll();
@ -174,10 +171,10 @@ public class SourceTaskOffsetCommitterTest extends ThreadedTest {
PowerMock.resetAll();
// Try to remove an interrupted task
EasyMock.expect(committers.remove(taskId)).andReturn(task);
EasyMock.expect(task.cancel(eq(false))).andReturn(false);
EasyMock.expect(task.isDone()).andReturn(false);
EasyMock.expect(task.get()).andThrow(new InterruptedException());
EasyMock.expect(committers.remove(taskId)).andReturn(taskFuture);
EasyMock.expect(taskFuture.cancel(eq(false))).andReturn(false);
EasyMock.expect(taskFuture.isDone()).andReturn(false);
EasyMock.expect(taskFuture.get()).andThrow(new InterruptedException());
PowerMock.replayAll();
try {

View File

@ -36,6 +36,7 @@ import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.common.utils.MockTime;
import org.easymock.Capture;
@ -122,6 +123,8 @@ public class WorkerSinkTaskTest {
@Mock
private Converter valueConverter;
@Mock
private HeaderConverter headerConverter;
@Mock
private TransformationChain<SinkRecord> transformationChain;
@Mock
private TaskStatus.Listener statusListener;
@ -154,7 +157,7 @@ public class WorkerSinkTaskTest {
private void createTask(TargetState initialState) {
workerTask = PowerMock.createPartialMock(
WorkerSinkTask.class, new String[]{"createConsumer"},
taskId, sinkTask, statusListener, initialState, workerConfig, metrics, keyConverter, valueConverter, transformationChain, pluginLoader, time);
taskId, sinkTask, statusListener, initialState, workerConfig, metrics, keyConverter, valueConverter, headerConverter, transformationChain, pluginLoader, time);
}
@After

View File

@ -34,6 +34,7 @@ import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.connect.util.ThreadedTest;
@ -108,6 +109,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
private PluginClassLoader pluginLoader;
@Mock private Converter keyConverter;
@Mock private Converter valueConverter;
@Mock private HeaderConverter headerConverter;
@Mock private TransformationChain transformationChain;
private WorkerSinkTask workerTask;
@Mock private KafkaConsumer<byte[], byte[]> consumer;
@ -135,7 +137,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
workerTask = PowerMock.createPartialMock(
WorkerSinkTask.class, new String[]{"createConsumer"},
taskId, sinkTask, statusListener, initialState, workerConfig, metrics, keyConverter,
valueConverter, TransformationChain.noOp(), pluginLoader, time);
valueConverter, headerConverter, TransformationChain.noOp(), pluginLoader, time);
recordsReturned = 0;
}

View File

@ -31,6 +31,7 @@ import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.util.Callback;
@ -93,6 +94,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
@Mock private SourceTask sourceTask;
@Mock private Converter keyConverter;
@Mock private Converter valueConverter;
@Mock private HeaderConverter headerConverter;
@Mock private TransformationChain<SourceRecord> transformationChain;
@Mock private KafkaProducer<byte[], byte[]> producer;
@Mock private OffsetStorageReader offsetReader;
@ -140,8 +142,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
}
private void createWorkerTask(TargetState initialState) {
workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, transformationChain,
producer, offsetReader, offsetWriter, config, metrics, plugins.delegatingLoader(), Time.SYSTEM);
workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, headerConverter,
transformationChain, producer, offsetReader, offsetWriter, config, metrics, plugins.delegatingLoader(), Time.SYSTEM);
}
@Test

View File

@ -23,9 +23,14 @@ import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.common.utils.MockTime;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.easymock.Mock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.HashMap;
import java.util.Map;
@ -37,6 +42,9 @@ import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertEquals;
@RunWith(PowerMockRunner.class)
@PrepareForTest({WorkerTask.class})
@PowerMockIgnore("javax.management.*")
public class WorkerTaskTest {
private static final Map<String, String> TASK_PROPS = new HashMap<>();
@ -46,6 +54,8 @@ public class WorkerTaskTest {
private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
private ConnectMetrics metrics;
@Mock private TaskStatus.Listener statusListener;
@Mock private ClassLoader loader;
@Before
public void setup() {
@ -61,9 +71,6 @@ public class WorkerTaskTest {
public void standardStartup() {
ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
ClassLoader loader = EasyMock.createMock(ClassLoader.class);
WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
.withConstructor(
ConnectorTaskId.class,
@ -110,9 +117,6 @@ public class WorkerTaskTest {
public void stopBeforeStarting() {
ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
ClassLoader loader = EasyMock.createMock(ClassLoader.class);
WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
.withConstructor(
ConnectorTaskId.class,
@ -152,9 +156,6 @@ public class WorkerTaskTest {
public void cancelBeforeStopping() throws Exception {
ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
ClassLoader loader = EasyMock.createMock(ClassLoader.class);
WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
.withConstructor(
ConnectorTaskId.class,
@ -220,7 +221,6 @@ public class WorkerTaskTest {
public void updateMetricsOnListenerEventsForStartupPauseResumeAndShutdown() {
ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
ConnectMetrics metrics = new MockConnectMetrics();
TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
TaskMetricsGroup group = new TaskMetricsGroup(taskId, metrics, statusListener);
statusListener.onStartup(taskId);
@ -255,7 +255,6 @@ public class WorkerTaskTest {
MockConnectMetrics metrics = new MockConnectMetrics();
MockTime time = metrics.time();
ConnectException error = new ConnectException("error");
TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
TaskMetricsGroup group = new TaskMetricsGroup(taskId, metrics, statusListener);
statusListener.onStartup(taskId);

View File

@ -38,6 +38,7 @@ import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
@ -92,6 +93,13 @@ public class WorkerTest extends ThreadedTest {
@MockStrict
private ConnectorStatus.Listener connectorStatusListener;
@Mock private Connector connector;
@Mock private ConnectorContext ctx;
@Mock private TestSourceTask task;
@Mock private WorkerSourceTask workerTask;
@Mock private Converter keyConverter;
@Mock private Converter valueConverter;
@Before
public void setup() {
super.setup();
@ -116,9 +124,6 @@ public class WorkerTest extends ThreadedTest {
expectStartStorage();
// Create
Connector connector = PowerMock.createMock(Connector.class);
ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
EasyMock.expect(plugins.newConnector(WorkerTestConnector.class.getName()))
.andReturn(connector);
@ -187,8 +192,6 @@ public class WorkerTest extends ThreadedTest {
expectConverters();
expectStartStorage();
ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
Map<String, String> props = new HashMap<>();
props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
@ -233,10 +236,6 @@ public class WorkerTest extends ThreadedTest {
expectConverters();
expectStartStorage();
// Create
Connector connector = PowerMock.createMock(Connector.class);
ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
EasyMock.expect(plugins.newConnector("WorkerTestConnector")).andReturn(connector);
EasyMock.expect(connector.version()).andReturn("1.0");
@ -301,10 +300,6 @@ public class WorkerTest extends ThreadedTest {
expectConverters();
expectStartStorage();
// Create
Connector connector = PowerMock.createMock(Connector.class);
ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
EasyMock.expect(plugins.newConnector("WorkerTest")).andReturn(connector);
EasyMock.expect(connector.version()).andReturn("1.0");
@ -381,10 +376,6 @@ public class WorkerTest extends ThreadedTest {
expectConverters();
expectStartStorage();
// Create
Connector connector = PowerMock.createMock(Connector.class);
ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(3);
EasyMock.expect(plugins.newConnector(WorkerTestConnector.class.getName()))
.andReturn(connector);
@ -473,9 +464,6 @@ public class WorkerTest extends ThreadedTest {
expectConverters(true);
expectStartStorage();
// Create
TestSourceTask task = PowerMock.createMock(TestSourceTask.class);
WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
@ -486,6 +474,7 @@ public class WorkerTest extends ThreadedTest {
EasyMock.eq(TargetState.STARTED),
EasyMock.anyObject(JsonConverter.class),
EasyMock.anyObject(JsonConverter.class),
EasyMock.anyObject(JsonConverter.class),
EasyMock.eq(TransformationChain.<SourceRecord>noOp()),
EasyMock.anyObject(KafkaProducer.class),
EasyMock.anyObject(OffsetStorageReader.class),
@ -609,9 +598,6 @@ public class WorkerTest extends ThreadedTest {
expectConverters(true);
expectStartStorage();
// Create
TestSourceTask task = PowerMock.createMock(TestSourceTask.class);
WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
@ -622,6 +608,7 @@ public class WorkerTest extends ThreadedTest {
EasyMock.eq(TargetState.STARTED),
EasyMock.anyObject(JsonConverter.class),
EasyMock.anyObject(JsonConverter.class),
EasyMock.anyObject(JsonConverter.class),
EasyMock.eq(TransformationChain.<SourceRecord>noOp()),
EasyMock.anyObject(KafkaProducer.class),
EasyMock.anyObject(OffsetStorageReader.class),
@ -692,12 +679,11 @@ public class WorkerTest extends ThreadedTest {
expectConverters();
expectStartStorage();
TestSourceTask task = PowerMock.createMock(TestSourceTask.class);
WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
Capture<TestConverter> keyConverter = EasyMock.newCapture();
Capture<TestConverter> valueConverter = EasyMock.newCapture();
Capture<HeaderConverter> headerConverter = EasyMock.newCapture();
EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
PowerMock.expectNew(
@ -707,6 +693,7 @@ public class WorkerTest extends ThreadedTest {
EasyMock.eq(TargetState.STARTED),
EasyMock.capture(keyConverter),
EasyMock.capture(valueConverter),
EasyMock.capture(headerConverter),
EasyMock.eq(TransformationChain.<SourceRecord>noOp()),
EasyMock.anyObject(KafkaProducer.class),
EasyMock.anyObject(OffsetStorageReader.class),
@ -843,9 +830,6 @@ public class WorkerTest extends ThreadedTest {
private void expectConverters(Class<? extends Converter> converterClass, Boolean expectDefaultConverters) {
// As default converters are instantiated when a task starts, they are expected only if the `startTask` method is called
if (expectDefaultConverters) {
// connector default
Converter keyConverter = PowerMock.createMock(converterClass);
Converter valueConverter = PowerMock.createMock(converterClass);
// Instantiate and configure default
EasyMock.expect(plugins.newConverter(JsonConverter.class.getName(), config))

View File

@ -394,7 +394,7 @@
}
</pre>
<p>Again, we've omitted some details, but we can see the important steps: the <code>poll()</code> method is going to be called repeatedly, and for each call it will loop trying to read records from the file. For each line it reads, it also tracks the file offset. It uses this information to create an output <code>SourceRecord</code> with four pieces of information: the source partition (there is only one, the single file being read), source offset (byte offset in the file), output topic name, and output value (the line, and we include a schema indicating this value will always be a string). Other variants of the <code>SourceRecord</code> constructor can also include a specific output partition and a key.</p>
<p>Again, we've omitted some details, but we can see the important steps: the <code>poll()</code> method is going to be called repeatedly, and for each call it will loop trying to read records from the file. For each line it reads, it also tracks the file offset. It uses this information to create an output <code>SourceRecord</code> with four pieces of information: the source partition (there is only one, the single file being read), source offset (byte offset in the file), output topic name, and output value (the line, and we include a schema indicating this value will always be a string). Other variants of the <code>SourceRecord</code> constructor can also include a specific output partition, a key, and headers.</p>
<p>Note that this implementation uses the normal Java <code>InputStream</code> interface and may sleep if data is not available. This is acceptable because Kafka Connect provides each task with a dedicated thread. While task implementations have to conform to the basic <code>poll()</code> interface, they have a lot of flexibility in how they are implemented. In this case, an NIO-based implementation would be more efficient, but this simple approach works, is quick to implement, and is compatible with older versions of Java.</p>
@ -414,7 +414,7 @@
}
</pre>
<p>The <code>SinkTask</code> documentation contains full details, but this interface is nearly as simple as the <code>SourceTask</code>. The <code>put()</code> method should contain most of the implementation, accepting sets of <code>SinkRecords</code>, performing any required translation, and storing them in the destination system. This method does not need to ensure the data has been fully written to the destination system before returning. In fact, in many cases internal buffering will be useful so an entire batch of records can be sent at once, reducing the overhead of inserting events into the downstream data store. The <code>SinkRecords</code> contain essentially the same information as <code>SourceRecords</code>: Kafka topic, partition, offset and the event key and value.</p>
<p>The <code>SinkTask</code> documentation contains full details, but this interface is nearly as simple as the <code>SourceTask</code>. The <code>put()</code> method should contain most of the implementation, accepting sets of <code>SinkRecords</code>, performing any required translation, and storing them in the destination system. This method does not need to ensure the data has been fully written to the destination system before returning. In fact, in many cases internal buffering will be useful so an entire batch of records can be sent at once, reducing the overhead of inserting events into the downstream data store. The <code>SinkRecords</code> contain essentially the same information as <code>SourceRecords</code>: Kafka topic, partition, offset, the event key and value, and optional headers.</p>
<p>The <code>flush()</code> method is used during the offset commit process, which allows tasks to recover from failures and resume from a safe point such that no events will be missed. The method should push any outstanding data to the destination system and then block until the write has been acknowledged. The <code>offsets</code> parameter can often be ignored, but is useful in some cases where implementations want to store offset information in the destination store to provide exactly-once
delivery. For example, an HDFS connector could do this and use atomic move operations to make sure the <code>flush()</code> operation atomically commits the data and offsets to a final location in HDFS.</p>

View File

@ -74,6 +74,7 @@
Kafka Streams retries and can <a href="/{{version}}/documentation/streams/developer-guide/config-streams">configure</a>
fine-grained timeouts (instead of hard-coded retries as in older versions).</li>
<li>Kafka Streams rebalance time was reduced further, making Kafka Streams more responsive.</li>
<li>Kafka Connect now supports message headers in both sink and source connectors, and to manipulate them via simple message transforms. Connectors must be changed to explicitly use them. A new <code>HeaderConverter</code> is introduced to control how headers are (de)serialized, and the new "SimpleHeaderConverter" is used by default to use string representations of values.</li>
</ul>
<h5><a id="upgrade_110_new_protocols" href="#upgrade_110_new_protocols">New Protocol Versions</a></h5>