mirror of https://github.com/apache/kafka.git
Normalize naming to use partition for both source and Kafka, adjusting naming in CopycatRecord classes to clearly differentiate.
commit 0b5a1a0c57
parent e3451427f2
ConnectorContext.java
@@ -23,7 +23,7 @@ package org.apache.kafka.copycat.connector;
 public interface ConnectorContext {
     /**
      * Requests that the runtime reconfigure the Tasks for this source. This should be used to
-     * indicate to the runtime that something about the input/output has changed (e.g. streams
+     * indicate to the runtime that something about the input/output has changed (e.g. partitions
      * added/removed) and the running Tasks will need to be modified.
      */
     void requestTaskReconfiguration();

CopycatRecord.java
@@ -21,23 +21,23 @@ package org.apache.kafka.copycat.connector;
  * <p>
  * Base class for records containing data to be copied to/from Kafka. This corresponds closely to
  * Kafka's ProducerRecord and ConsumerRecord classes, and holds the data that may be used by both
- * sources and sinks (topic, partition, key, value). Although both implementations include a
+ * sources and sinks (topic, kafkaPartition, key, value). Although both implementations include a
  * notion of offset, it is not included here because they differ in type.
  * </p>
  */
 public abstract class CopycatRecord {
     private final String topic;
-    private final Integer partition;
+    private final Integer kafkaPartition;
     private final Object key;
     private final Object value;
 
-    public CopycatRecord(String topic, Integer partition, Object value) {
-        this(topic, partition, null, value);
+    public CopycatRecord(String topic, Integer kafkaPartition, Object value) {
+        this(topic, kafkaPartition, null, value);
     }
 
-    public CopycatRecord(String topic, Integer partition, Object key, Object value) {
+    public CopycatRecord(String topic, Integer kafkaPartition, Object key, Object value) {
         this.topic = topic;
-        this.partition = partition;
+        this.kafkaPartition = kafkaPartition;
         this.key = key;
         this.value = value;
     }
@@ -46,8 +46,8 @@ public abstract class CopycatRecord {
         return topic;
     }
 
-    public Integer getPartition() {
-        return partition;
+    public Integer getKafkaPartition() {
+        return kafkaPartition;
     }
 
     public Object getKey() {
@@ -62,7 +62,7 @@ public abstract class CopycatRecord {
     public String toString() {
         return "CopycatRecord{" +
                 "topic='" + topic + '\'' +
-                ", partition=" + partition +
+                ", kafkaPartition=" + kafkaPartition +
                 ", key=" + key +
                 ", value=" + value +
                 '}';
@@ -79,7 +79,7 @@ public abstract class CopycatRecord {
 
         if (key != null ? !key.equals(that.key) : that.key != null)
            return false;
-        if (partition != null ? !partition.equals(that.partition) : that.partition != null)
+        if (kafkaPartition != null ? !kafkaPartition.equals(that.kafkaPartition) : that.kafkaPartition != null)
            return false;
        if (topic != null ? !topic.equals(that.topic) : that.topic != null)
            return false;
@@ -92,7 +92,7 @@ public abstract class CopycatRecord {
     @Override
     public int hashCode() {
         int result = topic != null ? topic.hashCode() : 0;
-        result = 31 * result + (partition != null ? partition.hashCode() : 0);
+        result = 31 * result + (kafkaPartition != null ? kafkaPartition.hashCode() : 0);
         result = 31 * result + (key != null ? key.hashCode() : 0);
         result = 31 * result + (value != null ? value.hashCode() : 0);
         return result;

SinkRecord.java
@@ -20,20 +20,20 @@ package org.apache.kafka.copycat.sink;
 import org.apache.kafka.copycat.connector.CopycatRecord;
 
 /**
- * SinkRecord is a CopycatRecord that has been read from Kafka and includes the offset of
+ * SinkRecord is a CopycatRecord that has been read from Kafka and includes the kafkaOffset of
  * the record in the Kafka topic-partition in addition to the standard fields. This information
- * should be used by the SinkTask to coordinate offset commits.
+ * should be used by the SinkTask to coordinate kafkaOffset commits.
  */
 public class SinkRecord extends CopycatRecord {
-    private final long offset;
+    private final long kafkaOffset;
 
-    public SinkRecord(String topic, int partition, Object key, Object value, long offset) {
+    public SinkRecord(String topic, int partition, Object key, Object value, long kafkaOffset) {
         super(topic, partition, key, value);
-        this.offset = offset;
+        this.kafkaOffset = kafkaOffset;
     }
 
-    public long getOffset() {
-        return offset;
+    public long getKafkaOffset() {
+        return kafkaOffset;
     }
 
     @Override
@@ -47,7 +47,7 @@ public class SinkRecord extends CopycatRecord {
 
         SinkRecord that = (SinkRecord) o;
 
-        if (offset != that.offset)
+        if (kafkaOffset != that.kafkaOffset)
            return false;
 
        return true;
@@ -56,14 +56,14 @@ public class SinkRecord extends CopycatRecord {
     @Override
     public int hashCode() {
         int result = super.hashCode();
-        result = 31 * result + (int) (offset ^ (offset >>> 32));
+        result = 31 * result + (int) (kafkaOffset ^ (kafkaOffset >>> 32));
         return result;
     }
 
     @Override
     public String toString() {
         return "SinkRecord{" +
-                "offset=" + offset +
+                "kafkaOffset=" + kafkaOffset +
                 "} " + super.toString();
     }
 }

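For illustration only, a minimal sketch of how a sink task might read the renamed accessors after this change. The put(Collection<SinkRecord>)-style method and the surrounding class are assumptions for the example, not part of this diff.

    import java.util.Collection;

    public class LoggingSinkSketch {
        // Sketch: topic + kafkaPartition + kafkaOffset identify the record's position in Kafka,
        // which a sink uses to coordinate its offset commits.
        public void put(Collection<SinkRecord> records) {
            for (SinkRecord record : records) {
                String location = record.getTopic() + "-" + record.getKafkaPartition()
                        + "@" + record.getKafkaOffset();
                System.out.println(location + ": key=" + record.getKey() + " value=" + record.getValue());
            }
        }
    }
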
SourceRecord.java
@@ -23,45 +23,45 @@ import org.apache.kafka.copycat.connector.CopycatRecord;
  * <p>
  * SourceRecords are generated by SourceTasks and passed to Copycat for storage in
  * Kafka. In addition to the standard fields in CopycatRecord which specify where data is stored
- * in Kafka, they also include a stream and offset.
+ * in Kafka, they also include a sourcePartition and sourceOffset.
  * </p>
  * <p>
- * The stream represents a single input stream that the record came from (e.g. a filename, table
- * name, or topic-partition). The offset represents a position in that stream which can be used
+ * The sourcePartition represents a single input sourcePartition that the record came from (e.g. a filename, table
+ * name, or topic-partition). The sourceOffset represents a position in that sourcePartition which can be used
  * to resume consumption of data.
  * </p>
  * <p>
  * These values can have arbitrary structure and should be represented using
  * org.apache.kafka.copycat.data objects (or primitive values). For example, a database connector
- * might specify the stream as a record containing { "db": "database_name", "table":
- * "table_name"} and the offset as a Long containing the timestamp of the row.
+ * might specify the sourcePartition as a record containing { "db": "database_name", "table":
+ * "table_name"} and the sourceOffset as a Long containing the timestamp of the row.
  * </p>
  */
 public class SourceRecord extends CopycatRecord {
-    private final Object stream;
-    private final Object offset;
+    private final Object sourcePartition;
+    private final Object sourceOffset;
 
-    public SourceRecord(Object stream, Object offset, String topic, Integer partition, Object value) {
-        this(stream, offset, topic, partition, null, value);
+    public SourceRecord(Object sourcePartition, Object sourceOffset, String topic, Integer partition, Object value) {
+        this(sourcePartition, sourceOffset, topic, partition, null, value);
     }
 
-    public SourceRecord(Object stream, Object offset, String topic, Object value) {
-        this(stream, offset, topic, null, null, value);
+    public SourceRecord(Object sourcePartition, Object sourceOffset, String topic, Object value) {
+        this(sourcePartition, sourceOffset, topic, null, null, value);
     }
 
-    public SourceRecord(Object stream, Object offset, String topic, Integer partition,
+    public SourceRecord(Object sourcePartition, Object sourceOffset, String topic, Integer partition,
                         Object key, Object value) {
         super(topic, partition, key, value);
-        this.stream = stream;
-        this.offset = offset;
+        this.sourcePartition = sourcePartition;
+        this.sourceOffset = sourceOffset;
     }
 
-    public Object getStream() {
-        return stream;
+    public Object getSourcePartition() {
+        return sourcePartition;
     }
 
-    public Object getOffset() {
-        return offset;
+    public Object getSourceOffset() {
+        return sourceOffset;
     }
 
     @Override
@@ -75,9 +75,9 @@ public class SourceRecord extends CopycatRecord {
 
         SourceRecord that = (SourceRecord) o;
 
-        if (offset != null ? !offset.equals(that.offset) : that.offset != null)
+        if (sourceOffset != null ? !sourceOffset.equals(that.sourceOffset) : that.sourceOffset != null)
            return false;
-        if (stream != null ? !stream.equals(that.stream) : that.stream != null)
+        if (sourcePartition != null ? !sourcePartition.equals(that.sourcePartition) : that.sourcePartition != null)
            return false;
 
        return true;
@@ -86,16 +86,16 @@ public class SourceRecord extends CopycatRecord {
     @Override
     public int hashCode() {
         int result = super.hashCode();
-        result = 31 * result + (stream != null ? stream.hashCode() : 0);
-        result = 31 * result + (offset != null ? offset.hashCode() : 0);
+        result = 31 * result + (sourcePartition != null ? sourcePartition.hashCode() : 0);
+        result = 31 * result + (sourceOffset != null ? sourceOffset.hashCode() : 0);
         return result;
     }
 
     @Override
     public String toString() {
         return "SourceRecord{" +
-                "stream=" + stream +
-                ", offset=" + offset +
+                "sourcePartition=" + sourcePartition +
+                ", sourceOffset=" + sourceOffset +
                 "} " + super.toString();
     }
 }

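A hedged sketch of how a source task might build a record with the renamed fields, using the four-argument constructor shown above. The class name, the package path in the import, the sample file name, offset value, and topic are all assumptions for illustration.

    import java.util.Arrays;
    import java.util.List;
    import org.apache.kafka.copycat.source.SourceRecord; // package path assumed

    public class LogFileSourceTaskSketch {
        // Hypothetical poll() body for a file-based connector (not part of this commit).
        public List<SourceRecord> poll() {
            Object sourcePartition = "/var/log/app.log"; // identifies the input: a filename, table, or topic-partition
            Object sourceOffset = 1024L;                 // position within that input, used to resume on restart
            String line = "2015-07-23 INFO started";     // stand-in for a line read from the file
            return Arrays.asList(new SourceRecord(sourcePartition, sourceOffset, "logs-topic", line));
        }
    }
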
OffsetStorageReader.java
@@ -27,30 +27,30 @@ import java.util.Map;
  */
 public interface OffsetStorageReader {
     /**
-     * Get the offset for the specified stream. If the data isn't already available locally, this
+     * Get the offset for the specified partition. If the data isn't already available locally, this
      * gets it from the backing store, which may require some network round trips.
      *
-     * @param stream object uniquely identifying the stream of data
-     * @return object uniquely identifying the offset in the stream of data
+     * @param partition object uniquely identifying the partition of data
+     * @return object uniquely identifying the offset in the partition of data
      */
-    Object getOffset(Object stream);
+    Object getOffset(Object partition);
 
     /**
      * <p>
-     * Get a set of offsets for the specified stream identifiers. This may be more efficient
+     * Get a set of offsets for the specified partition identifiers. This may be more efficient
      * than calling {@link #getOffset(Object)} repeatedly.
      * </p>
      * <p>
      * Note that when errors occur, this method omits the associated data and tries to return as
-     * many of the requested values as possible. This allows a task that's managing many streams to
+     * many of the requested values as possible. This allows a task that's managing many partitions to
      * still proceed with any available data. Therefore, implementations should take care to check
      * that the data is actually available in the returned response. The only case when an
      * exception will be thrown is if the entire request failed, e.g. because the underlying
      * storage was unavailable.
      * </p>
      *
-     * @param streams set of identifiers for streams of data
-     * @return a map of stream identifiers to decoded offsets
+     * @param partitions set of identifiers for partitions of data
+     * @return a map of partition identifiers to decoded offsets
      */
-    Map<Object, Object> getOffsets(Collection<Object> streams);
+    Map<Object, Object> getOffsets(Collection<Object> partitions);
 }

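A minimal sketch of how a task could use this interface to resume after a restart. How the OffsetStorageReader instance is obtained (here a constructor argument) and the Long offset type are assumptions for the example.

    public class ResumeFromOffsetSketch {
        private final OffsetStorageReader offsetReader;
        private long nextPosition;

        public ResumeFromOffsetSketch(OffsetStorageReader offsetReader) {
            this.offsetReader = offsetReader;
        }

        // Look up the last committed offset for this partition; null means nothing has been stored yet.
        public void start(Object sourcePartition) {
            Object stored = offsetReader.getOffset(sourcePartition);
            nextPosition = (stored != null) ? (Long) stored : 0L;
        }
    }
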
FileStreamSourceTaskTest.java
@@ -89,7 +89,7 @@ public class FileStreamSourceTaskTest {
         assertEquals(1, records.size());
         assertEquals(TOPIC, records.get(0).getTopic());
         assertEquals("partial line finished", records.get(0).getValue());
-        assertEquals(22L, records.get(0).getOffset());
+        assertEquals(22L, records.get(0).getSourceOffset());
         assertEquals(null, task.poll());
 
         // Different line endings, and make sure the final \r doesn't result in a line until we can
@@ -99,20 +99,20 @@ public class FileStreamSourceTaskTest {
         records = task.poll();
         assertEquals(4, records.size());
         assertEquals("line1", records.get(0).getValue());
-        assertEquals(28L, records.get(0).getOffset());
+        assertEquals(28L, records.get(0).getSourceOffset());
         assertEquals("line2", records.get(1).getValue());
-        assertEquals(35L, records.get(1).getOffset());
+        assertEquals(35L, records.get(1).getSourceOffset());
         assertEquals("line3", records.get(2).getValue());
-        assertEquals(41L, records.get(2).getOffset());
+        assertEquals(41L, records.get(2).getSourceOffset());
         assertEquals("line4", records.get(3).getValue());
-        assertEquals(47L, records.get(3).getOffset());
+        assertEquals(47L, records.get(3).getSourceOffset());
 
         os.write("subsequent text".getBytes());
         os.flush();
         records = task.poll();
         assertEquals(1, records.size());
         assertEquals("", records.get(0).getValue());
-        assertEquals(48L, records.get(0).getOffset());
+        assertEquals(48L, records.get(0).getSourceOffset());
 
         task.stop();
     }

WorkerConfig.java
@@ -78,7 +78,7 @@ public class WorkerConfig extends AbstractConfig {
 
     public static final String OFFSET_STORAGE_CLASS_CONFIG = "offset.storage.class";
     private static final String OFFSET_STORAGE_CLASS_DOC =
-            "OffsetBackingStore implementation to use for storing stream offset data";
+            "OffsetBackingStore implementation to use for storing partition offset data";
     public static final String OFFSET_STORAGE_CLASS_DEFAULT
             = "org.apache.kafka.copycat.storage.MemoryOffsetBackingStore";
 
@@ -89,7 +89,7 @@ public class WorkerConfig extends AbstractConfig {
 
     public static final String OFFSET_COMMIT_TIMEOUT_MS_CONFIG = "offset.flush.timeout.ms";
     private static final String OFFSET_COMMIT_TIMEOUT_MS_DOC
-            = "Maximum number of milliseconds to wait for records to flush and stream offset data to be"
+            = "Maximum number of milliseconds to wait for records to flush and partition offset data to be"
             + " committed to offset storage before cancelling the process and restoring the offset "
             + "data to be committed in a future attempt.";
     public static final long OFFSET_COMMIT_TIMEOUT_MS_DEFAULT = 5000L;

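For reference, a sketch of the two offset-related worker settings touched above, using the keys and defaults shown in this diff. How the properties are actually loaded by the worker (file vs. programmatic) is outside this commit and assumed here.

    import java.util.Properties;

    public class WorkerOffsetConfigSketch {
        // Builds the default values for the two settings documented above.
        public static Properties offsetDefaults() {
            Properties props = new Properties();
            props.setProperty("offset.storage.class",
                    "org.apache.kafka.copycat.storage.MemoryOffsetBackingStore"); // default backing store
            props.setProperty("offset.flush.timeout.ms", "5000");                 // default flush timeout in ms
            return props;
        }
    }
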
WorkerSourceTask.java
@@ -131,7 +131,7 @@ public class WorkerSourceTask implements WorkerTask {
     private synchronized void sendRecords(List<SourceRecord> records) {
         for (SourceRecord record : records) {
             final ProducerRecord<Object, Object> producerRecord
-                    = new ProducerRecord<>(record.getTopic(), record.getPartition(),
+                    = new ProducerRecord<>(record.getTopic(), record.getKafkaPartition(),
                     converter.fromCopycatData(record.getKey()),
                     converter.fromCopycatData(record.getValue()));
             log.trace("Appending record with key {}, value {}", record.getKey(), record.getValue());
@@ -156,7 +156,7 @@ public class WorkerSourceTask implements WorkerTask {
                 }
             });
             // Offsets are converted & serialized in the OffsetWriter
-            offsetWriter.setOffset(record.getStream(), record.getOffset());
+            offsetWriter.setOffset(record.getSourcePartition(), record.getSourceOffset());
         }
     }
 
OffsetStorageReaderImpl.java
@@ -55,21 +55,21 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
     }
 
     @Override
-    public Object getOffset(Object stream) {
-        return getOffsets(Arrays.asList(stream)).get(stream);
+    public Object getOffset(Object partition) {
+        return getOffsets(Arrays.asList(partition)).get(partition);
     }
 
     @Override
-    public Map<Object, Object> getOffsets(Collection<Object> streams) {
+    public Map<Object, Object> getOffsets(Collection<Object> partitions) {
         // Serialize keys so backing store can work with them
-        Map<ByteBuffer, Object> serializedToOriginal = new HashMap<>(streams.size());
-        for (Object key : streams) {
+        Map<ByteBuffer, Object> serializedToOriginal = new HashMap<>(partitions.size());
+        for (Object key : partitions) {
             try {
                 byte[] keySerialized = keySerializer.serialize(namespace, converter.fromCopycatData(key));
                 ByteBuffer keyBuffer = (keySerialized != null) ? ByteBuffer.wrap(keySerialized) : null;
                 serializedToOriginal.put(keyBuffer, key);
             } catch (Throwable t) {
-                log.error("CRITICAL: Failed to serialize stream key when getting offsets for task with "
+                log.error("CRITICAL: Failed to serialize partition key when getting offsets for task with "
                         + "namespace {}. No value for this data will be returned, which may break the "
                         + "task or cause it to skip some data.", namespace, t);
             }
@@ -85,12 +85,12 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
         }
 
         // Deserialize all the values and map back to the original keys
-        Map<Object, Object> result = new HashMap<>(streams.size());
+        Map<Object, Object> result = new HashMap<>(partitions.size());
         for (Map.Entry<ByteBuffer, ByteBuffer> rawEntry : raw.entrySet()) {
             try {
                 // Since null could be a valid key, explicitly check whether map contains the key
                 if (!serializedToOriginal.containsKey(rawEntry.getKey())) {
-                    log.error("Should be able to map {} back to a requested stream-offset key, backing "
+                    log.error("Should be able to map {} back to a requested partition-offset key, backing "
                             + "store may have returned invalid data", rawEntry.getKey());
                     continue;
                 }

OffsetStorageWriter.java
@@ -38,13 +38,13 @@ import java.util.concurrent.Future;
  * </p>
  * <p>
  * Copycat uses an OffsetStorage implementation to save state about the current progress of
- * source (import to Kafka) jobs, which may have many input streams and "offsets" may not be as
+ * source (import to Kafka) jobs, which may have many input partitions and "offsets" may not be as
  * simple as they are for Kafka partitions or files. Offset storage is not required for sink jobs
  * because they can use Kafka's native offset storage (or the sink data store can handle offset
  * storage to achieve exactly once semantics).
  * </p>
  * <p>
- * Both streams and offsets are generic data objects. This allows different connectors to use
+ * Both partitions and offsets are generic data objects. This allows different connectors to use
  * whatever representation they need, even arbitrarily complex records. These are translated
  * internally into the serialized form the OffsetBackingStore uses.
  * </p>
@@ -53,8 +53,8 @@ import java.util.concurrent.Future;
  * never read. Offset data should only be read during startup or reconfiguration of a task. By
  * always serving those requests by reading the values from the backing store, we ensure we never
  * accidentally use stale data. (One example of how this can occur: a task is processing input
- * stream A, writing offsets; reconfiguration causes stream A to be reassigned elsewhere;
- * reconfiguration causes stream A to be reassigned to this node, but now the offset data is out
+ * partition A, writing offsets; reconfiguration causes partition A to be reassigned elsewhere;
+ * reconfiguration causes partition A to be reassigned to this node, but now the offset data is out
  * of date). Since these offsets are created and managed by the connector itself, there's no way
  * for the offset management layer to know which keys are "owned" by which tasks at any given
  * time.
@@ -88,8 +88,8 @@ public class OffsetStorageWriter {
         this.valueSerializer = valueSerializer;
     }
 
-    public synchronized void setOffset(Object stream, Object offset) {
-        data.put(stream, offset);
+    public synchronized void setOffset(Object partition, Object offset) {
+        data.put(partition, offset);
     }
 
     private boolean flushing() {
@@ -145,7 +145,7 @@ public class OffsetStorageWriter {
                 // unable to make progress.
                 log.error("CRITICAL: Failed to serialize offset data, making it impossible to commit "
                         + "offsets under namespace {}. This likely won't recover unless the "
-                        + "unserializable stream or offset information is overwritten.", namespace);
+                        + "unserializable partition or offset information is overwritten.", namespace);
                 callback.onCompletion(t, null);
                 return null;
             }

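A sketch of the "generic partition and offset objects" point from the javadoc above, using the database example from SourceRecord's documentation. The plain HashMap stands in for an org.apache.kafka.copycat.data record purely for brevity, and the timestamp value is illustrative; only the setOffset(partition, offset) call itself comes from this diff.

    import java.util.HashMap;
    import java.util.Map;

    public class DbPartitionOffsetSketch {
        // A database connector might identify its partition by {db, table} and use a row timestamp as the offset.
        public static void recordProgress(OffsetStorageWriter offsetWriter) {
            Map<String, String> partition = new HashMap<>();
            partition.put("db", "database_name");
            partition.put("table", "table_name");
            Long offset = 1437600000000L; // e.g. timestamp of the last row copied (illustrative value)
            offsetWriter.setOffset(partition, offset);
        }
    }
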
WorkerSourceTaskTest.java
@@ -52,7 +52,7 @@ import java.util.concurrent.TimeoutException;
 import static org.junit.Assert.*;
 
 public class WorkerSourceTaskTest extends ThreadedTest {
-    private static final byte[] STREAM_BYTES = "stream".getBytes();
+    private static final byte[] PARTITION_BYTES = "partition".getBytes();
     private static final byte[] OFFSET_BYTES = "offset-1".getBytes();
 
     private static final Integer RECORD = 12;
@@ -74,7 +74,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
     private static final Properties EMPTY_TASK_PROPS = new Properties();
     private static final List<SourceRecord> RECORDS = Arrays.asList(
-            new SourceRecord(STREAM_BYTES, OFFSET_BYTES,
+            new SourceRecord(PARTITION_BYTES, OFFSET_BYTES,
                     "topic", RECORD)
     );
 
@@ -193,7 +193,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         List<SourceRecord> records = new ArrayList<>();
         // Can just use the same record for key and value
-        records.add(new SourceRecord(STREAM_BYTES, OFFSET_BYTES, "topic", null, RECORD, RECORD));
+        records.add(new SourceRecord(PARTITION_BYTES, OFFSET_BYTES, "topic", null, RECORD, RECORD));
 
         Capture<ProducerRecord> sent = expectSendRecord();
 
@@ -247,7 +247,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
             }
         });
         // 2. Offset data is passed to the offset storage.
-        offsetWriter.setOffset(STREAM_BYTES, OFFSET_BYTES);
+        offsetWriter.setOffset(PARTITION_BYTES, OFFSET_BYTES);
         PowerMock.expectLastCall().anyTimes();
 
         return sent;