KAFKA-12270: Handle race condition when Connect tasks attempt to create topics (#10032)

When a source connector is configured to create missing topics has multiple tasks that generate records for the same topic, it is possible that multiple tasks may simultaneously describe the topic, find it does not exist, and attempt to create the task. One of those create topic requests will succeed, and the other concurrent tasks will receive the response from the topic admin as having not created the task and will fail unnecessarily.

This change corrects the logic by moving the `TopicAdmin` logic to create a topic to a new `createOrFindTopics(…)` method that returns the set of created topic names and the set of existing topic names. This allows the existing `createTopics(…)` and `createTopic(…)` methods to retain the same behavior, but also allows the `WorkerSourceTask` to know from its single call to this new method whether the topic was created or was found to exist.

This adds one unit test and modifies several unit tests in `WorkerSourceTaskWithTopicCreationTest` that use mocks to verify the behavior, and modifies several existing unit tests for `TopicAdminTest` to ensure the logic of the new method is as expected.

Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
This commit is contained in:
Randall Hauch 2021-02-03 18:29:55 -06:00 committed by GitHub
parent f58c2acf26
commit 5eb8a238e2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 192 additions and 24 deletions

View File

@ -412,10 +412,15 @@ class WorkerSourceTask extends WorkerTask {
log.debug("Topic '{}' matched topic creation group: {}", topic, topicGroup); log.debug("Topic '{}' matched topic creation group: {}", topic, topicGroup);
NewTopic newTopic = topicGroup.newTopic(topic); NewTopic newTopic = topicGroup.newTopic(topic);
if (admin.createTopic(newTopic)) { TopicAdmin.TopicCreationResponse response = admin.createOrFindTopics(newTopic);
if (response.isCreated(newTopic.name())) {
topicCreation.addTopic(topic); topicCreation.addTopic(topic);
log.info("Created topic '{}' using creation group {}", newTopic, topicGroup); log.info("Created topic '{}' using creation group {}", newTopic, topicGroup);
} else if (response.isExisting(newTopic.name())) {
topicCreation.addTopic(topic);
log.info("Found existing topic '{}'", newTopic);
} else { } else {
// The topic still does not exist and could not be created, so treat it as a task failure
log.warn("Request to create new topic '{}' failed", topic); log.warn("Request to create new topic '{}' failed", topic);
throw new ConnectException("Task failed to create new topic " + newTopic + ". Ensure " throw new ConnectException("Task failed to create new topic " + newTopic + ". Ensure "
+ "that the task is authorized to create topics or that the topic exists and " + "that the task is authorized to create topics or that the topic exists and "

View File

@ -60,6 +60,60 @@ import java.util.stream.Collectors;
*/ */
public class TopicAdmin implements AutoCloseable { public class TopicAdmin implements AutoCloseable {
public static final TopicCreationResponse EMPTY_CREATION = new TopicCreationResponse(Collections.emptySet(), Collections.emptySet());
public static class TopicCreationResponse {
private final Set<String> created;
private final Set<String> existing;
public TopicCreationResponse(Set<String> createdTopicNames, Set<String> existingTopicNames) {
this.created = Collections.unmodifiableSet(createdTopicNames);
this.existing = Collections.unmodifiableSet(existingTopicNames);
}
public Set<String> createdTopics() {
return created;
}
public Set<String> existingTopics() {
return existing;
}
public boolean isCreated(String topicName) {
return created.contains(topicName);
}
public boolean isExisting(String topicName) {
return existing.contains(topicName);
}
public boolean isCreatedOrExisting(String topicName) {
return isCreated(topicName) || isExisting(topicName);
}
public int createdTopicsCount() {
return created.size();
}
public int existingTopicsCount() {
return existing.size();
}
public int createdOrExistingTopicsCount() {
return createdTopicsCount() + existingTopicsCount();
}
public boolean isEmpty() {
return createdOrExistingTopicsCount() == 0;
}
@Override
public String toString() {
return "TopicCreationResponse{" + "created=" + created + ", existing=" + existing + '}';
}
}
public static final int NO_PARTITIONS = -1; public static final int NO_PARTITIONS = -1;
public static final short NO_REPLICATION_FACTOR = -1; public static final short NO_REPLICATION_FACTOR = -1;
@ -260,13 +314,48 @@ public class TopicAdmin implements AutoCloseable {
* attempting to perform this operation * attempting to perform this operation
*/ */
public Set<String> createTopics(NewTopic... topics) { public Set<String> createTopics(NewTopic... topics) {
return createOrFindTopics(topics).createdTopics();
}
/**
* Attempt to find or create the topic described by the given definition, returning true if the topic was created or had
* already existed, or false if the topic did not exist and could not be created.
*
* @param topic the specification of the topic
* @return true if the topic was created or existed, or false if the topic could not already existed.
* @throws ConnectException if an error occurs, the operation takes too long, or the thread is interrupted while
* attempting to perform this operation
* @throws UnsupportedVersionException if the broker does not support the necessary APIs to perform this request
*/
public boolean createOrFindTopic(NewTopic topic) {
if (topic == null) return false;
return createOrFindTopics(topic).isCreatedOrExisting(topic.name());
}
/**
* Attempt to create the topics described by the given definitions, returning all of the names of those topics that
* were created by this request. Any existing topics with the same name are unchanged, and the names of such topics
* are excluded from the result.
* <p>
* If multiple topic definitions have the same topic name, the last one with that name will be used.
* <p>
* Apache Kafka added support for creating topics in 0.10.1.0, so this method works as expected with that and later versions.
* With brokers older than 0.10.1.0, this method is unable to create topics and always returns an empty set.
*
* @param topics the specifications of the topics
* @return the {@link TopicCreationResponse} with the names of the newly created and existing topics;
* never null but possibly empty
* @throws ConnectException if an error occurs, the operation takes too long, or the thread is interrupted while
* attempting to perform this operation
*/
public TopicCreationResponse createOrFindTopics(NewTopic... topics) {
Map<String, NewTopic> topicsByName = new HashMap<>(); Map<String, NewTopic> topicsByName = new HashMap<>();
if (topics != null) { if (topics != null) {
for (NewTopic topic : topics) { for (NewTopic topic : topics) {
if (topic != null) topicsByName.put(topic.name(), topic); if (topic != null) topicsByName.put(topic.name(), topic);
} }
} }
if (topicsByName.isEmpty()) return Collections.emptySet(); if (topicsByName.isEmpty()) return EMPTY_CREATION;
String bootstrapServers = bootstrapServers(); String bootstrapServers = bootstrapServers();
String topicNameList = Utils.join(topicsByName.keySet(), "', '"); String topicNameList = Utils.join(topicsByName.keySet(), "', '");
@ -276,6 +365,7 @@ public class TopicAdmin implements AutoCloseable {
// Iterate over each future so that we can handle individual failures like when some topics already exist // Iterate over each future so that we can handle individual failures like when some topics already exist
Set<String> newlyCreatedTopicNames = new HashSet<>(); Set<String> newlyCreatedTopicNames = new HashSet<>();
Set<String> existingTopicNames = new HashSet<>();
for (Map.Entry<String, KafkaFuture<Void>> entry : newResults.entrySet()) { for (Map.Entry<String, KafkaFuture<Void>> entry : newResults.entrySet()) {
String topic = entry.getKey(); String topic = entry.getKey();
try { try {
@ -288,25 +378,26 @@ public class TopicAdmin implements AutoCloseable {
Throwable cause = e.getCause(); Throwable cause = e.getCause();
if (cause instanceof TopicExistsException) { if (cause instanceof TopicExistsException) {
log.debug("Found existing topic '{}' on the brokers at {}", topic, bootstrapServers); log.debug("Found existing topic '{}' on the brokers at {}", topic, bootstrapServers);
existingTopicNames.add(topic);
continue; continue;
} }
if (cause instanceof UnsupportedVersionException) { if (cause instanceof UnsupportedVersionException) {
log.debug("Unable to create topic(s) '{}' since the brokers at {} do not support the CreateTopics API." + log.debug("Unable to create topic(s) '{}' since the brokers at {} do not support the CreateTopics API." +
" Falling back to assume topic(s) exist or will be auto-created by the broker.", " Falling back to assume topic(s) exist or will be auto-created by the broker.",
topicNameList, bootstrapServers); topicNameList, bootstrapServers);
return Collections.emptySet(); return EMPTY_CREATION;
} }
if (cause instanceof ClusterAuthorizationException) { if (cause instanceof ClusterAuthorizationException) {
log.debug("Not authorized to create topic(s) '{}' upon the brokers {}." + log.debug("Not authorized to create topic(s) '{}' upon the brokers {}." +
" Falling back to assume topic(s) exist or will be auto-created by the broker.", " Falling back to assume topic(s) exist or will be auto-created by the broker.",
topicNameList, bootstrapServers); topicNameList, bootstrapServers);
return Collections.emptySet(); return EMPTY_CREATION;
} }
if (cause instanceof TopicAuthorizationException) { if (cause instanceof TopicAuthorizationException) {
log.debug("Not authorized to create topic(s) '{}' upon the brokers {}." + log.debug("Not authorized to create topic(s) '{}' upon the brokers {}." +
" Falling back to assume topic(s) exist or will be auto-created by the broker.", " Falling back to assume topic(s) exist or will be auto-created by the broker.",
topicNameList, bootstrapServers); topicNameList, bootstrapServers);
return Collections.emptySet(); return EMPTY_CREATION;
} }
if (cause instanceof InvalidConfigurationException) { if (cause instanceof InvalidConfigurationException) {
throw new ConnectException("Unable to create topic(s) '" + topicNameList + "': " + cause.getMessage(), throw new ConnectException("Unable to create topic(s) '" + topicNameList + "': " + cause.getMessage(),
@ -324,7 +415,7 @@ public class TopicAdmin implements AutoCloseable {
throw new ConnectException("Interrupted while attempting to create/find topic(s) '" + topicNameList + "'", e); throw new ConnectException("Interrupted while attempting to create/find topic(s) '" + topicNameList + "'", e);
} }
} }
return newlyCreatedTopicNames; return new TopicCreationResponse(newlyCreatedTopicNames, existingTopicNames);
} }
/** /**

View File

@ -79,6 +79,7 @@ import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList; import static java.util.Collections.singletonList;
@ -529,7 +530,10 @@ public class ErrorHandlingTaskWithTopicCreationTest {
EasyMock.expect(admin.describeTopics(topic)).andReturn(Collections.emptyMap()); EasyMock.expect(admin.describeTopics(topic)).andReturn(Collections.emptyMap());
Capture<NewTopic> newTopicCapture = EasyMock.newCapture(); Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture))).andReturn(true); Set<String> created = Collections.singleton(topic);
Set<String> existing = Collections.emptySet();
TopicAdmin.TopicCreationResponse response = new TopicAdmin.TopicCreationResponse(created, existing);
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(response);
} }
} }

View File

@ -80,6 +80,7 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -956,7 +957,7 @@ public class WorkerSourceTaskWithTopicCreationTest extends ThreadedTest {
expectPreliminaryCalls(); expectPreliminaryCalls();
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap()); EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
Capture<NewTopic> newTopicCapture = EasyMock.newCapture(); Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture))) EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture)))
.andThrow(new RetriableException(new TimeoutException("timeout"))); .andThrow(new RetriableException(new TimeoutException("timeout")));
// Second round // Second round
@ -1034,7 +1035,7 @@ public class WorkerSourceTaskWithTopicCreationTest extends ThreadedTest {
EasyMock.expect(admin.describeTopics(OTHER_TOPIC)).andReturn(Collections.emptyMap()); EasyMock.expect(admin.describeTopics(OTHER_TOPIC)).andReturn(Collections.emptyMap());
// First call to create the topic times out // First call to create the topic times out
Capture<NewTopic> newTopicCapture = EasyMock.newCapture(); Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture))) EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture)))
.andThrow(new RetriableException(new TimeoutException("timeout"))); .andThrow(new RetriableException(new TimeoutException("timeout")));
// Second round // Second round
@ -1085,7 +1086,7 @@ public class WorkerSourceTaskWithTopicCreationTest extends ThreadedTest {
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap()); EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
Capture<NewTopic> newTopicCapture = EasyMock.newCapture(); Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture))) EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture)))
.andThrow(new ConnectException(new TopicAuthorizationException("unauthorized"))); .andThrow(new ConnectException(new TopicAuthorizationException("unauthorized")));
PowerMock.replayAll(); PowerMock.replayAll();
@ -1096,7 +1097,7 @@ public class WorkerSourceTaskWithTopicCreationTest extends ThreadedTest {
} }
@Test @Test
public void testTopicCreateFailsWithExceptionWhenCreateReturnsFalse() throws Exception { public void testTopicCreateFailsWithExceptionWhenCreateReturnsTopicNotCreatedOrFound() throws Exception {
createWorkerTask(); createWorkerTask();
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
@ -1106,7 +1107,7 @@ public class WorkerSourceTaskWithTopicCreationTest extends ThreadedTest {
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap()); EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
Capture<NewTopic> newTopicCapture = EasyMock.newCapture(); Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture))).andReturn(false); EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(TopicAdmin.EMPTY_CREATION);
PowerMock.replayAll(); PowerMock.replayAll();
@ -1115,6 +1116,62 @@ public class WorkerSourceTaskWithTopicCreationTest extends ThreadedTest {
assertTrue(newTopicCapture.hasCaptured()); assertTrue(newTopicCapture.hasCaptured());
} }
@Test
public void testTopicCreateSucceedsWhenCreateReturnsExistingTopicFound() throws Exception {
createWorkerTask();
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
expectPreliminaryCalls();
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(foundTopic(TOPIC));
expectSendRecordTaskCommitRecordSucceed(false, false);
expectSendRecordTaskCommitRecordSucceed(false, false);
PowerMock.replayAll();
Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
Whitebox.invokeMethod(workerTask, "sendRecords");
}
@Test
public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() throws Exception {
createWorkerTask();
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
expectPreliminaryCalls();
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(TOPIC));
expectSendRecordTaskCommitRecordSucceed(false, false);
expectSendRecordTaskCommitRecordSucceed(false, false);
PowerMock.replayAll();
Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
Whitebox.invokeMethod(workerTask, "sendRecords");
}
private TopicAdmin.TopicCreationResponse createdTopic(String topic) {
Set<String> created = Collections.singleton(topic);
Set<String> existing = Collections.emptySet();
return new TopicAdmin.TopicCreationResponse(created, existing);
}
private TopicAdmin.TopicCreationResponse foundTopic(String topic) {
Set<String> created = Collections.emptySet();
Set<String> existing = Collections.singleton(topic);
return new TopicAdmin.TopicCreationResponse(created, existing);
}
private void expectPreliminaryCalls() { private void expectPreliminaryCalls() {
expectPreliminaryCalls(TOPIC); expectPreliminaryCalls(TOPIC);
} }
@ -1431,7 +1488,7 @@ public class WorkerSourceTaskWithTopicCreationTest extends ThreadedTest {
if (config.topicCreationEnable()) { if (config.topicCreationEnable()) {
EasyMock.expect(admin.describeTopics(topic)).andReturn(Collections.emptyMap()); EasyMock.expect(admin.describeTopics(topic)).andReturn(Collections.emptyMap());
Capture<NewTopic> newTopicCapture = EasyMock.newCapture(); Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture))).andReturn(true); EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(topic));
} }
} }
} }

View File

@ -74,15 +74,14 @@ public class TopicAdminTest {
* create no topics, and return false. * create no topics, and return false.
*/ */
@Test @Test
public void returnNullWithApiVersionMismatchOnCreate() { public void returnEmptyWithApiVersionMismatchOnCreate() {
final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build(); final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
Cluster cluster = createCluster(1); Cluster cluster = createCluster(1);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(createTopicResponseWithUnsupportedVersion(newTopic)); env.kafkaClient().prepareResponse(createTopicResponseWithUnsupportedVersion(newTopic));
TopicAdmin admin = new TopicAdmin(null, env.adminClient()); TopicAdmin admin = new TopicAdmin(null, env.adminClient());
boolean created = admin.createTopic(newTopic); assertTrue(admin.createOrFindTopics(newTopic).isEmpty());
assertFalse(created);
} }
} }
@ -105,14 +104,16 @@ public class TopicAdminTest {
} }
@Test @Test
public void returnNullWithClusterAuthorizationFailureOnCreate() { public void returnEmptyWithClusterAuthorizationFailureOnCreate() {
final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build(); final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
Cluster cluster = createCluster(1); Cluster cluster = createCluster(1);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
env.kafkaClient().prepareResponse(createTopicResponseWithClusterAuthorizationException(newTopic)); env.kafkaClient().prepareResponse(createTopicResponseWithClusterAuthorizationException(newTopic));
TopicAdmin admin = new TopicAdmin(null, env.adminClient()); TopicAdmin admin = new TopicAdmin(null, env.adminClient());
boolean created = admin.createTopic(newTopic); assertFalse(admin.createTopic(newTopic));
assertFalse(created);
env.kafkaClient().prepareResponse(createTopicResponseWithClusterAuthorizationException(newTopic));
assertTrue(admin.createOrFindTopics(newTopic).isEmpty());
} }
} }
@ -129,14 +130,16 @@ public class TopicAdminTest {
} }
@Test @Test
public void returnNullWithTopicAuthorizationFailureOnCreate() { public void returnEmptyWithTopicAuthorizationFailureOnCreate() {
final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build(); final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
Cluster cluster = createCluster(1); Cluster cluster = createCluster(1);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
env.kafkaClient().prepareResponse(createTopicResponseWithTopicAuthorizationException(newTopic)); env.kafkaClient().prepareResponse(createTopicResponseWithTopicAuthorizationException(newTopic));
TopicAdmin admin = new TopicAdmin(null, env.adminClient()); TopicAdmin admin = new TopicAdmin(null, env.adminClient());
boolean created = admin.createTopic(newTopic); assertFalse(admin.createTopic(newTopic));
assertFalse(created);
env.kafkaClient().prepareResponse(createTopicResponseWithTopicAuthorizationException(newTopic));
assertTrue(admin.createOrFindTopics(newTopic).isEmpty());
} }
} }
@ -161,6 +164,12 @@ public class TopicAdminTest {
mockAdminClient.addTopic(false, "myTopic", Collections.singletonList(topicPartitionInfo), null); mockAdminClient.addTopic(false, "myTopic", Collections.singletonList(topicPartitionInfo), null);
TopicAdmin admin = new TopicAdmin(null, mockAdminClient); TopicAdmin admin = new TopicAdmin(null, mockAdminClient);
assertFalse(admin.createTopic(newTopic)); assertFalse(admin.createTopic(newTopic));
assertTrue(admin.createTopics(newTopic).isEmpty());
assertTrue(admin.createOrFindTopic(newTopic));
TopicAdmin.TopicCreationResponse response = admin.createOrFindTopics(newTopic);
assertTrue(response.isCreatedOrExisting(newTopic.name()));
assertTrue(response.isExisting(newTopic.name()));
assertFalse(response.isCreated(newTopic.name()));
} }
} }
@ -509,7 +518,9 @@ public class TopicAdminTest {
clientBuilder.controller(0); clientBuilder.controller(0);
try (MockAdminClient admin = clientBuilder.build()) { try (MockAdminClient admin = clientBuilder.build()) {
TopicAdmin topicClient = new TopicAdmin(null, admin, false); TopicAdmin topicClient = new TopicAdmin(null, admin, false);
assertTrue(topicClient.createTopic(newTopic)); TopicAdmin.TopicCreationResponse response = topicClient.createOrFindTopics(newTopic);
assertTrue(response.isCreated(newTopic.name()));
assertFalse(response.isExisting(newTopic.name()));
assertTopic(admin, newTopic.name(), expectedPartitions, expectedReplicas); assertTopic(admin, newTopic.name(), expectedPartitions, expectedReplicas);
} }
} }