mirror of https://github.com/apache/kafka.git
MINOR: Switch anonymous classes to lambda expressions in tools module
Switch to lambda when ever possible instead of old anonymous way in tools module Author: Srinivas Reddy <srinivas96alluri@gmail.com> Author: Srinivas Reddy <mrsrinivas@users.noreply.github.com> Reviewers: Ryanne Dolan <ryannedolan@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Manikumar Reddy <manikumar.reddy@gmail.com> Closes #6013 from mrsrinivas/tools-switch-to-java8
This commit is contained in:
parent
ee370d3893
commit
85906d3d2b
|
@ -261,65 +261,68 @@ public class ClientCompatibilityTest {
|
|||
nodes.size(), testConfig.numClusterNodes);
|
||||
}
|
||||
tryFeature("createTopics", testConfig.createTopicsSupported,
|
||||
new Invoker() {
|
||||
@Override
|
||||
public void invoke() throws Throwable {
|
||||
try {
|
||||
client.createTopics(Collections.singleton(
|
||||
new NewTopic("newtopic", 1, (short) 1))).all().get();
|
||||
} catch (ExecutionException e) {
|
||||
throw e.getCause();
|
||||
}
|
||||
() -> {
|
||||
try {
|
||||
client.createTopics(Collections.singleton(
|
||||
new NewTopic("newtopic", 1, (short) 1))).all().get();
|
||||
} catch (ExecutionException e) {
|
||||
throw e.getCause();
|
||||
}
|
||||
},
|
||||
new ResultTester() {
|
||||
@Override
|
||||
public void test() throws Throwable {
|
||||
while (true) {
|
||||
try {
|
||||
client.describeTopics(Collections.singleton("newtopic")).all().get();
|
||||
break;
|
||||
} catch (ExecutionException e) {
|
||||
if (e.getCause() instanceof UnknownTopicOrPartitionException)
|
||||
continue;
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
() -> createTopicsResultTest(client, Collections.singleton("newtopic"))
|
||||
);
|
||||
|
||||
while (true) {
|
||||
Collection<TopicListing> listings = client.listTopics().listings().get();
|
||||
if (!testConfig.createTopicsSupported)
|
||||
break;
|
||||
boolean foundNewTopic = false;
|
||||
for (TopicListing listing : listings) {
|
||||
if (listing.name().equals("newtopic")) {
|
||||
if (listing.isInternal())
|
||||
throw new KafkaException("Did not expect newtopic to be an internal topic.");
|
||||
foundNewTopic = true;
|
||||
}
|
||||
}
|
||||
if (foundNewTopic)
|
||||
|
||||
if (topicExists(listings, "newtopic"))
|
||||
break;
|
||||
|
||||
Thread.sleep(1);
|
||||
log.info("Did not see newtopic. Retrying listTopics...");
|
||||
}
|
||||
|
||||
tryFeature("describeAclsSupported", testConfig.describeAclsSupported,
|
||||
new Invoker() {
|
||||
@Override
|
||||
public void invoke() throws Throwable {
|
||||
try {
|
||||
client.describeAcls(AclBindingFilter.ANY).values().get();
|
||||
} catch (ExecutionException e) {
|
||||
if (e.getCause() instanceof SecurityDisabledException)
|
||||
return;
|
||||
throw e.getCause();
|
||||
}
|
||||
() -> {
|
||||
try {
|
||||
client.describeAcls(AclBindingFilter.ANY).values().get();
|
||||
} catch (ExecutionException e) {
|
||||
if (e.getCause() instanceof SecurityDisabledException)
|
||||
return;
|
||||
throw e.getCause();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void createTopicsResultTest(AdminClient client, Collection<String> topics)
|
||||
throws InterruptedException, ExecutionException {
|
||||
while (true) {
|
||||
try {
|
||||
client.describeTopics(topics).all().get();
|
||||
break;
|
||||
} catch (ExecutionException e) {
|
||||
if (e.getCause() instanceof UnknownTopicOrPartitionException)
|
||||
continue;
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean topicExists(Collection<TopicListing> listings, String topicName) {
|
||||
boolean foundTopic = false;
|
||||
for (TopicListing listing : listings) {
|
||||
if (listing.name().equals(topicName)) {
|
||||
if (listing.isInternal())
|
||||
throw new KafkaException(String.format("Did not expect %s to be an internal topic.", topicName));
|
||||
foundTopic = true;
|
||||
}
|
||||
}
|
||||
return foundTopic;
|
||||
}
|
||||
|
||||
private static class OffsetsForTime {
|
||||
Map<TopicPartition, OffsetAndTimestamp> result;
|
||||
|
||||
|
@ -384,18 +387,8 @@ public class ClientCompatibilityTest {
|
|||
}
|
||||
final OffsetsForTime offsetsForTime = new OffsetsForTime();
|
||||
tryFeature("offsetsForTimes", testConfig.offsetsForTimesSupported,
|
||||
new Invoker() {
|
||||
@Override
|
||||
public void invoke() {
|
||||
offsetsForTime.result = consumer.offsetsForTimes(timestampsToSearch);
|
||||
}
|
||||
},
|
||||
new ResultTester() {
|
||||
@Override
|
||||
public void test() {
|
||||
log.info("offsetsForTime = {}", offsetsForTime.result);
|
||||
}
|
||||
});
|
||||
() -> offsetsForTime.result = consumer.offsetsForTimes(timestampsToSearch),
|
||||
() -> log.info("offsetsForTime = {}", offsetsForTime.result));
|
||||
// Whether or not offsetsForTimes works, beginningOffsets and endOffsets
|
||||
// should work.
|
||||
consumer.beginningOffsets(timestampsToSearch.keySet());
|
||||
|
@ -486,11 +479,7 @@ public class ClientCompatibilityTest {
|
|||
}
|
||||
|
||||
private void tryFeature(String featureName, boolean supported, Invoker invoker) throws Throwable {
|
||||
tryFeature(featureName, supported, invoker, new ResultTester() {
|
||||
@Override
|
||||
public void test() {
|
||||
}
|
||||
});
|
||||
tryFeature(featureName, supported, invoker, () -> { });
|
||||
}
|
||||
|
||||
private void tryFeature(String featureName, boolean supported, Invoker invoker, ResultTester resultTester)
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.kafka.tools;
|
|||
import org.apache.kafka.common.Metric;
|
||||
import org.apache.kafka.common.MetricName;
|
||||
|
||||
import java.util.Comparator;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
|
||||
|
@ -32,12 +31,7 @@ public class ToolsUtils {
|
|||
public static void printMetrics(Map<MetricName, ? extends Metric> metrics) {
|
||||
if (metrics != null && !metrics.isEmpty()) {
|
||||
int maxLengthOfDisplayName = 0;
|
||||
TreeMap<String, Object> sortedMetrics = new TreeMap<>(new Comparator<String>() {
|
||||
@Override
|
||||
public int compare(String o1, String o2) {
|
||||
return o1.compareTo(o2);
|
||||
}
|
||||
});
|
||||
TreeMap<String, Object> sortedMetrics = new TreeMap<>();
|
||||
for (Metric metric : metrics.values()) {
|
||||
MetricName mName = metric.metricName();
|
||||
String mergedName = mName.group() + ":" + mName.name() + ":" + mName.tags();
|
||||
|
|
|
@ -263,18 +263,15 @@ public class TransactionalMessageCopier {
|
|||
final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
|
||||
final AtomicLong remainingMessages = new AtomicLong(maxMessages);
|
||||
final AtomicLong numMessagesProcessed = new AtomicLong(0);
|
||||
Runtime.getRuntime().addShutdownHook(new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
isShuttingDown.set(true);
|
||||
// Flush any remaining messages
|
||||
producer.close();
|
||||
synchronized (consumer) {
|
||||
consumer.close();
|
||||
}
|
||||
System.out.println(shutDownString(numMessagesProcessed.get(), remainingMessages.get(), transactionalId));
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||
isShuttingDown.set(true);
|
||||
// Flush any remaining messages
|
||||
producer.close();
|
||||
synchronized (consumer) {
|
||||
consumer.close();
|
||||
}
|
||||
});
|
||||
System.out.println(shutDownString(numMessagesProcessed.get(), remainingMessages.get(), transactionalId));
|
||||
}));
|
||||
|
||||
try {
|
||||
Random random = new Random();
|
||||
|
|
|
@ -620,12 +620,7 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
|
|||
|
||||
try {
|
||||
final VerifiableConsumer consumer = createFromArgs(parser, args);
|
||||
Runtime.getRuntime().addShutdownHook(new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
consumer.close();
|
||||
}
|
||||
});
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> consumer.close()));
|
||||
|
||||
consumer.run();
|
||||
} catch (ArgumentParserException e) {
|
||||
|
|
|
@ -241,13 +241,10 @@ public class VerifiableLog4jAppender {
|
|||
final VerifiableLog4jAppender appender = createFromArgs(args);
|
||||
boolean infinite = appender.maxMessages < 0;
|
||||
|
||||
Runtime.getRuntime().addShutdownHook(new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
// Trigger main thread to stop producing messages
|
||||
appender.stopLogging = true;
|
||||
}
|
||||
});
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||
// Trigger main thread to stop producing messages
|
||||
appender.stopLogging = true;
|
||||
}));
|
||||
|
||||
long maxMessages = infinite ? Long.MAX_VALUE : appender.maxMessages;
|
||||
for (long i = 0; i < maxMessages; i++) {
|
||||
|
|
|
@ -517,22 +517,19 @@ public class VerifiableProducer implements AutoCloseable {
|
|||
final long startMs = System.currentTimeMillis();
|
||||
ThroughputThrottler throttler = new ThroughputThrottler(producer.throughput, startMs);
|
||||
|
||||
Runtime.getRuntime().addShutdownHook(new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
// Trigger main thread to stop producing messages
|
||||
producer.stopProducing = true;
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||
// Trigger main thread to stop producing messages
|
||||
producer.stopProducing = true;
|
||||
|
||||
// Flush any remaining messages
|
||||
producer.close();
|
||||
// Flush any remaining messages
|
||||
producer.close();
|
||||
|
||||
// Print a summary
|
||||
long stopMs = System.currentTimeMillis();
|
||||
double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs));
|
||||
// Print a summary
|
||||
long stopMs = System.currentTimeMillis();
|
||||
double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs));
|
||||
|
||||
producer.printJson(new ToolData(producer.numSent, producer.numAcked, producer.throughput, avgThroughput));
|
||||
}
|
||||
});
|
||||
producer.printJson(new ToolData(producer.numSent, producer.numAcked, producer.throughput, avgThroughput));
|
||||
}));
|
||||
|
||||
producer.run(throttler);
|
||||
} catch (ArgumentParserException e) {
|
||||
|
|
|
@ -147,18 +147,15 @@ public final class Agent {
|
|||
log.info("Starting agent process.");
|
||||
final Agent agent = new Agent(platform, Scheduler.SYSTEM, restServer, resource);
|
||||
restServer.start(resource);
|
||||
Runtime.getRuntime().addShutdownHook(new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
log.warn("Running agent shutdown hook.");
|
||||
try {
|
||||
agent.beginShutdown();
|
||||
agent.waitForShutdown();
|
||||
} catch (Exception e) {
|
||||
log.error("Got exception while running agent shutdown hook.", e);
|
||||
}
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||
log.warn("Running agent shutdown hook.");
|
||||
try {
|
||||
agent.beginShutdown();
|
||||
agent.waitForShutdown();
|
||||
} catch (Exception e) {
|
||||
log.error("Got exception while running agent shutdown hook.", e);
|
||||
}
|
||||
});
|
||||
}));
|
||||
agent.waitForShutdown();
|
||||
}
|
||||
};
|
||||
|
|
|
@ -317,21 +317,18 @@ public final class WorkerManager {
|
|||
return;
|
||||
}
|
||||
KafkaFutureImpl<String> haltFuture = new KafkaFutureImpl<>();
|
||||
haltFuture.thenApply(new KafkaFuture.BaseFunction<String, Void>() {
|
||||
@Override
|
||||
public Void apply(String errorString) {
|
||||
if (errorString == null)
|
||||
errorString = "";
|
||||
if (errorString.isEmpty()) {
|
||||
log.info("{}: Worker {} is halting.", nodeName, worker);
|
||||
} else {
|
||||
log.info("{}: Worker {} is halting with error {}",
|
||||
nodeName, worker, errorString);
|
||||
}
|
||||
stateChangeExecutor.submit(
|
||||
new HandleWorkerHalting(worker, errorString, false));
|
||||
return null;
|
||||
haltFuture.thenApply((KafkaFuture.BaseFunction<String, Void>) errorString -> {
|
||||
if (errorString == null)
|
||||
errorString = "";
|
||||
if (errorString.isEmpty()) {
|
||||
log.info("{}: Worker {} is halting.", nodeName, worker);
|
||||
} else {
|
||||
log.info("{}: Worker {} is halting with error {}",
|
||||
nodeName, worker, errorString);
|
||||
}
|
||||
stateChangeExecutor.submit(
|
||||
new HandleWorkerHalting(worker, errorString, false));
|
||||
return null;
|
||||
});
|
||||
try {
|
||||
worker.taskWorker.start(platform, worker.status, haltFuture);
|
||||
|
|
|
@ -162,18 +162,15 @@ public final class Coordinator {
|
|||
final Coordinator coordinator = new Coordinator(platform, Scheduler.SYSTEM,
|
||||
restServer, resource, ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2));
|
||||
restServer.start(resource);
|
||||
Runtime.getRuntime().addShutdownHook(new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
log.warn("Running coordinator shutdown hook.");
|
||||
try {
|
||||
coordinator.beginShutdown(false);
|
||||
coordinator.waitForShutdown();
|
||||
} catch (Exception e) {
|
||||
log.error("Got exception while running coordinator shutdown hook.", e);
|
||||
}
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||
log.warn("Running coordinator shutdown hook.");
|
||||
try {
|
||||
coordinator.beginShutdown(false);
|
||||
coordinator.waitForShutdown();
|
||||
} catch (Exception e) {
|
||||
log.error("Got exception while running coordinator shutdown hook.", e);
|
||||
}
|
||||
});
|
||||
}));
|
||||
coordinator.waitForShutdown();
|
||||
}
|
||||
};
|
||||
|
|
|
@ -132,22 +132,19 @@ public class JsonRestServer {
|
|||
*/
|
||||
public void beginShutdown() {
|
||||
if (!shutdownExecutor.isShutdown()) {
|
||||
shutdownExecutor.submit(new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
try {
|
||||
log.info("Stopping REST server");
|
||||
jettyServer.stop();
|
||||
jettyServer.join();
|
||||
log.info("REST server stopped");
|
||||
} catch (Exception e) {
|
||||
log.error("Unable to stop REST server", e);
|
||||
} finally {
|
||||
jettyServer.destroy();
|
||||
}
|
||||
shutdownExecutor.shutdown();
|
||||
return null;
|
||||
shutdownExecutor.submit((Callable<Void>) () -> {
|
||||
try {
|
||||
log.info("Stopping REST server");
|
||||
jettyServer.stop();
|
||||
jettyServer.join();
|
||||
log.info("REST server stopped");
|
||||
} catch (Exception e) {
|
||||
log.error("Unable to stop REST server", e);
|
||||
} finally {
|
||||
jettyServer.destroy();
|
||||
}
|
||||
shutdownExecutor.shutdown();
|
||||
return null;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.kafka.trogdor.workload;
|
|||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import org.apache.kafka.trogdor.common.Topology;
|
||||
import org.apache.kafka.trogdor.task.TaskController;
|
||||
import org.apache.kafka.trogdor.task.TaskSpec;
|
||||
import org.apache.kafka.trogdor.task.TaskWorker;
|
||||
|
@ -28,7 +27,6 @@ import java.util.ArrayList;
|
|||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
|
||||
/**
|
||||
|
@ -98,12 +96,7 @@ public class ConnectionStressSpec extends TaskSpec {
|
|||
}
|
||||
|
||||
public TaskController newController(String id) {
|
||||
return new TaskController() {
|
||||
@Override
|
||||
public Set<String> targetNodes(Topology topology) {
|
||||
return new TreeSet<>(clientNodes);
|
||||
}
|
||||
};
|
||||
return topology -> new TreeSet<>(clientNodes);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.kafka.trogdor.workload;
|
|||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import org.apache.kafka.trogdor.common.Topology;
|
||||
import org.apache.kafka.trogdor.task.TaskController;
|
||||
import org.apache.kafka.trogdor.task.TaskSpec;
|
||||
import org.apache.kafka.trogdor.task.TaskWorker;
|
||||
|
@ -27,7 +26,6 @@ import org.apache.kafka.trogdor.task.TaskWorker;
|
|||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* The specification for a benchmark that produces messages to a set of topics.
|
||||
|
@ -170,12 +168,7 @@ public class ProduceBenchSpec extends TaskSpec {
|
|||
|
||||
@Override
|
||||
public TaskController newController(String id) {
|
||||
return new TaskController() {
|
||||
@Override
|
||||
public Set<String> targetNodes(Topology topology) {
|
||||
return Collections.singleton(producerNode);
|
||||
}
|
||||
};
|
||||
return topology -> Collections.singleton(producerNode);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -25,11 +25,9 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
|
|||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.clients.producer.Callback;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.clients.producer.RecordMetadata;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.config.ConfigException;
|
||||
import org.apache.kafka.common.errors.TimeoutException;
|
||||
|
@ -248,16 +246,13 @@ public class RoundTripWorker implements TaskWorker {
|
|||
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(partition.topic(),
|
||||
partition.partition(), KEY_GENERATOR.generate(messageIndex),
|
||||
spec.valueGenerator().generate(messageIndex));
|
||||
producer.send(record, new Callback() {
|
||||
@Override
|
||||
public void onCompletion(RecordMetadata metadata, Exception exception) {
|
||||
if (exception == null) {
|
||||
unackedSends.countDown();
|
||||
} else {
|
||||
log.info("{}: Got exception when sending message {}: {}",
|
||||
id, messageIndex, exception.getMessage());
|
||||
toSendTracker.addFailed(messageIndex);
|
||||
}
|
||||
producer.send(record, (metadata, exception) -> {
|
||||
if (exception == null) {
|
||||
unackedSends.countDown();
|
||||
} else {
|
||||
log.info("{}: Got exception when sending message {}: {}",
|
||||
id, messageIndex, exception.getMessage());
|
||||
toSendTracker.addFailed(messageIndex);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -19,14 +19,12 @@ package org.apache.kafka.trogdor.workload;
|
|||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import org.apache.kafka.trogdor.common.Topology;
|
||||
import org.apache.kafka.trogdor.task.TaskController;
|
||||
import org.apache.kafka.trogdor.task.TaskSpec;
|
||||
import org.apache.kafka.trogdor.task.TaskWorker;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* The specification for a workload that sends messages to a broker and then
|
||||
|
@ -124,12 +122,7 @@ public class RoundTripWorkloadSpec extends TaskSpec {
|
|||
|
||||
@Override
|
||||
public TaskController newController(String id) {
|
||||
return new TaskController() {
|
||||
@Override
|
||||
public Set<String> targetNodes(Topology topology) {
|
||||
return Collections.singleton(clientNode);
|
||||
}
|
||||
};
|
||||
return topology -> Collections.singleton(clientNode);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -304,7 +304,7 @@ public class AgentTest {
|
|||
try (MockKibosh mockKibosh = new MockKibosh()) {
|
||||
Assert.assertEquals(KiboshControlFile.EMPTY, mockKibosh.read());
|
||||
FilesUnreadableFaultSpec fooSpec = new FilesUnreadableFaultSpec(0, 900000,
|
||||
Collections.singleton("myAgent"), mockKibosh.tempDir.getPath().toString(), "/foo", 123);
|
||||
Collections.singleton("myAgent"), mockKibosh.tempDir.getPath(), "/foo", 123);
|
||||
client.createWorker(new CreateWorkerRequest(0, "foo", fooSpec));
|
||||
new ExpectedTasks().
|
||||
addTask(new ExpectedTaskBuilder("foo").
|
||||
|
@ -314,7 +314,7 @@ public class AgentTest {
|
|||
Assert.assertEquals(new KiboshControlFile(Collections.<Kibosh.KiboshFaultSpec>singletonList(
|
||||
new KiboshFilesUnreadableFaultSpec("/foo", 123))), mockKibosh.read());
|
||||
FilesUnreadableFaultSpec barSpec = new FilesUnreadableFaultSpec(0, 900000,
|
||||
Collections.singleton("myAgent"), mockKibosh.tempDir.getPath().toString(), "/bar", 456);
|
||||
Collections.singleton("myAgent"), mockKibosh.tempDir.getPath(), "/bar", 456);
|
||||
client.createWorker(new CreateWorkerRequest(1, "bar", barSpec));
|
||||
new ExpectedTasks().
|
||||
addTask(new ExpectedTaskBuilder("foo").
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.kafka.trogdor.common;
|
|||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import org.apache.kafka.test.TestCondition;
|
||||
import org.apache.kafka.test.TestUtils;
|
||||
import org.apache.kafka.trogdor.agent.AgentClient;
|
||||
import org.apache.kafka.trogdor.coordinator.CoordinatorClient;
|
||||
|
@ -142,71 +141,65 @@ public class ExpectedTasks {
|
|||
}
|
||||
|
||||
public ExpectedTasks waitFor(final CoordinatorClient client) throws InterruptedException {
|
||||
TestUtils.waitForCondition(new TestCondition() {
|
||||
@Override
|
||||
public boolean conditionMet() {
|
||||
TasksResponse tasks = null;
|
||||
try {
|
||||
tasks = client.tasks(new TasksRequest(null, 0, 0, 0, 0, Optional.empty()));
|
||||
} catch (Exception e) {
|
||||
log.info("Unable to get coordinator tasks", e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
StringBuilder errors = new StringBuilder();
|
||||
for (Map.Entry<String, ExpectedTask> entry : expected.entrySet()) {
|
||||
String id = entry.getKey();
|
||||
ExpectedTask task = entry.getValue();
|
||||
String differences = task.compare(tasks.tasks().get(id));
|
||||
if (differences != null) {
|
||||
errors.append(differences);
|
||||
}
|
||||
}
|
||||
String errorString = errors.toString();
|
||||
if (!errorString.isEmpty()) {
|
||||
log.info("EXPECTED TASKS: {}", JsonUtil.toJsonString(expected));
|
||||
log.info("ACTUAL TASKS : {}", JsonUtil.toJsonString(tasks.tasks()));
|
||||
log.info(errorString);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
TestUtils.waitForCondition(() -> {
|
||||
TasksResponse tasks = null;
|
||||
try {
|
||||
tasks = client.tasks(new TasksRequest(null, 0, 0, 0, 0, Optional.empty()));
|
||||
} catch (Exception e) {
|
||||
log.info("Unable to get coordinator tasks", e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
StringBuilder errors = new StringBuilder();
|
||||
for (Map.Entry<String, ExpectedTask> entry : expected.entrySet()) {
|
||||
String id = entry.getKey();
|
||||
ExpectedTask task = entry.getValue();
|
||||
String differences = task.compare(tasks.tasks().get(id));
|
||||
if (differences != null) {
|
||||
errors.append(differences);
|
||||
}
|
||||
}
|
||||
String errorString = errors.toString();
|
||||
if (!errorString.isEmpty()) {
|
||||
log.info("EXPECTED TASKS: {}", JsonUtil.toJsonString(expected));
|
||||
log.info("ACTUAL TASKS : {}", JsonUtil.toJsonString(tasks.tasks()));
|
||||
log.info(errorString);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}, "Timed out waiting for expected tasks " + JsonUtil.toJsonString(expected));
|
||||
return this;
|
||||
}
|
||||
|
||||
public ExpectedTasks waitFor(final AgentClient client) throws InterruptedException {
|
||||
TestUtils.waitForCondition(new TestCondition() {
|
||||
@Override
|
||||
public boolean conditionMet() {
|
||||
AgentStatusResponse status = null;
|
||||
try {
|
||||
status = client.status();
|
||||
} catch (Exception e) {
|
||||
log.info("Unable to get agent status", e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
StringBuilder errors = new StringBuilder();
|
||||
HashMap<String, WorkerState> taskIdToWorkerState = new HashMap<>();
|
||||
for (WorkerState state : status.workers().values()) {
|
||||
taskIdToWorkerState.put(state.taskId(), state);
|
||||
}
|
||||
for (Map.Entry<String, ExpectedTask> entry : expected.entrySet()) {
|
||||
String id = entry.getKey();
|
||||
ExpectedTask worker = entry.getValue();
|
||||
String differences = worker.compare(taskIdToWorkerState.get(id));
|
||||
if (differences != null) {
|
||||
errors.append(differences);
|
||||
}
|
||||
}
|
||||
String errorString = errors.toString();
|
||||
if (!errorString.isEmpty()) {
|
||||
log.info("EXPECTED WORKERS: {}", JsonUtil.toJsonString(expected));
|
||||
log.info("ACTUAL WORKERS : {}", JsonUtil.toJsonString(status.workers()));
|
||||
log.info(errorString);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
TestUtils.waitForCondition(() -> {
|
||||
AgentStatusResponse status = null;
|
||||
try {
|
||||
status = client.status();
|
||||
} catch (Exception e) {
|
||||
log.info("Unable to get agent status", e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
StringBuilder errors = new StringBuilder();
|
||||
HashMap<String, WorkerState> taskIdToWorkerState = new HashMap<>();
|
||||
for (WorkerState state : status.workers().values()) {
|
||||
taskIdToWorkerState.put(state.taskId(), state);
|
||||
}
|
||||
for (Map.Entry<String, ExpectedTask> entry : expected.entrySet()) {
|
||||
String id = entry.getKey();
|
||||
ExpectedTask worker = entry.getValue();
|
||||
String differences = worker.compare(taskIdToWorkerState.get(id));
|
||||
if (differences != null) {
|
||||
errors.append(differences);
|
||||
}
|
||||
}
|
||||
String errorString = errors.toString();
|
||||
if (!errorString.isEmpty()) {
|
||||
log.info("EXPECTED WORKERS: {}", JsonUtil.toJsonString(expected));
|
||||
log.info("ACTUAL WORKERS : {}", JsonUtil.toJsonString(status.workers()));
|
||||
log.info(errorString);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}, "Timed out waiting for expected workers " + JsonUtil.toJsonString(expected));
|
||||
return this;
|
||||
}
|
||||
|
|
|
@ -172,27 +172,24 @@ public class MiniTrogdorCluster implements AutoCloseable {
|
|||
ThreadUtils.createThreadFactory("MiniTrogdorClusterStartupThread%d", false));
|
||||
final AtomicReference<Exception> failure = new AtomicReference<Exception>(null);
|
||||
for (final Map.Entry<String, NodeData> entry : nodes.entrySet()) {
|
||||
executor.submit(new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
String nodeName = entry.getKey();
|
||||
try {
|
||||
NodeData node = entry.getValue();
|
||||
node.platform = new BasicPlatform(nodeName, topology, scheduler, commandRunner);
|
||||
if (node.agentRestResource != null) {
|
||||
node.agent = new Agent(node.platform, scheduler, node.agentRestServer,
|
||||
node.agentRestResource);
|
||||
}
|
||||
if (node.coordinatorRestResource != null) {
|
||||
node.coordinator = new Coordinator(node.platform, scheduler,
|
||||
node.coordinatorRestServer, node.coordinatorRestResource, 0);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Unable to initialize {}", nodeName, e);
|
||||
failure.compareAndSet(null, e);
|
||||
executor.submit((Callable<Void>) () -> {
|
||||
String nodeName = entry.getKey();
|
||||
try {
|
||||
NodeData node = entry.getValue();
|
||||
node.platform = new BasicPlatform(nodeName, topology, scheduler, commandRunner);
|
||||
if (node.agentRestResource != null) {
|
||||
node.agent = new Agent(node.platform, scheduler, node.agentRestServer,
|
||||
node.agentRestResource);
|
||||
}
|
||||
return null;
|
||||
if (node.coordinatorRestResource != null) {
|
||||
node.coordinator = new Coordinator(node.platform, scheduler,
|
||||
node.coordinatorRestServer, node.coordinatorRestResource, 0);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Unable to initialize {}", nodeName, e);
|
||||
failure.compareAndSet(null, e);
|
||||
}
|
||||
return null;
|
||||
});
|
||||
}
|
||||
executor.shutdown();
|
||||
|
|
|
@ -24,7 +24,6 @@ import org.apache.kafka.common.utils.MockScheduler;
|
|||
import org.apache.kafka.common.utils.MockTime;
|
||||
import org.apache.kafka.common.utils.Scheduler;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.test.TestCondition;
|
||||
import org.apache.kafka.test.TestUtils;
|
||||
import org.apache.kafka.trogdor.agent.AgentClient;
|
||||
import org.apache.kafka.trogdor.common.CapturingCommandRunner;
|
||||
|
@ -318,12 +317,8 @@ public class CoordinatorTest {
|
|||
|
||||
public ExpectedLines waitFor(final String nodeName,
|
||||
final CapturingCommandRunner runner) throws InterruptedException {
|
||||
TestUtils.waitForCondition(new TestCondition() {
|
||||
@Override
|
||||
public boolean conditionMet() {
|
||||
return linesMatch(nodeName, runner.lines(nodeName));
|
||||
}
|
||||
}, "failed to find the expected lines " + this.toString());
|
||||
TestUtils.waitForCondition(() -> linesMatch(nodeName, runner.lines(nodeName)),
|
||||
"failed to find the expected lines " + this.toString());
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -473,7 +468,7 @@ public class CoordinatorTest {
|
|||
assertEquals(0, coordinatorClient.tasks(
|
||||
new TasksRequest(null, 10, 0, 10, 0, Optional.empty())).tasks().size());
|
||||
TasksResponse resp1 = coordinatorClient.tasks(
|
||||
new TasksRequest(Arrays.asList(new String[] {"foo", "baz" }), 0, 0, 0, 0, Optional.empty()));
|
||||
new TasksRequest(Arrays.asList("foo", "baz"), 0, 0, 0, 0, Optional.empty()));
|
||||
assertTrue(resp1.tasks().containsKey("foo"));
|
||||
assertFalse(resp1.tasks().containsKey("bar"));
|
||||
assertEquals(1, resp1.tasks().size());
|
||||
|
|
|
@ -22,7 +22,6 @@ import org.apache.kafka.common.internals.KafkaFutureImpl;
|
|||
import org.apache.kafka.trogdor.common.Platform;
|
||||
import org.apache.kafka.trogdor.common.ThreadUtils;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
@ -53,12 +52,9 @@ public class SampleTaskWorker implements TaskWorker {
|
|||
if (exitMs == null) {
|
||||
exitMs = Long.MAX_VALUE;
|
||||
}
|
||||
this.future = platform.scheduler().schedule(executor, new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
haltFuture.complete(spec.error());
|
||||
return null;
|
||||
}
|
||||
this.future = platform.scheduler().schedule(executor, () -> {
|
||||
haltFuture.complete(spec.error());
|
||||
return null;
|
||||
}, exitMs);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue