Restrict offset format.

Restrict offsets to be serialized as schemaless objects, but validate that they
are maps with string keys and primitive values. This is enforced at the Copycat
level rather than by validating against a Schema.

Additionally, parameterize the Copycat system test to run both with and without
schemas, and to validate both the JSON and String converters.
Ewen Cheslack-Postava 2015-08-27 16:48:10 -07:00
parent 85797e7910
commit 320d0df219
21 changed files with 263 additions and 124 deletions
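For orientation, a minimal sketch of what the restricted offset format looks like from a source connector's point of view, using the new Map-based SourceRecord constructor introduced below ("filename" and "position" are purely illustrative field names):

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.copycat.data.Schema;
    import org.apache.kafka.copycat.source.SourceRecord;

    // Partitions and offsets are plain maps: String keys, primitive values only.
    Map<String, String> sourcePartition = Collections.singletonMap("filename", "/tmp/test.txt");
    Map<String, Long> sourceOffset = Collections.singletonMap("position", 42L);

    // No partition/offset Schemas are passed anymore; the maps are serialized schemalessly
    // and their format is validated at the Copycat level (see OffsetUtils below).
    SourceRecord record = new SourceRecord(sourcePartition, sourceOffset,
            "my-topic", Schema.STRING_SCHEMA, "a line of data");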

View File

@ -115,6 +115,21 @@ public class AbstractConfig {
return copy;
}
/**
* Gets all original settings with the given prefix, stripping the prefix before adding it to the output.
*
* @param prefix the prefix to use as a filter
* @return a Map containing the settings with the prefix
*/
public Map<String, Object> originalsWithPrefix(String prefix) {
Map<String, Object> result = new HashMap<String, Object>();
for(Map.Entry<String, ?> entry : originals.entrySet()) {
if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length())
result.put(entry.getKey().substring(prefix.length()), entry.getValue());
}
return result;
}
public Map<String, ?> values() {
return new HashMap<String, Object>(values);
}
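A hedged usage sketch: later in this commit, the worker uses this new helper to strip a converter prefix from the worker-level settings before handing them to each converter's configure() call. The property names below are the ones used by the Copycat worker configs in this commit:

    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.copycat.cli.WorkerConfig;

    Properties workerProps = new Properties();
    workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
    workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
    workerProps.setProperty("offset.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
    workerProps.setProperty("offset.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
    workerProps.setProperty("key.converter.schemas.enable", "false");
    WorkerConfig config = new WorkerConfig(workerProps);

    // Yields {"schemas.enable": "false"}, which Worker passes to keyConverter.configure(..., true).
    Map<String, Object> keyConverterConfig = config.originalsWithPrefix("key.converter.");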

View File

@ -17,10 +17,12 @@ import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertEquals;
public class AbstractConfigTest {
@ -35,6 +37,17 @@ public class AbstractConfigTest {
testInvalidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter,");
}
@Test
public void testOriginalsWithPrefix() {
Properties props = new Properties();
props.put("foo.bar", "abc");
props.put("setting", "def");
TestConfig config = new TestConfig(props);
Map<String, Object> expected = new HashMap<>();
expected.put("bar", "abc");
assertEquals(expected, config.originalsWithPrefix("foo."));
}
private void testValidInputs(String configValue) {
Properties props = new Properties();
props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue);

View File

@ -16,8 +16,21 @@
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092
# The converters specify the format of data in Kafka and how to translate it into Copycat data. Every Copycat user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.copycat.json.JsonConverter
value.converter=org.apache.kafka.copycat.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the setting with the converter we want to apply
# it to, e.g. key.converter.schemas.enable
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# The offset converter is configurable and must be specified, but most users will always want to use the built-in default.
# Offset data is never visible outside of Copycat.
offset.key.converter=org.apache.kafka.copycat.json.JsonConverter
offset.value.converter=org.apache.kafka.copycat.json.JsonConverter
offset.key.converter.schemas.enable=false
offset.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/copycat.offsets
# Flush much faster than normal, which is useful for testing/debugging

View File

@ -21,6 +21,8 @@ import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.copycat.connector.CopycatRecord;
import org.apache.kafka.copycat.data.Schema;
import java.util.Map;
/**
* <p>
* SourceRecords are generated by SourceTasks and passed to Copycat for storage in
@ -41,47 +43,32 @@ import org.apache.kafka.copycat.data.Schema;
*/
@InterfaceStability.Unstable
public class SourceRecord extends CopycatRecord {
private final Schema sourcePartitionSchema;
private final Object sourcePartition;
private final Schema sourceOffsetSchema;
private final Object sourceOffset;
private final Map<String, ?> sourcePartition;
private final Map<String, ?> sourceOffset;
public SourceRecord(Schema sourcePartitionSchema, Object sourcePartition,
Schema sourceOffsetSchema, Object sourceOffset,
public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
String topic, Integer partition, Schema valueSchema, Object value) {
this(sourcePartitionSchema, sourcePartition, sourceOffsetSchema, sourceOffset, topic, partition, null, null, valueSchema, value);
this(sourcePartition, sourceOffset, topic, partition, null, null, valueSchema, value);
}
public SourceRecord(Schema sourcePartitionSchema, Object sourcePartition,
Schema sourceOffsetSchema, Object sourceOffset,
public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
String topic, Schema valueSchema, Object value) {
this(sourcePartitionSchema, sourcePartition, sourceOffsetSchema, sourceOffset, topic, null, null, null, valueSchema, value);
this(sourcePartition, sourceOffset, topic, null, null, null, valueSchema, value);
}
public SourceRecord(Schema sourcePartitionSchema, Object sourcePartition,
Schema sourceOffsetSchema, Object sourceOffset,
public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
String topic, Integer partition,
Schema keySchema, Object key, Schema valueSchema, Object value) {
super(topic, partition, keySchema, key, valueSchema, value);
this.sourcePartitionSchema = sourcePartitionSchema;
this.sourcePartition = sourcePartition;
this.sourceOffsetSchema = sourceOffsetSchema;
this.sourceOffset = sourceOffset;
}
public Schema sourcePartitionSchema() {
return sourcePartitionSchema;
}
public Object sourcePartition() {
public Map<String, ?> sourcePartition() {
return sourcePartition;
}
public Schema sourceOffsetSchema() {
return sourceOffsetSchema;
}
public Object sourceOffset() {
public Map<String, ?> sourceOffset() {
return sourceOffset;
}
@ -96,10 +83,6 @@ public class SourceRecord extends CopycatRecord {
SourceRecord that = (SourceRecord) o;
if (sourcePartitionSchema != null ? !sourcePartitionSchema.equals(that.sourcePartitionSchema) : that.sourcePartitionSchema != null)
return false;
if (sourceOffsetSchema != null ? !sourceOffsetSchema.equals(that.sourceOffsetSchema) : that.sourceOffsetSchema != null)
return false;
if (sourcePartition != null ? !sourcePartition.equals(that.sourcePartition) : that.sourcePartition != null)
return false;
if (sourceOffset != null ? !sourceOffset.equals(that.sourceOffset) : that.sourceOffset != null)
@ -111,8 +94,6 @@ public class SourceRecord extends CopycatRecord {
@Override
public int hashCode() {
int result = super.hashCode();
result = 31 * result + (sourcePartitionSchema != null ? sourcePartitionSchema.hashCode() : 0);
result = 31 * result + (sourceOffsetSchema != null ? sourceOffsetSchema.hashCode() : 0);
result = 31 * result + (sourcePartition != null ? sourcePartition.hashCode() : 0);
result = 31 * result + (sourceOffset != null ? sourceOffset.hashCode() : 0);
return result;

View File

@ -18,15 +18,20 @@
package org.apache.kafka.copycat.storage;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.copycat.data.SchemaAndValue;
import java.util.Collection;
import java.util.Map;
/**
* <p>
* OffsetStorageReader provides access to the offset storage used by sources. This can be used by
* connectors to determine offsets to start consuming data from. This is most commonly used during
* initialization of a task, but can also be used during runtime, e.g. when reconfiguring a task.
* </p>
* <p>
* Offsets are always defined as Maps of Strings to primitive types, i.e. all types supported by
* {@link org.apache.kafka.copycat.data.Schema} other than Array, Map, and Struct.
* </p>
*/
@InterfaceStability.Unstable
public interface OffsetStorageReader {
@ -37,12 +42,12 @@ public interface OffsetStorageReader {
* @param partition object uniquely identifying the partition of data
* @return object uniquely identifying the offset in the partition of data
*/
SchemaAndValue offset(SchemaAndValue partition);
<T> Map<String, Object> offset(Map<String, T> partition);
/**
* <p>
* Get a set of offsets for the specified partition identifiers. This may be more efficient
* than calling {@link #offset(SchemaAndValue)} repeatedly.
* than calling {@link #offset(Map)} repeatedly.
* </p>
* <p>
* Note that when errors occur, this method omits the associated data and tries to return as
@ -56,5 +61,5 @@ public interface OffsetStorageReader {
* @param partitions set of identifiers for partitions of data
* @return a map of partition identifiers to decoded offsets
*/
Map<SchemaAndValue, SchemaAndValue> offsets(Collection<SchemaAndValue> partitions);
<T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<String, T>> partitions);
}

View File

@ -18,7 +18,6 @@
package org.apache.kafka.copycat.file;
import org.apache.kafka.copycat.data.Schema;
import org.apache.kafka.copycat.data.SchemaAndValue;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.source.SourceRecord;
import org.apache.kafka.copycat.source.SourceTask;
@ -26,17 +25,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.*;
/**
* FileStreamSourceTask reads from stdin or a file.
*/
public class FileStreamSourceTask extends SourceTask {
private static final Logger log = LoggerFactory.getLogger(FileStreamSourceTask.class);
private static final Schema OFFSET_KEY_SCHEMA = Schema.STRING_SCHEMA;
private static final Schema OFFSET_VALUE_SCHEMA = Schema.OPTIONAL_INT64_SCHEMA;
public static final String FILENAME_FIELD = "filename";
public static final String POSITION_FIELD = "position";
private static final Schema VALUE_SCHEMA = Schema.STRING_SCHEMA;
private String filename;
@ -66,14 +63,14 @@ public class FileStreamSourceTask extends SourceTask {
if (stream == null) {
try {
stream = new FileInputStream(filename);
SchemaAndValue offsetWithSchema = context.offsetStorageReader().offset(new SchemaAndValue(OFFSET_KEY_SCHEMA, filename));
if (offsetWithSchema != null) {
if (!offsetWithSchema.schema().equals(OFFSET_VALUE_SCHEMA))
throw new CopycatException("Unexpected offset schema.");
Long lastRecordedOffset = (Long) offsetWithSchema.value();
Map<String, Object> offset = context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename));
if (offset != null) {
Object lastRecordedOffset = offset.get(POSITION_FIELD);
if (lastRecordedOffset != null && !(lastRecordedOffset instanceof Long))
throw new CopycatException("Offset position is the incorrect type");
if (lastRecordedOffset != null) {
log.debug("Found previous offset, trying to skip to file offset {}", lastRecordedOffset);
long skipLeft = lastRecordedOffset;
long skipLeft = (Long) lastRecordedOffset;
while (skipLeft > 0) {
try {
long skipped = stream.skip(skipLeft);
@ -85,7 +82,7 @@ public class FileStreamSourceTask extends SourceTask {
}
log.debug("Skipped to offset {}", lastRecordedOffset);
}
streamOffset = (lastRecordedOffset != null) ? lastRecordedOffset : 0L;
streamOffset = (lastRecordedOffset != null) ? (Long) lastRecordedOffset : 0L;
} else {
streamOffset = 0L;
}
@ -130,7 +127,7 @@ public class FileStreamSourceTask extends SourceTask {
if (line != null) {
if (records == null)
records = new ArrayList<>();
records.add(new SourceRecord(OFFSET_KEY_SCHEMA, filename, OFFSET_VALUE_SCHEMA, streamOffset, topic, VALUE_SCHEMA, line));
records.add(new SourceRecord(offsetKey(filename), offsetValue(streamOffset), topic, VALUE_SCHEMA, line));
}
new ArrayList<SourceRecord>();
} while (line != null);
@ -193,4 +190,12 @@ public class FileStreamSourceTask extends SourceTask {
this.notify();
}
}
private Map<String, String> offsetKey(String filename) {
return Collections.singletonMap(FILENAME_FIELD, filename);
}
private Map<String, Long> offsetValue(Long pos) {
return Collections.singletonMap(POSITION_FIELD, pos);
}
}

View File

@ -17,7 +17,6 @@
package org.apache.kafka.copycat.file;
import org.apache.kafka.copycat.data.SchemaAndValue;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.source.SourceRecord;
import org.apache.kafka.copycat.source.SourceTaskContext;
@ -31,7 +30,9 @@ import org.powermock.api.easymock.PowerMock;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
@ -89,7 +90,8 @@ public class FileStreamSourceTaskTest {
assertEquals(1, records.size());
assertEquals(TOPIC, records.get(0).topic());
assertEquals("partial line finished", records.get(0).value());
assertEquals(22L, records.get(0).sourceOffset());
assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition());
assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 22L), records.get(0).sourceOffset());
assertEquals(null, task.poll());
// Different line endings, and make sure the final \r doesn't result in a line until we can
@ -99,20 +101,25 @@ public class FileStreamSourceTaskTest {
records = task.poll();
assertEquals(4, records.size());
assertEquals("line1", records.get(0).value());
assertEquals(28L, records.get(0).sourceOffset());
assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition());
assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 28L), records.get(0).sourceOffset());
assertEquals("line2", records.get(1).value());
assertEquals(35L, records.get(1).sourceOffset());
assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(1).sourcePartition());
assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 35L), records.get(1).sourceOffset());
assertEquals("line3", records.get(2).value());
assertEquals(41L, records.get(2).sourceOffset());
assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(2).sourcePartition());
assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 41L), records.get(2).sourceOffset());
assertEquals("line4", records.get(3).value());
assertEquals(47L, records.get(3).sourceOffset());
assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(3).sourcePartition());
assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 47L), records.get(3).sourceOffset());
os.write("subsequent text".getBytes());
os.flush();
records = task.poll();
assertEquals(1, records.size());
assertEquals("", records.get(0).value());
assertEquals(48L, records.get(0).sourceOffset());
assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition());
assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 48L), records.get(0).sourceOffset());
task.stop();
}
@ -135,6 +142,6 @@ public class FileStreamSourceTaskTest {
private void expectOffsetLookupReturnNone() {
EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject(SchemaAndValue.class))).andReturn(null);
EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject(Map.class))).andReturn(null);
}
}

View File

@ -32,8 +32,7 @@ import java.util.Properties;
public class WorkerConfig extends AbstractConfig {
public static final String CLUSTER_CONFIG = "cluster";
private static final String
CLUSTER_CONFIG_DOC =
private static final String CLUSTER_CONFIG_DOC =
"ID for this cluster, which is used to provide a namespace so multiple Copycat clusters "
+ "or instances may co-exist while sharing a single Kafka cluster.";
public static final String CLUSTER_DEFAULT = "copycat";
@ -58,6 +57,14 @@ public class WorkerConfig extends AbstractConfig {
public static final String VALUE_CONVERTER_CLASS_DOC =
"Converter class for value Copycat data that implements the <code>Converter</code> interface.";
public static final String OFFSET_KEY_CONVERTER_CLASS_CONFIG = "offset.key.converter";
public static final String OFFSET_KEY_CONVERTER_CLASS_DOC =
"Converter class for offset key Copycat data that implements the <code>Converter</code> interface.";
public static final String OFFSET_VALUE_CONVERTER_CLASS_CONFIG = "offset.value.converter";
public static final String OFFSET_VALUE_CONVERTER_CLASS_DOC =
"Converter class for offset value Copycat data that implements the <code>Converter</code> interface.";
public static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG
= "task.shutdown.graceful.timeout.ms";
private static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC =
@ -88,6 +95,10 @@ public class WorkerConfig extends AbstractConfig {
Importance.HIGH, KEY_CONVERTER_CLASS_DOC)
.define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, VALUE_CONVERTER_CLASS_DOC)
.define(OFFSET_KEY_CONVERTER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, OFFSET_KEY_CONVERTER_CLASS_DOC)
.define(OFFSET_VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, OFFSET_VALUE_CONVERTER_CLASS_DOC)
.define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG,
TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, Importance.LOW,
TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC)

View File

@ -52,6 +52,8 @@ public class Worker {
private WorkerConfig config;
private Converter keyConverter;
private Converter valueConverter;
private Converter offsetKeyConverter;
private Converter offsetValueConverter;
private OffsetBackingStore offsetBackingStore;
private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
private KafkaProducer<byte[], byte[]> producer;
@ -66,9 +68,13 @@ public class Worker {
this.time = time;
this.config = config;
this.keyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
this.keyConverter.configure(config.originals(), true);
this.keyConverter.configure(config.originalsWithPrefix("key.converter."), true);
this.valueConverter = config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
this.valueConverter.configure(config.originals(), false);
this.valueConverter.configure(config.originalsWithPrefix("value.converter."), false);
this.offsetKeyConverter = config.getConfiguredInstance(WorkerConfig.OFFSET_KEY_CONVERTER_CLASS_CONFIG, Converter.class);
this.offsetKeyConverter.configure(config.originalsWithPrefix("offset.key.converter."), true);
this.offsetValueConverter = config.getConfiguredInstance(WorkerConfig.OFFSET_VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
this.offsetValueConverter.configure(config.originalsWithPrefix("offset.value.converter."), false);
if (offsetBackingStore != null) {
this.offsetBackingStore = offsetBackingStore;
@ -155,9 +161,9 @@ public class Worker {
if (task instanceof SourceTask) {
SourceTask sourceTask = (SourceTask) task;
OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
keyConverter, valueConverter);
offsetKeyConverter, offsetValueConverter);
OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
keyConverter, valueConverter);
offsetKeyConverter, offsetValueConverter);
workerTask = new WorkerSourceTask(id, sourceTask, keyConverter, valueConverter, producer,
offsetReader, offsetWriter, config, time);
} else if (task instanceof SinkTask) {

View File

@ -23,7 +23,6 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.copycat.cli.WorkerConfig;
import org.apache.kafka.copycat.data.SchemaAndValue;
import org.apache.kafka.copycat.source.SourceRecord;
import org.apache.kafka.copycat.source.SourceTask;
import org.apache.kafka.copycat.source.SourceTaskContext;
@ -155,8 +154,7 @@ class WorkerSourceTask implements WorkerTask {
}
});
// Offsets are converted & serialized in the OffsetWriter
offsetWriter.offset(new SchemaAndValue(record.sourcePartitionSchema(), record.sourcePartition()),
new SchemaAndValue(record.sourceOffsetSchema(), record.sourceOffset()));
offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
}
}

View File

@ -50,17 +50,19 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
}
@Override
public SchemaAndValue offset(SchemaAndValue partition) {
public <T> Map<String, Object> offset(Map<String, T> partition) {
return offsets(Arrays.asList(partition)).get(partition);
}
@Override
public Map<SchemaAndValue, SchemaAndValue> offsets(Collection<SchemaAndValue> partitions) {
public <T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<String, T>> partitions) {
// Serialize keys so backing store can work with them
Map<ByteBuffer, SchemaAndValue> serializedToOriginal = new HashMap<>(partitions.size());
for (SchemaAndValue key : partitions) {
Map<ByteBuffer, Map<String, T>> serializedToOriginal = new HashMap<>(partitions.size());
for (Map<String, T> key : partitions) {
try {
byte[] keySerialized = keyConverter.fromCopycatData(namespace, key.schema(), key.value());
// Offsets are treated as schemaless, their format is only validated here (and the returned value below)
OffsetUtils.validateFormat(key);
byte[] keySerialized = keyConverter.fromCopycatData(namespace, null, key);
ByteBuffer keyBuffer = (keySerialized != null) ? ByteBuffer.wrap(keySerialized) : null;
serializedToOriginal.put(keyBuffer, key);
} catch (Throwable t) {
@ -80,7 +82,7 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
}
// Deserialize all the values and map back to the original keys
Map<SchemaAndValue, SchemaAndValue> result = new HashMap<>(partitions.size());
Map<Map<String, T>, Map<String, Object>> result = new HashMap<>(partitions.size());
for (Map.Entry<ByteBuffer, ByteBuffer> rawEntry : raw.entrySet()) {
try {
// Since null could be a valid key, explicitly check whether map contains the key
@ -89,10 +91,12 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
+ "store may have returned invalid data", rawEntry.getKey());
continue;
}
SchemaAndValue origKey = serializedToOriginal.get(rawEntry.getKey());
SchemaAndValue deserializedValue = valueConverter.toCopycatData(namespace, rawEntry.getValue().array());
Map<String, T> origKey = serializedToOriginal.get(rawEntry.getKey());
SchemaAndValue deserializedSchemaAndValue = valueConverter.toCopycatData(namespace, rawEntry.getValue().array());
Object deserializedValue = deserializedSchemaAndValue.value();
OffsetUtils.validateFormat(deserializedValue);
result.put(origKey, deserializedValue);
result.put(origKey, (Map<String, Object>) deserializedValue);
} catch (Throwable t) {
log.error("CRITICAL: Failed to deserialize offset data when getting offsets for task with"
+ " namespace {}. No value for this data will be returned, which may break the "

View File

@ -17,15 +17,13 @@
package org.apache.kafka.copycat.storage;
import org.apache.kafka.copycat.data.SchemaAndValue;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.util.Callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
import java.util.concurrent.Future;
/**
@ -71,10 +69,10 @@ public class OffsetStorageWriter {
private final Converter valueConverter;
private final String namespace;
// Offset data in Copycat format
private Map<SchemaAndValue, SchemaAndValue> data = new HashMap<>();
private Map<Map<String, Object>, Map<String, Object>> data = new HashMap<>();
// Not synchronized, should only be accessed by flush thread
private Map<SchemaAndValue, SchemaAndValue> toFlush = null;
private Map<Map<String, Object>, Map<String, Object>> toFlush = null;
// Unique ID for each flush request to handle callbacks after timeouts
private long currentFlushId = 0;
@ -91,8 +89,8 @@ public class OffsetStorageWriter {
* @param partition the partition to store an offset for
* @param offset the offset
*/
public synchronized void offset(SchemaAndValue partition, SchemaAndValue offset) {
data.put(partition, offset);
public synchronized void offset(Map<String, ?> partition, Map<String, ?> offset) {
data.put((Map<String, Object>) partition, (Map<String, Object>) offset);
}
private boolean flushing() {
@ -136,10 +134,14 @@ public class OffsetStorageWriter {
Map<ByteBuffer, ByteBuffer> offsetsSerialized;
try {
offsetsSerialized = new HashMap<>();
for (Map.Entry<SchemaAndValue, SchemaAndValue> entry : toFlush.entrySet()) {
byte[] key = keyConverter.fromCopycatData(namespace, entry.getKey().schema(), entry.getKey().value());
for (Map.Entry<Map<String, Object>, Map<String, Object>> entry : toFlush.entrySet()) {
// Offsets are specified as schemaless to the converter, using whatever internal schema is appropriate
// for that data. The only enforcement of the format is here.
OffsetUtils.validateFormat(entry.getKey());
OffsetUtils.validateFormat(entry.getValue());
byte[] key = keyConverter.fromCopycatData(namespace, null, entry.getKey());
ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null;
byte[] value = valueConverter.fromCopycatData(namespace, entry.getValue().schema(), entry.getValue().value());
byte[] value = valueConverter.fromCopycatData(namespace, null, entry.getValue());
ByteBuffer valueBuffer = (value != null) ? ByteBuffer.wrap(value) : null;
offsetsSerialized.put(keyBuffer, valueBuffer);
}
@ -149,6 +151,7 @@ public class OffsetStorageWriter {
log.error("CRITICAL: Failed to serialize offset data, making it impossible to commit "
+ "offsets under namespace {}. This likely won't recover unless the "
+ "unserializable partition or offset information is overwritten.", namespace);
log.error("Cause of serialization failure:", t);
callback.onCompletion(t, null);
return null;
}

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.storage;
import org.apache.kafka.copycat.data.CopycatSchema;
import org.apache.kafka.copycat.data.Schema;
import org.apache.kafka.copycat.errors.DataException;
import java.util.Map;
public class OffsetUtils {
public static void validateFormat(Object offsetData) {
if (!(offsetData instanceof Map))
throw new DataException("Offsets must be specified as a Map");
validateFormat((Map<Object, Object>) offsetData);
}
public static <K, V> void validateFormat(Map<K, V> offsetData) {
for (Map.Entry<K, V> entry : offsetData.entrySet()) {
if (!(entry.getKey() instanceof String))
throw new DataException("Offsets may only use String keys");
Object value = entry.getValue();
if (value == null)
continue;
Schema.Type schemaType = CopycatSchema.schemaType(value.getClass());
if (!schemaType.isPrimitive())
throw new DataException("Offsets may only contain primitive types as values, but field " + entry.getKey() + " contains " + schemaType);
}
}
}
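A short sketch, based only on the checks above, of what this validation accepts and rejects (the field names are made up for illustration):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    Map<String, Object> valid = new HashMap<>();
    valid.put("filename", "/tmp/test.txt"); // String key, primitive value: accepted
    valid.put("position", 42L);             // boxed primitives are accepted
    valid.put("flushed", null);             // null values are explicitly skipped
    OffsetUtils.validateFormat(valid);      // returns normally

    // Non-String keys are rejected with a DataException; non-primitive values such as
    // nested Maps, Lists, or Structs are rejected by the schemaType().isPrimitive() check.
    Map<Object, Object> invalid = Collections.singletonMap((Object) 42, (Object) "not-a-string-key");
    OffsetUtils.validateFormat(invalid);    // throws DataException("Offsets may only use String keys")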

View File

@ -84,6 +84,10 @@ public class WorkerSinkTaskTest extends ThreadedTest {
Properties workerProps = new Properties();
workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("offset.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("offset.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("offset.key.converter.schemas.enable", "false");
workerProps.setProperty("offset.value.converter.schemas.enable", "false");
workerConfig = new WorkerConfig(workerProps);
workerTask = PowerMock.createPartialMock(
WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"},

View File

@ -24,7 +24,6 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.copycat.cli.WorkerConfig;
import org.apache.kafka.copycat.data.Schema;
import org.apache.kafka.copycat.data.SchemaAndValue;
import org.apache.kafka.copycat.source.SourceRecord;
import org.apache.kafka.copycat.source.SourceTask;
import org.apache.kafka.copycat.source.SourceTaskContext;
@ -45,10 +44,7 @@ import org.powermock.api.easymock.annotation.Mock;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@ -59,10 +55,8 @@ import static org.junit.Assert.*;
@RunWith(PowerMockRunner.class)
public class WorkerSourceTaskTest extends ThreadedTest {
private static final String TOPIC = "topic";
private static final Schema PARTITION_SCHEMA = Schema.BYTES_SCHEMA;
private static final byte[] PARTITION_BYTES = "partition".getBytes();
private static final Schema OFFSET_SCHEMA = Schema.BYTES_SCHEMA;
private static final byte[] OFFSET_BYTES = "offset-1".getBytes();
private static final Map<String, byte[]> PARTITION = Collections.singletonMap("key", "partition".getBytes());
private static final Map<String, Integer> OFFSET = Collections.singletonMap("key", 12);
// Copycat-format data
private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
@ -89,7 +83,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
private static final Properties EMPTY_TASK_PROPS = new Properties();
private static final List<SourceRecord> RECORDS = Arrays.asList(
new SourceRecord(PARTITION_SCHEMA, PARTITION_BYTES, OFFSET_SCHEMA, OFFSET_BYTES, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD)
new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD)
);
@Override
@ -98,10 +92,10 @@ public class WorkerSourceTaskTest extends ThreadedTest {
Properties workerProps = new Properties();
workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
workerProps.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
workerProps.setProperty("offset.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("offset.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("offset.key.converter.schemas.enable", "false");
workerProps.setProperty("offset.value.converter.schemas.enable", "false");
config = new WorkerConfig(workerProps);
producerCallbacks = EasyMock.newCapture();
}
@ -201,7 +195,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
List<SourceRecord> records = new ArrayList<>();
// Can just use the same record for key and value
records.add(new SourceRecord(PARTITION_SCHEMA, PARTITION_BYTES, OFFSET_SCHEMA, OFFSET_BYTES, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD));
records.add(new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD));
Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecord();
@ -255,7 +249,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
}
});
// 2. Offset data is passed to the offset storage.
offsetWriter.offset(new SchemaAndValue(PARTITION_SCHEMA, PARTITION_BYTES), new SchemaAndValue(OFFSET_SCHEMA, OFFSET_BYTES));
offsetWriter.offset(PARTITION, OFFSET);
PowerMock.expectLastCall().anyTimes();
return sent;

View File

@ -55,6 +55,10 @@ public class WorkerTest extends ThreadedTest {
Properties workerProps = new Properties();
workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("offset.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("offset.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("offset.key.converter.schemas.enable", "false");
workerProps.setProperty("offset.value.converter.schemas.enable", "false");
WorkerConfig config = new WorkerConfig(workerProps);
worker = new Worker(new MockTime(), config, offsetBackingStore);
worker.start();

View File

@ -17,9 +17,6 @@
package org.apache.kafka.copycat.storage;
import org.apache.kafka.copycat.data.Schema;
import org.apache.kafka.copycat.data.SchemaAndValue;
import org.apache.kafka.copycat.data.SchemaBuilder;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.util.Callback;
import org.easymock.Capture;
@ -34,9 +31,7 @@ import org.powermock.api.easymock.annotation.Mock;
import org.powermock.modules.junit4.PowerMockRunner;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
@ -47,10 +42,9 @@ import static org.junit.Assert.assertTrue;
public class OffsetStorageWriterTest {
private static final String NAMESPACE = "namespace";
// Copycat format - any types should be accepted here
private static final Schema OFFSET_KEY_SCHEMA = SchemaBuilder.array(Schema.STRING_SCHEMA).build();
private static final List<String> OFFSET_KEY = Arrays.asList("key", "key");
private static final Schema OFFSET_VALUE_SCHEMA = Schema.STRING_SCHEMA;
private static final String OFFSET_VALUE = "value";
private static final Map<String, String> OFFSET_KEY = Collections.singletonMap("key", "key");
private static final Map<String, Integer> OFFSET_VALUE = Collections.singletonMap("key", 12);
// Serialized
private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes();
private static final byte[] OFFSET_VALUE_SERIALIZED = "value-serialized".getBytes();
@ -86,7 +80,7 @@ public class OffsetStorageWriterTest {
PowerMock.replayAll();
writer.offset(new SchemaAndValue(OFFSET_KEY_SCHEMA, OFFSET_KEY), new SchemaAndValue(OFFSET_VALUE_SCHEMA, OFFSET_VALUE));
writer.offset(OFFSET_KEY, OFFSET_VALUE);
assertTrue(writer.beginFlush());
writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
@ -122,7 +116,7 @@ public class OffsetStorageWriterTest {
PowerMock.replayAll();
writer.offset(new SchemaAndValue(OFFSET_KEY_SCHEMA, OFFSET_KEY), new SchemaAndValue(OFFSET_VALUE_SCHEMA, OFFSET_VALUE));
writer.offset(OFFSET_KEY, OFFSET_VALUE);
assertTrue(writer.beginFlush());
writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
assertTrue(writer.beginFlush());
@ -142,7 +136,7 @@ public class OffsetStorageWriterTest {
PowerMock.replayAll();
writer.offset(new SchemaAndValue(OFFSET_KEY_SCHEMA, OFFSET_KEY), new SchemaAndValue(OFFSET_VALUE_SCHEMA, OFFSET_VALUE));
writer.offset(OFFSET_KEY, OFFSET_VALUE);
assertTrue(writer.beginFlush());
writer.doFlush(callback);
assertTrue(writer.beginFlush()); // should throw
@ -154,7 +148,7 @@ public class OffsetStorageWriterTest {
public void testCancelBeforeAwaitFlush() {
PowerMock.replayAll();
writer.offset(new SchemaAndValue(OFFSET_KEY_SCHEMA, OFFSET_KEY), new SchemaAndValue(OFFSET_VALUE_SCHEMA, OFFSET_VALUE));
writer.offset(OFFSET_KEY, OFFSET_VALUE);
assertTrue(writer.beginFlush());
writer.cancelFlush();
@ -172,7 +166,7 @@ public class OffsetStorageWriterTest {
PowerMock.replayAll();
writer.offset(new SchemaAndValue(OFFSET_KEY_SCHEMA, OFFSET_KEY), new SchemaAndValue(OFFSET_VALUE_SCHEMA, OFFSET_VALUE));
writer.offset(OFFSET_KEY, OFFSET_VALUE);
assertTrue(writer.beginFlush());
// Start the flush, then immediately cancel before allowing the mocked store request to finish
Future<Void> flushFuture = writer.doFlush(callback);
@ -201,8 +195,8 @@ public class OffsetStorageWriterTest {
private void expectStore(final Callback<Void> callback,
final boolean fail,
final CountDownLatch waitForCompletion) {
EasyMock.expect(keyConverter.fromCopycatData(NAMESPACE, OFFSET_KEY_SCHEMA, OFFSET_KEY)).andReturn(OFFSET_KEY_SERIALIZED);
EasyMock.expect(valueConverter.fromCopycatData(NAMESPACE, OFFSET_VALUE_SCHEMA, OFFSET_VALUE)).andReturn(OFFSET_VALUE_SERIALIZED);
EasyMock.expect(keyConverter.fromCopycatData(NAMESPACE, null, OFFSET_KEY)).andReturn(OFFSET_KEY_SERIALIZED);
EasyMock.expect(valueConverter.fromCopycatData(NAMESPACE, null, OFFSET_VALUE)).andReturn(OFFSET_VALUE_SERIALIZED);
final Capture<Callback<Void>> storeCallback = Capture.newInstance();
EasyMock.expect(store.set(EasyMock.eq(NAMESPACE), EasyMock.eq(OFFSETS_SERIALIZED),

View File

@ -15,8 +15,10 @@
from kafkatest.tests.kafka_test import KafkaTest
from kafkatest.services.copycat import CopycatStandaloneService
from kafkatest.services.console_consumer import ConsoleConsumer
from ducktape.utils.util import wait_until
import hashlib, subprocess
from ducktape.mark import parametrize
import hashlib, subprocess, json
class CopycatStandaloneFileTest(KafkaTest):
"""
@ -30,8 +32,14 @@ class CopycatStandaloneFileTest(KafkaTest):
OFFSETS_FILE = "/mnt/copycat.offsets"
FIRST_INPUT = "foo\nbar\nbaz\n"
SECOND_INPUT = "razz\nma\ntazz\n"
TOPIC = "test"
FIRST_INPUT_LIST = ["foo", "bar", "baz"]
FIRST_INPUT = "\n".join(FIRST_INPUT_LIST) + "\n"
SECOND_INPUT_LIST = ["razz", "ma", "tazz"]
SECOND_INPUT = "\n".join(SECOND_INPUT_LIST) + "\n"
SCHEMA = { "type": "string", "optional": False }
def __init__(self, test_context):
super(CopycatStandaloneFileTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
@ -40,8 +48,18 @@ class CopycatStandaloneFileTest(KafkaTest):
self.source = CopycatStandaloneService(test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE])
self.sink = CopycatStandaloneService(test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE])
self.consumer_validator = ConsoleConsumer(test_context, 1, self.kafka, self.TOPIC, consumer_timeout_ms=1000)
@parametrize(converter="org.apache.kafka.copycat.json.JsonConverter", schemas=True)
@parametrize(converter="org.apache.kafka.copycat.json.JsonConverter", schemas=False)
@parametrize(converter="org.apache.kafka.copycat.storage.StringConverter", schemas=None)
def test_file_source_and_sink(self, converter="org.apache.kafka.copycat.json.JsonConverter", schemas=True):
assert converter != None, "converter type must be set"
# Template parameters
self.key_converter = converter
self.value_converter = converter
self.schemas = schemas
def test_file_source_and_sink(self):
# These need to be set
self.source.set_configs(self.render("copycat-standalone.properties"), self.render("copycat-file-source.properties"))
self.sink.set_configs(self.render("copycat-standalone.properties"), self.render("copycat-file-sink.properties"))
@ -61,6 +79,13 @@ class CopycatStandaloneFileTest(KafkaTest):
self.source.node.account.ssh("echo -e -n " + repr(self.SECOND_INPUT) + " >> " + self.INPUT_FILE)
wait_until(lambda: self.validate_output(self.FIRST_INPUT + self.SECOND_INPUT), timeout_sec=60, err_msg="Sink output file never converged to the same state as the input file")
# Validate the format of the data in the Kafka topic
self.consumer_validator.run()
expected = json.dumps([line if not self.schemas else { "schema": self.SCHEMA, "payload": line } for line in self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST])
decoder = (json.loads if converter.endswith("JsonConverter") else str)
actual = json.dumps([decoder(x) for x in self.consumer_validator.messages_consumed[1]])
assert expected == actual, "Expected %s but saw %s in Kafka" % (expected, actual)
def validate_output(self, value):
try:
output_hash = list(self.sink.node.account.ssh_capture("md5sum " + self.OUTPUT_FILE))[0].strip().split()[0]

View File

@ -17,4 +17,4 @@ name=local-file-sink
connector.class=org.apache.kafka.copycat.file.FileStreamSinkConnector
tasks.max=1
file={{ OUTPUT_FILE }}
topics=test
topics={{ TOPIC }}

View File

@ -17,4 +17,4 @@ name=local-file-source
connector.class=org.apache.kafka.copycat.file.FileStreamSourceConnector
tasks.max=1
file={{ INPUT_FILE }}
topic=test
topic={{ TOPIC }}

View File

@ -15,7 +15,18 @@
bootstrap.servers={{ kafka.bootstrap_servers() }}
key.converter=org.apache.kafka.copycat.json.JsonConverter
value.converter=org.apache.kafka.copycat.json.JsonConverter
key.converter={{ key_converter|default("org.apache.kafka.copycat.json.JsonConverter") }}
value.converter={{ value_converter|default("org.apache.kafka.copycat.json.JsonConverter") }}
{% if key_converter is not defined or key_converter.endswith("JsonConverter") %}
key.converter.schemas.enable={{ schemas|default(True)|string|lower }}
{% endif %}
{% if value_converter is not defined or value_converter.endswith("JsonConverter") %}
value.converter.schemas.enable={{ schemas|default(True)|string|lower }}
{% endif %}
offset.key.converter=org.apache.kafka.copycat.json.JsonConverter
offset.value.converter=org.apache.kafka.copycat.json.JsonConverter
offset.key.converter.schemas.enable=false
offset.value.converter.schemas.enable=false
offset.storage.file.filename={{ OFFSETS_FILE }}