MINOR: WorkerUtils#topicDescriptions must unwrap exceptions properly (#6937)

Reviewers: Ismael Juma <ismael@juma.me.uk>, Stanislav Kozlovski <stanislav_kozlovski@outlook.com>
This commit is contained in:
Colin Patrick McCabe 2019-07-03 16:08:39 -07:00 committed by GitHub
parent 07a154eee7
commit 822abe47db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 58 additions and 21 deletions

View File

@ -205,7 +205,11 @@ public class MockAdminClient extends AdminClient {
for (Map.Entry<String, TopicMetadata> topicDescription : allTopics.entrySet()) { for (Map.Entry<String, TopicMetadata> topicDescription : allTopics.entrySet()) {
String topicName = topicDescription.getKey(); String topicName = topicDescription.getKey();
topicListings.put(topicName, new TopicListing(topicName, topicDescription.getValue().isInternalTopic)); if (topicDescription.getValue().fetchesRemainingUntilVisible > 0) {
topicDescription.getValue().fetchesRemainingUntilVisible--;
} else {
topicListings.put(topicName, new TopicListing(topicName, topicDescription.getValue().isInternalTopic));
}
} }
KafkaFutureImpl<Map<String, TopicListing>> future = new KafkaFutureImpl<>(); KafkaFutureImpl<Map<String, TopicListing>> future = new KafkaFutureImpl<>();
@ -232,12 +236,16 @@ public class MockAdminClient extends AdminClient {
for (Map.Entry<String, TopicMetadata> topicDescription : allTopics.entrySet()) { for (Map.Entry<String, TopicMetadata> topicDescription : allTopics.entrySet()) {
String topicName = topicDescription.getKey(); String topicName = topicDescription.getKey();
if (topicName.equals(requestedTopic) && !topicDescription.getValue().markedForDeletion) { if (topicName.equals(requestedTopic) && !topicDescription.getValue().markedForDeletion) {
TopicMetadata topicMetadata = topicDescription.getValue(); if (topicDescription.getValue().fetchesRemainingUntilVisible > 0) {
KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>(); topicDescription.getValue().fetchesRemainingUntilVisible--;
future.complete(new TopicDescription(topicName, topicMetadata.isInternalTopic, topicMetadata.partitions, } else {
Collections.emptySet())); TopicMetadata topicMetadata = topicDescription.getValue();
topicDescriptions.put(topicName, future); KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
break; future.complete(new TopicDescription(topicName, topicMetadata.isInternalTopic, topicMetadata.partitions,
Collections.emptySet()));
topicDescriptions.put(topicName, future);
break;
}
} }
} }
if (!topicDescriptions.containsKey(requestedTopic)) { if (!topicDescriptions.containsKey(requestedTopic)) {
@ -420,6 +428,7 @@ public class MockAdminClient extends AdminClient {
final boolean isInternalTopic; final boolean isInternalTopic;
final List<TopicPartitionInfo> partitions; final List<TopicPartitionInfo> partitions;
final Map<String, String> configs; final Map<String, String> configs;
int fetchesRemainingUntilVisible;
public boolean markedForDeletion; public boolean markedForDeletion;
@ -430,6 +439,7 @@ public class MockAdminClient extends AdminClient {
this.partitions = partitions; this.partitions = partitions;
this.configs = configs != null ? configs : Collections.emptyMap(); this.configs = configs != null ? configs : Collections.emptyMap();
this.markedForDeletion = false; this.markedForDeletion = false;
this.fetchesRemainingUntilVisible = 0;
} }
} }
@ -441,4 +451,12 @@ public class MockAdminClient extends AdminClient {
public Map<MetricName, ? extends Metric> metrics() { public Map<MetricName, ? extends Metric> metrics() {
return mockMetrics; return mockMetrics;
} }
public void setFetchesRemainingUntilVisible(String topicName, int fetchesRemainingUntilVisible) {
TopicMetadata metadata = allTopics.get(topicName);
if (metadata == null) {
throw new RuntimeException("No such topic as " + topicName);
}
metadata.fetchesRemainingUntilVisible = fetchesRemainingUntilVisible;
}
} }

View File

@ -248,7 +248,7 @@ public final class WorkerUtils {
* @throws RuntimeException If one or more topics have different number of partitions than * @throws RuntimeException If one or more topics have different number of partitions than
* described in 'topicsInfo' * described in 'topicsInfo'
*/ */
private static void verifyTopics( static void verifyTopics(
Logger log, AdminClient adminClient, Logger log, AdminClient adminClient,
Collection<String> topicsToVerify, Map<String, NewTopic> topicsInfo, int retryCount, long retryBackoffMs) throws Throwable { Collection<String> topicsToVerify, Map<String, NewTopic> topicsInfo, int retryCount, long retryBackoffMs) throws Throwable {
@ -279,9 +279,13 @@ public final class WorkerUtils {
DescribeTopicsResult topicsResult = adminClient.describeTopics( DescribeTopicsResult topicsResult = adminClient.describeTopics(
topicsToVerify, new DescribeTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT)); topicsToVerify, new DescribeTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT));
return topicsResult.all().get(); return topicsResult.all().get();
} catch (UnknownTopicOrPartitionException exception) { } catch (ExecutionException exception) {
lastException = exception; if (exception.getCause() instanceof UnknownTopicOrPartitionException) {
Thread.sleep(retryBackoffMs); lastException = (UnknownTopicOrPartitionException) exception.getCause();
Thread.sleep(retryBackoffMs);
} else {
throw exception;
}
} }
} }
throw lastException; throw lastException;

View File

@ -17,24 +17,23 @@
package org.apache.kafka.trogdor.common; package org.apache.kafka.trogdor.common;
import static org.junit.Assert.assertEquals;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.Node;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger; import org.junit.Assert;
import org.slf4j.LoggerFactory;
import org.apache.kafka.clients.admin.NewTopic;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.assertEquals; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -46,7 +45,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
public class WorkerUtilsTest { public class WorkerUtilsTest {
private static final Logger log = LoggerFactory.getLogger(WorkerUtilsTest.class); private static final Logger log = LoggerFactory.getLogger(WorkerUtilsTest.class);
@ -318,4 +316,21 @@ public class WorkerUtilsTest {
tpInfo, tpInfo,
null); null);
} }
@Test
public void testVerifyTopics() throws Throwable {
Map<String, NewTopic> newTopics = Collections.singletonMap(TEST_TOPIC, NEW_TEST_TOPIC);
WorkerUtils.createTopics(log, adminClient, newTopics, true);
adminClient.setFetchesRemainingUntilVisible(TEST_TOPIC, 2);
WorkerUtils.verifyTopics(log, adminClient, Collections.singleton(TEST_TOPIC),
Collections.singletonMap(TEST_TOPIC, NEW_TEST_TOPIC), 3, 1);
adminClient.setFetchesRemainingUntilVisible(TEST_TOPIC, 100);
try {
WorkerUtils.verifyTopics(log, adminClient, Collections.singleton(TEST_TOPIC),
Collections.singletonMap(TEST_TOPIC, NEW_TEST_TOPIC), 2, 1);
Assert.fail("expected to get UnknownTopicOrPartitionException");
} catch (UnknownTopicOrPartitionException e) {
// expected
}
}
} }