From 98713570867619de7de8e885a82d5fb0bfdb3b4e Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Sun, 15 Apr 2018 20:51:53 -0700 Subject: [PATCH] KAFKA-6592: Follow-up (#4864) Do not require ConsoleConsumer to specify inner serde as s special property, but just a normal property of the message formatter. --- .../apache/kafka/test/MockDeserializer.java | 5 ++ .../scala/kafka/tools/ConsoleConsumer.scala | 50 ++++++++----------- .../kafka/tools/ConsoleConsumerTest.scala | 16 +++--- .../KStreamAggregationIntegrationTest.java | 2 +- 4 files changed, 35 insertions(+), 38 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/test/MockDeserializer.java b/clients/src/test/java/org/apache/kafka/test/MockDeserializer.java index 99551f718a9..ac2865e9bb8 100644 --- a/clients/src/test/java/org/apache/kafka/test/MockDeserializer.java +++ b/clients/src/test/java/org/apache/kafka/test/MockDeserializer.java @@ -31,6 +31,9 @@ public class MockDeserializer implements ClusterResourceListener, Deserializer clusterIdBeforeDeserialize = new AtomicReference<>(noClusterId); + public boolean isKey; + public Map configs; + public static void resetStaticVariables() { initCount = new AtomicInteger(0); closeCount = new AtomicInteger(0); @@ -44,6 +47,8 @@ public class MockDeserializer implements ClusterResourceListener, Deserializer configs, boolean isKey) { + this.configs = configs; + this.isKey = isKey; } @Override diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 9df4fb42070..5139324ec17 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -19,8 +19,9 @@ package kafka.tools import java.io.PrintStream import java.nio.charset.StandardCharsets +import java.util import java.util.concurrent.CountDownLatch -import java.util.{Locale, Properties, Random} +import java.util.{Locale, Map, Properties, Random} import com.typesafe.scalalogging.LazyLogging import joptsimple._ @@ -46,11 +47,6 @@ import scala.collection.JavaConverters._ object ConsoleConsumer extends Logging { var messageCount = 0 - // Keep same names with StreamConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS - // and StreamConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS - // visible for testing - private[tools] val innerKeySerdeName = "default.windowed.key.serde.inner" - private[tools] val innerValueSerdeName = "default.windowed.value.serde.inner" private val shutdownLatch = new CountDownLatch(1) @@ -306,8 +302,8 @@ object ConsoleConsumer extends Logging { "\tline.separator=\n" + "\tkey.deserializer=\n" + "\tvalue.deserializer=\n" + - "\tdefault.windowed.key.serde.inner=\n" + - "\tdefault.windowed.value.serde.inner=") + "\nUsers can also pass in customized properties for their formatter; more specifically, users " + + "can pass in properties keyed with \'key.deserializer.\' and \'value.deserializer.\' prefixes to configure their deserializers.") .withRequiredArg .describedAs("prop") .ofType(classOf[String]) @@ -344,18 +340,6 @@ object ConsoleConsumer extends Logging { .withRequiredArg .describedAs("deserializer for values") .ofType(classOf[String]) - val innerKeyDeserializerOpt = parser.accepts(innerKeySerdeName, - "inner serde for key when windowed deserialzier is used; would be ignored otherwise. " + - "For example: org.apache.kafka.common.serialization.Serdes\\$StringSerde") - .withRequiredArg - .describedAs("inner serde for key") - .ofType(classOf[String]) - val innerValueDeserializerOpt = parser.accepts(innerValueSerdeName, - "inner serde for value when windowed deserialzier is used; would be ignored otherwise. " + - "For example: org.apache.kafka.common.serialization.Serdes\\$StringSerde") - .withRequiredArg - .describedAs("inner serde for values") - .ofType(classOf[String]) val enableSystestEventsLoggingOpt = parser.accepts("enable-systest-events", "Log lifecycle events of the consumer in addition to logging consumed " + "messages. (This is specific for system tests.)") @@ -400,8 +384,6 @@ object ConsoleConsumer extends Logging { val bootstrapServer = options.valueOf(bootstrapServerOpt) val keyDeserializer = options.valueOf(keyDeserializerOpt) val valueDeserializer = options.valueOf(valueDeserializerOpt) - val innerKeyDeserializer = options.valueOf(innerKeyDeserializerOpt) - val innerValueDeserializer = options.valueOf(innerValueDeserializerOpt) val isolationLevel = options.valueOf(isolationLevelOpt).toString val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter] @@ -411,12 +393,6 @@ object ConsoleConsumer extends Logging { if (valueDeserializer != null && !valueDeserializer.isEmpty) { formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer) } - if (innerKeyDeserializer != null && !innerKeyDeserializer.isEmpty) { - formatterArgs.setProperty(innerKeySerdeName, innerKeyDeserializer) - } - if (innerValueDeserializer != null && !innerValueDeserializer.isEmpty) { - formatterArgs.setProperty(innerValueSerdeName, innerValueDeserializer) - } formatter.init(formatterArgs) @@ -560,15 +536,29 @@ class DefaultMessageFormatter extends MessageFormatter { // Note that `toString` will be called on the instance returned by `Deserializer.deserialize` if (props.containsKey("key.deserializer")) { keyDeserializer = Some(Class.forName(props.getProperty("key.deserializer")).newInstance().asInstanceOf[Deserializer[_]]) - keyDeserializer.get.configure(JavaConversions.propertiesAsScalaMap(props).asJava, true) + keyDeserializer.get.configure(JavaConversions.propertiesAsScalaMap(stripWithPrefix("key.deserializer.", props)).asJava, true) } // Note that `toString` will be called on the instance returned by `Deserializer.deserialize` if (props.containsKey("value.deserializer")) { valueDeserializer = Some(Class.forName(props.getProperty("value.deserializer")).newInstance().asInstanceOf[Deserializer[_]]) - valueDeserializer.get.configure(JavaConversions.propertiesAsScalaMap(props).asJava, false) + valueDeserializer.get.configure(JavaConversions.propertiesAsScalaMap(stripWithPrefix("value.deserializer.", props)).asJava, false) } } + def stripWithPrefix(prefix: String, props: Properties): Properties = { + val newProps = new Properties() + import scala.collection.JavaConversions._ + for (entry <- props) { + val key: String = entry._1 + val value: String = entry._2 + + if (key.startsWith(prefix) && key.length > prefix.length) + newProps.put(key.substring(prefix.length), value) + } + + newProps + } + def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) { def writeSeparator(columnSeparator: Boolean): Unit = { diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala index f5195c3cf76..6f465557d7b 100644 --- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala +++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala @@ -25,7 +25,7 @@ import kafka.utils.{Exit, TestUtils} import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetResetStrategy} import org.apache.kafka.common.TopicPartition import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.apache.kafka.test.MockDeserializer import org.easymock.EasyMock import org.junit.Assert._ import org.junit.{Before, Test} @@ -545,15 +545,17 @@ class ConsoleConsumerTest { "--bootstrap-server", "localhost:9092", "--topic", "test", "--property", "print.key=true", - "--property", "key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer", - "--" + ConsoleConsumer.innerKeySerdeName, "org.apache.kafka.common.serialization.Serdes$StringSerde", - "--property", "my-test1=abc" + "--property", "key.deserializer=org.apache.kafka.test.MockDeserializer", + "--property", "key.deserializer.my-props=abc" ) val config = new ConsoleConsumer.ConsumerConfig(args) assertTrue(config.formatter.isInstanceOf[DefaultMessageFormatter]) + assertTrue(config.formatterArgs.containsKey("key.deserializer.my-props")) val formatter = config.formatter.asInstanceOf[DefaultMessageFormatter] - assertTrue(formatter.keyDeserializer.get.isInstanceOf[ByteArrayDeserializer]) - assertTrue(config.formatterArgs.containsKey("my-test1")) - assertTrue(config.formatterArgs.containsKey(ConsoleConsumer.innerKeySerdeName)) + assertTrue(formatter.keyDeserializer.get.isInstanceOf[MockDeserializer]) + assertEquals(1, formatter.keyDeserializer.get.asInstanceOf[MockDeserializer].configs.size) + assertEquals("abc", formatter.keyDeserializer.get.asInstanceOf[MockDeserializer].configs.get("my-props")) + assertTrue(formatter.keyDeserializer.get.asInstanceOf[MockDeserializer].isKey) } + } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java index fc673d0258e..52b9ee80915 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java @@ -781,7 +781,7 @@ public class KStreamAggregationIntegrationTest { "--property", "key.deserializer=" + keyDeserializer.getClass().getName(), "--property", "value.deserializer=" + valueDeserializer.getClass().getName(), "--property", "key.separator=" + keySeparator, - "--" + StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, Serdes.serdeFrom(innerClass).getClass().getName() + "--property", "key.deserializer." + StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS + "=" + Serdes.serdeFrom(innerClass).getClass().getName() }; ConsoleConsumer.messageCount_$eq(0); //reset the message count