mirror of https://github.com/apache/kafka.git
Remove offset serializers, instead reusing the existing serializers and removing schema projection support.
This commit is contained in:
parent
e849e10c7e
commit
5a618c6815
|
@ -145,6 +145,7 @@
|
|||
|
||||
<subpackage name="storage">
|
||||
<allow pkg="org.apache.kafka.copycat" />
|
||||
<allow pkg="org.apache.kafka.common.serialization" />
|
||||
<!-- for tests -->
|
||||
<allow pkg="org.easymock" />
|
||||
<allow pkg="org.powermock" />
|
||||
|
|
|
@ -16,6 +16,13 @@
|
|||
bootstrap.servers=localhost:9092
|
||||
schema.registry.url=http://localhost:8081
|
||||
|
||||
# TODO: Non-avro built-ins?
|
||||
converter=org.apache.kafka.copycat.avro.AvroConverter
|
||||
key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
|
||||
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
|
||||
key.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
|
||||
value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
|
||||
|
||||
offset.storage.class=org.apache.kafka.copycat.storage.FileOffsetBackingStore
|
||||
offset.storage.file.filename=/tmp/copycat.offsets
|
||||
# Flush much faster than normal, which is useful for testing/debugging
|
||||
|
|
|
@ -15,4 +15,11 @@
|
|||
|
||||
# These are defaults. This file just demonstrates how to override some settings.
|
||||
bootstrap.servers=localhost:9092
|
||||
schema.registry.url=http://localhost:8081
|
||||
schema.registry.url=http://localhost:8081
|
||||
|
||||
# TODO: Non-avro built-ins?
|
||||
converter=org.apache.kafka.copycat.avro.AvroConverter
|
||||
key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
|
||||
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
|
||||
key.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
|
||||
value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
|
|
@ -1,58 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.storage;
|
||||
|
||||
|
||||
import org.apache.kafka.copycat.data.Schema;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Deserializer for Copycat offsets.
|
||||
* @param <T>
|
||||
*/
|
||||
public interface OffsetDeserializer<T> extends Closeable {
|
||||
|
||||
/**
|
||||
* Configure this class.
|
||||
* @param configs configs in key/value pairs
|
||||
* @param isKey whether is for key or value
|
||||
*/
|
||||
public void configure(Map<String, ?> configs, boolean isKey);
|
||||
|
||||
/**
|
||||
* Deserialize an offset key or value from the specified connector.
|
||||
* @param connector connector associated with the data
|
||||
* @param data serialized bytes
|
||||
* @return deserialized typed data
|
||||
*/
|
||||
public T deserializeOffset(String connector, byte[] data);
|
||||
|
||||
/**
|
||||
* Deserialize an offset key or value from the specified connector using a schema.
|
||||
* @param connector connector associated with the data
|
||||
* @param data serialized bytes
|
||||
* @param schema schema to deserialize to
|
||||
* @return deserialized typed data
|
||||
*/
|
||||
public T deserializeOffset(String connector, byte[] data, Schema schema);
|
||||
|
||||
@Override
|
||||
public void close();
|
||||
}
|
|
@ -1,48 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.storage;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Serializer for Copycat offsets.
|
||||
* @param <T> native type of offsets.
|
||||
*/
|
||||
public interface OffsetSerializer<T> extends Closeable {
|
||||
/**
|
||||
* Configure this class.
|
||||
* @param configs configs in key/value pairs
|
||||
* @param isKey whether is for key or value
|
||||
*/
|
||||
public void configure(Map<String, ?> configs, boolean isKey);
|
||||
|
||||
/**
|
||||
* @param connector the connector associated with offsets
|
||||
* @param data typed data
|
||||
* @return serialized bytes
|
||||
*/
|
||||
public byte[] serializeOffset(String connector, T data);
|
||||
|
||||
/**
|
||||
* Close this serializer.
|
||||
*/
|
||||
@Override
|
||||
public void close();
|
||||
|
||||
}
|
|
@ -17,8 +17,6 @@
|
|||
|
||||
package org.apache.kafka.copycat.storage;
|
||||
|
||||
import org.apache.kafka.copycat.data.Schema;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -33,15 +31,14 @@ public interface OffsetStorageReader {
|
|||
* gets it from the backing store, which may require some network round trips.
|
||||
*
|
||||
* @param stream object uniquely identifying the stream of data
|
||||
* @param schema schema used to decode the offset value
|
||||
* @return object uniquely identifying the offset in the stream of data
|
||||
*/
|
||||
public Object getOffset(Object stream, Schema schema);
|
||||
public Object getOffset(Object stream);
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Get a set of offsets for the specified stream identifiers. This may be more efficient
|
||||
* than calling {@link #getOffset(Object, Schema)} repeatedly.
|
||||
* than calling {@link #getOffset(Object)} repeatedly.
|
||||
* </p>
|
||||
* <p>
|
||||
* Note that when errors occur, this method omits the associated data and tries to return as
|
||||
|
@ -53,8 +50,7 @@ public interface OffsetStorageReader {
|
|||
* </p>
|
||||
*
|
||||
* @param streams set of identifiers for streams of data
|
||||
* @param schema schema used to decode offset values
|
||||
* @return a map of stream identifiers to decoded offsets
|
||||
*/
|
||||
public Map<Object, Object> getOffsets(Collection<Object> streams, Schema schema);
|
||||
public Map<Object, Object> getOffsets(Collection<Object> streams);
|
||||
}
|
||||
|
|
|
@ -1,50 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p/>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p/>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.avro;
|
||||
|
||||
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
|
||||
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
|
||||
import org.apache.kafka.copycat.data.Schema;
|
||||
import org.apache.kafka.copycat.storage.OffsetDeserializer;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public class AvroDeserializer extends AbstractKafkaAvroDeserializer implements OffsetDeserializer<Object> {
|
||||
|
||||
@Override
|
||||
public void configure(Map<String, ?> configs, boolean isKey) {
|
||||
configure(new KafkaAvroDeserializerConfig(configs));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object deserializeOffset(String connector, byte[] data) {
|
||||
// TODO: Support schema projection
|
||||
return deserialize(data);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object deserializeOffset(String connector, byte[] data, Schema schema) {
|
||||
// TODO: Support schema projection
|
||||
return deserialize(data);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
|
||||
}
|
||||
}
|
|
@ -1,66 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p/>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p/>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.avro;
|
||||
|
||||
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
|
||||
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
|
||||
import io.confluent.kafka.serializers.AbstractKafkaAvroSerializer;
|
||||
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
|
||||
import org.apache.kafka.copycat.storage.OffsetSerializer;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public class AvroSerializer extends AbstractKafkaAvroSerializer implements OffsetSerializer<Object> {
|
||||
|
||||
private boolean isKey;
|
||||
|
||||
|
||||
@Override
|
||||
public void configure(Map<String, ?> configs, boolean isKey) {
|
||||
this.isKey = isKey;
|
||||
Object url = configs.get(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG);
|
||||
if (url == null) {
|
||||
throw new CopycatRuntimeException("Missing Schema registry url!");
|
||||
}
|
||||
Object maxSchemaObject = configs.get(
|
||||
AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_CONFIG);
|
||||
if (maxSchemaObject == null) {
|
||||
schemaRegistry = new CachedSchemaRegistryClient(
|
||||
(String) url, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
|
||||
} else {
|
||||
schemaRegistry = new CachedSchemaRegistryClient(
|
||||
(String) url, (Integer) maxSchemaObject);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] serializeOffset(String connector, Object data) {
|
||||
String subject;
|
||||
if (isKey) {
|
||||
subject = connector + "-key";
|
||||
} else {
|
||||
subject = connector + "-value";
|
||||
}
|
||||
return serializeImpl(subject, data);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
|
||||
}
|
||||
}
|
|
@ -17,8 +17,6 @@
|
|||
|
||||
package org.apache.kafka.copycat.file;
|
||||
|
||||
import org.apache.kafka.copycat.data.Schema;
|
||||
import org.apache.kafka.copycat.data.SchemaBuilder;
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
|
||||
import org.apache.kafka.copycat.source.SourceRecord;
|
||||
|
@ -55,9 +53,7 @@ public class FileStreamSourceTask extends SourceTask<Object, Object> {
|
|||
} else {
|
||||
try {
|
||||
stream = new FileInputStream(filename);
|
||||
Schema longSchema = SchemaBuilder.builder().longType();
|
||||
Long lastRecordedOffset
|
||||
= (Long) context.getOffsetStorageReader().getOffset(null, longSchema);
|
||||
Long lastRecordedOffset = (Long) context.getOffsetStorageReader().getOffset(null);
|
||||
if (lastRecordedOffset != null) {
|
||||
log.debug("Found previous offset, trying to skip to file offset {}", lastRecordedOffset);
|
||||
long skipLeft = lastRecordedOffset;
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
|
||||
package org.apache.kafka.copycat.file;
|
||||
|
||||
import org.apache.kafka.copycat.data.Schema;
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
|
||||
import org.apache.kafka.copycat.source.SourceRecord;
|
||||
|
@ -137,8 +136,7 @@ public class FileStreamSourceTaskTest {
|
|||
|
||||
private void expectOffsetLookupReturnNone() {
|
||||
EasyMock.expect(
|
||||
offsetStorageReader.getOffset(
|
||||
EasyMock.anyObject(Object.class), EasyMock.anyObject(Schema.class)))
|
||||
offsetStorageReader.getOffset(EasyMock.anyObject(Object.class)))
|
||||
.andReturn(null);
|
||||
}
|
||||
}
|
|
@ -52,58 +52,22 @@ public class WorkerConfig extends AbstractConfig {
|
|||
public static final String CONVERTER_CLASS_CONFIG = "converter";
|
||||
public static final String CONVERTER_CLASS_DOC =
|
||||
"Converter class for Copycat data that implements the <code>Converter</code> interface.";
|
||||
public static final String CONVERTER_CLASS_DEFAULT
|
||||
= "org.apache.kafka.copycat.avro.AvroConverter"; // TODO: Non-avro built-in?
|
||||
|
||||
public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
|
||||
public static final String KEY_SERIALIZER_CLASS_DOC =
|
||||
"Serializer class for key that implements the <code>Serializer</code> interface.";
|
||||
public static final String KEY_SERIALIZER_CLASS_DEFAULT
|
||||
= "io.confluent.kafka.serializers.KafkaAvroSerializer"; // TODO: Non-avro built-in?
|
||||
|
||||
public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
|
||||
public static final String VALUE_SERIALIZER_CLASS_DOC =
|
||||
"Serializer class for value that implements the <code>Serializer</code> interface.";
|
||||
public static final String VALUE_SERIALIZER_CLASS_DEFAULT
|
||||
= "io.confluent.kafka.serializers.KafkaAvroSerializer"; // TODO: Non-avro built-in?
|
||||
|
||||
public static final String OFFSET_KEY_SERIALIZER_CLASS_CONFIG = "offset.key.serializer";
|
||||
public static final String OFFSET_KEY_SERIALIZER_CLASS_DOC =
|
||||
"Serializer class for key that implements the <code>OffsetSerializer</code> interface.";
|
||||
public static final String OFFSET_KEY_SERIALIZER_CLASS_DEFAULT
|
||||
= "org.apache.kafka.copycat.avro.AvroSerializer"; // TODO: Non-avro built-in?
|
||||
|
||||
public static final String OFFSET_VALUE_SERIALIZER_CLASS_CONFIG = "offset.value.serializer";
|
||||
public static final String OFFSET_VALUE_SERIALIZER_CLASS_DOC =
|
||||
"Serializer class for value that implements the <code>OffsetSerializer</code> interface.";
|
||||
public static final String OFFSET_VALUE_SERIALIZER_CLASS_DEFAULT
|
||||
= "org.apache.kafka.copycat.avro.AvroSerializer"; // TODO: Non-avro built-in?
|
||||
|
||||
|
||||
public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
|
||||
public static final String KEY_DESERIALIZER_CLASS_DOC =
|
||||
"Serializer class for key that implements the <code>Deserializer</code> interface.";
|
||||
public static final String KEY_DESERIALIZER_CLASS_DEFAULT
|
||||
= "io.confluent.kafka.serializers.KafkaAvroDeserializer"; // TODO: Non-avro built-in?
|
||||
|
||||
public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
|
||||
public static final String VALUE_DESERIALIZER_CLASS_DOC =
|
||||
"Deserializer class for value that implements the <code>Deserializer</code> interface.";
|
||||
public static final String VALUE_DESERIALIZER_CLASS_DEFAULT
|
||||
= "io.confluent.kafka.serializers.KafkaAvroDeserializer"; // TODO: Non-avro built-in?
|
||||
|
||||
public static final String OFFSET_KEY_DESERIALIZER_CLASS_CONFIG = "offset.key.deserializer";
|
||||
public static final String OFFSET_KEY_DESERIALIZER_CLASS_DOC =
|
||||
"Deserializer class for key that implements the <code>OffsetDeserializer</code> interface.";
|
||||
public static final String OFFSET_KEY_DESERIALIZER_CLASS_DEFAULT
|
||||
= "org.apache.kafka.copycat.avro.AvroDeserializer"; // TODO: Non-avro built-in?
|
||||
|
||||
public static final String OFFSET_VALUE_DESERIALIZER_CLASS_CONFIG = "offset.value.deserializer";
|
||||
public static final String OFFSET_VALUE_DESERIALIZER_CLASS_DOC =
|
||||
"Deserializer class for value that implements the <code>OffsetDeserializer</code> interface.";
|
||||
public static final String OFFSET_VALUE_DESERIALIZER_CLASS_DEFAULT
|
||||
= "org.apache.kafka.copycat.avro.AvroDeserializer"; // TODO: Non-avro built-in?
|
||||
|
||||
|
||||
public static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG
|
||||
= "task.shutdown.graceful.timeout.ms";
|
||||
|
@ -137,27 +101,16 @@ public class WorkerConfig extends AbstractConfig {
|
|||
.define(CLUSTER_CONFIG, Type.STRING, CLUSTER_DEFAULT, Importance.HIGH, CLUSTER_CONFIG_DOC)
|
||||
.define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, BOOTSTRAP_SERVERS_DEFAULT,
|
||||
Importance.HIGH, BOOSTRAP_SERVERS_DOC)
|
||||
.define(CONVERTER_CLASS_CONFIG, Type.CLASS, CONVERTER_CLASS_DEFAULT,
|
||||
.define(CONVERTER_CLASS_CONFIG, Type.CLASS,
|
||||
Importance.HIGH, CONVERTER_CLASS_DOC)
|
||||
.define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, KEY_SERIALIZER_CLASS_DEFAULT,
|
||||
.define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
|
||||
Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
|
||||
.define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, VALUE_SERIALIZER_CLASS_DEFAULT,
|
||||
.define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
|
||||
Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC)
|
||||
.define(OFFSET_KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, OFFSET_KEY_SERIALIZER_CLASS_DEFAULT,
|
||||
Importance.HIGH, OFFSET_KEY_SERIALIZER_CLASS_DOC)
|
||||
.define(OFFSET_VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
|
||||
OFFSET_VALUE_SERIALIZER_CLASS_DEFAULT,
|
||||
Importance.HIGH, OFFSET_VALUE_SERIALIZER_CLASS_DOC)
|
||||
.define(KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS, KEY_DESERIALIZER_CLASS_DEFAULT,
|
||||
.define(KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS,
|
||||
Importance.HIGH, KEY_DESERIALIZER_CLASS_DOC)
|
||||
.define(VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS, VALUE_DESERIALIZER_CLASS_DEFAULT,
|
||||
.define(VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS,
|
||||
Importance.HIGH, VALUE_DESERIALIZER_CLASS_DOC)
|
||||
.define(OFFSET_KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS,
|
||||
OFFSET_KEY_DESERIALIZER_CLASS_DEFAULT,
|
||||
Importance.HIGH, OFFSET_KEY_DESERIALIZER_CLASS_DOC)
|
||||
.define(OFFSET_VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS,
|
||||
OFFSET_VALUE_DESERIALIZER_CLASS_DEFAULT,
|
||||
Importance.HIGH, OFFSET_VALUE_DESERIALIZER_CLASS_DOC)
|
||||
.define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG,
|
||||
TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, Importance.LOW,
|
||||
TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC)
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
|
||||
package org.apache.kafka.copycat.runtime;
|
||||
|
||||
import org.apache.kafka.common.serialization.Deserializer;
|
||||
import org.apache.kafka.common.serialization.Serializer;
|
||||
import org.apache.kafka.common.utils.SystemTime;
|
||||
import org.apache.kafka.common.utils.Time;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
|
@ -54,10 +56,10 @@ public class Worker {
|
|||
private WorkerConfig config;
|
||||
private Converter converter;
|
||||
private OffsetBackingStore offsetBackingStore;
|
||||
private OffsetSerializer offsetKeySerializer;
|
||||
private OffsetSerializer offsetValueSerializer;
|
||||
private OffsetDeserializer offsetKeyDeserializer;
|
||||
private OffsetDeserializer offsetValueDeserializer;
|
||||
private Serializer offsetKeySerializer;
|
||||
private Serializer offsetValueSerializer;
|
||||
private Deserializer offsetKeyDeserializer;
|
||||
private Deserializer offsetValueDeserializer;
|
||||
private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<ConnectorTaskId, WorkerTask>();
|
||||
private KafkaProducer producer;
|
||||
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
|
||||
|
@ -71,8 +73,8 @@ public class Worker {
|
|||
}
|
||||
|
||||
public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore,
|
||||
OffsetSerializer offsetKeySerializer, OffsetSerializer offsetValueSerializer,
|
||||
OffsetDeserializer offsetKeyDeserializer, OffsetDeserializer offsetValueDeserializer) {
|
||||
Serializer offsetKeySerializer, Serializer offsetValueSerializer,
|
||||
Deserializer offsetKeyDeserializer, Deserializer offsetValueDeserializer) {
|
||||
this.time = time;
|
||||
this.config = config;
|
||||
this.converter = Reflection.instantiate(config.getClass(WorkerConfig.CONVERTER_CLASS_CONFIG).getName(),
|
||||
|
@ -83,8 +85,7 @@ public class Worker {
|
|||
this.offsetKeySerializer = offsetKeySerializer;
|
||||
} else {
|
||||
this.offsetKeySerializer = Reflection.instantiate(
|
||||
config.getClass(WorkerConfig.OFFSET_KEY_SERIALIZER_CLASS_CONFIG).getName(),
|
||||
OffsetSerializer.class);
|
||||
config.getClass(WorkerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName(), Serializer.class);
|
||||
this.offsetKeySerializer.configure(config.getOriginalProperties(), true);
|
||||
}
|
||||
|
||||
|
@ -92,8 +93,7 @@ public class Worker {
|
|||
this.offsetValueSerializer = offsetValueSerializer;
|
||||
} else {
|
||||
this.offsetValueSerializer = Reflection.instantiate(
|
||||
config.getClass(WorkerConfig.OFFSET_VALUE_SERIALIZER_CLASS_CONFIG).getName(),
|
||||
OffsetSerializer.class);
|
||||
config.getClass(WorkerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName(), Serializer.class);
|
||||
this.offsetValueSerializer.configure(config.getOriginalProperties(), false);
|
||||
}
|
||||
|
||||
|
@ -101,8 +101,7 @@ public class Worker {
|
|||
this.offsetKeyDeserializer = offsetKeyDeserializer;
|
||||
} else {
|
||||
this.offsetKeyDeserializer = Reflection.instantiate(
|
||||
config.getClass(WorkerConfig.OFFSET_KEY_DESERIALIZER_CLASS_CONFIG).getName(),
|
||||
OffsetDeserializer.class);
|
||||
config.getClass(WorkerConfig.KEY_DESERIALIZER_CLASS_CONFIG).getName(), Deserializer.class);
|
||||
this.offsetKeyDeserializer.configure(config.getOriginalProperties(), true);
|
||||
}
|
||||
|
||||
|
@ -110,8 +109,7 @@ public class Worker {
|
|||
this.offsetValueDeserializer = offsetValueDeserializer;
|
||||
} else {
|
||||
this.offsetValueDeserializer = Reflection.instantiate(
|
||||
config.getClass(WorkerConfig.OFFSET_VALUE_DESERIALIZER_CLASS_CONFIG).getName(),
|
||||
OffsetDeserializer.class);
|
||||
config.getClass(WorkerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).getName(), Deserializer.class);
|
||||
this.offsetValueDeserializer.configure(config.getOriginalProperties(), false);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,7 +17,8 @@
|
|||
|
||||
package org.apache.kafka.copycat.storage;
|
||||
|
||||
import org.apache.kafka.copycat.data.Schema;
|
||||
import org.apache.kafka.common.serialization.Deserializer;
|
||||
import org.apache.kafka.common.serialization.Serializer;
|
||||
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -39,13 +40,13 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
|
|||
private final OffsetBackingStore backingStore;
|
||||
private final String namespace;
|
||||
private final Converter converter;
|
||||
private final OffsetSerializer keySerializer;
|
||||
private final OffsetDeserializer valueDeserializer;
|
||||
private final Serializer keySerializer;
|
||||
private final Deserializer valueDeserializer;
|
||||
|
||||
public OffsetStorageReaderImpl(OffsetBackingStore backingStore, String namespace,
|
||||
Converter converter,
|
||||
OffsetSerializer keySerializer,
|
||||
OffsetDeserializer valueDeserializer) {
|
||||
Serializer keySerializer,
|
||||
Deserializer valueDeserializer) {
|
||||
this.backingStore = backingStore;
|
||||
this.namespace = namespace;
|
||||
this.converter = converter;
|
||||
|
@ -54,18 +55,17 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Object getOffset(Object stream, Schema schema) {
|
||||
return getOffsets(Arrays.asList(stream), schema).get(stream);
|
||||
public Object getOffset(Object stream) {
|
||||
return getOffsets(Arrays.asList(stream)).get(stream);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<Object, Object> getOffsets(Collection<Object> streams, Schema schema) {
|
||||
public Map<Object, Object> getOffsets(Collection<Object> streams) {
|
||||
// Serialize keys so backing store can work with them
|
||||
Map<ByteBuffer, Object> serializedToOriginal = new HashMap<ByteBuffer, Object>(streams.size());
|
||||
for (Object key : streams) {
|
||||
try {
|
||||
byte[] keySerialized = keySerializer.serializeOffset(namespace,
|
||||
converter.fromCopycatData(key));
|
||||
byte[] keySerialized = keySerializer.serialize(namespace, converter.fromCopycatData(key));
|
||||
ByteBuffer keyBuffer = (keySerialized != null) ? ByteBuffer.wrap(keySerialized) : null;
|
||||
serializedToOriginal.put(keyBuffer, key);
|
||||
} catch (Throwable t) {
|
||||
|
@ -96,7 +96,7 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
|
|||
}
|
||||
Object origKey = serializedToOriginal.get(rawEntry.getKey());
|
||||
Object deserializedValue = converter.toCopycatData(
|
||||
valueDeserializer.deserializeOffset(namespace, rawEntry.getValue().array(), schema)
|
||||
valueDeserializer.deserialize(namespace, rawEntry.getValue().array())
|
||||
);
|
||||
|
||||
result.put(origKey, deserializedValue);
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.kafka.copycat.storage;
|
||||
|
||||
import org.apache.kafka.common.serialization.Serializer;
|
||||
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
|
||||
import org.apache.kafka.copycat.util.Callback;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -67,8 +68,8 @@ public class OffsetStorageWriter {
|
|||
|
||||
private final OffsetBackingStore backingStore;
|
||||
private final Converter converter;
|
||||
private final OffsetSerializer keySerializer;
|
||||
private final OffsetSerializer valueSerializer;
|
||||
private final Serializer keySerializer;
|
||||
private final Serializer valueSerializer;
|
||||
private final String namespace;
|
||||
private Map<Object, Object> data = new HashMap<Object, Object>();
|
||||
|
||||
|
@ -79,7 +80,7 @@ public class OffsetStorageWriter {
|
|||
|
||||
public OffsetStorageWriter(OffsetBackingStore backingStore,
|
||||
String namespace, Converter converter,
|
||||
OffsetSerializer keySerializer, OffsetSerializer valueSerializer) {
|
||||
Serializer keySerializer, Serializer valueSerializer) {
|
||||
this.backingStore = backingStore;
|
||||
this.namespace = namespace;
|
||||
this.converter = converter;
|
||||
|
@ -134,11 +135,9 @@ public class OffsetStorageWriter {
|
|||
try {
|
||||
offsetsSerialized = new HashMap<ByteBuffer, ByteBuffer>();
|
||||
for (Map.Entry<Object, Object> entry : toFlush.entrySet()) {
|
||||
byte[] key = keySerializer.serializeOffset(namespace,
|
||||
converter.fromCopycatData(entry.getKey()));
|
||||
byte[] key = keySerializer.serialize(namespace, converter.fromCopycatData(entry.getKey()));
|
||||
ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null;
|
||||
byte[] value = valueSerializer.serializeOffset(namespace,
|
||||
converter.fromCopycatData(entry.getValue()));
|
||||
byte[] value = valueSerializer.serialize(namespace, converter.fromCopycatData(entry.getValue()));
|
||||
ByteBuffer valueBuffer = (value != null) ? ByteBuffer.wrap(value) : null;
|
||||
offsetsSerialized.put(keyBuffer, valueBuffer);
|
||||
}
|
||||
|
|
|
@ -76,7 +76,14 @@ public class WorkerSinkTaskTest extends ThreadedTest {
|
|||
super.setup();
|
||||
time = new MockTime();
|
||||
sinkTask = PowerMock.createMock(SinkTask.class);
|
||||
workerConfig = new WorkerConfig();
|
||||
Properties workerProps = new Properties();
|
||||
// TODO: Non-avro built-ins?
|
||||
workerProps.setProperty("converter", "org.apache.kafka.copycat.avro.AvroConverter");
|
||||
workerProps.setProperty("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
|
||||
workerProps.setProperty("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
|
||||
workerProps.setProperty("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
|
||||
workerProps.setProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
|
||||
workerConfig = new WorkerConfig(workerProps);
|
||||
converter = PowerMock.createMock(Converter.class);
|
||||
workerTask = PowerMock.createPartialMock(
|
||||
WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"},
|
||||
|
|
|
@ -66,7 +66,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
|
|||
private static final String CONVERTED_RECORD = "converted-record";
|
||||
|
||||
private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
|
||||
private WorkerConfig config = new WorkerConfig();
|
||||
private WorkerConfig config;
|
||||
private SourceTask sourceTask;
|
||||
private Converter converter;
|
||||
private KafkaProducer<Object, Object> producer;
|
||||
|
@ -86,6 +86,14 @@ public class WorkerSourceTaskTest extends ThreadedTest {
|
|||
@Override
|
||||
public void setup() {
|
||||
super.setup();
|
||||
Properties workerProps = new Properties();
|
||||
// TODO: Non-avro built-ins?
|
||||
workerProps.setProperty("converter", "org.apache.kafka.copycat.avro.AvroConverter");
|
||||
workerProps.setProperty("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
|
||||
workerProps.setProperty("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
|
||||
workerProps.setProperty("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
|
||||
workerProps.setProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
|
||||
config = new WorkerConfig(workerProps);
|
||||
sourceTask = PowerMock.createMock(SourceTask.class);
|
||||
converter = PowerMock.createMock(Converter.class);
|
||||
producer = PowerMock.createMock(KafkaProducer.class);
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
|
||||
package org.apache.kafka.copycat.runtime;
|
||||
|
||||
import org.apache.kafka.common.serialization.Deserializer;
|
||||
import org.apache.kafka.common.serialization.Serializer;
|
||||
import org.apache.kafka.common.utils.Time;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.copycat.cli.WorkerConfig;
|
||||
|
@ -48,18 +50,24 @@ public class WorkerTest extends ThreadedTest {
|
|||
private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
|
||||
private Worker worker;
|
||||
private OffsetBackingStore offsetBackingStore = PowerMock.createMock(OffsetBackingStore.class);
|
||||
private OffsetSerializer offsetKeySerializer = PowerMock.createMock(OffsetSerializer.class);
|
||||
private OffsetSerializer offsetValueSerializer = PowerMock.createMock(OffsetSerializer.class);
|
||||
private OffsetDeserializer offsetKeyDeserializer = PowerMock.createMock(OffsetDeserializer.class);
|
||||
private OffsetDeserializer offsetValueDeserializer = PowerMock.createMock(OffsetDeserializer.class);
|
||||
private Serializer offsetKeySerializer = PowerMock.createMock(Serializer.class);
|
||||
private Serializer offsetValueSerializer = PowerMock.createMock(Serializer.class);
|
||||
private Deserializer offsetKeyDeserializer = PowerMock.createMock(Deserializer.class);
|
||||
private Deserializer offsetValueDeserializer = PowerMock.createMock(Deserializer.class);
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
super.setup();
|
||||
|
||||
// TODO: Remove schema registry URL
|
||||
// TODO: Non-avro built-ins?
|
||||
Properties workerProps = new Properties();
|
||||
workerProps.setProperty("schema.registry.url", "http://localhost:8081");
|
||||
workerProps.setProperty("converter", "org.apache.kafka.copycat.avro.AvroConverter");
|
||||
workerProps.setProperty("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
|
||||
workerProps.setProperty("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
|
||||
workerProps.setProperty("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
|
||||
workerProps.setProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
|
||||
WorkerConfig config = new WorkerConfig(workerProps);
|
||||
worker = new Worker(new MockTime(), config, offsetBackingStore,
|
||||
offsetKeySerializer, offsetValueSerializer,
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.kafka.copycat.storage;
|
||||
|
||||
import org.apache.kafka.common.serialization.Serializer;
|
||||
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
|
||||
import org.apache.kafka.copycat.util.Callback;
|
||||
import org.easymock.Capture;
|
||||
|
@ -49,8 +50,8 @@ public class OffsetStorageWriterTest {
|
|||
|
||||
private OffsetBackingStore store;
|
||||
private Converter converter;
|
||||
private OffsetSerializer keySerializer;
|
||||
private OffsetSerializer valueSerializer;
|
||||
private Serializer keySerializer;
|
||||
private Serializer valueSerializer;
|
||||
private OffsetStorageWriter writer;
|
||||
|
||||
private static Exception exception = new RuntimeException("error");
|
||||
|
@ -61,8 +62,8 @@ public class OffsetStorageWriterTest {
|
|||
public void setup() {
|
||||
store = PowerMock.createMock(OffsetBackingStore.class);
|
||||
converter = PowerMock.createMock(Converter.class);
|
||||
keySerializer = PowerMock.createMock(OffsetSerializer.class);
|
||||
valueSerializer = PowerMock.createMock(OffsetSerializer.class);
|
||||
keySerializer = PowerMock.createMock(Serializer.class);
|
||||
valueSerializer = PowerMock.createMock(Serializer.class);
|
||||
writer = new OffsetStorageWriter(store, NAMESPACE, converter, keySerializer, valueSerializer);
|
||||
|
||||
service = Executors.newFixedThreadPool(1);
|
||||
|
@ -193,9 +194,9 @@ public class OffsetStorageWriterTest {
|
|||
final boolean fail,
|
||||
final CountDownLatch waitForCompletion) {
|
||||
EasyMock.expect(converter.fromCopycatData(OFFSET_KEY)).andReturn(OFFSET_KEY_CONVERTED);
|
||||
EasyMock.expect(keySerializer.serializeOffset(NAMESPACE, OFFSET_KEY_CONVERTED)).andReturn(OFFSET_KEY_SERIALIZED);
|
||||
EasyMock.expect(keySerializer.serialize(NAMESPACE, OFFSET_KEY_CONVERTED)).andReturn(OFFSET_KEY_SERIALIZED);
|
||||
EasyMock.expect(converter.fromCopycatData(OFFSET_VALUE)).andReturn(OFFSET_VALUE_CONVERTED);
|
||||
EasyMock.expect(valueSerializer.serializeOffset(NAMESPACE, OFFSET_VALUE_CONVERTED)).andReturn(OFFSET_VALUE_SERIALIZED);
|
||||
EasyMock.expect(valueSerializer.serialize(NAMESPACE, OFFSET_VALUE_CONVERTED)).andReturn(OFFSET_VALUE_SERIALIZED);
|
||||
|
||||
final Capture<Callback<Void>> storeCallback = Capture.newInstance();
|
||||
EasyMock.expect(store.set(EasyMock.eq(NAMESPACE), EasyMock.eq(OFFSETS_SERIALIZED),
|
||||
|
|
Loading…
Reference in New Issue