KAFKA-4351; MirrorMaker with new consumer should support comma-separated regex

This makes it consistent with MirrorMaker with the old consumer.

Author: huxi <huxi@zhenrongbao.com>
Author: amethystic <huxi_2b@hotmail.com>

Reviewers: Vahid Hashemian <vahidhashemian@us.ibm.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2072 from amethystic/kafka-4351_Regex_behavior_change_for_new_consumer
This commit is contained in:
huxi 2016-12-31 10:45:47 +00:00 committed by Ismael Juma
parent 3d7e88456f
commit 29d456cd21
3 changed files with 92 additions and 20 deletions

View File

@ -20,7 +20,7 @@ package kafka.tools
import java.util import java.util
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.regex.{Pattern, PatternSyntaxException} import java.util.regex.Pattern
import java.util.{Collections, Properties} import java.util.{Collections, Properties}
import com.yammer.metrics.core.Gauge import com.yammer.metrics.core.Gauge
@ -64,7 +64,7 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig => NewConsumerConfig}
*/ */
object MirrorMaker extends Logging with KafkaMetricsGroup { object MirrorMaker extends Logging with KafkaMetricsGroup {
private var producer: MirrorMakerProducer = null private[tools] var producer: MirrorMakerProducer = null
private var mirrorMakerThreads: Seq[MirrorMakerThread] = null private var mirrorMakerThreads: Seq[MirrorMakerThread] = null
private val isShuttingdown: AtomicBoolean = new AtomicBoolean(false) private val isShuttingdown: AtomicBoolean = new AtomicBoolean(false)
// Track the messages not successfully sent by mirror maker. // Track the messages not successfully sent by mirror maker.
@ -574,7 +574,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
} }
} }
private class MirrorMakerNewConsumer(consumer: Consumer[Array[Byte], Array[Byte]], // Only for testing
private[tools] class MirrorMakerNewConsumer(consumer: Consumer[Array[Byte], Array[Byte]],
customRebalanceListener: Option[org.apache.kafka.clients.consumer.ConsumerRebalanceListener], customRebalanceListener: Option[org.apache.kafka.clients.consumer.ConsumerRebalanceListener],
whitelistOpt: Option[String]) whitelistOpt: Option[String])
extends MirrorMakerBaseConsumer { extends MirrorMakerBaseConsumer {
@ -589,12 +590,12 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
override def init() { override def init() {
debug("Initiating new consumer") debug("Initiating new consumer")
val consumerRebalanceListener = new InternalRebalanceListenerForNewConsumer(this, customRebalanceListener) val consumerRebalanceListener = new InternalRebalanceListenerForNewConsumer(this, customRebalanceListener)
if (whitelistOpt.isDefined) { whitelistOpt.foreach { whitelist =>
try { try {
consumer.subscribe(Pattern.compile(whitelistOpt.get), consumerRebalanceListener) consumer.subscribe(Pattern.compile(Whitelist(whitelist).regex), consumerRebalanceListener)
} catch { } catch {
case pse: PatternSyntaxException => case pse: RuntimeException =>
error("Invalid expression syntax: %s".format(whitelistOpt.get)) error(s"Invalid expression syntax: $whitelist")
throw pse throw pse
} }
} }
@ -686,7 +687,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
} }
} }
private class MirrorMakerProducer(val producerProps: Properties) { private[tools] class MirrorMakerProducer(val producerProps: Properties) {
val sync = producerProps.getProperty("producer.type", "async").equals("sync") val sync = producerProps.getProperty("producer.type", "async").equals("sync")

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.tools
import java.util.Properties
import kafka.consumer.ConsumerTimeoutException
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.tools.MirrorMaker.{MirrorMakerNewConsumer, MirrorMakerProducer}
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
import org.junit.Test
class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
override def generateConfigs(): Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect)
.map(KafkaConfig.fromProps(_, new Properties()))
@Test
def testCommaSeparatedRegex(): Unit = {
val topic = "new-topic"
val msg = "a test message"
val brokerList = TestUtils.getBrokerListStrFromServers(servers)
val producerProps = new Properties
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
producerProps.put("producer.type", "sync")
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer])
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer])
val producer = new MirrorMakerProducer(producerProps)
MirrorMaker.producer = producer
MirrorMaker.producer.send(new ProducerRecord(topic, msg.getBytes()))
MirrorMaker.producer.close()
val consumerProps = new Properties
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
val consumer = new KafkaConsumer(consumerProps, new ByteArrayDeserializer, new ByteArrayDeserializer)
val mirrorMakerConsumer = new MirrorMakerNewConsumer(consumer, None, whitelistOpt = Some("another_topic,new.*,foo"))
mirrorMakerConsumer.init()
try {
TestUtils.waitUntilTrue(() => {
try {
val data = mirrorMakerConsumer.receive()
data.topic == topic && new String(data.value) == msg
} catch {
// this exception is thrown if no record is returned within a short timeout, so safe to ignore
case _: ConsumerTimeoutException => false
}
}, "MirrorMaker consumer should read the expected message from the expected topic within the timeout")
} finally consumer.close()
}
}

View File

@ -14,19 +14,9 @@
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
--> -->
<h4><a id="upgrade_10_2_0" href="#upgrade_10_2_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x or 0.10.1.0 to 0.10.2.0</a></h4> <h4><a id="upgrade_10_2_0" href="#upgrade_10_2_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x or 0.10.1.x to 0.10.2.0</a></h4>
Users upgrading from versions prior to 0.10.1.0 should follow the upgrade guide <a href="#upgrade_10_1">here</a>. Users upgrading from 0.10.1.0 Users upgrading from versions prior to 0.10.1.x should follow the upgrade guide <a href="#upgrade_10_1">here</a>. Users upgrading from 0.10.1.0
can upgrade the brokers one at a time: shut down the broker, update the code, and restart it. can upgrade the brokers one at a time: shut down the broker, update the code, and restart it.
<br>
0.10.2.0 has <a href="#upgrade_10_2_0_breaking">Potential breaking changes</a> (Please review before upgrading).
<h5><a id="upgrade_10_2_0_breaking" href="#upgrade_10_2_0_breaking">Potential breaking changes in 0.10.2.0</a></h5>
<ul>
<li>Several methods on the Java consumer may now throw <code>InterruptException</code> if the calling thread is interrupted.
Please refer to the <code>KafkaConsumer</code> Javadoc for a more in-depth explanation of this change.</li>
</ul>
<h4><a id="upgrade_10_2" href="#upgrade_10_2">Upgrading from 0.8.x, 0.9.x, 0.10.0.X, or 0.10.1.X to 0.10.2.0</a></h4>
<p><b>For a rolling upgrade:</b></p> <p><b>For a rolling upgrade:</b></p>
@ -39,6 +29,14 @@ can upgrade the brokers one at a time: shut down the broker, update the code, an
</li> </li>
</ol> </ol>
<h5><a id="upgrade_1020_notable" href="#upgrade_1020_notable">Notable changes in 0.10.2.0</a></h5>
<ul>
<li>Several methods on the Java consumer may now throw <code>InterruptException</code> if the calling thread is interrupted.
Please refer to the <code>KafkaConsumer</code> Javadoc for a more in-depth explanation of this change.</li>
<li>Multiple regular expressions separated by commas can be passed to MirrorMaker with the new Java consumer via the --whitelist option. This
makes the behaviour consistent with MirrorMaker when used the old Scala consumer.</li>
</ul>
<h4><a id="upgrade_10_1" href="#upgrade_10_1">Upgrading from 0.8.x, 0.9.x or 0.10.0.X to 0.10.1.0</a></h4> <h4><a id="upgrade_10_1" href="#upgrade_10_1">Upgrading from 0.8.x, 0.9.x or 0.10.0.X to 0.10.1.0</a></h4>
0.10.1.0 has wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade. 0.10.1.0 has wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade.
However, please notice the <a href="#upgrade_10_1_breaking">Potential breaking changes in 0.10.1.0</a> before upgrade. However, please notice the <a href="#upgrade_10_1_breaking">Potential breaking changes in 0.10.1.0</a> before upgrade.