KAFKA-14705 Remove deprecated classes and options in tools (#17420)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Mickael Maison 2024-10-09 04:33:34 +02:00 committed by GitHub
parent 7592bc3cbe
commit 2836f7aaae
9 changed files with 27 additions and 303 deletions


@@ -17,9 +17,7 @@
package kafka.coordinator.group
import java.io.PrintStream
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.{Optional, OptionalInt}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
@@ -31,7 +29,6 @@ import kafka.coordinator.group.GroupMetadataManager.maybeConvertOffsetCommitError
import kafka.server.ReplicaManager
import kafka.utils.CoreUtils.inLock
import kafka.utils._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.{Metrics, Sensor}
@@ -42,7 +39,7 @@ import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.{OffsetCommitRequest, OffsetFetchResponse}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{MessageFormatter, TopicIdPartition, TopicPartition}
import org.apache.kafka.common.{TopicIdPartition, TopicPartition}
import org.apache.kafka.coordinator.group.OffsetConfig
import org.apache.kafka.coordinator.group.generated.{GroupMetadataValue, OffsetCommitKey, OffsetCommitValue, GroupMetadataKey => GroupMetadataKeyData}
import org.apache.kafka.server.common.{MetadataVersion, RequestLocal}
@@ -1233,51 +1230,6 @@ object GroupMetadataManager {
}
}
// Formatter for use with tools such as console consumer: Consumer should also set exclude.internal.topics to false.
// (specify --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" when consuming __consumer_offsets)
@Deprecated
class OffsetsMessageFormatter extends MessageFormatter {
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
Option(consumerRecord.key).map(key => GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {
// Only print if the message is an offset record.
// We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
case offsetKey: OffsetKey =>
val groupTopicPartition = offsetKey.key
val value = consumerRecord.value
val formattedValue =
if (value == null) "NULL"
else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString
output.write(groupTopicPartition.toString.getBytes(StandardCharsets.UTF_8))
output.write("::".getBytes(StandardCharsets.UTF_8))
output.write(formattedValue.getBytes(StandardCharsets.UTF_8))
output.write("\n".getBytes(StandardCharsets.UTF_8))
case _ => // no-op
}
}
}
// Formatter for use with tools to read group metadata history
@Deprecated
class GroupMetadataMessageFormatter extends MessageFormatter {
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
Option(consumerRecord.key).map(key => GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {
// Only print if the message is a group metadata record.
// We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
case groupMetadataKey: GroupMetadataKey =>
val groupId = groupMetadataKey.key
val value = consumerRecord.value
val formattedValue =
if (value == null) "NULL"
else GroupMetadataManager.readGroupMessageValue(groupId, ByteBuffer.wrap(value), Time.SYSTEM).toString
output.write(groupId.getBytes(StandardCharsets.UTF_8))
output.write("::".getBytes(StandardCharsets.UTF_8))
output.write(formattedValue.getBytes(StandardCharsets.UTF_8))
output.write("\n".getBytes(StandardCharsets.UTF_8))
case _ => // no-op
}
}
}
def maybeConvertOffsetCommitError(error: Errors) : Errors = {
error match {
case Errors.NETWORK_EXCEPTION =>

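The two Scala formatters removed above plugged into kafka-console-consumer through the generic org.apache.kafka.common.MessageFormatter interface, which is unchanged by this commit; only the bundled implementations now live in org.apache.kafka.tools.consumer. For anyone maintaining a custom formatter, a minimal sketch of the remaining contract (the class name is hypothetical, and only writeTo is shown since the other MessageFormatter methods have defaults):

import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.MessageFormatter;

// Hypothetical formatter; pass its fully qualified name to kafka-console-consumer --formatter.
public class KeyValueFormatter implements MessageFormatter {
    @Override
    public void writeTo(ConsumerRecord<byte[], byte[]> record, PrintStream output) {
        // Print "key::value" per record, using "NULL" for absent keys or values,
        // mirroring the output style of the removed Scala formatters.
        output.print(record.key() == null ? "NULL" : new String(record.key(), StandardCharsets.UTF_8));
        output.print("::");
        output.println(record.value() == null ? "NULL" : new String(record.value(), StandardCharsets.UTF_8));
    }
}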

@@ -16,14 +16,11 @@
*/
package kafka.coordinator.transaction
import java.io.PrintStream
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.protocol.{ByteBufferAccessor, MessageUtil}
import org.apache.kafka.common.record.{Record, RecordBatch}
import org.apache.kafka.common.{MessageFormatter, TopicPartition}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.coordinator.transaction.generated.{TransactionLogKey, TransactionLogValue}
import org.apache.kafka.server.common.TransactionVersion
@@ -147,29 +144,6 @@ object TransactionLog {
}
}
// Formatter for use with tools to read transaction log messages
@Deprecated
class TransactionLogMessageFormatter extends MessageFormatter {
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
Option(consumerRecord.key).map(key => readTxnRecordKey(ByteBuffer.wrap(key))).foreach {
case txnKey: TxnKey =>
val transactionalId = txnKey.transactionalId
val value = consumerRecord.value
val producerIdMetadata = if (value == null)
None
else
readTxnRecordValue(transactionalId, ByteBuffer.wrap(value))
output.write(transactionalId.getBytes(StandardCharsets.UTF_8))
output.write("::".getBytes(StandardCharsets.UTF_8))
output.write(producerIdMetadata.getOrElse("NULL").toString.getBytes(StandardCharsets.UTF_8))
output.write("\n".getBytes(StandardCharsets.UTF_8))
case unknownKey: UnknownKey =>
output.write(s"unknown::version=${unknownKey.version}\n".getBytes(StandardCharsets.UTF_8))
}
}
}
/**
* Exposed for printing records using [[kafka.tools.DumpLogSegments]]
*/


@@ -1,78 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.serializer
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import kafka.utils.VerifiableProperties
/**
* A decoder is a method of turning byte arrays into objects.
* An implementation is required to provide a constructor that
* takes a VerifiableProperties instance.
*/
@deprecated(since = "3.8.0")
trait Decoder[T] {
def fromBytes(bytes: Array[Byte]): T
}
/**
* The default implementation does nothing, just returns the same byte array it takes in.
*/
@deprecated(since = "3.8.0")
class DefaultDecoder(props: VerifiableProperties = null) extends Decoder[Array[Byte]] {
def fromBytes(bytes: Array[Byte]): Array[Byte] = bytes
}
/**
* The string decoder translates bytes into strings. It uses UTF8 by default but takes
* an optional property serializer.encoding to control this.
*/
@deprecated(since = "3.8.0")
class StringDecoder(props: VerifiableProperties = null) extends Decoder[String] {
val encoding: String =
if (props == null)
StandardCharsets.UTF_8.name()
else
props.getString("serializer.encoding", StandardCharsets.UTF_8.name())
def fromBytes(bytes: Array[Byte]): String = {
new String(bytes, encoding)
}
}
/**
* The long decoder translates bytes into longs.
*/
@deprecated(since = "3.8.0")
class LongDecoder(props: VerifiableProperties = null) extends Decoder[Long] {
def fromBytes(bytes: Array[Byte]): Long = {
ByteBuffer.wrap(bytes).getLong
}
}
/**
* The integer decoder translates bytes into integers.
*/
@deprecated(since = "3.8.0")
class IntegerDecoder(props: VerifiableProperties = null) extends Decoder[Integer] {
def fromBytes(bytes: Array[Byte]): Integer = {
ByteBuffer.wrap(bytes).getInt()
}
}

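The replacement for the deleted kafka.serializer decoders is the org.apache.kafka.tools.api.Decoder interface used by DumpLogSegments below: as the diff shows, it is a single-method interface taking only the raw bytes, and kafka-dump-log instantiates it through a public no-argument constructor, so the VerifiableProperties parameter simply disappears. A minimal sketch of a custom decoder against that contract (the class name and hex formatting are illustrative only):

import org.apache.kafka.tools.api.Decoder;

// Hypothetical custom decoder; pass its fully qualified name to
// kafka-dump-log --key-decoder-class / --value-decoder-class.
public class HexDecoder implements Decoder<String> {
    @Override
    public String fromBytes(byte[] bytes) {
        // Render each byte as two lowercase hex digits.
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }
}

Ready-made equivalents of the removed decoders (DefaultDecoder, StringDecoder, LongDecoder, IntegerDecoder) already ship under org.apache.kafka.tools.api, as the import change in DumpLogSegments below shows.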

@@ -23,7 +23,7 @@ import java.io._
import com.fasterxml.jackson.databind.node.{IntNode, JsonNodeFactory, ObjectNode, TextNode}
import kafka.coordinator.transaction.TransactionLog
import kafka.log._
import kafka.utils.{CoreUtils, VerifiableProperties}
import kafka.utils.CoreUtils
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.message.ConsumerProtocolAssignment
import org.apache.kafka.common.message.ConsumerProtocolAssignmentJsonConverter
@@ -46,7 +46,7 @@ import org.apache.kafka.snapshot.Snapshots
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.kafka.storage.internals.log.{CorruptSnapshotException, LogFileUtils, OffsetIndex, ProducerStateManager, TimeIndex, TransactionIndex}
import org.apache.kafka.tools.api.{Decoder, DefaultDecoder, IntegerDecoder, LongDecoder, StringDecoder}
import org.apache.kafka.tools.api.{Decoder, StringDecoder}
import java.nio.ByteBuffer
import scala.jdk.CollectionConverters._
@@ -676,8 +676,8 @@ object DumpLogSegments {
} else if (options.has(remoteMetadataOpt)) {
new RemoteMetadataLogMessageParser
} else {
val valueDecoder = newDecoder(options.valueOf(valueDecoderOpt))
val keyDecoder = newDecoder(options.valueOf(keyDecoderOpt))
val valueDecoder = CoreUtils.createObject[org.apache.kafka.tools.api.Decoder[_]](options.valueOf(valueDecoderOpt))
val keyDecoder = CoreUtils.createObject[org.apache.kafka.tools.api.Decoder[_]](options.valueOf(keyDecoderOpt))
new DecoderMessageParser(keyDecoder, valueDecoder)
}
@@ -699,42 +699,4 @@
def checkArgs(): Unit = CommandLineUtils.checkRequiredArgs(parser, options, filesOpt)
}
/*
* The kafka.serializer.Decoder is deprecated since 3.8.0. This method converts a deprecated
* decoder to the new org.apache.kafka.tools.api.Decoder. Old decoders take a VerifiableProperties
* argument; it is dropped from the new interface since it is always empty.
*/
private[tools] def newDecoder(className: String): Decoder[_] = {
try {
CoreUtils.createObject[org.apache.kafka.tools.api.Decoder[_]](convertDeprecatedDecoderClass(className))
} catch {
case _: Exception =>
// Old decoders always have a default VerifiableProperties input, because DumpLogSegments didn't provide
// any way to pass custom configs.
val decoder = CoreUtils.createObject[kafka.serializer.Decoder[_]](className, new VerifiableProperties())
(bytes: Array[Byte]) => decoder.fromBytes(bytes)
}
}
/*
* Convert deprecated decoder implementation to new decoder class.
*/
private[tools] def convertDeprecatedDecoderClass(className: String): String = {
if (className == "kafka.serializer.StringDecoder") {
println("kafka.serializer.StringDecoder is deprecated. Please use org.apache.kafka.tools.api.StringDecoder instead")
classOf[StringDecoder].getName
} else if (className == "kafka.serializer.LongDecoder") {
println("kafka.serializer.LongDecoder is deprecated. Please use org.apache.kafka.tools.api.LongDecoder instead")
classOf[LongDecoder].getName
} else if (className == "kafka.serializer.IntegerDecoder") {
println("kafka.serializer.IntegerDecoder is deprecated. Please use org.apache.kafka.tools.api.IntegerDecoder instead")
classOf[IntegerDecoder].getName
} else if (className == "kafka.serializer.DefaultDecoder") {
println("kafka.serializer.DefaultDecoder is deprecated. Please use org.apache.kafka.tools.api.DefaultDecoder instead")
classOf[DefaultDecoder].getName
} else {
className
}
}
}


@@ -28,7 +28,7 @@ import kafka.log.{LogTestUtils, UnifiedLog}
import kafka.raft.{KafkaMetadataLog, MetadataLogConfig}
import kafka.server.KafkaRaftServer
import kafka.tools.DumpLogSegments.{OffsetsMessageParser, TimeIndexDumpErrors}
import kafka.utils.{TestUtils, VerifiableProperties}
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.{Assignment, Subscription}
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
@@ -221,9 +221,9 @@ class DumpLogSegmentsTest {
// Verify that records are printed with --print-data-log if --deep-iteration is also specified
verifyRecordsInOutput(checkKeysAndValues = true, Array("--print-data-log", "--deep-iteration", "--files", logFilePath))
// Verify that records are printed with --value-decoder even if --print-data-log is not specified
verifyRecordsInOutput(checkKeysAndValues = true, Array("--value-decoder-class", "kafka.serializer.StringDecoder", "--files", logFilePath))
verifyRecordsInOutput(checkKeysAndValues = true, Array("--value-decoder-class", "org.apache.kafka.tools.api.StringDecoder", "--files", logFilePath))
// Verify that records are printed with --key-decoder even if --print-data-log is not specified
verifyRecordsInOutput(checkKeysAndValues = true, Array("--key-decoder-class", "kafka.serializer.StringDecoder", "--files", logFilePath))
verifyRecordsInOutput(checkKeysAndValues = true, Array("--key-decoder-class", "org.apache.kafka.tools.api.StringDecoder", "--files", logFilePath))
// Verify that records are printed with --deep-iteration even if --print-data-log is not specified
verifyRecordsInOutput(checkKeysAndValues = false, Array("--deep-iteration", "--files", logFilePath))
@@ -397,7 +397,7 @@ class DumpLogSegmentsTest {
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
log = LogTestUtils.createLog(logDir, logConfig, new BrokerTopicStats, time.scheduler, time)
log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, metadataRecords:_*), leaderEpoch = 0)
val secondSegment = log.roll();
val secondSegment = log.roll()
secondSegment.append(1L, RecordBatch.NO_TIMESTAMP, 1L, MemoryRecords.withRecords(Compression.NONE, metadataRecords:_*))
secondSegment.flush()
log.flush(true)
@@ -829,26 +829,6 @@ class DumpLogSegmentsTest {
)
}
@Test
def testNewDecoder(): Unit = {
// Decoder translate should pass without exception
DumpLogSegments.newDecoder(classOf[DumpLogSegmentsTest.TestDecoder].getName)
DumpLogSegments.newDecoder(classOf[kafka.serializer.DefaultDecoder].getName)
assertThrows(classOf[Exception], () => DumpLogSegments.newDecoder(classOf[DumpLogSegmentsTest.TestDecoderWithoutVerifiableProperties].getName))
}
@Test
def testConvertDeprecatedDecoderClass(): Unit = {
assertEquals(classOf[org.apache.kafka.tools.api.DefaultDecoder].getName, DumpLogSegments.convertDeprecatedDecoderClass(
classOf[kafka.serializer.DefaultDecoder].getName))
assertEquals(classOf[org.apache.kafka.tools.api.IntegerDecoder].getName, DumpLogSegments.convertDeprecatedDecoderClass(
classOf[kafka.serializer.IntegerDecoder].getName))
assertEquals(classOf[org.apache.kafka.tools.api.LongDecoder].getName, DumpLogSegments.convertDeprecatedDecoderClass(
classOf[kafka.serializer.LongDecoder].getName))
assertEquals(classOf[org.apache.kafka.tools.api.StringDecoder].getName, DumpLogSegments.convertDeprecatedDecoderClass(
classOf[kafka.serializer.StringDecoder].getName))
}
private def readBatchMetadata(lines: util.ListIterator[String]): Option[String] = {
while (lines.hasNext) {
val line = lines.next()
@@ -983,13 +963,3 @@ class DumpLogSegmentsTest {
}
}
}
object DumpLogSegmentsTest {
class TestDecoder(props: VerifiableProperties) extends kafka.serializer.Decoder[Array[Byte]] {
override def fromBytes(bytes: Array[Byte]): Array[Byte] = bytes
}
class TestDecoderWithoutVerifiableProperties() extends kafka.serializer.Decoder[Array[Byte]] {
override def fromBytes(bytes: Array[Byte]): Array[Byte] = bytes
}
}


@@ -73,7 +73,7 @@
</li>
<li>The <code>kafka.tools.LoggingMessageFormatter</code> class was removed. Please use the <code>org.apache.kafka.tools.consumer.LoggingMessageFormatter</code> class instead.
</li>
<li>The <code>kafka.tools.NoOpMessageFormatter</code> class has been removed. Please use the <code>org.apache.kafka.tools.consumer.NoOpMessageFormatter</code> class instead.
<li>The <code>kafka.tools.NoOpMessageFormatter</code> class was removed. Please use the <code>org.apache.kafka.tools.consumer.NoOpMessageFormatter</code> class instead.
</li>
<li>The <code>--whitelist</code> option was removed from the <code>kafka-console-consumer</code> command line tool.
Please use <code>--include</code> instead.
@@ -89,6 +89,20 @@
<li>The <code>--authorizer</code>, <code>--authorizer-properties</code>, and <code>--zk-tls-config-file</code> options were removed from the <code>kafka-acls</code> command line tool.
Please use <code>--bootstrap-server</code> or <code>--bootstrap-controller</code> instead.
</li>
<li>
The <code>kafka.serializer.Decoder</code> trait was removed. Please use the
<a href="/{{version}}/javadoc/org/apache/kafka/tools/api/Decoder.html"><code>org.apache.kafka.tools.api.Decoder</code></a>
interface to build custom decoders for the <code>kafka-dump-log</code> tool.
</li>
<li>The <code>kafka.coordinator.group.OffsetsMessageFormatter</code> class was removed. Please use the <code>org.apache.kafka.tools.consumer.OffsetsMessageFormatter</code> class instead.
</li>
<li>The <code>kafka.coordinator.group.GroupMetadataMessageFormatter</code> class was removed. Please use the <code>org.apache.kafka.tools.consumer.GroupMetadataMessageFormatter</code> class instead.
</li>
<li>The <code>kafka.coordinator.transaction.TransactionLogMessageFormatter</code> class was removed. Please use the <code>org.apache.kafka.tools.consumer.TransactionLogMessageFormatter</code> class instead.
</li>
<li>The <code>--topic-white-list</code> option was removed from the <code>kafka-replica-verification</code> command line tool.
Please use <code>--topics-include</code> instead.
</li>
</ul>
</li>
<li><b>Connect</b>


@@ -264,7 +264,6 @@ public class ReplicaVerificationTool {
private final OptionSpec<String> brokerListOpt;
private final OptionSpec<Integer> fetchSizeOpt;
private final OptionSpec<Integer> maxWaitMsOpt;
private final OptionSpec<String> topicWhiteListOpt;
private final OptionSpec<String> topicsIncludeOpt;
private final OptionSpec<Long> initialOffsetTimeOpt;
private final OptionSpec<Long> reportIntervalOpt;
@@ -285,12 +284,6 @@
.describedAs("ms")
.ofType(Integer.class)
.defaultsTo(1_000);
topicWhiteListOpt = parser.accepts("topic-white-list", "DEPRECATED use --topics-include instead; " +
"ignored if --topics-include specified. List of topics to verify replica consistency.")
.withRequiredArg()
.describedAs("Java regex (String)")
.ofType(String.class)
.defaultsTo(".*");
topicsIncludeOpt = parser.accepts("topics-include", "List of topics to verify replica consistency.")
.withRequiredArg()
.describedAs("Java regex (String)")
@@ -314,8 +307,6 @@
CommandLineUtils.printVersionAndExit();
}
CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt);
CommandLineUtils.checkInvalidArgs(parser, options, topicsIncludeOpt, topicWhiteListOpt);
}
String brokerHostsAndPorts() {
@@ -331,7 +322,7 @@
}
TopicFilter.IncludeList topicsIncludeFilter() {
String regex = options.valueOf(options.has(topicsIncludeOpt) ? topicsIncludeOpt : topicWhiteListOpt);
String regex = options.valueOf(topicsIncludeOpt);
try {
Pattern.compile(regex);
} catch (PatternSyntaxException e) {


@@ -327,7 +327,7 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions {
private MessageFormatter buildFormatter() {
MessageFormatter formatter = null;
try {
Class<?> messageFormatterClass = Class.forName(convertDeprecatedClass(options.valueOf(messageFormatterOpt)));
Class<?> messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt));
formatter = (MessageFormatter) messageFormatterClass.getDeclaredConstructor().newInstance();
Properties formatterArgs = formatterArgs();
@@ -344,25 +344,6 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions {
return formatter;
}
private static String convertDeprecatedClass(String className) {
switch (className) {
case "kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter":
System.err.println("WARNING: kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter is deprecated and will be removed in the next major release. " +
"Please use org.apache.kafka.tools.consumer.TransactionLogMessageFormatter instead");
return className;
case "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter":
System.err.println("WARNING: kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter is deprecated and will be removed in the next major release. " +
"Please use org.apache.kafka.tools.consumer.OffsetsMessageFormatter instead");
return className;
case "kafka.coordinator.group.GroupMetadataManager$GroupMetadataMessageFormatter":
System.err.println("WARNING: kafka.coordinator.group.GroupMetadataManager$GroupMetadataMessageFormatter is deprecated and will be removed in the next major release. " +
"Please use org.apache.kafka.tools.consumer.GroupMetadataMessageFormatter instead");
return className;
default:
return className;
}
}
Properties consumerProps() {
return consumerProps;
}
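With convertDeprecatedClass gone, buildFormatter above loads whatever class name arrives on --formatter verbatim. A minimal sketch of exercising that path with one of the replacement formatters from the upgrade notes; the argument list is an assumption modelled on the test helper at the end of this commit, and ConsoleConsumerOptions is assumed to live in the org.apache.kafka.tools.consumer package:

import org.apache.kafka.common.MessageFormatter;
import org.apache.kafka.tools.consumer.ConsoleConsumerOptions;

public class FormatterResolutionSketch {
    public static void main(String[] args) throws Exception {
        String[] consumerArgs = {
            "--bootstrap-server", "localhost:9092",
            "--topic", "__consumer_offsets",
            "--formatter", "org.apache.kafka.tools.consumer.OffsetsMessageFormatter"
        };
        // The options parser resolves the formatter class reflectively via its no-arg constructor;
        // passing the removed kafka.coordinator.* names now fails instead of printing a deprecation warning.
        MessageFormatter formatter = new ConsoleConsumerOptions(consumerArgs).formatter();
        System.out.println(formatter.getClass().getName());
    }
}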


@@ -611,48 +611,6 @@ public class ConsoleConsumerOptionsTest {
assertInstanceOf(NoOpMessageFormatter.class, new ConsoleConsumerOptions(noOpMessageFormatter).formatter());
}
@SuppressWarnings("deprecation")
@Test
public void testNewAndDeprecateTransactionLogMessageFormatter() throws Exception {
String[] deprecatedTransactionLogMessageFormatter =
generateArgsForFormatter("kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter");
assertInstanceOf(kafka.coordinator.transaction.TransactionLog.TransactionLogMessageFormatter.class,
new ConsoleConsumerOptions(deprecatedTransactionLogMessageFormatter).formatter());
String[] transactionLogMessageFormatter =
generateArgsForFormatter("org.apache.kafka.tools.consumer.TransactionLogMessageFormatter");
assertInstanceOf(TransactionLogMessageFormatter.class,
new ConsoleConsumerOptions(transactionLogMessageFormatter).formatter());
}
@SuppressWarnings("deprecation")
@Test
public void testNewAndDeprecateOffsetsMessageFormatter() throws Exception {
String[] deprecatedOffsetsMessageFormatter =
generateArgsForFormatter("kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter");
assertInstanceOf(kafka.coordinator.group.GroupMetadataManager.OffsetsMessageFormatter.class,
new ConsoleConsumerOptions(deprecatedOffsetsMessageFormatter).formatter());
String[] offsetsMessageFormatter =
generateArgsForFormatter("org.apache.kafka.tools.consumer.OffsetsMessageFormatter");
assertInstanceOf(OffsetsMessageFormatter.class,
new ConsoleConsumerOptions(offsetsMessageFormatter).formatter());
}
@SuppressWarnings("deprecation")
@Test
public void testNewAndDeprecateGroupMetadataMessageFormatter() throws Exception {
String[] deprecatedGroupMetadataMessageFormatter =
generateArgsForFormatter("kafka.coordinator.group.GroupMetadataManager$GroupMetadataMessageFormatter");
assertInstanceOf(kafka.coordinator.group.GroupMetadataManager.GroupMetadataMessageFormatter.class,
new ConsoleConsumerOptions(deprecatedGroupMetadataMessageFormatter).formatter());
String[] groupMetadataMessageFormatter =
generateArgsForFormatter("org.apache.kafka.tools.consumer.GroupMetadataMessageFormatter");
assertInstanceOf(GroupMetadataMessageFormatter.class,
new ConsoleConsumerOptions(groupMetadataMessageFormatter).formatter());
}
private String[] generateArgsForFormatter(String formatter) {
return new String[]{
"--bootstrap-server", "localhost:9092",