KAFKA-6016; Make the reassign partitions system test use the idempotent producer

With these changes, we ensure that the partitions being reassigned start from non-zero log start offsets, and that every message in the log has a producerId and sequence number.

This means that the test successfully reproduces https://issues.apache.org/jira/browse/KAFKA-6003.

Author: Apurva Mehta <apurva@confluent.io>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #4029 from apurvam/KAFKA-6016-add-idempotent-producer-to-reassign-partitions
Commit 90b5ce3f04 (parent 64930cd713), authored by Apurva Mehta on 2017-10-10 10:31:52 -07:00, committed by Jason Gustafson.
6 changed files with 108 additions and 31 deletions
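For background on the producerId/sequence-number guarantee the test now relies on: it comes from enabling idempotence on the producer. A minimal sketch of such a client, assuming a placeholder broker address and topic (the class name is illustrative, not part of this patch):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class IdempotentProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            // With idempotence enabled, the broker assigns the producer a producerId and
            // every batch carries sequence numbers, so retries cannot introduce duplicates.
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
            try (KafkaProducer<String, String> producer =
                     new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
                producer.send(new ProducerRecord<>("test_topic", "key", "value"));
            }
        }
    }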


@@ -58,7 +58,8 @@ public class ProducerRecord<K, V> {
      *
      * @param topic The topic the record will be appended to
      * @param partition The partition to which the record should be sent
-     * @param timestamp The timestamp of the record
+     * @param timestamp The timestamp of the record, in milliseconds since epoch. If null, the producer will assign
+     *                  the timestamp using System.currentTimeMillis().
      * @param key The key that will be included in the record
      * @param value The record contents
      * @param headers the headers that will be included in the record
@@ -85,7 +86,8 @@ public class ProducerRecord<K, V> {
      *
      * @param topic The topic the record will be appended to
      * @param partition The partition to which the record should be sent
-     * @param timestamp The timestamp of the record
+     * @param timestamp The timestamp of the record, in milliseconds since epoch. If null, the producer will assign the
+     *                  timestamp using System.currentTimeMillis().
      * @param key The key that will be included in the record
      * @param value The record contents
      */
@@ -168,7 +170,7 @@ public class ProducerRecord<K, V> {
     }

     /**
-     * @return The timestamp
+     * @return The timestamp, which is in milliseconds since epoch.
      */
    public Long timestamp() {
        return timestamp;
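A short sketch of the two timestamp behaviors the clarified javadoc describes (topic, key, and value are placeholders):

    import org.apache.kafka.clients.producer.ProducerRecord;

    public class TimestampSketch {
        public static void main(String[] args) {
            // Explicit create time, in milliseconds since epoch; an intentionally
            // ancient value like this is what lets the system test age records out quickly.
            ProducerRecord<String, String> aged =
                new ProducerRecord<>("test_topic", null, 1000L, "key", "value");

            // Null timestamp: the producer assigns System.currentTimeMillis() at send time.
            ProducerRecord<String, String> current =
                new ProducerRecord<>("test_topic", null, (Long) null, "key", "value");

            System.out.println(aged.timestamp() + " vs " + current.timestamp()); // 1000 vs null
        }
    }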


@@ -33,6 +33,7 @@ NUM_RECOVERY_THREADS_PER_DATA_DIR = "num.recovery.threads.per.data.dir"
 LOG_RETENTION_HOURS = "log.retention.hours"
 LOG_SEGMENT_BYTES = "log.segment.bytes"
 LOG_RETENTION_CHECK_INTERVAL_MS = "log.retention.check.interval.ms"
+LOG_RETENTION_MS = "log.retention.ms"
 LOG_CLEANER_ENABLE = "log.cleaner.enable"
 AUTO_CREATE_TOPICS_ENABLE = "auto.create.topics.enable"


@@ -30,7 +30,6 @@ num.partitions=1
 num.recovery.threads.per.data.dir=1
 log.retention.hours=168
 log.segment.bytes=1073741824
-log.retention.check.interval.ms=300000
 log.cleaner.enable=false
 security.inter.broker.protocol={{ security_config.interbroker_security_protocol }}


@@ -56,7 +56,8 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
     def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput=100000,
                  message_validator=is_int, compression_types=None, version=DEV_BRANCH, acks=None,
-                 stop_timeout_sec=150, request_timeout_sec=30, log_level="INFO", enable_idempotence=False, offline_nodes=[]):
+                 stop_timeout_sec=150, request_timeout_sec=30, log_level="INFO",
+                 enable_idempotence=False, offline_nodes=[], create_time=-1):
         """
         :param max_messages is a number of messages to be produced per producer
         :param message_validator checks for an expected format of messages produced. There are
@@ -91,6 +92,7 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
         self.request_timeout_sec = request_timeout_sec
         self.enable_idempotence = enable_idempotence
         self.offline_nodes = offline_nodes
+        self.create_time = create_time

     def java_class_name(self):
         return "VerifiableProducer"
@@ -125,8 +127,8 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
         producer_prop_file += "\nrequest.timeout.ms=%d\n" % (self.request_timeout_sec * 1000)
         if self.enable_idempotence:
             self.logger.info("Setting up an idempotent producer")
-            producer_prop_file += "\nmax.in.flight.requests.per.connection=1\n"
-            producer_prop_file += "\nretries=50\n"
+            producer_prop_file += "\nmax.in.flight.requests.per.connection=5\n"
+            producer_prop_file += "\nretries=1000000\n"
             producer_prop_file += "\nenable.idempotence=true\n"

         self.logger.info("verifiable_producer.properties:")
@@ -194,6 +196,8 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
             cmd += " --value-prefix %s" % str(idx)
         if self.acks is not None:
             cmd += " --acks %s " % str(self.acks)
+        if self.create_time > -1:
+            cmd += " --message-create-time %s " % str(self.create_time)
         cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE
         cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDOUT_CAPTURE, VerifiableProducer.STDOUT_CAPTURE)


@@ -13,10 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-from ducktape.mark import parametrize
+from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 from ducktape.utils.util import wait_until

+from kafkatest.services.kafka import config_property
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.verifiable_producer import VerifiableProducer
@@ -24,7 +25,7 @@ from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 from kafkatest.utils import is_int

 import random
+import time

 class ReassignPartitionsTest(ProduceConsumeValidateTest):
     """
@@ -38,13 +39,25 @@ class ReassignPartitionsTest(ProduceConsumeValidateTest):
         super(ReassignPartitionsTest, self).__init__(test_context=test_context)

         self.topic = "test_topic"
-        self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(test_context, num_nodes=4, zk=self.zk, topics={self.topic: {
-            "partitions": 20,
-            "replication-factor": 3,
-            'configs': {"min.insync.replicas": 2}}
-        })
         self.num_partitions = 20
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+        # We set the min.insync.replicas to match the replication factor because
+        # it makes the test more stringent. If min.isr = 2 and
+        # replication.factor = 3, then the test would tolerate the failure of
+        # reassignment for up to one replica per partition, which is not
+        # desirable for this test in particular.
+        self.kafka = KafkaService(test_context, num_nodes=4, zk=self.zk,
+                                  server_prop_overides=[
+                                      [config_property.LOG_ROLL_TIME_MS, "5000"],
+                                      [config_property.LOG_RETENTION_CHECK_INTERVAL_MS, "5000"]
+                                  ],
+                                  topics={self.topic: {
+                                      "partitions": self.num_partitions,
+                                      "replication-factor": 3,
+                                      'configs': {
+                                          "min.insync.replicas": 3,
+                                      }}
+                                  })
         self.timeout_sec = 60
         self.producer_throughput = 1000
         self.num_producers = 1
@@ -86,14 +99,42 @@ class ReassignPartitionsTest(ProduceConsumeValidateTest):
             self.clean_bounce_some_brokers()

         # Wait until finished or timeout
-        wait_until(lambda: self.kafka.verify_reassign_partitions(partition_info), timeout_sec=self.timeout_sec, backoff_sec=.5)
+        wait_until(lambda: self.kafka.verify_reassign_partitions(partition_info),
+                   timeout_sec=self.timeout_sec, backoff_sec=.5)

-    @cluster(num_nodes=7)
-    @parametrize(security_protocol="PLAINTEXT", bounce_brokers=True)
-    @parametrize(security_protocol="PLAINTEXT", bounce_brokers=False)
-    def test_reassign_partitions(self, bounce_brokers, security_protocol):
+    def move_start_offset(self):
+        """We move the start offset of the topic by writing really old messages
+        and waiting for them to be cleaned up.
+        """
+        producer = VerifiableProducer(self.test_context, 1, self.kafka, self.topic,
+                                      throughput=-1, enable_idempotence=True,
+                                      create_time=1000)
+        producer.start()
+        wait_until(lambda: producer.num_acked > 0,
+                   timeout_sec=30,
+                   err_msg="Failed to get an acknowledgement within %ds" % 30)
+        # Wait 8 seconds to let the topic be seeded with messages that will
+        # be deleted. The 8 seconds is important, since we should get 2 deleted
+        # segments in this period based on the configured log roll time and the
+        # retention check interval.
+        time.sleep(8)
+        producer.stop()
+        self.logger.info("Seeded topic with %d messages which will be deleted" %
+                         producer.num_acked)
+        # Since the configured check interval is 5 seconds, we wait another
+        # 6 seconds to ensure that at least one more cleaning pass runs, so
+        # that the last segment is deleted. An alternative to using timeouts
+        # is to poll each partition until the log start offset matches the
+        # end offset. The latter is more robust.
+        time.sleep(6)
+
+    @cluster(num_nodes=8)
+    @matrix(bounce_brokers=[True, False],
+            reassign_from_offset_zero=[True, False])
+    def test_reassign_partitions(self, bounce_brokers, reassign_from_offset_zero):
         """Reassign partitions tests.

-        Setup: 1 zk, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2
+        Setup: 1 zk, 4 kafka nodes, 1 topic with partitions=20, replication-factor=3,
+        and min.insync.replicas=3

         - Produce messages in the background
         - Consume messages in the background
@@ -101,13 +142,19 @@ class ReassignPartitionsTest(ProduceConsumeValidateTest):
         - If bounce_brokers is True, also bounce a few brokers while partition re-assignment is in progress
         - When done reassigning partitions and bouncing brokers, stop producing, and finish consuming
         - Validate that every acked message was consumed
         """
-        self.kafka.security_protocol = security_protocol
-        self.kafka.interbroker_security_protocol = security_protocol
-        new_consumer = False if self.kafka.security_protocol == "PLAINTEXT" else True
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int)
         self.kafka.start()
+        if not reassign_from_offset_zero:
+            self.move_start_offset()
+
+        self.producer = VerifiableProducer(self.test_context, self.num_producers,
+                                           self.kafka, self.topic,
+                                           throughput=self.producer_throughput,
+                                           enable_idempotence=True)
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers,
+                                        self.kafka, self.topic,
+                                        consumer_timeout_ms=60000,
+                                        message_validator=is_int)
+        self.enable_idempotence = True
         self.run_produce_consume_validate(core_test_action=lambda: self.reassign_partitions(bounce_brokers))
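The comment in move_start_offset notes a more robust alternative to fixed sleeps: polling until each partition's log start offset catches up to its end offset. A hypothetical Java sketch of that check (the class and helper names, and the externally created consumer, are assumptions, not part of this patch):

    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class StartOffsetCheck {
        // True once every partition's log start offset equals its end offset,
        // i.e. retention has deleted all of the old seed messages.
        static boolean fullyTruncated(KafkaConsumer<String, String> consumer,
                                      List<TopicPartition> partitions) {
            Map<TopicPartition, Long> start = consumer.beginningOffsets(partitions);
            Map<TopicPartition, Long> end = consumer.endOffsets(partitions);
            return partitions.stream().allMatch(tp -> start.get(tp).equals(end.get(tp)));
        }
    }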


@@ -80,13 +80,22 @@ public class VerifiableProducer {
     // if null, then values are produced without a prefix
     private final Integer valuePrefix;

-    public VerifiableProducer(KafkaProducer<String, String> producer, String topic, int throughput, int maxMessages, Integer valuePrefix) {
+    // The create time to set in messages, in milliseconds since epoch
+    private Long createTime;
+
+    private final Long startTime;
+
+    public VerifiableProducer(KafkaProducer<String, String> producer, String topic, int throughput, int maxMessages,
+                              Integer valuePrefix, Long createTime) {
         this.topic = topic;
         this.throughput = throughput;
         this.maxMessages = maxMessages;
         this.producer = producer;
         this.valuePrefix = valuePrefix;
+        this.createTime = createTime;
+        this.startTime = System.currentTimeMillis();
     }

     /** Get the command-line argument parser. */
@@ -144,6 +153,15 @@
                 .metavar("CONFIG_FILE")
                 .help("Producer config properties file.");

+        parser.addArgument("--message-create-time")
+                .action(store())
+                .required(false)
+                .setDefault(-1)
+                .type(Integer.class)
+                .metavar("CREATETIME")
+                .dest("createTime")
+                .help("Send messages with creation time starting at the argument's value, in milliseconds since epoch");
+
         parser.addArgument("--value-prefix")
                 .action(store())
                 .required(false)
@@ -181,6 +199,10 @@
         int throughput = res.getInt("throughput");
         String configFile = res.getString("producer.config");
         Integer valuePrefix = res.getInt("valuePrefix");
+        Long createTime = (long) res.getInt("createTime");
+        if (createTime == -1L)
+            createTime = null;
+
         Properties producerProps = new Properties();
         producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList"));
@@ -202,12 +224,14 @@
         StringSerializer serializer = new StringSerializer();
         KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps, serializer, serializer);

-        return new VerifiableProducer(producer, topic, throughput, maxMessages, valuePrefix);
+        return new VerifiableProducer(producer, topic, throughput, maxMessages, valuePrefix, createTime);
     }

     /** Produce a message with given key and value. */
     public void send(String key, String value) {
-        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, createTime, key, value);
+        if (createTime != null)
+            createTime += System.currentTimeMillis() - startTime;
+
         numSent++;
         try {
             producer.send(record, new PrintInfoCallback(key, value));
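For reference, a typical invocation with the new flag might look like the following (the broker address is a placeholder; the other flags predate this patch):

    bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableProducer \
        --topic test_topic --broker-list localhost:9092 \
        --max-messages 1000 --message-create-time 1000

Per the send() logic above, each send advances createTime by the wall-clock time elapsed since the producer started, so successive records carry increasing, but still very old, create times.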