KAFKA-6054: Code cleanup to prepare the actual fix for an upgrade path (#4630)

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>
This commit is contained in:
Matthias J. Sax 2018-03-05 10:56:42 -08:00 committed by GitHub
parent 604b93cfde
commit 2a4ba75e13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 527 additions and 286 deletions

View File

@ -226,11 +226,11 @@ public class StreamsConfig extends AbstractConfig {
public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde";
private static final String DEFAULT_KEY_SERDE_CLASS_DOC = " Default serializer / deserializer class for key that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface.";
/** {@code default timestamp.extractor} */
/** {@code default.timestamp.extractor} */
public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "default.timestamp.extractor";
private static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC = "Default timestamp extractor class that implements the <code>org.apache.kafka.streams.processor.TimestampExtractor</code> interface.";
/** {@code default value.serde} */
/** {@code default.value.serde} */
public static final String DEFAULT_VALUE_SERDE_CLASS_CONFIG = "default.value.serde";
private static final String DEFAULT_VALUE_SERDE_CLASS_DOC = "Default serializer / deserializer class for value that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface.";

View File

@ -66,7 +66,8 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
public final TaskId taskId;
public final TopicPartition partition;
AssignedPartition(final TaskId taskId, final TopicPartition partition) {
AssignedPartition(final TaskId taskId,
final TopicPartition partition) {
this.taskId = taskId;
this.partition = partition;
}
@ -77,11 +78,11 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
}
@Override
public boolean equals(Object o) {
public boolean equals(final Object o) {
if (!(o instanceof AssignedPartition)) {
return false;
}
AssignedPartition other = (AssignedPartition) o;
final AssignedPartition other = (AssignedPartition) o;
return compareTo(other) == 0;
}
@ -104,8 +105,9 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
final String host = getHost(endPoint);
final Integer port = getPort(endPoint);
if (host == null || port == null)
if (host == null || port == null) {
throw new ConfigException(String.format("Error parsing host address %s. Expected format host:port.", endPoint));
}
hostInfo = new HostInfo(host, port);
} else {
@ -119,10 +121,11 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
state = new ClientState();
}
void addConsumer(final String consumerMemberId, final SubscriptionInfo info) {
void addConsumer(final String consumerMemberId,
final SubscriptionInfo info) {
consumers.add(consumerMemberId);
state.addPreviousActiveTasks(info.prevTasks);
state.addPreviousStandbyTasks(info.standbyTasks);
state.addPreviousActiveTasks(info.prevTasks());
state.addPreviousStandbyTasks(info.standbyTasks());
state.incrementCapacity();
}
@ -157,8 +160,9 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
private static final Comparator<TopicPartition> PARTITION_COMPARATOR = new Comparator<TopicPartition>() {
@Override
public int compare(TopicPartition p1, TopicPartition p2) {
int result = p1.topic().compareTo(p2.topic());
public int compare(final TopicPartition p1,
final TopicPartition p2) {
final int result = p1.topic().compareTo(p2.topic());
if (result != 0) {
return result;
@ -194,15 +198,15 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
final Object o = configs.get(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR);
if (o == null) {
KafkaException ex = new KafkaException("TaskManager is not specified");
log.error(ex.getMessage(), ex);
throw ex;
final KafkaException fatalException = new KafkaException("TaskManager is not specified");
log.error(fatalException.getMessage(), fatalException);
throw fatalException;
}
if (!(o instanceof TaskManager)) {
KafkaException ex = new KafkaException(String.format("%s is not an instance of %s", o.getClass().getName(), TaskManager.class.getName()));
log.error(ex.getMessage(), ex);
throw ex;
final KafkaException fatalException = new KafkaException(String.format("%s is not an instance of %s", o.getClass().getName(), TaskManager.class.getName()));
log.error(fatalException.getMessage(), fatalException);
throw fatalException;
}
taskManager = (TaskManager) o;
@ -214,14 +218,14 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
final String userEndPoint = streamsConfig.getString(StreamsConfig.APPLICATION_SERVER_CONFIG);
if (userEndPoint != null && !userEndPoint.isEmpty()) {
try {
String host = getHost(userEndPoint);
Integer port = getPort(userEndPoint);
final String host = getHost(userEndPoint);
final Integer port = getPort(userEndPoint);
if (host == null || port == null)
throw new ConfigException(String.format("%s Config %s isn't in the correct format. Expected a host:port pair" +
" but received %s",
logPrefix, StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
} catch (NumberFormatException nfe) {
} catch (final NumberFormatException nfe) {
throw new ConfigException(String.format("%s Invalid port supplied in %s for config %s",
logPrefix, userEndPoint, StreamsConfig.APPLICATION_SERVER_CONFIG));
}
@ -240,7 +244,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
}
@Override
public Subscription subscription(Set<String> topics) {
public Subscription subscription(final Set<String> topics) {
// Adds the following information to subscription
// 1. Client UUID (a unique id assigned to an instance of KafkaStreams)
// 2. Task ids of previously running tasks
@ -249,7 +253,11 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
final Set<TaskId> previousActiveTasks = taskManager.prevActiveTaskIds();
final Set<TaskId> standbyTasks = taskManager.cachedTasksIds();
standbyTasks.removeAll(previousActiveTasks);
final SubscriptionInfo data = new SubscriptionInfo(taskManager.processId(), previousActiveTasks, standbyTasks, this.userEndPoint);
final SubscriptionInfo data = new SubscriptionInfo(
taskManager.processId(),
previousActiveTasks,
standbyTasks,
this.userEndPoint);
taskManager.updateSubscriptionsFromMetadata(topics);
@ -277,22 +285,32 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
* 3. within each client, tasks are assigned to consumer clients in round-robin manner.
*/
@Override
public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
public Map<String, Assignment> assign(final Cluster metadata,
final Map<String, Subscription> subscriptions) {
// construct the client metadata from the decoded subscription info
Map<UUID, ClientMetadata> clientsMetadata = new HashMap<>();
final Map<UUID, ClientMetadata> clientsMetadata = new HashMap<>();
for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
String consumerId = entry.getKey();
Subscription subscription = entry.getValue();
int minUserMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION;
for (final Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
final String consumerId = entry.getKey();
final Subscription subscription = entry.getValue();
SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
final SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
final int usedVersion = info.version();
if (usedVersion > SubscriptionInfo.LATEST_SUPPORTED_VERSION) {
throw new IllegalStateException("Unknown metadata version: " + usedVersion
+ "; latest supported version: " + SubscriptionInfo.LATEST_SUPPORTED_VERSION);
}
if (usedVersion < minUserMetadataVersion) {
minUserMetadataVersion = usedVersion;
}
// create the new client metadata if necessary
ClientMetadata clientMetadata = clientsMetadata.get(info.processId);
ClientMetadata clientMetadata = clientsMetadata.get(info.processId());
if (clientMetadata == null) {
clientMetadata = new ClientMetadata(info.userEndPoint);
clientsMetadata.put(info.processId, clientMetadata);
clientMetadata = new ClientMetadata(info.userEndPoint());
clientsMetadata.put(info.processId(), clientMetadata);
}
// add the consumer to the client
@ -309,8 +327,8 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = taskManager.builder().topicGroups();
final Map<String, InternalTopicMetadata> repartitionTopicMetadata = new HashMap<>();
for (InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
for (InternalTopicConfig topic: topicsInfo.repartitionSourceTopics.values()) {
for (final InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
for (final InternalTopicConfig topic: topicsInfo.repartitionSourceTopics.values()) {
repartitionTopicMetadata.put(topic.name(), new InternalTopicMetadata(topic));
}
}
@ -319,13 +337,13 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
do {
numPartitionsNeeded = false;
for (InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
for (String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
for (final InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
int numPartitions = repartitionTopicMetadata.get(topicName).numPartitions;
// try set the number of partitions for this repartition topic if it is not set yet
if (numPartitions == UNKNOWN) {
for (InternalTopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) {
for (final InternalTopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) {
final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
if (otherSinkTopics.contains(topicName)) {
@ -375,7 +393,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
// augment the metadata with the newly computed number of partitions for all the
// repartition source topics
final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = new HashMap<>();
for (Map.Entry<String, InternalTopicMetadata> entry : repartitionTopicMetadata.entrySet()) {
for (final Map.Entry<String, InternalTopicMetadata> entry : repartitionTopicMetadata.entrySet()) {
final String topic = entry.getKey();
final int numPartitions = entry.getValue().numPartitions;
@ -395,7 +413,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
// get the tasks as partition groups from the partition grouper
final Set<String> allSourceTopics = new HashSet<>();
final Map<Integer, Set<String>> sourceTopicsByGroup = new HashMap<>();
for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
allSourceTopics.addAll(entry.getValue().sourceTopics);
sourceTopicsByGroup.put(entry.getKey(), entry.getValue().sourceTopics);
}
@ -405,9 +423,9 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
// check if all partitions are assigned, and there are no duplicates of partitions in multiple tasks
final Set<TopicPartition> allAssignedPartitions = new HashSet<>();
final Map<Integer, Set<TaskId>> tasksByTopicGroup = new HashMap<>();
for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionsForTask.entrySet()) {
for (final Map.Entry<TaskId, Set<TopicPartition>> entry : partitionsForTask.entrySet()) {
final Set<TopicPartition> partitions = entry.getValue();
for (TopicPartition partition : partitions) {
for (final TopicPartition partition : partitions) {
if (allAssignedPartitions.contains(partition)) {
log.warn("Partition {} is assigned to more than one tasks: {}", partition, partitionsForTask);
}
@ -422,10 +440,10 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
}
ids.add(id);
}
for (String topic : allSourceTopics) {
for (final String topic : allSourceTopics) {
final List<PartitionInfo> partitionInfoList = fullMetadata.partitionsForTopic(topic);
if (!partitionInfoList.isEmpty()) {
for (PartitionInfo partitionInfo : partitionInfoList) {
for (final PartitionInfo partitionInfo : partitionInfoList) {
final TopicPartition partition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
if (!allAssignedPartitions.contains(partition)) {
log.warn("Partition {} is not assigned to any tasks: {}", partition, partitionsForTask);
@ -438,15 +456,15 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
// add tasks to state change log topic subscribers
final Map<String, InternalTopicMetadata> changelogTopicMetadata = new HashMap<>();
for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
final int topicGroupId = entry.getKey();
final Map<String, InternalTopicConfig> stateChangelogTopics = entry.getValue().stateChangelogTopics;
for (InternalTopicConfig topicConfig : stateChangelogTopics.values()) {
for (final InternalTopicConfig topicConfig : stateChangelogTopics.values()) {
// the expected number of partitions is the max value of TaskId.partition + 1
int numPartitions = UNKNOWN;
if (tasksByTopicGroup.get(topicGroupId) != null) {
for (TaskId task : tasksByTopicGroup.get(topicGroupId)) {
for (final TaskId task : tasksByTopicGroup.get(topicGroupId)) {
if (numPartitions < task.partition + 1)
numPartitions = task.partition + 1;
}
@ -468,7 +486,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
// assign tasks to clients
final Map<UUID, ClientState> states = new HashMap<>();
for (Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
for (final Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
states.put(entry.getKey(), entry.getValue().state);
}
@ -484,25 +502,27 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
// construct the global partition assignment per host map
final Map<HostInfo, Set<TopicPartition>> partitionsByHostState = new HashMap<>();
for (Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
final HostInfo hostInfo = entry.getValue().hostInfo;
if (minUserMetadataVersion == 2) {
for (final Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
final HostInfo hostInfo = entry.getValue().hostInfo;
if (hostInfo != null) {
final Set<TopicPartition> topicPartitions = new HashSet<>();
final ClientState state = entry.getValue().state;
if (hostInfo != null) {
final Set<TopicPartition> topicPartitions = new HashSet<>();
final ClientState state = entry.getValue().state;
for (final TaskId id : state.activeTasks()) {
topicPartitions.addAll(partitionsForTask.get(id));
for (final TaskId id : state.activeTasks()) {
topicPartitions.addAll(partitionsForTask.get(id));
}
partitionsByHostState.put(hostInfo, topicPartitions);
}
partitionsByHostState.put(hostInfo, topicPartitions);
}
}
taskManager.setPartitionsByHostState(partitionsByHostState);
// within the client, distribute tasks to its owned consumers
final Map<String, Assignment> assignment = new HashMap<>();
for (Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
for (final Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
final Set<String> consumers = entry.getValue().consumers;
final ClientState state = entry.getValue().state;
@ -511,7 +531,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
int consumerTaskIndex = 0;
for (String consumer : consumers) {
for (final String consumer : consumers) {
final Map<TaskId, Set<TopicPartition>> standby = new HashMap<>();
final ArrayList<AssignedPartition> assignedPartitions = new ArrayList<>();
@ -540,13 +560,15 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
Collections.sort(assignedPartitions);
final List<TaskId> active = new ArrayList<>();
final List<TopicPartition> activePartitions = new ArrayList<>();
for (AssignedPartition partition : assignedPartitions) {
for (final AssignedPartition partition : assignedPartitions) {
active.add(partition.taskId);
activePartitions.add(partition.partition);
}
// finally, encode the assignment before sending back to coordinator
assignment.put(consumer, new Assignment(activePartitions, new AssignmentInfo(active, standby, partitionsByHostState).encode()));
assignment.put(consumer, new Assignment(
activePartitions,
new AssignmentInfo(minUserMetadataVersion, active, standby, partitionsByHostState).encode()));
}
}
@ -577,26 +599,54 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
* @throws TaskAssignmentException if there is no task id for one of the partitions specified
*/
@Override
public void onAssignment(Assignment assignment) {
List<TopicPartition> partitions = new ArrayList<>(assignment.partitions());
public void onAssignment(final Assignment assignment) {
final List<TopicPartition> partitions = new ArrayList<>(assignment.partitions());
Collections.sort(partitions, PARTITION_COMPARATOR);
AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
final AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
final int usedVersion = info.version();
Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
// version 1 field
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
// version 2 fields
final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<>();
final Map<HostInfo, Set<TopicPartition>> partitionsByHost;
switch (usedVersion) {
case 1:
processVersionOneAssignment(info, partitions, activeTasks);
partitionsByHost = Collections.emptyMap();
break;
case 2:
processVersionTwoAssignment(info, partitions, activeTasks, topicToPartitionInfo);
partitionsByHost = info.partitionsByHost();
break;
default:
throw new IllegalStateException("Unknown metadata version: " + usedVersion
+ "; latest supported version: " + AssignmentInfo.LATEST_SUPPORTED_VERSION);
}
taskManager.setClusterMetadata(Cluster.empty().withPartitions(topicToPartitionInfo));
taskManager.setPartitionsByHostState(partitionsByHost);
taskManager.setAssignmentMetadata(activeTasks, info.standbyTasks());
taskManager.updateSubscriptionsFromAssignment(partitions);
}
private void processVersionOneAssignment(final AssignmentInfo info,
final List<TopicPartition> partitions,
final Map<TaskId, Set<TopicPartition>> activeTasks) {
// the number of assigned partitions should be the same as number of active tasks, which
// could be duplicated if one task has more than one assigned partitions
if (partitions.size() != info.activeTasks.size()) {
if (partitions.size() != info.activeTasks().size()) {
throw new TaskAssignmentException(
String.format("%sNumber of assigned partitions %d is not equal to the number of active taskIds %d" +
", assignmentInfo=%s", logPrefix, partitions.size(), info.activeTasks.size(), info.toString())
String.format("%sNumber of assigned partitions %d is not equal to the number of active taskIds %d" +
", assignmentInfo=%s", logPrefix, partitions.size(), info.activeTasks().size(), info.toString())
);
}
for (int i = 0; i < partitions.size(); i++) {
TopicPartition partition = partitions.get(i);
TaskId id = info.activeTasks.get(i);
final TopicPartition partition = partitions.get(i);
final TaskId id = info.activeTasks().get(i);
Set<TopicPartition> assignedPartitions = activeTasks.get(id);
if (assignedPartitions == null) {
@ -605,23 +655,23 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
}
assignedPartitions.add(partition);
}
}
final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<>();
for (Set<TopicPartition> value : info.partitionsByHost.values()) {
for (TopicPartition topicPartition : value) {
topicToPartitionInfo.put(topicPartition, new PartitionInfo(topicPartition.topic(),
topicPartition.partition(),
null,
new Node[0],
new Node[0]));
private void processVersionTwoAssignment(final AssignmentInfo info,
final List<TopicPartition> partitions,
final Map<TaskId, Set<TopicPartition>> activeTasks,
final Map<TopicPartition, PartitionInfo> topicToPartitionInfo) {
processVersionOneAssignment(info, partitions, activeTasks);
// process partitions by host
final Map<HostInfo, Set<TopicPartition>> partitionsByHost = info.partitionsByHost();
for (final Set<TopicPartition> value : partitionsByHost.values()) {
for (final TopicPartition topicPartition : value) {
topicToPartitionInfo.put(
topicPartition,
new PartitionInfo(topicPartition.topic(), topicPartition.partition(), null, new Node[0], new Node[0]));
}
}
taskManager.setClusterMetadata(Cluster.empty().withPartitions(topicToPartitionInfo));
taskManager.setPartitionsByHostState(info.partitionsByHost);
taskManager.setAssignmentMetadata(activeTasks, info.standbyTasks);
taskManager.updateSubscriptionsFromAssignment(partitions);
}
/**
@ -658,10 +708,10 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
log.debug("Completed validating internal topics in partition assignor.");
}
private void ensureCopartitioning(Collection<Set<String>> copartitionGroups,
Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions,
Cluster metadata) {
for (Set<String> copartitionGroup : copartitionGroups) {
private void ensureCopartitioning(final Collection<Set<String>> copartitionGroups,
final Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions,
final Cluster metadata) {
for (final Set<String> copartitionGroup : copartitionGroups) {
copartitionedTopicsValidator.validate(copartitionGroup, allRepartitionTopicsNumPartitions, metadata);
}
}
@ -677,7 +727,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
private final Set<String> updatedTopicSubscriptions = new HashSet<>();
public void updateTopics(Collection<String> topicNames) {
public void updateTopics(final Collection<String> topicNames) {
updatedTopicSubscriptions.clear();
updatedTopicSubscriptions.addAll(topicNames);
}
@ -735,7 +785,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
// if all topics for this co-partition group is repartition topics,
// then set the number of partitions to be the maximum of the number of partitions.
if (numPartitions == UNKNOWN) {
for (Map.Entry<String, InternalTopicMetadata> entry: allRepartitionTopicsNumPartitions.entrySet()) {
for (final Map.Entry<String, InternalTopicMetadata> entry: allRepartitionTopicsNumPartitions.entrySet()) {
if (copartitionGroup.contains(entry.getKey())) {
final int partitions = entry.getValue().numPartitions;
if (partitions > numPartitions) {
@ -745,7 +795,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
}
}
// enforce co-partitioning restrictions to repartition topics by updating their number of partitions
for (Map.Entry<String, InternalTopicMetadata> entry : allRepartitionTopicsNumPartitions.entrySet()) {
for (final Map.Entry<String, InternalTopicMetadata> entry : allRepartitionTopicsNumPartitions.entrySet()) {
if (copartitionGroup.contains(entry.getKey())) {
entry.getValue().numPartitions = numPartitions;
}
@ -755,7 +805,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
}
// following functions are for test only
void setInternalTopicManager(InternalTopicManager internalTopicManager) {
void setInternalTopicManager(final InternalTopicManager internalTopicManager) {
this.internalTopicManager = internalTopicManager;
}
}

View File

@ -39,76 +39,123 @@ import java.util.Set;
public class AssignmentInfo {
private static final Logger log = LoggerFactory.getLogger(AssignmentInfo.class);
/**
* A new field was added, partitionsByHost. CURRENT_VERSION
* is required so we can decode the previous version. For example, this may occur
* during a rolling upgrade
*/
private static final int CURRENT_VERSION = 2;
public final int version;
public final List<TaskId> activeTasks; // each element corresponds to a partition
public final Map<TaskId, Set<TopicPartition>> standbyTasks;
public final Map<HostInfo, Set<TopicPartition>> partitionsByHost;
public AssignmentInfo(List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
Map<HostInfo, Set<TopicPartition>> hostState) {
this(CURRENT_VERSION, activeTasks, standbyTasks, hostState);
public static final int LATEST_SUPPORTED_VERSION = 2;
private final int usedVersion;
private List<TaskId> activeTasks;
private Map<TaskId, Set<TopicPartition>> standbyTasks;
private Map<HostInfo, Set<TopicPartition>> partitionsByHost;
private AssignmentInfo(final int version) {
this.usedVersion = version;
}
protected AssignmentInfo(int version, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
Map<HostInfo, Set<TopicPartition>> hostState) {
this.version = version;
public AssignmentInfo(final List<TaskId> activeTasks,
final Map<TaskId, Set<TopicPartition>> standbyTasks,
final Map<HostInfo, Set<TopicPartition>> hostState) {
this(LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState);
}
public AssignmentInfo(final int version,
final List<TaskId> activeTasks,
final Map<TaskId, Set<TopicPartition>> standbyTasks,
final Map<HostInfo, Set<TopicPartition>> hostState) {
this.usedVersion = version;
this.activeTasks = activeTasks;
this.standbyTasks = standbyTasks;
this.partitionsByHost = hostState;
}
public int version() {
return usedVersion;
}
public List<TaskId> activeTasks() {
return activeTasks;
}
public Map<TaskId, Set<TopicPartition>> standbyTasks() {
return standbyTasks;
}
public Map<HostInfo, Set<TopicPartition>> partitionsByHost() {
return partitionsByHost;
}
/**
* @throws TaskAssignmentException if method fails to encode the data, e.g., if there is an
* IO exception during encoding
*/
public ByteBuffer encode() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(baos);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
// Encode version
out.writeInt(version);
// Encode active tasks
out.writeInt(activeTasks.size());
for (TaskId id : activeTasks) {
id.writeTo(out);
}
// Encode standby tasks
out.writeInt(standbyTasks.size());
for (Map.Entry<TaskId, Set<TopicPartition>> entry : standbyTasks.entrySet()) {
TaskId id = entry.getKey();
id.writeTo(out);
Set<TopicPartition> partitions = entry.getValue();
writeTopicPartitions(out, partitions);
}
out.writeInt(partitionsByHost.size());
for (Map.Entry<HostInfo, Set<TopicPartition>> entry : partitionsByHost
.entrySet()) {
final HostInfo hostInfo = entry.getKey();
out.writeUTF(hostInfo.host());
out.writeInt(hostInfo.port());
writeTopicPartitions(out, entry.getValue());
try (final DataOutputStream out = new DataOutputStream(baos)) {
switch (usedVersion) {
case 1:
encodeVersionOne(out);
break;
case 2:
encodeVersionTwo(out);
break;
default:
throw new IllegalStateException("Unknown metadata version: " + usedVersion
+ "; latest supported version: " + LATEST_SUPPORTED_VERSION);
}
out.flush();
out.close();
return ByteBuffer.wrap(baos.toByteArray());
} catch (IOException ex) {
} catch (final IOException ex) {
throw new TaskAssignmentException("Failed to encode AssignmentInfo", ex);
}
}
private void writeTopicPartitions(DataOutputStream out, Set<TopicPartition> partitions) throws IOException {
private void encodeVersionOne(final DataOutputStream out) throws IOException {
out.writeInt(1); // version
encodeActiveAndStandbyTaskAssignment(out);
}
private void encodeActiveAndStandbyTaskAssignment(final DataOutputStream out) throws IOException {
// encode active tasks
out.writeInt(activeTasks.size());
for (final TaskId id : activeTasks) {
id.writeTo(out);
}
// encode standby tasks
out.writeInt(standbyTasks.size());
for (final Map.Entry<TaskId, Set<TopicPartition>> entry : standbyTasks.entrySet()) {
final TaskId id = entry.getKey();
id.writeTo(out);
final Set<TopicPartition> partitions = entry.getValue();
writeTopicPartitions(out, partitions);
}
}
private void encodeVersionTwo(final DataOutputStream out) throws IOException {
out.writeInt(2); // version
encodeActiveAndStandbyTaskAssignment(out);
encodePartitionsByHost(out);
}
private void encodePartitionsByHost(final DataOutputStream out) throws IOException {
// encode partitions by host
out.writeInt(partitionsByHost.size());
for (final Map.Entry<HostInfo, Set<TopicPartition>> entry : partitionsByHost.entrySet()) {
final HostInfo hostInfo = entry.getKey();
out.writeUTF(hostInfo.host());
out.writeInt(hostInfo.port());
writeTopicPartitions(out, entry.getValue());
}
}
private void writeTopicPartitions(final DataOutputStream out,
final Set<TopicPartition> partitions) throws IOException {
out.writeInt(partitions.size());
for (TopicPartition partition : partitions) {
for (final TopicPartition partition : partitions) {
out.writeUTF(partition.topic());
out.writeInt(partition.partition());
}
@ -117,52 +164,69 @@ public class AssignmentInfo {
/**
* @throws TaskAssignmentException if method fails to decode the data or if the data version is unknown
*/
public static AssignmentInfo decode(ByteBuffer data) {
public static AssignmentInfo decode(final ByteBuffer data) {
// ensure we are at the beginning of the ByteBuffer
data.rewind();
try (DataInputStream in = new DataInputStream(new ByteBufferInputStream(data))) {
// Decode version
int version = in.readInt();
if (version < 0 || version > CURRENT_VERSION) {
TaskAssignmentException ex = new TaskAssignmentException("Unknown assignment data version: " + version);
log.error(ex.getMessage(), ex);
throw ex;
try (final DataInputStream in = new DataInputStream(new ByteBufferInputStream(data))) {
// decode used version
final int usedVersion = in.readInt();
final AssignmentInfo assignmentInfo = new AssignmentInfo(usedVersion);
switch (usedVersion) {
case 1:
decodeVersionOneData(assignmentInfo, in);
break;
case 2:
decodeVersionTwoData(assignmentInfo, in);
break;
default:
TaskAssignmentException fatalException = new TaskAssignmentException("Unable to decode subscription data: " +
"used version: " + usedVersion + "; latest supported version: " + LATEST_SUPPORTED_VERSION);
log.error(fatalException.getMessage(), fatalException);
throw fatalException;
}
// Decode active tasks
int count = in.readInt();
List<TaskId> activeTasks = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
activeTasks.add(TaskId.readFrom(in));
}
// Decode standby tasks
count = in.readInt();
Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(count);
for (int i = 0; i < count; i++) {
TaskId id = TaskId.readFrom(in);
standbyTasks.put(id, readTopicPartitions(in));
}
Map<HostInfo, Set<TopicPartition>> hostStateToTopicPartitions = new HashMap<>();
if (version == CURRENT_VERSION) {
int numEntries = in.readInt();
for (int i = 0; i < numEntries; i++) {
HostInfo hostInfo = new HostInfo(in.readUTF(), in.readInt());
hostStateToTopicPartitions.put(hostInfo, readTopicPartitions(in));
}
}
return new AssignmentInfo(activeTasks, standbyTasks, hostStateToTopicPartitions);
} catch (IOException ex) {
return assignmentInfo;
} catch (final IOException ex) {
throw new TaskAssignmentException("Failed to decode AssignmentInfo", ex);
}
}
private static Set<TopicPartition> readTopicPartitions(DataInputStream in) throws IOException {
int numPartitions = in.readInt();
Set<TopicPartition> partitions = new HashSet<>(numPartitions);
private static void decodeVersionOneData(final AssignmentInfo assignmentInfo,
final DataInputStream in) throws IOException {
// decode active tasks
int count = in.readInt();
assignmentInfo.activeTasks = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
assignmentInfo.activeTasks.add(TaskId.readFrom(in));
}
// decode standby tasks
count = in.readInt();
assignmentInfo.standbyTasks = new HashMap<>(count);
for (int i = 0; i < count; i++) {
TaskId id = TaskId.readFrom(in);
assignmentInfo.standbyTasks.put(id, readTopicPartitions(in));
}
}
private static void decodeVersionTwoData(final AssignmentInfo assignmentInfo,
final DataInputStream in) throws IOException {
decodeVersionOneData(assignmentInfo, in);
// decode partitions by host
assignmentInfo.partitionsByHost = new HashMap<>();
final int numEntries = in.readInt();
for (int i = 0; i < numEntries; i++) {
final HostInfo hostInfo = new HostInfo(in.readUTF(), in.readInt());
assignmentInfo.partitionsByHost.put(hostInfo, readTopicPartitions(in));
}
}
private static Set<TopicPartition> readTopicPartitions(final DataInputStream in) throws IOException {
final int numPartitions = in.readInt();
final Set<TopicPartition> partitions = new HashSet<>(numPartitions);
for (int j = 0; j < numPartitions; j++) {
partitions.add(new TopicPartition(in.readUTF(), in.readInt()));
}
@ -171,14 +235,14 @@ public class AssignmentInfo {
@Override
public int hashCode() {
return version ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHost.hashCode();
return usedVersion ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHost.hashCode();
}
@Override
public boolean equals(Object o) {
public boolean equals(final Object o) {
if (o instanceof AssignmentInfo) {
AssignmentInfo other = (AssignmentInfo) o;
return this.version == other.version &&
final AssignmentInfo other = (AssignmentInfo) o;
return this.usedVersion == other.usedVersion &&
this.activeTasks.equals(other.activeTasks) &&
this.standbyTasks.equals(other.standbyTasks) &&
this.partitionsByHost.equals(other.partitionsByHost);
@ -189,7 +253,7 @@ public class AssignmentInfo {
@Override
public String toString() {
return "[version=" + version + ", active tasks=" + activeTasks.size() + ", standby tasks=" + standbyTasks.size() + "]";
return "[version=" + usedVersion + ", active tasks=" + activeTasks.size() + ", standby tasks=" + standbyTasks.size() + "]";
}
}

View File

@ -31,42 +31,96 @@ public class SubscriptionInfo {
private static final Logger log = LoggerFactory.getLogger(SubscriptionInfo.class);
private static final int CURRENT_VERSION = 2;
public static final int LATEST_SUPPORTED_VERSION = 2;
public final int version;
public final UUID processId;
public final Set<TaskId> prevTasks;
public final Set<TaskId> standbyTasks;
public final String userEndPoint;
private final int usedVersion;
private UUID processId;
private Set<TaskId> prevTasks;
private Set<TaskId> standbyTasks;
private String userEndPoint;
public SubscriptionInfo(UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) {
this(CURRENT_VERSION, processId, prevTasks, standbyTasks, userEndPoint);
private SubscriptionInfo(final int version) {
this.usedVersion = version;
}
private SubscriptionInfo(int version, UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) {
this.version = version;
public SubscriptionInfo(final UUID processId,
final Set<TaskId> prevTasks,
final Set<TaskId> standbyTasks,
final String userEndPoint) {
this(LATEST_SUPPORTED_VERSION, processId, prevTasks, standbyTasks, userEndPoint);
}
public SubscriptionInfo(final int version,
final UUID processId,
final Set<TaskId> prevTasks,
final Set<TaskId> standbyTasks,
final String userEndPoint) {
this.usedVersion = version;
this.processId = processId;
this.prevTasks = prevTasks;
this.standbyTasks = standbyTasks;
this.userEndPoint = userEndPoint;
}
public int version() {
return usedVersion;
}
public UUID processId() {
return processId;
}
public Set<TaskId> prevTasks() {
return prevTasks;
}
public Set<TaskId> standbyTasks() {
return standbyTasks;
}
public String userEndPoint() {
return userEndPoint;
}
/**
* @throws TaskAssignmentException if method fails to encode the data
*/
public ByteBuffer encode() {
byte[] endPointBytes;
if (userEndPoint == null) {
endPointBytes = new byte[0];
} else {
endPointBytes = userEndPoint.getBytes(Charset.forName("UTF-8"));
final ByteBuffer buf;
switch (usedVersion) {
case 1:
buf = encodeVersionOne();
break;
case 2:
buf = encodeVersionTwo(prepareUserEndPoint());
break;
default:
throw new IllegalStateException("Unknown metadata version: " + usedVersion
+ "; latest supported version: " + LATEST_SUPPORTED_VERSION);
}
ByteBuffer buf = ByteBuffer.allocate(4 /* version */ + 16 /* process id */ + 4 +
prevTasks.size() * 8 + 4 + standbyTasks.size() * 8
+ 4 /* length of bytes */ + endPointBytes.length
);
// version
buf.putInt(version);
buf.rewind();
return buf;
}
private ByteBuffer encodeVersionOne() {
final ByteBuffer buf = ByteBuffer.allocate(getVersionOneByteLength());
buf.putInt(1); // version
encodeVersionOneData(buf);
return buf;
}
private int getVersionOneByteLength() {
return 4 + // version
16 + // client ID
4 + prevTasks.size() * 8 + // length + prev tasks
4 + standbyTasks.size() * 8; // length + standby tasks
}
private void encodeVersionOneData(final ByteBuffer buf) {
// encode client UUID
buf.putLong(processId.getMostSignificantBits());
buf.putLong(processId.getLeastSignificantBits());
@ -80,60 +134,104 @@ public class SubscriptionInfo {
for (TaskId id : standbyTasks) {
id.writeTo(buf);
}
buf.putInt(endPointBytes.length);
buf.put(endPointBytes);
buf.rewind();
}
private byte[] prepareUserEndPoint() {
if (userEndPoint == null) {
return new byte[0];
} else {
return userEndPoint.getBytes(Charset.forName("UTF-8"));
}
}
private ByteBuffer encodeVersionTwo(final byte[] endPointBytes) {
final ByteBuffer buf = ByteBuffer.allocate(getVersionTwoByteLength(endPointBytes));
buf.putInt(2); // version
encodeVersionTwoData(buf, endPointBytes);
return buf;
}
private int getVersionTwoByteLength(final byte[] endPointBytes) {
return getVersionOneByteLength() +
4 + endPointBytes.length; // length + userEndPoint
}
private void encodeVersionTwoData(final ByteBuffer buf,
final byte[] endPointBytes) {
encodeVersionOneData(buf);
if (endPointBytes != null) {
buf.putInt(endPointBytes.length);
buf.put(endPointBytes);
}
}
/**
* @throws TaskAssignmentException if method fails to decode the data
*/
public static SubscriptionInfo decode(ByteBuffer data) {
public static SubscriptionInfo decode(final ByteBuffer data) {
// ensure we are at the beginning of the ByteBuffer
data.rewind();
// Decode version
int version = data.getInt();
if (version == CURRENT_VERSION || version == 1) {
// Decode client UUID
UUID processId = new UUID(data.getLong(), data.getLong());
// Decode previously active tasks
Set<TaskId> prevTasks = new HashSet<>();
int numPrevs = data.getInt();
for (int i = 0; i < numPrevs; i++) {
TaskId id = TaskId.readFrom(data);
prevTasks.add(id);
}
// Decode previously cached tasks
Set<TaskId> standbyTasks = new HashSet<>();
int numCached = data.getInt();
for (int i = 0; i < numCached; i++) {
standbyTasks.add(TaskId.readFrom(data));
}
// decode used version
final int usedVersion = data.getInt();
final SubscriptionInfo subscriptionInfo = new SubscriptionInfo(usedVersion);
String userEndPoint = null;
if (version == CURRENT_VERSION) {
int bytesLength = data.getInt();
if (bytesLength != 0) {
byte[] bytes = new byte[bytesLength];
data.get(bytes);
userEndPoint = new String(bytes, Charset.forName("UTF-8"));
}
switch (usedVersion) {
case 1:
decodeVersionOneData(subscriptionInfo, data);
break;
case 2:
decodeVersionTwoData(subscriptionInfo, data);
break;
default:
TaskAssignmentException fatalException = new TaskAssignmentException("Unable to decode subscription data: " +
"used version: " + usedVersion + "; latest supported version: " + LATEST_SUPPORTED_VERSION);
log.error(fatalException.getMessage(), fatalException);
throw fatalException;
}
}
return new SubscriptionInfo(version, processId, prevTasks, standbyTasks, userEndPoint);
return subscriptionInfo;
}
} else {
TaskAssignmentException ex = new TaskAssignmentException("unable to decode subscription data: version=" + version);
log.error(ex.getMessage(), ex);
throw ex;
private static void decodeVersionOneData(final SubscriptionInfo subscriptionInfo,
final ByteBuffer data) {
// decode client UUID
subscriptionInfo.processId = new UUID(data.getLong(), data.getLong());
// decode previously active tasks
final int numPrevs = data.getInt();
subscriptionInfo.prevTasks = new HashSet<>();
for (int i = 0; i < numPrevs; i++) {
TaskId id = TaskId.readFrom(data);
subscriptionInfo.prevTasks.add(id);
}
// decode previously cached tasks
final int numCached = data.getInt();
subscriptionInfo.standbyTasks = new HashSet<>();
for (int i = 0; i < numCached; i++) {
subscriptionInfo.standbyTasks.add(TaskId.readFrom(data));
}
}
private static void decodeVersionTwoData(final SubscriptionInfo subscriptionInfo,
final ByteBuffer data) {
decodeVersionOneData(subscriptionInfo, data);
// decode user end point (can be null)
int bytesLength = data.getInt();
if (bytesLength != 0) {
final byte[] bytes = new byte[bytesLength];
data.get(bytes);
subscriptionInfo.userEndPoint = new String(bytes, Charset.forName("UTF-8"));
}
}
@Override
public int hashCode() {
int hashCode = version ^ processId.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode();
final int hashCode = usedVersion ^ processId.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode();
if (userEndPoint == null) {
return hashCode;
}
@ -141,10 +239,10 @@ public class SubscriptionInfo {
}
@Override
public boolean equals(Object o) {
public boolean equals(final Object o) {
if (o instanceof SubscriptionInfo) {
SubscriptionInfo other = (SubscriptionInfo) o;
return this.version == other.version &&
final SubscriptionInfo other = (SubscriptionInfo) o;
return this.usedVersion == other.usedVersion &&
this.processId.equals(other.processId) &&
this.prevTasks.equals(other.prevTasks) &&
this.standbyTasks.equals(other.standbyTasks) &&

View File

@ -376,7 +376,6 @@ public class QueryableStateIntegrationTest {
}
}
@Test
public void queryOnRebalance() throws InterruptedException {
final int numThreads = STREAM_TWO_PARTITIONS;

View File

@ -239,17 +239,17 @@ public class StreamsPartitionAssignorTest {
// the first consumer
AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10"));
allActiveTasks.addAll(info10.activeTasks);
allActiveTasks.addAll(info10.activeTasks());
// the second consumer
AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11"));
allActiveTasks.addAll(info11.activeTasks);
allActiveTasks.addAll(info11.activeTasks());
assertEquals(Utils.mkSet(task0, task1), allActiveTasks);
// the third consumer
AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20"));
allActiveTasks.addAll(info20.activeTasks);
allActiveTasks.addAll(info20.activeTasks());
assertEquals(3, allActiveTasks.size());
assertEquals(allTasks, new HashSet<>(allActiveTasks));
@ -317,13 +317,13 @@ public class StreamsPartitionAssignorTest {
final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData());
final List<TaskId> expectedInfo10TaskIds = Arrays.asList(taskIdA1, taskIdA3, taskIdB1, taskIdB3);
assertEquals(expectedInfo10TaskIds, info10.activeTasks);
assertEquals(expectedInfo10TaskIds, info10.activeTasks());
// the second consumer
final AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData());
final List<TaskId> expectedInfo11TaskIds = Arrays.asList(taskIdA0, taskIdA2, taskIdB0, taskIdB2);
assertEquals(expectedInfo11TaskIds, info11.activeTasks);
assertEquals(expectedInfo11TaskIds, info11.activeTasks());
}
@Test
@ -354,7 +354,7 @@ public class StreamsPartitionAssignorTest {
// check assignment info
Set<TaskId> allActiveTasks = new HashSet<>();
AssignmentInfo info10 = checkAssignment(Utils.mkSet("topic1"), assignments.get("consumer10"));
allActiveTasks.addAll(info10.activeTasks);
allActiveTasks.addAll(info10.activeTasks());
assertEquals(3, allActiveTasks.size());
assertEquals(allTasks, new HashSet<>(allActiveTasks));
@ -394,7 +394,7 @@ public class StreamsPartitionAssignorTest {
// check assignment info
Set<TaskId> allActiveTasks = new HashSet<>();
AssignmentInfo info10 = checkAssignment(Collections.<String>emptySet(), assignments.get("consumer10"));
allActiveTasks.addAll(info10.activeTasks);
allActiveTasks.addAll(info10.activeTasks());
assertEquals(0, allActiveTasks.size());
assertEquals(Collections.<TaskId>emptySet(), new HashSet<>(allActiveTasks));
@ -407,7 +407,7 @@ public class StreamsPartitionAssignorTest {
// the first consumer
info10 = checkAssignment(allTopics, assignments.get("consumer10"));
allActiveTasks.addAll(info10.activeTasks);
allActiveTasks.addAll(info10.activeTasks());
assertEquals(3, allActiveTasks.size());
assertEquals(allTasks, new HashSet<>(allActiveTasks));
@ -455,15 +455,15 @@ public class StreamsPartitionAssignorTest {
AssignmentInfo info;
info = AssignmentInfo.decode(assignments.get("consumer10").userData());
allActiveTasks.addAll(info.activeTasks);
allActiveTasks.addAll(info.activeTasks());
allPartitions.addAll(assignments.get("consumer10").partitions());
info = AssignmentInfo.decode(assignments.get("consumer11").userData());
allActiveTasks.addAll(info.activeTasks);
allActiveTasks.addAll(info.activeTasks());
allPartitions.addAll(assignments.get("consumer11").partitions());
info = AssignmentInfo.decode(assignments.get("consumer20").userData());
allActiveTasks.addAll(info.activeTasks);
allActiveTasks.addAll(info.activeTasks());
allPartitions.addAll(assignments.get("consumer20").partitions());
assertEquals(allTasks, allActiveTasks);
@ -524,14 +524,14 @@ public class StreamsPartitionAssignorTest {
AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData());
AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData());
assertEquals(2, info10.activeTasks.size());
assertEquals(2, info11.activeTasks.size());
assertEquals(2, info20.activeTasks.size());
assertEquals(2, info10.activeTasks().size());
assertEquals(2, info11.activeTasks().size());
assertEquals(2, info20.activeTasks().size());
Set<TaskId> allTasks = new HashSet<>();
allTasks.addAll(info10.activeTasks);
allTasks.addAll(info11.activeTasks);
allTasks.addAll(info20.activeTasks);
allTasks.addAll(info10.activeTasks());
allTasks.addAll(info11.activeTasks());
allTasks.addAll(info20.activeTasks());
assertEquals(new HashSet<>(tasks), allTasks);
// check tasks for state topics
@ -603,15 +603,15 @@ public class StreamsPartitionAssignorTest {
// the first consumer
AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10"));
allActiveTasks.addAll(info10.activeTasks);
allStandbyTasks.addAll(info10.standbyTasks.keySet());
allActiveTasks.addAll(info10.activeTasks());
allStandbyTasks.addAll(info10.standbyTasks().keySet());
// the second consumer
AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11"));
allActiveTasks.addAll(info11.activeTasks);
allStandbyTasks.addAll(info11.standbyTasks.keySet());
allActiveTasks.addAll(info11.activeTasks());
allStandbyTasks.addAll(info11.standbyTasks().keySet());
assertNotEquals("same processId has same set of standby tasks", info11.standbyTasks.keySet(), info10.standbyTasks.keySet());
assertNotEquals("same processId has same set of standby tasks", info11.standbyTasks().keySet(), info10.standbyTasks().keySet());
// check active tasks assigned to the first client
assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks));
@ -619,8 +619,8 @@ public class StreamsPartitionAssignorTest {
// the third consumer
AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20"));
allActiveTasks.addAll(info20.activeTasks);
allStandbyTasks.addAll(info20.standbyTasks.keySet());
allActiveTasks.addAll(info20.activeTasks());
allStandbyTasks.addAll(info20.standbyTasks().keySet());
// all task ids are in the active tasks and also in the standby tasks
@ -847,7 +847,7 @@ public class StreamsPartitionAssignorTest {
configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, (Object) userEndPoint));
final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("input"));
final SubscriptionInfo subscriptionInfo = SubscriptionInfo.decode(subscription.userData());
assertEquals("localhost:8080", subscriptionInfo.userEndPoint);
assertEquals("localhost:8080", subscriptionInfo.userEndPoint());
}
@Test
@ -874,7 +874,7 @@ public class StreamsPartitionAssignorTest {
final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
final PartitionAssignor.Assignment consumerAssignment = assignments.get("consumer1");
final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumerAssignment.userData());
final Set<TopicPartition> topicPartitions = assignmentInfo.partitionsByHost.get(new HostInfo("localhost", 8080));
final Set<TopicPartition> topicPartitions = assignmentInfo.partitionsByHost().get(new HostInfo("localhost", 8080));
assertEquals(Utils.mkSet(new TopicPartition("topic1", 0),
new TopicPartition("topic1", 1),
new TopicPartition("topic1", 2)), topicPartitions);
@ -1072,8 +1072,8 @@ public class StreamsPartitionAssignorTest {
final Map<String, PartitionAssignor.Assignment> assign = partitionAssignor.assign(metadata, subscriptions);
final PartitionAssignor.Assignment consumer1Assignment = assign.get("consumer1");
final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumer1Assignment.userData());
final Set<TopicPartition> consumer1partitions = assignmentInfo.partitionsByHost.get(new HostInfo("localhost", 8080));
final Set<TopicPartition> consumer2Partitions = assignmentInfo.partitionsByHost.get(new HostInfo("other", 9090));
final Set<TopicPartition> consumer1partitions = assignmentInfo.partitionsByHost().get(new HostInfo("localhost", 8080));
final Set<TopicPartition> consumer2Partitions = assignmentInfo.partitionsByHost().get(new HostInfo("other", 9090));
final HashSet<TopicPartition> allAssignedPartitions = new HashSet<>(consumer1partitions);
allAssignedPartitions.addAll(consumer2Partitions);
assertThat(consumer1partitions, not(allPartitions));
@ -1095,6 +1095,37 @@ public class StreamsPartitionAssignorTest {
partitionAssignor.configure(config);
}
@Test
public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions() throws Exception {
final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
final Set<TaskId> emptyTasks = Collections.emptySet();
subscriptions.put(
"consumer1",
new PartitionAssignor.Subscription(
Collections.singletonList("topic1"),
new SubscriptionInfo(1, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
)
);
subscriptions.put(
"consumer2",
new PartitionAssignor.Subscription(
Collections.singletonList("topic1"),
new SubscriptionInfo(2, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
)
);
mockTaskManager(Collections.<TaskId>emptySet(),
Collections.<TaskId>emptySet(),
UUID.randomUUID(),
new InternalTopologyBuilder());
partitionAssignor.configure(configProps());
final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions);
assertThat(assignment.size(), equalTo(2));
assertThat(AssignmentInfo.decode(assignment.get("consumer1").userData()).version(), equalTo(1));
assertThat(AssignmentInfo.decode(assignment.get("consumer2").userData()).version(), equalTo(1));
}
private PartitionAssignor.Assignment createAssignment(final Map<HostInfo, Set<TopicPartition>> firstHostState) {
final AssignmentInfo info = new AssignmentInfo(Collections.<TaskId>emptyList(),
Collections.<TaskId, Set<TopicPartition>>emptyMap(),
@ -1111,7 +1142,7 @@ public class StreamsPartitionAssignorTest {
AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
// check if the number of assigned partitions == the size of active task id list
assertEquals(assignment.partitions().size(), info.activeTasks.size());
assertEquals(assignment.partitions().size(), info.activeTasks().size());
// check if active tasks are consistent
List<TaskId> activeTasks = new ArrayList<>();
@ -1121,14 +1152,14 @@ public class StreamsPartitionAssignorTest {
activeTasks.add(new TaskId(0, partition.partition()));
activeTopics.add(partition.topic());
}
assertEquals(activeTasks, info.activeTasks);
assertEquals(activeTasks, info.activeTasks());
// check if active partitions cover all topics
assertEquals(expectedTopics, activeTopics);
// check if standby tasks are consistent
Set<String> standbyTopics = new HashSet<>();
for (Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks.entrySet()) {
for (Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks().entrySet()) {
TaskId id = entry.getKey();
Set<TopicPartition> partitions = entry.getValue();
for (TopicPartition partition : partitions) {
@ -1139,7 +1170,7 @@ public class StreamsPartitionAssignorTest {
}
}
if (info.standbyTasks.size() > 0) {
if (info.standbyTasks().size() > 0) {
// check if standby partitions cover all topics
assertEquals(expectedTopics, standbyTopics);
}

View File

@ -33,6 +33,7 @@ import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
public class AssignmentInfoTest {
@ -61,10 +62,10 @@ public class AssignmentInfoTest {
standbyTasks.put(new TaskId(2, 0), Utils.mkSet(new TopicPartition("t3", 0), new TopicPartition("t3", 0)));
final AssignmentInfo oldVersion = new AssignmentInfo(1, activeTasks, standbyTasks, null);
final AssignmentInfo decoded = AssignmentInfo.decode(encodeV1(oldVersion));
assertEquals(oldVersion.activeTasks, decoded.activeTasks);
assertEquals(oldVersion.standbyTasks, decoded.standbyTasks);
assertEquals(0, decoded.partitionsByHost.size()); // should be empty as wasn't in V1
assertEquals(2, decoded.version); // automatically upgraded to v2 on decode;
assertEquals(oldVersion.activeTasks(), decoded.activeTasks());
assertEquals(oldVersion.standbyTasks(), decoded.standbyTasks());
assertNull(decoded.partitionsByHost()); // should be null as wasn't in V1
assertEquals(1, decoded.version());
}
@ -76,15 +77,15 @@ public class AssignmentInfoTest {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(baos);
// Encode version
out.writeInt(oldVersion.version);
out.writeInt(oldVersion.version());
// Encode active tasks
out.writeInt(oldVersion.activeTasks.size());
for (TaskId id : oldVersion.activeTasks) {
out.writeInt(oldVersion.activeTasks().size());
for (TaskId id : oldVersion.activeTasks()) {
id.writeTo(out);
}
// Encode standby tasks
out.writeInt(oldVersion.standbyTasks.size());
for (Map.Entry<TaskId, Set<TopicPartition>> entry : oldVersion.standbyTasks.entrySet()) {
out.writeInt(oldVersion.standbyTasks().size());
for (Map.Entry<TaskId, Set<TopicPartition>> entry : oldVersion.standbyTasks().entrySet()) {
TaskId id = entry.getKey();
id.writeTo(out);

View File

@ -65,14 +65,12 @@ public class SubscriptionInfoTest {
final ByteBuffer v1Encoding = encodePreviousVersion(processId, activeTasks, standbyTasks);
final SubscriptionInfo decode = SubscriptionInfo.decode(v1Encoding);
assertEquals(activeTasks, decode.prevTasks);
assertEquals(standbyTasks, decode.standbyTasks);
assertEquals(processId, decode.processId);
assertNull(decode.userEndPoint);
assertEquals(activeTasks, decode.prevTasks());
assertEquals(standbyTasks, decode.standbyTasks());
assertEquals(processId, decode.processId());
assertNull(decode.userEndPoint());
}
/**
* This is a clone of what the V1 encoding did. The encode method has changed for V2
* so it is impossible to test compatibility without having this