KAFKA-1863; Add docs for possible thrown exception in Callback; reviewed by Jiangjie Qin

This commit is contained in:
Guozhang Wang 2015-03-13 15:17:08 -07:00
parent c41c7b40b6
commit 1caaf6db40
6 changed files with 62 additions and 2 deletions

View File

@ -29,6 +29,25 @@ public interface Callback {
* @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error
* occurred.
* @param exception The exception thrown during processing of this record. Null if no error occurred.
* Possible thrown exceptions include:
*
* Non-Retriable exceptions (fatal, the message will never be sent):
*
* InvalidTopicException
* OffsetMetadataTooLargeException
* RecordBatchTooLargeException
* RecordTooLargeException
* UnknownServerException
*
* Retriable exceptions (transient, may be covered by increasing #.retries):
*
* CorruptRecordException
* InvalidMetadataException
* NotEnoughReplicasAfterAppendException
* NotEnoughReplicasException
* OffsetOutOfRangeException
* TimeoutException
* UnknownTopicOrPartitionException
*/
public void onCompletion(RecordMetadata metadata, Exception exception);
}

View File

@ -30,6 +30,11 @@ public class Field {
public final String doc;
final Schema schema;
/**
* Create the field.
*
* @throws SchemaException If the default value is not primitive and the validation fails
*/
public Field(int index, String name, Type type, String doc, Object defaultValue, Schema schema) {
this.index = index;
this.name = name;

View File

@ -24,6 +24,11 @@ public class Schema extends Type {
private final Field[] fields;
private final Map<String, Field> fieldsByName;
/**
* Construct the schema with a given list of its field values
*
* @throws SchemaException If the given list have duplicate fields
*/
public Schema(Field... fs) {
this.fields = new Field[fs.length];
this.fieldsByName = new HashMap<String, Field>();

View File

@ -18,6 +18,9 @@ package org.apache.kafka.common.protocol.types;
import org.apache.kafka.common.KafkaException;
/**
* Thrown if the protocol schema validation fails while parsing request or response.
*/
public class SchemaException extends KafkaException {
private static final long serialVersionUID = 1L;

View File

@ -60,6 +60,7 @@ public class Struct {
*
* @param field The field to look up
* @return The value for that field.
* @throws SchemaException if the field has no value and has no default.
*/
public Object get(Field field) {
validateField(field);
@ -71,6 +72,7 @@ public class Struct {
*
* @param name The name of the field
* @return The value in the record
* @throws SchemaException If no such field exists
*/
public Object get(String name) {
Field field = schema.get(name);
@ -149,6 +151,7 @@ public class Struct {
*
* @param field The field
* @param value The value
* @throws SchemaException If the validation of the field failed
*/
public Struct set(Field field, Object value) {
validateField(field);
@ -161,6 +164,7 @@ public class Struct {
*
* @param name The name of the field
* @param value The value to set
* @throws SchemaException If the field is not known
*/
public Struct set(String name, Object value) {
Field field = this.schema.get(name);
@ -177,6 +181,7 @@ public class Struct {
*
* @param field The field to create an instance of
* @return The struct
* @throws SchemaException If the given field is not a container type
*/
public Struct instance(Field field) {
validateField(field);
@ -195,6 +200,7 @@ public class Struct {
*
* @param field The name of the field to create (field must be a schema type)
* @return The struct
* @throws SchemaException If the given field is not a container type
*/
public Struct instance(String field) {
return instance(schema.get(field));
@ -223,6 +229,8 @@ public class Struct {
/**
* Ensure the user doesn't try to access fields from the wrong schema
*
* @throws SchemaException If validation fails
*/
private void validateField(Field field) {
if (this.schema != field.schema)
@ -233,6 +241,8 @@ public class Struct {
/**
* Validate the contents of this struct against its schema
*
* @throws SchemaException If validation fails
*/
public void validate() {
this.schema.validate(this);

View File

@ -25,14 +25,32 @@ import org.apache.kafka.common.utils.Utils;
*/
public abstract class Type {
/**
* Write the typed object to the buffer
*
* @throws SchemaException If the object is not valid for its type
*/
public abstract void write(ByteBuffer buffer, Object o);
/**
* Read the typed object from the buffer
*
* @throws SchemaException If the object is not valid for its type
*/
public abstract Object read(ByteBuffer buffer);
public abstract int sizeOf(Object o);
/**
* Validate the object. If succeeded return its typed object.
*
* @throws SchemaException If validation failed
*/
public abstract Object validate(Object o);
/**
* Return the size of the object in bytes
*/
public abstract int sizeOf(Object o);
public static final Type INT8 = new Type() {
@Override
public void write(ByteBuffer buffer, Object o) {