diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 62fe4ed045b..e3bf15102bd 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -212,6 +212,8 @@
files="SignalLogger.java"/>
+
replicas = new ArrayList<>(replicationFactor);
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
index 58f8278a57f..99c13c09feb 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
@@ -19,9 +19,14 @@ package org.apache.kafka.trogdor.common;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.DescribeTopicsOptions;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -33,7 +38,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
@@ -71,7 +75,7 @@ public final class WorkerUtils {
}
private static final int CREATE_TOPICS_REQUEST_TIMEOUT = 25000;
- private static final int CREATE_TOPICS_CALL_TIMEOUT = 90000;
+ private static final int CREATE_TOPICS_CALL_TIMEOUT = 180000;
private static final int MAX_CREATE_TOPICS_BATCH_SIZE = 10;
//Map>> topics) throws Throwable {
@@ -82,61 +86,151 @@ public final class WorkerUtils {
* @param log The logger to use.
* @param bootstrapServers The bootstrap server list.
* @param topics Maps topic names to partition assignments.
+ * @param failOnExisting If true, the method will throw TopicExistsException if one or
+ * more topics already exist. Otherwise, the existing topics are
+ * verified for number of partitions. In this case, if number of
+ * partitions of an existing topic does not match the requested
+ * number of partitions, the method throws RuntimeException.
*/
- public static void createTopics(Logger log, String bootstrapServers,
- Collection topics) throws Throwable {
- Properties props = new Properties();
- props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, CREATE_TOPICS_REQUEST_TIMEOUT);
- try (AdminClient adminClient = AdminClient.create(props)) {
- long startMs = Time.SYSTEM.milliseconds();
- int tries = 0;
+ public static void createTopics(
+ Logger log, String bootstrapServers,
+ Map topics, boolean failOnExisting) throws Throwable {
+ // this method wraps the call to createTopics() that takes admin client, so that we can
+ // unit test the functionality with MockAdminClient. The exception is caught and
+ // re-thrown so that admin client is closed when the method returns.
+ try (AdminClient adminClient = createAdminClient(bootstrapServers)) {
+ createTopics(log, adminClient, topics, failOnExisting);
+ } catch (Exception e) {
+ log.warn("Failed to create or verify topics {}", topics, e);
+ throw e;
+ }
+ }
- Map newTopics = new HashMap<>();
- for (NewTopic newTopic : topics) {
- newTopics.put(newTopic.name(), newTopic);
- }
- List topicsToCreate = new ArrayList<>(newTopics.keySet());
- while (true) {
- log.info("Attemping to create {} topics (try {})...", topicsToCreate.size(), ++tries);
- Map> creations = new HashMap<>();
- while (!topicsToCreate.isEmpty()) {
- List newTopicsBatch = new ArrayList<>();
- for (int i = 0; (i < MAX_CREATE_TOPICS_BATCH_SIZE) &&
- !topicsToCreate.isEmpty(); i++) {
- String topicName = topicsToCreate.remove(0);
- newTopicsBatch.add(newTopics.get(topicName));
- }
- creations.putAll(adminClient.createTopics(newTopicsBatch).values());
- }
- // We retry cases where the topic creation failed with a
- // timeout. This is a workaround for KAFKA-6368.
- for (Map.Entry> entry : creations.entrySet()) {
- String topicName = entry.getKey();
- Future future = entry.getValue();
- try {
- future.get();
- log.debug("Successfully created {}.", topicName);
- } catch (ExecutionException e) {
- if (e.getCause() instanceof TimeoutException) {
- log.warn("Timed out attempting to create {}: {}", topicName, e.getCause().getMessage());
- topicsToCreate.add(topicName);
- } else {
- log.warn("Failed to create {}", topicName, e.getCause());
- throw e.getCause();
- }
- }
- }
- if (topicsToCreate.isEmpty()) {
- break;
- }
- if (Time.SYSTEM.milliseconds() > startMs + CREATE_TOPICS_CALL_TIMEOUT) {
- String str = "Unable to create topic(s): " +
- Utils.join(topicsToCreate, ", ") + "after " + tries + " attempt(s)";
- log.warn(str);
- throw new TimeoutException(str);
- }
+ /**
+ * The actual create topics functionality is separated into this method and called from the
+ * above method to be able to unit test with mock adminClient.
+ */
+ static void createTopics(
+ Logger log, AdminClient adminClient,
+ Map topics, boolean failOnExisting) throws Throwable {
+ if (topics.isEmpty()) {
+ log.warn("Request to create topics has an empty topic list.");
+ return;
+ }
+
+ Collection topicsExists = createTopics(log, adminClient, topics.values());
+ if (!topicsExists.isEmpty()) {
+ if (failOnExisting) {
+ log.warn("Topic(s) {} already exist.", topicsExists);
+ throw new TopicExistsException("One or more topics already exist.");
+ } else {
+ verifyTopics(log, adminClient, topicsExists, topics);
}
}
}
+
+ /**
+ * Creates Kafka topics and returns a list of topics that already exist
+ * @param log The logger to use
+ * @param adminClient AdminClient
+ * @param topics List of topics to create
+ * @return Collection of topics names that already exist.
+ * @throws Throwable if creation of one or more topics fails (except for topic exists case).
+ */
+ private static Collection createTopics(Logger log, AdminClient adminClient,
+ Collection topics) throws Throwable {
+ long startMs = Time.SYSTEM.milliseconds();
+ int tries = 0;
+ List existingTopics = new ArrayList<>();
+
+ Map newTopics = new HashMap<>();
+ for (NewTopic newTopic : topics) {
+ newTopics.put(newTopic.name(), newTopic);
+ }
+ List topicsToCreate = new ArrayList<>(newTopics.keySet());
+ while (true) {
+ log.info("Attempting to create {} topics (try {})...", topicsToCreate.size(), ++tries);
+ Map> creations = new HashMap<>();
+ while (!topicsToCreate.isEmpty()) {
+ List newTopicsBatch = new ArrayList<>();
+ for (int i = 0; (i < MAX_CREATE_TOPICS_BATCH_SIZE) &&
+ !topicsToCreate.isEmpty(); i++) {
+ String topicName = topicsToCreate.remove(0);
+ newTopicsBatch.add(newTopics.get(topicName));
+ }
+ creations.putAll(adminClient.createTopics(newTopicsBatch).values());
+ }
+ // We retry cases where the topic creation failed with a
+ // timeout. This is a workaround for KAFKA-6368.
+ for (Map.Entry> entry : creations.entrySet()) {
+ String topicName = entry.getKey();
+ Future future = entry.getValue();
+ try {
+ future.get();
+ log.debug("Successfully created {}.", topicName);
+ } catch (Exception e) {
+ if ((e.getCause() instanceof TimeoutException)
+ || (e.getCause() instanceof NotEnoughReplicasException)) {
+ log.warn("Attempt to create topic `{}` failed: {}", topicName,
+ e.getCause().getMessage());
+ topicsToCreate.add(topicName);
+ } else if (e.getCause() instanceof TopicExistsException) {
+ log.info("Topic {} already exists.", topicName);
+ existingTopics.add(topicName);
+ } else {
+ log.warn("Failed to create {}", topicName, e.getCause());
+ throw e.getCause();
+ }
+ }
+ }
+ if (topicsToCreate.isEmpty()) {
+ break;
+ }
+ if (Time.SYSTEM.milliseconds() > startMs + CREATE_TOPICS_CALL_TIMEOUT) {
+ String str = "Unable to create topic(s): " +
+ Utils.join(topicsToCreate, ", ") + "after " + tries + " attempt(s)";
+ log.warn(str);
+ throw new TimeoutException(str);
+ }
+ }
+ return existingTopics;
+ }
+
+ /**
+ * Verifies that topics in 'topicsToVerify' list have the same number of partitions as
+ * described in 'topicsInfo'
+ * @param log The logger to use
+ * @param adminClient AdminClient
+ * @param topicsToVerify List of topics to verify
+ * @param topicsInfo Map of topic name to topic description, which includes topics in
+ * 'topicsToVerify' list.
+ * @throws RuntimeException If one or more topics have different number of partitions than
+ * described in 'topicsInfo'
+ */
+ private static void verifyTopics(
+ Logger log, AdminClient adminClient,
+ Collection topicsToVerify, Map topicsInfo) throws Throwable {
+ DescribeTopicsResult topicsResult = adminClient.describeTopics(
+ topicsToVerify, new DescribeTopicsOptions().timeoutMs(CREATE_TOPICS_REQUEST_TIMEOUT));
+ Map topicDescriptionMap = topicsResult.all().get();
+ for (TopicDescription desc: topicDescriptionMap.values()) {
+ // map will always contain the topic since all topics in 'topicsExists' are in given
+ // 'topics' map
+ int partitions = topicsInfo.get(desc.name()).numPartitions();
+ if (desc.partitions().size() != partitions) {
+ String str = "Topic '" + desc.name() + "' exists, but has "
+ + desc.partitions().size() + " partitions, while requested "
+ + " number of partitions is " + partitions;
+ log.warn(str);
+ throw new RuntimeException(str);
+ }
+ }
+ }
+
+ private static AdminClient createAdminClient(String bootstrapServers) {
+ Properties props = new Properties();
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, CREATE_TOPICS_REQUEST_TIMEOUT);
+ return AdminClient.create(props);
+ }
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
index a798e73a754..7b1bedd1830 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
@@ -33,6 +33,11 @@ import java.util.Set;
* The specification for a benchmark that produces messages to a set of topics.
*/
public class ProduceBenchSpec extends TaskSpec {
+
+ private static final String DEFAULT_TOPIC_PREFIX = "produceBenchTopic";
+ private static final int DEFAULT_NUM_PARTITIONS = 1;
+ private static final short DEFAULT_REPLICATION_FACTOR = 3;
+
private final String producerNode;
private final String bootstrapServers;
private final int targetMessagesPerSec;
@@ -42,6 +47,9 @@ public class ProduceBenchSpec extends TaskSpec {
private final Map producerConf;
private final int totalTopics;
private final int activeTopics;
+ private final String topicPrefix;
+ private final int numPartitions;
+ private final short replicationFactor;
@JsonCreator
public ProduceBenchSpec(@JsonProperty("startMs") long startMs,
@@ -54,7 +62,10 @@ public class ProduceBenchSpec extends TaskSpec {
@JsonProperty("valueGenerator") PayloadGenerator valueGenerator,
@JsonProperty("producerConf") Map producerConf,
@JsonProperty("totalTopics") int totalTopics,
- @JsonProperty("activeTopics") int activeTopics) {
+ @JsonProperty("activeTopics") int activeTopics,
+ @JsonProperty("topicPrefix") String topicPrefix,
+ @JsonProperty("partitionsPerTopic") int partitionsPerTopic,
+ @JsonProperty("replicationFactor") short replicationFactor) {
super(startMs, durationMs);
this.producerNode = (producerNode == null) ? "" : producerNode;
this.bootstrapServers = (bootstrapServers == null) ? "" : bootstrapServers;
@@ -67,6 +78,11 @@ public class ProduceBenchSpec extends TaskSpec {
this.producerConf = (producerConf == null) ? new TreeMap() : producerConf;
this.totalTopics = totalTopics;
this.activeTopics = activeTopics;
+ this.topicPrefix = (topicPrefix == null) ? DEFAULT_TOPIC_PREFIX : topicPrefix;
+ this.numPartitions = (partitionsPerTopic == 0)
+ ? DEFAULT_NUM_PARTITIONS : partitionsPerTopic;
+ this.replicationFactor = (replicationFactor == 0)
+ ? DEFAULT_REPLICATION_FACTOR : replicationFactor;
}
@JsonProperty
@@ -114,6 +130,21 @@ public class ProduceBenchSpec extends TaskSpec {
return activeTopics;
}
+ @JsonProperty
+ public String topicPrefix() {
+ return topicPrefix;
+ }
+
+ @JsonProperty
+ public int numPartitions() {
+ return numPartitions;
+ }
+
+ @JsonProperty
+ public short replicationFactor() {
+ return replicationFactor;
+ }
+
@Override
public TaskController newController(String id) {
return new TaskController() {
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
index 51f52d30aae..e291bae4088 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
@@ -37,8 +37,7 @@ import org.apache.kafka.trogdor.task.TaskWorker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
@@ -51,11 +50,7 @@ import java.util.concurrent.atomic.AtomicReference;
public class ProduceBenchWorker implements TaskWorker {
private static final Logger log = LoggerFactory.getLogger(ProduceBenchWorker.class);
-
- private static final short NUM_PARTITIONS = 1;
-
- private static final short REPLICATION_FACTOR = 3;
-
+
private static final int THROTTLE_PERIOD_MS = 100;
private final String id;
@@ -76,8 +71,8 @@ public class ProduceBenchWorker implements TaskWorker {
* @param topicIndex The topic number.
* @return The topic name.
*/
- public static String topicIndexToName(int topicIndex) {
- return String.format("topic%05d", topicIndex);
+ public String topicIndexToName(int topicIndex) {
+ return String.format("%s%05d", spec.topicPrefix(), topicIndex);
}
public ProduceBenchWorker(String id, ProduceBenchSpec spec) {
@@ -111,11 +106,13 @@ public class ProduceBenchWorker implements TaskWorker {
"activeTopics was %d, but totalTopics was only %d. activeTopics must " +
"be less than or equal to totalTopics.", spec.activeTopics(), spec.totalTopics()));
}
- List newTopics = new ArrayList<>();
+ Map newTopics = new HashMap<>();
for (int i = 0; i < spec.totalTopics(); i++) {
- newTopics.add(new NewTopic(topicIndexToName(i), NUM_PARTITIONS, REPLICATION_FACTOR));
+ String name = topicIndexToName(i);
+ newTopics.put(name, new NewTopic(name, spec.numPartitions(), spec.replicationFactor()));
}
- WorkerUtils.createTopics(log, spec.bootstrapServers(), newTopics);
+ WorkerUtils.createTopics(log, spec.bootstrapServers(), newTopics, false);
+
executor.submit(new SendRecords());
} catch (Throwable e) {
WorkerUtils.abort(log, "Prepare", e, doneFuture);
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
index 1b9cb8f1af0..a05785c04e0 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
@@ -123,8 +123,11 @@ public class RoundTripWorker implements TaskWorker {
if ((spec.partitionAssignments() == null) || spec.partitionAssignments().isEmpty()) {
throw new ConfigException("Invalid null or empty partitionAssignments.");
}
- WorkerUtils.createTopics(log, spec.bootstrapServers(),
- Collections.singletonList(new NewTopic(TOPIC_NAME, spec.partitionAssignments())));
+ WorkerUtils.createTopics(
+ log, spec.bootstrapServers(),
+ Collections.singletonMap(TOPIC_NAME,
+ new NewTopic(TOPIC_NAME, spec.partitionAssignments())),
+ true);
executor.submit(new ProducerRunnable());
executor.submit(new ConsumerRunnable());
} catch (Throwable e) {
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
index 77a793236ea..dee7614292a 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
@@ -49,7 +49,7 @@ public class JsonSerializationTest {
verify(new WorkerRunning(null, 0, null));
verify(new WorkerStopping(null, 0, null));
verify(new ProduceBenchSpec(0, 0, null, null,
- 0, 0, null, null, null, 0, 0));
+ 0, 0, null, null, null, 0, 0, "test-topic", 1, (short) 3));
verify(new RoundTripWorkloadSpec(0, 0, null, null,
0, null, null, 0));
verify(new SampleTaskSpec(0, 0, 0, null));
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java
new file mode 100644
index 00000000000..22b784600f6
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.common;
+
+
+
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.TopicPartitionInfo;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.clients.admin.MockAdminClient;
+
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kafka.clients.admin.NewTopic;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+public class WorkerUtilsTest {
+
+ private static final Logger log = LoggerFactory.getLogger(WorkerUtilsTest.class);
+
+ private final Node broker1 = new Node(0, "testHost-1", 1234);
+ private final Node broker2 = new Node(1, "testHost-2", 1234);
+ private final Node broker3 = new Node(1, "testHost-3", 1234);
+ private final List cluster = Arrays.asList(broker1, broker2, broker3);
+ private final List singleReplica = Collections.singletonList(broker1);
+
+ private static final String TEST_TOPIC = "test-topic-1";
+ private static final short TEST_REPLICATION_FACTOR = 1;
+ private static final int TEST_PARTITIONS = 1;
+ private static final NewTopic NEW_TEST_TOPIC =
+ new NewTopic(TEST_TOPIC, TEST_PARTITIONS, TEST_REPLICATION_FACTOR);
+
+ private MockAdminClient adminClient;
+
+
+ @Before
+ public void setUp() throws Exception {
+ adminClient = new MockAdminClient(cluster, broker1);
+ }
+
+ @Test
+ public void testCreateOneTopic() throws Throwable {
+ Map newTopics = Collections.singletonMap(TEST_TOPIC, NEW_TEST_TOPIC);
+
+ WorkerUtils.createTopics(log, adminClient, newTopics, true);
+ assertEquals(Collections.singleton(TEST_TOPIC), adminClient.listTopics().names().get());
+ assertEquals(
+ new TopicDescription(
+ TEST_TOPIC, false,
+ Collections.singletonList(
+ new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList()))),
+ adminClient.describeTopics(
+ Collections.singleton(TEST_TOPIC)).values().get(TEST_TOPIC).get()
+ );
+ }
+
+ @Test
+ public void testCreateRetriesOnTimeout() throws Throwable {
+ adminClient.timeoutNextRequest(1);
+
+ WorkerUtils.createTopics(
+ log, adminClient, Collections.singletonMap(TEST_TOPIC, NEW_TEST_TOPIC), true);
+
+ assertEquals(
+ new TopicDescription(
+ TEST_TOPIC, false,
+ Collections.singletonList(
+ new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList()))),
+ adminClient.describeTopics(
+ Collections.singleton(TEST_TOPIC)).values().get(TEST_TOPIC).get()
+ );
+ }
+
+ @Test
+ public void testCreateZeroTopicsDoesNothing() throws Throwable {
+ WorkerUtils.createTopics(log, adminClient, Collections.emptyMap(), true);
+ assertEquals(0, adminClient.listTopics().names().get().size());
+ }
+
+ @Test(expected = TopicExistsException.class)
+ public void testCreateTopicsFailsIfAtLeastOneTopicExists() throws Throwable {
+ adminClient.addTopic(
+ false,
+ TEST_TOPIC,
+ Collections.singletonList(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList())),
+ null);
+
+ Map newTopics = new HashMap<>();
+ newTopics.put(TEST_TOPIC, NEW_TEST_TOPIC);
+ newTopics.put("another-topic",
+ new NewTopic("another-topic", TEST_PARTITIONS, TEST_REPLICATION_FACTOR));
+ newTopics.put("one-more-topic",
+ new NewTopic("one-more-topic", TEST_PARTITIONS, TEST_REPLICATION_FACTOR));
+
+ WorkerUtils.createTopics(log, adminClient, newTopics, true);
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testExistingTopicsMustHaveRequestedNumberOfPartitions() throws Throwable {
+ List tpInfo = new ArrayList<>();
+ tpInfo.add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList()));
+ tpInfo.add(new TopicPartitionInfo(1, broker2, singleReplica, Collections.emptyList()));
+ adminClient.addTopic(
+ false,
+ TEST_TOPIC,
+ tpInfo,
+ null);
+
+ WorkerUtils.createTopics(
+ log, adminClient, Collections.singletonMap(TEST_TOPIC, NEW_TEST_TOPIC), false);
+ }
+
+ @Test
+ public void testExistingTopicsNotCreated() throws Throwable {
+ final String existingTopic = "existing-topic";
+ List tpInfo = new ArrayList<>();
+ tpInfo.add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList()));
+ tpInfo.add(new TopicPartitionInfo(1, broker2, singleReplica, Collections.emptyList()));
+ tpInfo.add(new TopicPartitionInfo(2, broker3, singleReplica, Collections.emptyList()));
+ adminClient.addTopic(
+ false,
+ existingTopic,
+ tpInfo,
+ null);
+
+ WorkerUtils.createTopics(
+ log, adminClient,
+ Collections.singletonMap(
+ existingTopic,
+ new NewTopic(existingTopic, tpInfo.size(), TEST_REPLICATION_FACTOR)), false);
+
+ assertEquals(Collections.singleton(existingTopic), adminClient.listTopics().names().get());
+ }
+
+ @Test
+ public void testCreatesNotExistingTopics() throws Throwable {
+ // should be no topics before the call
+ assertEquals(0, adminClient.listTopics().names().get().size());
+
+ WorkerUtils.createTopics(
+ log, adminClient, Collections.singletonMap(TEST_TOPIC, NEW_TEST_TOPIC), false);
+
+ assertEquals(Collections.singleton(TEST_TOPIC), adminClient.listTopics().names().get());
+ assertEquals(
+ new TopicDescription(
+ TEST_TOPIC, false,
+ Collections.singletonList(
+ new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList()))),
+ adminClient.describeTopics(Collections.singleton(TEST_TOPIC)).values().get(TEST_TOPIC).get()
+ );
+ }
+
+ @Test
+ public void testCreatesOneTopicVerifiesOneTopic() throws Throwable {
+ final String existingTopic = "existing-topic";
+ List tpInfo = new ArrayList<>();
+ tpInfo.add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList()));
+ tpInfo.add(new TopicPartitionInfo(1, broker2, singleReplica, Collections.emptyList()));
+ adminClient.addTopic(
+ false,
+ existingTopic,
+ tpInfo,
+ null);
+
+ Map topics = new HashMap<>();
+ topics.put(existingTopic,
+ new NewTopic(existingTopic, tpInfo.size(), TEST_REPLICATION_FACTOR));
+ topics.put(TEST_TOPIC, NEW_TEST_TOPIC);
+
+ WorkerUtils.createTopics(log, adminClient, topics, false);
+
+ assertEquals(Utils.mkSet(existingTopic, TEST_TOPIC), adminClient.listTopics().names().get());
+ }
+
+ @Test
+ public void testCreateNonExistingTopicsWithZeroTopicsDoesNothing() throws Throwable {
+ WorkerUtils.createTopics(
+ log, adminClient, Collections.emptyMap(), false);
+ assertEquals(0, adminClient.listTopics().names().get().size());
+ }
+
+}