mirror of https://github.com/apache/kafka.git
MINOR: Enable deep-iteration to print data in DumpLogSegments (#4396)
Enable the deep-iteration option when print-data-log is enabled in DumpLogSegments; otherwise, data is not printed.
Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>
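With this change, passing --print-data-log alone is enough to dump record data; --deep-iteration no longer has to be supplied explicitly. A minimal usage sketch, calling the tool's main entry point the same way the new test below does (the segment path is a placeholder, not a path from this commit):

// Minimal sketch: dump a segment's record data via DumpLogSegments.main,
// as the new DumpLogSegmentsTest does. The .log path below is a placeholder.
object PrintDataLogSketch {
  def main(cliArgs: Array[String]): Unit = {
    // --deep-iteration is now implied by --print-data-log, so it can be omitted.
    kafka.tools.DumpLogSegments.main(Array(
      "--print-data-log",
      "--files", "/tmp/kafka-logs/test-topic-0/00000000000000000000.log"))
  }
}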
parent 13caded15e
commit b20639db44
@@ -52,7 +52,7 @@ object DumpLogSegments {
           .describedAs("size")
           .ofType(classOf[java.lang.Integer])
           .defaultsTo(5 * 1024 * 1024)
-    val deepIterationOpt = parser.accepts("deep-iteration", "if set, uses deep instead of shallow iteration.")
+    val deepIterationOpt = parser.accepts("deep-iteration", "if set, uses deep instead of shallow iteration. Automatically set if print-data-log is enabled.")
     val valueDecoderOpt = parser.accepts("value-decoder-class", "if set, used to deserialize the messages. This class should implement kafka.serializer.Decoder trait. Custom jar should be available in kafka/libs directory.")
           .withOptionalArg()
           .ofType(classOf[java.lang.String])
@@ -85,7 +85,7 @@ object DumpLogSegments {
     val files = options.valueOf(filesOpt).split(",")
     val maxMessageSize = options.valueOf(maxMessageSizeOpt).intValue()
-    val isDeepIteration = options.has(deepIterationOpt)
+    val isDeepIteration = options.has(deepIterationOpt) || printDataLog
 
     val messageParser = if (options.has(offsetsOpt)) {
       new OffsetsMessageParser
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.io.ByteArrayOutputStream
+
+import kafka.log.{Log, LogConfig, LogManager}
+import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
+import kafka.utils.{MockTime, TestUtils}
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
+import org.apache.kafka.common.utils.Utils
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+
+class DumpLogSegmentsTest {
+
+  val tmpDir = TestUtils.tempDir()
+  val logDir = TestUtils.randomPartitionLogDir(tmpDir)
+  val logFile = s"$logDir/00000000000000000000.log"
+  val time = new MockTime(0, 0)
+
+  @Before
+  def setUp(): Unit = {
+    val log = Log(logDir, LogConfig(), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+      time = time, brokerTopicStats = new BrokerTopicStats, maxProducerIdExpirationMs = 60 * 60 * 1000,
+      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+      logDirFailureChannel = new LogDirFailureChannel(10))
+
+    /* append two messages */
+    log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, 0,
+      new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)), leaderEpoch = 0)
+    log.flush()
+  }
+
+  @After
+  def tearDown(): Unit = {
+    Utils.delete(tmpDir)
+  }
+
+  @Test
+  def testPrintDataLog(): Unit = {
+
+    def verifyRecordsInOutput(args: Array[String]): Unit = {
+      val output = runDumpLogSegments(args)
+      val lines = output.split("\n")
+      assertTrue(s"Data not printed: $output", lines.length > 2)
+      // Verify that the last two lines are message records
+      (0 until 2).foreach { i =>
+        val line = lines(lines.length - 2 + i)
+        assertTrue(s"Not a valid message record: $line", line.startsWith(s"offset: $i position:"))
+      }
+    }
+
+    def verifyNoRecordsInOutput(args: Array[String]): Unit = {
+      val output = runDumpLogSegments(args)
+      assertFalse(s"Data should not have been printed: $output", output.matches("(?s).*offset: [0-9]* position.*"))
+    }
+
+    // Verify that records are printed with --print-data-log even if --deep-iteration is not specified
+    verifyRecordsInOutput(Array("--print-data-log", "--files", logFile))
+    // Verify that records are printed with --print-data-log if --deep-iteration is also specified
+    verifyRecordsInOutput(Array("--print-data-log", "--deep-iteration", "--files", logFile))
+    // Verify that records are printed with --value-decoder even if --print-data-log is not specified
+    verifyRecordsInOutput(Array("--value-decoder-class", "kafka.serializer.StringDecoder", "--files", logFile))
+    // Verify that records are printed with --key-decoder even if --print-data-log is not specified
+    verifyRecordsInOutput(Array("--key-decoder-class", "kafka.serializer.StringDecoder", "--files", logFile))
+    // Verify that records are printed with --deep-iteration even if --print-data-log is not specified
+    verifyRecordsInOutput(Array("--deep-iteration", "--files", logFile))
+
+    // Verify that records are not printed by default
+    verifyNoRecordsInOutput(Array("--files", logFile))
+  }
+
+  private def runDumpLogSegments(args: Array[String]): String = {
+    val outContent = new ByteArrayOutputStream
+    Console.withOut(outContent) {
+      DumpLogSegments.main(args)
+    }
+    outContent.toString
+  }
+}
@@ -75,6 +75,8 @@
         fine-grained timeouts (instead of hard coded retries as in older version).</li>
     <li>Kafka Streams rebalance time was reduced further making Kafka Streams more responsive.</li>
     <li>Kafka Connect now supports message headers in both sink and source connectors, and to manipulate them via simple message transforms. Connectors must be changed to explicitly use them. A new <code>HeaderConverter</code> is introduced to control how headers are (de)serialized, and the new "SimpleHeaderConverter" is used by default to use string representations of values.</li>
+    <li>kafka.tools.DumpLogSegments now automatically sets deep-iteration option if print-data-log is enabled
+        explicitly or implicitly due to any of the other options like decoder.</li>
 </ul>
 
 <h5><a id="upgrade_110_new_protocols" href="#upgrade_110_new_protocols">New Protocol Versions</a></h5>
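As the upgrade note above says, print-data-log can be enabled implicitly by other options such as a decoder class, and it in turn now implies deep iteration. A rough, illustrative joptsimple sketch of that kind of option wiring; the option names mirror the tool's flags, but the exact derivation of printDataLog is an assumption here, not the DumpLogSegments source:

import joptsimple.OptionParser

// Illustrative sketch only: one flag (print-data-log) is treated as enabled
// either explicitly or implicitly via another option (a value decoder here),
// and deep iteration then follows from it, matching the diff above.
object OptionWiringSketch {
  def main(args: Array[String]): Unit = {
    val parser = new OptionParser
    val printOpt = parser.accepts("print-data-log", "if set, print the content of the messages.")
    val deepIterationOpt = parser.accepts("deep-iteration",
      "if set, uses deep instead of shallow iteration. Automatically set if print-data-log is enabled.")
    val valueDecoderOpt = parser.accepts("value-decoder-class", "if set, used to deserialize the messages.")
      .withOptionalArg()
      .ofType(classOf[java.lang.String])

    val options = parser.parse(args: _*)
    // Assumed wiring: a decoder option implicitly enables print-data-log ...
    val printDataLog = options.has(printOpt) || options.has(valueDecoderOpt)
    // ... and print-data-log in turn enables deep iteration, as in the change above.
    val isDeepIteration = options.has(deepIterationOpt) || printDataLog
    println(s"printDataLog=$printDataLog, isDeepIteration=$isDeepIteration")
  }
}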