Remove most of the Avro-based mock runtime data API, preserving only enough schema functionality to support basic primitive types for an initial patch.

Ewen Cheslack-Postava 2015-07-30 22:12:31 -07:00
parent 4674d136e1
commit 6ba87debad
23 changed files with 254 additions and 2788 deletions
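
For orientation, a rough sketch of the surface that survives this patch, assuming Schema.create and the primitive Schema.Type constants remain (as the Schema.java hunks below suggest):

// Illustrative only: primitive schemas still work after this patch.
Schema intSchema = Schema.create(Schema.Type.INT);
Schema stringSchema = Schema.create(Schema.Type.STRING);
// Removed in this patch: Schema.createRecord(...), Schema.createFixed(...),
// GenericData.Record, GenericRecordBuilder, and the related generic containers.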

View File

@ -1,47 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.data;
public class BinaryData {
/**
* Compares regions of two byte arrays, returning a negative integer, zero, or positive integer when the first byte
* array region is less than, equal to, or greater than the second byte array region, respectively.
*
* @param b1 first byte array
* @param s1 start of region in first byte array
* @param l1 length of region in first byte array
* @param b2 second byte array
* @param s2 start of region in second byte array
* @param l2 length of region in second byte array
* @return a negative integer, zero, or a positive integer as the first byte array is less than, equal to, or greater
* than the second byte array
*/
public static int compareBytes(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
int end1 = s1 + l1;
int end2 = s2 + l2;
for (int i = s1, j = s2; i < end1 && j < end2; i++, j++) {
int a = b1[i] & 0xff;
int b = b2[j] & 0xff;
if (a != b)
return a - b;
}
return l1 - l2;
}
}
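
For reference, a minimal usage sketch of the removed comparator; bytes are compared as unsigned values, with ties broken by region length (shorter region sorts first):

byte[] a = {0x01, 0x02, 0x03};
byte[] b = {0x01, 0x02, 0x04};
// Negative result: the first region sorts before the second.
int cmp = BinaryData.compareBytes(a, 0, a.length, b, 0, b.length);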

View File

@ -1,34 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.data;
import java.util.List;
/** Array that permits reuse of contained elements. */
public interface GenericArray<T> extends List<T>, GenericContainer {
/** The current content of the location where {@link #add(Object)} would next
* store an element, if any. This permits reuse of arrays and their elements
* without allocating new objects. */
T peek();
/** Reverses the order of the elements in this array. */
void reverse();
}
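
For context, a sketch of how this removed interface was typically exercised, assuming the GenericData.Array implementation used in the tests later in this diff:

Schema arraySchema = Schema.createArray(Schema.create(Schema.Type.INT));
GenericArray<Integer> array = new GenericData.Array<Integer>(2, arraySchema);
array.add(1);
array.add(2);
array.reverse();             // contents now [2, 1]
Integer slot = array.peek(); // element add() would reuse next, or null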

View File

@ -1,27 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.data;
/** Contains data of other types. */
public interface GenericContainer {
/** The schema of this instance. */
Schema getSchema();
}

View File

@ -1,27 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.data;
/** An enum symbol. */
public interface GenericEnumSymbol
extends GenericContainer, Comparable<GenericEnumSymbol> {
/** Return the symbol. */
String toString();
}

View File

@ -1,26 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.data;
/** Fixed-size data. */
public interface GenericFixed extends GenericContainer {
/** Return the data. */
byte[] bytes();
}

View File

@ -1,30 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.data;
/** A generic instance of a record schema. Fields are accessible by name as
* well as by index. */
public interface GenericRecord extends IndexedRecord {
/** Set the value of a field given its name. */
void put(String key, Object v);
/** Return the value of a field given its name. */
Object get(String key);
}
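
A brief sketch of name- and index-based access on the removed type, using the Schema.createRecord/setFields API that this commit also removes (imports elided):

Schema s = Schema.createRecord("Person", null, "example", false);
s.setFields(Arrays.asList(
    new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null)));
GenericRecord rec = new GenericData.Record(s);
rec.put("name", "alice");    // by field name
Object first = rec.get(0);   // by position, via IndexedRecord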

View File

@ -1,259 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.data;
import org.apache.kafka.copycat.data.GenericData.Record;
import org.apache.kafka.copycat.data.Schema.Field;
import java.io.IOException;
/** A RecordBuilder for generic records. GenericRecordBuilder fills in default values
* for fields if they are not specified. */
public class GenericRecordBuilder extends RecordBuilderBase<Record> {
private final GenericData.Record record;
/**
* Creates a GenericRecordBuilder for building Record instances.
* @param schema the schema associated with the record class.
*/
public GenericRecordBuilder(Schema schema) {
super(schema, GenericData.get());
record = new GenericData.Record(schema);
}
/**
* Creates a GenericRecordBuilder by copying an existing GenericRecordBuilder.
* @param other the GenericRecordBuilder to copy.
*/
public GenericRecordBuilder(GenericRecordBuilder other) {
super(other, GenericData.get());
record = new GenericData.Record(other.record, /* deepCopy = */ true);
}
/**
* Creates a GenericRecordBuilder by copying an existing record instance.
* @param other the record instance to copy.
*/
public GenericRecordBuilder(Record other) {
super(other.getSchema(), GenericData.get());
record = new GenericData.Record(other, /* deepCopy = */ true);
// Set all fields in the RecordBuilder that are set in the record
for (Field f : schema().getFields()) {
Object value = other.get(f.pos());
// Only set the value if it is not null, if the schema type is null,
// or if the schema type is a union that accepts nulls.
if (isValidValue(f, value)) {
set(f, data().deepCopy(f.schema(), value));
}
}
}
/**
* Gets the value of a field.
* @param fieldName the name of the field to get.
* @return the value of the field with the given name, or null if not set.
*/
public Object get(String fieldName) {
return get(schema().getField(fieldName));
}
/**
* Gets the value of a field.
* @param field the field to get.
* @return the value of the given field, or null if not set.
*/
public Object get(Field field) {
return get(field.pos());
}
/**
* Gets the value of a field.
* @param pos the position of the field to get.
* @return the value of the field with the given position, or null if not set.
*/
protected Object get(int pos) {
return record.get(pos);
}
/**
* Sets the value of a field.
* @param fieldName the name of the field to set.
* @param value the value to set.
* @return a reference to the RecordBuilder.
*/
public GenericRecordBuilder set(String fieldName, Object value) {
return set(schema().getField(fieldName), value);
}
/**
* Sets the value of a field.
* @param field the field to set.
* @param value the value to set.
* @return a reference to the RecordBuilder.
*/
public GenericRecordBuilder set(Field field, Object value) {
return set(field, field.pos(), value);
}
/**
* Sets the value of a field.
* @param pos the field to set.
* @param value the value to set.
* @return a reference to the RecordBuilder.
*/
protected GenericRecordBuilder set(int pos, Object value) {
return set(fields()[pos], pos, value);
}
/**
* Sets the value of a field.
* @param field the field to set.
* @param pos the position of the field.
* @param value the value to set.
* @return a reference to the RecordBuilder.
*/
private GenericRecordBuilder set(Field field, int pos, Object value) {
validate(field, value);
record.put(pos, value);
fieldSetFlags()[pos] = true;
return this;
}
/**
* Checks whether a field has been set.
* @param fieldName the name of the field to check.
* @return true if the given field is non-null; false otherwise.
*/
public boolean has(String fieldName) {
return has(schema().getField(fieldName));
}
/**
* Checks whether a field has been set.
* @param field the field to check.
* @return true if the given field is non-null; false otherwise.
*/
public boolean has(Field field) {
return has(field.pos());
}
/**
* Checks whether a field has been set.
* @param pos the position of the field to check.
* @return true if the given field is non-null; false otherwise.
*/
protected boolean has(int pos) {
return fieldSetFlags()[pos];
}
/**
* Clears the value of the given field.
* @param fieldName the name of the field to clear.
* @return a reference to the RecordBuilder.
*/
public GenericRecordBuilder clear(String fieldName) {
return clear(schema().getField(fieldName));
}
/**
* Clears the value of the given field.
* @param field the field to clear.
* @return a reference to the RecordBuilder.
*/
public GenericRecordBuilder clear(Field field) {
return clear(field.pos());
}
/**
* Clears the value of the given field.
* @param pos the position of the field to clear.
* @return a reference to the RecordBuilder.
*/
protected GenericRecordBuilder clear(int pos) {
record.put(pos, null);
fieldSetFlags()[pos] = false;
return this;
}
@Override
public Record build() {
Record record;
try {
record = new GenericData.Record(schema());
} catch (Exception e) {
throw new DataRuntimeException(e);
}
for (Field field : fields()) {
Object value;
try {
value = getWithDefault(field);
} catch (IOException e) {
throw new DataRuntimeException(e);
}
if (value != null) {
record.put(field.pos(), value);
}
}
return record;
}
/**
* Gets the value of the given field.
* If the field has been set, the set value is returned (even if it's null).
* If the field hasn't been set and has a default value, the default value
* is returned.
* @param field the field whose value should be retrieved.
* @return the value set for the given field, the field's default value,
* or null.
* @throws IOException
*/
private Object getWithDefault(Field field) throws IOException {
return fieldSetFlags()[field.pos()] ?
record.get(field.pos()) : defaultValue(field);
}
@Override
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + ((record == null) ? 0 : record.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (!super.equals(obj))
return false;
if (getClass() != obj.getClass())
return false;
GenericRecordBuilder other = (GenericRecordBuilder) obj;
if (record == null) {
if (other.record != null)
return false;
} else if (!record.equals(other.record))
return false;
return true;
}
}
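
For context, a sketch of the removed builder's contract: fields left unset fall back to their schema defaults at build() time. This assumes, per the isValidDefault checks in Schema.java below, that Field accepts a plain Java object as its default (imports elided):

Schema s = Schema.createRecord("Point", null, "example", false);
s.setFields(Arrays.asList(
    new Schema.Field("x", Schema.create(Schema.Type.INT), null, null),
    new Schema.Field("y", Schema.create(Schema.Type.INT), null, 0)));
GenericData.Record p = new GenericRecordBuilder(s)
    .set("x", 3)    // explicit value
    .build();       // "y" unset: build() fills in its default, 0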

View File

@ -1,31 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.data;
/** A record implementation that permits field access by integer index.*/
public interface IndexedRecord extends GenericContainer {
/** Set the value of a field given its position in the schema.
* <p>This method is not meant to be called by user code, but only internally for deep copying */
void put(int i, Object v);
/** Return the value of a field given its position in the schema.
* <p>This method is not meant to be called by user code, but only internally for deep copying */
Object get(int i);
}

View File

@ -1,32 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.data;
/** Interface for record builders */
public interface RecordBuilder<T> {
/**
* Constructs a new instance using the values set in the RecordBuilder.
* If a particular value was not set and the schema defines a default
* value, the default value will be used.
* @return a new instance using values set in the RecordBuilder.
*/
T build();
}

View File

@ -1,173 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.data;
import org.apache.kafka.copycat.data.Schema.Field;
import org.apache.kafka.copycat.data.Schema.Type;
import java.io.IOException;
import java.util.Arrays;
/** Abstract base class for RecordBuilder implementations. Not thread-safe. */
public abstract class RecordBuilderBase<T extends IndexedRecord>
implements RecordBuilder<T> {
private static final Field[] EMPTY_FIELDS = new Field[0];
private final Schema schema;
private final Field[] fields;
private final boolean[] fieldSetFlags;
private final GenericData data;
protected final Schema schema() {
return schema;
}
protected final Field[] fields() {
return fields;
}
protected final boolean[] fieldSetFlags() {
return fieldSetFlags;
}
protected final GenericData data() {
return data;
}
/**
* Creates a RecordBuilderBase for building records of the given type.
* @param schema the schema associated with the record class.
*/
protected RecordBuilderBase(Schema schema, GenericData data) {
this.schema = schema;
this.data = data;
fields = (Field[]) schema.getFields().toArray(EMPTY_FIELDS);
fieldSetFlags = new boolean[fields.length];
}
/**
* RecordBuilderBase copy constructor.
* Makes a deep copy of the values in the other builder.
* @param other RecordBuilderBase instance to copy.
*/
protected RecordBuilderBase(RecordBuilderBase<T> other, GenericData data) {
this.schema = other.schema;
this.data = data;
fields = (Field[]) schema.getFields().toArray(EMPTY_FIELDS);
fieldSetFlags = new boolean[other.fieldSetFlags.length];
System.arraycopy(
other.fieldSetFlags, 0, fieldSetFlags, 0, fieldSetFlags.length);
}
/**
* Validates that a particular value for a given field is valid according to
* the following algorithm:
* 1. If the value is not null, or the field type is null, or the field type
* is a union which accepts nulls, returns.
* 2. Else, if the field has a default value, returns.
* 3. Otherwise throws AvroRuntimeException.
* @param field the field to validate.
* @param value the value to validate.
* @throws NullPointerException if value is null and the given field does
* not accept null values.
*/
protected void validate(Field field, Object value) {
if (isValidValue(field, value)) {
return;
} else if (field.defaultValue() != null) {
return;
} else {
throw new DataRuntimeException(
"Field " + field + " does not accept null values");
}
}
/**
* Tests whether a value is valid for a specified field.
* @param f the field for which to test the value.
* @param value the value to test.
* @return true if the value is valid for the given field; false otherwise.
*/
protected static boolean isValidValue(Field f, Object value) {
if (value != null) {
return true;
}
Schema schema = f.schema();
Type type = schema.getType();
// If the type is null, any value is valid
if (type == Type.NULL) {
return true;
}
// If the type is a union that allows nulls, any value is valid
if (type == Type.UNION) {
for (Schema s : schema.getTypes()) {
if (s.getType() == Type.NULL) {
return true;
}
}
}
// The value is null but the type does not allow nulls
return false;
}
/**
* Gets the default value of the given field, if any.
* @param field the field whose default value should be retrieved.
* @return the default value associated with the given field,
* or null if none is specified in the schema.
* @throws IOException
*/
@SuppressWarnings({"rawtypes", "unchecked"})
protected Object defaultValue(Field field) throws IOException {
return data.deepCopy(field.schema(), data.getDefaultValue(field));
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + Arrays.hashCode(fieldSetFlags);
result = prime * result + ((schema == null) ? 0 : schema.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
@SuppressWarnings("rawtypes")
RecordBuilderBase other = (RecordBuilderBase) obj;
if (!Arrays.equals(fieldSetFlags, other.fieldSetFlags))
return false;
if (schema == null) {
if (other.schema != null)
return false;
} else if (!schema.equals(other.schema))
return false;
return true;
}
}
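
A sketch of the validation rules spelled out above: a null value passes only for a NULL-typed field, a union that includes NULL, or a field with a default (imports elided):

Schema nullable = Schema.createUnion(Arrays.asList(
    Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)));
Schema record = Schema.createRecord("Example", null, "example", false);
record.setFields(Arrays.asList(
    new Schema.Field("opt", nullable, null, null),
    new Schema.Field("req", Schema.create(Schema.Type.STRING), null, null)));
GenericRecordBuilder b = new GenericRecordBuilder(record);
b.set("opt", null);   // accepted: the union includes NULL
b.set("req", null);   // throws DataRuntimeException per validate()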

View File

@ -60,12 +60,6 @@ public abstract class Schema extends ObjectProperties {
/** The type of a schema. */
public enum Type {
RECORD {
@Override
public Object defaultValue(Schema schema) {
return new GenericRecordBuilder(schema).build();
}
},
ENUM {
@Override
public Object defaultValue(Schema schema) {
@ -75,7 +69,7 @@ public abstract class Schema extends ObjectProperties {
ARRAY {
@Override
public Object defaultValue(Schema schema) {
return new GenericData.Array(0, schema);
return new ArrayList<>();
}
},
MAP {
@ -91,12 +85,6 @@ public abstract class Schema extends ObjectProperties {
return firstSchema.getType().defaultValue(firstSchema);
}
},
FIXED {
@Override
public Object defaultValue(Schema schema) {
return new GenericData.Fixed(schema);
}
},
STRING {
@Override
public Object defaultValue(Schema schema) {
@ -205,19 +193,6 @@ public abstract class Schema extends ObjectProperties {
hashCode = NO_HASHCODE;
}
/** Create an anonymous record schema. */
public static Schema createRecord(List<Field> fields) {
Schema result = createRecord(null, null, null, false);
result.setFields(fields);
return result;
}
/** Create a named record schema. */
public static Schema createRecord(String name, String doc, String namespace,
boolean isError) {
return new RecordSchema(new Name(name, namespace), doc, isError);
}
/** Create an enum schema. */
public static Schema createEnum(String name, String doc, String namespace,
List<String> values) {
@ -245,12 +220,6 @@ public abstract class Schema extends ObjectProperties {
return createUnion(new LockableArrayList<Schema>(types));
}
/** Create a union schema. */
public static Schema createFixed(String name, String doc, String space,
int size) {
return new FixedSchema(new Name(name, space), doc, size);
}
/** Return the type of this schema. */
public Type getType() {
return type;
@ -655,92 +624,6 @@ public abstract class Schema extends ObjectProperties {
}
};
@SuppressWarnings(value = "unchecked")
private static class RecordSchema extends NamedSchema {
private List<Field> fields;
private Map<String, Field> fieldMap;
private final boolean isError;
public RecordSchema(Name name, String doc, boolean isError) {
super(Type.RECORD, name, doc);
this.isError = isError;
}
public boolean isError() {
return isError;
}
@Override
public Field getField(String fieldname) {
if (fieldMap == null)
throw new DataRuntimeException("Schema fields not set yet");
return fieldMap.get(fieldname);
}
@Override
public List<Field> getFields() {
if (fields == null)
throw new DataRuntimeException("Schema fields not set yet");
return fields;
}
@Override
public void setFields(List<Field> fields) {
if (this.fields != null) {
throw new DataRuntimeException("Fields are already set");
}
int i = 0;
fieldMap = new HashMap<String, Field>();
LockableArrayList ff = new LockableArrayList();
for (Field f : fields) {
if (f.position != -1)
throw new DataRuntimeException("Field already used: " + f);
f.position = i++;
final Field existingField = fieldMap.put(f.name(), f);
if (existingField != null) {
throw new DataRuntimeException(String.format(
"Duplicate field %s in record %s: %s and %s.",
f.name(), name, f, existingField));
}
ff.add(f);
}
this.fields = ff.lock();
this.hashCode = NO_HASHCODE;
}
public boolean equals(Object o) {
if (o == this) return true;
if (!(o instanceof RecordSchema)) return false;
RecordSchema that = (RecordSchema) o;
if (!equalCachedHash(that)) return false;
if (!equalNames(that)) return false;
if (!props.equals(that.props)) return false;
Set seen = SEEN_EQUALS.get();
SeenPair here = new SeenPair(this, o);
if (seen.contains(here)) return true; // prevent stack overflow
boolean first = seen.isEmpty();
try {
seen.add(here);
return fields.equals(((RecordSchema) o).fields);
} finally {
if (first) seen.clear();
}
}
@Override
int computeHash() {
Map seen = SEEN_HASHCODE.get();
if (seen.containsKey(this)) return 0; // prevent stack overflow
boolean first = seen.isEmpty();
try {
seen.put(this, this);
return super.computeHash() + fields.hashCode();
} finally {
if (first) seen.clear();
}
}
}
private static class EnumSchema extends NamedSchema {
private final List<String> symbols;
private final Map<String, Integer> ordinals;
@ -884,36 +767,6 @@ public abstract class Schema extends ObjectProperties {
}
}
private static class FixedSchema extends NamedSchema {
private final int size;
public FixedSchema(Name name, String doc, int size) {
super(Type.FIXED, name, doc);
if (size < 0)
throw new IllegalArgumentException("Invalid fixed size: " + size);
this.size = size;
}
public int getFixedSize() {
return size;
}
public boolean equals(Object o) {
if (o == this) return true;
if (!(o instanceof FixedSchema)) return false;
FixedSchema that = (FixedSchema) o;
return equalCachedHash(that)
&& equalNames(that)
&& size == that.size
&& props.equals(that.props);
}
@Override
int computeHash() {
return super.computeHash() + size;
}
}
private static class StringSchema extends Schema {
public StringSchema() {
super(Type.STRING);
@ -1075,8 +928,6 @@ public abstract class Schema extends ObjectProperties {
case ENUM:
return (defaultValue instanceof String);
case BYTES:
case FIXED:
return (defaultValue instanceof byte[] || defaultValue instanceof ByteBuffer);
case INT:
return (defaultValue instanceof Integer);
case LONG:
@ -1105,13 +956,6 @@ public abstract class Schema extends ObjectProperties {
return true;
case UNION: // union default: first branch
return isValidDefault(schema.getTypes().get(0), defaultValue);
case RECORD:
if (!(defaultValue instanceof GenericData.Record))
return false;
for (Field field : schema.getFields())
if (!isValidDefault(field.schema(), ((GenericData.Record) defaultValue).get(field.name())))
return false;
return true;
default:
return false;
}
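
One practical effect of the Schema.java changes above: container defaults now come back as plain Java collections rather than Avro generic containers. A quick sketch:

Schema arraySchema = Schema.createArray(Schema.create(Schema.Type.INT));
Object dflt = arraySchema.getType().defaultValue(arraySchema);
// dflt is a plain java.util.ArrayList now; it was a GenericData.Array before.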

View File

@ -224,18 +224,6 @@ public class SchemaBuilder {
new NameContext().namespace(namespace));
}
/**
* Create a builder for an Avro record with the specified name.
* This is equivalent to:
* <pre>
* builder().record(name);
* </pre>
* @param name the record name
*/
public static RecordBuilder<Schema> record(String name) {
return builder().record(name);
}
/**
* Create a builder for an Avro enum with the specified name and symbols (values).
* This is equivalent to:
@ -248,18 +236,6 @@ public class SchemaBuilder {
return builder().enumeration(name);
}
/**
* Create a builder for an Avro fixed type with the specified name and size.
* This is equivalent to:
* <pre>
* builder().fixed(name);
* </pre>
* @param name the fixed name
*/
public static FixedBuilder<Schema> fixed(String name) {
return builder().fixed(name);
}
/**
* Create a builder for an Avro array
* This is equivalent to:
@ -691,41 +667,6 @@ public class SchemaBuilder {
}
}
/**
* Builds an Avro Fixed type with optional properties, namespace, doc, and
* aliases.
* <p/>
* Set properties with {@link #prop(String, Object)}, namespace with
* {@link #namespace(String)}, doc with {@link #doc(String)}, and aliases with
* {@link #aliases(String[])}.
* <p/>
* The Fixed schema is finalized when its required size is set via
* {@link #size(int)}.
**/
public static final class FixedBuilder<R> extends
NamespacedBuilder<R, FixedBuilder<R>> {
private FixedBuilder(Completion<R> context, NameContext names, String name) {
super(context, names, name);
}
private static <R> FixedBuilder<R> create(Completion<R> context,
NameContext names, String name) {
return new FixedBuilder<R>(context, names, name);
}
@Override
protected FixedBuilder<R> self() {
return this;
}
/** Configure this fixed type's size, and end its configuration. **/
public R size(int size) {
Schema schema = Schema.createFixed(name(), super.doc(), space(), size);
completeSchema(schema);
return context().complete(schema);
}
}
/**
* Builds an Avro Enum type with optional properties, namespace, doc, and
* aliases.
@ -1162,19 +1103,6 @@ public class SchemaBuilder {
return ArrayBuilder.create(context, names);
}
/** Build an Avro fixed type. Example usage:
* <pre>
* fixed("com.foo.IPv4").size(4)
* </pre>
* Equivalent to Avro JSON Schema:
* <pre>
* {"type":"fixed", "name":"com.foo.IPv4", "size":4}
* </pre>
**/
public final FixedBuilder<R> fixed(String name) {
return FixedBuilder.create(context, names, name);
}
/** Build an Avro enum type. Example usage:
* <pre>
* enumeration("Suits").namespace("org.cards").doc("card suit names")
@ -1191,29 +1119,6 @@ public class SchemaBuilder {
return EnumBuilder.create(context, names, name);
}
/** Build an Avro record type. Example usage:
* <pre>
* record("com.foo.Foo").fields()
* .name("field1").typeInt().intDefault(1)
* .name("field2").typeString().noDefault()
* .name("field3").optional().typeFixed("FooFixed").size(4)
* .endRecord()
* </pre>
* Equivalent to Avro JSON Schema:
* <pre>
* {"type":"record", "name":"com.foo.Foo", "fields": [
* {"name":"field1", "type":"int", "default":1},
* {"name":"field2", "type":"string"},
* {"name":"field3", "type":[
* null, {"type":"fixed", "name":"FooFixed", "size":4}
* ]}
* ]}
* </pre>
**/
public final RecordBuilder<R> record(String name) {
return RecordBuilder.create(context, names, name);
}
/** Build an Avro union schema type. Example usage:
* <pre>unionOf().stringType().and().bytesType().endUnion()</pre>
**/
@ -1450,21 +1355,11 @@ public class SchemaBuilder {
return ArrayBuilder.create(wrap(new ArrayDefault<R>(bldr)), names);
}
/** Build an Avro fixed type. **/
public final FixedBuilder<FixedDefault<R>> fixed(String name) {
return FixedBuilder.create(wrap(new FixedDefault<R>(bldr)), names, name);
}
/** Build an Avro enum type. **/
public final EnumBuilder<EnumDefault<R>> enumeration(String name) {
return EnumBuilder.create(wrap(new EnumDefault<R>(bldr)), names, name);
}
/** Build an Avro record type. **/
public final RecordBuilder<RecordDefault<R>> record(String name) {
return RecordBuilder.create(wrap(new RecordDefault<R>(bldr)), names, name);
}
private <C> Completion<C> wrap(
Completion<C> completion) {
if (wrapper != null) {
@ -1677,51 +1572,16 @@ public class SchemaBuilder {
return ArrayBuilder.create(completion(new ArrayDefault<R>(bldr)), names);
}
/** Build an Avro fixed type. **/
public FixedBuilder<UnionAccumulator<FixedDefault<R>>> fixed(String name) {
return FixedBuilder.create(completion(new FixedDefault<R>(bldr)), names, name);
}
/** Build an Avro enum type. **/
public EnumBuilder<UnionAccumulator<EnumDefault<R>>> enumeration(String name) {
return EnumBuilder.create(completion(new EnumDefault<R>(bldr)), names, name);
}
/** Build an Avro record type. **/
public RecordBuilder<UnionAccumulator<RecordDefault<R>>> record(String name) {
return RecordBuilder.create(completion(new RecordDefault<R>(bldr)), names, name);
}
private <C> UnionCompletion<C> completion(Completion<C> context) {
return new UnionCompletion<C>(context, names, new ArrayList<Schema>());
}
}
public final static class RecordBuilder<R> extends
NamespacedBuilder<R, RecordBuilder<R>> {
private RecordBuilder(Completion<R> context, NameContext names, String name) {
super(context, names, name);
}
private static <R> RecordBuilder<R> create(Completion<R> context,
NameContext names, String name) {
return new RecordBuilder<R>(context, names, name);
}
@Override
protected RecordBuilder<R> self() {
return this;
}
public FieldAssembler<R> fields() {
Schema record = Schema.createRecord(name(), doc(), space(), false);
// place the record in the name context, fields yet to be set.
completeSchema(record);
return new FieldAssembler<R>(
context(), names().namespace(record.getNamespace()), record);
}
}
public final static class FieldAssembler<R> {
private final List<Field> fields = new ArrayList<Field>();
private final Completion<R> context;
@ -2374,11 +2234,6 @@ public class SchemaBuilder {
super(field);
}
/** Completes this field with the default value provided, cannot be null **/
public final FieldAssembler<R> recordDefault(GenericRecord defaultVal) {
return super.usingDefault(defaultVal);
}
@Override
final RecordDefault<R> self() {
return this;

View File

@ -1,40 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.data;
/** Thrown when the expected contents of a union cannot be resolved. */
public class UnresolvedUnionException extends DataRuntimeException {
private Object unresolvedDatum;
private Schema unionSchema;
public UnresolvedUnionException(Schema unionSchema, Object unresolvedDatum) {
super("Not in union " + unionSchema + ": " + unresolvedDatum);
this.unionSchema = unionSchema;
this.unresolvedDatum = unresolvedDatum;
}
public Object getUnresolvedDatum() {
return unresolvedDatum;
}
public Schema getUnionSchema() {
return unionSchema;
}
}
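
For context, a sketch of the condition this removed exception reports: a datum that matches no branch of a union schema (the boolean datum here is hypothetical):

Schema union = Schema.createUnion(Arrays.asList(
    Schema.create(Schema.Type.INT), Schema.create(Schema.Type.STRING)));
// A boolean matches neither branch, so resolution would signal:
UnresolvedUnionException e = new UnresolvedUnionException(union, Boolean.TRUE);
Object datum = e.getUnresolvedDatum();  // Boolean.TRUE
Schema failed = e.getUnionSchema();     // the union above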

View File

@ -1,158 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.data;
import java.nio.charset.Charset;
/** A Utf8 string. Unlike {@link String}, instances are mutable. This is more
* efficient than {@link String} when reading or writing a sequence of values,
* as a single instance may be reused. */
public class Utf8 implements Comparable<Utf8>, CharSequence {
private static final byte[] EMPTY = new byte[0];
private static final Charset UTF8 = Charset.forName("UTF-8");
private byte[] bytes = EMPTY;
private int length;
private String string;
public Utf8() {
}
public Utf8(String string) {
this.bytes = getBytesFor(string);
this.length = bytes.length;
this.string = string;
}
public Utf8(Utf8 other) {
this.length = other.length;
this.bytes = new byte[other.length];
System.arraycopy(other.bytes, 0, this.bytes, 0, this.length);
this.string = other.string;
}
public Utf8(byte[] bytes) {
this.bytes = bytes;
this.length = bytes.length;
}
/** Return UTF-8 encoded bytes.
* Only valid through {@link #getByteLength()}. */
public byte[] getBytes() {
return bytes;
}
/** Return length in bytes.
* @deprecated call {@link #getByteLength()} instead. */
public int getLength() {
return length;
}
/** Return length in bytes. */
public int getByteLength() {
return length;
}
/** Set length in bytes. Should be called whenever byte content changes, even
* if the length does not change, as this also clears the cached String.
* @deprecated call {@link #setByteLength(int)} instead. */
public Utf8 setLength(int newLength) {
return setByteLength(newLength);
}
/** Set length in bytes. Should be called whenever byte content changes, even
* if the length does not change, as this also clears the cached String. */
public Utf8 setByteLength(int newLength) {
if (this.bytes.length < newLength) {
byte[] newBytes = new byte[newLength];
System.arraycopy(bytes, 0, newBytes, 0, this.length);
this.bytes = newBytes;
}
this.length = newLength;
this.string = null;
return this;
}
/** Set to the contents of a String. */
public Utf8 set(String string) {
this.bytes = getBytesFor(string);
this.length = bytes.length;
this.string = string;
return this;
}
@Override
public String toString() {
if (this.length == 0) return "";
if (this.string == null) {
this.string = new String(bytes, 0, length, UTF8);
}
return this.string;
}
@Override
public boolean equals(Object o) {
if (o == this) return true;
if (!(o instanceof Utf8)) return false;
Utf8 that = (Utf8) o;
if (!(this.length == that.length)) return false;
byte[] thatBytes = that.bytes;
for (int i = 0; i < this.length; i++)
if (bytes[i] != thatBytes[i])
return false;
return true;
}
@Override
public int hashCode() {
int hash = 0;
for (int i = 0; i < this.length; i++)
hash = hash * 31 + bytes[i];
return hash;
}
@Override
public int compareTo(Utf8 that) {
return BinaryData.compareBytes(this.bytes, 0, this.length,
that.bytes, 0, that.length);
}
// CharSequence implementation
@Override
public char charAt(int index) {
return toString().charAt(index);
}
@Override
public int length() {
return toString().length();
}
@Override
public CharSequence subSequence(int start, int end) {
return toString().subSequence(start, end);
}
/** Gets the UTF-8 bytes for a String */
public static final byte[] getBytesFor(String str) {
return str.getBytes(UTF8);
}
}
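
A short sketch of the mutability the class comment describes: one instance can be resized in place, and the cached String form is recomputed lazily after each change:

Utf8 u = new Utf8(new byte[] {'h', 'i'});
String s1 = u.toString();  // "hi" (String form computed lazily, then cached)
u.setByteLength(1);        // shrink in place; clears the cached String
String s2 = u.toString();  // "h"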

View File

@ -1,410 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.data;
import org.apache.kafka.copycat.data.GenericData.Record;
import org.apache.kafka.copycat.data.Schema.Field;
import org.apache.kafka.copycat.data.Schema.Type;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.*;
import static org.junit.Assert.*;
public class TestGenericData {
@Test(expected = DataRuntimeException.class)
public void testrecordConstructorNullSchema() throws Exception {
new GenericData.Record(null);
}
@Test(expected = DataRuntimeException.class)
public void testrecordConstructorWrongSchema() throws Exception {
new GenericData.Record(Schema.create(Schema.Type.INT));
}
@Test(expected = DataRuntimeException.class)
public void testArrayConstructorNullSchema() throws Exception {
new GenericData.Array<Object>(1, null);
}
@Test(expected = DataRuntimeException.class)
public void testArrayConstructorWrongSchema() throws Exception {
new GenericData.Array<Object>(1, Schema.create(Schema.Type.INT));
}
@Test(expected = DataRuntimeException.class)
public void testRecordCreateEmptySchema() throws Exception {
Schema s = Schema.createRecord("schemaName", "schemaDoc", "namespace", false);
Record r = new GenericData.Record(s);
}
@Test(expected = DataRuntimeException.class)
public void testGetEmptySchemaFields() throws Exception {
Schema s = Schema.createRecord("schemaName", "schemaDoc", "namespace", false);
s.getFields();
}
@Test(expected = DataRuntimeException.class)
public void testGetEmptySchemaField() throws Exception {
Schema s = Schema.createRecord("schemaName", "schemaDoc", "namespace", false);
s.getField("foo");
}
@Test(expected = DataRuntimeException.class)
public void testRecordPutInvalidField() throws Exception {
Schema s = Schema.createRecord("schemaName", "schemaDoc", "namespace", false);
List<Schema.Field> fields = new ArrayList<Schema.Field>();
fields.add(new Schema.Field("someFieldName", s, "docs", null));
s.setFields(fields);
Record r = new GenericData.Record(s);
r.put("invalidFieldName", "someValue");
}
@Test
/** Make sure that even with nulls, hashCode() doesn't throw NPE. */
public void testHashCode() {
GenericData.get().hashCode(null, Schema.create(Type.NULL));
GenericData.get().hashCode(null, Schema.createUnion(
Arrays.asList(Schema.create(Type.BOOLEAN), Schema.create(Type.STRING))));
List<CharSequence> stuff = new ArrayList<CharSequence>();
stuff.add("string");
Schema schema = recordSchema();
GenericRecord r = new GenericData.Record(schema);
r.put(0, stuff);
GenericData.get().hashCode(r, schema);
}
@Test
public void testEquals() {
Schema s = recordSchema();
GenericRecord r0 = new GenericData.Record(s);
GenericRecord r1 = new GenericData.Record(s);
GenericRecord r2 = new GenericData.Record(s);
Collection<CharSequence> l0 = new ArrayDeque<CharSequence>();
List<CharSequence> l1 = new ArrayList<CharSequence>();
GenericArray<CharSequence> l2 =
new GenericData.Array<CharSequence>(1, s.getFields().get(0).schema());
String foo = "foo";
l0.add(new StringBuffer(foo));
l1.add(foo);
l2.add(new Utf8(foo));
r0.put(0, l0);
r1.put(0, l1);
r2.put(0, l2);
assertEquals(r0, r1);
assertEquals(r0, r2);
assertEquals(r1, r2);
}
private Schema recordSchema() {
List<Field> fields = new ArrayList<Field>();
fields.add(new Field("anArray", Schema.createArray(Schema.create(Type.STRING)), null, null));
Schema schema = Schema.createRecord("arrayFoo", "test", "mytest", false);
schema.setFields(fields);
return schema;
}
@Test
public void testEquals2() {
Schema schema1 = Schema.createRecord("r", null, "x", false);
List<Field> fields1 = new ArrayList<Field>();
fields1.add(new Field("a", Schema.create(Schema.Type.STRING), null, null,
Field.Order.IGNORE));
schema1.setFields(fields1);
// only differs in field order
Schema schema2 = Schema.createRecord("r", null, "x", false);
List<Field> fields2 = new ArrayList<Field>();
fields2.add(new Field("a", Schema.create(Schema.Type.STRING), null, null,
Field.Order.ASCENDING));
schema2.setFields(fields2);
GenericRecord record1 = new GenericData.Record(schema1);
record1.put("a", "1");
GenericRecord record2 = new GenericData.Record(schema2);
record2.put("a", "2");
assertFalse(record2.equals(record1));
assertFalse(record1.equals(record2));
}
@Test
public void testRecordGetFieldDoesntExist() throws Exception {
List<Field> fields = new ArrayList<Field>();
Schema schema = Schema.createRecord(fields);
GenericData.Record record = new GenericData.Record(schema);
assertNull(record.get("does not exist"));
}
@Test
public void testArrayReversal() {
Schema schema = Schema.createArray(Schema.create(Schema.Type.INT));
GenericArray<Integer> forward = new GenericData.Array<Integer>(10, schema);
GenericArray<Integer> backward = new GenericData.Array<Integer>(10, schema);
for (int i = 0; i <= 9; i++) {
forward.add(i);
}
for (int i = 9; i >= 0; i--) {
backward.add(i);
}
forward.reverse();
assertTrue(forward.equals(backward));
}
@Test
public void testArrayListInterface() {
Schema schema = Schema.createArray(Schema.create(Schema.Type.INT));
GenericArray<Integer> array = new GenericData.Array<Integer>(1, schema);
array.add(99);
assertEquals(new Integer(99), array.get(0));
List<Integer> list = new ArrayList<Integer>();
list.add(99);
assertEquals(array, list);
assertEquals(list, array);
assertEquals(list.hashCode(), array.hashCode());
try {
array.get(2);
fail("Expected IndexOutOfBoundsException getting index 2");
} catch (IndexOutOfBoundsException e) {
}
array.clear();
assertEquals(0, array.size());
try {
array.get(0);
fail("Expected IndexOutOfBoundsException getting index 0 after clear()");
} catch (IndexOutOfBoundsException e) {
}
}
@Test
public void testArrayAddAtLocation() {
Schema schema = Schema.createArray(Schema.create(Schema.Type.INT));
GenericArray<Integer> array = new GenericData.Array<Integer>(6, schema);
array.clear();
for (int i = 0; i < 5; ++i)
array.add(i);
assertEquals(5, array.size());
array.add(0, 6);
assertEquals(new Integer(6), array.get(0));
assertEquals(6, array.size());
assertEquals(new Integer(0), array.get(1));
assertEquals(new Integer(4), array.get(5));
array.add(6, 7);
assertEquals(new Integer(7), array.get(6));
assertEquals(7, array.size());
assertEquals(new Integer(6), array.get(0));
assertEquals(new Integer(4), array.get(5));
array.add(1, 8);
assertEquals(new Integer(8), array.get(1));
assertEquals(new Integer(0), array.get(2));
assertEquals(new Integer(6), array.get(0));
assertEquals(8, array.size());
try {
array.get(9);
fail("Expected IndexOutOfBoundsException after adding elements");
} catch (IndexOutOfBoundsException e) {
}
}
@Test
public void testArrayRemove() {
Schema schema = Schema.createArray(Schema.create(Schema.Type.INT));
GenericArray<Integer> array = new GenericData.Array<Integer>(10, schema);
array.clear();
for (int i = 0; i < 10; ++i)
array.add(i);
assertEquals(10, array.size());
assertEquals(new Integer(0), array.get(0));
assertEquals(new Integer(9), array.get(9));
array.remove(0);
assertEquals(9, array.size());
assertEquals(new Integer(1), array.get(0));
assertEquals(new Integer(2), array.get(1));
assertEquals(new Integer(9), array.get(8));
// Test boundary errors.
try {
array.get(9);
fail("Expected IndexOutOfBoundsException after removing an element");
} catch (IndexOutOfBoundsException e) {
}
try {
array.set(9, 99);
fail("Expected IndexOutOfBoundsException after removing an element");
} catch (IndexOutOfBoundsException e) {
}
try {
array.remove(9);
fail("Expected IndexOutOfBoundsException after removing an element");
} catch (IndexOutOfBoundsException e) {
}
// Test that we can still remove for properly sized arrays, and the rval
assertEquals(new Integer(9), array.remove(8));
assertEquals(8, array.size());
// Test insertion after remove
array.add(88);
assertEquals(new Integer(88), array.get(8));
}
@Test
public void testArraySet() {
Schema schema = Schema.createArray(Schema.create(Schema.Type.INT));
GenericArray<Integer> array = new GenericData.Array<Integer>(10, schema);
array.clear();
for (int i = 0; i < 10; ++i)
array.add(i);
assertEquals(10, array.size());
assertEquals(new Integer(0), array.get(0));
assertEquals(new Integer(5), array.get(5));
assertEquals(new Integer(5), array.set(5, 55));
assertEquals(10, array.size());
assertEquals(new Integer(55), array.get(5));
}
@Test
public void testToStringDoesNotEscapeForwardSlash() throws Exception {
GenericData data = GenericData.get();
assertEquals("\"/\"", data.toString("/"));
}
@Test
public void testToStringNanInfinity() throws Exception {
GenericData data = GenericData.get();
assertEquals("\"Infinity\"", data.toString(Float.POSITIVE_INFINITY));
assertEquals("\"-Infinity\"", data.toString(Float.NEGATIVE_INFINITY));
assertEquals("\"NaN\"", data.toString(Float.NaN));
assertEquals("\"Infinity\"", data.toString(Double.POSITIVE_INFINITY));
assertEquals("\"-Infinity\"", data.toString(Double.NEGATIVE_INFINITY));
assertEquals("\"NaN\"", data.toString(Double.NaN));
}
@Test
public void testEnumCompare() {
Schema s = Schema.createEnum("Kind", null, null, Arrays.asList("Z", "Y", "X"));
GenericEnumSymbol z = new GenericData.EnumSymbol(s, "Z");
GenericEnumSymbol y = new GenericData.EnumSymbol(s, "Y");
assertEquals(0, z.compareTo(z));
assertTrue(y.compareTo(z) > 0);
assertTrue(z.compareTo(y) < 0);
}
@Test
public void testByteBufferDeepCopy() {
// Test that a deep copy of a byte buffer respects the byte buffer
// limits and capacity.
byte[] buffer_value = {0, 1, 2, 3, 0, 0, 0};
ByteBuffer buffer = ByteBuffer.wrap(buffer_value, 1, 4);
Schema schema = Schema.createRecord("my_record", "doc", "mytest", false);
Field byte_field =
new Field("bytes", Schema.create(Type.BYTES), null, null);
schema.setFields(Arrays.asList(byte_field));
GenericRecord record = new GenericData.Record(schema);
record.put(byte_field.name(), buffer);
GenericRecord copy = GenericData.get().deepCopy(schema, record);
ByteBuffer buffer_copy = (ByteBuffer) copy.get(byte_field.name());
assertEquals(buffer, buffer_copy);
}
@Test
public void testValidateNullableEnum() {
List<Schema> unionTypes = new ArrayList<Schema>();
Schema schema;
Schema nullSchema = Schema.create(Type.NULL);
Schema enumSchema = Schema.createEnum("AnEnum", null, null, Arrays.asList("X", "Y", "Z"));
GenericEnumSymbol w = new GenericData.EnumSymbol(enumSchema, "W");
GenericEnumSymbol x = new GenericData.EnumSymbol(enumSchema, "X");
GenericEnumSymbol y = new GenericData.EnumSymbol(enumSchema, "Y");
GenericEnumSymbol z = new GenericData.EnumSymbol(enumSchema, "Z");
// null is first
unionTypes.clear();
unionTypes.add(nullSchema);
unionTypes.add(enumSchema);
schema = Schema.createUnion(unionTypes);
assertTrue(GenericData.get().validate(schema, z));
assertTrue(GenericData.get().validate(schema, y));
assertTrue(GenericData.get().validate(schema, x));
assertFalse(GenericData.get().validate(schema, w));
assertTrue(GenericData.get().validate(schema, null));
// null is last
unionTypes.clear();
unionTypes.add(enumSchema);
unionTypes.add(nullSchema);
schema = Schema.createUnion(unionTypes);
assertTrue(GenericData.get().validate(schema, z));
assertTrue(GenericData.get().validate(schema, y));
assertTrue(GenericData.get().validate(schema, x));
assertFalse(GenericData.get().validate(schema, w));
assertTrue(GenericData.get().validate(schema, null));
}
private enum anEnum {ONE, TWO, THREE}
@Test
public void validateRequiresGenericSymbolForEnumSchema() {
final Schema schema = Schema.createEnum("my_enum", "doc", "namespace", Arrays.asList("ONE", "TWO", "THREE"));
final GenericData gd = GenericData.get();
/* positive cases */
assertTrue(gd.validate(schema, new GenericData.EnumSymbol(schema, "ONE")));
assertTrue(gd.validate(schema, new GenericData.EnumSymbol(schema, anEnum.ONE)));
/* negative cases */
assertFalse("We don't expect GenericData to allow a String datum for an enum schema", gd.validate(schema, "ONE"));
assertFalse("We don't expect GenericData to allow a Java Enum for an enum schema", gd.validate(schema, anEnum.ONE));
}
@Test
public void testValidateUnion() {
Schema type1Schema = SchemaBuilder.record("Type1")
.fields()
.requiredString("myString")
.requiredInt("myInt")
.endRecord();
Schema type2Schema = SchemaBuilder.record("Type2")
.fields()
.requiredString("myString")
.endRecord();
Schema unionSchema = SchemaBuilder.unionOf()
.type(type1Schema).and().type(type2Schema)
.endUnion();
GenericRecord record = new GenericData.Record(type2Schema);
record.put("myString", "myValue");
assertTrue(GenericData.get().validate(unionSchema, record));
}
}

View File

@ -0,0 +1,83 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE module PUBLIC
"-//Puppy Crawl//DTD Check Configuration 1.3//EN"
"http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
<!--
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-->
<module name="Checker">
<property name="localeLanguage" value="en"/>
<module name="FileTabCharacter"/>
<!-- header -->
<module name="RegexpHeader">
<property name="header" value="/\*\*\nLicensed to the Apache.*"/>
</module>
<module name="TreeWalker">
<!-- code cleanup -->
<module name="UnusedImports"/>
<module name="RedundantImport"/>
<module name="IllegalImport" />
<module name="EqualsHashCode"/>
<module name="SimplifyBooleanExpression"/>
<module name="OneStatementPerLine"/>
<module name="UnnecessaryParentheses" />
<module name="SimplifyBooleanReturn"/>
<!-- style -->
<module name="DefaultComesLast"/>
<module name="EmptyStatement"/>
<module name="ArrayTypeStyle"/>
<module name="UpperEll"/>
<module name="LeftCurly"/>
<module name="RightCurly"/>
<module name="EmptyStatement"/>
<module name="ConstantName">
<property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)"/>
</module>
<module name="LocalVariableName"/>
<module name="LocalFinalVariableName"/>
<module name="ClassTypeParameterName"/>
<module name="MemberName"/>
<module name="MethodTypeParameterName"/>
<module name="PackageName"/>
<module name="ParameterName"/>
<module name="StaticVariableName"/>
<module name="TypeName"/>
<!-- dependencies -->
<module name="ImportControl">
<property name="file" value="${basedir}/checkstyle/import-control.xml"/>
</module>
<!-- whitespace -->
<module name="GenericWhitespace"/>
<module name="NoWhitespaceBefore"/>
<module name="WhitespaceAfter" />
<module name="NoWhitespaceAfter"/>
<module name="WhitespaceAround">
<property name="allowEmptyConstructors" value="true"/>
<property name="allowEmptyMethods" value="true"/>
</module>
<module name="Indentation"/>
<module name="MethodParamPad"/>
<module name="ParenPad"/>
<module name="TypecastParenPad"/>
</module>
</module>

View File

@ -0,0 +1,168 @@
<!DOCTYPE import-control PUBLIC
"-//Puppy Crawl//DTD Import Control 1.1//EN"
"http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
<!--
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-->
<import-control pkg="org.apache.kafka">
<!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->
<!-- common library dependencies -->
<allow pkg="java" />
<allow pkg="javax.management" />
<allow pkg="org.slf4j" />
<allow pkg="org.junit" />
<!-- no one depends on the server -->
<disallow pkg="kafka" />
<!-- anyone can use public classes -->
<allow pkg="org.apache.kafka.common" exact-match="true" />
<allow pkg="org.apache.kafka.common.utils" />
<subpackage name="common">
<disallow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.common" exact-match="true" />
<allow pkg="org.apache.kafka.test" />
<subpackage name="config">
<allow pkg="org.apache.kafka.common.config" />
<!-- for testing -->
<allow pkg="org.apache.kafka.common.metrics" />
</subpackage>
<subpackage name="metrics">
<allow pkg="org.apache.kafka.common.metrics" />
</subpackage>
<subpackage name="network">
<allow pkg="org.apache.kafka.common.metrics" />
</subpackage>
<subpackage name="protocol">
<allow pkg="org.apache.kafka.common.errors" />
<allow pkg="org.apache.kafka.common.protocol.types" />
</subpackage>
<subpackage name="record">
<allow pkg="net.jpountz" />
<allow pkg="org.apache.kafka.common.record" />
</subpackage>
<subpackage name="requests">
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.network" />
<!-- for testing -->
<allow pkg="org.apache.kafka.common.errors" />
</subpackage>
<subpackage name="serialization">
<allow class="org.apache.kafka.common.errors.SerializationException" />
</subpackage>
</subpackage>
<subpackage name="clients">
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.slf4j" />
<allow pkg="org.apache.kafka.clients" exact-match="true"/>
<allow pkg="org.apache.kafka.test" />
<subpackage name="consumer">
<allow pkg="org.apache.kafka.clients.consumer" />
</subpackage>
<subpackage name="producer">
<allow pkg="org.apache.kafka.clients.producer" />
</subpackage>
<subpackage name="tools">
<allow pkg="org.apache.kafka.clients.producer" />
<allow pkg="org.apache.kafka.clients.consumer" />
<allow pkg="com.fasterxml.jackson" />
<allow pkg="net.sourceforge.argparse4j" />
</subpackage>
</subpackage>
<subpackage name="log4jappender">
<allow pkg="org.apache.log4j" />
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.apache.kafka.test" />
</subpackage>
<subpackage name="test">
<allow pkg="org.apache.kafka" />
</subpackage>
<subpackage name="copycat">
<allow pkg="org.apache.kafka.copycat.data" />
<allow pkg="org.apache.kafka.copycat.errors" />
<subpackage name="source">
<allow pkg="org.apache.kafka.copycat.connector" />
<allow pkg="org.apache.kafka.copycat.storage" />
</subpackage>
<subpackage name="sink">
<allow pkg="org.apache.kafka.copycat.connector" />
<allow pkg="org.apache.kafka.copycat.storage" />
</subpackage>
<subpackage name="runtime">
<allow pkg="org.apache.kafka.copycat" />
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.apache.kafka.clients" />
<!-- for tests -->
<allow pkg="org.easymock" />
<allow pkg="org.powermock" />
</subpackage>
<subpackage name="cli">
<allow pkg="org.apache.kafka.copycat.runtime" />
<allow pkg="org.apache.kafka.copycat.util" />
<allow pkg="org.apache.kafka.common" />
</subpackage>
<subpackage name="storage">
<allow pkg="org.apache.kafka.copycat" />
<allow pkg="org.apache.kafka.common.serialization" />
<!-- for tests -->
<allow pkg="org.easymock" />
<allow pkg="org.powermock" />
</subpackage>
<subpackage name="util">
<allow pkg="org.apache.kafka.copycat" />
</subpackage>
<subpackage name="json">
<allow pkg="com.fasterxml.jackson" />
<allow pkg="org.apache.kafka.common.serialization" />
<allow pkg="org.apache.kafka.common.errors" />
<allow pkg="org.apache.kafka.copycat.storage" />
</subpackage>
<subpackage name="file">
<allow pkg="org.apache.kafka.copycat" />
<!-- for tests -->
<allow pkg="org.easymock" />
<allow pkg="org.powermock" />
</subpackage>
</subpackage>
</import-control>
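To make the layering concrete: a class in the copycat.json subpackage may import Jackson and the packages allowed for all of copycat, while broker (kafka.*) packages are disallowed project-wide and the clients packages have no allow rule here. A hypothetical example (this class is illustrative only):

    package org.apache.kafka.copycat.json;

    import com.fasterxml.jackson.databind.JsonNode;  // allowed: "json" may use com.fasterxml.jackson

    // import kafka.server.KafkaServer;                        // rejected: kafka.* is disallowed for everyone
    // import org.apache.kafka.clients.producer.KafkaProducer; // rejected: no allow rule covers clients here

    public class ImportControlExample {
        public boolean looksLikeEnvelope(JsonNode node) {
            return node != null && node.has("schema") && node.has("payload");
        }
    }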

View File

@@ -84,33 +84,6 @@ public class JsonConverter implements Converter {
return value.textValue();
}
});
TO_COPYCAT_CONVERTERS.put(JsonSchema.OBJECT_TYPE_NAME, new JsonToCopycatTypeConverter() {
@Override
public Object convert(JsonNode jsonSchema, JsonNode value) {
JsonNode jsonSchemaFields = jsonSchema.get(JsonSchema.OBJECT_FIELDS_FIELD_NAME);
if (jsonSchemaFields == null || !jsonSchemaFields.isArray())
throw new CopycatRuntimeException("Invalid object schema, should contain list of fields.");
HashMap<String, JsonNode> jsonSchemaFieldsByName = new HashMap<>();
for (JsonNode fieldSchema : jsonSchemaFields) {
JsonNode name = fieldSchema.get("name");
if (name == null || !name.isTextual())
throw new CopycatRuntimeException("Invalid field name");
jsonSchemaFieldsByName.put(name.textValue(), fieldSchema);
}
Schema schema = asCopycatSchema(jsonSchema);
GenericRecordBuilder builder = new GenericRecordBuilder(schema);
// TODO: We should verify both the schema fields and actual fields are exactly identical
Iterator<Map.Entry<String, JsonNode>> fieldIt = value.fields();
while (fieldIt.hasNext()) {
Map.Entry<String, JsonNode> entry = fieldIt.next();
JsonNode fieldSchema = jsonSchemaFieldsByName.get(entry.getKey());
builder.set(entry.getKey(), convertToCopycat(fieldSchema, entry.getValue()));
}
return builder.build();
}
});
TO_COPYCAT_CONVERTERS.put(JsonSchema.ARRAY_TYPE_NAME, new JsonToCopycatTypeConverter() {
@Override
public Object convert(JsonNode jsonSchema, JsonNode value) {
@@ -165,14 +138,6 @@ public class JsonConverter implements Converter {
throw new UnsupportedOperationException("null schema not supported");
case STRING:
return JsonSchema.STRING_SCHEMA;
case RECORD: {
ObjectNode recordSchema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.OBJECT_TYPE_NAME);
ArrayNode fields = recordSchema.putArray(JsonSchema.OBJECT_FIELDS_FIELD_NAME);
for (Schema.Field field : schema.getFields()) {
fields.add(JsonNodeFactory.instance.objectNode().set(field.name(), asJsonSchema(field.schema())));
}
return recordSchema;
}
case UNION: {
throw new UnsupportedOperationException("union schema not supported");
}
@@ -181,8 +146,6 @@ public class JsonConverter implements Converter {
.set(JsonSchema.ARRAY_ITEMS_FIELD_NAME, asJsonSchema(schema.getElementType()));
case ENUM:
throw new UnsupportedOperationException("enum schema not supported");
case FIXED:
throw new UnsupportedOperationException("fixed schema not supported");
case MAP:
throw new UnsupportedOperationException("map schema not supported");
default:
@@ -221,25 +184,6 @@ public class JsonConverter implements Converter {
if (elemSchema == null)
throw new CopycatRuntimeException("Array schema did not specify the element type");
return Schema.createArray(asCopycatSchema(elemSchema));
case JsonSchema.OBJECT_TYPE_NAME:
JsonNode jsonSchemaName = jsonSchema.get(JsonSchema.SCHEMA_NAME_FIELD_NAME);
if (jsonSchemaName == null || !jsonSchemaName.isTextual())
throw new CopycatRuntimeException("Invalid object schema, should contain name.");
JsonNode jsonSchemaFields = jsonSchema.get(JsonSchema.OBJECT_FIELDS_FIELD_NAME);
if (jsonSchemaFields == null || !jsonSchemaFields.isArray())
throw new CopycatRuntimeException("Invalid object schema, should contain list of fields.");
List<Schema.Field> fields = new ArrayList<>();
// TODO: We should verify both the schema fields and actual fields are exactly identical
for (JsonNode fieldJsonSchema : jsonSchemaFields) {
JsonNode fieldName = fieldJsonSchema.get(JsonSchema.OBJECT_FIELD_NAME_FIELD_NAME);
if (fieldName == null || !fieldName.isTextual())
throw new CopycatRuntimeException("Object field missing name");
// TODO: doc, default value?
fields.add(new Schema.Field(fieldName.textValue(), asCopycatSchema(fieldJsonSchema), null, null));
}
Schema result = Schema.createRecord(jsonSchemaName.textValue(), null, null, false);
result.setFields(fields);
return result;
default:
throw new CopycatRuntimeException("Unknown schema type: " + schemaTypeNode.textValue());
}
@@ -283,25 +227,6 @@ public class JsonConverter implements Converter {
return JsonSchema.bytesEnvelope(((ByteBuffer) value).array());
} else if (value instanceof CharSequence) {
return JsonSchema.stringEnvelope(value.toString());
} else if (value instanceof GenericRecord) {
GenericRecord recordValue = (GenericRecord) value;
ObjectNode schema = JsonNodeFactory.instance.objectNode();
schema.put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.OBJECT_TYPE_NAME);
schema.put(JsonSchema.SCHEMA_NAME_FIELD_NAME, recordValue.getSchema().getName());
ArrayNode schemaFields = JsonNodeFactory.instance.arrayNode();
schema.set(JsonSchema.OBJECT_FIELDS_FIELD_NAME, schemaFields);
ObjectNode record = JsonNodeFactory.instance.objectNode();
for (Schema.Field field : recordValue.getSchema().getFields()) {
JsonSchema.Envelope fieldSchemaAndValue = convertToJson(recordValue.get(field.name()));
// Fill in the field name since this is part of the field schema spec but the call to convertToJson that
// created it does not have access to the field name. This *must* copy the schema since it may be one of
// the primitive schemas.
ObjectNode fieldSchema = ((ObjectNode) fieldSchemaAndValue.schema).deepCopy();
fieldSchema.put(JsonSchema.OBJECT_FIELD_NAME_FIELD_NAME, field.name());
schemaFields.add(fieldSchema);
record.set(field.name(), fieldSchemaAndValue.payload);
}
return new JsonSchema.Envelope(schema, record);
} else if (value instanceof Collection) {
Collection collection = (Collection) value;
ObjectNode schema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.ARRAY_TYPE_NAME);
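With the GenericRecord branch removed above, convertToJson serializes only primitives and (as the remaining code shows) collections. A sketch of the round trip that survives, assuming a helper class in the same package and the JsonConverter methods exercised by the tests further down:

    package org.apache.kafka.copycat.json;

    import com.fasterxml.jackson.databind.JsonNode;

    class PrimitiveRoundTripSketch {
        // A primitive travels as { "schema": { "type": "string" }, "payload": "foobar" }
        // and converts back unchanged; record values no longer have a serialized form.
        static Object roundTrip(JsonConverter converter, String value) {
            JsonNode envelope = converter.fromCopycatData(value);
            return converter.toCopycatData(envelope);
        }
    }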

View File

@@ -29,8 +29,6 @@ public class JsonSchema {
static final String ENVELOPE_PAYLOAD_FIELD_NAME = "payload";
static final String SCHEMA_TYPE_FIELD_NAME = "type";
static final String SCHEMA_NAME_FIELD_NAME = "name";
static final String OBJECT_FIELD_NAME_FIELD_NAME = "name";
static final String OBJECT_FIELDS_FIELD_NAME = "fields";
static final String ARRAY_ITEMS_FIELD_NAME = "items";
static final String BOOLEAN_TYPE_NAME = "boolean";
static final JsonNode BOOLEAN_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, BOOLEAN_TYPE_NAME);
@@ -47,7 +45,6 @@ public class JsonSchema {
static final String STRING_TYPE_NAME = "string";
static final JsonNode STRING_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, STRING_TYPE_NAME);
static final String ARRAY_TYPE_NAME = "array";
static final String OBJECT_TYPE_NAME = "object";
public static ObjectNode envelope(JsonNode schema, JsonNode payload) {
ObjectNode result = JsonNodeFactory.instance.objectNode();
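After this change the class holds only primitive and array schema constants plus the envelope helper. A sketch of composing an envelope with one of the surviving constants (placed in the same package because the constants are package-private):

    package org.apache.kafka.copycat.json;

    import com.fasterxml.jackson.databind.node.JsonNodeFactory;
    import com.fasterxml.jackson.databind.node.ObjectNode;

    class EnvelopeSketch {
        // Produces { "schema": { "type": "boolean" }, "payload": true }
        static ObjectNode booleanEnvelope(boolean value) {
            return JsonSchema.envelope(JsonSchema.BOOLEAN_SCHEMA,
                    JsonNodeFactory.instance.booleanNode(value));
        }
    }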

View File

@@ -20,10 +20,6 @@ package org.apache.kafka.copycat.json;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import org.apache.kafka.copycat.data.GenericRecord;
import org.apache.kafka.copycat.data.GenericRecordBuilder;
import org.apache.kafka.copycat.data.Schema;
import org.apache.kafka.copycat.data.SchemaBuilder;
import org.junit.Test;
import java.io.IOException;
@@ -88,21 +84,6 @@ public class JsonConverterTest {
assertEquals(Arrays.asList(1, 2, 3), converter.toCopycatData(arrayJson));
}
@Test
public void objectToCopycat() {
JsonNode objectJson = parse("{ \"schema\": { \"type\": \"object\", \"name\": \"record\", \"fields\": [{ \"name\": \"first\", \"type\" : \"int\"}, { \"name\": \"second\", \"type\" : \"string\"}] }" +
", \"payload\": { \"first\": 15, \"second\": \"foobar\" } }");
Schema schema = SchemaBuilder.record("record").fields()
.requiredInt("first")
.requiredString("second")
.endRecord();
GenericRecord record = new GenericRecordBuilder(schema)
.set("first", 15)
.set("second", "foobar")
.build();
assertEquals(record, converter.toCopycatData(objectJson));
}
@Test
public void booleanToJson() {
@@ -171,25 +152,6 @@ public class JsonConverterTest {
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
}
@Test
public void objectToJson() {
Schema schema = SchemaBuilder.record("record").fields()
.requiredInt("first")
.requiredString("second")
.endRecord();
GenericRecord record = new GenericRecordBuilder(schema)
.set("first", 15)
.set("second", "foobar")
.build();
JsonNode converted = converter.fromCopycatData(record);
validateEnvelope(converted);
assertEquals(parse("{ \"type\": \"object\", \"name\": \"record\", \"fields\": [{ \"name\": \"first\", \"type\" : \"int\"}, { \"name\": \"second\", \"type\" : \"string\"}] }"),
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
assertEquals(parse("{ \"first\": 15, \"second\": \"foobar\" }"),
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
}
private JsonNode parse(String json) {
try {
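The deleted record tests leave primitives and arrays as the remaining coverage. A sketch of a primitive test in the same style, reusing this class's converter field and its parse and validateEnvelope helpers (the test name and method body are illustrative, not part of this patch):

    @Test
    public void intToJson() {
        JsonNode converted = converter.fromCopycatData(12);
        validateEnvelope(converted);
        assertEquals(parse("{ \"type\": \"int\" }"),
                converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
        assertEquals(parse("12"),
                converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
    }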

View File

@@ -21,10 +21,6 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.copycat.cli.WorkerConfig;
import org.apache.kafka.copycat.data.GenericRecord;
import org.apache.kafka.copycat.data.GenericRecordBuilder;
import org.apache.kafka.copycat.data.Schema;
import org.apache.kafka.copycat.data.SchemaBuilder;
import org.apache.kafka.copycat.sink.SinkRecord;
import org.apache.kafka.copycat.sink.SinkTask;
import org.apache.kafka.copycat.sink.SinkTaskContext;
@@ -129,8 +125,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
@Test
public void testDeliverConvertsData() throws Exception {
// Validate conversion is performed when data is delivered
Schema schema = SchemaBuilder.record("sample").fields().endRecord();
GenericRecord record = new GenericRecordBuilder(schema).build();
Integer record = 12;
byte[] rawKey = "key".getBytes(), rawValue = "value".getBytes();
ConsumerRecords<Object, Object> records = new ConsumerRecords<>(

View File

@@ -23,10 +23,6 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.copycat.cli.WorkerConfig;
import org.apache.kafka.copycat.data.GenericRecord;
import org.apache.kafka.copycat.data.GenericRecordBuilder;
import org.apache.kafka.copycat.data.Schema;
import org.apache.kafka.copycat.data.SchemaBuilder;
import org.apache.kafka.copycat.source.SourceRecord;
import org.apache.kafka.copycat.source.SourceTask;
import org.apache.kafka.copycat.source.SourceTaskContext;
@@ -59,8 +55,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
private static final byte[] STREAM_BYTES = "stream".getBytes();
private static final byte[] OFFSET_BYTES = "offset-1".getBytes();
private static final Schema RECORD_SCHEMA = SchemaBuilder.record("sample").fields().endRecord();
private static final GenericRecord RECORD = new GenericRecordBuilder(RECORD_SCHEMA).build();
private static final Integer RECORD = 12;
// The actual format of this data doesn't matter -- we just want to see that the right version
// is used in the right place.
private static final String CONVERTED_RECORD = "converted-record";