kafka-1797; (follow-up patch) add the serializer/deserializer api to the new java client; patched by Jun Rao; reviewed by Jay Kreps

This commit is contained in:
Jun Rao 2015-01-06 11:07:46 -08:00
parent 4471dc08b6
commit 50b734690a
24 changed files with 370 additions and 99 deletions

View File

@ -371,6 +371,7 @@ project(':clients') {
javadoc { javadoc {
include "**/org/apache/kafka/clients/producer/*" include "**/org/apache/kafka/clients/producer/*"
include "**/org/apache/kafka/common/errors/*" include "**/org/apache/kafka/common/errors/*"
include "**/org/apache/kafka/common/serialization/*"
} }
} }

View File

@ -185,8 +185,8 @@ public class ConsumerConfig extends AbstractConfig {
METRICS_SAMPLE_WINDOW_MS_DOC) METRICS_SAMPLE_WINDOW_MS_DOC)
.define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC) .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC)
.define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC) .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC)
.define(KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.consumer.ByteArrayDeserializer", Importance.HIGH, KEY_DESERIALIZER_CLASS_DOC) .define(KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_DESERIALIZER_CLASS_DOC)
.define(VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.consumer.ByteArrayDeserializer", Importance.HIGH, VALUE_DESERIALIZER_CLASS_DOC); .define(VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_DESERIALIZER_CLASS_DOC);
} }

View File

@ -18,6 +18,7 @@ import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.ClientUtils; import org.apache.kafka.common.utils.ClientUtils;
import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.SystemTime;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -345,7 +346,7 @@ public class KafkaConsumer<K,V> implements Consumer<K,V> {
* @param configs The consumer configs * @param configs The consumer configs
*/ */
public KafkaConsumer(Map<String, Object> configs) { public KafkaConsumer(Map<String, Object> configs) {
this(new ConsumerConfig(configs), null, null, null); this(configs, null);
} }
/** /**
@ -358,7 +359,7 @@ public class KafkaConsumer<K,V> implements Consumer<K,V> {
* every rebalance operation. * every rebalance operation.
*/ */
public KafkaConsumer(Map<String, Object> configs, ConsumerRebalanceCallback callback) { public KafkaConsumer(Map<String, Object> configs, ConsumerRebalanceCallback callback) {
this(new ConsumerConfig(configs), callback, null, null); this(configs, callback, null, null);
} }
/** /**
@ -375,7 +376,19 @@ public class KafkaConsumer<K,V> implements Consumer<K,V> {
* won't be called in the consumer when the deserializer is passed in directly. * won't be called in the consumer when the deserializer is passed in directly.
*/ */
public KafkaConsumer(Map<String, Object> configs, ConsumerRebalanceCallback callback, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) { public KafkaConsumer(Map<String, Object> configs, ConsumerRebalanceCallback callback, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
this(new ConsumerConfig(configs), callback, keyDeserializer, valueDeserializer); this(new ConsumerConfig(addDeserializerToConfig(configs, keyDeserializer, valueDeserializer)),
callback, keyDeserializer, valueDeserializer);
}
private static Map<String, Object> addDeserializerToConfig(Map<String, Object> configs,
Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer) {
Map<String, Object> newConfigs = new HashMap<String, Object>();
newConfigs.putAll(configs);
if (keyDeserializer != null)
newConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
if (keyDeserializer != null)
newConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass());
return newConfigs;
} }
/** /**
@ -383,7 +396,7 @@ public class KafkaConsumer<K,V> implements Consumer<K,V> {
* Valid configuration strings are documented at {@link ConsumerConfig} * Valid configuration strings are documented at {@link ConsumerConfig}
*/ */
public KafkaConsumer(Properties properties) { public KafkaConsumer(Properties properties) {
this(new ConsumerConfig(properties), null, null, null); this(properties, null);
} }
/** /**
@ -396,7 +409,7 @@ public class KafkaConsumer<K,V> implements Consumer<K,V> {
* every rebalance operation. * every rebalance operation.
*/ */
public KafkaConsumer(Properties properties, ConsumerRebalanceCallback callback) { public KafkaConsumer(Properties properties, ConsumerRebalanceCallback callback) {
this(new ConsumerConfig(properties), callback, null, null); this(properties, callback, null, null);
} }
/** /**
@ -413,7 +426,19 @@ public class KafkaConsumer<K,V> implements Consumer<K,V> {
* won't be called in the consumer when the deserializer is passed in directly. * won't be called in the consumer when the deserializer is passed in directly.
*/ */
public KafkaConsumer(Properties properties, ConsumerRebalanceCallback callback, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) { public KafkaConsumer(Properties properties, ConsumerRebalanceCallback callback, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
this(new ConsumerConfig(properties), callback, keyDeserializer, valueDeserializer); this(new ConsumerConfig(addDeserializerToConfig(properties, keyDeserializer, valueDeserializer)),
callback, keyDeserializer, valueDeserializer);
}
private static Properties addDeserializerToConfig(Properties properties,
Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer) {
Properties newProperties = new Properties();
newProperties.putAll(properties);
if (keyDeserializer != null)
newProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
if (keyDeserializer != null)
newProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
return newProperties;
} }
private KafkaConsumer(ConsumerConfig config, ConsumerRebalanceCallback callback, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) { private KafkaConsumer(ConsumerConfig config, ConsumerRebalanceCallback callback, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {

View File

@ -13,10 +13,7 @@
package org.apache.kafka.clients.producer; package org.apache.kafka.clients.producer;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Collections; import java.util.*;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -34,6 +31,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.MetricConfig;
@ -44,6 +42,7 @@ import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.Records; import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.ClientUtils; import org.apache.kafka.common.utils.ClientUtils;
import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.SystemTime;
@ -102,7 +101,19 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
* be called in the producer when the serializer is passed in directly. * be called in the producer when the serializer is passed in directly.
*/ */
public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) { public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this(new ProducerConfig(configs), keySerializer, valueSerializer); this(new ProducerConfig(addSerializerToConfig(configs, keySerializer, valueSerializer)),
keySerializer, valueSerializer);
}
private static Map<String, Object> addSerializerToConfig(Map<String, Object> configs,
Serializer<?> keySerializer, Serializer<?> valueSerializer) {
Map<String, Object> newConfigs = new HashMap<String, Object>();
newConfigs.putAll(configs);
if (keySerializer != null)
newConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getClass());
if (valueSerializer != null)
newConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer.getClass());
return newConfigs;
} }
/** /**
@ -124,7 +135,19 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
* be called in the producer when the serializer is passed in directly. * be called in the producer when the serializer is passed in directly.
*/ */
public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) { public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this(new ProducerConfig(properties), keySerializer, valueSerializer); this(new ProducerConfig(addSerializerToConfig(properties, keySerializer, valueSerializer)),
keySerializer, valueSerializer);
}
private static Properties addSerializerToConfig(Properties properties,
Serializer<?> keySerializer, Serializer<?> valueSerializer) {
Properties newProperties = new Properties();
newProperties.putAll(properties);
if (keySerializer != null)
newProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getClass().getName());
if (valueSerializer != null)
newProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer.getClass().getName());
return newProperties;
} }
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) { private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
@ -178,14 +201,18 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
this.errors = this.metrics.sensor("errors"); this.errors = this.metrics.sensor("errors");
if (keySerializer == null) if (keySerializer == null) {
this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Serializer.class); Serializer.class);
this.keySerializer.configure(config.originals(), true);
}
else else
this.keySerializer = keySerializer; this.keySerializer = keySerializer;
if (valueSerializer == null) if (valueSerializer == null) {
this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
Serializer.class); Serializer.class);
this.valueSerializer.configure(config.originals(), false);
}
else else
this.valueSerializer = valueSerializer; this.valueSerializer = valueSerializer;
@ -275,8 +302,20 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
try { try {
// first make sure the metadata for the topic is available // first make sure the metadata for the topic is available
waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs); waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs);
byte[] serializedKey = keySerializer.serialize(record.topic(), record.key(), true); byte[] serializedKey;
byte[] serializedValue = valueSerializer.serialize(record.topic(), record.value(), false); try {
serializedKey = keySerializer.serialize(record.topic(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to the one specified in key.serializer");
}
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to the one specified in value.serializer");
}
ProducerRecord serializedRecord = new ProducerRecord<byte[], byte[]>(record.topic(), record.partition(), serializedKey, serializedValue); ProducerRecord serializedRecord = new ProducerRecord<byte[], byte[]>(record.topic(), record.partition(), serializedKey, serializedValue);
int partition = partitioner.partition(serializedRecord, metadata.fetch()); int partition = partitioner.partition(serializedRecord, metadata.fetch());
int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue); int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);

View File

@ -229,8 +229,8 @@ public class ProducerConfig extends AbstractConfig {
atLeast(1), atLeast(1),
Importance.LOW, Importance.LOW,
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC) MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
.define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH, KEY_SERIALIZER_CLASS_DOC) .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
.define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC); .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC);
} }
ProducerConfig(Map<? extends Object, ? extends Object> props) { ProducerConfig(Map<? extends Object, ? extends Object> props) {

View File

@ -12,12 +12,7 @@
*/ */
package org.apache.kafka.common.config; package org.apache.kafka.common.config;
import java.util.ArrayList; import java.util.*;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.Configurable; import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaException;
@ -97,6 +92,12 @@ public class AbstractConfig {
return keys; return keys;
} }
public Map<String, ?> originals() {
Map<String, Object> copy = new HashMap<String, Object>();
copy.putAll(originals);
return copy;
}
private void logAll() { private void logAll() {
StringBuilder b = new StringBuilder(); StringBuilder b = new StringBuilder();
b.append(getClass().getSimpleName()); b.append(getClass().getSimpleName());

View File

@ -1,47 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.errors;
import org.apache.kafka.common.KafkaException;
/**
* Any exception during deserialization in the consumer
*/
public class DeserializationException extends KafkaException {
private static final long serialVersionUID = 1L;
public DeserializationException(String message, Throwable cause) {
super(message, cause);
}
public DeserializationException(String message) {
super(message);
}
public DeserializationException(Throwable cause) {
super(cause);
}
public DeserializationException() {
super();
}
/* avoid the expensive and useless stack trace for deserialization exceptions */
@Override
public Throwable fillInStackTrace() {
return this;
}
}

View File

@ -11,19 +11,19 @@
* specific language governing permissions and limitations under the License. * specific language governing permissions and limitations under the License.
*/ */
package org.apache.kafka.clients.consumer; package org.apache.kafka.common.serialization;
import java.util.Map; import java.util.Map;
public class ByteArrayDeserializer implements Deserializer<byte[]> { public class ByteArrayDeserializer implements Deserializer<byte[]> {
@Override @Override
public void configure(Map<String, ?> configs) { public void configure(Map<String, ?> configs, boolean isKey) {
// nothing to do // nothing to do
} }
@Override @Override
public byte[] deserialize(String topic, byte[] data, boolean isKey) { public byte[] deserialize(String topic, byte[] data) {
return data; return data;
} }

View File

@ -11,19 +11,19 @@
* specific language governing permissions and limitations under the License. * specific language governing permissions and limitations under the License.
*/ */
package org.apache.kafka.clients.producer; package org.apache.kafka.common.serialization;
import java.util.Map; import java.util.Map;
public class ByteArraySerializer implements Serializer<byte[]> { public class ByteArraySerializer implements Serializer<byte[]> {
@Override @Override
public void configure(Map<String, ?> configs) { public void configure(Map<String, ?> configs, boolean isKey) {
// nothing to do // nothing to do
} }
@Override @Override
public byte[] serialize(String topic, byte[] data, boolean isKey) { public byte[] serialize(String topic, byte[] data) {
return data; return data;
} }

View File

@ -11,9 +11,9 @@
* specific language governing permissions and limitations under the License. * specific language governing permissions and limitations under the License.
*/ */
package org.apache.kafka.clients.consumer; package org.apache.kafka.common.serialization;
import org.apache.kafka.common.Configurable; import java.util.Map;
/** /**
* *
@ -21,15 +21,22 @@ import org.apache.kafka.common.Configurable;
* *
* A class that implements this interface is expected to have a constructor with no parameter. * A class that implements this interface is expected to have a constructor with no parameter.
*/ */
public interface Deserializer<T> extends Configurable { public interface Deserializer<T> {
/**
* Configure this class.
* @param configs configs in key/value pairs
* @param isKey whether is for key or value
*/
public void configure(Map<String, ?> configs, boolean isKey);
/** /**
* *
* @param topic Topic associated with the data * @param topic topic associated with the data
* @param data Serialized bytes * @param data serialized bytes
* @param isKey Is data for key or value
* @return deserialized typed data * @return deserialized typed data
*/ */
public T deserialize(String topic, byte[] data, boolean isKey); public T deserialize(String topic, byte[] data);
/** /**
* Close this deserializer * Close this deserializer

View File

@ -11,9 +11,9 @@
* specific language governing permissions and limitations under the License. * specific language governing permissions and limitations under the License.
*/ */
package org.apache.kafka.clients.producer; package org.apache.kafka.common.serialization;
import org.apache.kafka.common.Configurable; import java.util.Map;
/** /**
* *
@ -21,15 +21,21 @@ import org.apache.kafka.common.Configurable;
* *
* A class that implements this interface is expected to have a constructor with no parameter. * A class that implements this interface is expected to have a constructor with no parameter.
*/ */
public interface Serializer<T> extends Configurable { public interface Serializer<T> {
/** /**
* * Configure this class.
* @param topic Topic associated with data * @param configs configs in key/value pairs
* @param data Typed data * @param isKey whether is for key or value
* @param isKey Is data for key or value
* @return bytes of the serialized data
*/ */
public byte[] serialize(String topic, T data, boolean isKey); public void configure(Map<String, ?> configs, boolean isKey);
/**
* @param topic topic associated with data
* @param data typed data
* @return serialized bytes
*/
public byte[] serialize(String topic, T data);
/** /**
* Close this serializer * Close this serializer

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.serialization;
import org.apache.kafka.common.errors.SerializationException;
import java.io.UnsupportedEncodingException;
import java.util.Map;
/**
* String encoding defaults to UTF8 and can be customized by setting the property key.deserializer.encoding,
* value.deserializer.encoding or deserializer.encoding. The first two take precedence over the last.
*/
public class StringDeserializer implements Deserializer<String> {
private String encoding = "UTF8";
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null)
encodingValue = configs.get("deserializer.encoding");
if (encodingValue != null && encodingValue instanceof String)
encoding = (String) encodingValue;
}
@Override
public String deserialize(String topic, byte[] data) {
try {
return new String(data, encoding);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
}
}
@Override
public void close() {
// nothing to do
}
}

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.serialization;
import org.apache.kafka.common.errors.SerializationException;
import java.io.UnsupportedEncodingException;
import java.util.Map;
/**
* String encoding defaults to UTF8 and can be customized by setting the property key.serializer.encoding,
* value.serializer.encoding or serializer.encoding. The first two take precedence over the last.
*/
public class StringSerializer implements Serializer<String> {
private String encoding = "UTF8";
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null)
encodingValue = configs.get("serializer.encoding");
if (encodingValue != null && encodingValue instanceof String)
encoding = (String) encodingValue;
}
@Override
public byte[] serialize(String topic, String data) {
try {
return data.getBytes(encoding);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
}
}
@Override
public void close() {
// nothing to do
}
}

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.serialization;
import org.junit.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
public class SerializationTest {
private static class SerDeser<T> {
final Serializer<T> serializer;
final Deserializer<T> deserializer;
public SerDeser(Serializer<T> serializer, Deserializer<T> deserializer) {
this.serializer = serializer;
this.deserializer = deserializer;
}
}
@Test
public void testStringSerializer() {
String str = "my string";
String mytopic = "testTopic";
List<String> encodings = new ArrayList<String>();
encodings.add("UTF8");
encodings.add("UTF-16");
for ( String encoding : encodings) {
SerDeser<String> serDeser = getStringSerDeser(encoding);
Serializer<String> serializer = serDeser.serializer;
Deserializer<String> deserializer = serDeser.deserializer;
assertEquals("Should get the original string after serialization and deserialization with encoding " + encoding,
str, deserializer.deserialize(mytopic, serializer.serialize(mytopic, str)));
}
}
private SerDeser<String> getStringSerDeser(String encoder) {
Map<String, Object> serializerConfigs = new HashMap<String, Object>();
serializerConfigs.put("key.serializer.encoding", encoder);
Serializer<String> serializer = new StringSerializer();
serializer.configure(serializerConfigs, true);
Map<String, Object> deserializerConfigs = new HashMap<String, Object>();
deserializerConfigs.put("key.deserializer.encoding", encoder);
Deserializer<String> deserializer = new StringDeserializer();
deserializer.configure(deserializerConfigs, true);
return new SerDeser<String>(serializer, deserializer);
}
}

View File

@ -60,6 +60,8 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
throw new MissingConfigException("topic must be specified by the Kafka log4j appender") throw new MissingConfigException("topic must be specified by the Kafka log4j appender")
if(compressionType != null) props.put(org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType) if(compressionType != null) props.put(org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType)
if(requiredNumAcks != Int.MaxValue) props.put(org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG, requiredNumAcks.toString) if(requiredNumAcks != Int.MaxValue) props.put(org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG, requiredNumAcks.toString)
props.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
props.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
producer = new KafkaProducer[Array[Byte],Array[Byte]](props) producer = new KafkaProducer[Array[Byte],Array[Byte]](props)
LogLog.debug("Kafka producer connected to " + brokerList) LogLog.debug("Kafka producer connected to " + brokerList)
LogLog.debug("Logging for topic: " + topic) LogLog.debug("Logging for topic: " + topic)

View File

@ -59,6 +59,8 @@ object ConsoleProducer {
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.maxMemoryBytes.toString) props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.maxMemoryBytes.toString)
props.put(ProducerConfig.BATCH_SIZE_CONFIG, config.maxPartitionMemoryBytes.toString) props.put(ProducerConfig.BATCH_SIZE_CONFIG, config.maxPartitionMemoryBytes.toString)
props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer") props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
new NewShinyProducer(props) new NewShinyProducer(props)
} else { } else {

View File

@ -27,9 +27,6 @@ import kafka.producer.{KeyedMessage, ProducerConfig}
import kafka.metrics.KafkaMetricsGroup import kafka.metrics.KafkaMetricsGroup
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
import org.apache.kafka.clients.producer.{KafkaProducer, RecordMetadata, ProducerRecord} import org.apache.kafka.clients.producer.{KafkaProducer, RecordMetadata, ProducerRecord}
import org.apache.kafka.common.KafkaException
import scala.collection.JavaConversions._
import joptsimple.OptionParser import joptsimple.OptionParser
import java.util.Properties import java.util.Properties
@ -240,8 +237,13 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
producerThreads = (0 until numProducers).map(i => { producerThreads = (0 until numProducers).map(i => {
producerProps.setProperty("client.id", clientId + "-" + i) producerProps.setProperty("client.id", clientId + "-" + i)
val producer = val producer =
if (useNewProducer) if (useNewProducer) {
producerProps.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer")
producerProps.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer")
new MirrorMakerNewProducer(producerProps) new MirrorMakerNewProducer(producerProps)
}
else else
new MirrorMakerOldProducer(producerProps) new MirrorMakerOldProducer(producerProps)
new ProducerThread(mirrorDataChannel, producer, i) new ProducerThread(mirrorDataChannel, producer, i)

View File

@ -188,6 +188,8 @@ object ProducerPerformance extends Logging {
props.put(ProducerConfig.RETRIES_CONFIG, config.producerNumRetries.toString) props.put(ProducerConfig.RETRIES_CONFIG, config.producerNumRetries.toString)
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.producerRetryBackoffMs.toString) props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.producerRetryBackoffMs.toString)
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec.name) props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec.name)
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
new NewShinyProducer(props) new NewShinyProducer(props)
} else { } else {
props.put("metadata.broker.list", config.brokerList) props.put("metadata.broker.list", config.brokerList)

View File

@ -120,6 +120,8 @@ object ReplayLogProducer extends Logging {
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
val producerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt)) val producerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt))
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
} }
class ZKConsumerThread(config: Config, stream: KafkaStream[Array[Byte], Array[Byte]]) extends Thread with Logging { class ZKConsumerThread(config: Config, stream: KafkaStream[Array[Byte], Array[Byte]]) extends Thread with Logging {

View File

@ -56,6 +56,8 @@ object TestEndToEndLatency {
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0") producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0")
producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true") producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
producerProps.put(ProducerConfig.ACKS_CONFIG, producerAcks.toString) producerProps.put(ProducerConfig.ACKS_CONFIG, producerAcks.toString)
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps) val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
// make sure the consumer fetcher has started before sending data since otherwise // make sure the consumer fetcher has started before sending data since otherwise

View File

@ -242,6 +242,8 @@ object TestLogCleaning {
val producerProps = new Properties val producerProps = new Properties
producerProps.setProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true") producerProps.setProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl) producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps) val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
val rand = new Random(1) val rand = new Random(1)
val keyCount = (messages / dups).toInt val keyCount = (messages / dups).toInt

View File

@ -75,6 +75,8 @@ class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooK
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression) props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression)
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000") props.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000")
props.put(ProducerConfig.LINGER_MS_CONFIG, "200") props.put(ProducerConfig.LINGER_MS_CONFIG, "200")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
var producer = new KafkaProducer[Array[Byte],Array[Byte]](props) var producer = new KafkaProducer[Array[Byte],Array[Byte]](props)
val consumer = new SimpleConsumer("localhost", port, 100, 1024*1024, "") val consumer = new SimpleConsumer("localhost", port, 100, 1024*1024, "")

View File

@ -30,6 +30,10 @@ import kafka.consumer.SimpleConsumer
import kafka.api.FetchRequestBuilder import kafka.api.FetchRequestBuilder
import kafka.message.Message import kafka.message.Message
import kafka.integration.KafkaServerTestHarness import kafka.integration.KafkaServerTestHarness
import org.apache.kafka.common.errors.SerializationException
import java.util.Properties
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.serialization.ByteArraySerializer
class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
@ -126,6 +130,55 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
} }
} }
@Test
def testSerializer() {
// send a record with a wrong type should receive a serialization exception
try {
val producer = createNewProducerWithWrongSerializer(brokerList)
val record5 = new ProducerRecord[Array[Byte],Array[Byte]](topic, new Integer(0), "key".getBytes, "value".getBytes)
producer.send(record5)
fail("Should have gotten a SerializationException")
} catch {
case se: SerializationException => // this is ok
}
try {
createNewProducerWithNoSerializer(brokerList)
fail("Instantiating a producer without specifying a serializer should cause a ConfigException")
} catch {
case ce : ConfigException => // this is ok
}
// create a producer with explicit serializers should succeed
createNewProducerWithExplicitSerializer(brokerList)
}
private def createNewProducerWithWrongSerializer(brokerList: String) : KafkaProducer[Array[Byte],Array[Byte]] = {
import org.apache.kafka.clients.producer.ProducerConfig
val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
return new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
}
private def createNewProducerWithNoSerializer(brokerList: String) : KafkaProducer[Array[Byte],Array[Byte]] = {
import org.apache.kafka.clients.producer.ProducerConfig
val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
return new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
}
private def createNewProducerWithExplicitSerializer(brokerList: String) : KafkaProducer[Array[Byte],Array[Byte]] = {
import org.apache.kafka.clients.producer.ProducerConfig
val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
return new KafkaProducer[Array[Byte],Array[Byte]](producerProps, new ByteArraySerializer, new ByteArraySerializer)
}
/** /**
* testClose checks the closing behavior * testClose checks the closing behavior
* *

View File

@ -395,6 +395,8 @@ object TestUtils extends Logging {
producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString) producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100") producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200") producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
return new KafkaProducer[Array[Byte],Array[Byte]](producerProps) return new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
} }