MINOR: Fix Trogdor tests, partition assignments (#4892)

This commit is contained in:
Colin Patrick McCabe 2018-04-29 07:54:38 -07:00 committed by Rajini Sivaram
parent c6fd3d488e
commit 8577632b3a
6 changed files with 41 additions and 11 deletions

View File

@ -21,7 +21,7 @@ from kafkatest.services.trogdor.task_spec import TaskSpec
class ProduceBenchWorkloadSpec(TaskSpec):
def __init__(self, start_ms, duration_ms, producer_node, bootstrap_servers,
target_messages_per_sec, max_messages, producer_conf,
total_topics, active_topics):
inactive_topics, active_topics):
super(ProduceBenchWorkloadSpec, self).__init__(start_ms, duration_ms)
self.message["class"] = "org.apache.kafka.trogdor.workload.ProduceBenchSpec"
self.message["producerNode"] = producer_node
@ -29,7 +29,7 @@ class ProduceBenchWorkloadSpec(TaskSpec):
self.message["targetMessagesPerSec"] = target_messages_per_sec
self.message["maxMessages"] = max_messages
self.message["producerConf"] = producer_conf
self.message["totalTopics"] = total_topics
self.message["inactiveTopics"] = inactive_topics
self.message["activeTopics"] = active_topics

View File

@ -20,14 +20,14 @@ from kafkatest.services.trogdor.task_spec import TaskSpec
class RoundTripWorkloadSpec(TaskSpec):
def __init__(self, start_ms, duration_ms, client_node, bootstrap_servers,
target_messages_per_sec, partition_assignments, max_messages):
target_messages_per_sec, max_messages, active_topics):
super(RoundTripWorkloadSpec, self).__init__(start_ms, duration_ms)
self.message["class"] = "org.apache.kafka.trogdor.workload.RoundTripWorkloadSpec"
self.message["clientNode"] = client_node
self.message["bootstrapServers"] = bootstrap_servers
self.message["targetMessagesPerSec"] = target_messages_per_sec
self.message["partitionAssignments"] = partition_assignments
self.message["maxMessages"] = max_messages
self.message["activeTopics"] = active_topics
class RoundTripWorkloadService(Service):

View File

@ -43,14 +43,16 @@ class ProduceBenchTest(Test):
self.zk.stop()
def test_produce_bench(self):
active_topics={"produce_bench_topic[0-1]":{"numPartitions":1, "replicationFactor":3}}
inactive_topics={"produce_bench_topic[2-9]":{"numPartitions":1, "replicationFactor":3}}
spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
self.workload_service.producer_node,
self.workload_service.bootstrap_servers,
target_messages_per_sec=1000,
max_messages=100000,
producer_conf={},
total_topics=10,
active_topics=2)
inactive_topics=inactive_topics,
active_topics=active_topics)
workload1 = self.trogdor.create_task("workload1", spec)
workload1.wait_for_done(timeout_sec=360)
tasks = self.trogdor.tasks()

View File

@ -25,6 +25,8 @@ from kafkatest.services.zookeeper import ZookeeperService
class RoundTripFaultTest(Test):
topic_name_index = 0
def __init__(self, test_context):
""":type test_context: ducktape.tests.test.TestContext"""
super(RoundTripFaultTest, self).__init__(test_context)
@ -33,12 +35,15 @@ class RoundTripFaultTest(Test):
self.workload_service = RoundTripWorkloadService(test_context, self.kafka)
self.trogdor = TrogdorService(context=self.test_context,
client_services=[self.zk, self.kafka, self.workload_service])
topic_name = "round_trip_topic%d" % RoundTripFaultTest.topic_name_index
RoundTripFaultTest.topic_name_index = RoundTripFaultTest.topic_name_index + 1
active_topics={topic_name : {"partitionAssignments":{"0": [0,1,2]}}}
self.round_trip_spec = RoundTripWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
self.workload_service.client_node,
self.workload_service.bootstrap_servers,
target_messages_per_sec=10000,
partition_assignments={0: [0,1,2]},
max_messages=100000)
max_messages=100000,
active_topics=active_topics)
def setUp(self):
self.zk.start()

View File

@ -23,9 +23,11 @@ import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.trogdor.rest.Message;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
/**
* Describes some partitions.
@ -44,8 +46,20 @@ public class PartitionsSpec extends Message {
@JsonProperty("partitionAssignments") Map<Integer, List<Integer>> partitionAssignments) {
this.numPartitions = numPartitions;
this.replicationFactor = replicationFactor;
this.partitionAssignments = partitionAssignments == null ?
new HashMap<Integer, List<Integer>>() : partitionAssignments;
HashMap<Integer, List<Integer>> partMap = new HashMap<>();
if (partitionAssignments != null) {
for (Entry<Integer, List<Integer>> entry : partitionAssignments.entrySet()) {
int partition = entry.getKey() == null ? 0 : entry.getKey();
ArrayList<Integer> assignments = new ArrayList<>();
if (entry.getValue() != null) {
for (Integer brokerId : entry.getValue()) {
assignments.add(brokerId == null ? Integer.valueOf(0) : brokerId);
}
}
partMap.put(partition, Collections.unmodifiableList(assignments));
}
}
this.partitionAssignments = Collections.unmodifiableMap(partMap);
}
@JsonProperty
@ -72,7 +86,7 @@ public class PartitionsSpec extends Message {
}
@JsonProperty
public Map<Integer, List<Integer>> partitionAssignmentsap() {
public Map<Integer, List<Integer>> partitionAssignments() {
return partitionAssignments;
}

View File

@ -33,6 +33,10 @@ import org.apache.kafka.trogdor.workload.TopicsSpec;
import org.junit.Test;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertNotNull;
@ -55,6 +59,11 @@ public class JsonSerializationTest {
0, null, null, 0));
verify(new TopicsSpec());
verify(new PartitionsSpec(0, (short) 0, null));
Map<Integer, List<Integer>> partitionAssignments = new HashMap<Integer, List<Integer>>();
partitionAssignments.put(0, Arrays.asList(1, 2, 3));
partitionAssignments.put(1, Arrays.asList(1, 2, 3));
verify(new PartitionsSpec(0, (short) 0, partitionAssignments));
verify(new PartitionsSpec(0, (short) 0, null));
}
private <T> void verify(T val1) throws Exception {