mirror of https://github.com/apache/kafka.git
Remove most of the Avro-based mock runtime data API, only preserving enough schema functionality to support basic primitive types for an initial patch.
This commit is contained in:
parent
4674d136e1
commit
6ba87debad
|
@ -1,47 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.data;
|
||||
|
||||
|
||||
public class BinaryData {
|
||||
/**
|
||||
* Compares regions of two byte arrays, returning a negative integer, zero, or positive integer when the first byte
|
||||
* array region is less than, equal to, or greater than the second byte array region, respectively.
|
||||
*
|
||||
* @param b1 first byte array
|
||||
* @param s1 start of region in first byte array
|
||||
* @param l1 length of region in first byte array
|
||||
* @param b2 second byte array
|
||||
* @param s2 start of region in second byte array
|
||||
* @param l2 length of region in second byte array
|
||||
* @return a negative integer, zero, or a positive integer as the first byte array is less than, equal to, or greater
|
||||
* than the second byte array
|
||||
*/
|
||||
public static int compareBytes(byte[] b1, int s1, int l1,
|
||||
byte[] b2, int s2, int l2) {
|
||||
int end1 = s1 + l1;
|
||||
int end2 = s2 + l2;
|
||||
for (int i = s1, j = s2; i < end1 && j < end2; i++, j++) {
|
||||
int a = b1[i] & 0xff;
|
||||
int b = b2[j] & 0xff;
|
||||
if (a != b)
|
||||
return a - b;
|
||||
}
|
||||
return l1 - l2;
|
||||
}
|
||||
}
|
|
@ -1,34 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
|
||||
|
||||
package org.apache.kafka.copycat.data;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/** Array that permits reuse of contained elements. */
|
||||
public interface GenericArray<T> extends List<T>, GenericContainer {
|
||||
/** The current content of the location where {@link #add(Object)} would next
|
||||
* store an element, if any. This permits reuse of arrays and their elements
|
||||
* without allocating new objects. */
|
||||
T peek();
|
||||
|
||||
/** Reverses the order of the elements in this array. */
|
||||
void reverse();
|
||||
}
|
||||
|
|
@ -1,27 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
|
||||
|
||||
package org.apache.kafka.copycat.data;
|
||||
|
||||
/** Contains data of other types. */
|
||||
public interface GenericContainer {
|
||||
/** The schema of this instance. */
|
||||
Schema getSchema();
|
||||
}
|
||||
|
File diff suppressed because it is too large
Load Diff
|
@ -1,27 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
|
||||
|
||||
package org.apache.kafka.copycat.data;
|
||||
|
||||
/** An enum symbol. */
|
||||
public interface GenericEnumSymbol
|
||||
extends GenericContainer, Comparable<GenericEnumSymbol> {
|
||||
/** Return the symbol. */
|
||||
String toString();
|
||||
}
|
|
@ -1,26 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
|
||||
|
||||
package org.apache.kafka.copycat.data;
|
||||
|
||||
/** Fixed-size data. */
|
||||
public interface GenericFixed extends GenericContainer {
|
||||
/** Return the data. */
|
||||
byte[] bytes();
|
||||
}
|
|
@ -1,30 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
|
||||
|
||||
package org.apache.kafka.copycat.data;
|
||||
|
||||
/** A generic instance of a record schema. Fields are accessible by name as
|
||||
* well as by index. */
|
||||
public interface GenericRecord extends IndexedRecord {
|
||||
/** Set the value of a field given its name. */
|
||||
void put(String key, Object v);
|
||||
|
||||
/** Return the value of a field given its name. */
|
||||
Object get(String key);
|
||||
}
|
|
@ -1,259 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
|
||||
|
||||
package org.apache.kafka.copycat.data;
|
||||
|
||||
import org.apache.kafka.copycat.data.GenericData.Record;
|
||||
import org.apache.kafka.copycat.data.Schema.Field;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/** A RecordBuilder for generic records. GenericRecordBuilder fills in default values
|
||||
* for fields if they are not specified. */
|
||||
public class GenericRecordBuilder extends RecordBuilderBase<Record> {
|
||||
private final GenericData.Record record;
|
||||
|
||||
/**
|
||||
* Creates a GenericRecordBuilder for building Record instances.
|
||||
* @param schema the schema associated with the record class.
|
||||
*/
|
||||
public GenericRecordBuilder(Schema schema) {
|
||||
super(schema, GenericData.get());
|
||||
record = new GenericData.Record(schema);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a GenericRecordBuilder by copying an existing GenericRecordBuilder.
|
||||
* @param other the GenericRecordBuilder to copy.
|
||||
*/
|
||||
public GenericRecordBuilder(GenericRecordBuilder other) {
|
||||
super(other, GenericData.get());
|
||||
record = new GenericData.Record(other.record, /* deepCopy = */ true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a GenericRecordBuilder by copying an existing record instance.
|
||||
* @param other the record instance to copy.
|
||||
*/
|
||||
public GenericRecordBuilder(Record other) {
|
||||
super(other.getSchema(), GenericData.get());
|
||||
record = new GenericData.Record(other, /* deepCopy = */ true);
|
||||
|
||||
// Set all fields in the RecordBuilder that are set in the record
|
||||
for (Field f : schema().getFields()) {
|
||||
Object value = other.get(f.pos());
|
||||
// Only set the value if it is not null, if the schema type is null,
|
||||
// or if the schema type is a union that accepts nulls.
|
||||
if (isValidValue(f, value)) {
|
||||
set(f, data().deepCopy(f.schema(), value));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the value of a field.
|
||||
* @param fieldName the name of the field to get.
|
||||
* @return the value of the field with the given name, or null if not set.
|
||||
*/
|
||||
public Object get(String fieldName) {
|
||||
return get(schema().getField(fieldName));
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the value of a field.
|
||||
* @param field the field to get.
|
||||
* @return the value of the given field, or null if not set.
|
||||
*/
|
||||
public Object get(Field field) {
|
||||
return get(field.pos());
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the value of a field.
|
||||
* @param pos the position of the field to get.
|
||||
* @return the value of the field with the given position, or null if not set.
|
||||
*/
|
||||
protected Object get(int pos) {
|
||||
return record.get(pos);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the value of a field.
|
||||
* @param fieldName the name of the field to set.
|
||||
* @param value the value to set.
|
||||
* @return a reference to the RecordBuilder.
|
||||
*/
|
||||
public GenericRecordBuilder set(String fieldName, Object value) {
|
||||
return set(schema().getField(fieldName), value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the value of a field.
|
||||
* @param field the field to set.
|
||||
* @param value the value to set.
|
||||
* @return a reference to the RecordBuilder.
|
||||
*/
|
||||
public GenericRecordBuilder set(Field field, Object value) {
|
||||
return set(field, field.pos(), value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the value of a field.
|
||||
* @param pos the field to set.
|
||||
* @param value the value to set.
|
||||
* @return a reference to the RecordBuilder.
|
||||
*/
|
||||
protected GenericRecordBuilder set(int pos, Object value) {
|
||||
return set(fields()[pos], pos, value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the value of a field.
|
||||
* @param field the field to set.
|
||||
* @param pos the position of the field.
|
||||
* @param value the value to set.
|
||||
* @return a reference to the RecordBuilder.
|
||||
*/
|
||||
private GenericRecordBuilder set(Field field, int pos, Object value) {
|
||||
validate(field, value);
|
||||
record.put(pos, value);
|
||||
fieldSetFlags()[pos] = true;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether a field has been set.
|
||||
* @param fieldName the name of the field to check.
|
||||
* @return true if the given field is non-null; false otherwise.
|
||||
*/
|
||||
public boolean has(String fieldName) {
|
||||
return has(schema().getField(fieldName));
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether a field has been set.
|
||||
* @param field the field to check.
|
||||
* @return true if the given field is non-null; false otherwise.
|
||||
*/
|
||||
public boolean has(Field field) {
|
||||
return has(field.pos());
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether a field has been set.
|
||||
* @param pos the position of the field to check.
|
||||
* @return true if the given field is non-null; false otherwise.
|
||||
*/
|
||||
protected boolean has(int pos) {
|
||||
return fieldSetFlags()[pos];
|
||||
}
|
||||
|
||||
/**
|
||||
* Clears the value of the given field.
|
||||
* @param fieldName the name of the field to clear.
|
||||
* @return a reference to the RecordBuilder.
|
||||
*/
|
||||
public GenericRecordBuilder clear(String fieldName) {
|
||||
return clear(schema().getField(fieldName));
|
||||
}
|
||||
|
||||
/**
|
||||
* Clears the value of the given field.
|
||||
* @param field the field to clear.
|
||||
* @return a reference to the RecordBuilder.
|
||||
*/
|
||||
public GenericRecordBuilder clear(Field field) {
|
||||
return clear(field.pos());
|
||||
}
|
||||
|
||||
/**
|
||||
* Clears the value of the given field.
|
||||
* @param pos the position of the field to clear.
|
||||
* @return a reference to the RecordBuilder.
|
||||
*/
|
||||
protected GenericRecordBuilder clear(int pos) {
|
||||
record.put(pos, null);
|
||||
fieldSetFlags()[pos] = false;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Record build() {
|
||||
Record record;
|
||||
try {
|
||||
record = new GenericData.Record(schema());
|
||||
} catch (Exception e) {
|
||||
throw new DataRuntimeException(e);
|
||||
}
|
||||
|
||||
for (Field field : fields()) {
|
||||
Object value;
|
||||
try {
|
||||
value = getWithDefault(field);
|
||||
} catch (IOException e) {
|
||||
throw new DataRuntimeException(e);
|
||||
}
|
||||
if (value != null) {
|
||||
record.put(field.pos(), value);
|
||||
}
|
||||
}
|
||||
|
||||
return record;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the value of the given field.
|
||||
* If the field has been set, the set value is returned (even if it's null).
|
||||
* If the field hasn't been set and has a default value, the default value
|
||||
* is returned.
|
||||
* @param field the field whose value should be retrieved.
|
||||
* @return the value set for the given field, the field's default value,
|
||||
* or null.
|
||||
* @throws IOException
|
||||
*/
|
||||
private Object getWithDefault(Field field) throws IOException {
|
||||
return fieldSetFlags()[field.pos()] ?
|
||||
record.get(field.pos()) : defaultValue(field);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = super.hashCode();
|
||||
result = prime * result + ((record == null) ? 0 : record.hashCode());
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj)
|
||||
return true;
|
||||
if (!super.equals(obj))
|
||||
return false;
|
||||
if (getClass() != obj.getClass())
|
||||
return false;
|
||||
GenericRecordBuilder other = (GenericRecordBuilder) obj;
|
||||
if (record == null) {
|
||||
if (other.record != null)
|
||||
return false;
|
||||
} else if (!record.equals(other.record))
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -1,31 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
|
||||
|
||||
package org.apache.kafka.copycat.data;
|
||||
|
||||
/** A record implementation that permits field access by integer index.*/
|
||||
public interface IndexedRecord extends GenericContainer {
|
||||
/** Set the value of a field given its position in the schema.
|
||||
* <p>This method is not meant to be called by user code, but only internally for deep copying */
|
||||
void put(int i, Object v);
|
||||
|
||||
/** Return the value of a field given its position in the schema.
|
||||
* <p>This method is not meant to be called by user code, but only internally for deep copying */
|
||||
Object get(int i);
|
||||
}
|
|
@ -1,32 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
|
||||
|
||||
package org.apache.kafka.copycat.data;
|
||||
|
||||
|
||||
/** Interface for record builders */
|
||||
public interface RecordBuilder<T> {
|
||||
/**
|
||||
* Constructs a new instance using the values set in the RecordBuilder.
|
||||
* If a particular value was not set and the schema defines a default
|
||||
* value, the default value will be used.
|
||||
* @return a new instance using values set in the RecordBuilder.
|
||||
*/
|
||||
T build();
|
||||
}
|
|
@ -1,173 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
|
||||
|
||||
package org.apache.kafka.copycat.data;
|
||||
|
||||
import org.apache.kafka.copycat.data.Schema.Field;
|
||||
import org.apache.kafka.copycat.data.Schema.Type;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
||||
/** Abstract base class for RecordBuilder implementations. Not thread-safe. */
|
||||
public abstract class RecordBuilderBase<T extends IndexedRecord>
|
||||
implements RecordBuilder<T> {
|
||||
private static final Field[] EMPTY_FIELDS = new Field[0];
|
||||
private final Schema schema;
|
||||
private final Field[] fields;
|
||||
private final boolean[] fieldSetFlags;
|
||||
private final GenericData data;
|
||||
|
||||
protected final Schema schema() {
|
||||
return schema;
|
||||
}
|
||||
|
||||
protected final Field[] fields() {
|
||||
return fields;
|
||||
}
|
||||
|
||||
protected final boolean[] fieldSetFlags() {
|
||||
return fieldSetFlags;
|
||||
}
|
||||
|
||||
protected final GenericData data() {
|
||||
return data;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a RecordBuilderBase for building records of the given type.
|
||||
* @param schema the schema associated with the record class.
|
||||
*/
|
||||
protected RecordBuilderBase(Schema schema, GenericData data) {
|
||||
this.schema = schema;
|
||||
this.data = data;
|
||||
fields = (Field[]) schema.getFields().toArray(EMPTY_FIELDS);
|
||||
fieldSetFlags = new boolean[fields.length];
|
||||
}
|
||||
|
||||
/**
|
||||
* RecordBuilderBase copy constructor.
|
||||
* Makes a deep copy of the values in the other builder.
|
||||
* @param other RecordBuilderBase instance to copy.
|
||||
*/
|
||||
protected RecordBuilderBase(RecordBuilderBase<T> other, GenericData data) {
|
||||
this.schema = other.schema;
|
||||
this.data = data;
|
||||
fields = (Field[]) schema.getFields().toArray(EMPTY_FIELDS);
|
||||
fieldSetFlags = new boolean[other.fieldSetFlags.length];
|
||||
System.arraycopy(
|
||||
other.fieldSetFlags, 0, fieldSetFlags, 0, fieldSetFlags.length);
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates that a particular value for a given field is valid according to
|
||||
* the following algorithm:
|
||||
* 1. If the value is not null, or the field type is null, or the field type
|
||||
* is a union which accepts nulls, returns.
|
||||
* 2. Else, if the field has a default value, returns.
|
||||
* 3. Otherwise throws AvroRuntimeException.
|
||||
* @param field the field to validate.
|
||||
* @param value the value to validate.
|
||||
* @throws NullPointerException if value is null and the given field does
|
||||
* not accept null values.
|
||||
*/
|
||||
protected void validate(Field field, Object value) {
|
||||
if (isValidValue(field, value)) {
|
||||
return;
|
||||
} else if (field.defaultValue() != null) {
|
||||
return;
|
||||
} else {
|
||||
throw new DataRuntimeException(
|
||||
"Field " + field + " does not accept null values");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests whether a value is valid for a specified field.
|
||||
* @param f the field for which to test the value.
|
||||
* @param value the value to test.
|
||||
* @return true if the value is valid for the given field; false otherwise.
|
||||
*/
|
||||
protected static boolean isValidValue(Field f, Object value) {
|
||||
if (value != null) {
|
||||
return true;
|
||||
}
|
||||
|
||||
Schema schema = f.schema();
|
||||
Type type = schema.getType();
|
||||
|
||||
// If the type is null, any value is valid
|
||||
if (type == Type.NULL) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// If the type is a union that allows nulls, any value is valid
|
||||
if (type == Type.UNION) {
|
||||
for (Schema s : schema.getTypes()) {
|
||||
if (s.getType() == Type.NULL) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// The value is null but the type does not allow nulls
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the default value of the given field, if any.
|
||||
* @param field the field whose default value should be retrieved.
|
||||
* @return the default value associated with the given field,
|
||||
* or null if none is specified in the schema.
|
||||
* @throws IOException
|
||||
*/
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
protected Object defaultValue(Field field) throws IOException {
|
||||
return data.deepCopy(field.schema(), data.getDefaultValue(field));
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = 1;
|
||||
result = prime * result + Arrays.hashCode(fieldSetFlags);
|
||||
result = prime * result + ((schema == null) ? 0 : schema.hashCode());
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj)
|
||||
return true;
|
||||
if (obj == null)
|
||||
return false;
|
||||
if (getClass() != obj.getClass())
|
||||
return false;
|
||||
@SuppressWarnings("rawtypes")
|
||||
RecordBuilderBase other = (RecordBuilderBase) obj;
|
||||
if (!Arrays.equals(fieldSetFlags, other.fieldSetFlags))
|
||||
return false;
|
||||
if (schema == null) {
|
||||
if (other.schema != null)
|
||||
return false;
|
||||
} else if (!schema.equals(other.schema))
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -60,12 +60,6 @@ public abstract class Schema extends ObjectProperties {
|
|||
|
||||
/** The type of a schema. */
|
||||
public enum Type {
|
||||
RECORD {
|
||||
@Override
|
||||
public Object defaultValue(Schema schema) {
|
||||
return new GenericRecordBuilder(schema).build();
|
||||
}
|
||||
},
|
||||
ENUM {
|
||||
@Override
|
||||
public Object defaultValue(Schema schema) {
|
||||
|
@ -75,7 +69,7 @@ public abstract class Schema extends ObjectProperties {
|
|||
ARRAY {
|
||||
@Override
|
||||
public Object defaultValue(Schema schema) {
|
||||
return new GenericData.Array(0, schema);
|
||||
return new ArrayList<>();
|
||||
}
|
||||
},
|
||||
MAP {
|
||||
|
@ -91,12 +85,6 @@ public abstract class Schema extends ObjectProperties {
|
|||
return firstSchema.getType().defaultValue(firstSchema);
|
||||
}
|
||||
},
|
||||
FIXED {
|
||||
@Override
|
||||
public Object defaultValue(Schema schema) {
|
||||
return new GenericData.Fixed(schema);
|
||||
}
|
||||
},
|
||||
STRING {
|
||||
@Override
|
||||
public Object defaultValue(Schema schema) {
|
||||
|
@ -205,19 +193,6 @@ public abstract class Schema extends ObjectProperties {
|
|||
hashCode = NO_HASHCODE;
|
||||
}
|
||||
|
||||
/** Create an anonymous record schema. */
|
||||
public static Schema createRecord(List<Field> fields) {
|
||||
Schema result = createRecord(null, null, null, false);
|
||||
result.setFields(fields);
|
||||
return result;
|
||||
}
|
||||
|
||||
/** Create a named record schema. */
|
||||
public static Schema createRecord(String name, String doc, String namespace,
|
||||
boolean isError) {
|
||||
return new RecordSchema(new Name(name, namespace), doc, isError);
|
||||
}
|
||||
|
||||
/** Create an enum schema. */
|
||||
public static Schema createEnum(String name, String doc, String namespace,
|
||||
List<String> values) {
|
||||
|
@ -245,12 +220,6 @@ public abstract class Schema extends ObjectProperties {
|
|||
return createUnion(new LockableArrayList<Schema>(types));
|
||||
}
|
||||
|
||||
/** Create a union schema. */
|
||||
public static Schema createFixed(String name, String doc, String space,
|
||||
int size) {
|
||||
return new FixedSchema(new Name(name, space), doc, size);
|
||||
}
|
||||
|
||||
/** Return the type of this schema. */
|
||||
public Type getType() {
|
||||
return type;
|
||||
|
@ -655,92 +624,6 @@ public abstract class Schema extends ObjectProperties {
|
|||
}
|
||||
};
|
||||
|
||||
@SuppressWarnings(value = "unchecked")
|
||||
private static class RecordSchema extends NamedSchema {
|
||||
private List<Field> fields;
|
||||
private Map<String, Field> fieldMap;
|
||||
private final boolean isError;
|
||||
|
||||
public RecordSchema(Name name, String doc, boolean isError) {
|
||||
super(Type.RECORD, name, doc);
|
||||
this.isError = isError;
|
||||
}
|
||||
|
||||
public boolean isError() {
|
||||
return isError;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Field getField(String fieldname) {
|
||||
if (fieldMap == null)
|
||||
throw new DataRuntimeException("Schema fields not set yet");
|
||||
return fieldMap.get(fieldname);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Field> getFields() {
|
||||
if (fields == null)
|
||||
throw new DataRuntimeException("Schema fields not set yet");
|
||||
return fields;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setFields(List<Field> fields) {
|
||||
if (this.fields != null) {
|
||||
throw new DataRuntimeException("Fields are already set");
|
||||
}
|
||||
int i = 0;
|
||||
fieldMap = new HashMap<String, Field>();
|
||||
LockableArrayList ff = new LockableArrayList();
|
||||
for (Field f : fields) {
|
||||
if (f.position != -1)
|
||||
throw new DataRuntimeException("Field already used: " + f);
|
||||
f.position = i++;
|
||||
final Field existingField = fieldMap.put(f.name(), f);
|
||||
if (existingField != null) {
|
||||
throw new DataRuntimeException(String.format(
|
||||
"Duplicate field %s in record %s: %s and %s.",
|
||||
f.name(), name, f, existingField));
|
||||
}
|
||||
ff.add(f);
|
||||
}
|
||||
this.fields = ff.lock();
|
||||
this.hashCode = NO_HASHCODE;
|
||||
}
|
||||
|
||||
public boolean equals(Object o) {
|
||||
if (o == this) return true;
|
||||
if (!(o instanceof RecordSchema)) return false;
|
||||
RecordSchema that = (RecordSchema) o;
|
||||
if (!equalCachedHash(that)) return false;
|
||||
if (!equalNames(that)) return false;
|
||||
if (!props.equals(that.props)) return false;
|
||||
Set seen = SEEN_EQUALS.get();
|
||||
SeenPair here = new SeenPair(this, o);
|
||||
if (seen.contains(here)) return true; // prevent stack overflow
|
||||
boolean first = seen.isEmpty();
|
||||
try {
|
||||
seen.add(here);
|
||||
return fields.equals(((RecordSchema) o).fields);
|
||||
} finally {
|
||||
if (first) seen.clear();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
int computeHash() {
|
||||
Map seen = SEEN_HASHCODE.get();
|
||||
if (seen.containsKey(this)) return 0; // prevent stack overflow
|
||||
boolean first = seen.isEmpty();
|
||||
try {
|
||||
seen.put(this, this);
|
||||
return super.computeHash() + fields.hashCode();
|
||||
} finally {
|
||||
if (first) seen.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class EnumSchema extends NamedSchema {
|
||||
private final List<String> symbols;
|
||||
private final Map<String, Integer> ordinals;
|
||||
|
@ -884,36 +767,6 @@ public abstract class Schema extends ObjectProperties {
|
|||
}
|
||||
}
|
||||
|
||||
private static class FixedSchema extends NamedSchema {
|
||||
private final int size;
|
||||
|
||||
public FixedSchema(Name name, String doc, int size) {
|
||||
super(Type.FIXED, name, doc);
|
||||
if (size < 0)
|
||||
throw new IllegalArgumentException("Invalid fixed size: " + size);
|
||||
this.size = size;
|
||||
}
|
||||
|
||||
public int getFixedSize() {
|
||||
return size;
|
||||
}
|
||||
|
||||
public boolean equals(Object o) {
|
||||
if (o == this) return true;
|
||||
if (!(o instanceof FixedSchema)) return false;
|
||||
FixedSchema that = (FixedSchema) o;
|
||||
return equalCachedHash(that)
|
||||
&& equalNames(that)
|
||||
&& size == that.size
|
||||
&& props.equals(that.props);
|
||||
}
|
||||
|
||||
@Override
|
||||
int computeHash() {
|
||||
return super.computeHash() + size;
|
||||
}
|
||||
}
|
||||
|
||||
private static class StringSchema extends Schema {
|
||||
public StringSchema() {
|
||||
super(Type.STRING);
|
||||
|
@ -1075,8 +928,6 @@ public abstract class Schema extends ObjectProperties {
|
|||
case ENUM:
|
||||
return (defaultValue instanceof String);
|
||||
case BYTES:
|
||||
case FIXED:
|
||||
return (defaultValue instanceof byte[] || defaultValue instanceof ByteBuffer);
|
||||
case INT:
|
||||
return (defaultValue instanceof Integer);
|
||||
case LONG:
|
||||
|
@ -1105,13 +956,6 @@ public abstract class Schema extends ObjectProperties {
|
|||
return true;
|
||||
case UNION: // union default: first branch
|
||||
return isValidDefault(schema.getTypes().get(0), defaultValue);
|
||||
case RECORD:
|
||||
if (!(defaultValue instanceof GenericData.Record))
|
||||
return false;
|
||||
for (Field field : schema.getFields())
|
||||
if (!isValidDefault(field.schema(), ((GenericData.Record) defaultValue).get(field.name())))
|
||||
return false;
|
||||
return true;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -224,18 +224,6 @@ public class SchemaBuilder {
|
|||
new NameContext().namespace(namespace));
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a builder for an Avro record with the specified name.
|
||||
* This is equivalent to:
|
||||
* <pre>
|
||||
* builder().record(name);
|
||||
* </pre>
|
||||
* @param name the record name
|
||||
*/
|
||||
public static RecordBuilder<Schema> record(String name) {
|
||||
return builder().record(name);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a builder for an Avro enum with the specified name and symbols (values).
|
||||
* This is equivalent to:
|
||||
|
@ -248,18 +236,6 @@ public class SchemaBuilder {
|
|||
return builder().enumeration(name);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a builder for an Avro fixed type with the specified name and size.
|
||||
* This is equivalent to:
|
||||
* <pre>
|
||||
* builder().fixed(name);
|
||||
* </pre>
|
||||
* @param name the fixed name
|
||||
*/
|
||||
public static FixedBuilder<Schema> fixed(String name) {
|
||||
return builder().fixed(name);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a builder for an Avro array
|
||||
* This is equivalent to:
|
||||
|
@ -691,41 +667,6 @@ public class SchemaBuilder {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds an Avro Fixed type with optional properties, namespace, doc, and
|
||||
* aliases.
|
||||
* <p/>
|
||||
* Set properties with {@link #prop(String, Object)}, namespace with
|
||||
* {@link #namespace(String)}, doc with {@link #doc(String)}, and aliases with
|
||||
* {@link #aliases(String[])}.
|
||||
* <p/>
|
||||
* The Fixed schema is finalized when its required size is set via
|
||||
* {@link #size(int)}.
|
||||
**/
|
||||
public static final class FixedBuilder<R> extends
|
||||
NamespacedBuilder<R, FixedBuilder<R>> {
|
||||
private FixedBuilder(Completion<R> context, NameContext names, String name) {
|
||||
super(context, names, name);
|
||||
}
|
||||
|
||||
private static <R> FixedBuilder<R> create(Completion<R> context,
|
||||
NameContext names, String name) {
|
||||
return new FixedBuilder<R>(context, names, name);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected FixedBuilder<R> self() {
|
||||
return this;
|
||||
}
|
||||
|
||||
/** Configure this fixed type's size, and end its configuration. **/
|
||||
public R size(int size) {
|
||||
Schema schema = Schema.createFixed(name(), super.doc(), space(), size);
|
||||
completeSchema(schema);
|
||||
return context().complete(schema);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds an Avro Enum type with optional properties, namespace, doc, and
|
||||
* aliases.
|
||||
|
@ -1162,19 +1103,6 @@ public class SchemaBuilder {
|
|||
return ArrayBuilder.create(context, names);
|
||||
}
|
||||
|
||||
/** Build an Avro fixed type. Example usage:
|
||||
* <pre>
|
||||
* fixed("com.foo.IPv4").size(4)
|
||||
* </pre>
|
||||
* Equivalent to Avro JSON Schema:
|
||||
* <pre>
|
||||
* {"type":"fixed", "name":"com.foo.IPv4", "size":4}
|
||||
* </pre>
|
||||
**/
|
||||
public final FixedBuilder<R> fixed(String name) {
|
||||
return FixedBuilder.create(context, names, name);
|
||||
}
|
||||
|
||||
/** Build an Avro enum type. Example usage:
|
||||
* <pre>
|
||||
* enumeration("Suits").namespace("org.cards").doc("card suit names")
|
||||
|
@ -1191,29 +1119,6 @@ public class SchemaBuilder {
|
|||
return EnumBuilder.create(context, names, name);
|
||||
}
|
||||
|
||||
/** Build an Avro record type. Example usage:
|
||||
* <pre>
|
||||
* record("com.foo.Foo").fields()
|
||||
* .name("field1").typeInt().intDefault(1)
|
||||
* .name("field2").typeString().noDefault()
|
||||
* .name("field3").optional().typeFixed("FooFixed").size(4)
|
||||
* .endRecord()
|
||||
* </pre>
|
||||
* Equivalent to Avro JSON Schema:
|
||||
* <pre>
|
||||
* {"type":"record", "name":"com.foo.Foo", "fields": [
|
||||
* {"name":"field1", "type":"int", "default":1},
|
||||
* {"name":"field2", "type":"string"},
|
||||
* {"name":"field3", "type":[
|
||||
* null, {"type":"fixed", "name":"FooFixed", "size":4}
|
||||
* ]}
|
||||
* ]}
|
||||
* </pre>
|
||||
**/
|
||||
public final RecordBuilder<R> record(String name) {
|
||||
return RecordBuilder.create(context, names, name);
|
||||
}
|
||||
|
||||
/** Build an Avro union schema type. Example usage:
|
||||
* <pre>unionOf().stringType().and().bytesType().endUnion()</pre>
|
||||
**/
|
||||
|
@ -1450,21 +1355,11 @@ public class SchemaBuilder {
|
|||
return ArrayBuilder.create(wrap(new ArrayDefault<R>(bldr)), names);
|
||||
}
|
||||
|
||||
/** Build an Avro fixed type. **/
|
||||
public final FixedBuilder<FixedDefault<R>> fixed(String name) {
|
||||
return FixedBuilder.create(wrap(new FixedDefault<R>(bldr)), names, name);
|
||||
}
|
||||
|
||||
/** Build an Avro enum type. **/
|
||||
public final EnumBuilder<EnumDefault<R>> enumeration(String name) {
|
||||
return EnumBuilder.create(wrap(new EnumDefault<R>(bldr)), names, name);
|
||||
}
|
||||
|
||||
/** Build an Avro record type. **/
|
||||
public final RecordBuilder<RecordDefault<R>> record(String name) {
|
||||
return RecordBuilder.create(wrap(new RecordDefault<R>(bldr)), names, name);
|
||||
}
|
||||
|
||||
private <C> Completion<C> wrap(
|
||||
Completion<C> completion) {
|
||||
if (wrapper != null) {
|
||||
|
@ -1677,51 +1572,16 @@ public class SchemaBuilder {
|
|||
return ArrayBuilder.create(completion(new ArrayDefault<R>(bldr)), names);
|
||||
}
|
||||
|
||||
/** Build an Avro fixed type. **/
|
||||
public FixedBuilder<UnionAccumulator<FixedDefault<R>>> fixed(String name) {
|
||||
return FixedBuilder.create(completion(new FixedDefault<R>(bldr)), names, name);
|
||||
}
|
||||
|
||||
/** Build an Avro enum type. **/
|
||||
public EnumBuilder<UnionAccumulator<EnumDefault<R>>> enumeration(String name) {
|
||||
return EnumBuilder.create(completion(new EnumDefault<R>(bldr)), names, name);
|
||||
}
|
||||
|
||||
/** Build an Avro record type. **/
|
||||
public RecordBuilder<UnionAccumulator<RecordDefault<R>>> record(String name) {
|
||||
return RecordBuilder.create(completion(new RecordDefault<R>(bldr)), names, name);
|
||||
}
|
||||
|
||||
private <C> UnionCompletion<C> completion(Completion<C> context) {
|
||||
return new UnionCompletion<C>(context, names, new ArrayList<Schema>());
|
||||
}
|
||||
}
|
||||
|
||||
public final static class RecordBuilder<R> extends
|
||||
NamespacedBuilder<R, RecordBuilder<R>> {
|
||||
private RecordBuilder(Completion<R> context, NameContext names, String name) {
|
||||
super(context, names, name);
|
||||
}
|
||||
|
||||
private static <R> RecordBuilder<R> create(Completion<R> context,
|
||||
NameContext names, String name) {
|
||||
return new RecordBuilder<R>(context, names, name);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RecordBuilder<R> self() {
|
||||
return this;
|
||||
}
|
||||
|
||||
public FieldAssembler<R> fields() {
|
||||
Schema record = Schema.createRecord(name(), doc(), space(), false);
|
||||
// place the record in the name context, fields yet to be set.
|
||||
completeSchema(record);
|
||||
return new FieldAssembler<R>(
|
||||
context(), names().namespace(record.getNamespace()), record);
|
||||
}
|
||||
}
|
||||
|
||||
public final static class FieldAssembler<R> {
|
||||
private final List<Field> fields = new ArrayList<Field>();
|
||||
private final Completion<R> context;
|
||||
|
@ -2374,11 +2234,6 @@ public class SchemaBuilder {
|
|||
super(field);
|
||||
}
|
||||
|
||||
/** Completes this field with the default value provided, cannot be null **/
|
||||
public final FieldAssembler<R> recordDefault(GenericRecord defaultVal) {
|
||||
return super.usingDefault(defaultVal);
|
||||
}
|
||||
|
||||
@Override
|
||||
final RecordDefault<R> self() {
|
||||
return this;
|
||||
|
|
|
@ -1,40 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
|
||||
|
||||
package org.apache.kafka.copycat.data;
|
||||
|
||||
/** Thrown when the expected contents of a union cannot be resolved. */
|
||||
public class UnresolvedUnionException extends DataRuntimeException {
|
||||
private Object unresolvedDatum;
|
||||
private Schema unionSchema;
|
||||
|
||||
public UnresolvedUnionException(Schema unionSchema, Object unresolvedDatum) {
|
||||
super("Not in union " + unionSchema + ": " + unresolvedDatum);
|
||||
this.unionSchema = unionSchema;
|
||||
this.unresolvedDatum = unresolvedDatum;
|
||||
}
|
||||
|
||||
public Object getUnresolvedDatum() {
|
||||
return unresolvedDatum;
|
||||
}
|
||||
|
||||
public Schema getUnionSchema() {
|
||||
return unionSchema;
|
||||
}
|
||||
}
|
|
@ -1,158 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
|
||||
|
||||
package org.apache.kafka.copycat.data;
|
||||
|
||||
import java.nio.charset.Charset;
|
||||
|
||||
/** A Utf8 string. Unlike {@link String}, instances are mutable. This is more
|
||||
* efficient than {@link String} when reading or writing a sequence of values,
|
||||
* as a single instance may be reused. */
|
||||
public class Utf8 implements Comparable<Utf8>, CharSequence {
|
||||
private static final byte[] EMPTY = new byte[0];
|
||||
private static final Charset UTF8 = Charset.forName("UTF-8");
|
||||
|
||||
private byte[] bytes = EMPTY;
|
||||
private int length;
|
||||
private String string;
|
||||
|
||||
public Utf8() {
|
||||
}
|
||||
|
||||
public Utf8(String string) {
|
||||
this.bytes = getBytesFor(string);
|
||||
this.length = bytes.length;
|
||||
this.string = string;
|
||||
}
|
||||
|
||||
public Utf8(Utf8 other) {
|
||||
this.length = other.length;
|
||||
this.bytes = new byte[other.length];
|
||||
System.arraycopy(other.bytes, 0, this.bytes, 0, this.length);
|
||||
this.string = other.string;
|
||||
}
|
||||
|
||||
public Utf8(byte[] bytes) {
|
||||
this.bytes = bytes;
|
||||
this.length = bytes.length;
|
||||
}
|
||||
|
||||
/** Return UTF-8 encoded bytes.
|
||||
* Only valid through {@link #getByteLength()}. */
|
||||
public byte[] getBytes() {
|
||||
return bytes;
|
||||
}
|
||||
|
||||
/** Return length in bytes.
|
||||
* @deprecated call {@link #getByteLength()} instead. */
|
||||
public int getLength() {
|
||||
return length;
|
||||
}
|
||||
|
||||
/** Return length in bytes. */
|
||||
public int getByteLength() {
|
||||
return length;
|
||||
}
|
||||
|
||||
/** Set length in bytes. Should called whenever byte content changes, even
|
||||
* if the length does not change, as this also clears the cached String.
|
||||
* @deprecated call {@link #setByteLength(int)} instead. */
|
||||
public Utf8 setLength(int newLength) {
|
||||
return setByteLength(newLength);
|
||||
}
|
||||
|
||||
/** Set length in bytes. Should called whenever byte content changes, even
|
||||
* if the length does not change, as this also clears the cached String. */
|
||||
public Utf8 setByteLength(int newLength) {
|
||||
if (this.bytes.length < newLength) {
|
||||
byte[] newBytes = new byte[newLength];
|
||||
System.arraycopy(bytes, 0, newBytes, 0, this.length);
|
||||
this.bytes = newBytes;
|
||||
}
|
||||
this.length = newLength;
|
||||
this.string = null;
|
||||
return this;
|
||||
}
|
||||
|
||||
/** Set to the contents of a String. */
|
||||
public Utf8 set(String string) {
|
||||
this.bytes = getBytesFor(string);
|
||||
this.length = bytes.length;
|
||||
this.string = string;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
if (this.length == 0) return "";
|
||||
if (this.string == null) {
|
||||
this.string = new String(bytes, 0, length, UTF8);
|
||||
}
|
||||
return this.string;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (o == this) return true;
|
||||
if (!(o instanceof Utf8)) return false;
|
||||
Utf8 that = (Utf8) o;
|
||||
if (!(this.length == that.length)) return false;
|
||||
byte[] thatBytes = that.bytes;
|
||||
for (int i = 0; i < this.length; i++)
|
||||
if (bytes[i] != thatBytes[i])
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
int hash = 0;
|
||||
for (int i = 0; i < this.length; i++)
|
||||
hash = hash * 31 + bytes[i];
|
||||
return hash;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(Utf8 that) {
|
||||
return BinaryData.compareBytes(this.bytes, 0, this.length,
|
||||
that.bytes, 0, that.length);
|
||||
}
|
||||
|
||||
// CharSequence implementation
|
||||
@Override
|
||||
public char charAt(int index) {
|
||||
return toString().charAt(index);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int length() {
|
||||
return toString().length();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CharSequence subSequence(int start, int end) {
|
||||
return toString().subSequence(start, end);
|
||||
}
|
||||
|
||||
/** Gets the UTF-8 bytes for a String */
|
||||
public static final byte[] getBytesFor(String str) {
|
||||
return str.getBytes(UTF8);
|
||||
}
|
||||
|
||||
}
|
|
@ -1,410 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
package org.apache.kafka.copycat.data;
|
||||
|
||||
import org.apache.kafka.copycat.data.GenericData.Record;
|
||||
import org.apache.kafka.copycat.data.Schema.Field;
|
||||
import org.apache.kafka.copycat.data.Schema.Type;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.*;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class TestGenericData {
|
||||
|
||||
@Test(expected = DataRuntimeException.class)
|
||||
public void testrecordConstructorNullSchema() throws Exception {
|
||||
new GenericData.Record(null);
|
||||
}
|
||||
|
||||
@Test(expected = DataRuntimeException.class)
|
||||
public void testrecordConstructorWrongSchema() throws Exception {
|
||||
new GenericData.Record(Schema.create(Schema.Type.INT));
|
||||
}
|
||||
|
||||
@Test(expected = DataRuntimeException.class)
|
||||
public void testArrayConstructorNullSchema() throws Exception {
|
||||
new GenericData.Array<Object>(1, null);
|
||||
}
|
||||
|
||||
@Test(expected = DataRuntimeException.class)
|
||||
public void testArrayConstructorWrongSchema() throws Exception {
|
||||
new GenericData.Array<Object>(1, Schema.create(Schema.Type.INT));
|
||||
}
|
||||
|
||||
@Test(expected = DataRuntimeException.class)
|
||||
public void testRecordCreateEmptySchema() throws Exception {
|
||||
Schema s = Schema.createRecord("schemaName", "schemaDoc", "namespace", false);
|
||||
Record r = new GenericData.Record(s);
|
||||
}
|
||||
|
||||
@Test(expected = DataRuntimeException.class)
|
||||
public void testGetEmptySchemaFields() throws Exception {
|
||||
Schema s = Schema.createRecord("schemaName", "schemaDoc", "namespace", false);
|
||||
s.getFields();
|
||||
}
|
||||
|
||||
@Test(expected = DataRuntimeException.class)
|
||||
public void testGetEmptySchemaField() throws Exception {
|
||||
Schema s = Schema.createRecord("schemaName", "schemaDoc", "namespace", false);
|
||||
s.getField("foo");
|
||||
}
|
||||
|
||||
@Test(expected = DataRuntimeException.class)
|
||||
public void testRecordPutInvalidField() throws Exception {
|
||||
Schema s = Schema.createRecord("schemaName", "schemaDoc", "namespace", false);
|
||||
List<Schema.Field> fields = new ArrayList<Schema.Field>();
|
||||
fields.add(new Schema.Field("someFieldName", s, "docs", null));
|
||||
s.setFields(fields);
|
||||
Record r = new GenericData.Record(s);
|
||||
r.put("invalidFieldName", "someValue");
|
||||
}
|
||||
|
||||
@Test
|
||||
/** Make sure that even with nulls, hashCode() doesn't throw NPE. */
|
||||
public void testHashCode() {
|
||||
GenericData.get().hashCode(null, Schema.create(Type.NULL));
|
||||
GenericData.get().hashCode(null, Schema.createUnion(
|
||||
Arrays.asList(Schema.create(Type.BOOLEAN), Schema.create(Type.STRING))));
|
||||
List<CharSequence> stuff = new ArrayList<CharSequence>();
|
||||
stuff.add("string");
|
||||
Schema schema = recordSchema();
|
||||
GenericRecord r = new GenericData.Record(schema);
|
||||
r.put(0, stuff);
|
||||
GenericData.get().hashCode(r, schema);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEquals() {
|
||||
Schema s = recordSchema();
|
||||
GenericRecord r0 = new GenericData.Record(s);
|
||||
GenericRecord r1 = new GenericData.Record(s);
|
||||
GenericRecord r2 = new GenericData.Record(s);
|
||||
Collection<CharSequence> l0 = new ArrayDeque<CharSequence>();
|
||||
List<CharSequence> l1 = new ArrayList<CharSequence>();
|
||||
GenericArray<CharSequence> l2 =
|
||||
new GenericData.Array<CharSequence>(1, s.getFields().get(0).schema());
|
||||
String foo = "foo";
|
||||
l0.add(new StringBuffer(foo));
|
||||
l1.add(foo);
|
||||
l2.add(new Utf8(foo));
|
||||
r0.put(0, l0);
|
||||
r1.put(0, l1);
|
||||
r2.put(0, l2);
|
||||
assertEquals(r0, r1);
|
||||
assertEquals(r0, r2);
|
||||
assertEquals(r1, r2);
|
||||
}
|
||||
|
||||
private Schema recordSchema() {
|
||||
List<Field> fields = new ArrayList<Field>();
|
||||
fields.add(new Field("anArray", Schema.createArray(Schema.create(Type.STRING)), null, null));
|
||||
Schema schema = Schema.createRecord("arrayFoo", "test", "mytest", false);
|
||||
schema.setFields(fields);
|
||||
|
||||
return schema;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEquals2() {
|
||||
Schema schema1 = Schema.createRecord("r", null, "x", false);
|
||||
List<Field> fields1 = new ArrayList<Field>();
|
||||
fields1.add(new Field("a", Schema.create(Schema.Type.STRING), null, null,
|
||||
Field.Order.IGNORE));
|
||||
schema1.setFields(fields1);
|
||||
|
||||
// only differs in field order
|
||||
Schema schema2 = Schema.createRecord("r", null, "x", false);
|
||||
List<Field> fields2 = new ArrayList<Field>();
|
||||
fields2.add(new Field("a", Schema.create(Schema.Type.STRING), null, null,
|
||||
Field.Order.ASCENDING));
|
||||
schema2.setFields(fields2);
|
||||
|
||||
GenericRecord record1 = new GenericData.Record(schema1);
|
||||
record1.put("a", "1");
|
||||
|
||||
GenericRecord record2 = new GenericData.Record(schema2);
|
||||
record2.put("a", "2");
|
||||
|
||||
assertFalse(record2.equals(record1));
|
||||
assertFalse(record1.equals(record2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecordGetFieldDoesntExist() throws Exception {
|
||||
List<Field> fields = new ArrayList<Field>();
|
||||
Schema schema = Schema.createRecord(fields);
|
||||
GenericData.Record record = new GenericData.Record(schema);
|
||||
assertNull(record.get("does not exist"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testArrayReversal() {
|
||||
Schema schema = Schema.createArray(Schema.create(Schema.Type.INT));
|
||||
GenericArray<Integer> forward = new GenericData.Array<Integer>(10, schema);
|
||||
GenericArray<Integer> backward = new GenericData.Array<Integer>(10, schema);
|
||||
for (int i = 0; i <= 9; i++) {
|
||||
forward.add(i);
|
||||
}
|
||||
for (int i = 9; i >= 0; i--) {
|
||||
backward.add(i);
|
||||
}
|
||||
forward.reverse();
|
||||
assertTrue(forward.equals(backward));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testArrayListInterface() {
|
||||
Schema schema = Schema.createArray(Schema.create(Schema.Type.INT));
|
||||
GenericArray<Integer> array = new GenericData.Array<Integer>(1, schema);
|
||||
array.add(99);
|
||||
assertEquals(new Integer(99), array.get(0));
|
||||
List<Integer> list = new ArrayList<Integer>();
|
||||
list.add(99);
|
||||
assertEquals(array, list);
|
||||
assertEquals(list, array);
|
||||
assertEquals(list.hashCode(), array.hashCode());
|
||||
try {
|
||||
array.get(2);
|
||||
fail("Expected IndexOutOfBoundsException getting index 2");
|
||||
} catch (IndexOutOfBoundsException e) {
|
||||
}
|
||||
array.clear();
|
||||
assertEquals(0, array.size());
|
||||
try {
|
||||
array.get(0);
|
||||
fail("Expected IndexOutOfBoundsException getting index 0 after clear()");
|
||||
} catch (IndexOutOfBoundsException e) {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testArrayAddAtLocation() {
|
||||
Schema schema = Schema.createArray(Schema.create(Schema.Type.INT));
|
||||
GenericArray<Integer> array = new GenericData.Array<Integer>(6, schema);
|
||||
array.clear();
|
||||
for (int i = 0; i < 5; ++i)
|
||||
array.add(i);
|
||||
assertEquals(5, array.size());
|
||||
array.add(0, 6);
|
||||
assertEquals(new Integer(6), array.get(0));
|
||||
assertEquals(6, array.size());
|
||||
assertEquals(new Integer(0), array.get(1));
|
||||
assertEquals(new Integer(4), array.get(5));
|
||||
array.add(6, 7);
|
||||
assertEquals(new Integer(7), array.get(6));
|
||||
assertEquals(7, array.size());
|
||||
assertEquals(new Integer(6), array.get(0));
|
||||
assertEquals(new Integer(4), array.get(5));
|
||||
array.add(1, 8);
|
||||
assertEquals(new Integer(8), array.get(1));
|
||||
assertEquals(new Integer(0), array.get(2));
|
||||
assertEquals(new Integer(6), array.get(0));
|
||||
assertEquals(8, array.size());
|
||||
try {
|
||||
array.get(9);
|
||||
fail("Expected IndexOutOfBoundsException after adding elements");
|
||||
} catch (IndexOutOfBoundsException e) {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testArrayRemove() {
|
||||
Schema schema = Schema.createArray(Schema.create(Schema.Type.INT));
|
||||
GenericArray<Integer> array = new GenericData.Array<Integer>(10, schema);
|
||||
array.clear();
|
||||
for (int i = 0; i < 10; ++i)
|
||||
array.add(i);
|
||||
assertEquals(10, array.size());
|
||||
assertEquals(new Integer(0), array.get(0));
|
||||
assertEquals(new Integer(9), array.get(9));
|
||||
|
||||
array.remove(0);
|
||||
assertEquals(9, array.size());
|
||||
assertEquals(new Integer(1), array.get(0));
|
||||
assertEquals(new Integer(2), array.get(1));
|
||||
assertEquals(new Integer(9), array.get(8));
|
||||
|
||||
// Test boundary errors.
|
||||
try {
|
||||
array.get(9);
|
||||
fail("Expected IndexOutOfBoundsException after removing an element");
|
||||
} catch (IndexOutOfBoundsException e) {
|
||||
}
|
||||
try {
|
||||
array.set(9, 99);
|
||||
fail("Expected IndexOutOfBoundsException after removing an element");
|
||||
} catch (IndexOutOfBoundsException e) {
|
||||
}
|
||||
try {
|
||||
array.remove(9);
|
||||
fail("Expected IndexOutOfBoundsException after removing an element");
|
||||
} catch (IndexOutOfBoundsException e) {
|
||||
}
|
||||
|
||||
// Test that we can still remove for properly sized arrays, and the rval
|
||||
assertEquals(new Integer(9), array.remove(8));
|
||||
assertEquals(8, array.size());
|
||||
|
||||
|
||||
// Test insertion after remove
|
||||
array.add(88);
|
||||
assertEquals(new Integer(88), array.get(8));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testArraySet() {
|
||||
Schema schema = Schema.createArray(Schema.create(Schema.Type.INT));
|
||||
GenericArray<Integer> array = new GenericData.Array<Integer>(10, schema);
|
||||
array.clear();
|
||||
for (int i = 0; i < 10; ++i)
|
||||
array.add(i);
|
||||
assertEquals(10, array.size());
|
||||
assertEquals(new Integer(0), array.get(0));
|
||||
assertEquals(new Integer(5), array.get(5));
|
||||
|
||||
assertEquals(new Integer(5), array.set(5, 55));
|
||||
assertEquals(10, array.size());
|
||||
assertEquals(new Integer(55), array.get(5));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testToStringDoesNotEscapeForwardSlash() throws Exception {
|
||||
GenericData data = GenericData.get();
|
||||
assertEquals("\"/\"", data.toString("/"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testToStringNanInfinity() throws Exception {
|
||||
GenericData data = GenericData.get();
|
||||
assertEquals("\"Infinity\"", data.toString(Float.POSITIVE_INFINITY));
|
||||
assertEquals("\"-Infinity\"", data.toString(Float.NEGATIVE_INFINITY));
|
||||
assertEquals("\"NaN\"", data.toString(Float.NaN));
|
||||
assertEquals("\"Infinity\"", data.toString(Double.POSITIVE_INFINITY));
|
||||
assertEquals("\"-Infinity\"", data.toString(Double.NEGATIVE_INFINITY));
|
||||
assertEquals("\"NaN\"", data.toString(Double.NaN));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEnumCompare() {
|
||||
Schema s = Schema.createEnum("Kind", null, null, Arrays.asList("Z", "Y", "X"));
|
||||
GenericEnumSymbol z = new GenericData.EnumSymbol(s, "Z");
|
||||
GenericEnumSymbol y = new GenericData.EnumSymbol(s, "Y");
|
||||
assertEquals(0, z.compareTo(z));
|
||||
assertTrue(y.compareTo(z) > 0);
|
||||
assertTrue(z.compareTo(y) < 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testByteBufferDeepCopy() {
|
||||
// Test that a deep copy of a byte buffer respects the byte buffer
|
||||
// limits and capacity.
|
||||
byte[] buffer_value = {0, 1, 2, 3, 0, 0, 0};
|
||||
ByteBuffer buffer = ByteBuffer.wrap(buffer_value, 1, 4);
|
||||
Schema schema = Schema.createRecord("my_record", "doc", "mytest", false);
|
||||
Field byte_field =
|
||||
new Field("bytes", Schema.create(Type.BYTES), null, null);
|
||||
schema.setFields(Arrays.asList(byte_field));
|
||||
|
||||
GenericRecord record = new GenericData.Record(schema);
|
||||
record.put(byte_field.name(), buffer);
|
||||
|
||||
GenericRecord copy = GenericData.get().deepCopy(schema, record);
|
||||
ByteBuffer buffer_copy = (ByteBuffer) copy.get(byte_field.name());
|
||||
|
||||
assertEquals(buffer, buffer_copy);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidateNullableEnum() {
|
||||
List<Schema> unionTypes = new ArrayList<Schema>();
|
||||
Schema schema;
|
||||
Schema nullSchema = Schema.create(Type.NULL);
|
||||
Schema enumSchema = Schema.createEnum("AnEnum", null, null, Arrays.asList("X", "Y", "Z"));
|
||||
GenericEnumSymbol w = new GenericData.EnumSymbol(enumSchema, "W");
|
||||
GenericEnumSymbol x = new GenericData.EnumSymbol(enumSchema, "X");
|
||||
GenericEnumSymbol y = new GenericData.EnumSymbol(enumSchema, "Y");
|
||||
GenericEnumSymbol z = new GenericData.EnumSymbol(enumSchema, "Z");
|
||||
|
||||
// null is first
|
||||
unionTypes.clear();
|
||||
unionTypes.add(nullSchema);
|
||||
unionTypes.add(enumSchema);
|
||||
schema = Schema.createUnion(unionTypes);
|
||||
|
||||
assertTrue(GenericData.get().validate(schema, z));
|
||||
assertTrue(GenericData.get().validate(schema, y));
|
||||
assertTrue(GenericData.get().validate(schema, x));
|
||||
assertFalse(GenericData.get().validate(schema, w));
|
||||
assertTrue(GenericData.get().validate(schema, null));
|
||||
|
||||
// null is last
|
||||
unionTypes.clear();
|
||||
unionTypes.add(enumSchema);
|
||||
unionTypes.add(nullSchema);
|
||||
schema = Schema.createUnion(unionTypes);
|
||||
|
||||
assertTrue(GenericData.get().validate(schema, z));
|
||||
assertTrue(GenericData.get().validate(schema, y));
|
||||
assertTrue(GenericData.get().validate(schema, x));
|
||||
assertFalse(GenericData.get().validate(schema, w));
|
||||
assertTrue(GenericData.get().validate(schema, null));
|
||||
}
|
||||
|
||||
private enum anEnum {ONE, TWO, THREE}
|
||||
|
||||
;
|
||||
|
||||
@Test
|
||||
public void validateRequiresGenericSymbolForEnumSchema() {
|
||||
final Schema schema = Schema.createEnum("my_enum", "doc", "namespace", Arrays.asList("ONE", "TWO", "THREE"));
|
||||
final GenericData gd = GenericData.get();
|
||||
|
||||
/* positive cases */
|
||||
assertTrue(gd.validate(schema, new GenericData.EnumSymbol(schema, "ONE")));
|
||||
assertTrue(gd.validate(schema, new GenericData.EnumSymbol(schema, anEnum.ONE)));
|
||||
|
||||
/* negative cases */
|
||||
assertFalse("We don't expect GenericData to allow a String datum for an enum schema", gd.validate(schema, "ONE"));
|
||||
assertFalse("We don't expect GenericData to allow a Java Enum for an enum schema", gd.validate(schema, anEnum.ONE));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidateUnion() {
|
||||
Schema type1Schema = SchemaBuilder.record("Type1")
|
||||
.fields()
|
||||
.requiredString("myString")
|
||||
.requiredInt("myInt")
|
||||
.endRecord();
|
||||
|
||||
Schema type2Schema = SchemaBuilder.record("Type2")
|
||||
.fields()
|
||||
.requiredString("myString")
|
||||
.endRecord();
|
||||
|
||||
Schema unionSchema = SchemaBuilder.unionOf()
|
||||
.type(type1Schema).and().type(type2Schema)
|
||||
.endUnion();
|
||||
|
||||
GenericRecord record = new GenericData.Record(type2Schema);
|
||||
record.put("myString", "myValue");
|
||||
assertTrue(GenericData.get().validate(unionSchema, record));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,83 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!DOCTYPE module PUBLIC
|
||||
"-//Puppy Crawl//DTD Check Configuration 1.3//EN"
|
||||
"http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
|
||||
<!--
|
||||
// Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
// contributor license agreements. See the NOTICE file distributed with
|
||||
// this work for additional information regarding copyright ownership.
|
||||
// The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
// (the "License"); you may not use this file except in compliance with
|
||||
// the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
-->
|
||||
<module name="Checker">
|
||||
<property name="localeLanguage" value="en"/>
|
||||
|
||||
<module name="FileTabCharacter"/>
|
||||
|
||||
<!-- header -->
|
||||
<module name="RegexpHeader">
|
||||
<property name="header" value="/\*\*\nLicensed to the Apache.*"/>
|
||||
</module>
|
||||
|
||||
<module name="TreeWalker">
|
||||
|
||||
<!-- code cleanup -->
|
||||
<module name="UnusedImports"/>
|
||||
<module name="RedundantImport"/>
|
||||
<module name="IllegalImport" />
|
||||
<module name="EqualsHashCode"/>
|
||||
<module name="SimplifyBooleanExpression"/>
|
||||
<module name="OneStatementPerLine"/>
|
||||
<module name="UnnecessaryParentheses" />
|
||||
<module name="SimplifyBooleanReturn"/>
|
||||
|
||||
<!-- style -->
|
||||
<module name="DefaultComesLast"/>
|
||||
<module name="EmptyStatement"/>
|
||||
<module name="ArrayTypeStyle"/>
|
||||
<module name="UpperEll"/>
|
||||
<module name="LeftCurly"/>
|
||||
<module name="RightCurly"/>
|
||||
<module name="EmptyStatement"/>
|
||||
<module name="ConstantName">
|
||||
<property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)"/>
|
||||
</module>
|
||||
<module name="LocalVariableName"/>
|
||||
<module name="LocalFinalVariableName"/>
|
||||
<module name="ClassTypeParameterName"/>
|
||||
<module name="MemberName"/>
|
||||
<module name="MethodTypeParameterName"/>
|
||||
<module name="PackageName"/>
|
||||
<module name="ParameterName"/>
|
||||
<module name="StaticVariableName"/>
|
||||
<module name="TypeName"/>
|
||||
|
||||
<!-- dependencies -->
|
||||
<module name="ImportControl">
|
||||
<property name="file" value="${basedir}/checkstyle/import-control.xml"/>
|
||||
</module>
|
||||
|
||||
<!-- whitespace -->
|
||||
<module name="GenericWhitespace"/>
|
||||
<module name="NoWhitespaceBefore"/>
|
||||
<module name="WhitespaceAfter" />
|
||||
<module name="NoWhitespaceAfter"/>
|
||||
<module name="WhitespaceAround">
|
||||
<property name="allowEmptyConstructors" value="true"/>
|
||||
<property name="allowEmptyMethods" value="true"/>
|
||||
</module>
|
||||
<module name="Indentation"/>
|
||||
<module name="MethodParamPad"/>
|
||||
<module name="ParenPad"/>
|
||||
<module name="TypecastParenPad"/>
|
||||
</module>
|
||||
</module>
|
|
@ -0,0 +1,168 @@
|
|||
<!DOCTYPE import-control PUBLIC
|
||||
"-//Puppy Crawl//DTD Import Control 1.1//EN"
|
||||
"http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
|
||||
<!--
|
||||
// Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
// contributor license agreements. See the NOTICE file distributed with
|
||||
// this work for additional information regarding copyright ownership.
|
||||
// The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
// (the "License"); you may not use this file except in compliance with
|
||||
// the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
-->
|
||||
<import-control pkg="org.apache.kafka">
|
||||
|
||||
<!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->
|
||||
|
||||
<!-- common library dependencies -->
|
||||
<allow pkg="java" />
|
||||
<allow pkg="javax.management" />
|
||||
<allow pkg="org.slf4j" />
|
||||
<allow pkg="org.junit" />
|
||||
|
||||
<!-- no one depends on the server -->
|
||||
<disallow pkg="kafka" />
|
||||
|
||||
<!-- anyone can use public classes -->
|
||||
<allow pkg="org.apache.kafka.common" exact-match="true" />
|
||||
<allow pkg="org.apache.kafka.common.utils" />
|
||||
|
||||
<subpackage name="common">
|
||||
<disallow pkg="org.apache.kafka.clients" />
|
||||
<allow pkg="org.apache.kafka.common" exact-match="true" />
|
||||
<allow pkg="org.apache.kafka.test" />
|
||||
|
||||
<subpackage name="config">
|
||||
<allow pkg="org.apache.kafka.common.config" />
|
||||
<!-- for testing -->
|
||||
<allow pkg="org.apache.kafka.common.metrics" />
|
||||
</subpackage>
|
||||
|
||||
<subpackage name="metrics">
|
||||
<allow pkg="org.apache.kafka.common.metrics" />
|
||||
</subpackage>
|
||||
|
||||
<subpackage name="network">
|
||||
<allow pkg="org.apache.kafka.common.metrics" />
|
||||
</subpackage>
|
||||
|
||||
<subpackage name="protocol">
|
||||
<allow pkg="org.apache.kafka.common.errors" />
|
||||
<allow pkg="org.apache.kafka.common.protocol.types" />
|
||||
</subpackage>
|
||||
|
||||
<subpackage name="record">
|
||||
<allow pkg="net.jpountz" />
|
||||
<allow pkg="org.apache.kafka.common.record" />
|
||||
</subpackage>
|
||||
|
||||
<subpackage name="requests">
|
||||
<allow pkg="org.apache.kafka.common.protocol" />
|
||||
<allow pkg="org.apache.kafka.common.network" />
|
||||
<!-- for testing -->
|
||||
<allow pkg="org.apache.kafka.common.errors" />
|
||||
</subpackage>
|
||||
|
||||
<subpackage name="serialization">
|
||||
<allow class="org.apache.kafka.common.errors.SerializationException" />
|
||||
</subpackage>
|
||||
</subpackage>
|
||||
|
||||
<subpackage name="clients">
|
||||
<allow pkg="org.apache.kafka.common" />
|
||||
<allow pkg="org.slf4j" />
|
||||
<allow pkg="org.apache.kafka.clients" exact-match="true"/>
|
||||
<allow pkg="org.apache.kafka.test" />
|
||||
|
||||
<subpackage name="consumer">
|
||||
<allow pkg="org.apache.kafka.clients.consumer" />
|
||||
</subpackage>
|
||||
|
||||
<subpackage name="producer">
|
||||
<allow pkg="org.apache.kafka.clients.producer" />
|
||||
</subpackage>
|
||||
|
||||
<subpackage name="tools">
|
||||
<allow pkg="org.apache.kafka.clients.producer" />
|
||||
<allow pkg="org.apache.kafka.clients.consumer" />
|
||||
<allow pkg="com.fasterxml.jackson" />
|
||||
<allow pkg="net.sourceforge.argparse4j" />
|
||||
</subpackage>
|
||||
</subpackage>
|
||||
|
||||
<subpackage name="log4jappender">
|
||||
<allow pkg="org.apache.log4j" />
|
||||
<allow pkg="org.apache.kafka.clients" />
|
||||
<allow pkg="org.apache.kafka.common" />
|
||||
<allow pkg="org.apache.kafka.test" />
|
||||
</subpackage>
|
||||
|
||||
<subpackage name="test">
|
||||
<allow pkg="org.apache.kafka" />
|
||||
</subpackage>
|
||||
|
||||
<subpackage name="copycat">
|
||||
<allow pkg="org.apache.kafka.copycat.data" />
|
||||
<allow pkg="org.apache.kafka.copycat.errors" />
|
||||
|
||||
<subpackage name="source">
|
||||
<allow pkg="org.apache.kafka.copycat.connector" />
|
||||
<allow pkg="org.apache.kafka.copycat.storage" />
|
||||
</subpackage>
|
||||
|
||||
<subpackage name="sink">
|
||||
<allow pkg="org.apache.kafka.copycat.connector" />
|
||||
<allow pkg="org.apache.kafka.copycat.storage" />
|
||||
</subpackage>
|
||||
|
||||
<subpackage name="runtime">
|
||||
<allow pkg="org.apache.kafka.copycat" />
|
||||
<allow pkg="org.apache.kafka.common" />
|
||||
<allow pkg="org.apache.kafka.clients" />
|
||||
<!-- for tests -->
|
||||
<allow pkg="org.easymock" />
|
||||
<allow pkg="org.powermock" />
|
||||
</subpackage>
|
||||
|
||||
<subpackage name="cli">
|
||||
<allow pkg="org.apache.kafka.copycat.runtime" />
|
||||
<allow pkg="org.apache.kafka.copycat.util" />
|
||||
<allow pkg="org.apache.kafka.common" />
|
||||
</subpackage>
|
||||
|
||||
<subpackage name="storage">
|
||||
<allow pkg="org.apache.kafka.copycat" />
|
||||
<allow pkg="org.apache.kafka.common.serialization" />
|
||||
<!-- for tests -->
|
||||
<allow pkg="org.easymock" />
|
||||
<allow pkg="org.powermock" />
|
||||
</subpackage>
|
||||
|
||||
<subpackage name="util">
|
||||
<allow pkg="org.apache.kafka.copycat" />
|
||||
</subpackage>
|
||||
|
||||
<subpackage name="json">
|
||||
<allow pkg="com.fasterxml.jackson" />
|
||||
<allow pkg="org.apache.kafka.common.serialization" />
|
||||
<allow pkg="org.apache.kafka.common.errors" />
|
||||
<allow pkg="org.apache.kafka.copycat.storage" />
|
||||
</subpackage>
|
||||
|
||||
<subpackage name="file">
|
||||
<allow pkg="org.apache.kafka.copycat" />
|
||||
<!-- for tests -->
|
||||
<allow pkg="org.easymock" />
|
||||
<allow pkg="org.powermock" />
|
||||
</subpackage>
|
||||
|
||||
</subpackage>
|
||||
|
||||
</import-control>
|
|
@ -84,33 +84,6 @@ public class JsonConverter implements Converter {
|
|||
return value.textValue();
|
||||
}
|
||||
});
|
||||
TO_COPYCAT_CONVERTERS.put(JsonSchema.OBJECT_TYPE_NAME, new JsonToCopycatTypeConverter() {
|
||||
@Override
|
||||
public Object convert(JsonNode jsonSchema, JsonNode value) {
|
||||
JsonNode jsonSchemaFields = jsonSchema.get(JsonSchema.OBJECT_FIELDS_FIELD_NAME);
|
||||
if (jsonSchemaFields == null || !jsonSchemaFields.isArray())
|
||||
throw new CopycatRuntimeException("Invalid object schema, should contain list of fields.");
|
||||
|
||||
HashMap<String, JsonNode> jsonSchemaFieldsByName = new HashMap<>();
|
||||
for (JsonNode fieldSchema : jsonSchemaFields) {
|
||||
JsonNode name = fieldSchema.get("name");
|
||||
if (name == null || !name.isTextual())
|
||||
throw new CopycatRuntimeException("Invalid field name");
|
||||
jsonSchemaFieldsByName.put(name.textValue(), fieldSchema);
|
||||
}
|
||||
|
||||
Schema schema = asCopycatSchema(jsonSchema);
|
||||
GenericRecordBuilder builder = new GenericRecordBuilder(schema);
|
||||
// TODO: We should verify both the schema fields and actual fields are exactly identical
|
||||
Iterator<Map.Entry<String, JsonNode>> fieldIt = value.fields();
|
||||
while (fieldIt.hasNext()) {
|
||||
Map.Entry<String, JsonNode> entry = fieldIt.next();
|
||||
JsonNode fieldSchema = jsonSchemaFieldsByName.get(entry.getKey());
|
||||
builder.set(entry.getKey(), convertToCopycat(fieldSchema, entry.getValue()));
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
});
|
||||
TO_COPYCAT_CONVERTERS.put(JsonSchema.ARRAY_TYPE_NAME, new JsonToCopycatTypeConverter() {
|
||||
@Override
|
||||
public Object convert(JsonNode jsonSchema, JsonNode value) {
|
||||
|
@ -165,14 +138,6 @@ public class JsonConverter implements Converter {
|
|||
throw new UnsupportedOperationException("null schema not supported");
|
||||
case STRING:
|
||||
return JsonSchema.STRING_SCHEMA;
|
||||
case RECORD: {
|
||||
ObjectNode recordSchema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.OBJECT_TYPE_NAME);
|
||||
ArrayNode fields = recordSchema.putArray(JsonSchema.OBJECT_FIELDS_FIELD_NAME);
|
||||
for (Schema.Field field : schema.getFields()) {
|
||||
fields.add(JsonNodeFactory.instance.objectNode().set(field.name(), asJsonSchema(field.schema())));
|
||||
}
|
||||
return recordSchema;
|
||||
}
|
||||
case UNION: {
|
||||
throw new UnsupportedOperationException("union schema not supported");
|
||||
}
|
||||
|
@ -181,8 +146,6 @@ public class JsonConverter implements Converter {
|
|||
.set(JsonSchema.ARRAY_ITEMS_FIELD_NAME, asJsonSchema(schema.getElementType()));
|
||||
case ENUM:
|
||||
throw new UnsupportedOperationException("enum schema not supported");
|
||||
case FIXED:
|
||||
throw new UnsupportedOperationException("fixed schema not supported");
|
||||
case MAP:
|
||||
throw new UnsupportedOperationException("map schema not supported");
|
||||
default:
|
||||
|
@ -221,25 +184,6 @@ public class JsonConverter implements Converter {
|
|||
if (elemSchema == null)
|
||||
throw new CopycatRuntimeException("Array schema did not specify the element type");
|
||||
return Schema.createArray(asCopycatSchema(elemSchema));
|
||||
case JsonSchema.OBJECT_TYPE_NAME:
|
||||
JsonNode jsonSchemaName = jsonSchema.get(JsonSchema.SCHEMA_NAME_FIELD_NAME);
|
||||
if (jsonSchemaName == null || !jsonSchemaName.isTextual())
|
||||
throw new CopycatRuntimeException("Invalid object schema, should contain name.");
|
||||
JsonNode jsonSchemaFields = jsonSchema.get(JsonSchema.OBJECT_FIELDS_FIELD_NAME);
|
||||
if (jsonSchemaFields == null || !jsonSchemaFields.isArray())
|
||||
throw new CopycatRuntimeException("Invalid object schema, should contain list of fields.");
|
||||
List<Schema.Field> fields = new ArrayList<>();
|
||||
// TODO: We should verify both the schema fields and actual fields are exactly identical
|
||||
for (JsonNode fieldJsonSchema : jsonSchemaFields) {
|
||||
JsonNode fieldName = fieldJsonSchema.get(JsonSchema.OBJECT_FIELD_NAME_FIELD_NAME);
|
||||
if (fieldName == null || !fieldName.isTextual())
|
||||
throw new CopycatRuntimeException("Object field missing name");
|
||||
// TODO: doc, default value?
|
||||
fields.add(new Schema.Field(fieldName.textValue(), asCopycatSchema(fieldJsonSchema), null, null));
|
||||
}
|
||||
Schema result = Schema.createRecord(jsonSchemaName.textValue(), null, null, false);
|
||||
result.setFields(fields);
|
||||
return result;
|
||||
default:
|
||||
throw new CopycatRuntimeException("Unknown schema type: " + schemaTypeNode.textValue());
|
||||
}
|
||||
|
@ -283,25 +227,6 @@ public class JsonConverter implements Converter {
|
|||
return JsonSchema.bytesEnvelope(((ByteBuffer) value).array());
|
||||
} else if (value instanceof CharSequence) {
|
||||
return JsonSchema.stringEnvelope(value.toString());
|
||||
} else if (value instanceof GenericRecord) {
|
||||
GenericRecord recordValue = (GenericRecord) value;
|
||||
ObjectNode schema = JsonNodeFactory.instance.objectNode();
|
||||
schema.put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.OBJECT_TYPE_NAME);
|
||||
schema.put(JsonSchema.SCHEMA_NAME_FIELD_NAME, recordValue.getSchema().getName());
|
||||
ArrayNode schemaFields = JsonNodeFactory.instance.arrayNode();
|
||||
schema.set(JsonSchema.OBJECT_FIELDS_FIELD_NAME, schemaFields);
|
||||
ObjectNode record = JsonNodeFactory.instance.objectNode();
|
||||
for (Schema.Field field : recordValue.getSchema().getFields()) {
|
||||
JsonSchema.Envelope fieldSchemaAndValue = convertToJson(recordValue.get(field.name()));
|
||||
// Fill in the field name since this is part of the field schema spec but the call to convertToJson that
|
||||
// created it does not have access to the field name. This *must* copy the schema since it may be one of
|
||||
// the primitive schemas.
|
||||
ObjectNode fieldSchema = ((ObjectNode) fieldSchemaAndValue.schema).deepCopy();
|
||||
fieldSchema.put(JsonSchema.OBJECT_FIELD_NAME_FIELD_NAME, field.name());
|
||||
schemaFields.add(fieldSchema);
|
||||
record.set(field.name(), fieldSchemaAndValue.payload);
|
||||
}
|
||||
return new JsonSchema.Envelope(schema, record);
|
||||
} else if (value instanceof Collection) {
|
||||
Collection collection = (Collection) value;
|
||||
ObjectNode schema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.ARRAY_TYPE_NAME);
|
||||
|
|
|
@ -29,8 +29,6 @@ public class JsonSchema {
|
|||
static final String ENVELOPE_PAYLOAD_FIELD_NAME = "payload";
|
||||
static final String SCHEMA_TYPE_FIELD_NAME = "type";
|
||||
static final String SCHEMA_NAME_FIELD_NAME = "name";
|
||||
static final String OBJECT_FIELD_NAME_FIELD_NAME = "name";
|
||||
static final String OBJECT_FIELDS_FIELD_NAME = "fields";
|
||||
static final String ARRAY_ITEMS_FIELD_NAME = "items";
|
||||
static final String BOOLEAN_TYPE_NAME = "boolean";
|
||||
static final JsonNode BOOLEAN_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, BOOLEAN_TYPE_NAME);
|
||||
|
@ -47,7 +45,6 @@ public class JsonSchema {
|
|||
static final String STRING_TYPE_NAME = "string";
|
||||
static final JsonNode STRING_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, STRING_TYPE_NAME);
|
||||
static final String ARRAY_TYPE_NAME = "array";
|
||||
static final String OBJECT_TYPE_NAME = "object";
|
||||
|
||||
public static ObjectNode envelope(JsonNode schema, JsonNode payload) {
|
||||
ObjectNode result = JsonNodeFactory.instance.objectNode();
|
||||
|
|
|
@ -20,10 +20,6 @@ package org.apache.kafka.copycat.json;
|
|||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
|
||||
import org.apache.kafka.copycat.data.GenericRecord;
|
||||
import org.apache.kafka.copycat.data.GenericRecordBuilder;
|
||||
import org.apache.kafka.copycat.data.Schema;
|
||||
import org.apache.kafka.copycat.data.SchemaBuilder;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -88,21 +84,6 @@ public class JsonConverterTest {
|
|||
assertEquals(Arrays.asList(1, 2, 3), converter.toCopycatData(arrayJson));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void objectToCopycat() {
|
||||
JsonNode objectJson = parse("{ \"schema\": { \"type\": \"object\", \"name\": \"record\", \"fields\": [{ \"name\": \"first\", \"type\" : \"int\"}, { \"name\": \"second\", \"type\" : \"string\"}] }" +
|
||||
", \"payload\": { \"first\": 15, \"second\": \"foobar\" } }");
|
||||
Schema schema = SchemaBuilder.record("record").fields()
|
||||
.requiredInt("first")
|
||||
.requiredString("second")
|
||||
.endRecord();
|
||||
GenericRecord record = new GenericRecordBuilder(schema)
|
||||
.set("first", 15)
|
||||
.set("second", "foobar")
|
||||
.build();
|
||||
assertEquals(record, converter.toCopycatData(objectJson));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void booleanToJson() {
|
||||
|
@ -171,25 +152,6 @@ public class JsonConverterTest {
|
|||
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void objectToJson() {
|
||||
Schema schema = SchemaBuilder.record("record").fields()
|
||||
.requiredInt("first")
|
||||
.requiredString("second")
|
||||
.endRecord();
|
||||
GenericRecord record = new GenericRecordBuilder(schema)
|
||||
.set("first", 15)
|
||||
.set("second", "foobar")
|
||||
.build();
|
||||
|
||||
JsonNode converted = converter.fromCopycatData(record);
|
||||
validateEnvelope(converted);
|
||||
assertEquals(parse("{ \"type\": \"object\", \"name\": \"record\", \"fields\": [{ \"name\": \"first\", \"type\" : \"int\"}, { \"name\": \"second\", \"type\" : \"string\"}] }"),
|
||||
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
|
||||
assertEquals(parse("{ \"first\": 15, \"second\": \"foobar\" }"),
|
||||
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
|
||||
}
|
||||
|
||||
|
||||
private JsonNode parse(String json) {
|
||||
try {
|
||||
|
|
|
@ -21,10 +21,6 @@ import org.apache.kafka.common.utils.Time;
|
|||
import org.apache.kafka.clients.consumer.*;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.copycat.cli.WorkerConfig;
|
||||
import org.apache.kafka.copycat.data.GenericRecord;
|
||||
import org.apache.kafka.copycat.data.GenericRecordBuilder;
|
||||
import org.apache.kafka.copycat.data.Schema;
|
||||
import org.apache.kafka.copycat.data.SchemaBuilder;
|
||||
import org.apache.kafka.copycat.sink.SinkRecord;
|
||||
import org.apache.kafka.copycat.sink.SinkTask;
|
||||
import org.apache.kafka.copycat.sink.SinkTaskContext;
|
||||
|
@ -129,8 +125,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
|
|||
@Test
|
||||
public void testDeliverConvertsData() throws Exception {
|
||||
// Validate conversion is performed when data is delivered
|
||||
Schema schema = SchemaBuilder.record("sample").fields().endRecord();
|
||||
GenericRecord record = new GenericRecordBuilder(schema).build();
|
||||
Integer record = 12;
|
||||
byte[] rawKey = "key".getBytes(), rawValue = "value".getBytes();
|
||||
|
||||
ConsumerRecords<Object, Object> records = new ConsumerRecords<>(
|
||||
|
|
|
@ -23,10 +23,6 @@ import org.apache.kafka.clients.producer.ProducerRecord;
|
|||
import org.apache.kafka.clients.producer.RecordMetadata;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.copycat.cli.WorkerConfig;
|
||||
import org.apache.kafka.copycat.data.GenericRecord;
|
||||
import org.apache.kafka.copycat.data.GenericRecordBuilder;
|
||||
import org.apache.kafka.copycat.data.Schema;
|
||||
import org.apache.kafka.copycat.data.SchemaBuilder;
|
||||
import org.apache.kafka.copycat.source.SourceRecord;
|
||||
import org.apache.kafka.copycat.source.SourceTask;
|
||||
import org.apache.kafka.copycat.source.SourceTaskContext;
|
||||
|
@ -59,8 +55,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
|
|||
private static final byte[] STREAM_BYTES = "stream".getBytes();
|
||||
private static final byte[] OFFSET_BYTES = "offset-1".getBytes();
|
||||
|
||||
private static final Schema RECORD_SCHEMA = SchemaBuilder.record("sample").fields().endRecord();
|
||||
private static final GenericRecord RECORD = new GenericRecordBuilder(RECORD_SCHEMA).build();
|
||||
private static final Integer RECORD = 12;
|
||||
// The actual format of this data doesn't matter -- we just want to see that the right version
|
||||
// is used in the right place.
|
||||
private static final String CONVERTED_RECORD = "converted-record";
|
||||
|
|
Loading…
Reference in New Issue