Normalize naming to use partition for both source and Kafka, adjusting naming in CopycatRecord classes to clearly differentiate.

Author: Ewen Cheslack-Postava
Date:   2015-07-31 23:00:29 -07:00
parent e3451427f2
commit 0b5a1a0c57
11 changed files with 84 additions and 84 deletions
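
As a quick orientation to the renamed API in the diffs below, the sketch here constructs a SourceRecord and reads back both the source-side and Kafka-side coordinates. It is a minimal illustration, not part of the commit: the org.apache.kafka.copycat.source package name, the file path, and the topic/offset values are assumed for the example.

import org.apache.kafka.copycat.source.SourceRecord;

public class RenamedNamingExample {
    public static void main(String[] args) {
        // sourcePartition/sourceOffset describe the input system (a hypothetical log file here),
        // while topic and kafkaPartition say where the data is written in Kafka.
        SourceRecord record = new SourceRecord("/var/log/app.log", 42L, "copycat-demo", 0, "a line of text");

        System.out.println(record.getSourcePartition()); // /var/log/app.log
        System.out.println(record.getSourceOffset());    // 42
        System.out.println(record.getTopic());           // copycat-demo
        System.out.println(record.getKafkaPartition());  // 0
    }
}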

View File: ConnectorContext.java

@@ -23,7 +23,7 @@ package org.apache.kafka.copycat.connector;
 public interface ConnectorContext {
     /**
      * Requests that the runtime reconfigure the Tasks for this source. This should be used to
-     * indicate to the runtime that something about the input/output has changed (e.g. streams
+     * indicate to the runtime that something about the input/output has changed (e.g. partitions
      * added/removed) and the running Tasks will need to be modified.
      */
     void requestTaskReconfiguration();
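
To illustrate the reworded Javadoc, a source connector that notices partitions being added or removed in the system it copies from would ask the runtime to redistribute work. A hedged sketch follows; only requestTaskReconfiguration() comes from the interface above, and the change-detection helper is invented for this example.

import org.apache.kafka.copycat.connector.ConnectorContext;

// Hypothetical helper a source connector might use; not part of the Copycat API.
public class PartitionChangeMonitor {
    private final ConnectorContext context;
    private int lastKnownPartitionCount;

    public PartitionChangeMonitor(ConnectorContext context, int initialPartitionCount) {
        this.context = context;
        this.lastKnownPartitionCount = initialPartitionCount;
    }

    public void onPartitionCount(int currentPartitionCount) {
        if (currentPartitionCount != lastKnownPartitionCount) {
            lastKnownPartitionCount = currentPartitionCount;
            // Ask the runtime to recompute task configurations for the new partition set.
            context.requestTaskReconfiguration();
        }
    }
}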

View File: CopycatRecord.java

@@ -21,23 +21,23 @@ package org.apache.kafka.copycat.connector;
  * <p>
  * Base class for records containing data to be copied to/from Kafka. This corresponds closely to
  * Kafka's ProducerRecord and ConsumerRecord classes, and holds the data that may be used by both
- * sources and sinks (topic, partition, key, value). Although both implementations include a
+ * sources and sinks (topic, kafkaPartition, key, value). Although both implementations include a
  * notion of offset, it is not included here because they differ in type.
  * </p>
  */
 public abstract class CopycatRecord {
     private final String topic;
-    private final Integer partition;
+    private final Integer kafkaPartition;
     private final Object key;
     private final Object value;
-    public CopycatRecord(String topic, Integer partition, Object value) {
-        this(topic, partition, null, value);
+    public CopycatRecord(String topic, Integer kafkaPartition, Object value) {
+        this(topic, kafkaPartition, null, value);
     }
-    public CopycatRecord(String topic, Integer partition, Object key, Object value) {
+    public CopycatRecord(String topic, Integer kafkaPartition, Object key, Object value) {
         this.topic = topic;
-        this.partition = partition;
+        this.kafkaPartition = kafkaPartition;
         this.key = key;
         this.value = value;
     }
@@ -46,8 +46,8 @@ public abstract class CopycatRecord {
         return topic;
     }
-    public Integer getPartition() {
-        return partition;
+    public Integer getKafkaPartition() {
+        return kafkaPartition;
     }
     public Object getKey() {
@@ -62,7 +62,7 @@ public abstract class CopycatRecord {
     public String toString() {
         return "CopycatRecord{" +
                 "topic='" + topic + '\'' +
-                ", partition=" + partition +
+                ", kafkaPartition=" + kafkaPartition +
                 ", key=" + key +
                 ", value=" + value +
                 '}';
@@ -79,7 +79,7 @@ public abstract class CopycatRecord {
         if (key != null ? !key.equals(that.key) : that.key != null)
             return false;
-        if (partition != null ? !partition.equals(that.partition) : that.partition != null)
+        if (kafkaPartition != null ? !kafkaPartition.equals(that.kafkaPartition) : that.kafkaPartition != null)
             return false;
         if (topic != null ? !topic.equals(that.topic) : that.topic != null)
             return false;
@@ -92,7 +92,7 @@ public abstract class CopycatRecord {
     @Override
     public int hashCode() {
         int result = topic != null ? topic.hashCode() : 0;
-        result = 31 * result + (partition != null ? partition.hashCode() : 0);
+        result = 31 * result + (kafkaPartition != null ? kafkaPartition.hashCode() : 0);
         result = 31 * result + (key != null ? key.hashCode() : 0);
         result = 31 * result + (value != null ? value.hashCode() : 0);
         return result;

View File: SinkRecord.java

@@ -20,20 +20,20 @@ package org.apache.kafka.copycat.sink;
 import org.apache.kafka.copycat.connector.CopycatRecord;
 /**
- * SinkRecord is a CopycatRecord that has been read from Kafka and includes the offset of
+ * SinkRecord is a CopycatRecord that has been read from Kafka and includes the kafkaOffset of
  * the record in the Kafka topic-partition in addition to the standard fields. This information
- * should be used by the SinkTask to coordinate offset commits.
+ * should be used by the SinkTask to coordinate kafkaOffset commits.
  */
 public class SinkRecord extends CopycatRecord {
-    private final long offset;
+    private final long kafkaOffset;
-    public SinkRecord(String topic, int partition, Object key, Object value, long offset) {
+    public SinkRecord(String topic, int partition, Object key, Object value, long kafkaOffset) {
         super(topic, partition, key, value);
-        this.offset = offset;
+        this.kafkaOffset = kafkaOffset;
     }
-    public long getOffset() {
-        return offset;
+    public long getKafkaOffset() {
+        return kafkaOffset;
     }
     @Override
@@ -47,7 +47,7 @@ public class SinkRecord extends CopycatRecord {
         SinkRecord that = (SinkRecord) o;
-        if (offset != that.offset)
+        if (kafkaOffset != that.kafkaOffset)
             return false;
         return true;
@@ -56,14 +56,14 @@ public class SinkRecord extends CopycatRecord {
     @Override
     public int hashCode() {
         int result = super.hashCode();
-        result = 31 * result + (int) (offset ^ (offset >>> 32));
+        result = 31 * result + (int) (kafkaOffset ^ (kafkaOffset >>> 32));
         return result;
     }
     @Override
     public String toString() {
         return "SinkRecord{" +
-                "offset=" + offset +
+                "kafkaOffset=" + kafkaOffset +
                 "} " + super.toString();
     }
 }
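
As a small illustration of the renamed accessors, a sink implementation would typically remember the highest kafkaOffset it has handled per topic-partition when coordinating commits. The tracker below is a sketch, not part of the Copycat API; the map and the key format are assumptions.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.copycat.sink.SinkRecord;

public class SinkOffsetTracker {
    // Highest kafkaOffset processed per "topic-partition" key; illustrative only.
    private final Map<String, Long> latestOffsets = new HashMap<>();

    public void record(SinkRecord record) {
        String key = record.getTopic() + "-" + record.getKafkaPartition();
        Long previous = latestOffsets.get(key);
        if (previous == null || record.getKafkaOffset() > previous) {
            latestOffsets.put(key, record.getKafkaOffset());
        }
    }

    public Map<String, Long> snapshot() {
        return new HashMap<>(latestOffsets);
    }
}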

View File: SourceRecord.java

@@ -23,45 +23,45 @@ import org.apache.kafka.copycat.connector.CopycatRecord;
  * <p>
  * SourceRecords are generated by SourceTasks and passed to Copycat for storage in
  * Kafka. In addition to the standard fields in CopycatRecord which specify where data is stored
- * in Kafka, they also include a stream and offset.
+ * in Kafka, they also include a sourcePartition and sourceOffset.
  * </p>
  * <p>
- * The stream represents a single input stream that the record came from (e.g. a filename, table
- * name, or topic-partition). The offset represents a position in that stream which can be used
+ * The sourcePartition represents a single input sourcePartition that the record came from (e.g. a filename, table
+ * name, or topic-partition). The sourceOffset represents a position in that sourcePartition which can be used
  * to resume consumption of data.
  * </p>
  * <p>
  * These values can have arbitrary structure and should be represented using
  * org.apache.kafka.copycat.data objects (or primitive values). For example, a database connector
- * might specify the stream as a record containing { "db": "database_name", "table":
- * "table_name"} and the offset as a Long containing the timestamp of the row.
+ * might specify the sourcePartition as a record containing { "db": "database_name", "table":
+ * "table_name"} and the sourceOffset as a Long containing the timestamp of the row.
  * </p>
  */
 public class SourceRecord extends CopycatRecord {
-    private final Object stream;
-    private final Object offset;
+    private final Object sourcePartition;
+    private final Object sourceOffset;
-    public SourceRecord(Object stream, Object offset, String topic, Integer partition, Object value) {
-        this(stream, offset, topic, partition, null, value);
+    public SourceRecord(Object sourcePartition, Object sourceOffset, String topic, Integer partition, Object value) {
+        this(sourcePartition, sourceOffset, topic, partition, null, value);
     }
-    public SourceRecord(Object stream, Object offset, String topic, Object value) {
-        this(stream, offset, topic, null, null, value);
+    public SourceRecord(Object sourcePartition, Object sourceOffset, String topic, Object value) {
+        this(sourcePartition, sourceOffset, topic, null, null, value);
     }
-    public SourceRecord(Object stream, Object offset, String topic, Integer partition,
+    public SourceRecord(Object sourcePartition, Object sourceOffset, String topic, Integer partition,
                         Object key, Object value) {
         super(topic, partition, key, value);
-        this.stream = stream;
-        this.offset = offset;
+        this.sourcePartition = sourcePartition;
+        this.sourceOffset = sourceOffset;
     }
-    public Object getStream() {
-        return stream;
+    public Object getSourcePartition() {
+        return sourcePartition;
     }
-    public Object getOffset() {
-        return offset;
+    public Object getSourceOffset() {
+        return sourceOffset;
     }
     @Override
@@ -75,9 +75,9 @@ public class SourceRecord extends CopycatRecord {
         SourceRecord that = (SourceRecord) o;
-        if (offset != null ? !offset.equals(that.offset) : that.offset != null)
+        if (sourceOffset != null ? !sourceOffset.equals(that.sourceOffset) : that.sourceOffset != null)
             return false;
-        if (stream != null ? !stream.equals(that.stream) : that.stream != null)
+        if (sourcePartition != null ? !sourcePartition.equals(that.sourcePartition) : that.sourcePartition != null)
             return false;
         return true;
@@ -86,16 +86,16 @@ public class SourceRecord extends CopycatRecord {
     @Override
     public int hashCode() {
         int result = super.hashCode();
-        result = 31 * result + (stream != null ? stream.hashCode() : 0);
-        result = 31 * result + (offset != null ? offset.hashCode() : 0);
+        result = 31 * result + (sourcePartition != null ? sourcePartition.hashCode() : 0);
+        result = 31 * result + (sourceOffset != null ? sourceOffset.hashCode() : 0);
         return result;
     }
     @Override
     public String toString() {
         return "SourceRecord{" +
-                "stream=" + stream +
-                ", offset=" + offset +
+                "sourcePartition=" + sourcePartition +
+                ", sourceOffset=" + sourceOffset +
                 "} " + super.toString();
     }
 }
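
A sketch of the database example from the Javadoc above, using plain java.util types instead of org.apache.kafka.copycat.data objects so it stays self-contained; the package name, table names, timestamp, and topic are assumed for illustration.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.copycat.source.SourceRecord;

public class DatabaseSourceRecordExample {
    public static void main(String[] args) {
        // sourcePartition identifies the input (a specific table); the values are invented.
        Map<String, String> sourcePartition = new HashMap<>();
        sourcePartition.put("db", "inventory");
        sourcePartition.put("table", "orders");

        // sourceOffset is a position within that partition -- here, a row timestamp.
        Long sourceOffset = 1438387200000L;

        // Topic and Kafka partition are chosen by the connector; the key is omitted.
        SourceRecord record = new SourceRecord(sourcePartition, sourceOffset,
                "inventory-orders", 0, "serialized row goes here");
        System.out.println(record);
    }
}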

View File: OffsetStorageReader.java

@@ -27,30 +27,30 @@ import java.util.Map;
  */
 public interface OffsetStorageReader {
     /**
-     * Get the offset for the specified stream. If the data isn't already available locally, this
+     * Get the offset for the specified partition. If the data isn't already available locally, this
      * gets it from the backing store, which may require some network round trips.
      *
-     * @param stream object uniquely identifying the stream of data
-     * @return object uniquely identifying the offset in the stream of data
+     * @param partition object uniquely identifying the partition of data
+     * @return object uniquely identifying the offset in the partition of data
      */
-    Object getOffset(Object stream);
+    Object getOffset(Object partition);
     /**
      * <p>
-     * Get a set of offsets for the specified stream identifiers. This may be more efficient
+     * Get a set of offsets for the specified partition identifiers. This may be more efficient
      * than calling {@link #getOffset(Object)} repeatedly.
      * </p>
     * <p>
      * Note that when errors occur, this method omits the associated data and tries to return as
-     * many of the requested values as possible. This allows a task that's managing many streams to
+     * many of the requested values as possible. This allows a task that's managing many partitions to
      * still proceed with any available data. Therefore, implementations should take care to check
      * that the data is actually available in the returned response. The only case when an
      * exception will be thrown is if the entire request failed, e.g. because the underlying
      * storage was unavailable.
      * </p>
      *
-     * @param streams set of identifiers for streams of data
-     * @return a map of stream identifiers to decoded offsets
+     * @param partitions set of identifiers for partitions of data
+     * @return a map of partition identifiers to decoded offsets
      */
-    Map<Object, Object> getOffsets(Collection<Object> streams);
+    Map<Object, Object> getOffsets(Collection<Object> partitions);
 }
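
For illustration, a task that has been handed an OffsetStorageReader (how it obtains one is outside this diff) might resume from a stored position roughly as below. The org.apache.kafka.copycat.storage package name, the Long offset type, and the start-from-zero fallback are assumptions of this sketch.

import org.apache.kafka.copycat.storage.OffsetStorageReader;

public class ResumePositionHelper {
    // Returns the position a task should resume from for one partition key.
    public static long resumePosition(OffsetStorageReader reader, Object partition) {
        Object stored = reader.getOffset(partition);
        if (stored == null) {
            // Per the Javadoc above, missing data is simply omitted, so fall back to the start.
            return 0L;
        }
        return (Long) stored;
    }
}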

View File: FileStreamSourceTaskTest.java

@@ -89,7 +89,7 @@ public class FileStreamSourceTaskTest {
         assertEquals(1, records.size());
         assertEquals(TOPIC, records.get(0).getTopic());
         assertEquals("partial line finished", records.get(0).getValue());
-        assertEquals(22L, records.get(0).getOffset());
+        assertEquals(22L, records.get(0).getSourceOffset());
         assertEquals(null, task.poll());
         // Different line endings, and make sure the final \r doesn't result in a line until we can
@@ -99,20 +99,20 @@ public class FileStreamSourceTaskTest {
         records = task.poll();
         assertEquals(4, records.size());
         assertEquals("line1", records.get(0).getValue());
-        assertEquals(28L, records.get(0).getOffset());
+        assertEquals(28L, records.get(0).getSourceOffset());
         assertEquals("line2", records.get(1).getValue());
-        assertEquals(35L, records.get(1).getOffset());
+        assertEquals(35L, records.get(1).getSourceOffset());
         assertEquals("line3", records.get(2).getValue());
-        assertEquals(41L, records.get(2).getOffset());
+        assertEquals(41L, records.get(2).getSourceOffset());
         assertEquals("line4", records.get(3).getValue());
-        assertEquals(47L, records.get(3).getOffset());
+        assertEquals(47L, records.get(3).getSourceOffset());
         os.write("subsequent text".getBytes());
         os.flush();
         records = task.poll();
         assertEquals(1, records.size());
         assertEquals("", records.get(0).getValue());
-        assertEquals(48L, records.get(0).getOffset());
+        assertEquals(48L, records.get(0).getSourceOffset());
         task.stop();
     }

View File: WorkerConfig.java

@@ -78,7 +78,7 @@ public class WorkerConfig extends AbstractConfig {
     public static final String OFFSET_STORAGE_CLASS_CONFIG = "offset.storage.class";
     private static final String OFFSET_STORAGE_CLASS_DOC =
-            "OffsetBackingStore implementation to use for storing stream offset data";
+            "OffsetBackingStore implementation to use for storing partition offset data";
     public static final String OFFSET_STORAGE_CLASS_DEFAULT
             = "org.apache.kafka.copycat.storage.MemoryOffsetBackingStore";
@@ -89,7 +89,7 @@ public class WorkerConfig extends AbstractConfig {
     public static final String OFFSET_COMMIT_TIMEOUT_MS_CONFIG = "offset.flush.timeout.ms";
     private static final String OFFSET_COMMIT_TIMEOUT_MS_DOC
-            = "Maximum number of milliseconds to wait for records to flush and stream offset data to be"
+            = "Maximum number of milliseconds to wait for records to flush and partition offset data to be"
             + " committed to offset storage before cancelling the process and restoring the offset "
             + "data to be committed in a future attempt.";
     public static final long OFFSET_COMMIT_TIMEOUT_MS_DEFAULT = 5000L;
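
For reference, a sketch of how these settings could be assembled as worker properties; the key names and the MemoryOffsetBackingStore/5000 ms defaults come from the constants above, everything else is illustrative.

import java.util.Properties;

public class OffsetWorkerSettingsExample {
    public static Properties offsetSettings() {
        Properties props = new Properties();
        // Key names and default values match the WorkerConfig constants in the diff above.
        props.setProperty("offset.storage.class",
                "org.apache.kafka.copycat.storage.MemoryOffsetBackingStore");
        props.setProperty("offset.flush.timeout.ms", "5000");
        return props;
    }
}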

View File: WorkerSourceTask.java

@@ -131,7 +131,7 @@ public class WorkerSourceTask implements WorkerTask {
     private synchronized void sendRecords(List<SourceRecord> records) {
         for (SourceRecord record : records) {
             final ProducerRecord<Object, Object> producerRecord
-                    = new ProducerRecord<>(record.getTopic(), record.getPartition(),
+                    = new ProducerRecord<>(record.getTopic(), record.getKafkaPartition(),
                     converter.fromCopycatData(record.getKey()),
                     converter.fromCopycatData(record.getValue()));
             log.trace("Appending record with key {}, value {}", record.getKey(), record.getValue());
@@ -156,7 +156,7 @@ public class WorkerSourceTask implements WorkerTask {
                 }
             });
             // Offsets are converted & serialized in the OffsetWriter
-            offsetWriter.setOffset(record.getStream(), record.getOffset());
+            offsetWriter.setOffset(record.getSourcePartition(), record.getSourceOffset());
         }
     }

View File: OffsetStorageReaderImpl.java

@@ -55,21 +55,21 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
     }
     @Override
-    public Object getOffset(Object stream) {
-        return getOffsets(Arrays.asList(stream)).get(stream);
+    public Object getOffset(Object partition) {
+        return getOffsets(Arrays.asList(partition)).get(partition);
     }
     @Override
-    public Map<Object, Object> getOffsets(Collection<Object> streams) {
+    public Map<Object, Object> getOffsets(Collection<Object> partitions) {
         // Serialize keys so backing store can work with them
-        Map<ByteBuffer, Object> serializedToOriginal = new HashMap<>(streams.size());
-        for (Object key : streams) {
+        Map<ByteBuffer, Object> serializedToOriginal = new HashMap<>(partitions.size());
+        for (Object key : partitions) {
             try {
                 byte[] keySerialized = keySerializer.serialize(namespace, converter.fromCopycatData(key));
                 ByteBuffer keyBuffer = (keySerialized != null) ? ByteBuffer.wrap(keySerialized) : null;
                 serializedToOriginal.put(keyBuffer, key);
             } catch (Throwable t) {
-                log.error("CRITICAL: Failed to serialize stream key when getting offsets for task with "
+                log.error("CRITICAL: Failed to serialize partition key when getting offsets for task with "
                         + "namespace {}. No value for this data will be returned, which may break the "
                         + "task or cause it to skip some data.", namespace, t);
             }
@@ -85,12 +85,12 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
         }
         // Deserialize all the values and map back to the original keys
-        Map<Object, Object> result = new HashMap<>(streams.size());
+        Map<Object, Object> result = new HashMap<>(partitions.size());
         for (Map.Entry<ByteBuffer, ByteBuffer> rawEntry : raw.entrySet()) {
             try {
                 // Since null could be a valid key, explicitly check whether map contains the key
                 if (!serializedToOriginal.containsKey(rawEntry.getKey())) {
-                    log.error("Should be able to map {} back to a requested stream-offset key, backing "
+                    log.error("Should be able to map {} back to a requested partition-offset key, backing "
                             + "store may have returned invalid data", rawEntry.getKey());
                     continue;
                 }

View File: OffsetStorageWriter.java

@@ -38,13 +38,13 @@ import java.util.concurrent.Future;
  * </p>
  * <p>
  * Copycat uses an OffsetStorage implementation to save state about the current progress of
- * source (import to Kafka) jobs, which may have many input streams and "offsets" may not be as
+ * source (import to Kafka) jobs, which may have many input partitions and "offsets" may not be as
  * simple as they are for Kafka partitions or files. Offset storage is not required for sink jobs
  * because they can use Kafka's native offset storage (or the sink data store can handle offset
  * storage to achieve exactly once semantics).
  * </p>
  * <p>
- * Both streams and offsets are generic data objects. This allows different connectors to use
+ * Both partitions and offsets are generic data objects. This allows different connectors to use
  * whatever representation they need, even arbitrarily complex records. These are translated
  * internally into the serialized form the OffsetBackingStore uses.
  * </p>
@@ -53,8 +53,8 @@ import java.util.concurrent.Future;
  * never read. Offset data should only be read during startup or reconfiguration of a task. By
  * always serving those requests by reading the values from the backing store, we ensure we never
  * accidentally use stale data. (One example of how this can occur: a task is processing input
- * stream A, writing offsets; reconfiguration causes stream A to be reassigned elsewhere;
- * reconfiguration causes stream A to be reassigned to this node, but now the offset data is out
+ * partition A, writing offsets; reconfiguration causes partition A to be reassigned elsewhere;
+ * reconfiguration causes partition A to be reassigned to this node, but now the offset data is out
  * of date). Since these offsets are created and managed by the connector itself, there's no way
  * for the offset management layer to know which keys are "owned" by which tasks at any given
  * time.
@@ -88,8 +88,8 @@ public class OffsetStorageWriter {
         this.valueSerializer = valueSerializer;
     }
-    public synchronized void setOffset(Object stream, Object offset) {
-        data.put(stream, offset);
+    public synchronized void setOffset(Object partition, Object offset) {
+        data.put(partition, offset);
     }
     private boolean flushing() {
@@ -145,7 +145,7 @@ public class OffsetStorageWriter {
             // unable to make progress.
             log.error("CRITICAL: Failed to serialize offset data, making it impossible to commit "
                     + "offsets under namespace {}. This likely won't recover unless the "
-                    + "unserializable stream or offset information is overwritten.", namespace);
+                    + "unserializable partition or offset information is overwritten.", namespace);
             callback.onCompletion(t, null);
             return null;
         }
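
A minimal sketch of the write path described above, mirroring the WorkerSourceTask change in this commit; only setOffset(Object, Object) is taken from the diff, the storage package name is assumed, and the later flush/commit step is omitted because its API is not shown here.

import org.apache.kafka.copycat.source.SourceRecord;
import org.apache.kafka.copycat.storage.OffsetStorageWriter;

public class OffsetWriteExample {
    // Called for each record a source task has produced; the writer buffers the
    // partition/offset pair until the framework later flushes it to the backing store.
    public static void recordProgress(OffsetStorageWriter offsetWriter, SourceRecord record) {
        offsetWriter.setOffset(record.getSourcePartition(), record.getSourceOffset());
    }
}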

View File: WorkerSourceTaskTest.java

@@ -52,7 +52,7 @@ import java.util.concurrent.TimeoutException;
 import static org.junit.Assert.*;
 public class WorkerSourceTaskTest extends ThreadedTest {
-    private static final byte[] STREAM_BYTES = "stream".getBytes();
+    private static final byte[] PARTITION_BYTES = "partition".getBytes();
     private static final byte[] OFFSET_BYTES = "offset-1".getBytes();
     private static final Integer RECORD = 12;
@@ -74,7 +74,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     private static final Properties EMPTY_TASK_PROPS = new Properties();
     private static final List<SourceRecord> RECORDS = Arrays.asList(
-            new SourceRecord(STREAM_BYTES, OFFSET_BYTES,
+            new SourceRecord(PARTITION_BYTES, OFFSET_BYTES,
                     "topic", RECORD)
     );
@@ -193,7 +193,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         List<SourceRecord> records = new ArrayList<>();
         // Can just use the same record for key and value
-        records.add(new SourceRecord(STREAM_BYTES, OFFSET_BYTES, "topic", null, RECORD, RECORD));
+        records.add(new SourceRecord(PARTITION_BYTES, OFFSET_BYTES, "topic", null, RECORD, RECORD));
         Capture<ProducerRecord> sent = expectSendRecord();
@@ -247,7 +247,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
             }
         });
         // 2. Offset data is passed to the offset storage.
-        offsetWriter.setOffset(STREAM_BYTES, OFFSET_BYTES);
+        offsetWriter.setOffset(PARTITION_BYTES, OFFSET_BYTES);
         PowerMock.expectLastCall().anyTimes();
         return sent;