mirror of https://github.com/apache/kafka.git
Add copycat-data and copycat-api
This commit is contained in:
parent
fd3a3cd595
commit
11981d2eaa
86
build.gradle
86
build.gradle
|
@ -445,3 +445,89 @@ project(':log4j-appender') {
|
|||
}
|
||||
test.dependsOn('checkstyleMain', 'checkstyleTest')
|
||||
}
|
||||
|
||||
project(':copycat-data') {
|
||||
apply plugin: 'checkstyle'
|
||||
archivesBaseName = "copycat-data"
|
||||
|
||||
dependencies {
|
||||
compile project(':clients')
|
||||
compile "org.slf4j:slf4j-api:1.7.6"
|
||||
|
||||
testCompile 'junit:junit:4.6'
|
||||
testRuntime "$slf4jlog4j"
|
||||
}
|
||||
|
||||
task testJar(type: Jar) {
|
||||
classifier = 'test'
|
||||
from sourceSets.test.output
|
||||
}
|
||||
|
||||
test {
|
||||
testLogging {
|
||||
events "passed", "skipped", "failed"
|
||||
exceptionFormat = 'full'
|
||||
}
|
||||
}
|
||||
|
||||
javadoc {
|
||||
include "**/org/apache/kafka/copycat/data/*"
|
||||
}
|
||||
|
||||
artifacts {
|
||||
archives testJar
|
||||
}
|
||||
|
||||
configurations {
|
||||
archives.extendsFrom (testCompile)
|
||||
}
|
||||
|
||||
/* FIXME
|
||||
checkstyle {
|
||||
configFile = new File(rootDir, "checkstyle/checkstyle.xml")
|
||||
}
|
||||
test.dependsOn('checkstyleMain', 'checkstyleTest') */
|
||||
}
|
||||
|
||||
project(':copycat-api') {
|
||||
apply plugin: 'checkstyle'
|
||||
archivesBaseName = "copycat-api"
|
||||
|
||||
dependencies {
|
||||
compile project(':copycat-data')
|
||||
compile "org.slf4j:slf4j-api:1.7.6"
|
||||
|
||||
testCompile 'junit:junit:4.6'
|
||||
testRuntime "$slf4jlog4j"
|
||||
}
|
||||
|
||||
task testJar(type: Jar) {
|
||||
classifier = 'test'
|
||||
from sourceSets.test.output
|
||||
}
|
||||
|
||||
test {
|
||||
testLogging {
|
||||
events "passed", "skipped", "failed"
|
||||
exceptionFormat = 'full'
|
||||
}
|
||||
}
|
||||
|
||||
javadoc {
|
||||
include "**/org/apache/kafka/copycat/api/*"
|
||||
}
|
||||
|
||||
artifacts {
|
||||
archives testJar
|
||||
}
|
||||
|
||||
configurations {
|
||||
archives.extendsFrom (testCompile)
|
||||
}
|
||||
|
||||
/* FIXME
|
||||
checkstyle {
|
||||
configFile = new File(rootDir, "checkstyle/checkstyle.xml")
|
||||
}
|
||||
test.dependsOn('checkstyleMain', 'checkstyleTest') */
|
||||
}
|
|
@ -106,4 +106,19 @@
|
|||
<allow pkg="org.apache.kafka" />
|
||||
</subpackage>
|
||||
|
||||
<subpackage name="copycat">
|
||||
<allow pkg="org.apache.kafka.copycat.data" />
|
||||
<allow pkg="org.apache.kafka.copycat.errors" />
|
||||
|
||||
<subpackage name="source">
|
||||
<allow pkg="org.apache.kafka.copycat.connector" />
|
||||
<allow pkg="org.apache.kafka.copycat.storage" />
|
||||
</subpackage>
|
||||
|
||||
<subpackage name="sink">
|
||||
<allow pkg="org.apache.kafka.copycat.connector" />
|
||||
<allow pkg="org.apache.kafka.copycat.storage" />
|
||||
</subpackage>
|
||||
</subpackage>
|
||||
|
||||
</import-control>
|
||||
|
|
|
@ -0,0 +1,120 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.connector;
|
||||
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Connectors manage integration of Copycat with another system, either as an input that ingests
|
||||
* data into Kafka or an output that passes data to an external system. Implementations should
|
||||
* not use this class directly; they should inherit from SourceConnector or SinkConnector.
|
||||
* </p>
|
||||
* <p>
|
||||
* Connectors have two primary tasks. First, given some configuration, they are responsible for
|
||||
* creating configurations for a set of {@link Task}s that split up the data processing. For
|
||||
* example, a database Connector might create Tasks by dividing the set of tables evenly among
|
||||
* tasks. Second, they are responsible for monitoring inputs for changes that require
|
||||
* reconfiguration and notifying the Copycat runtime via the ConnectorContext. Continuing the
|
||||
* previous example, the connector might periodically check for new tables and notify Copycat of
|
||||
* additions and deletions. Copycat will then request new configurations and update the running
|
||||
* Tasks.
|
||||
* </p>
|
||||
*/
|
||||
public abstract class Connector {
|
||||
|
||||
protected ConnectorContext context;
|
||||
|
||||
/**
|
||||
* Initialize this connector, using the provided ConnectorContext to notify the runtime of
|
||||
* input configuration changes.
|
||||
* @param ctx context object used to interact with the Copycat runtime
|
||||
*/
|
||||
public void initialize(ConnectorContext ctx) {
|
||||
context = ctx;
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Initialize this connector, using the provided ConnectorContext to notify the runtime of
|
||||
* input configuration changes and using the provided set of Task configurations.
|
||||
* This version is only used to recover from failures.
|
||||
* </p>
|
||||
* <p>
|
||||
* The default implementation ignores the provided simply ignores the provided Task
|
||||
* configurations. During recovery, Copycat will request an updated set of configurations and
|
||||
* update the running Tasks appropriately. However, Connectors should implement special
|
||||
* handling of this case if it will avoid unnecessary changes to running Tasks.
|
||||
* </p>
|
||||
*
|
||||
* @param ctx context object used to interact with the Copycat runtime
|
||||
* @param taskConfigs
|
||||
*/
|
||||
public void initialize(ConnectorContext ctx, List<Properties> taskConfigs) {
|
||||
context = ctx;
|
||||
// Ignore taskConfigs. May result in more churn of tasks during recovery if updated configs
|
||||
// are very different, but reduces the difficulty of implementing a Connector
|
||||
}
|
||||
|
||||
/**
|
||||
* Start this Connector. This method will only be called on a clean Connector, i.e. it has
|
||||
* either just been instantiated and initialized or {@link #stop()} has been invoked.
|
||||
*
|
||||
* @param props configuration settings
|
||||
* @throws CopycatException
|
||||
*/
|
||||
public abstract void start(Properties props) throws CopycatException;
|
||||
|
||||
/**
|
||||
* Reconfigure this Connector. Most implementations will not override this, using the default
|
||||
* implementation that calls {@link #stop()} followed by {@link #start(Properties)}.
|
||||
* Implementations only need to override this if they want to handle this process more
|
||||
* efficiently, e.g. without shutting down network connections to the external system.
|
||||
*
|
||||
* @param props new configuration settings
|
||||
* @throws CopycatException
|
||||
*/
|
||||
public void reconfigure(Properties props) throws CopycatException {
|
||||
stop();
|
||||
start(props);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the Task implementation for this Connector.
|
||||
*/
|
||||
public abstract Class<? extends Task> getTaskClass();
|
||||
|
||||
/**
|
||||
* Returns a set of configurations for Tasks based on the current configuration,
|
||||
* producing at most count configurations.
|
||||
*
|
||||
* @param maxTasks maximum number of configurations to generate
|
||||
* @return configurations for Tasks
|
||||
*/
|
||||
public abstract List<Properties> getTaskConfigs(int maxTasks);
|
||||
|
||||
/**
|
||||
* Stop this connector.
|
||||
*
|
||||
* @throws CopycatException
|
||||
*/
|
||||
public abstract void stop() throws CopycatException;
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.connector;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* ConnectorContext allows Connectors to proactively interact with the Copycat runtime.
|
||||
*/
|
||||
public interface ConnectorContext {
|
||||
/**
|
||||
* Requests that the runtime reconfigure the Tasks for this source. This should be used to
|
||||
* indicate to the runtime that something about the input/output has changed (e.g. streams
|
||||
* added/removed) and the running Tasks will need to be modified.
|
||||
*/
|
||||
public void requestTaskReconfiguration();
|
||||
|
||||
/**
|
||||
* Get a list of TopicPartitions for the specified topics. This should be used to determine how
|
||||
* to divide TopicPartitions between child tasks.
|
||||
* @param topics list of topics to get partitions for
|
||||
* @return list of all TopicPartitions for the input topics
|
||||
*/
|
||||
public abstract List<TopicPartition> getTopicPartitions(String... topics);
|
||||
|
||||
/**
|
||||
* Get a list of TopicPartitions for the specified topics. This should be used to determine how
|
||||
* to divide TopicPartitions between child tasks.
|
||||
* @param topics list of topics to get partitions for
|
||||
* @return list of all TopicPartitions for the input topics
|
||||
*/
|
||||
public abstract List<TopicPartition> getTopicPartitions(List<String> topics);
|
||||
|
||||
}
|
|
@ -0,0 +1,111 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.connector;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Base class for records containing data to be copied to/from Kafka. This corresponds closely to
|
||||
* Kafka's ProducerRecord and ConsumerRecord classes, and holds the data that may be used by both
|
||||
* sources and sinks (topic, partition, key, value). Although both implementations include a
|
||||
* notion of offest, it is not included here because they differ in type.
|
||||
* </p>
|
||||
* <p>
|
||||
* This class uses type parameters for keys and values. These are provided primarily for
|
||||
* connector developer convenience. Internally, Copycat will handle any primitive types or
|
||||
* org.apache.kafka.copycat.data types.
|
||||
* </p>
|
||||
*/
|
||||
public class CopycatRecord {
|
||||
private final String topic;
|
||||
private final Integer partition;
|
||||
private final Object key;
|
||||
private final Object value;
|
||||
|
||||
public CopycatRecord(String topic, Integer partition, Object value) {
|
||||
this(topic, partition, null, value);
|
||||
}
|
||||
|
||||
public CopycatRecord(String topic, Integer partition, Object key, Object value) {
|
||||
this.topic = topic;
|
||||
this.partition = partition;
|
||||
this.key = key;
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
public String getTopic() {
|
||||
return topic;
|
||||
}
|
||||
|
||||
public Integer getPartition() {
|
||||
return partition;
|
||||
}
|
||||
|
||||
public Object getKey() {
|
||||
return key;
|
||||
}
|
||||
|
||||
public Object getValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "CopycatRecord{" +
|
||||
"topic='" + topic + '\'' +
|
||||
", partition=" + partition +
|
||||
", key=" + key +
|
||||
", value=" + value +
|
||||
'}';
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
CopycatRecord that = (CopycatRecord) o;
|
||||
|
||||
if (key != null ? !key.equals(that.key) : that.key != null) {
|
||||
return false;
|
||||
}
|
||||
if (partition != null ? !partition.equals(that.partition) : that.partition != null) {
|
||||
return false;
|
||||
}
|
||||
if (topic != null ? !topic.equals(that.topic) : that.topic != null) {
|
||||
return false;
|
||||
}
|
||||
if (value != null ? !value.equals(that.value) : that.value != null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
int result = topic != null ? topic.hashCode() : 0;
|
||||
result = 31 * result + (partition != null ? partition.hashCode() : 0);
|
||||
result = 31 * result + (key != null ? key.hashCode() : 0);
|
||||
result = 31 * result + (value != null ? value.hashCode() : 0);
|
||||
return result;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,50 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.connector;
|
||||
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Tasks contain the code that actually copies data to/from another system. They receive
|
||||
* a configuration from their parent Connector, assigning them a fraction of a Copycat job's work.
|
||||
* The Copycat framework then pushes/pulls data from the Task. The Task must also be able to
|
||||
* respond to reconfiguration requests.
|
||||
* </p>
|
||||
* <p>
|
||||
* Task only contains the minimal shared functionality between
|
||||
* {@link org.apache.kafka.copycat.source.SourceTask} and
|
||||
* {@link org.apache.kafka.copycat.sink.SinkTask}.
|
||||
* </p>
|
||||
*/
|
||||
public interface Task {
|
||||
/**
|
||||
* Start the Task
|
||||
* @param props initial configuration
|
||||
*/
|
||||
void start(Properties props);
|
||||
|
||||
/**
|
||||
* Stop this task.
|
||||
*
|
||||
* @throws CopycatException
|
||||
*/
|
||||
void stop() throws CopycatException;
|
||||
}
|
|
@ -0,0 +1,100 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.connector;
|
||||
|
||||
/**
|
||||
* A topic name and partition number
|
||||
*/
|
||||
public class TopicPartition {
|
||||
|
||||
private int hash = 0;
|
||||
private final int partition;
|
||||
private final String topic;
|
||||
|
||||
public TopicPartition(String topic, int partition) {
|
||||
this.partition = partition;
|
||||
this.topic = topic;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse the TopicPartition from the string representation.
|
||||
* @param topicPartition string representation of a TopicPartition
|
||||
*/
|
||||
public TopicPartition(String topicPartition) {
|
||||
int lastDash = topicPartition.lastIndexOf('-');
|
||||
if (lastDash < 0) {
|
||||
throw new IllegalArgumentException("Invalid TopicPartition format");
|
||||
}
|
||||
this.topic = topicPartition.substring(0, lastDash);
|
||||
String partitionStr = topicPartition.substring(lastDash + 1);
|
||||
this.partition = Integer.parseInt(partitionStr);
|
||||
}
|
||||
|
||||
public int partition() {
|
||||
return partition;
|
||||
}
|
||||
|
||||
public String topic() {
|
||||
return topic;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
if (hash != 0) {
|
||||
return hash;
|
||||
}
|
||||
final int prime = 31;
|
||||
int result = 1;
|
||||
result = prime * result + partition;
|
||||
result = prime * result + ((topic == null) ? 0 : topic.hashCode());
|
||||
this.hash = result;
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
TopicPartition other = (TopicPartition) obj;
|
||||
if (partition != other.partition) {
|
||||
return false;
|
||||
}
|
||||
if (topic == null) {
|
||||
if (other.topic != null) {
|
||||
return false;
|
||||
}
|
||||
} else if (!topic.equals(other.topic)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return topic + "-" + partition;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,36 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.errors;
|
||||
|
||||
/**
|
||||
* CopycatException is the top-level exception type generated by Copycat and connectors.
|
||||
*/
|
||||
public class CopycatException extends Exception {
|
||||
|
||||
public CopycatException(String s) {
|
||||
super(s);
|
||||
}
|
||||
|
||||
public CopycatException(String s, Throwable throwable) {
|
||||
super(s, throwable);
|
||||
}
|
||||
|
||||
public CopycatException(Throwable throwable) {
|
||||
super(throwable);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.errors;
|
||||
|
||||
public class CopycatRuntimeException extends RuntimeException {
|
||||
public CopycatRuntimeException() {
|
||||
}
|
||||
|
||||
public CopycatRuntimeException(String s) {
|
||||
super(s);
|
||||
}
|
||||
|
||||
public CopycatRuntimeException(String s, Throwable throwable) {
|
||||
super(s, throwable);
|
||||
}
|
||||
|
||||
public CopycatRuntimeException(Throwable throwable) {
|
||||
super(throwable);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.sink;
|
||||
|
||||
import org.apache.kafka.copycat.connector.Connector;
|
||||
|
||||
/**
|
||||
* SinkConnectors implement the Connector interface to send Kafka data to another system.
|
||||
*/
|
||||
public abstract class SinkConnector extends Connector {
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Configuration key for the list of input topics for this connector.
|
||||
* </p>
|
||||
* <p>
|
||||
* Usually this setting is only relevant to the Copycat framework, but is provided here for
|
||||
* the convenience of Connector developers if they also need to know the set of topics.
|
||||
* </p>
|
||||
*/
|
||||
public static final String TOPICS_CONFIG = "topics";
|
||||
|
||||
}
|
|
@ -0,0 +1,73 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.sink;
|
||||
|
||||
import org.apache.kafka.copycat.connector.CopycatRecord;
|
||||
|
||||
/**
|
||||
* SinkRecord is a CopycatRecord that has been read from Kafka and includes the offset of
|
||||
* the record in the Kafka topic-partition in addition to the standard fields. This information
|
||||
* should be used by the SinkTask to coordinate offset commits.
|
||||
*/
|
||||
public class SinkRecord extends CopycatRecord {
|
||||
private final long offset;
|
||||
|
||||
public SinkRecord(String topic, int partition, Object key, Object value, long offset) {
|
||||
super(topic, partition, key, value);
|
||||
this.offset = offset;
|
||||
}
|
||||
|
||||
public long getOffset() {
|
||||
return offset;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
if (!super.equals(o)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SinkRecord that = (SinkRecord) o;
|
||||
|
||||
if (offset != that.offset) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
int result = super.hashCode();
|
||||
result = 31 * result + (int) (offset ^ (offset >>> 32));
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "SinkRecord{" +
|
||||
"offset=" + offset +
|
||||
"} " + super.toString();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,68 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
package org.apache.kafka.copycat.sink;
|
||||
|
||||
import org.apache.kafka.copycat.connector.Task;
|
||||
import org.apache.kafka.copycat.connector.TopicPartition;
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* SinkTask is a Task takes records loaded from Kafka and sends them to another system. In
|
||||
* addition to the basic {@link #put} interface, SinkTasks must also implement {@link #flush}
|
||||
* to support offset commits.
|
||||
*/
|
||||
public abstract class SinkTask implements Task {
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* The configuration key that provides the list of topic partitions that are inputs for this
|
||||
* SinkTask.
|
||||
* </p>
|
||||
* <p>
|
||||
* Usually this setting is only used by the Copycat framework since it manages the Kafka
|
||||
* consumer that provides input records. However, it is provided here for the convenience of
|
||||
* SinkTask implementations that may also want to know the input set of topic partitions.
|
||||
* </p>
|
||||
*/
|
||||
public static final String TOPICPARTITIONS_CONFIG = "topic.partitions";
|
||||
|
||||
protected SinkTaskContext context;
|
||||
|
||||
public void initialize(SinkTaskContext context) {
|
||||
this.context = context;
|
||||
}
|
||||
|
||||
/**
|
||||
* Put the records in the sink. Usually this should send the records to the sink asynchronously
|
||||
* and immediately return.
|
||||
*
|
||||
* @param records the set of records to send
|
||||
*/
|
||||
public abstract void put(Collection<SinkRecord> records) throws CopycatException;
|
||||
|
||||
/**
|
||||
* Flush all records that have been {@link #put} for the specified topic-partitions. The
|
||||
* offsets are provided for convenience, but could also be determined by tracking all offsets
|
||||
* included in the SinkRecords passed to {@link #put}.
|
||||
*
|
||||
* @param offsets mapping of TopicPartition to committed offset
|
||||
*/
|
||||
public abstract void flush(Map<TopicPartition, Long> offsets);
|
||||
}
|
|
@ -0,0 +1,42 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.sink;
|
||||
|
||||
import org.apache.kafka.copycat.connector.TopicPartition;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Context passed to SinkTasks, allowing them to access utilities in the copycat runtime.
|
||||
*/
|
||||
public abstract class SinkTaskContext {
|
||||
private Map<TopicPartition, Long> offsets;
|
||||
|
||||
public SinkTaskContext() {
|
||||
offsets = new HashMap<TopicPartition, Long>();
|
||||
}
|
||||
|
||||
public void resetOffset(Map<TopicPartition, Long> offsets) {
|
||||
this.offsets = offsets;
|
||||
}
|
||||
|
||||
public Map<TopicPartition, Long> getOffsets() {
|
||||
return offsets;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,27 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
package org.apache.kafka.copycat.source;
|
||||
|
||||
import org.apache.kafka.copycat.connector.Connector;
|
||||
|
||||
/**
|
||||
* SourceConnectors implement the connector interface to pull data from another system and send
|
||||
* it to Kafka.
|
||||
*/
|
||||
public abstract class SourceConnector extends Connector {
|
||||
|
||||
}
|
|
@ -0,0 +1,106 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.source;
|
||||
|
||||
import org.apache.kafka.copycat.connector.CopycatRecord;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* SourceRecords are generated by SourceTasks and passed to Copycat for storage in
|
||||
* Kafka. In addition to the standard fields in CopycatRecord which specify where data is stored
|
||||
* in Kafka, they also include a stream and offset.
|
||||
* </p>
|
||||
* <p>
|
||||
* The stream represents a single input stream that the record came from (e.g. a filename, table
|
||||
* name, or topic-partition). The offset represents a position in that stream which can be used
|
||||
* to resume consumption of data.
|
||||
* </p>
|
||||
* <p>
|
||||
* These values can have arbitrary structure and should be represented using
|
||||
* org.apache.kafka.copycat.data objects (or primitive values). For example, a database connector
|
||||
* might specify the stream as a record containing { "db": "database_name", "table":
|
||||
* "table_name"} and the offset as a Long containing the timestamp of the row.
|
||||
* </p>
|
||||
*/
|
||||
public class SourceRecord extends CopycatRecord {
|
||||
private final Object stream;
|
||||
private final Object offset;
|
||||
|
||||
public SourceRecord(Object stream, Object offset, String topic, Integer partition, Object value) {
|
||||
this(stream, offset, topic, partition, null, value);
|
||||
}
|
||||
|
||||
public SourceRecord(Object stream, Object offset, String topic, Object value) {
|
||||
this(stream, offset, topic, null, null, value);
|
||||
}
|
||||
|
||||
public SourceRecord(Object stream, Object offset, String topic, Integer partition,
|
||||
Object key, Object value) {
|
||||
super(topic, partition, key, value);
|
||||
this.stream = stream;
|
||||
this.offset = offset;
|
||||
}
|
||||
|
||||
public Object getStream() {
|
||||
return stream;
|
||||
}
|
||||
|
||||
public Object getOffset() {
|
||||
return offset;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
if (!super.equals(o)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SourceRecord that = (SourceRecord) o;
|
||||
|
||||
if (offset != null ? !offset.equals(that.offset) : that.offset != null) {
|
||||
return false;
|
||||
}
|
||||
if (stream != null ? !stream.equals(that.stream) : that.stream != null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
int result = super.hashCode();
|
||||
result = 31 * result + (stream != null ? stream.hashCode() : 0);
|
||||
result = 31 * result + (offset != null ? offset.hashCode() : 0);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "SourceRecord{" +
|
||||
"stream=" + stream +
|
||||
", offset=" + offset +
|
||||
"} " + super.toString();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,60 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.source;
|
||||
|
||||
import org.apache.kafka.copycat.connector.Task;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* SourceTask is a Task that pulls records from another system for storage in Kafka.
|
||||
*/
|
||||
public abstract class SourceTask<K, V> implements Task {
|
||||
|
||||
protected SourceTaskContext context;
|
||||
|
||||
/**
|
||||
* Initialize this SourceTask with the specified context object.
|
||||
*/
|
||||
public void initialize(SourceTaskContext context) {
|
||||
this.context = context;
|
||||
}
|
||||
|
||||
/**
|
||||
* Poll this SourceTask for new records. This method should block if no data is currently
|
||||
* available.
|
||||
*
|
||||
* @return a list of source records
|
||||
*/
|
||||
public abstract List<SourceRecord> poll() throws InterruptedException;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Commit the offsets, up to the offsets that have been returned by {@link #poll()}. This
|
||||
* method should block until the commit is complete.
|
||||
* </p>
|
||||
* <p>
|
||||
* SourceTasks are not required to implement this functionality; Copycat will record offsets
|
||||
* automatically. This hook is provided for systems that also need to store offsets internally
|
||||
* in their own system.
|
||||
* </p>
|
||||
*/
|
||||
public void commit() throws InterruptedException {
|
||||
// This space intentionally left blank.
|
||||
}
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
package org.apache.kafka.copycat.source;
|
||||
|
||||
import org.apache.kafka.copycat.storage.OffsetStorageReader;
|
||||
|
||||
/**
|
||||
* SourceTaskContext is provided to SourceTasks to allow them to interact with the underlying
|
||||
* runtime.
|
||||
*/
|
||||
public class SourceTaskContext {
|
||||
private final OffsetStorageReader reader;
|
||||
|
||||
public SourceTaskContext(OffsetStorageReader reader) {
|
||||
this.reader = reader;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the OffsetStorageReader for this SourceTask.
|
||||
*/
|
||||
public OffsetStorageReader getOffsetStorageReader() {
|
||||
return reader;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,42 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.storage;
|
||||
|
||||
/**
|
||||
* The Converter interface provides support for translating between Copycat's runtime data format
|
||||
* and the "native" runtime format used by the serialization layer. This is used to translate
|
||||
* two types of data: records and offsets. The (de)serialization is performed by a separate
|
||||
* component -- the producer or consumer serializer or deserializer for records or a Copycat
|
||||
* serializer or deserializer for offsets.
|
||||
*/
|
||||
public interface Converter {
|
||||
|
||||
/**
|
||||
* Convert a Copycat data object to a native object for serialization.
|
||||
* @param value
|
||||
* @return
|
||||
*/
|
||||
Object fromCopycatData(Object value);
|
||||
|
||||
/**
|
||||
* Convert a native object to a Copycat data object.
|
||||
* @param value
|
||||
* @return
|
||||
*/
|
||||
Object toCopycatData(Object value);
|
||||
}
|
|
@ -0,0 +1,58 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.storage;
|
||||
|
||||
|
||||
import org.apache.kafka.copycat.data.Schema;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Deserializer for Copycat offsets.
|
||||
* @param <T>
|
||||
*/
|
||||
public interface OffsetDeserializer<T> extends Closeable {
|
||||
|
||||
/**
|
||||
* Configure this class.
|
||||
* @param configs configs in key/value pairs
|
||||
* @param isKey whether is for key or value
|
||||
*/
|
||||
public void configure(Map<String, ?> configs, boolean isKey);
|
||||
|
||||
/**
|
||||
* Deserialize an offset key or value from the specified connector.
|
||||
* @param connector connector associated with the data
|
||||
* @param data serialized bytes
|
||||
* @return deserialized typed data
|
||||
*/
|
||||
public T deserializeOffset(String connector, byte[] data);
|
||||
|
||||
/**
|
||||
* Deserialize an offset key or value from the specified connector using a schema.
|
||||
* @param connector connector associated with the data
|
||||
* @param data serialized bytes
|
||||
* @param schema schema to deserialize to
|
||||
* @return deserialized typed data
|
||||
*/
|
||||
public T deserializeOffset(String connector, byte[] data, Schema schema);
|
||||
|
||||
@Override
|
||||
public void close();
|
||||
}
|
|
@ -0,0 +1,48 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.storage;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Serializer for Copycat offsets.
|
||||
* @param <T> native type of offsets.
|
||||
*/
|
||||
public interface OffsetSerializer<T> extends Closeable {
|
||||
/**
|
||||
* Configure this class.
|
||||
* @param configs configs in key/value pairs
|
||||
* @param isKey whether is for key or value
|
||||
*/
|
||||
public void configure(Map<String, ?> configs, boolean isKey);
|
||||
|
||||
/**
|
||||
* @param connector the connector associated with offsets
|
||||
* @param data typed data
|
||||
* @return serialized bytes
|
||||
*/
|
||||
public byte[] serializeOffset(String connector, T data);
|
||||
|
||||
/**
|
||||
* Close this serializer.
|
||||
*/
|
||||
@Override
|
||||
public void close();
|
||||
|
||||
}
|
|
@ -0,0 +1,60 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.storage;
|
||||
|
||||
import org.apache.kafka.copycat.data.Schema;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* OffsetStorageReader provides access to the offset storage used by sources. This can be used by
|
||||
* connectors to determine offsets to start consuming data from. This is most commonly used during
|
||||
* initialization of a task, but can also be used during runtime, e.g. when reconfiguring a task.
|
||||
*/
|
||||
public interface OffsetStorageReader {
|
||||
/**
|
||||
* Get the offset for the specified stream. If the data isn't already available locally, this
|
||||
* gets it from the backing store, which may require some network round trips.
|
||||
*
|
||||
* @param stream object uniquely identifying the stream of data
|
||||
* @param schema schema used to decode the offset value
|
||||
* @return object uniquely identifying the offset in the stream of data
|
||||
*/
|
||||
public Object getOffset(Object stream, Schema schema);
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Get a set of offsets for the specified stream identifiers. This may be more efficient
|
||||
* than calling {@link #getOffset(Object, Schema)} repeatedly.
|
||||
* </p>
|
||||
* <p>
|
||||
* Note that when errors occur, this method omits the associated data and tries to return as
|
||||
* many of the requested values as possible. This allows a task that's managing many streams to
|
||||
* still proceed with any available data. Therefore, implementations should take care to check
|
||||
* that the data is actually available in the returned response. The only case when an
|
||||
* exception will be thrown is if the entire request failed, e.g. because the underlying
|
||||
* storage was unavailable.
|
||||
* </p>
|
||||
*
|
||||
* @param streams set of identifiers for streams of data
|
||||
* @param schema schema used to decode offset values
|
||||
* @return a map of stream identifiers to decoded offsets
|
||||
*/
|
||||
public Map<Object, Object> getOffsets(Collection<Object> streams, Schema schema);
|
||||
}
|
|
@ -0,0 +1,64 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.util;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Utilities that connector implementations might find useful. Contains common building blocks
|
||||
* for writing connectors.
|
||||
*/
|
||||
public class ConnectorUtils {
|
||||
/**
|
||||
* Given a set of elements and a target number of groups, generates an set of groups of
|
||||
* elements to match the target number of groups, spreading them evenly among the groups.
|
||||
* This generates groups with contiguous elements, which results in intuitive ordering if
|
||||
* your elements are also ordered (e.g. alphabetical sets of table names if you sort
|
||||
* table names alphabetically to generate the raw partitions) or can result in efficient
|
||||
* partitioning if elements are sorted according to some criteria that affects performance
|
||||
* (e.g. topic partitions with the same leader).
|
||||
*
|
||||
* @param elements list of elements to partition
|
||||
* @param numGroups the number of output groups to generate.
|
||||
*/
|
||||
public static <T> List<List<T>> groupPartitions(List<T> elements, int numGroups) {
|
||||
if (numGroups <= 0) {
|
||||
throw new IllegalArgumentException("Number of groups must be positive.");
|
||||
}
|
||||
|
||||
List<List<T>> result = new ArrayList<List<T>>(numGroups);
|
||||
|
||||
// Each group has either n+1 or n raw partitions
|
||||
int perGroup = elements.size() / numGroups;
|
||||
int leftover = elements.size() - (numGroups * perGroup);
|
||||
|
||||
int assigned = 0;
|
||||
for (int group = 0; group < numGroups; group++) {
|
||||
int numThisGroup = group < leftover ? perGroup + 1 : perGroup;
|
||||
List<T> groupList = new ArrayList<T>(numThisGroup);
|
||||
for (int i = 0; i < numThisGroup; i++) {
|
||||
groupList.add(elements.get(assigned));
|
||||
assigned++;
|
||||
}
|
||||
result.add(groupList);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,45 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
package org.apache.kafka.copycat.util;
|
||||
|
||||
/**
|
||||
* General string utilities that are missing from the standard library and may commonly be
|
||||
* required by Connector or Task implementations.
|
||||
*/
|
||||
public class StringUtils {
|
||||
|
||||
/**
|
||||
* Generate a String by appending all the @{elements}, converted to Strings, delimited by
|
||||
* @{delim}.
|
||||
* @param elements list of elements to concatenate
|
||||
* @param delim delimiter to place between each element
|
||||
* @return the concatenated string with delimiters
|
||||
*/
|
||||
public static <T> String join(Iterable<T> elements, String delim) {
|
||||
StringBuilder result = new StringBuilder();
|
||||
boolean first = true;
|
||||
for (T elem : elements) {
|
||||
if (first) {
|
||||
first = false;
|
||||
} else {
|
||||
result.append(delim);
|
||||
}
|
||||
result.append(elem);
|
||||
}
|
||||
return result.toString();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,77 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.connector;
|
||||
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class ConnectorTest {
|
||||
|
||||
@Test
|
||||
public void testDefaultReconfigure() throws Exception {
|
||||
TestConnector conn = new TestConnector(false);
|
||||
conn.reconfigure(new Properties());
|
||||
assertEquals(conn.stopOrder, 0);
|
||||
assertEquals(conn.configureOrder, 1);
|
||||
}
|
||||
|
||||
@Test(expected = CopycatException.class)
|
||||
public void testReconfigureStopException() throws Exception {
|
||||
TestConnector conn = new TestConnector(true);
|
||||
conn.reconfigure(new Properties());
|
||||
}
|
||||
|
||||
private static class TestConnector extends Connector {
|
||||
private boolean stopException;
|
||||
private int order = 0;
|
||||
public int stopOrder = -1;
|
||||
public int configureOrder = -1;
|
||||
|
||||
public TestConnector(boolean stopException) {
|
||||
this.stopException = stopException;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start(Properties props) throws CopycatException {
|
||||
configureOrder = order++;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<? extends Task> getTaskClass() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Properties> getTaskConfigs(int count) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() throws CopycatException {
|
||||
stopOrder = order++;
|
||||
if (stopException) {
|
||||
throw new CopycatException("error");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,67 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.util;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class ConnectorUtilsTest {
|
||||
|
||||
private static final List<Integer> FIVE_ELEMENTS = Arrays.asList(1, 2, 3, 4, 5);
|
||||
|
||||
@Test
|
||||
public void testGroupPartitions() {
|
||||
|
||||
List<List<Integer>> grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 1);
|
||||
assertEquals(Arrays.asList(FIVE_ELEMENTS), grouped);
|
||||
|
||||
grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 2);
|
||||
assertEquals(Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5)), grouped);
|
||||
|
||||
grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 3);
|
||||
assertEquals(Arrays.asList(Arrays.asList(1, 2),
|
||||
Arrays.asList(3, 4),
|
||||
Arrays.asList(5)), grouped);
|
||||
|
||||
grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 5);
|
||||
assertEquals(Arrays.asList(Arrays.asList(1),
|
||||
Arrays.asList(2),
|
||||
Arrays.asList(3),
|
||||
Arrays.asList(4),
|
||||
Arrays.asList(5)), grouped);
|
||||
|
||||
grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 7);
|
||||
assertEquals(Arrays.asList(Arrays.asList(1),
|
||||
Arrays.asList(2),
|
||||
Arrays.asList(3),
|
||||
Arrays.asList(4),
|
||||
Arrays.asList(5),
|
||||
Collections.EMPTY_LIST,
|
||||
Collections.EMPTY_LIST), grouped);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testGroupPartitionsInvalidCount() {
|
||||
ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 0);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.util;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class StringUtilsTest {
|
||||
|
||||
@Test
|
||||
public void testJoin() {
|
||||
assertEquals("", StringUtils.join(Collections.EMPTY_LIST, ""));
|
||||
assertEquals("", StringUtils.join(Collections.EMPTY_LIST, ","));
|
||||
|
||||
assertEquals("ab", StringUtils.join(Arrays.asList("a", "b"), ""));
|
||||
|
||||
assertEquals("a,b,c", StringUtils.join(Arrays.asList("a", "b", "c"), ","));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.data;
|
||||
|
||||
|
||||
public class BinaryData {
|
||||
public static int compareBytes(byte[] b1, int s1, int l1,
|
||||
byte[] b2, int s2, int l2) {
|
||||
int end1 = s1 + l1;
|
||||
int end2 = s2 + l2;
|
||||
for (int i = s1, j = s2; i < end1 && j < end2; i++, j++) {
|
||||
int a = b1[i] & 0xff;
|
||||
int b = b2[j] & 0xff;
|
||||
if (a != b) {
|
||||
return a - b;
|
||||
}
|
||||
}
|
||||
return l1 - l2;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
|
||||
|
||||
package org.apache.kafka.copycat.data;
|
||||
|
||||
/** Base Avro exception. */
|
||||
public class DataRuntimeException extends RuntimeException {
|
||||
public DataRuntimeException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
|
||||
public DataRuntimeException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public DataRuntimeException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,33 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
|
||||
|
||||
package org.apache.kafka.copycat.data;
|
||||
|
||||
|
||||
/** Thrown when an illegal type is used. */
|
||||
public class DataTypeException extends DataRuntimeException {
|
||||
public DataTypeException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public DataTypeException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,34 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
|
||||
|
||||
package org.apache.kafka.copycat.data;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/** Array that permits reuse of contained elements. */
|
||||
public interface GenericArray<T> extends List<T>, GenericContainer {
|
||||
/** The current content of the location where {@link #add(Object)} would next
|
||||
* store an element, if any. This permits reuse of arrays and their elements
|
||||
* without allocating new objects. */
|
||||
T peek();
|
||||
|
||||
/** Reverses the order of the elements in this array. */
|
||||
void reverse();
|
||||
}
|
||||
|
|
@ -0,0 +1,27 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
|
||||
|
||||
package org.apache.kafka.copycat.data;
|
||||
|
||||
/** Contains data of other types. */
|
||||
public interface GenericContainer {
|
||||
/** The schema of this instance. */
|
||||
Schema getSchema();
|
||||
}
|
||||
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,27 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
|
||||
|
||||
package org.apache.kafka.copycat.data;
|
||||
|
||||
/** An enum symbol. */
|
||||
public interface GenericEnumSymbol
|
||||
extends GenericContainer, Comparable<GenericEnumSymbol> {
|
||||
/** Return the symbol. */
|
||||
String toString();
|
||||
}
|
|
@ -0,0 +1,26 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
|
||||
|
||||
package org.apache.kafka.copycat.data;
|
||||
|
||||
/** Fixed-size data. */
|
||||
public interface GenericFixed extends GenericContainer {
|
||||
/** Return the data. */
|
||||
byte[] bytes();
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
|
||||
|
||||
package org.apache.kafka.copycat.data;
|
||||
|
||||
/** A generic instance of a record schema. Fields are accessible by name as
|
||||
* well as by index. */
|
||||
public interface GenericRecord extends IndexedRecord {
|
||||
/** Set the value of a field given its name. */
|
||||
void put(String key, Object v);
|
||||
|
||||
/** Return the value of a field given its name. */
|
||||
Object get(String key);
|
||||
}
|
|
@ -0,0 +1,259 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
|
||||
|
||||
package org.apache.kafka.copycat.data;
|
||||
|
||||
import org.apache.kafka.copycat.data.GenericData.Record;
|
||||
import org.apache.kafka.copycat.data.Schema.Field;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/** A RecordBuilder for generic records. GenericRecordBuilder fills in default values
|
||||
* for fields if they are not specified. */
|
||||
public class GenericRecordBuilder extends RecordBuilderBase<Record> {
|
||||
private final GenericData.Record record;
|
||||
|
||||
/**
|
||||
* Creates a GenericRecordBuilder for building Record instances.
|
||||
* @param schema the schema associated with the record class.
|
||||
*/
|
||||
public GenericRecordBuilder(Schema schema) {
|
||||
super(schema, GenericData.get());
|
||||
record = new GenericData.Record(schema);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a GenericRecordBuilder by copying an existing GenericRecordBuilder.
|
||||
* @param other the GenericRecordBuilder to copy.
|
||||
*/
|
||||
public GenericRecordBuilder(GenericRecordBuilder other) {
|
||||
super(other, GenericData.get());
|
||||
record = new GenericData.Record(other.record, /* deepCopy = */ true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a GenericRecordBuilder by copying an existing record instance.
|
||||
* @param other the record instance to copy.
|
||||
*/
|
||||
public GenericRecordBuilder(Record other) {
|
||||
super(other.getSchema(), GenericData.get());
|
||||
record = new GenericData.Record(other, /* deepCopy = */ true);
|
||||
|
||||
// Set all fields in the RecordBuilder that are set in the record
|
||||
for (Field f : schema().getFields()) {
|
||||
Object value = other.get(f.pos());
|
||||
// Only set the value if it is not null, if the schema type is null,
|
||||
// or if the schema type is a union that accepts nulls.
|
||||
if (isValidValue(f, value)) {
|
||||
set(f, data().deepCopy(f.schema(), value));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the value of a field.
|
||||
* @param fieldName the name of the field to get.
|
||||
* @return the value of the field with the given name, or null if not set.
|
||||
*/
|
||||
public Object get(String fieldName) {
|
||||
return get(schema().getField(fieldName));
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the value of a field.
|
||||
* @param field the field to get.
|
||||
* @return the value of the given field, or null if not set.
|
||||
*/
|
||||
public Object get(Field field) {
|
||||
return get(field.pos());
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the value of a field.
|
||||
* @param pos the position of the field to get.
|
||||
* @return the value of the field with the given position, or null if not set.
|
||||
*/
|
||||
protected Object get(int pos) {
|
||||
return record.get(pos);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the value of a field.
|
||||
* @param fieldName the name of the field to set.
|
||||
* @param value the value to set.
|
||||
* @return a reference to the RecordBuilder.
|
||||
*/
|
||||
public GenericRecordBuilder set(String fieldName, Object value) {
|
||||
return set(schema().getField(fieldName), value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the value of a field.
|
||||
* @param field the field to set.
|
||||
* @param value the value to set.
|
||||
* @return a reference to the RecordBuilder.
|
||||
*/
|
||||
public GenericRecordBuilder set(Field field, Object value) {
|
||||
return set(field, field.pos(), value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the value of a field.
|
||||
* @param pos the field to set.
|
||||
* @param value the value to set.
|
||||
* @return a reference to the RecordBuilder.
|
||||
*/
|
||||
protected GenericRecordBuilder set(int pos, Object value) {
|
||||
return set(fields()[pos], pos, value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the value of a field.
|
||||
* @param field the field to set.
|
||||
* @param pos the position of the field.
|
||||
* @param value the value to set.
|
||||
* @return a reference to the RecordBuilder.
|
||||
*/
|
||||
private GenericRecordBuilder set(Field field, int pos, Object value) {
|
||||
validate(field, value);
|
||||
record.put(pos, value);
|
||||
fieldSetFlags()[pos] = true;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether a field has been set.
|
||||
* @param fieldName the name of the field to check.
|
||||
* @return true if the given field is non-null; false otherwise.
|
||||
*/
|
||||
public boolean has(String fieldName) {
|
||||
return has(schema().getField(fieldName));
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether a field has been set.
|
||||
* @param field the field to check.
|
||||
* @return true if the given field is non-null; false otherwise.
|
||||
*/
|
||||
public boolean has(Field field) {
|
||||
return has(field.pos());
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether a field has been set.
|
||||
* @param pos the position of the field to check.
|
||||
* @return true if the given field is non-null; false otherwise.
|
||||
*/
|
||||
protected boolean has(int pos) {
|
||||
return fieldSetFlags()[pos];
|
||||
}
|
||||
|
||||
/**
|
||||
* Clears the value of the given field.
|
||||
* @param fieldName the name of the field to clear.
|
||||
* @return a reference to the RecordBuilder.
|
||||
*/
|
||||
public GenericRecordBuilder clear(String fieldName) {
|
||||
return clear(schema().getField(fieldName));
|
||||
}
|
||||
|
||||
/**
|
||||
* Clears the value of the given field.
|
||||
* @param field the field to clear.
|
||||
* @return a reference to the RecordBuilder.
|
||||
*/
|
||||
public GenericRecordBuilder clear(Field field) {
|
||||
return clear(field.pos());
|
||||
}
|
||||
|
||||
/**
|
||||
* Clears the value of the given field.
|
||||
* @param pos the position of the field to clear.
|
||||
* @return a reference to the RecordBuilder.
|
||||
*/
|
||||
protected GenericRecordBuilder clear(int pos) {
|
||||
record.put(pos, null);
|
||||
fieldSetFlags()[pos] = false;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Record build() {
|
||||
Record record;
|
||||
try {
|
||||
record = new GenericData.Record(schema());
|
||||
} catch (Exception e) {
|
||||
throw new DataRuntimeException(e);
|
||||
}
|
||||
|
||||
for (Field field : fields()) {
|
||||
Object value;
|
||||
try {
|
||||
value = getWithDefault(field);
|
||||
} catch (IOException e) {
|
||||
throw new DataRuntimeException(e);
|
||||
}
|
||||
if (value != null) {
|
||||
record.put(field.pos(), value);
|
||||
}
|
||||
}
|
||||
|
||||
return record;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the value of the given field.
|
||||
* If the field has been set, the set value is returned (even if it's null).
|
||||
* If the field hasn't been set and has a default value, the default value
|
||||
* is returned.
|
||||
* @param field the field whose value should be retrieved.
|
||||
* @return the value set for the given field, the field's default value,
|
||||
* or null.
|
||||
* @throws IOException
|
||||
*/
|
||||
private Object getWithDefault(Field field) throws IOException {
|
||||
return fieldSetFlags()[field.pos()] ?
|
||||
record.get(field.pos()) : defaultValue(field);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = super.hashCode();
|
||||
result = prime * result + ((record == null) ? 0 : record.hashCode());
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj)
|
||||
return true;
|
||||
if (!super.equals(obj))
|
||||
return false;
|
||||
if (getClass() != obj.getClass())
|
||||
return false;
|
||||
GenericRecordBuilder other = (GenericRecordBuilder) obj;
|
||||
if (record == null) {
|
||||
if (other.record != null)
|
||||
return false;
|
||||
} else if (!record.equals(other.record))
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
|
||||
|
||||
package org.apache.kafka.copycat.data;
|
||||
|
||||
/** A record implementation that permits field access by integer index.*/
|
||||
public interface IndexedRecord extends GenericContainer {
|
||||
/** Set the value of a field given its position in the schema.
|
||||
* <p>This method is not meant to be called by user code, but only internally for deep copying */
|
||||
void put(int i, Object v);
|
||||
|
||||
/** Return the value of a field given its position in the schema.
|
||||
* <p>This method is not meant to be called by user code, but only internally for deep copying */
|
||||
Object get(int i);
|
||||
}
|
|
@ -0,0 +1,85 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
|
||||
|
||||
package org.apache.kafka.copycat.data;
|
||||
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Base class for objects that have Object-valued properties.
|
||||
*/
|
||||
public abstract class ObjectProperties {
|
||||
public static class Null {
|
||||
private Null() {
|
||||
}
|
||||
}
|
||||
|
||||
/** A value representing a JSON <code>null</code>. */
|
||||
public static final Null NULL_VALUE = new Null();
|
||||
|
||||
Map<String, Object> props = new LinkedHashMap<String, Object>(1);
|
||||
|
||||
private Set<String> reserved;
|
||||
|
||||
ObjectProperties(Set<String> reserved) {
|
||||
this.reserved = reserved;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the value of the named, string-valued property in this schema.
|
||||
* Returns <tt>null</tt> if there is no string-valued property with that name.
|
||||
*/
|
||||
public String getProp(String name) {
|
||||
Object value = getObjectProp(name);
|
||||
return (value instanceof String) ? (String) value : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the value of the named property in this schema.
|
||||
* Returns <tt>null</tt> if there is no property with that name.
|
||||
*/
|
||||
public synchronized Object getObjectProp(String name) {
|
||||
return props.get(name);
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a property with the given name <tt>name</tt> and
|
||||
* value <tt>value</tt>. Neither <tt>name</tt> nor <tt>value</tt> can be
|
||||
* <tt>null</tt>. It is illegal to add a property if another with
|
||||
* the same name but different value already exists in this schema.
|
||||
*
|
||||
* @param name The name of the property to add
|
||||
* @param value The value for the property to add
|
||||
*/
|
||||
public synchronized void addProp(String name, Object value) {
|
||||
if (reserved.contains(name))
|
||||
throw new DataRuntimeException("Can't set reserved property: " + name);
|
||||
|
||||
if (value == null)
|
||||
throw new DataRuntimeException("Can't set a property to null: " + name);
|
||||
|
||||
Object old = props.get(name);
|
||||
if (old == null)
|
||||
props.put(name, value);
|
||||
else if (!old.equals(value))
|
||||
throw new DataRuntimeException("Can't overwrite property: " + name);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
|
||||
|
||||
package org.apache.kafka.copycat.data;
|
||||
|
||||
|
||||
/** Interface for record builders */
|
||||
public interface RecordBuilder<T> {
|
||||
/**
|
||||
* Constructs a new instance using the values set in the RecordBuilder.
|
||||
* If a particular value was not set and the schema defines a default
|
||||
* value, the default value will be used.
|
||||
* @return a new instance using values set in the RecordBuilder.
|
||||
*/
|
||||
T build();
|
||||
}
|
|
@ -0,0 +1,173 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
|
||||
|
||||
package org.apache.kafka.copycat.data;
|
||||
|
||||
import org.apache.kafka.copycat.data.Schema.Field;
|
||||
import org.apache.kafka.copycat.data.Schema.Type;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
||||
/** Abstract base class for RecordBuilder implementations. Not thread-safe. */
|
||||
public abstract class RecordBuilderBase<T extends IndexedRecord>
|
||||
implements RecordBuilder<T> {
|
||||
private static final Field[] EMPTY_FIELDS = new Field[0];
|
||||
private final Schema schema;
|
||||
private final Field[] fields;
|
||||
private final boolean[] fieldSetFlags;
|
||||
private final GenericData data;
|
||||
|
||||
protected final Schema schema() {
|
||||
return schema;
|
||||
}
|
||||
|
||||
protected final Field[] fields() {
|
||||
return fields;
|
||||
}
|
||||
|
||||
protected final boolean[] fieldSetFlags() {
|
||||
return fieldSetFlags;
|
||||
}
|
||||
|
||||
protected final GenericData data() {
|
||||
return data;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a RecordBuilderBase for building records of the given type.
|
||||
* @param schema the schema associated with the record class.
|
||||
*/
|
||||
protected RecordBuilderBase(Schema schema, GenericData data) {
|
||||
this.schema = schema;
|
||||
this.data = data;
|
||||
fields = (Field[]) schema.getFields().toArray(EMPTY_FIELDS);
|
||||
fieldSetFlags = new boolean[fields.length];
|
||||
}
|
||||
|
||||
/**
|
||||
* RecordBuilderBase copy constructor.
|
||||
* Makes a deep copy of the values in the other builder.
|
||||
* @param other RecordBuilderBase instance to copy.
|
||||
*/
|
||||
protected RecordBuilderBase(RecordBuilderBase<T> other, GenericData data) {
|
||||
this.schema = other.schema;
|
||||
this.data = data;
|
||||
fields = (Field[]) schema.getFields().toArray(EMPTY_FIELDS);
|
||||
fieldSetFlags = new boolean[other.fieldSetFlags.length];
|
||||
System.arraycopy(
|
||||
other.fieldSetFlags, 0, fieldSetFlags, 0, fieldSetFlags.length);
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates that a particular value for a given field is valid according to
|
||||
* the following algorithm:
|
||||
* 1. If the value is not null, or the field type is null, or the field type
|
||||
* is a union which accepts nulls, returns.
|
||||
* 2. Else, if the field has a default value, returns.
|
||||
* 3. Otherwise throws AvroRuntimeException.
|
||||
* @param field the field to validate.
|
||||
* @param value the value to validate.
|
||||
* @throws NullPointerException if value is null and the given field does
|
||||
* not accept null values.
|
||||
*/
|
||||
protected void validate(Field field, Object value) {
|
||||
if (isValidValue(field, value)) {
|
||||
return;
|
||||
} else if (field.defaultValue() != null) {
|
||||
return;
|
||||
} else {
|
||||
throw new DataRuntimeException(
|
||||
"Field " + field + " does not accept null values");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests whether a value is valid for a specified field.
|
||||
* @param f the field for which to test the value.
|
||||
* @param value the value to test.
|
||||
* @return true if the value is valid for the given field; false otherwise.
|
||||
*/
|
||||
protected static boolean isValidValue(Field f, Object value) {
|
||||
if (value != null) {
|
||||
return true;
|
||||
}
|
||||
|
||||
Schema schema = f.schema();
|
||||
Type type = schema.getType();
|
||||
|
||||
// If the type is null, any value is valid
|
||||
if (type == Type.NULL) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// If the type is a union that allows nulls, any value is valid
|
||||
if (type == Type.UNION) {
|
||||
for (Schema s : schema.getTypes()) {
|
||||
if (s.getType() == Type.NULL) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// The value is null but the type does not allow nulls
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the default value of the given field, if any.
|
||||
* @param field the field whose default value should be retrieved.
|
||||
* @return the default value associated with the given field,
|
||||
* or null if none is specified in the schema.
|
||||
* @throws IOException
|
||||
*/
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
protected Object defaultValue(Field field) throws IOException {
|
||||
return data.deepCopy(field.schema(), data.getDefaultValue(field));
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = 1;
|
||||
result = prime * result + Arrays.hashCode(fieldSetFlags);
|
||||
result = prime * result + ((schema == null) ? 0 : schema.hashCode());
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj)
|
||||
return true;
|
||||
if (obj == null)
|
||||
return false;
|
||||
if (getClass() != obj.getClass())
|
||||
return false;
|
||||
@SuppressWarnings("rawtypes")
|
||||
RecordBuilderBase other = (RecordBuilderBase) obj;
|
||||
if (!Arrays.equals(fieldSetFlags, other.fieldSetFlags))
|
||||
return false;
|
||||
if (schema == null) {
|
||||
if (other.schema != null)
|
||||
return false;
|
||||
} else if (!schema.equals(other.schema))
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,32 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
|
||||
|
||||
package org.apache.kafka.copycat.data;
|
||||
|
||||
/** Thrown for errors parsing schemas and protocols. */
|
||||
public class SchemaParseException extends DataRuntimeException {
|
||||
public SchemaParseException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
|
||||
public SchemaParseException(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,40 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
|
||||
|
||||
package org.apache.kafka.copycat.data;
|
||||
|
||||
/** Thrown when the expected contents of a union cannot be resolved. */
|
||||
public class UnresolvedUnionException extends DataRuntimeException {
|
||||
private Object unresolvedDatum;
|
||||
private Schema unionSchema;
|
||||
|
||||
public UnresolvedUnionException(Schema unionSchema, Object unresolvedDatum) {
|
||||
super("Not in union " + unionSchema + ": " + unresolvedDatum);
|
||||
this.unionSchema = unionSchema;
|
||||
this.unresolvedDatum = unresolvedDatum;
|
||||
}
|
||||
|
||||
public Object getUnresolvedDatum() {
|
||||
return unresolvedDatum;
|
||||
}
|
||||
|
||||
public Schema getUnionSchema() {
|
||||
return unionSchema;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,158 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
|
||||
|
||||
package org.apache.kafka.copycat.data;
|
||||
|
||||
import java.nio.charset.Charset;
|
||||
|
||||
/** A Utf8 string. Unlike {@link String}, instances are mutable. This is more
|
||||
* efficient than {@link String} when reading or writing a sequence of values,
|
||||
* as a single instance may be reused. */
|
||||
public class Utf8 implements Comparable<Utf8>, CharSequence {
|
||||
private static final byte[] EMPTY = new byte[0];
|
||||
private static final Charset UTF8 = Charset.forName("UTF-8");
|
||||
|
||||
private byte[] bytes = EMPTY;
|
||||
private int length;
|
||||
private String string;
|
||||
|
||||
public Utf8() {
|
||||
}
|
||||
|
||||
public Utf8(String string) {
|
||||
this.bytes = getBytesFor(string);
|
||||
this.length = bytes.length;
|
||||
this.string = string;
|
||||
}
|
||||
|
||||
public Utf8(Utf8 other) {
|
||||
this.length = other.length;
|
||||
this.bytes = new byte[other.length];
|
||||
System.arraycopy(other.bytes, 0, this.bytes, 0, this.length);
|
||||
this.string = other.string;
|
||||
}
|
||||
|
||||
public Utf8(byte[] bytes) {
|
||||
this.bytes = bytes;
|
||||
this.length = bytes.length;
|
||||
}
|
||||
|
||||
/** Return UTF-8 encoded bytes.
|
||||
* Only valid through {@link #getByteLength()}. */
|
||||
public byte[] getBytes() {
|
||||
return bytes;
|
||||
}
|
||||
|
||||
/** Return length in bytes.
|
||||
* @deprecated call {@link #getByteLength()} instead. */
|
||||
public int getLength() {
|
||||
return length;
|
||||
}
|
||||
|
||||
/** Return length in bytes. */
|
||||
public int getByteLength() {
|
||||
return length;
|
||||
}
|
||||
|
||||
/** Set length in bytes. Should called whenever byte content changes, even
|
||||
* if the length does not change, as this also clears the cached String.
|
||||
* @deprecated call {@link #setByteLength(int)} instead. */
|
||||
public Utf8 setLength(int newLength) {
|
||||
return setByteLength(newLength);
|
||||
}
|
||||
|
||||
/** Set length in bytes. Should called whenever byte content changes, even
|
||||
* if the length does not change, as this also clears the cached String. */
|
||||
public Utf8 setByteLength(int newLength) {
|
||||
if (this.bytes.length < newLength) {
|
||||
byte[] newBytes = new byte[newLength];
|
||||
System.arraycopy(bytes, 0, newBytes, 0, this.length);
|
||||
this.bytes = newBytes;
|
||||
}
|
||||
this.length = newLength;
|
||||
this.string = null;
|
||||
return this;
|
||||
}
|
||||
|
||||
/** Set to the contents of a String. */
|
||||
public Utf8 set(String string) {
|
||||
this.bytes = getBytesFor(string);
|
||||
this.length = bytes.length;
|
||||
this.string = string;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
if (this.length == 0) return "";
|
||||
if (this.string == null) {
|
||||
this.string = new String(bytes, 0, length, UTF8);
|
||||
}
|
||||
return this.string;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (o == this) return true;
|
||||
if (!(o instanceof Utf8)) return false;
|
||||
Utf8 that = (Utf8) o;
|
||||
if (!(this.length == that.length)) return false;
|
||||
byte[] thatBytes = that.bytes;
|
||||
for (int i = 0; i < this.length; i++)
|
||||
if (bytes[i] != thatBytes[i])
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
int hash = 0;
|
||||
for (int i = 0; i < this.length; i++)
|
||||
hash = hash * 31 + bytes[i];
|
||||
return hash;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(Utf8 that) {
|
||||
return BinaryData.compareBytes(this.bytes, 0, this.length,
|
||||
that.bytes, 0, that.length);
|
||||
}
|
||||
|
||||
// CharSequence implementation
|
||||
@Override
|
||||
public char charAt(int index) {
|
||||
return toString().charAt(index);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int length() {
|
||||
return toString().length();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CharSequence subSequence(int start, int end) {
|
||||
return toString().subSequence(start, end);
|
||||
}
|
||||
|
||||
/** Gets the UTF-8 bytes for a String */
|
||||
public static final byte[] getBytesFor(String str) {
|
||||
return str.getBytes(UTF8);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,410 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
package org.apache.kafka.copycat.data;
|
||||
|
||||
import org.apache.kafka.copycat.data.GenericData.Record;
|
||||
import org.apache.kafka.copycat.data.Schema.Field;
|
||||
import org.apache.kafka.copycat.data.Schema.Type;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.*;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class TestGenericData {
|
||||
|
||||
@Test(expected = DataRuntimeException.class)
|
||||
public void testrecordConstructorNullSchema() throws Exception {
|
||||
new GenericData.Record(null);
|
||||
}
|
||||
|
||||
@Test(expected = DataRuntimeException.class)
|
||||
public void testrecordConstructorWrongSchema() throws Exception {
|
||||
new GenericData.Record(Schema.create(Schema.Type.INT));
|
||||
}
|
||||
|
||||
@Test(expected = DataRuntimeException.class)
|
||||
public void testArrayConstructorNullSchema() throws Exception {
|
||||
new GenericData.Array<Object>(1, null);
|
||||
}
|
||||
|
||||
@Test(expected = DataRuntimeException.class)
|
||||
public void testArrayConstructorWrongSchema() throws Exception {
|
||||
new GenericData.Array<Object>(1, Schema.create(Schema.Type.INT));
|
||||
}
|
||||
|
||||
@Test(expected = DataRuntimeException.class)
|
||||
public void testRecordCreateEmptySchema() throws Exception {
|
||||
Schema s = Schema.createRecord("schemaName", "schemaDoc", "namespace", false);
|
||||
Record r = new GenericData.Record(s);
|
||||
}
|
||||
|
||||
@Test(expected = DataRuntimeException.class)
|
||||
public void testGetEmptySchemaFields() throws Exception {
|
||||
Schema s = Schema.createRecord("schemaName", "schemaDoc", "namespace", false);
|
||||
s.getFields();
|
||||
}
|
||||
|
||||
@Test(expected = DataRuntimeException.class)
|
||||
public void testGetEmptySchemaField() throws Exception {
|
||||
Schema s = Schema.createRecord("schemaName", "schemaDoc", "namespace", false);
|
||||
s.getField("foo");
|
||||
}
|
||||
|
||||
@Test(expected = DataRuntimeException.class)
|
||||
public void testRecordPutInvalidField() throws Exception {
|
||||
Schema s = Schema.createRecord("schemaName", "schemaDoc", "namespace", false);
|
||||
List<Schema.Field> fields = new ArrayList<Schema.Field>();
|
||||
fields.add(new Schema.Field("someFieldName", s, "docs", null));
|
||||
s.setFields(fields);
|
||||
Record r = new GenericData.Record(s);
|
||||
r.put("invalidFieldName", "someValue");
|
||||
}
|
||||
|
||||
@Test
|
||||
/** Make sure that even with nulls, hashCode() doesn't throw NPE. */
|
||||
public void testHashCode() {
|
||||
GenericData.get().hashCode(null, Schema.create(Type.NULL));
|
||||
GenericData.get().hashCode(null, Schema.createUnion(
|
||||
Arrays.asList(Schema.create(Type.BOOLEAN), Schema.create(Type.STRING))));
|
||||
List<CharSequence> stuff = new ArrayList<CharSequence>();
|
||||
stuff.add("string");
|
||||
Schema schema = recordSchema();
|
||||
GenericRecord r = new GenericData.Record(schema);
|
||||
r.put(0, stuff);
|
||||
GenericData.get().hashCode(r, schema);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEquals() {
|
||||
Schema s = recordSchema();
|
||||
GenericRecord r0 = new GenericData.Record(s);
|
||||
GenericRecord r1 = new GenericData.Record(s);
|
||||
GenericRecord r2 = new GenericData.Record(s);
|
||||
Collection<CharSequence> l0 = new ArrayDeque<CharSequence>();
|
||||
List<CharSequence> l1 = new ArrayList<CharSequence>();
|
||||
GenericArray<CharSequence> l2 =
|
||||
new GenericData.Array<CharSequence>(1, s.getFields().get(0).schema());
|
||||
String foo = "foo";
|
||||
l0.add(new StringBuffer(foo));
|
||||
l1.add(foo);
|
||||
l2.add(new Utf8(foo));
|
||||
r0.put(0, l0);
|
||||
r1.put(0, l1);
|
||||
r2.put(0, l2);
|
||||
assertEquals(r0, r1);
|
||||
assertEquals(r0, r2);
|
||||
assertEquals(r1, r2);
|
||||
}
|
||||
|
||||
private Schema recordSchema() {
|
||||
List<Field> fields = new ArrayList<Field>();
|
||||
fields.add(new Field("anArray", Schema.createArray(Schema.create(Type.STRING)), null, null));
|
||||
Schema schema = Schema.createRecord("arrayFoo", "test", "mytest", false);
|
||||
schema.setFields(fields);
|
||||
|
||||
return schema;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEquals2() {
|
||||
Schema schema1 = Schema.createRecord("r", null, "x", false);
|
||||
List<Field> fields1 = new ArrayList<Field>();
|
||||
fields1.add(new Field("a", Schema.create(Schema.Type.STRING), null, null,
|
||||
Field.Order.IGNORE));
|
||||
schema1.setFields(fields1);
|
||||
|
||||
// only differs in field order
|
||||
Schema schema2 = Schema.createRecord("r", null, "x", false);
|
||||
List<Field> fields2 = new ArrayList<Field>();
|
||||
fields2.add(new Field("a", Schema.create(Schema.Type.STRING), null, null,
|
||||
Field.Order.ASCENDING));
|
||||
schema2.setFields(fields2);
|
||||
|
||||
GenericRecord record1 = new GenericData.Record(schema1);
|
||||
record1.put("a", "1");
|
||||
|
||||
GenericRecord record2 = new GenericData.Record(schema2);
|
||||
record2.put("a", "2");
|
||||
|
||||
assertFalse(record2.equals(record1));
|
||||
assertFalse(record1.equals(record2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecordGetFieldDoesntExist() throws Exception {
|
||||
List<Field> fields = new ArrayList<Field>();
|
||||
Schema schema = Schema.createRecord(fields);
|
||||
GenericData.Record record = new GenericData.Record(schema);
|
||||
assertNull(record.get("does not exist"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testArrayReversal() {
|
||||
Schema schema = Schema.createArray(Schema.create(Schema.Type.INT));
|
||||
GenericArray<Integer> forward = new GenericData.Array<Integer>(10, schema);
|
||||
GenericArray<Integer> backward = new GenericData.Array<Integer>(10, schema);
|
||||
for (int i = 0; i <= 9; i++) {
|
||||
forward.add(i);
|
||||
}
|
||||
for (int i = 9; i >= 0; i--) {
|
||||
backward.add(i);
|
||||
}
|
||||
forward.reverse();
|
||||
assertTrue(forward.equals(backward));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testArrayListInterface() {
|
||||
Schema schema = Schema.createArray(Schema.create(Schema.Type.INT));
|
||||
GenericArray<Integer> array = new GenericData.Array<Integer>(1, schema);
|
||||
array.add(99);
|
||||
assertEquals(new Integer(99), array.get(0));
|
||||
List<Integer> list = new ArrayList<Integer>();
|
||||
list.add(99);
|
||||
assertEquals(array, list);
|
||||
assertEquals(list, array);
|
||||
assertEquals(list.hashCode(), array.hashCode());
|
||||
try {
|
||||
array.get(2);
|
||||
fail("Expected IndexOutOfBoundsException getting index 2");
|
||||
} catch (IndexOutOfBoundsException e) {
|
||||
}
|
||||
array.clear();
|
||||
assertEquals(0, array.size());
|
||||
try {
|
||||
array.get(0);
|
||||
fail("Expected IndexOutOfBoundsException getting index 0 after clear()");
|
||||
} catch (IndexOutOfBoundsException e) {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testArrayAddAtLocation() {
|
||||
Schema schema = Schema.createArray(Schema.create(Schema.Type.INT));
|
||||
GenericArray<Integer> array = new GenericData.Array<Integer>(6, schema);
|
||||
array.clear();
|
||||
for (int i = 0; i < 5; ++i)
|
||||
array.add(i);
|
||||
assertEquals(5, array.size());
|
||||
array.add(0, 6);
|
||||
assertEquals(new Integer(6), array.get(0));
|
||||
assertEquals(6, array.size());
|
||||
assertEquals(new Integer(0), array.get(1));
|
||||
assertEquals(new Integer(4), array.get(5));
|
||||
array.add(6, 7);
|
||||
assertEquals(new Integer(7), array.get(6));
|
||||
assertEquals(7, array.size());
|
||||
assertEquals(new Integer(6), array.get(0));
|
||||
assertEquals(new Integer(4), array.get(5));
|
||||
array.add(1, 8);
|
||||
assertEquals(new Integer(8), array.get(1));
|
||||
assertEquals(new Integer(0), array.get(2));
|
||||
assertEquals(new Integer(6), array.get(0));
|
||||
assertEquals(8, array.size());
|
||||
try {
|
||||
array.get(9);
|
||||
fail("Expected IndexOutOfBoundsException after adding elements");
|
||||
} catch (IndexOutOfBoundsException e) {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testArrayRemove() {
|
||||
Schema schema = Schema.createArray(Schema.create(Schema.Type.INT));
|
||||
GenericArray<Integer> array = new GenericData.Array<Integer>(10, schema);
|
||||
array.clear();
|
||||
for (int i = 0; i < 10; ++i)
|
||||
array.add(i);
|
||||
assertEquals(10, array.size());
|
||||
assertEquals(new Integer(0), array.get(0));
|
||||
assertEquals(new Integer(9), array.get(9));
|
||||
|
||||
array.remove(0);
|
||||
assertEquals(9, array.size());
|
||||
assertEquals(new Integer(1), array.get(0));
|
||||
assertEquals(new Integer(2), array.get(1));
|
||||
assertEquals(new Integer(9), array.get(8));
|
||||
|
||||
// Test boundary errors.
|
||||
try {
|
||||
array.get(9);
|
||||
fail("Expected IndexOutOfBoundsException after removing an element");
|
||||
} catch (IndexOutOfBoundsException e) {
|
||||
}
|
||||
try {
|
||||
array.set(9, 99);
|
||||
fail("Expected IndexOutOfBoundsException after removing an element");
|
||||
} catch (IndexOutOfBoundsException e) {
|
||||
}
|
||||
try {
|
||||
array.remove(9);
|
||||
fail("Expected IndexOutOfBoundsException after removing an element");
|
||||
} catch (IndexOutOfBoundsException e) {
|
||||
}
|
||||
|
||||
// Test that we can still remove for properly sized arrays, and the rval
|
||||
assertEquals(new Integer(9), array.remove(8));
|
||||
assertEquals(8, array.size());
|
||||
|
||||
|
||||
// Test insertion after remove
|
||||
array.add(88);
|
||||
assertEquals(new Integer(88), array.get(8));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testArraySet() {
|
||||
Schema schema = Schema.createArray(Schema.create(Schema.Type.INT));
|
||||
GenericArray<Integer> array = new GenericData.Array<Integer>(10, schema);
|
||||
array.clear();
|
||||
for (int i = 0; i < 10; ++i)
|
||||
array.add(i);
|
||||
assertEquals(10, array.size());
|
||||
assertEquals(new Integer(0), array.get(0));
|
||||
assertEquals(new Integer(5), array.get(5));
|
||||
|
||||
assertEquals(new Integer(5), array.set(5, 55));
|
||||
assertEquals(10, array.size());
|
||||
assertEquals(new Integer(55), array.get(5));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testToStringDoesNotEscapeForwardSlash() throws Exception {
|
||||
GenericData data = GenericData.get();
|
||||
assertEquals("\"/\"", data.toString("/"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testToStringNanInfinity() throws Exception {
|
||||
GenericData data = GenericData.get();
|
||||
assertEquals("\"Infinity\"", data.toString(Float.POSITIVE_INFINITY));
|
||||
assertEquals("\"-Infinity\"", data.toString(Float.NEGATIVE_INFINITY));
|
||||
assertEquals("\"NaN\"", data.toString(Float.NaN));
|
||||
assertEquals("\"Infinity\"", data.toString(Double.POSITIVE_INFINITY));
|
||||
assertEquals("\"-Infinity\"", data.toString(Double.NEGATIVE_INFINITY));
|
||||
assertEquals("\"NaN\"", data.toString(Double.NaN));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEnumCompare() {
|
||||
Schema s = Schema.createEnum("Kind", null, null, Arrays.asList("Z", "Y", "X"));
|
||||
GenericEnumSymbol z = new GenericData.EnumSymbol(s, "Z");
|
||||
GenericEnumSymbol y = new GenericData.EnumSymbol(s, "Y");
|
||||
assertEquals(0, z.compareTo(z));
|
||||
assertTrue(y.compareTo(z) > 0);
|
||||
assertTrue(z.compareTo(y) < 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testByteBufferDeepCopy() {
|
||||
// Test that a deep copy of a byte buffer respects the byte buffer
|
||||
// limits and capacity.
|
||||
byte[] buffer_value = {0, 1, 2, 3, 0, 0, 0};
|
||||
ByteBuffer buffer = ByteBuffer.wrap(buffer_value, 1, 4);
|
||||
Schema schema = Schema.createRecord("my_record", "doc", "mytest", false);
|
||||
Field byte_field =
|
||||
new Field("bytes", Schema.create(Type.BYTES), null, null);
|
||||
schema.setFields(Arrays.asList(byte_field));
|
||||
|
||||
GenericRecord record = new GenericData.Record(schema);
|
||||
record.put(byte_field.name(), buffer);
|
||||
|
||||
GenericRecord copy = GenericData.get().deepCopy(schema, record);
|
||||
ByteBuffer buffer_copy = (ByteBuffer) copy.get(byte_field.name());
|
||||
|
||||
assertEquals(buffer, buffer_copy);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidateNullableEnum() {
|
||||
List<Schema> unionTypes = new ArrayList<Schema>();
|
||||
Schema schema;
|
||||
Schema nullSchema = Schema.create(Type.NULL);
|
||||
Schema enumSchema = Schema.createEnum("AnEnum", null, null, Arrays.asList("X", "Y", "Z"));
|
||||
GenericEnumSymbol w = new GenericData.EnumSymbol(enumSchema, "W");
|
||||
GenericEnumSymbol x = new GenericData.EnumSymbol(enumSchema, "X");
|
||||
GenericEnumSymbol y = new GenericData.EnumSymbol(enumSchema, "Y");
|
||||
GenericEnumSymbol z = new GenericData.EnumSymbol(enumSchema, "Z");
|
||||
|
||||
// null is first
|
||||
unionTypes.clear();
|
||||
unionTypes.add(nullSchema);
|
||||
unionTypes.add(enumSchema);
|
||||
schema = Schema.createUnion(unionTypes);
|
||||
|
||||
assertTrue(GenericData.get().validate(schema, z));
|
||||
assertTrue(GenericData.get().validate(schema, y));
|
||||
assertTrue(GenericData.get().validate(schema, x));
|
||||
assertFalse(GenericData.get().validate(schema, w));
|
||||
assertTrue(GenericData.get().validate(schema, null));
|
||||
|
||||
// null is last
|
||||
unionTypes.clear();
|
||||
unionTypes.add(enumSchema);
|
||||
unionTypes.add(nullSchema);
|
||||
schema = Schema.createUnion(unionTypes);
|
||||
|
||||
assertTrue(GenericData.get().validate(schema, z));
|
||||
assertTrue(GenericData.get().validate(schema, y));
|
||||
assertTrue(GenericData.get().validate(schema, x));
|
||||
assertFalse(GenericData.get().validate(schema, w));
|
||||
assertTrue(GenericData.get().validate(schema, null));
|
||||
}
|
||||
|
||||
private enum anEnum {ONE, TWO, THREE}
|
||||
|
||||
;
|
||||
|
||||
@Test
|
||||
public void validateRequiresGenericSymbolForEnumSchema() {
|
||||
final Schema schema = Schema.createEnum("my_enum", "doc", "namespace", Arrays.asList("ONE", "TWO", "THREE"));
|
||||
final GenericData gd = GenericData.get();
|
||||
|
||||
/* positive cases */
|
||||
assertTrue(gd.validate(schema, new GenericData.EnumSymbol(schema, "ONE")));
|
||||
assertTrue(gd.validate(schema, new GenericData.EnumSymbol(schema, anEnum.ONE)));
|
||||
|
||||
/* negative cases */
|
||||
assertFalse("We don't expect GenericData to allow a String datum for an enum schema", gd.validate(schema, "ONE"));
|
||||
assertFalse("We don't expect GenericData to allow a Java Enum for an enum schema", gd.validate(schema, anEnum.ONE));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidateUnion() {
|
||||
Schema type1Schema = SchemaBuilder.record("Type1")
|
||||
.fields()
|
||||
.requiredString("myString")
|
||||
.requiredInt("myInt")
|
||||
.endRecord();
|
||||
|
||||
Schema type2Schema = SchemaBuilder.record("Type2")
|
||||
.fields()
|
||||
.requiredString("myString")
|
||||
.endRecord();
|
||||
|
||||
Schema unionSchema = SchemaBuilder.unionOf()
|
||||
.type(type1Schema).and().type(type2Schema)
|
||||
.endUnion();
|
||||
|
||||
GenericRecord record = new GenericData.Record(type2Schema);
|
||||
record.put("myString", "myValue");
|
||||
assertTrue(GenericData.get().validate(unionSchema, record));
|
||||
}
|
||||
}
|
|
@ -14,4 +14,5 @@
|
|||
// limitations under the License.
|
||||
|
||||
apply from: file('scala.gradle')
|
||||
include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients', 'log4j-appender'
|
||||
include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients',
|
||||
'log4j-appender', 'copycat-data', 'copycat-api'
|
||||
|
|
Loading…
Reference in New Issue