mirror of https://github.com/apache/kafka.git
Remove offset serializers, instead reusing the existing serializers and removing schema projection support.
This commit is contained in:
parent e849e10c7e
commit 5a618c6815
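The diff below drops the Copycat-specific OffsetSerializer/OffsetDeserializer interfaces and has the Worker, OffsetStorageReaderImpl, and OffsetStorageWriter use the standard org.apache.kafka.common.serialization.Serializer and Deserializer instead. As a rough illustration of the pattern being reused (a sketch, not code from this commit), the example below round-trips an offset key through those standard interfaces; the StringSerializer/StringDeserializer pair and the "connector-namespace" argument are illustrative stand-ins for whatever serializer classes and connector name a worker is actually configured with.

```java
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;

public class OffsetSerdeSketch {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();

        // Offsets now go through the same Serializer/Deserializer interfaces used for
        // record keys and values; no Copycat-specific OffsetSerializer is involved.
        Serializer<String> keySerializer = new StringSerializer();
        keySerializer.configure(configs, true);   // isKey = true, as the Worker does for offset keys

        Deserializer<String> keyDeserializer = new StringDeserializer();
        keyDeserializer.configure(configs, true);

        // "connector-namespace" is a hypothetical connector name, passed where a
        // topic name would normally go.
        byte[] raw = keySerializer.serialize("connector-namespace", "stream-1");
        String roundTripped = keyDeserializer.deserialize("connector-namespace", raw);

        System.out.println(roundTripped);          // prints "stream-1"
        keySerializer.close();
        keyDeserializer.close();
    }
}
```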
@@ -145,6 +145,7 @@
 
     <subpackage name="storage">
       <allow pkg="org.apache.kafka.copycat" />
+      <allow pkg="org.apache.kafka.common.serialization" />
       <!-- for tests -->
       <allow pkg="org.easymock" />
       <allow pkg="org.powermock" />
@@ -16,6 +16,13 @@
 bootstrap.servers=localhost:9092
 schema.registry.url=http://localhost:8081
 
+# TODO: Non-avro built-ins?
+converter=org.apache.kafka.copycat.avro.AvroConverter
+key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
+value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
+key.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
+value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
+
 offset.storage.class=org.apache.kafka.copycat.storage.FileOffsetBackingStore
 offset.storage.file.filename=/tmp/copycat.offsets
 # Flush much faster than normal, which is useful for testing/debugging
@@ -15,4 +15,11 @@
 
 # These are defaults. This file just demonstrates how to override some settings.
 bootstrap.servers=localhost:9092
 schema.registry.url=http://localhost:8081
+
+# TODO: Non-avro built-ins?
+converter=org.apache.kafka.copycat.avro.AvroConverter
+key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
+value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
+key.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
+value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.storage;
-
-import org.apache.kafka.copycat.data.Schema;
-
-import java.io.Closeable;
-import java.util.Map;
-
-/**
- * Deserializer for Copycat offsets.
- * @param <T>
- */
-public interface OffsetDeserializer<T> extends Closeable {
-
-    /**
-     * Configure this class.
-     * @param configs configs in key/value pairs
-     * @param isKey whether is for key or value
-     */
-    public void configure(Map<String, ?> configs, boolean isKey);
-
-    /**
-     * Deserialize an offset key or value from the specified connector.
-     * @param connector connector associated with the data
-     * @param data serialized bytes
-     * @return deserialized typed data
-     */
-    public T deserializeOffset(String connector, byte[] data);
-
-    /**
-     * Deserialize an offset key or value from the specified connector using a schema.
-     * @param connector connector associated with the data
-     * @param data serialized bytes
-     * @param schema schema to deserialize to
-     * @return deserialized typed data
-     */
-    public T deserializeOffset(String connector, byte[] data, Schema schema);
-
-    @Override
-    public void close();
-}
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.storage;
-
-import java.io.Closeable;
-import java.util.Map;
-
-/**
- * Serializer for Copycat offsets.
- * @param <T> native type of offsets.
- */
-public interface OffsetSerializer<T> extends Closeable {
-    /**
-     * Configure this class.
-     * @param configs configs in key/value pairs
-     * @param isKey whether is for key or value
-     */
-    public void configure(Map<String, ?> configs, boolean isKey);
-
-    /**
-     * @param connector the connector associated with offsets
-     * @param data typed data
-     * @return serialized bytes
-     */
-    public byte[] serializeOffset(String connector, T data);
-
-    /**
-     * Close this serializer.
-     */
-    @Override
-    public void close();
-
-}
@@ -17,8 +17,6 @@
 
 package org.apache.kafka.copycat.storage;
 
-import org.apache.kafka.copycat.data.Schema;
-
 import java.util.Collection;
 import java.util.Map;
 
@@ -33,15 +31,14 @@ public interface OffsetStorageReader {
      * gets it from the backing store, which may require some network round trips.
      *
      * @param stream object uniquely identifying the stream of data
-     * @param schema schema used to decode the offset value
      * @return object uniquely identifying the offset in the stream of data
      */
-    public Object getOffset(Object stream, Schema schema);
+    public Object getOffset(Object stream);
 
     /**
      * <p>
      * Get a set of offsets for the specified stream identifiers. This may be more efficient
-     * than calling {@link #getOffset(Object, Schema)} repeatedly.
+     * than calling {@link #getOffset(Object)} repeatedly.
      * </p>
      * <p>
      * Note that when errors occur, this method omits the associated data and tries to return as
@@ -53,8 +50,7 @@ public interface OffsetStorageReader {
      * </p>
      *
      * @param streams set of identifiers for streams of data
-     * @param schema schema used to decode offset values
     * @return a map of stream identifiers to decoded offsets
      */
-    public Map<Object, Object> getOffsets(Collection<Object> streams, Schema schema);
+    public Map<Object, Object> getOffsets(Collection<Object> streams);
 }
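With the Schema parameter gone from OffsetStorageReader, callers look offsets up by stream identifier alone and no schema projection is attempted. A minimal sketch of the resulting call pattern (the FileStreamSourceTask change later in this diff does essentially this with a null stream key); the "my-stream" and "other-stream" identifiers are hypothetical examples, not names from this commit.

```java
import org.apache.kafka.copycat.storage.OffsetStorageReader;

import java.util.Arrays;
import java.util.Map;

public class OffsetLookupSketch {
    // Returns the last committed offset for a hypothetical stream, or null if none exists.
    public static Long lastOffsetFor(OffsetStorageReader reader) {
        Object streamId = "my-stream";

        // Single lookup: no Schema argument is needed any more.
        Long last = (Long) reader.getOffset(streamId);

        // Bulk lookup is still available for several streams at once.
        Map<Object, Object> all = reader.getOffsets(Arrays.<Object>asList(streamId, "other-stream"));

        return last != null ? last : (Long) all.get(streamId);
    }
}
```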
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.avro;
-
-import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
-import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
-import org.apache.kafka.copycat.data.Schema;
-import org.apache.kafka.copycat.storage.OffsetDeserializer;
-
-import java.util.Map;
-
-public class AvroDeserializer extends AbstractKafkaAvroDeserializer implements OffsetDeserializer<Object> {
-
-    @Override
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        configure(new KafkaAvroDeserializerConfig(configs));
-    }
-
-    @Override
-    public Object deserializeOffset(String connector, byte[] data) {
-        // TODO: Support schema projection
-        return deserialize(data);
-    }
-
-    @Override
-    public Object deserializeOffset(String connector, byte[] data, Schema schema) {
-        // TODO: Support schema projection
-        return deserialize(data);
-    }
-
-    @Override
-    public void close() {
-
-    }
-}
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.avro;
-
-import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerializer;
-import org.apache.kafka.copycat.errors.CopycatRuntimeException;
-import org.apache.kafka.copycat.storage.OffsetSerializer;
-
-import java.util.Map;
-
-public class AvroSerializer extends AbstractKafkaAvroSerializer implements OffsetSerializer<Object> {
-
-    private boolean isKey;
-
-
-    @Override
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        this.isKey = isKey;
-        Object url = configs.get(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG);
-        if (url == null) {
-            throw new CopycatRuntimeException("Missing Schema registry url!");
-        }
-        Object maxSchemaObject = configs.get(
-                AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_CONFIG);
-        if (maxSchemaObject == null) {
-            schemaRegistry = new CachedSchemaRegistryClient(
-                    (String) url, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
-        } else {
-            schemaRegistry = new CachedSchemaRegistryClient(
-                    (String) url, (Integer) maxSchemaObject);
-        }
-    }
-
-    @Override
-    public byte[] serializeOffset(String connector, Object data) {
-        String subject;
-        if (isKey) {
-            subject = connector + "-key";
-        } else {
-            subject = connector + "-value";
-        }
-        return serializeImpl(subject, data);
-    }
-
-    @Override
-    public void close() {
-
-    }
-}
@@ -17,8 +17,6 @@
 
 package org.apache.kafka.copycat.file;
 
-import org.apache.kafka.copycat.data.Schema;
-import org.apache.kafka.copycat.data.SchemaBuilder;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.errors.CopycatRuntimeException;
 import org.apache.kafka.copycat.source.SourceRecord;
@@ -55,9 +53,7 @@ public class FileStreamSourceTask extends SourceTask<Object, Object> {
         } else {
             try {
                 stream = new FileInputStream(filename);
-                Schema longSchema = SchemaBuilder.builder().longType();
-                Long lastRecordedOffset
-                        = (Long) context.getOffsetStorageReader().getOffset(null, longSchema);
+                Long lastRecordedOffset = (Long) context.getOffsetStorageReader().getOffset(null);
                 if (lastRecordedOffset != null) {
                     log.debug("Found previous offset, trying to skip to file offset {}", lastRecordedOffset);
                     long skipLeft = lastRecordedOffset;
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.copycat.file;
 
-import org.apache.kafka.copycat.data.Schema;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.errors.CopycatRuntimeException;
 import org.apache.kafka.copycat.source.SourceRecord;
@@ -137,8 +136,7 @@ public class FileStreamSourceTaskTest {
 
     private void expectOffsetLookupReturnNone() {
         EasyMock.expect(
-                offsetStorageReader.getOffset(
-                        EasyMock.anyObject(Object.class), EasyMock.anyObject(Schema.class)))
+                offsetStorageReader.getOffset(EasyMock.anyObject(Object.class)))
                 .andReturn(null);
     }
 }
@@ -52,58 +52,22 @@ public class WorkerConfig extends AbstractConfig {
     public static final String CONVERTER_CLASS_CONFIG = "converter";
     public static final String CONVERTER_CLASS_DOC =
             "Converter class for Copycat data that implements the <code>Converter</code> interface.";
-    public static final String CONVERTER_CLASS_DEFAULT
-            = "org.apache.kafka.copycat.avro.AvroConverter"; // TODO: Non-avro built-in?
 
     public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
     public static final String KEY_SERIALIZER_CLASS_DOC =
             "Serializer class for key that implements the <code>Serializer</code> interface.";
-    public static final String KEY_SERIALIZER_CLASS_DEFAULT
-            = "io.confluent.kafka.serializers.KafkaAvroSerializer"; // TODO: Non-avro built-in?
 
     public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
     public static final String VALUE_SERIALIZER_CLASS_DOC =
             "Serializer class for value that implements the <code>Serializer</code> interface.";
-    public static final String VALUE_SERIALIZER_CLASS_DEFAULT
-            = "io.confluent.kafka.serializers.KafkaAvroSerializer"; // TODO: Non-avro built-in?
-
-    public static final String OFFSET_KEY_SERIALIZER_CLASS_CONFIG = "offset.key.serializer";
-    public static final String OFFSET_KEY_SERIALIZER_CLASS_DOC =
-            "Serializer class for key that implements the <code>OffsetSerializer</code> interface.";
-    public static final String OFFSET_KEY_SERIALIZER_CLASS_DEFAULT
-            = "org.apache.kafka.copycat.avro.AvroSerializer"; // TODO: Non-avro built-in?
-
-    public static final String OFFSET_VALUE_SERIALIZER_CLASS_CONFIG = "offset.value.serializer";
-    public static final String OFFSET_VALUE_SERIALIZER_CLASS_DOC =
-            "Serializer class for value that implements the <code>OffsetSerializer</code> interface.";
-    public static final String OFFSET_VALUE_SERIALIZER_CLASS_DEFAULT
-            = "org.apache.kafka.copycat.avro.AvroSerializer"; // TODO: Non-avro built-in?
-
 
     public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
     public static final String KEY_DESERIALIZER_CLASS_DOC =
             "Serializer class for key that implements the <code>Deserializer</code> interface.";
-    public static final String KEY_DESERIALIZER_CLASS_DEFAULT
-            = "io.confluent.kafka.serializers.KafkaAvroDeserializer"; // TODO: Non-avro built-in?
 
     public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
     public static final String VALUE_DESERIALIZER_CLASS_DOC =
             "Deserializer class for value that implements the <code>Deserializer</code> interface.";
-    public static final String VALUE_DESERIALIZER_CLASS_DEFAULT
-            = "io.confluent.kafka.serializers.KafkaAvroDeserializer"; // TODO: Non-avro built-in?
-
-    public static final String OFFSET_KEY_DESERIALIZER_CLASS_CONFIG = "offset.key.deserializer";
-    public static final String OFFSET_KEY_DESERIALIZER_CLASS_DOC =
-            "Deserializer class for key that implements the <code>OffsetDeserializer</code> interface.";
-    public static final String OFFSET_KEY_DESERIALIZER_CLASS_DEFAULT
-            = "org.apache.kafka.copycat.avro.AvroDeserializer"; // TODO: Non-avro built-in?
-
-    public static final String OFFSET_VALUE_DESERIALIZER_CLASS_CONFIG = "offset.value.deserializer";
-    public static final String OFFSET_VALUE_DESERIALIZER_CLASS_DOC =
-            "Deserializer class for value that implements the <code>OffsetDeserializer</code> interface.";
-    public static final String OFFSET_VALUE_DESERIALIZER_CLASS_DEFAULT
-            = "org.apache.kafka.copycat.avro.AvroDeserializer"; // TODO: Non-avro built-in?
-
 
     public static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG
             = "task.shutdown.graceful.timeout.ms";
@@ -137,27 +101,16 @@ public class WorkerConfig extends AbstractConfig {
                 .define(CLUSTER_CONFIG, Type.STRING, CLUSTER_DEFAULT, Importance.HIGH, CLUSTER_CONFIG_DOC)
                 .define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, BOOTSTRAP_SERVERS_DEFAULT,
                         Importance.HIGH, BOOSTRAP_SERVERS_DOC)
-                .define(CONVERTER_CLASS_CONFIG, Type.CLASS, CONVERTER_CLASS_DEFAULT,
+                .define(CONVERTER_CLASS_CONFIG, Type.CLASS,
                         Importance.HIGH, CONVERTER_CLASS_DOC)
-                .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, KEY_SERIALIZER_CLASS_DEFAULT,
+                .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
                         Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
-                .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, VALUE_SERIALIZER_CLASS_DEFAULT,
+                .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
                         Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC)
-                .define(OFFSET_KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, OFFSET_KEY_SERIALIZER_CLASS_DEFAULT,
-                        Importance.HIGH, OFFSET_KEY_SERIALIZER_CLASS_DOC)
-                .define(OFFSET_VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
-                        OFFSET_VALUE_SERIALIZER_CLASS_DEFAULT,
-                        Importance.HIGH, OFFSET_VALUE_SERIALIZER_CLASS_DOC)
-                .define(KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS, KEY_DESERIALIZER_CLASS_DEFAULT,
+                .define(KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS,
                         Importance.HIGH, KEY_DESERIALIZER_CLASS_DOC)
-                .define(VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS, VALUE_DESERIALIZER_CLASS_DEFAULT,
+                .define(VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS,
                         Importance.HIGH, VALUE_DESERIALIZER_CLASS_DOC)
-                .define(OFFSET_KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS,
-                        OFFSET_KEY_DESERIALIZER_CLASS_DEFAULT,
-                        Importance.HIGH, OFFSET_KEY_DESERIALIZER_CLASS_DOC)
-                .define(OFFSET_VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS,
-                        OFFSET_VALUE_DESERIALIZER_CLASS_DEFAULT,
-                        Importance.HIGH, OFFSET_VALUE_DESERIALIZER_CLASS_DOC)
                 .define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG,
                         TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, Importance.LOW,
                         TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC)
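Since the Avro-based defaults are removed from the ConfigDef above, a worker configuration now has to name the converter and (de)serializer classes explicitly, which is exactly what the updated sample properties files and test setups in this diff do. A minimal sketch of building such a config programmatically, assuming the same Confluent Avro classes the samples use (any Serializer/Deserializer implementation on the classpath would work):

```java
import org.apache.kafka.copycat.cli.WorkerConfig;

import java.util.Properties;

public class WorkerConfigSketch {
    public static WorkerConfig build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");

        // With the built-in defaults gone, these classes must be supplied explicitly.
        props.setProperty("converter", "org.apache.kafka.copycat.avro.AvroConverter");
        props.setProperty("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.setProperty("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.setProperty("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.setProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");

        return new WorkerConfig(props);
    }
}
```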
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.copycat.runtime;
 
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -54,10 +56,10 @@ public class Worker {
     private WorkerConfig config;
     private Converter converter;
     private OffsetBackingStore offsetBackingStore;
-    private OffsetSerializer offsetKeySerializer;
-    private OffsetSerializer offsetValueSerializer;
-    private OffsetDeserializer offsetKeyDeserializer;
-    private OffsetDeserializer offsetValueDeserializer;
+    private Serializer offsetKeySerializer;
+    private Serializer offsetValueSerializer;
+    private Deserializer offsetKeyDeserializer;
+    private Deserializer offsetValueDeserializer;
     private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<ConnectorTaskId, WorkerTask>();
     private KafkaProducer producer;
     private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
@@ -71,8 +73,8 @@ public class Worker {
     }
 
     public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore,
-                  OffsetSerializer offsetKeySerializer, OffsetSerializer offsetValueSerializer,
-                  OffsetDeserializer offsetKeyDeserializer, OffsetDeserializer offsetValueDeserializer) {
+                  Serializer offsetKeySerializer, Serializer offsetValueSerializer,
+                  Deserializer offsetKeyDeserializer, Deserializer offsetValueDeserializer) {
         this.time = time;
         this.config = config;
         this.converter = Reflection.instantiate(config.getClass(WorkerConfig.CONVERTER_CLASS_CONFIG).getName(),
|
||||||
this.offsetKeySerializer = offsetKeySerializer;
|
this.offsetKeySerializer = offsetKeySerializer;
|
||||||
} else {
|
} else {
|
||||||
this.offsetKeySerializer = Reflection.instantiate(
|
this.offsetKeySerializer = Reflection.instantiate(
|
||||||
config.getClass(WorkerConfig.OFFSET_KEY_SERIALIZER_CLASS_CONFIG).getName(),
|
config.getClass(WorkerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName(), Serializer.class);
|
||||||
OffsetSerializer.class);
|
|
||||||
this.offsetKeySerializer.configure(config.getOriginalProperties(), true);
|
this.offsetKeySerializer.configure(config.getOriginalProperties(), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -92,8 +93,7 @@ public class Worker {
             this.offsetValueSerializer = offsetValueSerializer;
         } else {
             this.offsetValueSerializer = Reflection.instantiate(
-                    config.getClass(WorkerConfig.OFFSET_VALUE_SERIALIZER_CLASS_CONFIG).getName(),
-                    OffsetSerializer.class);
+                    config.getClass(WorkerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName(), Serializer.class);
             this.offsetValueSerializer.configure(config.getOriginalProperties(), false);
         }
 
@@ -101,8 +101,7 @@ public class Worker {
             this.offsetKeyDeserializer = offsetKeyDeserializer;
         } else {
             this.offsetKeyDeserializer = Reflection.instantiate(
-                    config.getClass(WorkerConfig.OFFSET_KEY_DESERIALIZER_CLASS_CONFIG).getName(),
-                    OffsetDeserializer.class);
+                    config.getClass(WorkerConfig.KEY_DESERIALIZER_CLASS_CONFIG).getName(), Deserializer.class);
             this.offsetKeyDeserializer.configure(config.getOriginalProperties(), true);
         }
 
@@ -110,8 +109,7 @@ public class Worker {
             this.offsetValueDeserializer = offsetValueDeserializer;
         } else {
             this.offsetValueDeserializer = Reflection.instantiate(
-                    config.getClass(WorkerConfig.OFFSET_VALUE_DESERIALIZER_CLASS_CONFIG).getName(),
-                    OffsetDeserializer.class);
+                    config.getClass(WorkerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).getName(), Deserializer.class);
             this.offsetValueDeserializer.configure(config.getOriginalProperties(), false);
         }
     }
@@ -17,7 +17,8 @@
 
 package org.apache.kafka.copycat.storage;
 
-import org.apache.kafka.copycat.data.Schema;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.copycat.errors.CopycatRuntimeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,13 +40,13 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
     private final OffsetBackingStore backingStore;
     private final String namespace;
     private final Converter converter;
-    private final OffsetSerializer keySerializer;
-    private final OffsetDeserializer valueDeserializer;
+    private final Serializer keySerializer;
+    private final Deserializer valueDeserializer;
 
     public OffsetStorageReaderImpl(OffsetBackingStore backingStore, String namespace,
                                    Converter converter,
-                                   OffsetSerializer keySerializer,
-                                   OffsetDeserializer valueDeserializer) {
+                                   Serializer keySerializer,
+                                   Deserializer valueDeserializer) {
         this.backingStore = backingStore;
         this.namespace = namespace;
         this.converter = converter;
@@ -54,18 +55,17 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
     }
 
     @Override
-    public Object getOffset(Object stream, Schema schema) {
-        return getOffsets(Arrays.asList(stream), schema).get(stream);
+    public Object getOffset(Object stream) {
+        return getOffsets(Arrays.asList(stream)).get(stream);
     }
 
     @Override
-    public Map<Object, Object> getOffsets(Collection<Object> streams, Schema schema) {
+    public Map<Object, Object> getOffsets(Collection<Object> streams) {
         // Serialize keys so backing store can work with them
         Map<ByteBuffer, Object> serializedToOriginal = new HashMap<ByteBuffer, Object>(streams.size());
         for (Object key : streams) {
             try {
-                byte[] keySerialized = keySerializer.serializeOffset(namespace,
-                        converter.fromCopycatData(key));
+                byte[] keySerialized = keySerializer.serialize(namespace, converter.fromCopycatData(key));
                 ByteBuffer keyBuffer = (keySerialized != null) ? ByteBuffer.wrap(keySerialized) : null;
                 serializedToOriginal.put(keyBuffer, key);
             } catch (Throwable t) {
@@ -96,7 +96,7 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
             }
             Object origKey = serializedToOriginal.get(rawEntry.getKey());
             Object deserializedValue = converter.toCopycatData(
-                    valueDeserializer.deserializeOffset(namespace, rawEntry.getValue().array(), schema)
+                    valueDeserializer.deserialize(namespace, rawEntry.getValue().array())
             );
 
             result.put(origKey, deserializedValue);
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.copycat.storage;
 
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.copycat.errors.CopycatRuntimeException;
 import org.apache.kafka.copycat.util.Callback;
 import org.slf4j.Logger;
@@ -67,8 +68,8 @@ public class OffsetStorageWriter {
 
     private final OffsetBackingStore backingStore;
     private final Converter converter;
-    private final OffsetSerializer keySerializer;
-    private final OffsetSerializer valueSerializer;
+    private final Serializer keySerializer;
+    private final Serializer valueSerializer;
     private final String namespace;
     private Map<Object, Object> data = new HashMap<Object, Object>();
 
@@ -79,7 +80,7 @@ public class OffsetStorageWriter {
 
     public OffsetStorageWriter(OffsetBackingStore backingStore,
                                String namespace, Converter converter,
-                               OffsetSerializer keySerializer, OffsetSerializer valueSerializer) {
+                               Serializer keySerializer, Serializer valueSerializer) {
         this.backingStore = backingStore;
         this.namespace = namespace;
         this.converter = converter;
@@ -134,11 +135,9 @@ public class OffsetStorageWriter {
         try {
             offsetsSerialized = new HashMap<ByteBuffer, ByteBuffer>();
             for (Map.Entry<Object, Object> entry : toFlush.entrySet()) {
-                byte[] key = keySerializer.serializeOffset(namespace,
-                        converter.fromCopycatData(entry.getKey()));
+                byte[] key = keySerializer.serialize(namespace, converter.fromCopycatData(entry.getKey()));
                 ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null;
-                byte[] value = valueSerializer.serializeOffset(namespace,
-                        converter.fromCopycatData(entry.getValue()));
+                byte[] value = valueSerializer.serialize(namespace, converter.fromCopycatData(entry.getValue()));
                 ByteBuffer valueBuffer = (value != null) ? ByteBuffer.wrap(value) : null;
                 offsetsSerialized.put(keyBuffer, valueBuffer);
             }
@@ -76,7 +76,14 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         super.setup();
         time = new MockTime();
         sinkTask = PowerMock.createMock(SinkTask.class);
-        workerConfig = new WorkerConfig();
+        Properties workerProps = new Properties();
+        // TODO: Non-avro built-ins?
+        workerProps.setProperty("converter", "org.apache.kafka.copycat.avro.AvroConverter");
+        workerProps.setProperty("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
+        workerProps.setProperty("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
+        workerProps.setProperty("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
+        workerProps.setProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
+        workerConfig = new WorkerConfig(workerProps);
         converter = PowerMock.createMock(Converter.class);
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"},
@@ -66,7 +66,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     private static final String CONVERTED_RECORD = "converted-record";
 
     private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
-    private WorkerConfig config = new WorkerConfig();
+    private WorkerConfig config;
     private SourceTask sourceTask;
     private Converter converter;
     private KafkaProducer<Object, Object> producer;
|
@ -86,6 +86,14 @@ public class WorkerSourceTaskTest extends ThreadedTest {
|
||||||
@Override
|
@Override
|
||||||
public void setup() {
|
public void setup() {
|
||||||
super.setup();
|
super.setup();
|
||||||
|
Properties workerProps = new Properties();
|
||||||
|
// TODO: Non-avro built-ins?
|
||||||
|
workerProps.setProperty("converter", "org.apache.kafka.copycat.avro.AvroConverter");
|
||||||
|
workerProps.setProperty("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
|
||||||
|
workerProps.setProperty("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
|
||||||
|
workerProps.setProperty("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
|
||||||
|
workerProps.setProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
|
||||||
|
config = new WorkerConfig(workerProps);
|
||||||
sourceTask = PowerMock.createMock(SourceTask.class);
|
sourceTask = PowerMock.createMock(SourceTask.class);
|
||||||
converter = PowerMock.createMock(Converter.class);
|
converter = PowerMock.createMock(Converter.class);
|
||||||
producer = PowerMock.createMock(KafkaProducer.class);
|
producer = PowerMock.createMock(KafkaProducer.class);
|
||||||
|
|
|
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.copycat.runtime;
 
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.copycat.cli.WorkerConfig;
@@ -48,18 +50,24 @@ public class WorkerTest extends ThreadedTest {
     private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
     private Worker worker;
     private OffsetBackingStore offsetBackingStore = PowerMock.createMock(OffsetBackingStore.class);
-    private OffsetSerializer offsetKeySerializer = PowerMock.createMock(OffsetSerializer.class);
-    private OffsetSerializer offsetValueSerializer = PowerMock.createMock(OffsetSerializer.class);
-    private OffsetDeserializer offsetKeyDeserializer = PowerMock.createMock(OffsetDeserializer.class);
-    private OffsetDeserializer offsetValueDeserializer = PowerMock.createMock(OffsetDeserializer.class);
+    private Serializer offsetKeySerializer = PowerMock.createMock(Serializer.class);
+    private Serializer offsetValueSerializer = PowerMock.createMock(Serializer.class);
+    private Deserializer offsetKeyDeserializer = PowerMock.createMock(Deserializer.class);
+    private Deserializer offsetValueDeserializer = PowerMock.createMock(Deserializer.class);
 
     @Before
     public void setup() {
         super.setup();
 
         // TODO: Remove schema registry URL
+        // TODO: Non-avro built-ins?
         Properties workerProps = new Properties();
         workerProps.setProperty("schema.registry.url", "http://localhost:8081");
+        workerProps.setProperty("converter", "org.apache.kafka.copycat.avro.AvroConverter");
+        workerProps.setProperty("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
+        workerProps.setProperty("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
+        workerProps.setProperty("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
+        workerProps.setProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
         WorkerConfig config = new WorkerConfig(workerProps);
         worker = new Worker(new MockTime(), config, offsetBackingStore,
                 offsetKeySerializer, offsetValueSerializer,
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.copycat.storage;
 
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.copycat.errors.CopycatRuntimeException;
 import org.apache.kafka.copycat.util.Callback;
 import org.easymock.Capture;
@@ -49,8 +50,8 @@ public class OffsetStorageWriterTest {
 
     private OffsetBackingStore store;
     private Converter converter;
-    private OffsetSerializer keySerializer;
-    private OffsetSerializer valueSerializer;
+    private Serializer keySerializer;
+    private Serializer valueSerializer;
     private OffsetStorageWriter writer;
 
     private static Exception exception = new RuntimeException("error");
@@ -61,8 +62,8 @@ public class OffsetStorageWriterTest {
     public void setup() {
         store = PowerMock.createMock(OffsetBackingStore.class);
        converter = PowerMock.createMock(Converter.class);
-        keySerializer = PowerMock.createMock(OffsetSerializer.class);
-        valueSerializer = PowerMock.createMock(OffsetSerializer.class);
+        keySerializer = PowerMock.createMock(Serializer.class);
+        valueSerializer = PowerMock.createMock(Serializer.class);
         writer = new OffsetStorageWriter(store, NAMESPACE, converter, keySerializer, valueSerializer);
 
         service = Executors.newFixedThreadPool(1);
@@ -193,9 +194,9 @@ public class OffsetStorageWriterTest {
                                      final boolean fail,
                                      final CountDownLatch waitForCompletion) {
         EasyMock.expect(converter.fromCopycatData(OFFSET_KEY)).andReturn(OFFSET_KEY_CONVERTED);
-        EasyMock.expect(keySerializer.serializeOffset(NAMESPACE, OFFSET_KEY_CONVERTED)).andReturn(OFFSET_KEY_SERIALIZED);
+        EasyMock.expect(keySerializer.serialize(NAMESPACE, OFFSET_KEY_CONVERTED)).andReturn(OFFSET_KEY_SERIALIZED);
         EasyMock.expect(converter.fromCopycatData(OFFSET_VALUE)).andReturn(OFFSET_VALUE_CONVERTED);
-        EasyMock.expect(valueSerializer.serializeOffset(NAMESPACE, OFFSET_VALUE_CONVERTED)).andReturn(OFFSET_VALUE_SERIALIZED);
+        EasyMock.expect(valueSerializer.serialize(NAMESPACE, OFFSET_VALUE_CONVERTED)).andReturn(OFFSET_VALUE_SERIALIZED);
 
         final Capture<Callback<Void>> storeCallback = Capture.newInstance();
         EasyMock.expect(store.set(EasyMock.eq(NAMESPACE), EasyMock.eq(OFFSETS_SERIALIZED),