Trogdor's ProduceBench does not fail if topics exist (#4673)

Added configs to ProduceBenchSpec:
topicPrefix: topic names have the format topicPrefix + topic index. If not provided, defaults to "produceBenchTopic".
partitionsPerTopic: number of partitions per topic. If not provided, defaults to 1.
replicationFactor: replication factor for each topic. If not provided, defaults to 3.

The behavior of ProduceBench is changed so that if some or all topics already exist (with names of the form topicPrefix + topic index) and they have the requested number of partitions, the worker reuses those topics and does not fail. ProduceBench fails only if an existing topic has a different number of partitions than requested.
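
As an illustration, here is a minimal sketch of the topic-naming scheme; the prefix is the default value, and the format string mirrors ProduceBenchWorker.topicIndexToName() in the diff below:

    String topicPrefix = "produceBenchTopic";    // default when topicPrefix is not provided
    for (int topicIndex = 0; topicIndex < 3; topicIndex++) {
        // topic name = prefix + zero-padded topic index
        System.out.println(String.format("%s%05d", topicPrefix, topicIndex));
    }
    // prints produceBenchTopic00000, produceBenchTopic00001, produceBenchTopic00002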

Added unit tests for WorkerUtils, covering both existing and new methods.

Fixed a bug in MockAdminClient where createTopics() would overwrite an existing topic's replication factor and number of partitions even though it correctly completed the appropriate futures exceptionally with TopicExistsException.
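
A minimal sketch (not part of this commit) of the behavior the fix guarantees, assuming JUnit assertions, the usual java.util/admin-client imports, and a MockAdminClient named adminClient that already holds "existing-topic" with 2 partitions:

    NewTopic conflicting = new NewTopic("existing-topic", 1, (short) 1);
    try {
        adminClient.createTopics(Collections.singleton(conflicting)).all().get();
    } catch (ExecutionException e) {
        // the future still completes exceptionally, as before
        assertTrue(e.getCause() instanceof TopicExistsException);
    }
    // ...but the existing topic's metadata is no longer clobbered, because
    // createTopics() now skips (continue) topics that already exist:
    TopicDescription desc = adminClient.describeTopics(
        Collections.singleton("existing-topic")).values().get("existing-topic").get();
    assertEquals(2, desc.partitions().size());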

Reviewers: Colin P. Mccabe <cmccabe@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>
Anna Povzner 2018-03-20 06:51:45 -07:00 committed by Rajini Sivaram
parent aefe35e493
commit 5c24295d44
8 changed files with 409 additions and 70 deletions

View File

@@ -212,6 +212,8 @@
files="SignalLogger.java"/>
<suppress checks="IllegalImport"
files="SignalLogger.java"/>
<suppress checks="ParameterNumber"
files="ProduceBenchSpec.java"/>
<!-- Log4J-Appender -->
<suppress checks="CyclomaticComplexity"

View File

@@ -152,6 +152,7 @@ public class MockAdminClient extends AdminClient {
if (allTopics.containsKey(topicName)) {
future.completeExceptionally(new TopicExistsException(String.format("Topic %s exists already.", topicName)));
createTopicResult.put(topicName, future);
continue;
}
int replicationFactor = newTopic.replicationFactor();
List<Node> replicas = new ArrayList<>(replicationFactor);

View File

@@ -19,9 +19,14 @@ package org.apache.kafka.trogdor.common;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeTopicsOptions;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -33,7 +38,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
@@ -71,7 +75,7 @@ public final class WorkerUtils {
}
private static final int CREATE_TOPICS_REQUEST_TIMEOUT = 25000;
private static final int CREATE_TOPICS_CALL_TIMEOUT = 90000;
private static final int CREATE_TOPICS_CALL_TIMEOUT = 180000;
private static final int MAX_CREATE_TOPICS_BATCH_SIZE = 10;
//Map<String, Map<Integer, List<Integer>>> topics) throws Throwable {
@@ -82,15 +86,62 @@ public final class WorkerUtils {
* @param log The logger to use.
* @param bootstrapServers The bootstrap server list.
* @param topics Maps topic names to partition assignments.
* @param failOnExisting If true, the method will throw TopicExistsException if one or
* more topics already exist. Otherwise, the existing topics are
* verified for number of partitions. In this case, if number of
* partitions of an existing topic does not match the requested
* number of partitions, the method throws RuntimeException.
*/
public static void createTopics(Logger log, String bootstrapServers,
public static void createTopics(
Logger log, String bootstrapServers,
Map<String, NewTopic> topics, boolean failOnExisting) throws Throwable {
// this method wraps the call to createTopics() that takes admin client, so that we can
// unit test the functionality with MockAdminClient. The exception is caught and
// re-thrown so that admin client is closed when the method returns.
try (AdminClient adminClient = createAdminClient(bootstrapServers)) {
createTopics(log, adminClient, topics, failOnExisting);
} catch (Exception e) {
log.warn("Failed to create or verify topics {}", topics, e);
throw e;
}
}
/**
* The actual create topics functionality is separated into this method and called from the
* above method to be able to unit test with mock adminClient.
*/
static void createTopics(
Logger log, AdminClient adminClient,
Map<String, NewTopic> topics, boolean failOnExisting) throws Throwable {
if (topics.isEmpty()) {
log.warn("Request to create topics has an empty topic list.");
return;
}
Collection<String> topicsExists = createTopics(log, adminClient, topics.values());
if (!topicsExists.isEmpty()) {
if (failOnExisting) {
log.warn("Topic(s) {} already exist.", topicsExists);
throw new TopicExistsException("One or more topics already exist.");
} else {
verifyTopics(log, adminClient, topicsExists, topics);
}
}
}
/**
* Creates Kafka topics and returns a list of topics that already exist
* @param log The logger to use
* @param adminClient AdminClient
* @param topics List of topics to create
* @return Collection of topics names that already exist.
* @throws Throwable if creation of one or more topics fails (except for topic exists case).
*/
private static Collection<String> createTopics(Logger log, AdminClient adminClient,
Collection<NewTopic> topics) throws Throwable {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, CREATE_TOPICS_REQUEST_TIMEOUT);
try (AdminClient adminClient = AdminClient.create(props)) {
long startMs = Time.SYSTEM.milliseconds();
int tries = 0;
List<String> existingTopics = new ArrayList<>();
Map<String, NewTopic> newTopics = new HashMap<>();
for (NewTopic newTopic : topics) {
@@ -98,7 +149,7 @@ public final class WorkerUtils {
}
List<String> topicsToCreate = new ArrayList<>(newTopics.keySet());
while (true) {
log.info("Attemping to create {} topics (try {})...", topicsToCreate.size(), ++tries);
log.info("Attempting to create {} topics (try {})...", topicsToCreate.size(), ++tries);
Map<String, Future<Void>> creations = new HashMap<>();
while (!topicsToCreate.isEmpty()) {
List<NewTopic> newTopicsBatch = new ArrayList<>();
@@ -117,10 +168,15 @@ public final class WorkerUtils {
try {
future.get();
log.debug("Successfully created {}.", topicName);
} catch (ExecutionException e) {
if (e.getCause() instanceof TimeoutException) {
log.warn("Timed out attempting to create {}: {}", topicName, e.getCause().getMessage());
} catch (Exception e) {
if ((e.getCause() instanceof TimeoutException)
|| (e.getCause() instanceof NotEnoughReplicasException)) {
log.warn("Attempt to create topic `{}` failed: {}", topicName,
e.getCause().getMessage());
topicsToCreate.add(topicName);
} else if (e.getCause() instanceof TopicExistsException) {
log.info("Topic {} already exists.", topicName);
existingTopics.add(topicName);
} else {
log.warn("Failed to create {}", topicName, e.getCause());
throw e.getCause();
@@ -137,6 +193,44 @@ public final class WorkerUtils {
throw new TimeoutException(str);
}
}
return existingTopics;
}
/**
* Verifies that topics in 'topicsToVerify' list have the same number of partitions as
* described in 'topicsInfo'
* @param log The logger to use
* @param adminClient AdminClient
* @param topicsToVerify List of topics to verify
* @param topicsInfo Map of topic name to topic description, which includes topics in
* 'topicsToVerify' list.
* @throws RuntimeException If one or more topics have different number of partitions than
* described in 'topicsInfo'
*/
private static void verifyTopics(
Logger log, AdminClient adminClient,
Collection<String> topicsToVerify, Map<String, NewTopic> topicsInfo) throws Throwable {
DescribeTopicsResult topicsResult = adminClient.describeTopics(
topicsToVerify, new DescribeTopicsOptions().timeoutMs(CREATE_TOPICS_REQUEST_TIMEOUT));
Map<String, TopicDescription> topicDescriptionMap = topicsResult.all().get();
for (TopicDescription desc: topicDescriptionMap.values()) {
// map will always contain the topic since all topics in 'topicsExists' are in given
// 'topics' map
int partitions = topicsInfo.get(desc.name()).numPartitions();
if (desc.partitions().size() != partitions) {
String str = "Topic '" + desc.name() + "' exists, but has "
+ desc.partitions().size() + " partitions, while requested "
+ " number of partitions is " + partitions;
log.warn(str);
throw new RuntimeException(str);
}
}
}
private static AdminClient createAdminClient(String bootstrapServers) {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, CREATE_TOPICS_REQUEST_TIMEOUT);
return AdminClient.create(props);
}
}
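
For reference, a hedged usage sketch of the new createTopics overload; log is an org.slf4j.Logger, and the topic name, partition count, and bootstrap address are placeholders rather than values from the commit. The call must run in a context that propagates Throwable, as the workers' prepare paths do:

    Map<String, NewTopic> topics = new HashMap<>();
    topics.put("produceBenchTopic00000",
        new NewTopic("produceBenchTopic00000", 1, (short) 3));
    // failOnExisting = false: reuse existing topics whose partition counts match the
    // request; a partition-count mismatch raises RuntimeException. With
    // failOnExisting = true, any pre-existing topic raises TopicExistsException.
    WorkerUtils.createTopics(log, "localhost:9092", topics, false);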

View File

@@ -33,6 +33,11 @@ import java.util.Set;
* The specification for a benchmark that produces messages to a set of topics.
*/
public class ProduceBenchSpec extends TaskSpec {
private static final String DEFAULT_TOPIC_PREFIX = "produceBenchTopic";
private static final int DEFAULT_NUM_PARTITIONS = 1;
private static final short DEFAULT_REPLICATION_FACTOR = 3;
private final String producerNode;
private final String bootstrapServers;
private final int targetMessagesPerSec;
@@ -42,6 +47,9 @@ public class ProduceBenchSpec extends TaskSpec {
private final Map<String, String> producerConf;
private final int totalTopics;
private final int activeTopics;
private final String topicPrefix;
private final int numPartitions;
private final short replicationFactor;
@JsonCreator
public ProduceBenchSpec(@JsonProperty("startMs") long startMs,
@@ -54,7 +62,10 @@ public class ProduceBenchSpec extends TaskSpec {
@JsonProperty("valueGenerator") PayloadGenerator valueGenerator,
@JsonProperty("producerConf") Map<String, String> producerConf,
@JsonProperty("totalTopics") int totalTopics,
@JsonProperty("activeTopics") int activeTopics) {
@JsonProperty("activeTopics") int activeTopics,
@JsonProperty("topicPrefix") String topicPrefix,
@JsonProperty("partitionsPerTopic") int partitionsPerTopic,
@JsonProperty("replicationFactor") short replicationFactor) {
super(startMs, durationMs);
this.producerNode = (producerNode == null) ? "" : producerNode;
this.bootstrapServers = (bootstrapServers == null) ? "" : bootstrapServers;
@@ -67,6 +78,11 @@ public class ProduceBenchSpec extends TaskSpec {
this.producerConf = (producerConf == null) ? new TreeMap<String, String>() : producerConf;
this.totalTopics = totalTopics;
this.activeTopics = activeTopics;
this.topicPrefix = (topicPrefix == null) ? DEFAULT_TOPIC_PREFIX : topicPrefix;
this.numPartitions = (partitionsPerTopic == 0)
? DEFAULT_NUM_PARTITIONS : partitionsPerTopic;
this.replicationFactor = (replicationFactor == 0)
? DEFAULT_REPLICATION_FACTOR : replicationFactor;
}
@JsonProperty
@@ -114,6 +130,21 @@ public class ProduceBenchSpec extends TaskSpec {
return activeTopics;
}
@JsonProperty
public String topicPrefix() {
return topicPrefix;
}
@JsonProperty
public int numPartitions() {
return numPartitions;
}
@JsonProperty
public short replicationFactor() {
return replicationFactor;
}
@Override
public TaskController newController(String id) {
return new TaskController() {

View File

@@ -37,8 +37,7 @@ import org.apache.kafka.trogdor.task.TaskWorker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
@@ -52,10 +51,6 @@ import java.util.concurrent.atomic.AtomicReference;
public class ProduceBenchWorker implements TaskWorker {
private static final Logger log = LoggerFactory.getLogger(ProduceBenchWorker.class);
private static final short NUM_PARTITIONS = 1;
private static final short REPLICATION_FACTOR = 3;
private static final int THROTTLE_PERIOD_MS = 100;
private final String id;
@@ -76,8 +71,8 @@ public class ProduceBenchWorker implements TaskWorker {
* @param topicIndex The topic number.
* @return The topic name.
*/
public static String topicIndexToName(int topicIndex) {
return String.format("topic%05d", topicIndex);
public String topicIndexToName(int topicIndex) {
return String.format("%s%05d", spec.topicPrefix(), topicIndex);
}
public ProduceBenchWorker(String id, ProduceBenchSpec spec) {
@@ -111,11 +106,13 @@ public class ProduceBenchWorker implements TaskWorker {
"activeTopics was %d, but totalTopics was only %d. activeTopics must " +
"be less than or equal to totalTopics.", spec.activeTopics(), spec.totalTopics()));
}
List<NewTopic> newTopics = new ArrayList<>();
Map<String, NewTopic> newTopics = new HashMap<>();
for (int i = 0; i < spec.totalTopics(); i++) {
newTopics.add(new NewTopic(topicIndexToName(i), NUM_PARTITIONS, REPLICATION_FACTOR));
String name = topicIndexToName(i);
newTopics.put(name, new NewTopic(name, spec.numPartitions(), spec.replicationFactor()));
}
WorkerUtils.createTopics(log, spec.bootstrapServers(), newTopics);
WorkerUtils.createTopics(log, spec.bootstrapServers(), newTopics, false);
executor.submit(new SendRecords());
} catch (Throwable e) {
WorkerUtils.abort(log, "Prepare", e, doneFuture);

View File

@@ -123,8 +123,11 @@ public class RoundTripWorker implements TaskWorker {
if ((spec.partitionAssignments() == null) || spec.partitionAssignments().isEmpty()) {
throw new ConfigException("Invalid null or empty partitionAssignments.");
}
WorkerUtils.createTopics(log, spec.bootstrapServers(),
Collections.singletonList(new NewTopic(TOPIC_NAME, spec.partitionAssignments())));
WorkerUtils.createTopics(
log, spec.bootstrapServers(),
Collections.singletonMap(TOPIC_NAME,
new NewTopic(TOPIC_NAME, spec.partitionAssignments())),
true);
executor.submit(new ProducerRunnable());
executor.submit(new ConsumerRunnable());
} catch (Throwable e) {

View File

@@ -49,7 +49,7 @@ public class JsonSerializationTest {
verify(new WorkerRunning(null, 0, null));
verify(new WorkerStopping(null, 0, null));
verify(new ProduceBenchSpec(0, 0, null, null,
0, 0, null, null, null, 0, 0));
0, 0, null, null, null, 0, 0, "test-topic", 1, (short) 3));
verify(new RoundTripWorkloadSpec(0, 0, null, null,
0, null, null, 0));
verify(new SampleTaskSpec(0, 0, 0, null));

View File

@@ -0,0 +1,211 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.common;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.Node;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kafka.clients.admin.NewTopic;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class WorkerUtilsTest {
private static final Logger log = LoggerFactory.getLogger(WorkerUtilsTest.class);
private final Node broker1 = new Node(0, "testHost-1", 1234);
private final Node broker2 = new Node(1, "testHost-2", 1234);
private final Node broker3 = new Node(1, "testHost-3", 1234);
private final List<Node> cluster = Arrays.asList(broker1, broker2, broker3);
private final List<Node> singleReplica = Collections.singletonList(broker1);
private static final String TEST_TOPIC = "test-topic-1";
private static final short TEST_REPLICATION_FACTOR = 1;
private static final int TEST_PARTITIONS = 1;
private static final NewTopic NEW_TEST_TOPIC =
new NewTopic(TEST_TOPIC, TEST_PARTITIONS, TEST_REPLICATION_FACTOR);
private MockAdminClient adminClient;
@Before
public void setUp() throws Exception {
adminClient = new MockAdminClient(cluster, broker1);
}
@Test
public void testCreateOneTopic() throws Throwable {
Map<String, NewTopic> newTopics = Collections.singletonMap(TEST_TOPIC, NEW_TEST_TOPIC);
WorkerUtils.createTopics(log, adminClient, newTopics, true);
assertEquals(Collections.singleton(TEST_TOPIC), adminClient.listTopics().names().get());
assertEquals(
new TopicDescription(
TEST_TOPIC, false,
Collections.singletonList(
new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList()))),
adminClient.describeTopics(
Collections.singleton(TEST_TOPIC)).values().get(TEST_TOPIC).get()
);
}
@Test
public void testCreateRetriesOnTimeout() throws Throwable {
adminClient.timeoutNextRequest(1);
WorkerUtils.createTopics(
log, adminClient, Collections.singletonMap(TEST_TOPIC, NEW_TEST_TOPIC), true);
assertEquals(
new TopicDescription(
TEST_TOPIC, false,
Collections.singletonList(
new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList()))),
adminClient.describeTopics(
Collections.singleton(TEST_TOPIC)).values().get(TEST_TOPIC).get()
);
}
@Test
public void testCreateZeroTopicsDoesNothing() throws Throwable {
WorkerUtils.createTopics(log, adminClient, Collections.<String, NewTopic>emptyMap(), true);
assertEquals(0, adminClient.listTopics().names().get().size());
}
@Test(expected = TopicExistsException.class)
public void testCreateTopicsFailsIfAtLeastOneTopicExists() throws Throwable {
adminClient.addTopic(
false,
TEST_TOPIC,
Collections.singletonList(new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList())),
null);
Map<String, NewTopic> newTopics = new HashMap<>();
newTopics.put(TEST_TOPIC, NEW_TEST_TOPIC);
newTopics.put("another-topic",
new NewTopic("another-topic", TEST_PARTITIONS, TEST_REPLICATION_FACTOR));
newTopics.put("one-more-topic",
new NewTopic("one-more-topic", TEST_PARTITIONS, TEST_REPLICATION_FACTOR));
WorkerUtils.createTopics(log, adminClient, newTopics, true);
}
@Test(expected = RuntimeException.class)
public void testExistingTopicsMustHaveRequestedNumberOfPartitions() throws Throwable {
List<TopicPartitionInfo> tpInfo = new ArrayList<>();
tpInfo.add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList()));
tpInfo.add(new TopicPartitionInfo(1, broker2, singleReplica, Collections.<Node>emptyList()));
adminClient.addTopic(
false,
TEST_TOPIC,
tpInfo,
null);
WorkerUtils.createTopics(
log, adminClient, Collections.singletonMap(TEST_TOPIC, NEW_TEST_TOPIC), false);
}
@Test
public void testExistingTopicsNotCreated() throws Throwable {
final String existingTopic = "existing-topic";
List<TopicPartitionInfo> tpInfo = new ArrayList<>();
tpInfo.add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList()));
tpInfo.add(new TopicPartitionInfo(1, broker2, singleReplica, Collections.<Node>emptyList()));
tpInfo.add(new TopicPartitionInfo(2, broker3, singleReplica, Collections.<Node>emptyList()));
adminClient.addTopic(
false,
existingTopic,
tpInfo,
null);
WorkerUtils.createTopics(
log, adminClient,
Collections.singletonMap(
existingTopic,
new NewTopic(existingTopic, tpInfo.size(), TEST_REPLICATION_FACTOR)), false);
assertEquals(Collections.singleton(existingTopic), adminClient.listTopics().names().get());
}
@Test
public void testCreatesNotExistingTopics() throws Throwable {
// should be no topics before the call
assertEquals(0, adminClient.listTopics().names().get().size());
WorkerUtils.createTopics(
log, adminClient, Collections.singletonMap(TEST_TOPIC, NEW_TEST_TOPIC), false);
assertEquals(Collections.singleton(TEST_TOPIC), adminClient.listTopics().names().get());
assertEquals(
new TopicDescription(
TEST_TOPIC, false,
Collections.singletonList(
new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList()))),
adminClient.describeTopics(Collections.singleton(TEST_TOPIC)).values().get(TEST_TOPIC).get()
);
}
@Test
public void testCreatesOneTopicVerifiesOneTopic() throws Throwable {
final String existingTopic = "existing-topic";
List<TopicPartitionInfo> tpInfo = new ArrayList<>();
tpInfo.add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList()));
tpInfo.add(new TopicPartitionInfo(1, broker2, singleReplica, Collections.<Node>emptyList()));
adminClient.addTopic(
false,
existingTopic,
tpInfo,
null);
Map<String, NewTopic> topics = new HashMap<>();
topics.put(existingTopic,
new NewTopic(existingTopic, tpInfo.size(), TEST_REPLICATION_FACTOR));
topics.put(TEST_TOPIC, NEW_TEST_TOPIC);
WorkerUtils.createTopics(log, adminClient, topics, false);
assertEquals(Utils.mkSet(existingTopic, TEST_TOPIC), adminClient.listTopics().names().get());
}
@Test
public void testCreateNonExistingTopicsWithZeroTopicsDoesNothing() throws Throwable {
WorkerUtils.createTopics(
log, adminClient, Collections.<String, NewTopic>emptyMap(), false);
assertEquals(0, adminClient.listTopics().names().get().size());
}
}