From 914ffa9dbef3c8ad6851b380276a1cb7c5aa4a0d Mon Sep 17 00:00:00 2001 From: Manikumar Reddy O Date: Mon, 20 Aug 2018 16:16:57 +0530 Subject: [PATCH] KAFKA-7210: Add system test to verify log compaction (#5226) * Updated TestLogCleaning tool to use Java consumer and rename as LogCompactionTester. * Enabled the log cleaner in every system test. * Removed configs from "kafka.properties" with default values and `socket.receive.buffer.bytes` as the override did not seem necessary. * Updated `kafka.py` logic to handle duplicates between `kafka.properties` and `server_prop_overrides`. * Updated Gradle build so that classes from `kafka-clients` test jar can be used in system tests. Reviewers: Colin Patrick McCabe , Ismael Juma --- build.gradle | 3 + .../kafka/tools/LogCompactionTester.scala | 348 ++++++++++++++++++ tests/kafkatest/services/kafka/kafka.py | 38 +- .../services/kafka/templates/kafka.properties | 12 - .../services/log_compaction_tester.py | 88 +++++ .../tests/tools/log_compaction_test.py | 66 ++++ 6 files changed, 533 insertions(+), 22 deletions(-) create mode 100755 core/src/test/scala/kafka/tools/LogCompactionTester.scala create mode 100644 tests/kafkatest/services/log_compaction_tester.py create mode 100644 tests/kafkatest/tests/tools/log_compaction_test.py diff --git a/build.gradle b/build.gradle index 0892ed19402..c1387d4ef60 100644 --- a/build.gradle +++ b/build.gradle @@ -775,6 +775,9 @@ project(':core') { include('*.jar') } into "$buildDir/dependant-testlibs" + //By default gradle does not handle test dependencies between the sub-projects + //This line is to include clients project test jar to dependant-testlibs + from (project(':clients').testJar ) { "$buildDir/dependant-testlibs" } duplicatesStrategy 'exclude' } diff --git a/core/src/test/scala/kafka/tools/LogCompactionTester.scala b/core/src/test/scala/kafka/tools/LogCompactionTester.scala new file mode 100755 index 00000000000..9f53f664b54 --- /dev/null +++ b/core/src/test/scala/kafka/tools/LogCompactionTester.scala @@ -0,0 +1,348 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package kafka.tools
+
+import java.io._
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Path}
+import java.time.Duration
+import java.util.{Properties, Random}
+
+import joptsimple.OptionParser
+import kafka.utils._
+import org.apache.kafka.clients.admin.NewTopic
+import org.apache.kafka.clients.{CommonClientConfigs, admin}
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.serialization.{ByteArraySerializer, StringDeserializer}
+import org.apache.kafka.common.utils.Utils
+
+import scala.collection.JavaConverters._
+
+/**
+ * This is a torture test that runs against an existing broker.
+ *
+ * Here is how it works:
+ *
+ * It produces a series of specially formatted messages to one or more partitions. Each message it produces
+ * is also logged to a text file. The messages have a limited set of keys, so there is duplication in the key space.
+ *
+ * The broker will clean its log as the test runs.
+ *
+ * When the specified number of messages have been produced, we create a consumer, consume all the messages in the topic
+ * and write them out to another text file.
+ *
+ * Using a stable unix sort, we sort both the producer log of what was sent and the consumer log of what was retrieved by message key.
+ * Then we compare the final message in both logs for each key. If this final message is not the same for all keys, we
+ * print an error and exit with exit code 1; otherwise we print the size reduction and exit with exit code 0.
+ */
+object LogCompactionTester {
+
+  // maximum line size when reading the produced/consumed record text files
+  private val ReadAheadLimit = 4906
+
+  def main(args: Array[String]) {
+    val parser = new OptionParser(false)
+    val numMessagesOpt = parser.accepts("messages", "The number of messages to send or consume.")
+      .withRequiredArg
+      .describedAs("count")
+      .ofType(classOf[java.lang.Long])
+      .defaultsTo(Long.MaxValue)
+    val messageCompressionOpt = parser.accepts("compression-type", "Message compression type.")
+      .withOptionalArg
+      .describedAs("compressionType")
+      .ofType(classOf[java.lang.String])
+      .defaultsTo("none")
+    val numDupsOpt = parser.accepts("duplicates", "The number of duplicates for each key.")
+      .withRequiredArg
+      .describedAs("count")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(5)
+    val brokerOpt = parser.accepts("bootstrap-server", "The server(s) to connect to.")
+      .withRequiredArg
+      .describedAs("url")
+      .ofType(classOf[String])
+    val topicsOpt = parser.accepts("topics", "The number of topics to test.")
+      .withRequiredArg
+      .describedAs("count")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1)
+    val percentDeletesOpt = parser.accepts("percent-deletes", "The percentage of updates that are deletes.")
+      .withRequiredArg
+      .describedAs("percent")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(0)
+    val sleepSecsOpt = parser.accepts("sleep", "Time in seconds to sleep between production and consumption.")
+      .withRequiredArg
+      .describedAs("seconds")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(0)
+
+    val options = parser.parse(args: _*)
+
+    if (args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "A tool to test log compaction. Valid options are: ")
+
+    CommandLineUtils.checkRequiredArgs(parser, options, brokerOpt, numMessagesOpt)
+
+    // parse options
+    val messages = options.valueOf(numMessagesOpt).longValue
+    val compressionType = options.valueOf(messageCompressionOpt)
+    val percentDeletes = options.valueOf(percentDeletesOpt).intValue
+    val dups = options.valueOf(numDupsOpt).intValue
+    val brokerUrl = options.valueOf(brokerOpt)
+    val topicCount = options.valueOf(topicsOpt).intValue
+    val sleepSecs = options.valueOf(sleepSecsOpt).intValue
+
+    val testId = new Random().nextLong
+    val topics = (0 until topicCount).map("log-cleaner-test-" + testId + "-" + _).toArray
+    createTopics(brokerUrl, topics.toSeq)
+
+    println(s"Producing $messages messages to topics ${topics.mkString(",")}")
+    val producedDataFilePath = produceMessages(brokerUrl, topics, messages, compressionType, dups, percentDeletes)
+    println(s"Sleeping for $sleepSecs seconds...")
+    Thread.sleep(sleepSecs * 1000)
+    println("Consuming messages...")
+    val consumedDataFilePath = consumeMessages(brokerUrl, topics)
+
+    val producedLines = lineCount(producedDataFilePath)
+    val consumedLines = lineCount(consumedDataFilePath)
+    val reduction = 100 * (1.0 - consumedLines.toDouble / producedLines.toDouble)
+    println(f"$producedLines%d rows of data produced, $consumedLines%d rows of data consumed ($reduction%.1f%% reduction).")
+
+    println("De-duplicating and validating output files...")
+    validateOutput(producedDataFilePath.toFile, consumedDataFilePath.toFile)
+    Utils.delete(producedDataFilePath.toFile)
+    Utils.delete(consumedDataFilePath.toFile)
+    // if you change this line, update VERIFICATION_STRING in tests/kafkatest/services/log_compaction_tester.py accordingly
+    println("Data verification is completed")
+  }
+
+  def createTopics(brokerUrl: String, topics: Seq[String]): Unit = {
+    val adminConfig = new Properties
+    adminConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
+    val adminClient = admin.AdminClient.create(adminConfig)
+
+    try {
+      val topicConfigs = Map(TopicConfig.CLEANUP_POLICY_CONFIG -> TopicConfig.CLEANUP_POLICY_COMPACT)
+      val newTopics = topics.map(name => new NewTopic(name, 1, 1).configs(topicConfigs.asJava)).asJava
+      adminClient.createTopics(newTopics).all.get
+
+      var pendingTopics: Seq[String] = Seq()
+      TestUtils.waitUntilTrue(() => {
+        val allTopics = adminClient.listTopics.names.get.asScala.toSeq
+        pendingTopics = topics.filter(topicName => !allTopics.contains(topicName))
+        pendingTopics.isEmpty
+      }, s"timed out waiting for topics: $pendingTopics")
+
+    } finally adminClient.close()
+  }
+
+  def lineCount(filePath: Path): Int = Files.readAllLines(filePath).size
+
+  def validateOutput(producedDataFile: File, consumedDataFile: File) {
+    val producedReader = externalSort(producedDataFile)
+    val consumedReader = externalSort(consumedDataFile)
+    val produced = valuesIterator(producedReader)
+    val consumed = valuesIterator(consumedReader)
+
+    val producedDedupedFile = new File(producedDataFile.getAbsolutePath + ".deduped")
+    val producedDeduped : BufferedWriter = Files.newBufferedWriter(producedDedupedFile.toPath, UTF_8)
+
+    val consumedDedupedFile = new File(consumedDataFile.getAbsolutePath + ".deduped")
+    val consumedDeduped : BufferedWriter = Files.newBufferedWriter(consumedDedupedFile.toPath, UTF_8)
+    var total = 0
+    var mismatched = 0
+    while (produced.hasNext && consumed.hasNext) {
+      val p = produced.next()
+      producedDeduped.write(p.toString)
+      producedDeduped.newLine()
+      val c = consumed.next()
+      consumedDeduped.write(c.toString)
+      consumedDeduped.newLine()
+      
if (p != c) + mismatched += 1 + total += 1 + } + producedDeduped.close() + consumedDeduped.close() + println(s"Validated $total values, $mismatched mismatches.") + require(!produced.hasNext, "Additional values produced not found in consumer log.") + require(!consumed.hasNext, "Additional values consumed not found in producer log.") + require(mismatched == 0, "Non-zero number of row mismatches.") + // if all the checks worked out we can delete the deduped files + Utils.delete(producedDedupedFile) + Utils.delete(consumedDedupedFile) + } + + def require(requirement: Boolean, message: => Any) { + if (!requirement) { + System.err.println(s"Data validation failed : $message") + Exit.exit(1) + } + } + + def valuesIterator(reader: BufferedReader) = { + new IteratorTemplate[TestRecord] { + def makeNext(): TestRecord = { + var next = readNext(reader) + while (next != null && next.delete) + next = readNext(reader) + if (next == null) + allDone() + else + next + } + } + } + + def readNext(reader: BufferedReader): TestRecord = { + var line = reader.readLine() + if (line == null) + return null + var curr = TestRecord.parse(line) + while (true) { + line = peekLine(reader) + if (line == null) + return curr + val next = TestRecord.parse(line) + if (next == null || next.topicAndKey != curr.topicAndKey) + return curr + curr = next + reader.readLine() + } + null + } + + def peekLine(reader: BufferedReader) = { + reader.mark(ReadAheadLimit) + val line = reader.readLine + reader.reset() + line + } + + def externalSort(file: File): BufferedReader = { + val builder = new ProcessBuilder("sort", "--key=1,2", "--stable", "--buffer-size=20%", "--temporary-directory=" + Files.createTempDirectory("log_compaction_test"), file.getAbsolutePath) + val process = builder.start + new Thread() { + override def run() { + val exitCode = process.waitFor() + if (exitCode != 0) { + System.err.println("Process exited abnormally.") + while (process.getErrorStream.available > 0) { + System.err.write(process.getErrorStream().read()) + } + } + } + }.start() + new BufferedReader(new InputStreamReader(process.getInputStream(), UTF_8), 10 * 1024 * 1024) + } + + def produceMessages(brokerUrl: String, + topics: Array[String], + messages: Long, + compressionType: String, + dups: Int, + percentDeletes: Int): Path = { + val producerProps = new Properties + producerProps.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MaxValue.toString) + producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl) + producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType) + val producer = new KafkaProducer(producerProps, new ByteArraySerializer, new ByteArraySerializer) + try { + val rand = new Random(1) + val keyCount = (messages / dups).toInt + val producedFilePath = Files.createTempFile("kafka-log-cleaner-produced-", ".txt") + println(s"Logging produce requests to $producedFilePath") + val producedWriter: BufferedWriter = Files.newBufferedWriter(producedFilePath, UTF_8) + for (i <- 0L until (messages * topics.length)) { + val topic = topics((i % topics.length).toInt) + val key = rand.nextInt(keyCount) + val delete = (i % 100) < percentDeletes + val msg = + if (delete) + new ProducerRecord[Array[Byte], Array[Byte]](topic, key.toString.getBytes(UTF_8), null) + else + new ProducerRecord(topic, key.toString.getBytes(UTF_8), i.toString.getBytes(UTF_8)) + producer.send(msg) + producedWriter.write(TestRecord(topic, key, i, delete).toString) + producedWriter.newLine() + } + producedWriter.close() + producedFilePath + } 
finally { + producer.close() + } + } + + def createConsumer(brokerUrl: String): KafkaConsumer[String, String] = { + val consumerProps = new Properties + consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "log-cleaner-test-" + new Random().nextInt(Int.MaxValue)) + consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl) + consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + new KafkaConsumer(consumerProps, new StringDeserializer, new StringDeserializer) + } + + def consumeMessages(brokerUrl: String, topics: Array[String]): Path = { + val consumer = createConsumer(brokerUrl) + consumer.subscribe(topics.seq.asJava) + val consumedFilePath = Files.createTempFile("kafka-log-cleaner-consumed-", ".txt") + println(s"Logging consumed messages to $consumedFilePath") + val consumedWriter: BufferedWriter = Files.newBufferedWriter(consumedFilePath, UTF_8) + + try { + var done = false + while (!done) { + val consumerRecords = consumer.poll(Duration.ofSeconds(20)) + if (!consumerRecords.isEmpty) { + for (record <- consumerRecords.asScala) { + val delete = record.value == null + val value = if (delete) -1L else record.value.toLong + consumedWriter.write(TestRecord(record.topic, record.key.toInt, value, delete).toString) + consumedWriter.newLine + } + } else { + done = true + } + } + consumedFilePath + } finally { + consumedWriter.close() + consumer.close() + } + } + + def readString(buffer: ByteBuffer): String = { + Utils.utf8(buffer) + } + +} + +case class TestRecord(topic: String, key: Int, value: Long, delete: Boolean) { + override def toString = topic + "\t" + key + "\t" + value + "\t" + (if (delete) "d" else "u") + def topicAndKey = topic + key +} + +object TestRecord { + def parse(line: String): TestRecord = { + val components = line.split("\t") + new TestRecord(components(0), components(1).toInt, components(2).toLong, components(3) == "d") + } +} \ No newline at end of file diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index 8eee575f4a6..b0a9faacf2e 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -211,21 +211,39 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): self.advertised_listeners = ','.join(advertised_listeners) def prop_file(self, node): - cfg = KafkaConfig(**node.config) - cfg[config_property.ADVERTISED_HOSTNAME] = node.account.hostname - cfg[config_property.ZOOKEEPER_CONNECT] = self.zk_connect_setting() - - for prop in self.server_prop_overides: - cfg[prop[0]] = prop[1] - self.set_protocol_and_port(node) - # TODO - clean up duplicate configuration logic - prop_file = cfg.render() - prop_file += self.render('kafka.properties', node=node, broker_id=self.idx(node), + #load template configs as dictionary + config_template = self.render('kafka.properties', node=node, broker_id=self.idx(node), security_config=self.security_config, num_nodes=self.num_nodes) + + configs = dict( l.rstrip().split('=') for l in config_template.split('\n') + if not l.startswith("#") and "=" in l ) + + #load specific test override configs + override_configs = KafkaConfig(**node.config) + override_configs[config_property.ADVERTISED_HOSTNAME] = node.account.hostname + override_configs[config_property.ZOOKEEPER_CONNECT] = self.zk_connect_setting() + + for prop in self.server_prop_overides: + override_configs[prop[0]] = prop[1] + + #update template configs with test override configs + configs.update(override_configs) + + prop_file = 
self.render_configs(configs) return prop_file + def render_configs(self, configs): + """Render self as a series of lines key=val\n, and do so in a consistent order. """ + keys = [k for k in configs.keys()] + keys.sort() + + s = "" + for k in keys: + s += "%s=%s\n" % (k, str(configs[k])) + return s + def start_cmd(self, node): cmd = "export JMX_PORT=%d; " % self.jmx_port cmd += "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties index 8cca14fa66d..dd777f9be23 100644 --- a/tests/kafkatest/services/kafka/templates/kafka.properties +++ b/tests/kafkatest/services/kafka/templates/kafka.properties @@ -20,18 +20,6 @@ advertised.host.name={{ node.account.hostname }} listeners={{ listeners }} advertised.listeners={{ advertised_listeners }} -num.network.threads=3 -num.io.threads=8 -socket.send.buffer.bytes=102400 -socket.receive.buffer.bytes=65536 -socket.request.max.bytes=104857600 - -num.partitions=1 -num.recovery.threads.per.data.dir=1 -log.retention.hours=168 -log.segment.bytes=1073741824 -log.cleaner.enable=false - security.inter.broker.protocol={{ security_config.interbroker_security_protocol }} ssl.keystore.location=/mnt/security/test.keystore.jks diff --git a/tests/kafkatest/services/log_compaction_tester.py b/tests/kafkatest/services/log_compaction_tester.py new file mode 100644 index 00000000000..4a19650ff2e --- /dev/null +++ b/tests/kafkatest/services/log_compaction_tester.py @@ -0,0 +1,88 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import os + +from ducktape.services.background_thread import BackgroundThreadService + +from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin, CORE_LIBS_JAR_NAME, CORE_DEPENDANT_TEST_LIBS_JAR_NAME +from kafkatest.services.security.security_config import SecurityConfig +from kafkatest.version import DEV_BRANCH + +class LogCompactionTester(KafkaPathResolverMixin, BackgroundThreadService): + + OUTPUT_DIR = "/mnt/logcompaction_tester" + LOG_PATH = os.path.join(OUTPUT_DIR, "logcompaction_tester_stdout.log") + VERIFICATION_STRING = "Data verification is completed" + + logs = { + "tool_logs": { + "path": LOG_PATH, + "collect_default": True} + } + + def __init__(self, context, kafka, security_protocol="PLAINTEXT", stop_timeout_sec=30): + super(LogCompactionTester, self).__init__(context, 1) + + self.kafka = kafka + self.security_protocol = security_protocol + self.security_config = SecurityConfig(self.context, security_protocol) + self.stop_timeout_sec = stop_timeout_sec + self.log_compaction_completed = False + + def _worker(self, idx, node): + node.account.ssh("mkdir -p %s" % LogCompactionTester.OUTPUT_DIR) + cmd = self.start_cmd(node) + self.logger.info("LogCompactionTester %d command: %s" % (idx, cmd)) + self.security_config.setup_node(node) + for line in node.account.ssh_capture(cmd): + self.logger.debug("Checking line:{}".format(line)) + + if line.startswith(LogCompactionTester.VERIFICATION_STRING): + self.log_compaction_completed = True + + def start_cmd(self, node): + core_libs_jar = self.path.jar(CORE_LIBS_JAR_NAME, DEV_BRANCH) + core_dependant_test_libs_jar = self.path.jar(CORE_DEPENDANT_TEST_LIBS_JAR_NAME, DEV_BRANCH) + + cmd = "for file in %s; do CLASSPATH=$CLASSPATH:$file; done;" % core_libs_jar + cmd += " for file in %s; do CLASSPATH=$CLASSPATH:$file; done;" % core_dependant_test_libs_jar + cmd += " export CLASSPATH;" + cmd += self.path.script("kafka-run-class.sh", node) + cmd += " %s" % self.java_class_name() + cmd += " --bootstrap-server %s --messages 1000000 --sleep 20 --duplicates 10 --percent-deletes 10" % (self.kafka.bootstrap_servers(self.security_protocol)) + + cmd += " 2>> %s | tee -a %s &" % (self.logs["tool_logs"]["path"], self.logs["tool_logs"]["path"]) + return cmd + + def stop_node(self, node): + node.account.kill_java_processes(self.java_class_name(), clean_shutdown=True, + allow_fail=True) + + stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec) + assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \ + (str(node.account), str(self.stop_timeout_sec)) + + def clean_node(self, node): + node.account.kill_java_processes(self.java_class_name(), clean_shutdown=False, + allow_fail=True) + node.account.ssh("rm -rf %s" % LogCompactionTester.OUTPUT_DIR, allow_fail=False) + + def java_class_name(self): + return "kafka.tools.LogCompactionTester" + + @property + def is_done(self): + return self.log_compaction_completed diff --git a/tests/kafkatest/tests/tools/log_compaction_test.py b/tests/kafkatest/tests/tools/log_compaction_test.py new file mode 100644 index 00000000000..338060f7217 --- /dev/null +++ b/tests/kafkatest/tests/tools/log_compaction_test.py @@ -0,0 +1,66 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from ducktape.utils.util import wait_until +from ducktape.tests.test import Test +from ducktape.mark.resource import cluster + +from kafkatest.services.kafka import config_property +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.services.kafka import KafkaService +from kafkatest.services.log_compaction_tester import LogCompactionTester + +class LogCompactionTest(Test): + + # Configure smaller segment size to create more segments for compaction + LOG_SEGMENT_BYTES = "1024000" + + def __init__(self, test_context): + super(LogCompactionTest, self).__init__(test_context) + self.num_zk = 1 + self.num_brokers = 1 + + self.zk = ZookeeperService(test_context, self.num_zk) + self.kafka = None + self.compaction_verifier = None + + def setUp(self): + self.zk.start() + + def start_kafka(self, security_protocol, interbroker_security_protocol): + self.kafka = KafkaService( + self.test_context, + num_nodes = self.num_brokers, + zk = self.zk, + security_protocol=security_protocol, + interbroker_security_protocol=interbroker_security_protocol, + server_prop_overides=[ + [config_property.LOG_SEGMENT_BYTES, LogCompactionTest.LOG_SEGMENT_BYTES], + ]) + self.kafka.start() + + def start_test_log_compaction_tool(self, security_protocol): + self.compaction_verifier = LogCompactionTester(self.test_context, self.kafka, security_protocol=security_protocol) + self.compaction_verifier.start() + + @cluster(num_nodes=4) + def test_log_compaction(self, security_protocol='PLAINTEXT'): + + self.start_kafka(security_protocol, security_protocol) + self.start_test_log_compaction_tool(security_protocol) + + # Verify that compacted data verification completed in LogCompactionTester + wait_until(lambda: self.compaction_verifier.is_done, timeout_sec=180, err_msg="Timed out waiting to complete compaction")
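For reference, the standalone invocation of the new tool mirrors the command assembled in log_compaction_tester.py's start_cmd. A minimal sketch, assuming a broker reachable at localhost:9092 (hypothetical address) and the core test classes from dependant-testlibs already on the CLASSPATH:

    bin/kafka-run-class.sh kafka.tools.LogCompactionTester \
        --bootstrap-server localhost:9092 \
        --messages 1000000 --duplicates 10 --percent-deletes 10 --sleep 20

On success the tool prints "Data verification is completed"; the LogCompactionTester service watches its output for exactly this string (VERIFICATION_STRING) to mark the run as done.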
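The end-to-end check can likewise be run on its own through ducktape. A sketch, assuming an already provisioned system-test environment (for example the Docker-based setup under tests/docker):

    ducktape tests/kafkatest/tests/tools/log_compaction_test.py

The test then waits up to 180 seconds for LogCompactionTester.is_done, i.e. for the verification string to appear in the tool's output.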