diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 19a2570adfa..42456f76a2e 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -20,7 +20,7 @@ package kafka.tools
 import java.util
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.concurrent.{CountDownLatch, TimeUnit}
-import java.util.regex.{Pattern, PatternSyntaxException}
+import java.util.regex.Pattern
 import java.util.{Collections, Properties}
 
 import com.yammer.metrics.core.Gauge
@@ -64,7 +64,7 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig => NewConsumerConfig}
  */
 object MirrorMaker extends Logging with KafkaMetricsGroup {
 
-  private var producer: MirrorMakerProducer = null
+  private[tools] var producer: MirrorMakerProducer = null
   private var mirrorMakerThreads: Seq[MirrorMakerThread] = null
   private val isShuttingdown: AtomicBoolean = new AtomicBoolean(false)
   // Track the messages not successfully sent by mirror maker.
@@ -574,7 +574,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     }
   }
 
-  private class MirrorMakerNewConsumer(consumer: Consumer[Array[Byte], Array[Byte]],
+  // Only for testing
+  private[tools] class MirrorMakerNewConsumer(consumer: Consumer[Array[Byte], Array[Byte]],
                                        customRebalanceListener: Option[org.apache.kafka.clients.consumer.ConsumerRebalanceListener],
                                        whitelistOpt: Option[String])
     extends MirrorMakerBaseConsumer {
@@ -589,12 +590,12 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     override def init() {
       debug("Initiating new consumer")
       val consumerRebalanceListener = new InternalRebalanceListenerForNewConsumer(this, customRebalanceListener)
-      if (whitelistOpt.isDefined) {
+      whitelistOpt.foreach { whitelist =>
         try {
-          consumer.subscribe(Pattern.compile(whitelistOpt.get), consumerRebalanceListener)
+          consumer.subscribe(Pattern.compile(Whitelist(whitelist).regex), consumerRebalanceListener)
         } catch {
-          case pse: PatternSyntaxException =>
-            error("Invalid expression syntax: %s".format(whitelistOpt.get))
+          case pse: RuntimeException =>
+            error(s"Invalid expression syntax: $whitelist")
             throw pse
         }
       }
@@ -686,7 +687,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     }
   }
 
-  private class MirrorMakerProducer(val producerProps: Properties) {
+  private[tools] class MirrorMakerProducer(val producerProps: Properties) {
 
     val sync = producerProps.getProperty("producer.type", "async").equals("sync")
 
diff --git a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
new file mode 100644
index 00000000000..465e8de0e8b
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.tools
+
+import java.util.Properties
+
+import kafka.consumer.ConsumerTimeoutException
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.tools.MirrorMaker.{MirrorMakerNewConsumer, MirrorMakerProducer}
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
+import org.junit.Test
+
+class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
+
+  override def generateConfigs(): Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect)
+    .map(KafkaConfig.fromProps(_, new Properties()))
+
+  @Test
+  def testCommaSeparatedRegex(): Unit = {
+    val topic = "new-topic"
+    val msg = "a test message"
+    val brokerList = TestUtils.getBrokerListStrFromServers(servers)
+
+    val producerProps = new Properties
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    producerProps.put("producer.type", "sync")
+    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer])
+    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer])
+    val producer = new MirrorMakerProducer(producerProps)
+    MirrorMaker.producer = producer
+    MirrorMaker.producer.send(new ProducerRecord(topic, msg.getBytes()))
+    MirrorMaker.producer.close()
+
+    val consumerProps = new Properties
+    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
+    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    val consumer = new KafkaConsumer(consumerProps, new ByteArrayDeserializer, new ByteArrayDeserializer)
+
+    val mirrorMakerConsumer = new MirrorMakerNewConsumer(consumer, None, whitelistOpt = Some("another_topic,new.*,foo"))
+    mirrorMakerConsumer.init()
+    try {
+      TestUtils.waitUntilTrue(() => {
+        try {
+          val data = mirrorMakerConsumer.receive()
+          data.topic == topic && new String(data.value) == msg
+        } catch {
+          // this exception is thrown if no record is returned within a short timeout, so safe to ignore
+          case _: ConsumerTimeoutException => false
+        }
+      }, "MirrorMaker consumer should read the expected message from the expected topic within the timeout")
+    } finally consumer.close()
+  }
+
+}
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 06b53da482e..3e2c52e2761 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -14,19 +14,9 @@ See the License for the specific language governing permissions and
  limitations under the License.
 -->
-InterruptException if the calling thread is interrupted.
-    Please refer to the KafkaConsumer Javadoc for a more in-depth explanation of this change.
 For a rolling upgrade:
@@ -39,6 +29,14 @@ can upgrade the brokers one at a time: shut down the broker, update the code, an
+InterruptException if the calling thread is interrupted.
+    Please refer to the KafkaConsumer Javadoc for a more in-depth explanation of this change.
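
The change above hinges on kafka.consumer.Whitelist turning the comma-separated --whitelist value into a single regular expression before MirrorMakerNewConsumer.init() compiles it. Below is a minimal sketch of that behaviour, assuming Whitelist joins the comma-separated entries into one alternation pattern and reports malformed input as a RuntimeException (which is why init() now catches RuntimeException rather than PatternSyntaxException); the object name is just for the example.

import java.util.regex.Pattern

import kafka.consumer.Whitelist

// Hypothetical demo object, not part of the Kafka codebase.
object WhitelistRegexSketch {
  def main(args: Array[String]): Unit = {
    // Same whitelist value as in testCommaSeparatedRegex above.
    val whitelist = "another_topic,new.*,foo"

    // Whitelist(...).regex is what the patched init() hands to Pattern.compile;
    // the commas are expected to act as alternation in the resulting pattern.
    val pattern = Pattern.compile(Whitelist(whitelist).regex)

    println(pattern.matcher("new-topic").matches())       // expected: true, via "new.*"
    println(pattern.matcher("unrelated-topic").matches()) // expected: false

    // A malformed expression should surface as a RuntimeException, mirroring the
    // catch clause added to init().
    try Pattern.compile(Whitelist("[unclosed").regex)
    catch { case e: RuntimeException => println(s"rejected: ${e.getMessage}") }
  }
}

On the command line the same whitelist would be passed with something like kafka-mirror-maker.sh --new.consumer --whitelist 'another_topic,new.*,foo', alongside the usual --consumer.config and --producer.config options.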