mirror of https://github.com/apache/kafka.git
KAFKA-6592: Follow-up (#4864)
Do not require ConsoleConsumer to specify inner serde as s special property, but just a normal property of the message formatter.
This commit is contained in:
parent
0dc7f0e66f
commit
9871357086
|
@ -31,6 +31,9 @@ public class MockDeserializer implements ClusterResourceListener, Deserializer<b
|
||||||
public static ClusterResource noClusterId = new ClusterResource("no_cluster_id");
|
public static ClusterResource noClusterId = new ClusterResource("no_cluster_id");
|
||||||
public static AtomicReference<ClusterResource> clusterIdBeforeDeserialize = new AtomicReference<>(noClusterId);
|
public static AtomicReference<ClusterResource> clusterIdBeforeDeserialize = new AtomicReference<>(noClusterId);
|
||||||
|
|
||||||
|
public boolean isKey;
|
||||||
|
public Map<String, ?> configs;
|
||||||
|
|
||||||
public static void resetStaticVariables() {
|
public static void resetStaticVariables() {
|
||||||
initCount = new AtomicInteger(0);
|
initCount = new AtomicInteger(0);
|
||||||
closeCount = new AtomicInteger(0);
|
closeCount = new AtomicInteger(0);
|
||||||
|
@ -44,6 +47,8 @@ public class MockDeserializer implements ClusterResourceListener, Deserializer<b
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void configure(Map<String, ?> configs, boolean isKey) {
|
public void configure(Map<String, ?> configs, boolean isKey) {
|
||||||
|
this.configs = configs;
|
||||||
|
this.isKey = isKey;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -19,8 +19,9 @@ package kafka.tools
|
||||||
|
|
||||||
import java.io.PrintStream
|
import java.io.PrintStream
|
||||||
import java.nio.charset.StandardCharsets
|
import java.nio.charset.StandardCharsets
|
||||||
|
import java.util
|
||||||
import java.util.concurrent.CountDownLatch
|
import java.util.concurrent.CountDownLatch
|
||||||
import java.util.{Locale, Properties, Random}
|
import java.util.{Locale, Map, Properties, Random}
|
||||||
|
|
||||||
import com.typesafe.scalalogging.LazyLogging
|
import com.typesafe.scalalogging.LazyLogging
|
||||||
import joptsimple._
|
import joptsimple._
|
||||||
|
@ -46,11 +47,6 @@ import scala.collection.JavaConverters._
|
||||||
object ConsoleConsumer extends Logging {
|
object ConsoleConsumer extends Logging {
|
||||||
|
|
||||||
var messageCount = 0
|
var messageCount = 0
|
||||||
// Keep same names with StreamConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
|
|
||||||
// and StreamConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
|
|
||||||
// visible for testing
|
|
||||||
private[tools] val innerKeySerdeName = "default.windowed.key.serde.inner"
|
|
||||||
private[tools] val innerValueSerdeName = "default.windowed.value.serde.inner"
|
|
||||||
|
|
||||||
private val shutdownLatch = new CountDownLatch(1)
|
private val shutdownLatch = new CountDownLatch(1)
|
||||||
|
|
||||||
|
@ -306,8 +302,8 @@ object ConsoleConsumer extends Logging {
|
||||||
"\tline.separator=<line.separator>\n" +
|
"\tline.separator=<line.separator>\n" +
|
||||||
"\tkey.deserializer=<key.deserializer>\n" +
|
"\tkey.deserializer=<key.deserializer>\n" +
|
||||||
"\tvalue.deserializer=<value.deserializer>\n" +
|
"\tvalue.deserializer=<value.deserializer>\n" +
|
||||||
"\tdefault.windowed.key.serde.inner=<windowed.key.serde.inner>\n" +
|
"\nUsers can also pass in customized properties for their formatter; more specifically, users " +
|
||||||
"\tdefault.windowed.value.serde.inner=<windowed.value.serde.inner>")
|
"can pass in properties keyed with \'key.deserializer.\' and \'value.deserializer.\' prefixes to configure their deserializers.")
|
||||||
.withRequiredArg
|
.withRequiredArg
|
||||||
.describedAs("prop")
|
.describedAs("prop")
|
||||||
.ofType(classOf[String])
|
.ofType(classOf[String])
|
||||||
|
@ -344,18 +340,6 @@ object ConsoleConsumer extends Logging {
|
||||||
.withRequiredArg
|
.withRequiredArg
|
||||||
.describedAs("deserializer for values")
|
.describedAs("deserializer for values")
|
||||||
.ofType(classOf[String])
|
.ofType(classOf[String])
|
||||||
val innerKeyDeserializerOpt = parser.accepts(innerKeySerdeName,
|
|
||||||
"inner serde for key when windowed deserialzier is used; would be ignored otherwise. " +
|
|
||||||
"For example: org.apache.kafka.common.serialization.Serdes\\$StringSerde")
|
|
||||||
.withRequiredArg
|
|
||||||
.describedAs("inner serde for key")
|
|
||||||
.ofType(classOf[String])
|
|
||||||
val innerValueDeserializerOpt = parser.accepts(innerValueSerdeName,
|
|
||||||
"inner serde for value when windowed deserialzier is used; would be ignored otherwise. " +
|
|
||||||
"For example: org.apache.kafka.common.serialization.Serdes\\$StringSerde")
|
|
||||||
.withRequiredArg
|
|
||||||
.describedAs("inner serde for values")
|
|
||||||
.ofType(classOf[String])
|
|
||||||
val enableSystestEventsLoggingOpt = parser.accepts("enable-systest-events",
|
val enableSystestEventsLoggingOpt = parser.accepts("enable-systest-events",
|
||||||
"Log lifecycle events of the consumer in addition to logging consumed " +
|
"Log lifecycle events of the consumer in addition to logging consumed " +
|
||||||
"messages. (This is specific for system tests.)")
|
"messages. (This is specific for system tests.)")
|
||||||
|
@ -400,8 +384,6 @@ object ConsoleConsumer extends Logging {
|
||||||
val bootstrapServer = options.valueOf(bootstrapServerOpt)
|
val bootstrapServer = options.valueOf(bootstrapServerOpt)
|
||||||
val keyDeserializer = options.valueOf(keyDeserializerOpt)
|
val keyDeserializer = options.valueOf(keyDeserializerOpt)
|
||||||
val valueDeserializer = options.valueOf(valueDeserializerOpt)
|
val valueDeserializer = options.valueOf(valueDeserializerOpt)
|
||||||
val innerKeyDeserializer = options.valueOf(innerKeyDeserializerOpt)
|
|
||||||
val innerValueDeserializer = options.valueOf(innerValueDeserializerOpt)
|
|
||||||
val isolationLevel = options.valueOf(isolationLevelOpt).toString
|
val isolationLevel = options.valueOf(isolationLevelOpt).toString
|
||||||
val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
|
val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
|
||||||
|
|
||||||
|
@ -411,12 +393,6 @@ object ConsoleConsumer extends Logging {
|
||||||
if (valueDeserializer != null && !valueDeserializer.isEmpty) {
|
if (valueDeserializer != null && !valueDeserializer.isEmpty) {
|
||||||
formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
|
formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
|
||||||
}
|
}
|
||||||
if (innerKeyDeserializer != null && !innerKeyDeserializer.isEmpty) {
|
|
||||||
formatterArgs.setProperty(innerKeySerdeName, innerKeyDeserializer)
|
|
||||||
}
|
|
||||||
if (innerValueDeserializer != null && !innerValueDeserializer.isEmpty) {
|
|
||||||
formatterArgs.setProperty(innerValueSerdeName, innerValueDeserializer)
|
|
||||||
}
|
|
||||||
|
|
||||||
formatter.init(formatterArgs)
|
formatter.init(formatterArgs)
|
||||||
|
|
||||||
|
@ -560,15 +536,29 @@ class DefaultMessageFormatter extends MessageFormatter {
|
||||||
// Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
|
// Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
|
||||||
if (props.containsKey("key.deserializer")) {
|
if (props.containsKey("key.deserializer")) {
|
||||||
keyDeserializer = Some(Class.forName(props.getProperty("key.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
|
keyDeserializer = Some(Class.forName(props.getProperty("key.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
|
||||||
keyDeserializer.get.configure(JavaConversions.propertiesAsScalaMap(props).asJava, true)
|
keyDeserializer.get.configure(JavaConversions.propertiesAsScalaMap(stripWithPrefix("key.deserializer.", props)).asJava, true)
|
||||||
}
|
}
|
||||||
// Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
|
// Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
|
||||||
if (props.containsKey("value.deserializer")) {
|
if (props.containsKey("value.deserializer")) {
|
||||||
valueDeserializer = Some(Class.forName(props.getProperty("value.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
|
valueDeserializer = Some(Class.forName(props.getProperty("value.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
|
||||||
valueDeserializer.get.configure(JavaConversions.propertiesAsScalaMap(props).asJava, false)
|
valueDeserializer.get.configure(JavaConversions.propertiesAsScalaMap(stripWithPrefix("value.deserializer.", props)).asJava, false)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def stripWithPrefix(prefix: String, props: Properties): Properties = {
|
||||||
|
val newProps = new Properties()
|
||||||
|
import scala.collection.JavaConversions._
|
||||||
|
for (entry <- props) {
|
||||||
|
val key: String = entry._1
|
||||||
|
val value: String = entry._2
|
||||||
|
|
||||||
|
if (key.startsWith(prefix) && key.length > prefix.length)
|
||||||
|
newProps.put(key.substring(prefix.length), value)
|
||||||
|
}
|
||||||
|
|
||||||
|
newProps
|
||||||
|
}
|
||||||
|
|
||||||
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
|
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
|
||||||
|
|
||||||
def writeSeparator(columnSeparator: Boolean): Unit = {
|
def writeSeparator(columnSeparator: Boolean): Unit = {
|
||||||
|
|
|
@ -25,7 +25,7 @@ import kafka.utils.{Exit, TestUtils}
|
||||||
import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetResetStrategy}
|
import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetResetStrategy}
|
||||||
import org.apache.kafka.common.TopicPartition
|
import org.apache.kafka.common.TopicPartition
|
||||||
import org.apache.kafka.clients.consumer.ConsumerConfig
|
import org.apache.kafka.clients.consumer.ConsumerConfig
|
||||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer
|
import org.apache.kafka.test.MockDeserializer
|
||||||
import org.easymock.EasyMock
|
import org.easymock.EasyMock
|
||||||
import org.junit.Assert._
|
import org.junit.Assert._
|
||||||
import org.junit.{Before, Test}
|
import org.junit.{Before, Test}
|
||||||
|
@ -545,15 +545,17 @@ class ConsoleConsumerTest {
|
||||||
"--bootstrap-server", "localhost:9092",
|
"--bootstrap-server", "localhost:9092",
|
||||||
"--topic", "test",
|
"--topic", "test",
|
||||||
"--property", "print.key=true",
|
"--property", "print.key=true",
|
||||||
"--property", "key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer",
|
"--property", "key.deserializer=org.apache.kafka.test.MockDeserializer",
|
||||||
"--" + ConsoleConsumer.innerKeySerdeName, "org.apache.kafka.common.serialization.Serdes$StringSerde",
|
"--property", "key.deserializer.my-props=abc"
|
||||||
"--property", "my-test1=abc"
|
|
||||||
)
|
)
|
||||||
val config = new ConsoleConsumer.ConsumerConfig(args)
|
val config = new ConsoleConsumer.ConsumerConfig(args)
|
||||||
assertTrue(config.formatter.isInstanceOf[DefaultMessageFormatter])
|
assertTrue(config.formatter.isInstanceOf[DefaultMessageFormatter])
|
||||||
|
assertTrue(config.formatterArgs.containsKey("key.deserializer.my-props"))
|
||||||
val formatter = config.formatter.asInstanceOf[DefaultMessageFormatter]
|
val formatter = config.formatter.asInstanceOf[DefaultMessageFormatter]
|
||||||
assertTrue(formatter.keyDeserializer.get.isInstanceOf[ByteArrayDeserializer])
|
assertTrue(formatter.keyDeserializer.get.isInstanceOf[MockDeserializer])
|
||||||
assertTrue(config.formatterArgs.containsKey("my-test1"))
|
assertEquals(1, formatter.keyDeserializer.get.asInstanceOf[MockDeserializer].configs.size)
|
||||||
assertTrue(config.formatterArgs.containsKey(ConsoleConsumer.innerKeySerdeName))
|
assertEquals("abc", formatter.keyDeserializer.get.asInstanceOf[MockDeserializer].configs.get("my-props"))
|
||||||
|
assertTrue(formatter.keyDeserializer.get.asInstanceOf[MockDeserializer].isKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -781,7 +781,7 @@ public class KStreamAggregationIntegrationTest {
|
||||||
"--property", "key.deserializer=" + keyDeserializer.getClass().getName(),
|
"--property", "key.deserializer=" + keyDeserializer.getClass().getName(),
|
||||||
"--property", "value.deserializer=" + valueDeserializer.getClass().getName(),
|
"--property", "value.deserializer=" + valueDeserializer.getClass().getName(),
|
||||||
"--property", "key.separator=" + keySeparator,
|
"--property", "key.separator=" + keySeparator,
|
||||||
"--" + StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, Serdes.serdeFrom(innerClass).getClass().getName()
|
"--property", "key.deserializer." + StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS + "=" + Serdes.serdeFrom(innerClass).getClass().getName()
|
||||||
};
|
};
|
||||||
|
|
||||||
ConsoleConsumer.messageCount_$eq(0); //reset the message count
|
ConsoleConsumer.messageCount_$eq(0); //reset the message count
|
||||||
|
|
Loading…
Reference in New Issue