Add copycat-data and copycat-api

This commit is contained in:
Ewen Cheslack-Postava 2015-07-24 13:20:15 -07:00
parent fd3a3cd595
commit 11981d2eaa
47 changed files with 7857 additions and 1 deletions

View File

@ -445,3 +445,89 @@ project(':log4j-appender') {
}
test.dependsOn('checkstyleMain', 'checkstyleTest')
}
// Build configuration for the copycat-data module.
project(':copycat-data') {
// The checkstyle plugin is applied here, but its configuration is still disabled (see FIXME below).
apply plugin: 'checkstyle'
archivesBaseName = "copycat-data"
dependencies {
// copycat-data builds on the :clients project.
compile project(':clients')
compile "org.slf4j:slf4j-api:1.7.6"
testCompile 'junit:junit:4.6'
testRuntime "$slf4jlog4j"
}
// Packages the compiled test output into a separate jar with the 'test' classifier.
task testJar(type: Jar) {
classifier = 'test'
from sourceSets.test.output
}
test {
// Log every passed/skipped/failed test and print full exception details.
testLogging {
events "passed", "skipped", "failed"
exceptionFormat = 'full'
}
}
// Restrict generated javadoc to the files matching the copycat data package pattern.
javadoc {
include "**/org/apache/kafka/copycat/data/*"
}
// Add the test jar to the module's archive artifacts.
artifacts {
archives testJar
}
// Make the archives configuration extend testCompile.
configurations {
archives.extendsFrom (testCompile)
}
// The checkstyle configuration and the test -> checkstyle task dependency (enabled for
// :log4j-appender above) are commented out for this module until the FIXME is resolved.
/* FIXME
checkstyle {
configFile = new File(rootDir, "checkstyle/checkstyle.xml")
}
test.dependsOn('checkstyleMain', 'checkstyleTest') */
}
// Build configuration for the copycat-api module.
project(':copycat-api') {
// The checkstyle plugin is applied here, but its configuration is still disabled (see FIXME below).
apply plugin: 'checkstyle'
archivesBaseName = "copycat-api"
dependencies {
// copycat-api builds on the :copycat-data project.
compile project(':copycat-data')
compile "org.slf4j:slf4j-api:1.7.6"
testCompile 'junit:junit:4.6'
testRuntime "$slf4jlog4j"
}
// Packages the compiled test output into a separate jar with the 'test' classifier.
task testJar(type: Jar) {
classifier = 'test'
from sourceSets.test.output
}
test {
// Log every passed/skipped/failed test and print full exception details.
testLogging {
events "passed", "skipped", "failed"
exceptionFormat = 'full'
}
}
// Restrict generated javadoc to the files matching the copycat api package pattern.
// NOTE(review): the Java sources in this commit live under copycat/connector, copycat/sink,
// copycat/source and copycat/errors - confirm this pattern matches the intended packages.
javadoc {
include "**/org/apache/kafka/copycat/api/*"
}
// Add the test jar to the module's archive artifacts.
artifacts {
archives testJar
}
// Make the archives configuration extend testCompile.
configurations {
archives.extendsFrom (testCompile)
}
// The checkstyle configuration and the test -> checkstyle task dependency (enabled for
// :log4j-appender above) are commented out for this module until the FIXME is resolved.
/* FIXME
checkstyle {
configFile = new File(rootDir, "checkstyle/checkstyle.xml")
}
test.dependsOn('checkstyleMain', 'checkstyleTest') */
}

View File

@ -106,4 +106,19 @@
<allow pkg="org.apache.kafka" />
</subpackage>
<!-- Import rules for the copycat packages; the enclosing parent package is declared outside this excerpt. -->
<subpackage name="copycat">
<!-- Allowed throughout the copycat subpackage: the copycat data and errors packages. -->
<allow pkg="org.apache.kafka.copycat.data" />
<allow pkg="org.apache.kafka.copycat.errors" />
<!-- The source package may additionally import the connector and storage packages. -->
<subpackage name="source">
<allow pkg="org.apache.kafka.copycat.connector" />
<allow pkg="org.apache.kafka.copycat.storage" />
</subpackage>
<!-- The sink package may additionally import the connector and storage packages. -->
<subpackage name="sink">
<allow pkg="org.apache.kafka.copycat.connector" />
<allow pkg="org.apache.kafka.copycat.storage" />
</subpackage>
</subpackage>
</import-control>

View File

@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.connector;
import org.apache.kafka.copycat.errors.CopycatException;
import java.util.List;
import java.util.Properties;
/**
 * <p>
 * A Connector integrates Copycat with another system, either as an input that ingests data
 * into Kafka or as an output that passes Kafka data to an external system. Implementations
 * should not extend this class directly; they should extend SourceConnector or SinkConnector.
 * </p>
 * <p>
 * A Connector has two primary responsibilities. First, given some configuration, it creates
 * the configurations for a set of {@link Task}s that split up the data processing; a database
 * Connector, for example, might divide the set of tables evenly among its Tasks. Second, it
 * monitors its inputs for changes that require reconfiguration and notifies the Copycat
 * runtime through the {@link ConnectorContext}. In the database example the Connector might
 * periodically check for added or removed tables and notify Copycat, which then requests new
 * configurations and updates the running Tasks.
 * </p>
 */
public abstract class Connector {

    /** Context used to notify the Copycat runtime; assigned by the initialize methods. */
    protected ConnectorContext context;

    /**
     * Initialize this connector with the context it should use to notify the runtime of input
     * configuration changes.
     *
     * @param ctx context object used to interact with the Copycat runtime
     */
    public void initialize(ConnectorContext ctx) {
        this.context = ctx;
    }

    /**
     * <p>
     * Initialize this connector with the context it should use to notify the runtime of input
     * configuration changes, together with a provided set of Task configurations. This variant
     * is only used to recover from failures.
     * </p>
     * <p>
     * The default implementation simply ignores the provided Task configurations. During
     * recovery, Copycat will request an updated set of configurations and update the running
     * Tasks appropriately. Connectors should override this method with special handling if
     * doing so avoids unnecessary changes to running Tasks.
     * </p>
     *
     * @param ctx context object used to interact with the Copycat runtime
     * @param taskConfigs the Task configurations supplied for recovery; unused by this default
     *                    implementation
     */
    public void initialize(ConnectorContext ctx, List<Properties> taskConfigs) {
        // taskConfigs is intentionally not used: this may cause more task churn during recovery
        // when the updated configs differ a lot, but it keeps Connectors simpler to implement.
        this.context = ctx;
    }

    /**
     * Start this Connector. This is only called on a clean Connector, i.e. one that has either
     * just been instantiated and initialized, or on which {@link #stop()} has been invoked.
     *
     * @param props configuration settings
     * @throws CopycatException declared for implementations that fail to start
     */
    public abstract void start(Properties props) throws CopycatException;

    /**
     * Reconfigure this Connector. The default implementation calls {@link #stop()} and then
     * {@link #start(Properties)} with the new settings, which is sufficient for most
     * implementations. Override only to handle reconfiguration more efficiently, e.g. without
     * shutting down network connections to the external system.
     *
     * @param props new configuration settings
     * @throws CopycatException if stopping or restarting the Connector fails
     */
    public void reconfigure(Properties props) throws CopycatException {
        this.stop();
        this.start(props);
    }

    /**
     * Returns the Task implementation for this Connector.
     *
     * @return the Task class used by this Connector
     */
    public abstract Class<? extends Task> getTaskClass();

    /**
     * Returns a set of configurations for Tasks based on the current configuration, producing
     * at most {@code maxTasks} configurations.
     *
     * @param maxTasks maximum number of configurations to generate
     * @return configurations for Tasks
     */
    public abstract List<Properties> getTaskConfigs(int maxTasks);

    /**
     * Stop this connector.
     *
     * @throws CopycatException declared for implementations that fail to stop
     */
    public abstract void stop() throws CopycatException;
}

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.connector;
import java.util.List;
/**
 * ConnectorContext lets a Connector proactively interact with the Copycat runtime.
 */
public interface ConnectorContext {

    /**
     * Asks the runtime to reconfigure the Tasks for this connector. Call this to signal that
     * something about the input/output has changed (e.g. streams were added or removed) and
     * the running Tasks need to be modified.
     */
    void requestTaskReconfiguration();

    /**
     * Get the TopicPartitions for the specified topics. Use the result to decide how to divide
     * TopicPartitions between child tasks.
     *
     * @param topics the topics to get partitions for
     * @return list of all TopicPartitions for the input topics
     */
    List<TopicPartition> getTopicPartitions(String... topics);

    /**
     * Get the TopicPartitions for the specified topics. Use the result to decide how to divide
     * TopicPartitions between child tasks.
     *
     * @param topics list of topics to get partitions for
     * @return list of all TopicPartitions for the input topics
     */
    List<TopicPartition> getTopicPartitions(List<String> topics);
}

View File

@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.connector;
/**
* <p>
* Base class for records containing data to be copied to/from Kafka. This corresponds closely to
* Kafka's ProducerRecord and ConsumerRecord classes, and holds the data that may be used by both
* sources and sinks (topic, partition, key, value). Although both implementations include a
* notion of offest, it is not included here because they differ in type.
* </p>
* <p>
* This class uses type parameters for keys and values. These are provided primarily for
* connector developer convenience. Internally, Copycat will handle any primitive types or
* org.apache.kafka.copycat.data types.
* </p>
*/
public class CopycatRecord {
private final String topic;
private final Integer partition;
private final Object key;
private final Object value;
public CopycatRecord(String topic, Integer partition, Object value) {
this(topic, partition, null, value);
}
public CopycatRecord(String topic, Integer partition, Object key, Object value) {
this.topic = topic;
this.partition = partition;
this.key = key;
this.value = value;
}
public String getTopic() {
return topic;
}
public Integer getPartition() {
return partition;
}
public Object getKey() {
return key;
}
public Object getValue() {
return value;
}
@Override
public String toString() {
return "CopycatRecord{" +
"topic='" + topic + '\'' +
", partition=" + partition +
", key=" + key +
", value=" + value +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CopycatRecord that = (CopycatRecord) o;
if (key != null ? !key.equals(that.key) : that.key != null) {
return false;
}
if (partition != null ? !partition.equals(that.partition) : that.partition != null) {
return false;
}
if (topic != null ? !topic.equals(that.topic) : that.topic != null) {
return false;
}
if (value != null ? !value.equals(that.value) : that.value != null) {
return false;
}
return true;
}
@Override
public int hashCode() {
int result = topic != null ? topic.hashCode() : 0;
result = 31 * result + (partition != null ? partition.hashCode() : 0);
result = 31 * result + (key != null ? key.hashCode() : 0);
result = 31 * result + (value != null ? value.hashCode() : 0);
return result;
}
}

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.connector;
import org.apache.kafka.copycat.errors.CopycatException;
import java.util.Properties;
/**
 * <p>
 * A Task contains the code that actually copies data to or from another system. It receives a
 * configuration from its parent Connector, which assigns it a fraction of a Copycat job's
 * work. The Copycat framework then pushes data to, or pulls data from, the Task. A Task must
 * also be able to respond to reconfiguration requests.
 * </p>
 * <p>
 * This interface only contains the minimal functionality shared between
 * {@link org.apache.kafka.copycat.source.SourceTask} and
 * {@link org.apache.kafka.copycat.sink.SinkTask}.
 * </p>
 */
public interface Task {

    /**
     * Start the Task.
     *
     * @param props initial configuration
     */
    void start(Properties props);

    /**
     * Stop this task.
     *
     * @throws CopycatException declared for implementations that fail to stop
     */
    void stop() throws CopycatException;
}

View File

@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.connector;
/**
* A topic name and partition number
*/
public class TopicPartition {
private int hash = 0;
private final int partition;
private final String topic;
public TopicPartition(String topic, int partition) {
this.partition = partition;
this.topic = topic;
}
/**
* Parse the TopicPartition from the string representation.
* @param topicPartition string representation of a TopicPartition
*/
public TopicPartition(String topicPartition) {
int lastDash = topicPartition.lastIndexOf('-');
if (lastDash < 0) {
throw new IllegalArgumentException("Invalid TopicPartition format");
}
this.topic = topicPartition.substring(0, lastDash);
String partitionStr = topicPartition.substring(lastDash + 1);
this.partition = Integer.parseInt(partitionStr);
}
public int partition() {
return partition;
}
public String topic() {
return topic;
}
@Override
public int hashCode() {
if (hash != 0) {
return hash;
}
final int prime = 31;
int result = 1;
result = prime * result + partition;
result = prime * result + ((topic == null) ? 0 : topic.hashCode());
this.hash = result;
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
TopicPartition other = (TopicPartition) obj;
if (partition != other.partition) {
return false;
}
if (topic == null) {
if (other.topic != null) {
return false;
}
} else if (!topic.equals(other.topic)) {
return false;
}
return true;
}
@Override
public String toString() {
return topic + "-" + partition;
}
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.errors;
/**
 * CopycatException is the top-level checked exception type generated by Copycat and
 * connectors.
 */
public class CopycatException extends Exception {

    /**
     * @param message description of the failure
     */
    public CopycatException(String message) {
        super(message);
    }

    /**
     * @param message description of the failure
     * @param cause the underlying cause
     */
    public CopycatException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * @param cause the underlying cause
     */
    public CopycatException(Throwable cause) {
        super(cause);
    }
}

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.errors;
/**
 * Unchecked counterpart of the Copycat exception types: a {@link RuntimeException} that
 * simply forwards its constructor arguments to the superclass.
 */
public class CopycatRuntimeException extends RuntimeException {

    /** Creates an exception with no message and no cause. */
    public CopycatRuntimeException() {
        super();
    }

    /**
     * @param message description of the failure
     */
    public CopycatRuntimeException(String message) {
        super(message);
    }

    /**
     * @param message description of the failure
     * @param cause the underlying cause
     */
    public CopycatRuntimeException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * @param cause the underlying cause
     */
    public CopycatRuntimeException(Throwable cause) {
        super(cause);
    }
}

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.sink;
import org.apache.kafka.copycat.connector.Connector;
/**
 * A SinkConnector is a {@link Connector} that sends Kafka data to another system.
 */
public abstract class SinkConnector extends Connector {

    /**
     * <p>
     * Configuration key for the list of input topics for this connector.
     * </p>
     * <p>
     * This setting is usually only relevant to the Copycat framework; it is exposed here for
     * the convenience of Connector developers who also need to know the set of topics.
     * </p>
     */
    public static final String TOPICS_CONFIG = "topics";
}

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.sink;
import org.apache.kafka.copycat.connector.CopycatRecord;
/**
 * SinkRecord is a CopycatRecord that has been read from Kafka. In addition to the standard
 * fields it carries the offset of the record in its Kafka topic-partition, which the SinkTask
 * should use to coordinate offset commits.
 */
public class SinkRecord extends CopycatRecord {
    private final long offset;

    /**
     * @param topic the Kafka topic the record was read from
     * @param partition the partition the record was read from
     * @param key the record key
     * @param value the record value
     * @param offset the record's offset within its topic-partition
     */
    public SinkRecord(String topic, int partition, Object key, Object value, long offset) {
        super(topic, partition, key, value);
        this.offset = offset;
    }

    public long getOffset() {
        return offset;
    }

    /**
     * Equal when the other object has exactly the same class, the inherited fields are equal
     * and the offsets match.
     */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        final boolean sameType = o != null && o.getClass() == getClass();
        return sameType && super.equals(o) && ((SinkRecord) o).offset == offset;
    }

    @Override
    public int hashCode() {
        // Long#hashCode is specified as (int) (value ^ (value >>> 32)).
        return 31 * super.hashCode() + Long.valueOf(offset).hashCode();
    }

    @Override
    public String toString() {
        return "SinkRecord{offset=" + offset + "} " + super.toString();
    }
}

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.sink;
import org.apache.kafka.copycat.connector.Task;
import org.apache.kafka.copycat.connector.TopicPartition;
import org.apache.kafka.copycat.errors.CopycatException;
import java.util.Collection;
import java.util.Map;
/**
 * SinkTask is a Task that takes records loaded from Kafka and sends them to another system.
 * Besides the basic {@link #put} interface, SinkTasks must also implement {@link #flush} to
 * support offset commits.
 */
public abstract class SinkTask implements Task {

    /**
     * <p>
     * The configuration key that provides the list of topic partitions that are inputs for
     * this SinkTask.
     * </p>
     * <p>
     * This setting is usually only used by the Copycat framework, since it manages the Kafka
     * consumer that provides input records. It is exposed here for the convenience of SinkTask
     * implementations that may also want to know the input set of topic partitions.
     * </p>
     */
    public static final String TOPICPARTITIONS_CONFIG = "topic.partitions";

    /** Context assigned by {@link #initialize(SinkTaskContext)}. */
    protected SinkTaskContext context;

    /**
     * Initialize this SinkTask with the specified context object.
     *
     * @param context the context to store for later use by this task
     */
    public void initialize(SinkTaskContext context) {
        this.context = context;
    }

    /**
     * Put the records in the sink. Usually this should send the records to the sink
     * asynchronously and return immediately.
     *
     * @param records the set of records to send
     * @throws CopycatException declared for implementations that fail to accept the records
     */
    public abstract void put(Collection<SinkRecord> records) throws CopycatException;

    /**
     * Flush all records that have been {@link #put} for the specified topic-partitions. The
     * offsets are provided for convenience; they could also be determined by tracking all
     * offsets included in the SinkRecords passed to {@link #put}.
     *
     * @param offsets mapping of TopicPartition to committed offset
     */
    public abstract void flush(Map<TopicPartition, Long> offsets);
}

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.sink;
import org.apache.kafka.copycat.connector.TopicPartition;
import java.util.HashMap;
import java.util.Map;
/**
 * Context passed to SinkTasks, allowing them to access utilities in the copycat runtime.
 */
public abstract class SinkTaskContext {
    private Map<TopicPartition, Long> offsets;

    /** Creates a context holding an empty offsets map. */
    public SinkTaskContext() {
        this.offsets = new HashMap<TopicPartition, Long>();
    }

    /**
     * Replaces the stored offsets with the given map. The map is stored by reference, not
     * copied.
     *
     * @param offsets mapping of TopicPartition to offset
     */
    public void resetOffset(Map<TopicPartition, Long> offsets) {
        this.offsets = offsets;
    }

    /**
     * Returns the currently stored offsets map (the same instance that is held internally).
     *
     * @return mapping of TopicPartition to offset
     */
    public Map<TopicPartition, Long> getOffsets() {
        return this.offsets;
    }
}

View File

@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.source;
import org.apache.kafka.copycat.connector.Connector;
/**
 * A SourceConnector is a {@link Connector} that pulls data from another system and sends it
 * to Kafka. It currently adds no members of its own to Connector.
 */
public abstract class SourceConnector extends Connector {

}

View File

@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.source;
import org.apache.kafka.copycat.connector.CopycatRecord;
/**
 * <p>
 * SourceRecords are generated by SourceTasks and passed to Copycat for storage in Kafka. On
 * top of the standard CopycatRecord fields, which specify where the data is stored in Kafka,
 * they carry a stream and an offset.
 * </p>
 * <p>
 * The stream identifies the single input stream the record came from (e.g. a filename, table
 * name, or topic-partition). The offset is a position in that stream which can be used to
 * resume consumption of data.
 * </p>
 * <p>
 * Both values can have arbitrary structure and should be represented using
 * org.apache.kafka.copycat.data objects (or primitive values). For example, a database
 * connector might specify the stream as a record containing { "db": "database_name", "table":
 * "table_name"} and the offset as a Long containing the timestamp of the row.
 * </p>
 */
public class SourceRecord extends CopycatRecord {
    private final Object stream;
    private final Object offset;

    /** Creates a record without a key. */
    public SourceRecord(Object stream, Object offset, String topic, Integer partition, Object value) {
        this(stream, offset, topic, partition, null, value);
    }

    /** Creates a record with neither a partition nor a key. */
    public SourceRecord(Object stream, Object offset, String topic, Object value) {
        this(stream, offset, topic, null, null, value);
    }

    /**
     * Creates a record with all fields given explicitly.
     *
     * @param stream the input stream the record came from
     * @param offset the position of the record within that stream
     * @param topic the Kafka topic
     * @param partition the partition number, may be null
     * @param key the record key, may be null
     * @param value the record value
     */
    public SourceRecord(Object stream, Object offset, String topic, Integer partition,
                        Object key, Object value) {
        super(topic, partition, key, value);
        this.stream = stream;
        this.offset = offset;
    }

    public Object getStream() {
        return stream;
    }

    public Object getOffset() {
        return offset;
    }

    /**
     * Equal when the other object has exactly the same class, the inherited fields are equal,
     * and both offset and stream are equal (null only equals null).
     */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        if (!super.equals(o)) {
            return false;
        }
        final SourceRecord other = (SourceRecord) o;
        return sameValue(offset, other.offset) && sameValue(stream, other.stream);
    }

    @Override
    public int hashCode() {
        int code = super.hashCode();
        code = 31 * code + (stream == null ? 0 : stream.hashCode());
        code = 31 * code + (offset == null ? 0 : offset.hashCode());
        return code;
    }

    @Override
    public String toString() {
        return "SourceRecord{stream=" + stream + ", offset=" + offset + "} " + super.toString();
    }

    // Null-safe equality: null matches only null, otherwise defers to left.equals(right).
    private static boolean sameValue(Object left, Object right) {
        if (left == null) {
            return right == null;
        }
        return left.equals(right);
    }
}

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.source;
import org.apache.kafka.copycat.connector.Task;
import java.util.List;
/**
 * SourceTask is a Task that pulls records from another system for storage in Kafka.
 *
 * @param <K> not referenced by any member of this class
 * @param <V> not referenced by any member of this class
 */
public abstract class SourceTask<K, V> implements Task {

    /** Context assigned by {@link #initialize(SourceTaskContext)}. */
    protected SourceTaskContext context;

    /**
     * Initialize this SourceTask with the specified context object.
     *
     * @param context the context to store for later use by this task
     */
    public void initialize(SourceTaskContext context) {
        this.context = context;
    }

    /**
     * Poll this SourceTask for new records. This method should block if no data is currently
     * available.
     *
     * @return a list of source records
     * @throws InterruptedException declared for implementations that are interrupted while
     *                              waiting for data
     */
    public abstract List<SourceRecord> poll() throws InterruptedException;

    /**
     * <p>
     * Commit the offsets, up to the offsets that have been returned by {@link #poll()}. This
     * method should block until the commit is complete.
     * </p>
     * <p>
     * SourceTasks are not required to implement this; Copycat records offsets automatically.
     * The hook exists for systems that also need to store offsets internally in their own
     * system. The default implementation does nothing.
     * </p>
     *
     * @throws InterruptedException declared for overriding implementations that block
     */
    public void commit() throws InterruptedException {
        // Deliberately a no-op by default.
    }
}

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.source;
import org.apache.kafka.copycat.storage.OffsetStorageReader;
/**
 * Context object handed to a SourceTask so that it can interact with the underlying runtime.
 * The only runtime service exposed by this class is the offset storage reader.
 */
public class SourceTaskContext {

    /** Reader supplied at construction time; never reassigned. */
    private final OffsetStorageReader offsetReader;

    /**
     * Create a context backed by the given offset storage reader.
     *
     * @param reader the reader later returned by {@link #getOffsetStorageReader()}
     */
    public SourceTaskContext(OffsetStorageReader reader) {
        offsetReader = reader;
    }

    /**
     * Get the OffsetStorageReader for this SourceTask.
     *
     * @return the reader that was passed to the constructor
     */
    public OffsetStorageReader getOffsetStorageReader() {
        return offsetReader;
    }
}

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.storage;
/**
* The Converter interface provides support for translating between Copycat's runtime data format
* and the "native" runtime format used by the serialization layer. This is used to translate
* two types of data: records and offsets. The (de)serialization is performed by a separate
* component -- the producer or consumer serializer or deserializer for records or a Copycat
* serializer or deserializer for offsets.
*/
public interface Converter {
/**
* Convert a Copycat data object to a native object for serialization.
* @param value the Copycat data object to convert
* @return the corresponding native object, ready to be handed to the serialization layer
*/
Object fromCopycatData(Object value);
/**
* Convert a native object to a Copycat data object.
* @param value the native object, as produced by the serialization layer
* @return the corresponding Copycat data object
*/
Object toCopycatData(Object value);
}

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.storage;
import org.apache.kafka.copycat.data.Schema;
import java.io.Closeable;
import java.util.Map;
/**
 * Turns serialized Copycat offsets back into typed objects.
 *
 * @param <T> type of the objects produced by deserialization
 */
public interface OffsetDeserializer<T> extends Closeable {

    /**
     * Supply configuration to this deserializer.
     *
     * @param configs configuration as key/value pairs
     * @param isKey whether this instance is used for keys or for values
     */
    void configure(Map<String, ?> configs, boolean isKey);

    /**
     * Deserialize an offset key or value that belongs to the given connector.
     *
     * @param connector connector associated with the data
     * @param data serialized bytes
     * @return deserialized typed data
     */
    T deserializeOffset(String connector, byte[] data);

    /**
     * Deserialize an offset key or value that belongs to the given connector, decoding it
     * according to a schema.
     *
     * @param connector connector associated with the data
     * @param data serialized bytes
     * @param schema schema to deserialize to
     * @return deserialized typed data
     */
    T deserializeOffset(String connector, byte[] data, Schema schema);

    /**
     * Close this deserializer. Unlike {@link Closeable#close()}, this override declares no
     * checked exception.
     */
    @Override
    void close();
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.storage;
import java.io.Closeable;
import java.util.Map;
/**
 * Converts Copycat offsets into their serialized byte form.
 *
 * @param <T> native type of the offsets accepted by this serializer
 */
public interface OffsetSerializer<T> extends Closeable {

    /**
     * Supply configuration to this serializer.
     *
     * @param configs configuration as key/value pairs
     * @param isKey whether this instance is used for keys or for values
     */
    void configure(Map<String, ?> configs, boolean isKey);

    /**
     * Serialize offset data on behalf of a connector.
     *
     * @param connector the connector associated with the offsets
     * @param data typed data to serialize
     * @return serialized bytes
     */
    byte[] serializeOffset(String connector, T data);

    /**
     * Close this serializer. Unlike {@link Closeable#close()}, this override declares no
     * checked exception.
     */
    @Override
    void close();
}

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.storage;
import org.apache.kafka.copycat.data.Schema;
import java.util.Collection;
import java.util.Map;
/**
 * Read-side view of the offset storage used by sources. Connectors use it to find out where
 * to resume consuming data. It is most commonly consulted while a task is being initialized,
 * but it may also be used at runtime, e.g. when a task is reconfigured.
 */
public interface OffsetStorageReader {

    /**
     * Look up the offset of a single stream. When the data is not already available locally
     * it is fetched from the backing store, which may require some network round trips.
     *
     * @param stream object uniquely identifying the stream of data
     * @param schema schema used to decode the offset value
     * @return object uniquely identifying the offset in the stream of data
     */
    Object getOffset(Object stream, Schema schema);

    /**
     * <p>
     * Look up the offsets of several streams at once. This may be more efficient than calling
     * {@link #getOffset(Object, Schema)} once per stream.
     * </p>
     * <p>
     * When errors occur, the affected entries are omitted and as many of the requested values
     * as possible are returned, so that a task managing many streams can still proceed with
     * whatever data is available. Callers should therefore check that an entry is actually
     * present in the returned map. An exception is thrown only when the entire request failed,
     * e.g. because the underlying storage was unavailable.
     * </p>
     *
     * @param streams set of identifiers for streams of data
     * @param schema schema used to decode offset values
     * @return a map of stream identifiers to decoded offsets
     */
    Map<Object, Object> getOffsets(Collection<Object> streams, Schema schema);
}

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.util;
import java.util.ArrayList;
import java.util.List;
/**
 * Common building blocks that connector implementations might find useful.
 */
public class ConnectorUtils {

    /**
     * Split a list of elements into exactly {@code numGroups} contiguous groups whose sizes
     * differ by at most one; any surplus elements go to the earliest groups. When there are
     * fewer elements than groups, the trailing groups are empty.
     * <p>
     * Because every group is a contiguous run of the input, ordered input yields intuitively
     * ordered groups (e.g. alphabetical ranges of table names), and input sorted by a
     * performance-relevant criterion (e.g. topic partitions with the same leader) keeps
     * related elements together.
     * </p>
     *
     * @param elements list of elements to partition
     * @param numGroups the number of output groups to generate
     * @return a list of {@code numGroups} newly allocated lists covering all elements in order
     * @throws IllegalArgumentException if {@code numGroups} is zero or negative
     */
    public static <T> List<List<T>> groupPartitions(List<T> elements, int numGroups) {
        if (numGroups <= 0) {
            throw new IllegalArgumentException("Number of groups must be positive.");
        }

        final int total = elements.size();
        final int baseSize = total / numGroups;
        // The first `surplus` groups each take one element more than the base size.
        final int surplus = total % numGroups;

        List<List<T>> groups = new ArrayList<List<T>>(numGroups);
        int from = 0;
        for (int index = 0; index < numGroups; index++) {
            int to = from + baseSize + (index < surplus ? 1 : 0);
            // Copy the slice so that each group is an independent list.
            groups.add(new ArrayList<T>(elements.subList(from, to)));
            from = to;
        }
        return groups;
    }
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.util;
/**
 * String helpers that the standard library lacks and that Connector or Task implementations
 * may commonly need.
 */
public class StringUtils {

    /**
     * Build a String from the string form of every item in {@code elements}, placing
     * {@code delim} between consecutive items. An empty input yields the empty string.
     *
     * @param elements list of elements to concatenate
     * @param delim delimiter to place between each element
     * @return the concatenated string with delimiters
     */
    public static <T> String join(Iterable<T> elements, String delim) {
        StringBuilder joined = new StringBuilder();
        // Nothing precedes the first element; every later element is preceded by delim.
        String separator = "";
        for (T element : elements) {
            joined.append(separator);
            joined.append(element);
            separator = delim;
        }
        return joined.toString();
    }
}

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.connector;
import org.apache.kafka.copycat.errors.CopycatException;
import org.junit.Test;
import java.util.List;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
/**
 * Tests for the reconfigure behavior that Connector provides to its subclasses.
 */
public class ConnectorTest {

    /**
     * Verifies that reconfigure() on a connector whose stop() succeeds calls stop() first and
     * start(Properties) second.
     */
    @Test
    public void testDefaultReconfigure() throws Exception {
        TestConnector conn = new TestConnector(false);
        conn.reconfigure(new Properties());
        // assertEquals takes (expected, actual); keeping that order makes failure messages
        // report the right values.
        assertEquals(0, conn.stopOrder);
        assertEquals(1, conn.configureOrder);
    }

    /**
     * Verifies that a CopycatException thrown by stop() propagates out of reconfigure().
     */
    @Test(expected = CopycatException.class)
    public void testReconfigureStopException() throws Exception {
        TestConnector conn = new TestConnector(true);
        conn.reconfigure(new Properties());
    }

    /**
     * Connector stub that records the relative order in which stop() and start(Properties) are
     * invoked, and that can be told to fail in stop().
     */
    private static class TestConnector extends Connector {
        // When true, stop() throws after recording its position.
        private final boolean stopException;
        // Next position to hand out; incremented by every recorded call.
        private int order = 0;
        // Position at which stop() / start() were called, or -1 if never called.
        public int stopOrder = -1;
        public int configureOrder = -1;

        public TestConnector(boolean stopException) {
            this.stopException = stopException;
        }

        @Override
        public void start(Properties props) throws CopycatException {
            configureOrder = order++;
        }

        @Override
        public Class<? extends Task> getTaskClass() {
            return null;
        }

        @Override
        public List<Properties> getTaskConfigs(int count) {
            return null;
        }

        @Override
        public void stop() throws CopycatException {
            stopOrder = order++;
            if (stopException) {
                throw new CopycatException("error");
            }
        }
    }
}

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.util;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
/**
 * Tests for {@link ConnectorUtils}.
 */
public class ConnectorUtilsTest {

    private static final List<Integer> FIVE_ELEMENTS = Arrays.asList(1, 2, 3, 4, 5);

    /**
     * Checks grouping of five elements into 1, 2, 3, 5 and 7 groups: groups are contiguous,
     * earlier groups receive the surplus elements, and groups beyond the number of elements
     * are empty.
     */
    @Test
    public void testGroupPartitions() {
        List<List<Integer>> grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 1);
        assertEquals(Arrays.asList(FIVE_ELEMENTS), grouped);

        grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 2);
        assertEquals(Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5)), grouped);

        grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 3);
        assertEquals(Arrays.asList(Arrays.asList(1, 2),
                Arrays.asList(3, 4),
                Arrays.asList(5)), grouped);

        grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 5);
        assertEquals(Arrays.asList(Arrays.asList(1),
                Arrays.asList(2),
                Arrays.asList(3),
                Arrays.asList(4),
                Arrays.asList(5)), grouped);

        // Typed empty lists instead of the raw Collections.EMPTY_LIST constant keep the
        // expected value a properly parameterized List<List<Integer>>.
        grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 7);
        assertEquals(Arrays.asList(Arrays.asList(1),
                Arrays.asList(2),
                Arrays.asList(3),
                Arrays.asList(4),
                Arrays.asList(5),
                Collections.<Integer>emptyList(),
                Collections.<Integer>emptyList()), grouped);
    }

    /**
     * A group count of zero must be rejected with an IllegalArgumentException.
     */
    @Test(expected = IllegalArgumentException.class)
    public void testGroupPartitionsInvalidCount() {
        ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 0);
    }
}

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.util;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import static org.junit.Assert.assertEquals;
/**
 * Tests for {@link StringUtils}.
 */
public class StringUtilsTest {

    /**
     * Checks joining of empty and non-empty inputs, with both an empty and a non-empty
     * delimiter.
     */
    @Test
    public void testJoin() {
        // Collections.<String>emptyList() replaces the raw Collections.EMPTY_LIST constant,
        // which forced an unchecked conversion when passed to the generic join method.
        assertEquals("", StringUtils.join(Collections.<String>emptyList(), ""));
        assertEquals("", StringUtils.join(Collections.<String>emptyList(), ","));

        assertEquals("ab", StringUtils.join(Arrays.asList("a", "b"), ""));

        assertEquals("a,b,c", StringUtils.join(Arrays.asList("a", "b", "c"), ","));
    }
}

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.data;
public class BinaryData {
public static int compareBytes(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
int end1 = s1 + l1;
int end2 = s2 + l2;
for (int i = s1, j = s2; i < end1 && j < end2; i++, j++) {
int a = b1[i] & 0xff;
int b = b2[j] & 0xff;
if (a != b) {
return a - b;
}
}
return l1 - l2;
}
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.data;
/**
 * Unchecked exception for the Copycat data package. Every constructor simply forwards its
 * arguments to the matching {@link RuntimeException} constructor.
 */
public class DataRuntimeException extends RuntimeException {

    /**
     * @param message description of the failure
     */
    public DataRuntimeException(String message) {
        super(message);
    }

    /**
     * @param message description of the failure
     * @param cause underlying exception that triggered this one
     */
    public DataRuntimeException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * @param cause underlying exception that triggered this one
     */
    public DataRuntimeException(Throwable cause) {
        super(cause);
    }
}

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.data;
/**
 * Signals that an illegal type was used. Both constructors forward their arguments to
 * {@link DataRuntimeException}.
 */
public class DataTypeException extends DataRuntimeException {

    /**
     * @param message description of the illegal type usage
     * @param cause underlying exception that triggered this one
     */
    public DataTypeException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * @param message description of the illegal type usage
     */
    public DataTypeException(String message) {
        super(message);
    }
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.data;
import java.util.List;
/**
* Array that permits reuse of contained elements. It is both a {@link List} of its elements
* and a {@link GenericContainer}.
* @param <T> type of the elements held by the array
*/
public interface GenericArray<T> extends List<T>, GenericContainer {
/** The current content of the location where {@link #add(Object)} would next
* store an element, if any. This permits reuse of arrays and their elements
* without allocating new objects.
* @return the element currently occupying the next insertion slot; presumably null when
* there is none -- TODO confirm against implementations */
T peek();
/** Reverses the order of the elements in this array. */
void reverse();
}

View File

@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.data;
/** Contains data of other types. Every container exposes the schema of its contents. */
public interface GenericContainer {
/**
* The schema of this instance.
* @return the schema of this instance
*/
Schema getSchema();
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.data;
/**
* An enum symbol. A symbol is a {@link GenericContainer} and can be compared with other
* enum symbols.
*/
public interface GenericEnumSymbol
extends GenericContainer, Comparable<GenericEnumSymbol> {
/**
* Return the symbol.
* @return the symbol in its String form
*/
String toString();
}

View File

@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.data;
/** Fixed-size data. */
public interface GenericFixed extends GenericContainer {
/**
* Return the data.
* @return the data as a byte array. NOTE(review): whether this is the backing array or a
* copy is up to the implementation -- confirm before mutating it.
*/
byte[] bytes();
}

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.data;
/** A generic instance of a record schema. Fields are accessible by name as
* well as by index. */
public interface GenericRecord extends IndexedRecord {
/**
* Set the value of a field given its name.
* @param key the name of the field to set
* @param v the value to store in that field
*/
void put(String key, Object v);
/**
* Return the value of a field given its name.
* @param key the name of the field to read
* @return the value of the named field
*/
Object get(String key);
}

View File

@ -0,0 +1,259 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.data;
import org.apache.kafka.copycat.data.GenericData.Record;
import org.apache.kafka.copycat.data.Schema.Field;
import java.io.IOException;
/** A RecordBuilder for generic records. GenericRecordBuilder fills in default values
* for fields if they are not specified. */
public class GenericRecordBuilder extends RecordBuilderBase<Record> {
private final GenericData.Record record;
/**
* Creates a GenericRecordBuilder for building Record instances.
* @param schema the schema associated with the record class.
*/
public GenericRecordBuilder(Schema schema) {
super(schema, GenericData.get());
record = new GenericData.Record(schema);
}
/**
* Creates a GenericRecordBuilder by copying an existing GenericRecordBuilder.
* @param other the GenericRecordBuilder to copy.
*/
public GenericRecordBuilder(GenericRecordBuilder other) {
super(other, GenericData.get());
record = new GenericData.Record(other.record, /* deepCopy = */ true);
}
/**
* Creates a GenericRecordBuilder by copying an existing record instance.
* @param other the record instance to copy.
*/
public GenericRecordBuilder(Record other) {
super(other.getSchema(), GenericData.get());
record = new GenericData.Record(other, /* deepCopy = */ true);
// Set all fields in the RecordBuilder that are set in the record
for (Field f : schema().getFields()) {
Object value = other.get(f.pos());
// Only set the value if it is not null, if the schema type is null,
// or if the schema type is a union that accepts nulls.
if (isValidValue(f, value)) {
set(f, data().deepCopy(f.schema(), value));
}
}
}
/**
* Gets the value of a field.
* @param fieldName the name of the field to get.
* @return the value of the field with the given name, or null if not set.
*/
public Object get(String fieldName) {
return get(schema().getField(fieldName));
}
/**
* Gets the value of a field.
* @param field the field to get.
* @return the value of the given field, or null if not set.
*/
public Object get(Field field) {
return get(field.pos());
}
/**
* Gets the value of a field.
* @param pos the position of the field to get.
* @return the value of the field with the given position, or null if not set.
*/
protected Object get(int pos) {
return record.get(pos);
}
/**
* Sets the value of a field.
* @param fieldName the name of the field to set.
* @param value the value to set.
* @return a reference to the RecordBuilder.
*/
public GenericRecordBuilder set(String fieldName, Object value) {
return set(schema().getField(fieldName), value);
}
/**
* Sets the value of a field.
* @param field the field to set.
* @param value the value to set.
* @return a reference to the RecordBuilder.
*/
public GenericRecordBuilder set(Field field, Object value) {
return set(field, field.pos(), value);
}
/**
* Sets the value of a field.
* @param pos the position of the field to set.
* @param value the value to set.
* @return a reference to the RecordBuilder.
*/
protected GenericRecordBuilder set(int pos, Object value) {
return set(fields()[pos], pos, value);
}
/**
* Sets the value of a field.
* @param field the field to set.
* @param pos the position of the field.
* @param value the value to set.
* @return a reference to the RecordBuilder.
*/
private GenericRecordBuilder set(Field field, int pos, Object value) {
validate(field, value);
record.put(pos, value);
fieldSetFlags()[pos] = true;
return this;
}
/**
* Checks whether a field has been set.
* @param fieldName the name of the field to check.
* @return true if the given field has been explicitly set; false otherwise.
*/
public boolean has(String fieldName) {
return has(schema().getField(fieldName));
}
/**
* Checks whether a field has been set.
* @param field the field to check.
* @return true if the given field has been explicitly set; false otherwise.
*/
public boolean has(Field field) {
return has(field.pos());
}
/**
* Checks whether a field has been set.
* @param pos the position of the field to check.
* @return true if the field at the given position has been explicitly set; false otherwise.
*/
protected boolean has(int pos) {
return fieldSetFlags()[pos];
}
/**
 * Clears the value of the field with the given name.
 * @param fieldName the name of the field to clear; must name a field of
 * this builder's schema.
 * @return a reference to the RecordBuilder.
 * @throws DataRuntimeException if the schema has no field called
 * <code>fieldName</code>.
 */
public GenericRecordBuilder clear(String fieldName) {
    final Field field = schema().getField(fieldName);
    if (field == null) {
        // An unknown name would otherwise surface as a bare
        // NullPointerException from field.pos(); report it explicitly.
        throw new DataRuntimeException("Not a valid schema field: " + fieldName);
    }
    return clear(field);
}
/**
 * Clears the value of the given field.
 * @param field the field to clear; its position in the schema selects the slot.
 * @return a reference to the RecordBuilder.
 */
public GenericRecordBuilder clear(Field field) {
    final int position = field.pos();
    return clear(position);
}
/**
 * Clears the field at the given position: stores null in the underlying
 * record and turns the field's set-flag off.
 * @param pos the position of the field to clear.
 * @return a reference to the RecordBuilder.
 */
protected GenericRecordBuilder clear(int pos) {
    record.put(pos, null);
    final boolean[] flags = fieldSetFlags();
    flags[pos] = false;
    return this;
}
/**
 * Creates a new {@code GenericData.Record} for this builder's schema and
 * fills it, field by field, with the explicitly set value or, failing
 * that, the field's default. Fields whose resolved value is null are not
 * written to the new record.
 * @return the newly built record.
 * @throws DataRuntimeException if the record cannot be created or a
 * default value cannot be resolved.
 */
@Override
public Record build() {
    // Named "built" so it does not shadow the builder's own record field,
    // which getWithDefault() reads from.
    final Record built;
    try {
        built = new GenericData.Record(schema());
    } catch (Exception e) {
        throw new DataRuntimeException(e);
    }
    for (Field field : fields()) {
        final Object resolved;
        try {
            resolved = getWithDefault(field);
        } catch (IOException e) {
            throw new DataRuntimeException(e);
        }
        if (resolved == null) {
            continue;
        }
        built.put(field.pos(), resolved);
    }
    return built;
}
/**
 * Resolves the value to use for a field when building.
 * If the field's set-flag is on, the value stored in the builder's record
 * is returned (even if it is null); otherwise the result of
 * {@code defaultValue(field)} is returned.
 * @param field the field whose value should be retrieved.
 * @return the stored value for a set field, otherwise the field's default.
 * @throws IOException if resolving the default value fails.
 */
private Object getWithDefault(Field field) throws IOException {
    final int position = field.pos();
    if (fieldSetFlags()[position]) {
        return record.get(field.pos());
    }
    return defaultValue(field);
}
/**
 * Combines the superclass hash with the hash of the builder's record
 * (0 when the record is null).
 */
@Override
public int hashCode() {
    final int base = super.hashCode();
    final int recordHash = (record == null) ? 0 : record.hashCode();
    return 31 * base + recordHash;
}
/**
 * Two builders are equal when the superclass considers them equal, they
 * have the same runtime class, and their records are equal (or both null).
 */
@Override
public boolean equals(Object obj) {
    if (this == obj) {
        return true;
    }
    if (!super.equals(obj) || getClass() != obj.getClass()) {
        return false;
    }
    final GenericRecordBuilder that = (GenericRecordBuilder) obj;
    return (record == null) ? (that.record == null) : record.equals(that.record);
}
}

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.data;
/**
 * A record implementation that permits field access by integer index,
 * where the index is the field's position in the record's schema.
 */
public interface IndexedRecord extends GenericContainer {
/**
 * Set the value of a field given its position in the schema.
 * <p>This method is not meant to be called by user code, but only
 * internally for deep copying.
 *
 * @param i the position of the field in the schema.
 * @param v the value to store for that field.
 */
void put(int i, Object v);
/**
 * Return the value of a field given its position in the schema.
 * <p>This method is not meant to be called by user code, but only
 * internally for deep copying.
 *
 * @param i the position of the field in the schema.
 * @return the value stored for that field.
 */
Object get(int i);
}

View File

@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.data;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
/**
 * Base class for objects that carry a set of named, Object-valued
 * properties. Properties are write-once: a name may be bound to a value a
 * single time, and a fixed set of reserved names can never be bound.
 */
public abstract class ObjectProperties {
    /** Marker type whose single instance stands in for a JSON null. */
    public static class Null {
        private Null() {
        }
    }

    /** A value representing a JSON <code>null</code>. */
    public static final Null NULL_VALUE = new Null();

    /** Property values by name, kept in insertion order. */
    Map<String, Object> props = new LinkedHashMap<String, Object>(1);

    /** Names that {@link #addProp(String, Object)} refuses to bind. */
    private final Set<String> reserved;

    /**
     * @param reserved the property names that may not be set on this object.
     */
    ObjectProperties(Set<String> reserved) {
        this.reserved = reserved;
    }

    /**
     * Returns the value of the named property if, and only if, it is a String.
     *
     * @param name the name of the property to look up.
     * @return the String value, or <code>null</code> if there is no
     * property with that name or its value is not a String.
     */
    public String getProp(String name) {
        final Object value = getObjectProp(name);
        if (value instanceof String) {
            return (String) value;
        }
        return null;
    }

    /**
     * Returns the value of the named property.
     *
     * @param name the name of the property to look up.
     * @return the value, or <code>null</code> if there is no property with
     * that name.
     */
    public synchronized Object getObjectProp(String name) {
        return props.get(name);
    }

    /**
     * Binds <code>name</code> to <code>value</code>. Re-adding a property
     * with a value equal to the existing one is a no-op.
     *
     * @param name The name of the property to add
     * @param value The value for the property to add; must not be null
     * @throws DataRuntimeException if the name is reserved, the value is
     * null, or the name is already bound to a different value.
     */
    public synchronized void addProp(String name, Object value) {
        if (reserved.contains(name)) {
            throw new DataRuntimeException("Can't set reserved property: " + name);
        }
        if (value == null) {
            throw new DataRuntimeException("Can't set a property to null: " + name);
        }
        final Object existing = props.get(name);
        if (existing == null) {
            props.put(name, value);
            return;
        }
        if (!existing.equals(value)) {
            throw new DataRuntimeException("Can't overwrite property: " + name);
        }
    }
}

View File

@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.data;
/**
 * Interface for record builders.
 *
 * @param <T> the type of object produced by {@link #build()}.
 */
public interface RecordBuilder<T> {
/**
 * Constructs a new instance using the values set in the RecordBuilder.
 * If a particular value was not set and the schema defines a default
 * value, the default value will be used.
 * @return a new instance using values set in the RecordBuilder.
 */
T build();
}

View File

@ -0,0 +1,173 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.data;
import org.apache.kafka.copycat.data.Schema.Field;
import org.apache.kafka.copycat.data.Schema.Type;
import java.io.IOException;
import java.util.Arrays;
/**
 * Abstract base class for {@link RecordBuilder} implementations. Holds the
 * schema being built, its fields as an array, and one "has been set" flag
 * per field. Not thread-safe.
 */
public abstract class RecordBuilderBase<T extends IndexedRecord>
        implements RecordBuilder<T> {
    private static final Field[] EMPTY_FIELDS = new Field[0];

    private final Schema schema;
    private final Field[] fields;
    private final boolean[] fieldSetFlags;
    private final GenericData data;

    /**
     * Creates a RecordBuilderBase for building records of the given schema.
     * All fields start out unset.
     * @param schema the schema associated with the record class.
     * @param data the GenericData instance used to resolve and copy defaults.
     */
    protected RecordBuilderBase(Schema schema, GenericData data) {
        this.schema = schema;
        this.data = data;
        this.fields = (Field[]) schema.getFields().toArray(EMPTY_FIELDS);
        this.fieldSetFlags = new boolean[this.fields.length];
    }

    /**
     * RecordBuilderBase copy constructor.
     * Shares the other builder's schema and takes a private copy of its
     * set-flags; field values themselves are not held by this class.
     * @param other RecordBuilderBase instance to copy.
     * @param data the GenericData instance used to resolve and copy defaults.
     */
    protected RecordBuilderBase(RecordBuilderBase<T> other, GenericData data) {
        this.schema = other.schema;
        this.data = data;
        this.fields = (Field[]) this.schema.getFields().toArray(EMPTY_FIELDS);
        this.fieldSetFlags = other.fieldSetFlags.clone();
    }

    /** @return the schema of the records being built. */
    protected final Schema schema() {
        return schema;
    }

    /** @return the schema's fields (the internal array, not a copy). */
    protected final Field[] fields() {
        return fields;
    }

    /** @return the per-field set-flags (the internal array, not a copy). */
    protected final boolean[] fieldSetFlags() {
        return fieldSetFlags;
    }

    /** @return the GenericData instance supplied at construction. */
    protected final GenericData data() {
        return data;
    }

    /**
     * Validates that a particular value for a given field is acceptable:
     * 1. If {@link #isValidValue(Field, Object)} accepts the value, returns.
     * 2. Else, if the field has a default value, returns.
     * 3. Otherwise throws DataRuntimeException.
     * @param field the field to validate.
     * @param value the value to validate.
     * @throws DataRuntimeException if value is null, the given field does
     * not accept null values, and the field has no default.
     */
    protected void validate(Field field, Object value) {
        if (isValidValue(field, value) || field.defaultValue() != null) {
            return;
        }
        throw new DataRuntimeException(
                "Field " + field + " does not accept null values");
    }

    /**
     * Tests whether a value is valid for a specified field. Only nullness
     * is checked: any non-null value is accepted, and null is accepted
     * when the field's type is NULL or a UNION with a NULL branch.
     * @param f the field for which to test the value.
     * @param value the value to test.
     * @return true if the value is valid for the given field; false otherwise.
     */
    protected static boolean isValidValue(Field f, Object value) {
        if (value != null) {
            return true;
        }
        final Schema fieldSchema = f.schema();
        final Type fieldType = fieldSchema.getType();
        if (fieldType == Type.NULL) {
            return true;
        }
        if (fieldType != Type.UNION) {
            // Null value, and the type has no way to admit it.
            return false;
        }
        for (Schema branch : fieldSchema.getTypes()) {
            if (branch.getType() == Type.NULL) {
                return true;
            }
        }
        return false;
    }

    /**
     * Gets a copy of the default value of the given field, if any.
     * @param field the field whose default value should be retrieved.
     * @return the result of deep-copying the default that the GenericData
     * instance reports for the field.
     * @throws IOException declared for implementations and callers; see
     * the GenericData methods used for the actual failure modes.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    protected Object defaultValue(Field field) throws IOException {
        return data.deepCopy(field.schema(), data.getDefaultValue(field));
    }

    /** Hash over the set-flags and the schema (0 for a null schema). */
    @Override
    public int hashCode() {
        final int prime = 31;
        final int flagsHash = Arrays.hashCode(fieldSetFlags);
        final int schemaHash = (schema == null) ? 0 : schema.hashCode();
        return prime * (prime + flagsHash) + schemaHash;
    }

    /**
     * Builders are equal when they have the same runtime class, identical
     * set-flags and equal schemas.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        @SuppressWarnings("rawtypes")
        final RecordBuilderBase that = (RecordBuilderBase) obj;
        if (!Arrays.equals(fieldSetFlags, that.fieldSetFlags)) {
            return false;
        }
        return (schema == null) ? (that.schema == null) : schema.equals(that.schema);
    }
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.data;
/** Thrown for errors parsing schemas and protocols. */
public class SchemaParseException extends DataRuntimeException {
/**
 * Creates an exception that wraps the given cause.
 * @param cause the underlying failure.
 */
public SchemaParseException(Throwable cause) {
super(cause);
}
/**
 * Creates an exception with the given detail message.
 * @param message a description of the parse error.
 */
public SchemaParseException(String message) {
super(message);
}
}

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.data;
/**
 * Thrown when the expected contents of a union cannot be resolved. Keeps
 * both the union schema and the offending datum for later inspection.
 */
public class UnresolvedUnionException extends DataRuntimeException {
    private final Schema unionSchema;
    private final Object unresolvedDatum;

    /**
     * @param unionSchema the union schema the datum was checked against.
     * @param unresolvedDatum the datum that matched none of the union's branches.
     */
    public UnresolvedUnionException(Schema unionSchema, Object unresolvedDatum) {
        super("Not in union " + unionSchema + ": " + unresolvedDatum);
        this.unresolvedDatum = unresolvedDatum;
        this.unionSchema = unionSchema;
    }

    /** @return the datum that could not be resolved. */
    public Object getUnresolvedDatum() {
        return this.unresolvedDatum;
    }

    /** @return the union schema the datum was checked against. */
    public Schema getUnionSchema() {
        return this.unionSchema;
    }
}

View File

@ -0,0 +1,158 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.data;
import java.nio.charset.Charset;
/**
 * A Utf8 string. Unlike {@link String}, instances are mutable. This is more
 * efficient than {@link String} when reading or writing a sequence of values,
 * as a single instance may be reused.
 *
 * <p>The backing array may be longer than the logical content; only the
 * first {@link #getByteLength()} bytes are meaningful. The decoded String
 * is cached and dropped whenever the byte length is (re)set.
 */
public class Utf8 implements Comparable<Utf8>, CharSequence {
    private static final byte[] EMPTY = new byte[0];
    private static final Charset UTF8 = Charset.forName("UTF-8");

    private byte[] bytes = EMPTY;
    private int length;
    private String string;

    /** Creates an empty instance. */
    public Utf8() {
    }

    /** Creates an instance holding the UTF-8 encoding of the given String. */
    public Utf8(String string) {
        final byte[] encoded = getBytesFor(string);
        this.bytes = encoded;
        this.length = encoded.length;
        this.string = string;
    }

    /** Creates a copy of another instance; the byte content is duplicated. */
    public Utf8(Utf8 other) {
        final int count = other.length;
        final byte[] copy = new byte[count];
        System.arraycopy(other.bytes, 0, copy, 0, count);
        this.bytes = copy;
        this.length = count;
        this.string = other.string;
    }

    /** Wraps the given array (without copying it) as the full content. */
    public Utf8(byte[] bytes) {
        this.bytes = bytes;
        this.length = bytes.length;
    }

    /** Return UTF-8 encoded bytes.
     * Only valid through {@link #getByteLength()}. */
    public byte[] getBytes() {
        return this.bytes;
    }

    /** Return length in bytes.
     * @deprecated call {@link #getByteLength()} instead. */
    public int getLength() {
        return this.length;
    }

    /** Return length in bytes. */
    public int getByteLength() {
        return this.length;
    }

    /** Set length in bytes. Should be called whenever byte content changes,
     * even if the length does not change, as this also clears the cached
     * String.
     * @deprecated call {@link #setByteLength(int)} instead. */
    public Utf8 setLength(int newLength) {
        return setByteLength(newLength);
    }

    /** Set length in bytes. Should be called whenever byte content changes,
     * even if the length does not change, as this also clears the cached
     * String. The backing array is grown (preserving current content) when
     * it is too small for the new length. */
    public Utf8 setByteLength(int newLength) {
        if (newLength > this.bytes.length) {
            final byte[] grown = new byte[newLength];
            System.arraycopy(this.bytes, 0, grown, 0, this.length);
            this.bytes = grown;
        }
        this.length = newLength;
        this.string = null;
        return this;
    }

    /** Set to the contents of a String. */
    public Utf8 set(String string) {
        final byte[] encoded = getBytesFor(string);
        this.bytes = encoded;
        this.length = encoded.length;
        this.string = string;
        return this;
    }

    /** Decodes the content as UTF-8, caching the result for later calls. */
    @Override
    public String toString() {
        if (this.length == 0) {
            return "";
        }
        String decoded = this.string;
        if (decoded == null) {
            decoded = new String(this.bytes, 0, this.length, UTF8);
            this.string = decoded;
        }
        return decoded;
    }

    /** Equal when the other object is a Utf8 with the same byte content. */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (!(o instanceof Utf8)) {
            return false;
        }
        final Utf8 that = (Utf8) o;
        final int count = this.length;
        if (count != that.length) {
            return false;
        }
        final byte[] mine = this.bytes;
        final byte[] theirs = that.bytes;
        for (int i = 0; i < count; i++) {
            if (mine[i] != theirs[i]) {
                return false;
            }
        }
        return true;
    }

    /** Hash over the meaningful bytes only. */
    @Override
    public int hashCode() {
        int hash = 0;
        for (int i = 0; i < this.length; i++) {
            hash = 31 * hash + this.bytes[i];
        }
        return hash;
    }

    /** Orders instances by comparing their meaningful bytes. */
    @Override
    public int compareTo(Utf8 that) {
        return BinaryData.compareBytes(this.bytes, 0, this.length,
                that.bytes, 0, that.length);
    }

    // CharSequence implementation: every method works on the decoded
    // String, so indices and lengths here are in chars, not bytes.

    @Override
    public char charAt(int index) {
        final String decoded = toString();
        return decoded.charAt(index);
    }

    @Override
    public int length() {
        final String decoded = toString();
        return decoded.length();
    }

    @Override
    public CharSequence subSequence(int start, int end) {
        final String decoded = toString();
        return decoded.subSequence(start, end);
    }

    /** Gets the UTF-8 bytes for a String */
    public static final byte[] getBytesFor(String str) {
        return str.getBytes(UTF8);
    }
}

View File

@ -0,0 +1,410 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.data;
import org.apache.kafka.copycat.data.GenericData.Record;
import org.apache.kafka.copycat.data.Schema.Field;
import org.apache.kafka.copycat.data.Schema.Type;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.*;
import static org.junit.Assert.*;
/**
 * Unit tests for {@link GenericData} and its nested Record, Array and
 * EnumSymbol types: constructor argument checks, equality and hashing,
 * list operations on GenericData.Array, string rendering, deep copy and
 * schema validation.
 */
public class TestGenericData {
// A Record cannot be created without a schema.
@Test(expected = DataRuntimeException.class)
public void testrecordConstructorNullSchema() throws Exception {
new GenericData.Record(null);
}
// A Record cannot be created from a non-record (INT) schema.
@Test(expected = DataRuntimeException.class)
public void testrecordConstructorWrongSchema() throws Exception {
new GenericData.Record(Schema.create(Schema.Type.INT));
}
// An Array cannot be created without a schema.
@Test(expected = DataRuntimeException.class)
public void testArrayConstructorNullSchema() throws Exception {
new GenericData.Array<Object>(1, null);
}
// An Array cannot be created from a non-array (INT) schema.
@Test(expected = DataRuntimeException.class)
public void testArrayConstructorWrongSchema() throws Exception {
new GenericData.Array<Object>(1, Schema.create(Schema.Type.INT));
}
// Creating a Record from a record schema whose fields were never set must fail.
@Test(expected = DataRuntimeException.class)
public void testRecordCreateEmptySchema() throws Exception {
Schema s = Schema.createRecord("schemaName", "schemaDoc", "namespace", false);
Record r = new GenericData.Record(s);
}
// getFields() on a record schema whose fields were never set must fail.
@Test(expected = DataRuntimeException.class)
public void testGetEmptySchemaFields() throws Exception {
Schema s = Schema.createRecord("schemaName", "schemaDoc", "namespace", false);
s.getFields();
}
// getField(name) on a record schema whose fields were never set must fail.
@Test(expected = DataRuntimeException.class)
public void testGetEmptySchemaField() throws Exception {
Schema s = Schema.createRecord("schemaName", "schemaDoc", "namespace", false);
s.getField("foo");
}
// put() with a field name that is not in the schema must fail.
@Test(expected = DataRuntimeException.class)
public void testRecordPutInvalidField() throws Exception {
Schema s = Schema.createRecord("schemaName", "schemaDoc", "namespace", false);
List<Schema.Field> fields = new ArrayList<Schema.Field>();
fields.add(new Schema.Field("someFieldName", s, "docs", null));
s.setFields(fields);
Record r = new GenericData.Record(s);
r.put("invalidFieldName", "someValue");
}
@Test
/** Make sure that even with nulls, hashCode() doesn't throw NPE. */
public void testHashCode() {
GenericData.get().hashCode(null, Schema.create(Type.NULL));
GenericData.get().hashCode(null, Schema.createUnion(
Arrays.asList(Schema.create(Type.BOOLEAN), Schema.create(Type.STRING))));
List<CharSequence> stuff = new ArrayList<CharSequence>();
stuff.add("string");
Schema schema = recordSchema();
GenericRecord r = new GenericData.Record(schema);
r.put(0, stuff);
GenericData.get().hashCode(r, schema);
}
// Records holding the same text in different collection types (ArrayDeque,
// ArrayList, GenericData.Array) and different CharSequence types
// (StringBuffer, String, Utf8) are expected to compare equal.
@Test
public void testEquals() {
Schema s = recordSchema();
GenericRecord r0 = new GenericData.Record(s);
GenericRecord r1 = new GenericData.Record(s);
GenericRecord r2 = new GenericData.Record(s);
Collection<CharSequence> l0 = new ArrayDeque<CharSequence>();
List<CharSequence> l1 = new ArrayList<CharSequence>();
GenericArray<CharSequence> l2 =
new GenericData.Array<CharSequence>(1, s.getFields().get(0).schema());
String foo = "foo";
l0.add(new StringBuffer(foo));
l1.add(foo);
l2.add(new Utf8(foo));
r0.put(0, l0);
r1.put(0, l1);
r2.put(0, l2);
assertEquals(r0, r1);
assertEquals(r0, r2);
assertEquals(r1, r2);
}
// Helper: builds a record schema with a single field "anArray" of type array-of-string.
private Schema recordSchema() {
List<Field> fields = new ArrayList<Field>();
fields.add(new Field("anArray", Schema.createArray(Schema.create(Type.STRING)), null, null));
Schema schema = Schema.createRecord("arrayFoo", "test", "mytest", false);
schema.setFields(fields);
return schema;
}
// Records with different values for field "a" must be unequal in both
// directions; the two schemas differ only in the field's sort order
// (IGNORE vs ASCENDING).
@Test
public void testEquals2() {
Schema schema1 = Schema.createRecord("r", null, "x", false);
List<Field> fields1 = new ArrayList<Field>();
fields1.add(new Field("a", Schema.create(Schema.Type.STRING), null, null,
Field.Order.IGNORE));
schema1.setFields(fields1);
// only differs in field order
Schema schema2 = Schema.createRecord("r", null, "x", false);
List<Field> fields2 = new ArrayList<Field>();
fields2.add(new Field("a", Schema.create(Schema.Type.STRING), null, null,
Field.Order.ASCENDING));
schema2.setFields(fields2);
GenericRecord record1 = new GenericData.Record(schema1);
record1.put("a", "1");
GenericRecord record2 = new GenericData.Record(schema2);
record2.put("a", "2");
assertFalse(record2.equals(record1));
assertFalse(record1.equals(record2));
}
// get() by a name that is not in the schema returns null rather than throwing.
@Test
public void testRecordGetFieldDoesntExist() throws Exception {
List<Field> fields = new ArrayList<Field>();
Schema schema = Schema.createRecord(fields);
GenericData.Record record = new GenericData.Record(schema);
assertNull(record.get("does not exist"));
}
// reverse() on 0..9 must yield an array equal to one filled with 9..0.
@Test
public void testArrayReversal() {
Schema schema = Schema.createArray(Schema.create(Schema.Type.INT));
GenericArray<Integer> forward = new GenericData.Array<Integer>(10, schema);
GenericArray<Integer> backward = new GenericData.Array<Integer>(10, schema);
for (int i = 0; i <= 9; i++) {
forward.add(i);
}
for (int i = 9; i >= 0; i--) {
backward.add(i);
}
forward.reverse();
assertTrue(forward.equals(backward));
}
// GenericData.Array must behave like a java.util.List: equality and hash
// agree with ArrayList, and out-of-range reads throw.
@Test
public void testArrayListInterface() {
Schema schema = Schema.createArray(Schema.create(Schema.Type.INT));
GenericArray<Integer> array = new GenericData.Array<Integer>(1, schema);
array.add(99);
assertEquals(new Integer(99), array.get(0));
List<Integer> list = new ArrayList<Integer>();
list.add(99);
assertEquals(array, list);
assertEquals(list, array);
assertEquals(list.hashCode(), array.hashCode());
try {
array.get(2);
fail("Expected IndexOutOfBoundsException getting index 2");
} catch (IndexOutOfBoundsException e) {
// expected: index 2 is past the single element
}
array.clear();
assertEquals(0, array.size());
try {
array.get(0);
fail("Expected IndexOutOfBoundsException getting index 0 after clear()");
} catch (IndexOutOfBoundsException e) {
// expected: the array is empty after clear()
}
}
// add(index, value) at the front, at the end and in the middle must shift
// existing elements and grow the array past its initial capacity of 6.
@Test
public void testArrayAddAtLocation() {
Schema schema = Schema.createArray(Schema.create(Schema.Type.INT));
GenericArray<Integer> array = new GenericData.Array<Integer>(6, schema);
array.clear();
for (int i = 0; i < 5; ++i)
array.add(i);
assertEquals(5, array.size());
array.add(0, 6);
assertEquals(new Integer(6), array.get(0));
assertEquals(6, array.size());
assertEquals(new Integer(0), array.get(1));
assertEquals(new Integer(4), array.get(5));
array.add(6, 7);
assertEquals(new Integer(7), array.get(6));
assertEquals(7, array.size());
assertEquals(new Integer(6), array.get(0));
assertEquals(new Integer(4), array.get(5));
array.add(1, 8);
assertEquals(new Integer(8), array.get(1));
assertEquals(new Integer(0), array.get(2));
assertEquals(new Integer(6), array.get(0));
assertEquals(8, array.size());
try {
array.get(9);
fail("Expected IndexOutOfBoundsException after adding elements");
} catch (IndexOutOfBoundsException e) {
// expected: only 8 elements are present
}
}
// remove(index) must shift later elements down, return the removed value,
// and make the old last index invalid for get/set/remove.
@Test
public void testArrayRemove() {
Schema schema = Schema.createArray(Schema.create(Schema.Type.INT));
GenericArray<Integer> array = new GenericData.Array<Integer>(10, schema);
array.clear();
for (int i = 0; i < 10; ++i)
array.add(i);
assertEquals(10, array.size());
assertEquals(new Integer(0), array.get(0));
assertEquals(new Integer(9), array.get(9));
array.remove(0);
assertEquals(9, array.size());
assertEquals(new Integer(1), array.get(0));
assertEquals(new Integer(2), array.get(1));
assertEquals(new Integer(9), array.get(8));
// Test boundary errors.
try {
array.get(9);
fail("Expected IndexOutOfBoundsException after removing an element");
} catch (IndexOutOfBoundsException e) {
// expected: index 9 no longer exists
}
try {
array.set(9, 99);
fail("Expected IndexOutOfBoundsException after removing an element");
} catch (IndexOutOfBoundsException e) {
// expected: index 9 no longer exists
}
try {
array.remove(9);
fail("Expected IndexOutOfBoundsException after removing an element");
} catch (IndexOutOfBoundsException e) {
// expected: index 9 no longer exists
}
// Test that we can still remove for properly sized arrays, and the rval
assertEquals(new Integer(9), array.remove(8));
assertEquals(8, array.size());
// Test insertion after remove
array.add(88);
assertEquals(new Integer(88), array.get(8));
}
// set(index, value) must return the previous element and leave the size unchanged.
@Test
public void testArraySet() {
Schema schema = Schema.createArray(Schema.create(Schema.Type.INT));
GenericArray<Integer> array = new GenericData.Array<Integer>(10, schema);
array.clear();
for (int i = 0; i < 10; ++i)
array.add(i);
assertEquals(10, array.size());
assertEquals(new Integer(0), array.get(0));
assertEquals(new Integer(5), array.get(5));
assertEquals(new Integer(5), array.set(5, 55));
assertEquals(10, array.size());
assertEquals(new Integer(55), array.get(5));
}
// toString() must render "/" as-is inside the quoted string, without escaping.
@Test
public void testToStringDoesNotEscapeForwardSlash() throws Exception {
GenericData data = GenericData.get();
assertEquals("\"/\"", data.toString("/"));
}
// Non-finite floats and doubles must be rendered as quoted strings.
@Test
public void testToStringNanInfinity() throws Exception {
GenericData data = GenericData.get();
assertEquals("\"Infinity\"", data.toString(Float.POSITIVE_INFINITY));
assertEquals("\"-Infinity\"", data.toString(Float.NEGATIVE_INFINITY));
assertEquals("\"NaN\"", data.toString(Float.NaN));
assertEquals("\"Infinity\"", data.toString(Double.POSITIVE_INFINITY));
assertEquals("\"-Infinity\"", data.toString(Double.NEGATIVE_INFINITY));
assertEquals("\"NaN\"", data.toString(Double.NaN));
}
// Enum symbols must order by their declaration position in the schema
// (Z, Y, X), not alphabetically: Y sorts after Z here.
@Test
public void testEnumCompare() {
Schema s = Schema.createEnum("Kind", null, null, Arrays.asList("Z", "Y", "X"));
GenericEnumSymbol z = new GenericData.EnumSymbol(s, "Z");
GenericEnumSymbol y = new GenericData.EnumSymbol(s, "Y");
assertEquals(0, z.compareTo(z));
assertTrue(y.compareTo(z) > 0);
assertTrue(z.compareTo(y) < 0);
}
@Test
public void testByteBufferDeepCopy() {
// Test that a deep copy of a byte buffer respects the byte buffer
// limits and capacity.
byte[] buffer_value = {0, 1, 2, 3, 0, 0, 0};
ByteBuffer buffer = ByteBuffer.wrap(buffer_value, 1, 4);
Schema schema = Schema.createRecord("my_record", "doc", "mytest", false);
Field byte_field =
new Field("bytes", Schema.create(Type.BYTES), null, null);
schema.setFields(Arrays.asList(byte_field));
GenericRecord record = new GenericData.Record(schema);
record.put(byte_field.name(), buffer);
GenericRecord copy = GenericData.get().deepCopy(schema, record);
ByteBuffer buffer_copy = (ByteBuffer) copy.get(byte_field.name());
assertEquals(buffer, buffer_copy);
}
// A union of null and an enum must accept null and the enum's declared
// symbols, and reject an undeclared symbol ("W"), regardless of whether
// null is listed first or last in the union.
@Test
public void testValidateNullableEnum() {
List<Schema> unionTypes = new ArrayList<Schema>();
Schema schema;
Schema nullSchema = Schema.create(Type.NULL);
Schema enumSchema = Schema.createEnum("AnEnum", null, null, Arrays.asList("X", "Y", "Z"));
GenericEnumSymbol w = new GenericData.EnumSymbol(enumSchema, "W");
GenericEnumSymbol x = new GenericData.EnumSymbol(enumSchema, "X");
GenericEnumSymbol y = new GenericData.EnumSymbol(enumSchema, "Y");
GenericEnumSymbol z = new GenericData.EnumSymbol(enumSchema, "Z");
// null is first
unionTypes.clear();
unionTypes.add(nullSchema);
unionTypes.add(enumSchema);
schema = Schema.createUnion(unionTypes);
assertTrue(GenericData.get().validate(schema, z));
assertTrue(GenericData.get().validate(schema, y));
assertTrue(GenericData.get().validate(schema, x));
assertFalse(GenericData.get().validate(schema, w));
assertTrue(GenericData.get().validate(schema, null));
// null is last
unionTypes.clear();
unionTypes.add(enumSchema);
unionTypes.add(nullSchema);
schema = Schema.createUnion(unionTypes);
assertTrue(GenericData.get().validate(schema, z));
assertTrue(GenericData.get().validate(schema, y));
assertTrue(GenericData.get().validate(schema, x));
assertFalse(GenericData.get().validate(schema, w));
assertTrue(GenericData.get().validate(schema, null));
}
// Plain Java enum used below to show that only GenericData.EnumSymbol
// instances validate against an enum schema.
private enum anEnum {ONE, TWO, THREE}
;
// Only EnumSymbol datums validate against an enum schema; a raw String or
// a raw Java enum constant with a matching name must be rejected.
@Test
public void validateRequiresGenericSymbolForEnumSchema() {
final Schema schema = Schema.createEnum("my_enum", "doc", "namespace", Arrays.asList("ONE", "TWO", "THREE"));
final GenericData gd = GenericData.get();
/* positive cases */
assertTrue(gd.validate(schema, new GenericData.EnumSymbol(schema, "ONE")));
assertTrue(gd.validate(schema, new GenericData.EnumSymbol(schema, anEnum.ONE)));
/* negative cases */
assertFalse("We don't expect GenericData to allow a String datum for an enum schema", gd.validate(schema, "ONE"));
assertFalse("We don't expect GenericData to allow a Java Enum for an enum schema", gd.validate(schema, anEnum.ONE));
}
// A record of the second union branch (Type2) must validate against the
// union even though it does not match the first branch (Type1).
@Test
public void testValidateUnion() {
Schema type1Schema = SchemaBuilder.record("Type1")
.fields()
.requiredString("myString")
.requiredInt("myInt")
.endRecord();
Schema type2Schema = SchemaBuilder.record("Type2")
.fields()
.requiredString("myString")
.endRecord();
Schema unionSchema = SchemaBuilder.unionOf()
.type(type1Schema).and().type(type2Schema)
.endUnion();
GenericRecord record = new GenericData.Record(type2Schema);
record.put("myString", "myValue");
assertTrue(GenericData.get().validate(unionSchema, record));
}
}

View File

@ -14,4 +14,5 @@
// limitations under the License.
// Apply the shared scala.gradle script before declaring the sub-projects.
apply from: file('scala.gradle')
// NOTE(review): the next two include statements look like the pre-change and
// post-change versions of the same line from a diff whose +/- markers were
// lost; the second one additionally registers 'copycat-data' and
// 'copycat-api'. Confirm that only one of them is meant to be present.
include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients', 'log4j-appender'
include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients',
'log4j-appender', 'copycat-data', 'copycat-api'