KAFKA-10232: MirrorMaker2 internal topics Formatters KIP-597 (#8604)

This PR includes 3 MessageFormatters for MirrorMaker2 internal topics:
- HeartbeatFormatter
- CheckpointFormatter
- OffsetSyncFormatter

This also introduces a new public interface org.apache.kafka.common.MessageFormatter that users can implement to build custom formatters.

Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>, Ryanne Dolan <ryannedolan@gmail.com>, David Jacot <djacot@confluent.io>

Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com>
Mickael Maison 2020-07-03 10:41:45 +01:00 committed by GitHub
parent 3b2ae7b95a
commit caa806cd82
12 changed files with 258 additions and 37 deletions

View File

@@ -48,6 +48,7 @@
   <allow pkg="org.apache.kafka.common.memory" />
   <subpackage name="common">
+    <allow class="org.apache.kafka.clients.consumer.ConsumerRecord" exact-match="true" />
     <disallow pkg="org.apache.kafka.clients" />
     <allow pkg="org.apache.kafka.common" exact-match="true" />
     <allow pkg="org.apache.kafka.common.annotation" />

View File

@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common;

import java.io.Closeable;
import java.io.PrintStream;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * This interface allows defining formatters that can be used to parse and format records read by a
 * Consumer instance for display.
 * The kafka-console-consumer has built-in support for MessageFormatter, via the --formatter flag.
 *
 * Kafka provides a few implementations to display records of internal topics such as __consumer_offsets,
 * __transaction_state and the MirrorMaker2 topics.
 */
public interface MessageFormatter extends Configurable, Closeable {

    /**
     * Initialises the MessageFormatter
     * @param props Properties to configure the formatter
     * @deprecated Use {@link #configure(Map)} instead; this method exists for backward compatibility with the older Formatter interface
     */
    @Deprecated
    default void init(Properties props) {}

    /**
     * Configures the MessageFormatter
     * @param configs Map to configure the formatter
     */
    default void configure(Map<String, ?> configs) {
        Properties properties = new Properties();
        properties.putAll(configs);
        init(properties);
    }

    /**
     * Parses and formats a record for display
     * @param consumerRecord the record to format
     * @param output the print stream used to output the record
     */
    void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output);

    /**
     * Closes the formatter
     */
    default void close() {}
}
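
To illustrate the new interface, here is a minimal sketch of a custom formatter. The com.example package, the class name, and the "separator" config key are hypothetical illustrations, not part of this commit:

// Hypothetical custom formatter: prints each record's key and value as UTF-8 text.
package com.example;

import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.MessageFormatter;

public class KeyValueFormatter implements MessageFormatter {
    // "separator" is a made-up config key for this sketch
    private String separator = "\t";

    @Override
    public void configure(Map<String, ?> configs) {
        Object sep = configs.get("separator");
        if (sep != null)
            separator = sep.toString();
    }

    @Override
    public void writeTo(ConsumerRecord<byte[], byte[]> record, PrintStream output) {
        String key = record.key() == null ? "null" : new String(record.key(), StandardCharsets.UTF_8);
        String value = record.value() == null ? "null" : new String(record.value(), StandardCharsets.UTF_8);
        output.println(key + separator + value);
    }
}

Formatters written against the old init(Properties) signature keep working unchanged, because the default configure(Map) above copies the supplied map into a Properties object and delegates to init.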

View File

@@ -83,7 +83,7 @@ public class OffsetSync {
         return buffer;
     }

-    static OffsetSync deserializeRecord(ConsumerRecord<byte[], byte[]> record) {
+    public static OffsetSync deserializeRecord(ConsumerRecord<byte[], byte[]> record) {
         Struct keyStruct = KEY_SCHEMA.read(ByteBuffer.wrap(record.key()));
         String topic = keyStruct.getString(TOPIC_KEY);
         int partition = keyStruct.getInt(PARTITION_KEY);
@@ -116,5 +116,5 @@ public class OffsetSync {
     byte[] recordValue() {
         return serializeValue().array();
     }
-};
+}

View File

@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.mirror.formatters;

import java.io.PrintStream;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.MessageFormatter;
import org.apache.kafka.connect.mirror.Checkpoint;

public class CheckpointFormatter implements MessageFormatter {

    @Override
    public void writeTo(ConsumerRecord<byte[], byte[]> record, PrintStream output) {
        output.println(Checkpoint.deserializeRecord(record));
    }
}

View File

@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.mirror.formatters;

import java.io.PrintStream;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.MessageFormatter;
import org.apache.kafka.connect.mirror.Heartbeat;

public class HeartbeatFormatter implements MessageFormatter {

    @Override
    public void writeTo(ConsumerRecord<byte[], byte[]> record, PrintStream output) {
        output.println(Heartbeat.deserializeRecord(record));
    }
}

View File

@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.mirror.formatters;

import java.io.PrintStream;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.MessageFormatter;
import org.apache.kafka.connect.mirror.OffsetSync;

public class OffsetSyncFormatter implements MessageFormatter {

    @Override
    public void writeTo(ConsumerRecord<byte[], byte[]> record, PrintStream output) {
        output.println(OffsetSync.deserializeRecord(record));
    }
}
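
A usage sketch for these formatters with the console consumer's --formatter flag. The topic names below assume MirrorMaker2's default internal topic naming, with a source cluster alias of "source"; adjust them to your setup:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic heartbeats --from-beginning \
    --formatter org.apache.kafka.connect.mirror.formatters.HeartbeatFormatter

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic source.checkpoints.internal --from-beginning \
    --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter

Each formatter deserializes the record with the corresponding MirrorMaker2 schema and prints the resulting object's toString() representation, one record per line.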

View File

@@ -17,23 +17,12 @@
 package kafka.common

-import java.io.PrintStream
-import java.util.Properties
-
-import org.apache.kafka.clients.consumer.ConsumerRecord
-
 /**
  * Typical implementations of this interface convert a `ConsumerRecord` into a type that can then be passed to
  * a `PrintStream`.
  *
  * This is used by the `ConsoleConsumer`.
  */
-trait MessageFormatter {
-  def init(props: Properties): Unit = {}
-
-  def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit
-
-  def close(): Unit = {}
+@deprecated("This class is deprecated and will be replaced by org.apache.kafka.common.MessageFormatter.", "2.7.0")
+trait MessageFormatter extends org.apache.kafka.common.MessageFormatter {
 }

View File

@@ -27,7 +27,7 @@ import java.util.concurrent.locks.ReentrantLock
 import com.yammer.metrics.core.Gauge
 import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0, KAFKA_2_1_IV0, KAFKA_2_1_IV1, KAFKA_2_3_IV0}
-import kafka.common.{MessageFormatter, OffsetAndMetadata}
+import kafka.common.OffsetAndMetadata
 import kafka.log.AppendOrigin
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.{FetchLogEnd, ReplicaManager}
@@ -47,7 +47,7 @@ import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.{OffsetCommitRequest, OffsetFetchResponse}
 import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.{KafkaException, MessageFormatter, TopicPartition}
 import scala.collection._
 import scala.collection.mutable.ArrayBuffer

View File

@@ -20,12 +20,11 @@ import java.io.PrintStream
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets

-import kafka.common.MessageFormatter
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.protocol.types.Type._
 import org.apache.kafka.common.protocol.types._
 import org.apache.kafka.common.record.{CompressionType, Record, RecordBatch}
-import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.{KafkaException, MessageFormatter, TopicPartition}

 import scala.collection.mutable

View File

@@ -22,15 +22,14 @@ import java.nio.charset.StandardCharsets
 import java.time.Duration
 import java.util.concurrent.CountDownLatch
 import java.util.regex.Pattern
-import java.util.{Collections, Locale, Properties, Random}
+import java.util.{Collections, Locale, Map, Properties, Random}

 import com.typesafe.scalalogging.LazyLogging
 import joptsimple._
-import kafka.common.MessageFormatter
 import kafka.utils.Implicits._
 import kafka.utils.{Exit, _}
 import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRecord, KafkaConsumer}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{MessageFormatter, TopicPartition}
 import org.apache.kafka.common.errors.{AuthenticationException, TimeoutException, WakeupException}
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.requests.ListOffsetRequest
@@ -309,7 +308,7 @@ object ConsoleConsumer extends Logging {
       formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
     }

-    formatter.init(formatterArgs)
+    formatter.configure(formatterArgs.asScala.asJava)

     val topicOrFilterOpt = List(topicIdOpt, whitelistOpt).filter(options.has)
     if (topicOrFilterOpt.size != 1)
@@ -466,7 +465,9 @@ class DefaultMessageFormatter extends MessageFormatter
   var keyDeserializer: Option[Deserializer[_]] = None
   var valueDeserializer: Option[Deserializer[_]] = None

-  override def init(props: Properties): Unit = {
+  override def configure(configs: Map[String, _]): Unit = {
+    val props = new java.util.Properties()
+    configs.asScala.foreach { case (key, value) => props.put(key, value.toString) }
     if (props.containsKey("print.timestamp"))
       printTimestamp = props.getProperty("print.timestamp").trim.equalsIgnoreCase("true")
     if (props.containsKey("print.key"))
@@ -548,7 +549,7 @@ class DefaultMessageFormatter extends MessageFormatter {
 class LoggingMessageFormatter extends MessageFormatter with LazyLogging {
   private val defaultWriter: DefaultMessageFormatter = new DefaultMessageFormatter

-  override def init(props: Properties): Unit = defaultWriter.init(props)
+  override def configure(configs: Map[String, _]): Unit = defaultWriter.configure(configs)

   def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
     import consumerRecord._
@@ -560,7 +561,6 @@ class LoggingMessageFormatter extends MessageFormatter with LazyLogging {
 }

 class NoOpMessageFormatter extends MessageFormatter {
-  override def init(props: Properties): Unit = {}

   def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {}
 }
@@ -568,12 +568,11 @@ class NoOpMessageFormatter extends MessageFormatter {
 class ChecksumMessageFormatter extends MessageFormatter {
   private var topicStr: String = _

-  override def init(props: Properties): Unit = {
-    topicStr = props.getProperty("topic")
-    if (topicStr != null)
-      topicStr = topicStr + ":"
+  override def configure(configs: Map[String, _]): Unit = {
+    topicStr = if (configs.containsKey("topic"))
+      configs.get("topic").toString + ":"
     else
-      topicStr = ""
+      ""
   }

   @nowarn("cat=deprecation")

View File

@@ -17,14 +17,15 @@
 package kafka.tools

-import java.io.PrintStream
+import java.io.{ByteArrayOutputStream, PrintStream}
 import java.nio.file.Files
+import java.util.{HashMap, Map => JMap}

-import kafka.common.MessageFormatter
 import kafka.tools.ConsoleConsumer.ConsumerWrapper
 import kafka.utils.{Exit, TestUtils}
 import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetResetStrategy}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{MessageFormatter, TopicPartition}
+import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.test.MockDeserializer
 import org.mockito.Mockito._
@@ -495,4 +496,70 @@ class ConsoleConsumerTest {
     Exit.resetExitProcedure()
   }

+  @Test
+  def testDefaultMessageFormatter(): Unit = {
+    val record = new ConsumerRecord("topic", 0, 123, "key".getBytes, "value".getBytes)
+    val formatter = new DefaultMessageFormatter()
+    val configs: JMap[String, String] = new HashMap()
+
+    formatter.configure(configs)
+    var out = new ByteArrayOutputStream()
+    formatter.writeTo(record, new PrintStream(out))
+    assertEquals("value\n", out.toString)
+
+    configs.put("print.key", "true")
+    formatter.configure(configs)
+    out = new ByteArrayOutputStream()
+    formatter.writeTo(record, new PrintStream(out))
+    assertEquals("key\tvalue\n", out.toString)
+
+    configs.put("print.partition", "true")
+    formatter.configure(configs)
+    out = new ByteArrayOutputStream()
+    formatter.writeTo(record, new PrintStream(out))
+    assertEquals("key\tvalue\t0\n", out.toString)
+
+    configs.put("print.timestamp", "true")
+    formatter.configure(configs)
+    out = new ByteArrayOutputStream()
+    formatter.writeTo(record, new PrintStream(out))
+    assertEquals("NO_TIMESTAMP\tkey\tvalue\t0\n", out.toString)
+
+    out = new ByteArrayOutputStream()
+    val record2 = new ConsumerRecord("topic", 0, 123, 123L, TimestampType.CREATE_TIME, 321L, -1, -1, "key".getBytes, "value".getBytes)
+    formatter.writeTo(record2, new PrintStream(out))
+    assertEquals("CreateTime:123\tkey\tvalue\t0\n", out.toString)
+    formatter.close()
+  }
+
+  @Test
+  def testNoOpMessageFormatter(): Unit = {
+    val record = new ConsumerRecord("topic", 0, 123, "key".getBytes, "value".getBytes)
+    val formatter = new NoOpMessageFormatter()
+    formatter.configure(new HashMap())
+
+    val out = new ByteArrayOutputStream()
+    formatter.writeTo(record, new PrintStream(out))
+    assertEquals("", out.toString)
+  }
+
+  @Test
+  def testChecksumMessageFormatter(): Unit = {
+    val record = new ConsumerRecord("topic", 0, 123, "key".getBytes, "value".getBytes)
+    val formatter = new ChecksumMessageFormatter()
+    val configs: JMap[String, String] = new HashMap()
+
+    formatter.configure(configs)
+    var out = new ByteArrayOutputStream()
+    formatter.writeTo(record, new PrintStream(out))
+    assertEquals("checksum:-1\n", out.toString)
+
+    configs.put("topic", "topic1")
+    formatter.configure(configs)
+    out = new ByteArrayOutputStream()
+    formatter.writeTo(record, new PrintStream(out))
+    assertEquals("topic1:checksum:-1\n", out.toString)
+  }
 }

View File

@@ -189,6 +189,13 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
     <Match>
         <Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
     </Match>

+    <Match>
+        <!-- Keeping this class for compatibility. It's deprecated and will be removed in the next major release -->
+        <Source name="MessageFormatter.scala"/>
+        <Package name="kafka.common"/>
+        <Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE"/>
+    </Match>
+
     <Match>
         <!-- Suppress a warning about some static initializers in Schema using instances of a
              subclass. -->