From 2a4ba75e1338eaa97da87077330b43d7448a18bc Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Mon, 5 Mar 2018 10:56:42 -0800 Subject: [PATCH] KAFKA-6054: Code cleanup to prepare the actual fix for an upgrade path (#4630) Author: Matthias J. Sax Reviewers: Bill Bejeck , John Roesler , Guozhang Wang --- .../apache/kafka/streams/StreamsConfig.java | 4 +- .../internals/StreamsPartitionAssignor.java | 226 ++++++++++------- .../internals/assignment/AssignmentInfo.java | 238 +++++++++++------- .../assignment/SubscriptionInfo.java | 220 +++++++++++----- .../QueryableStateIntegrationTest.java | 1 - .../StreamsPartitionAssignorTest.java | 95 ++++--- .../assignment/AssignmentInfoTest.java | 19 +- .../assignment/SubscriptionInfoTest.java | 10 +- 8 files changed, 527 insertions(+), 286 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 6b3626101bd..47becfc239b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -226,11 +226,11 @@ public class StreamsConfig extends AbstractConfig { public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde"; private static final String DEFAULT_KEY_SERDE_CLASS_DOC = " Default serializer / deserializer class for key that implements the org.apache.kafka.common.serialization.Serde interface."; - /** {@code default timestamp.extractor} */ + /** {@code default.timestamp.extractor} */ public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "default.timestamp.extractor"; private static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC = "Default timestamp extractor class that implements the org.apache.kafka.streams.processor.TimestampExtractor interface."; - /** {@code default value.serde} */ + /** {@code default.value.serde} */ public static final String DEFAULT_VALUE_SERDE_CLASS_CONFIG = "default.value.serde"; private static final String DEFAULT_VALUE_SERDE_CLASS_DOC = "Default serializer / deserializer class for value that implements the org.apache.kafka.common.serialization.Serde interface."; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 9aa0e94c8c1..71a84b2ca73 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -66,7 +66,8 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable public final TaskId taskId; public final TopicPartition partition; - AssignedPartition(final TaskId taskId, final TopicPartition partition) { + AssignedPartition(final TaskId taskId, + final TopicPartition partition) { this.taskId = taskId; this.partition = partition; } @@ -77,11 +78,11 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable } @Override - public boolean equals(Object o) { + public boolean equals(final Object o) { if (!(o instanceof AssignedPartition)) { return false; } - AssignedPartition other = (AssignedPartition) o; + final AssignedPartition other = (AssignedPartition) o; return compareTo(other) == 0; } @@ -104,8 +105,9 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable final String host = getHost(endPoint); final Integer port = getPort(endPoint); - if (host == null || port == null) + if (host == null || port == null) { throw new ConfigException(String.format("Error parsing host address %s. Expected format host:port.", endPoint)); + } hostInfo = new HostInfo(host, port); } else { @@ -119,10 +121,11 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable state = new ClientState(); } - void addConsumer(final String consumerMemberId, final SubscriptionInfo info) { + void addConsumer(final String consumerMemberId, + final SubscriptionInfo info) { consumers.add(consumerMemberId); - state.addPreviousActiveTasks(info.prevTasks); - state.addPreviousStandbyTasks(info.standbyTasks); + state.addPreviousActiveTasks(info.prevTasks()); + state.addPreviousStandbyTasks(info.standbyTasks()); state.incrementCapacity(); } @@ -157,8 +160,9 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable private static final Comparator PARTITION_COMPARATOR = new Comparator() { @Override - public int compare(TopicPartition p1, TopicPartition p2) { - int result = p1.topic().compareTo(p2.topic()); + public int compare(final TopicPartition p1, + final TopicPartition p2) { + final int result = p1.topic().compareTo(p2.topic()); if (result != 0) { return result; @@ -194,15 +198,15 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable final Object o = configs.get(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR); if (o == null) { - KafkaException ex = new KafkaException("TaskManager is not specified"); - log.error(ex.getMessage(), ex); - throw ex; + final KafkaException fatalException = new KafkaException("TaskManager is not specified"); + log.error(fatalException.getMessage(), fatalException); + throw fatalException; } if (!(o instanceof TaskManager)) { - KafkaException ex = new KafkaException(String.format("%s is not an instance of %s", o.getClass().getName(), TaskManager.class.getName())); - log.error(ex.getMessage(), ex); - throw ex; + final KafkaException fatalException = new KafkaException(String.format("%s is not an instance of %s", o.getClass().getName(), TaskManager.class.getName())); + log.error(fatalException.getMessage(), fatalException); + throw fatalException; } taskManager = (TaskManager) o; @@ -214,14 +218,14 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable final String userEndPoint = streamsConfig.getString(StreamsConfig.APPLICATION_SERVER_CONFIG); if (userEndPoint != null && !userEndPoint.isEmpty()) { try { - String host = getHost(userEndPoint); - Integer port = getPort(userEndPoint); + final String host = getHost(userEndPoint); + final Integer port = getPort(userEndPoint); if (host == null || port == null) throw new ConfigException(String.format("%s Config %s isn't in the correct format. Expected a host:port pair" + " but received %s", logPrefix, StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint)); - } catch (NumberFormatException nfe) { + } catch (final NumberFormatException nfe) { throw new ConfigException(String.format("%s Invalid port supplied in %s for config %s", logPrefix, userEndPoint, StreamsConfig.APPLICATION_SERVER_CONFIG)); } @@ -240,7 +244,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable } @Override - public Subscription subscription(Set topics) { + public Subscription subscription(final Set topics) { // Adds the following information to subscription // 1. Client UUID (a unique id assigned to an instance of KafkaStreams) // 2. Task ids of previously running tasks @@ -249,7 +253,11 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable final Set previousActiveTasks = taskManager.prevActiveTaskIds(); final Set standbyTasks = taskManager.cachedTasksIds(); standbyTasks.removeAll(previousActiveTasks); - final SubscriptionInfo data = new SubscriptionInfo(taskManager.processId(), previousActiveTasks, standbyTasks, this.userEndPoint); + final SubscriptionInfo data = new SubscriptionInfo( + taskManager.processId(), + previousActiveTasks, + standbyTasks, + this.userEndPoint); taskManager.updateSubscriptionsFromMetadata(topics); @@ -277,22 +285,32 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable * 3. within each client, tasks are assigned to consumer clients in round-robin manner. */ @Override - public Map assign(Cluster metadata, Map subscriptions) { + public Map assign(final Cluster metadata, + final Map subscriptions) { // construct the client metadata from the decoded subscription info - Map clientsMetadata = new HashMap<>(); + final Map clientsMetadata = new HashMap<>(); - for (Map.Entry entry : subscriptions.entrySet()) { - String consumerId = entry.getKey(); - Subscription subscription = entry.getValue(); + int minUserMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION; + for (final Map.Entry entry : subscriptions.entrySet()) { + final String consumerId = entry.getKey(); + final Subscription subscription = entry.getValue(); - SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData()); + final SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData()); + final int usedVersion = info.version(); + if (usedVersion > SubscriptionInfo.LATEST_SUPPORTED_VERSION) { + throw new IllegalStateException("Unknown metadata version: " + usedVersion + + "; latest supported version: " + SubscriptionInfo.LATEST_SUPPORTED_VERSION); + } + if (usedVersion < minUserMetadataVersion) { + minUserMetadataVersion = usedVersion; + } // create the new client metadata if necessary - ClientMetadata clientMetadata = clientsMetadata.get(info.processId); + ClientMetadata clientMetadata = clientsMetadata.get(info.processId()); if (clientMetadata == null) { - clientMetadata = new ClientMetadata(info.userEndPoint); - clientsMetadata.put(info.processId, clientMetadata); + clientMetadata = new ClientMetadata(info.userEndPoint()); + clientsMetadata.put(info.processId(), clientMetadata); } // add the consumer to the client @@ -309,8 +327,8 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable final Map topicGroups = taskManager.builder().topicGroups(); final Map repartitionTopicMetadata = new HashMap<>(); - for (InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) { - for (InternalTopicConfig topic: topicsInfo.repartitionSourceTopics.values()) { + for (final InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) { + for (final InternalTopicConfig topic: topicsInfo.repartitionSourceTopics.values()) { repartitionTopicMetadata.put(topic.name(), new InternalTopicMetadata(topic)); } } @@ -319,13 +337,13 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable do { numPartitionsNeeded = false; - for (InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) { - for (String topicName : topicsInfo.repartitionSourceTopics.keySet()) { + for (final InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) { + for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) { int numPartitions = repartitionTopicMetadata.get(topicName).numPartitions; // try set the number of partitions for this repartition topic if it is not set yet if (numPartitions == UNKNOWN) { - for (InternalTopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) { + for (final InternalTopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) { final Set otherSinkTopics = otherTopicsInfo.sinkTopics; if (otherSinkTopics.contains(topicName)) { @@ -375,7 +393,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable // augment the metadata with the newly computed number of partitions for all the // repartition source topics final Map allRepartitionTopicPartitions = new HashMap<>(); - for (Map.Entry entry : repartitionTopicMetadata.entrySet()) { + for (final Map.Entry entry : repartitionTopicMetadata.entrySet()) { final String topic = entry.getKey(); final int numPartitions = entry.getValue().numPartitions; @@ -395,7 +413,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable // get the tasks as partition groups from the partition grouper final Set allSourceTopics = new HashSet<>(); final Map> sourceTopicsByGroup = new HashMap<>(); - for (Map.Entry entry : topicGroups.entrySet()) { + for (final Map.Entry entry : topicGroups.entrySet()) { allSourceTopics.addAll(entry.getValue().sourceTopics); sourceTopicsByGroup.put(entry.getKey(), entry.getValue().sourceTopics); } @@ -405,9 +423,9 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable // check if all partitions are assigned, and there are no duplicates of partitions in multiple tasks final Set allAssignedPartitions = new HashSet<>(); final Map> tasksByTopicGroup = new HashMap<>(); - for (Map.Entry> entry : partitionsForTask.entrySet()) { + for (final Map.Entry> entry : partitionsForTask.entrySet()) { final Set partitions = entry.getValue(); - for (TopicPartition partition : partitions) { + for (final TopicPartition partition : partitions) { if (allAssignedPartitions.contains(partition)) { log.warn("Partition {} is assigned to more than one tasks: {}", partition, partitionsForTask); } @@ -422,10 +440,10 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable } ids.add(id); } - for (String topic : allSourceTopics) { + for (final String topic : allSourceTopics) { final List partitionInfoList = fullMetadata.partitionsForTopic(topic); if (!partitionInfoList.isEmpty()) { - for (PartitionInfo partitionInfo : partitionInfoList) { + for (final PartitionInfo partitionInfo : partitionInfoList) { final TopicPartition partition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition()); if (!allAssignedPartitions.contains(partition)) { log.warn("Partition {} is not assigned to any tasks: {}", partition, partitionsForTask); @@ -438,15 +456,15 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable // add tasks to state change log topic subscribers final Map changelogTopicMetadata = new HashMap<>(); - for (Map.Entry entry : topicGroups.entrySet()) { + for (final Map.Entry entry : topicGroups.entrySet()) { final int topicGroupId = entry.getKey(); final Map stateChangelogTopics = entry.getValue().stateChangelogTopics; - for (InternalTopicConfig topicConfig : stateChangelogTopics.values()) { + for (final InternalTopicConfig topicConfig : stateChangelogTopics.values()) { // the expected number of partitions is the max value of TaskId.partition + 1 int numPartitions = UNKNOWN; if (tasksByTopicGroup.get(topicGroupId) != null) { - for (TaskId task : tasksByTopicGroup.get(topicGroupId)) { + for (final TaskId task : tasksByTopicGroup.get(topicGroupId)) { if (numPartitions < task.partition + 1) numPartitions = task.partition + 1; } @@ -468,7 +486,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable // assign tasks to clients final Map states = new HashMap<>(); - for (Map.Entry entry : clientsMetadata.entrySet()) { + for (final Map.Entry entry : clientsMetadata.entrySet()) { states.put(entry.getKey(), entry.getValue().state); } @@ -484,25 +502,27 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable // construct the global partition assignment per host map final Map> partitionsByHostState = new HashMap<>(); - for (Map.Entry entry : clientsMetadata.entrySet()) { - final HostInfo hostInfo = entry.getValue().hostInfo; + if (minUserMetadataVersion == 2) { + for (final Map.Entry entry : clientsMetadata.entrySet()) { + final HostInfo hostInfo = entry.getValue().hostInfo; - if (hostInfo != null) { - final Set topicPartitions = new HashSet<>(); - final ClientState state = entry.getValue().state; + if (hostInfo != null) { + final Set topicPartitions = new HashSet<>(); + final ClientState state = entry.getValue().state; - for (final TaskId id : state.activeTasks()) { - topicPartitions.addAll(partitionsForTask.get(id)); + for (final TaskId id : state.activeTasks()) { + topicPartitions.addAll(partitionsForTask.get(id)); + } + + partitionsByHostState.put(hostInfo, topicPartitions); } - - partitionsByHostState.put(hostInfo, topicPartitions); } } taskManager.setPartitionsByHostState(partitionsByHostState); // within the client, distribute tasks to its owned consumers final Map assignment = new HashMap<>(); - for (Map.Entry entry : clientsMetadata.entrySet()) { + for (final Map.Entry entry : clientsMetadata.entrySet()) { final Set consumers = entry.getValue().consumers; final ClientState state = entry.getValue().state; @@ -511,7 +531,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable int consumerTaskIndex = 0; - for (String consumer : consumers) { + for (final String consumer : consumers) { final Map> standby = new HashMap<>(); final ArrayList assignedPartitions = new ArrayList<>(); @@ -540,13 +560,15 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable Collections.sort(assignedPartitions); final List active = new ArrayList<>(); final List activePartitions = new ArrayList<>(); - for (AssignedPartition partition : assignedPartitions) { + for (final AssignedPartition partition : assignedPartitions) { active.add(partition.taskId); activePartitions.add(partition.partition); } // finally, encode the assignment before sending back to coordinator - assignment.put(consumer, new Assignment(activePartitions, new AssignmentInfo(active, standby, partitionsByHostState).encode())); + assignment.put(consumer, new Assignment( + activePartitions, + new AssignmentInfo(minUserMetadataVersion, active, standby, partitionsByHostState).encode())); } } @@ -577,26 +599,54 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable * @throws TaskAssignmentException if there is no task id for one of the partitions specified */ @Override - public void onAssignment(Assignment assignment) { - List partitions = new ArrayList<>(assignment.partitions()); + public void onAssignment(final Assignment assignment) { + final List partitions = new ArrayList<>(assignment.partitions()); Collections.sort(partitions, PARTITION_COMPARATOR); - AssignmentInfo info = AssignmentInfo.decode(assignment.userData()); + final AssignmentInfo info = AssignmentInfo.decode(assignment.userData()); + final int usedVersion = info.version(); - Map> activeTasks = new HashMap<>(); + // version 1 field + final Map> activeTasks = new HashMap<>(); + // version 2 fields + final Map topicToPartitionInfo = new HashMap<>(); + final Map> partitionsByHost; + switch (usedVersion) { + case 1: + processVersionOneAssignment(info, partitions, activeTasks); + partitionsByHost = Collections.emptyMap(); + break; + case 2: + processVersionTwoAssignment(info, partitions, activeTasks, topicToPartitionInfo); + partitionsByHost = info.partitionsByHost(); + break; + default: + throw new IllegalStateException("Unknown metadata version: " + usedVersion + + "; latest supported version: " + AssignmentInfo.LATEST_SUPPORTED_VERSION); + } + + taskManager.setClusterMetadata(Cluster.empty().withPartitions(topicToPartitionInfo)); + taskManager.setPartitionsByHostState(partitionsByHost); + taskManager.setAssignmentMetadata(activeTasks, info.standbyTasks()); + taskManager.updateSubscriptionsFromAssignment(partitions); + } + + private void processVersionOneAssignment(final AssignmentInfo info, + final List partitions, + final Map> activeTasks) { // the number of assigned partitions should be the same as number of active tasks, which // could be duplicated if one task has more than one assigned partitions - if (partitions.size() != info.activeTasks.size()) { + if (partitions.size() != info.activeTasks().size()) { throw new TaskAssignmentException( - String.format("%sNumber of assigned partitions %d is not equal to the number of active taskIds %d" + - ", assignmentInfo=%s", logPrefix, partitions.size(), info.activeTasks.size(), info.toString()) + String.format("%sNumber of assigned partitions %d is not equal to the number of active taskIds %d" + + ", assignmentInfo=%s", logPrefix, partitions.size(), info.activeTasks().size(), info.toString()) ); } for (int i = 0; i < partitions.size(); i++) { - TopicPartition partition = partitions.get(i); - TaskId id = info.activeTasks.get(i); + final TopicPartition partition = partitions.get(i); + final TaskId id = info.activeTasks().get(i); Set assignedPartitions = activeTasks.get(id); if (assignedPartitions == null) { @@ -605,23 +655,23 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable } assignedPartitions.add(partition); } + } - final Map topicToPartitionInfo = new HashMap<>(); - for (Set value : info.partitionsByHost.values()) { - for (TopicPartition topicPartition : value) { - topicToPartitionInfo.put(topicPartition, new PartitionInfo(topicPartition.topic(), - topicPartition.partition(), - null, - new Node[0], - new Node[0])); + private void processVersionTwoAssignment(final AssignmentInfo info, + final List partitions, + final Map> activeTasks, + final Map topicToPartitionInfo) { + processVersionOneAssignment(info, partitions, activeTasks); + + // process partitions by host + final Map> partitionsByHost = info.partitionsByHost(); + for (final Set value : partitionsByHost.values()) { + for (final TopicPartition topicPartition : value) { + topicToPartitionInfo.put( + topicPartition, + new PartitionInfo(topicPartition.topic(), topicPartition.partition(), null, new Node[0], new Node[0])); } } - - taskManager.setClusterMetadata(Cluster.empty().withPartitions(topicToPartitionInfo)); - taskManager.setPartitionsByHostState(info.partitionsByHost); - taskManager.setAssignmentMetadata(activeTasks, info.standbyTasks); - - taskManager.updateSubscriptionsFromAssignment(partitions); } /** @@ -658,10 +708,10 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable log.debug("Completed validating internal topics in partition assignor."); } - private void ensureCopartitioning(Collection> copartitionGroups, - Map allRepartitionTopicsNumPartitions, - Cluster metadata) { - for (Set copartitionGroup : copartitionGroups) { + private void ensureCopartitioning(final Collection> copartitionGroups, + final Map allRepartitionTopicsNumPartitions, + final Cluster metadata) { + for (final Set copartitionGroup : copartitionGroups) { copartitionedTopicsValidator.validate(copartitionGroup, allRepartitionTopicsNumPartitions, metadata); } } @@ -677,7 +727,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable private final Set updatedTopicSubscriptions = new HashSet<>(); - public void updateTopics(Collection topicNames) { + public void updateTopics(final Collection topicNames) { updatedTopicSubscriptions.clear(); updatedTopicSubscriptions.addAll(topicNames); } @@ -735,7 +785,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable // if all topics for this co-partition group is repartition topics, // then set the number of partitions to be the maximum of the number of partitions. if (numPartitions == UNKNOWN) { - for (Map.Entry entry: allRepartitionTopicsNumPartitions.entrySet()) { + for (final Map.Entry entry: allRepartitionTopicsNumPartitions.entrySet()) { if (copartitionGroup.contains(entry.getKey())) { final int partitions = entry.getValue().numPartitions; if (partitions > numPartitions) { @@ -745,7 +795,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable } } // enforce co-partitioning restrictions to repartition topics by updating their number of partitions - for (Map.Entry entry : allRepartitionTopicsNumPartitions.entrySet()) { + for (final Map.Entry entry : allRepartitionTopicsNumPartitions.entrySet()) { if (copartitionGroup.contains(entry.getKey())) { entry.getValue().numPartitions = numPartitions; } @@ -755,7 +805,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable } // following functions are for test only - void setInternalTopicManager(InternalTopicManager internalTopicManager) { + void setInternalTopicManager(final InternalTopicManager internalTopicManager) { this.internalTopicManager = internalTopicManager; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java index 8607472c281..c8df7498755 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java @@ -39,76 +39,123 @@ import java.util.Set; public class AssignmentInfo { private static final Logger log = LoggerFactory.getLogger(AssignmentInfo.class); - /** - * A new field was added, partitionsByHost. CURRENT_VERSION - * is required so we can decode the previous version. For example, this may occur - * during a rolling upgrade - */ - private static final int CURRENT_VERSION = 2; - public final int version; - public final List activeTasks; // each element corresponds to a partition - public final Map> standbyTasks; - public final Map> partitionsByHost; - public AssignmentInfo(List activeTasks, Map> standbyTasks, - Map> hostState) { - this(CURRENT_VERSION, activeTasks, standbyTasks, hostState); + public static final int LATEST_SUPPORTED_VERSION = 2; + + private final int usedVersion; + private List activeTasks; + private Map> standbyTasks; + private Map> partitionsByHost; + + private AssignmentInfo(final int version) { + this.usedVersion = version; } - protected AssignmentInfo(int version, List activeTasks, Map> standbyTasks, - Map> hostState) { - this.version = version; + public AssignmentInfo(final List activeTasks, + final Map> standbyTasks, + final Map> hostState) { + this(LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState); + } + + public AssignmentInfo(final int version, + final List activeTasks, + final Map> standbyTasks, + final Map> hostState) { + this.usedVersion = version; this.activeTasks = activeTasks; this.standbyTasks = standbyTasks; this.partitionsByHost = hostState; } + public int version() { + return usedVersion; + } + + public List activeTasks() { + return activeTasks; + } + + public Map> standbyTasks() { + return standbyTasks; + } + + public Map> partitionsByHost() { + return partitionsByHost; + } + /** * @throws TaskAssignmentException if method fails to encode the data, e.g., if there is an * IO exception during encoding */ public ByteBuffer encode() { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream out = new DataOutputStream(baos); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try { - // Encode version - out.writeInt(version); - // Encode active tasks - out.writeInt(activeTasks.size()); - for (TaskId id : activeTasks) { - id.writeTo(out); - } - // Encode standby tasks - out.writeInt(standbyTasks.size()); - for (Map.Entry> entry : standbyTasks.entrySet()) { - TaskId id = entry.getKey(); - id.writeTo(out); - - Set partitions = entry.getValue(); - writeTopicPartitions(out, partitions); - } - out.writeInt(partitionsByHost.size()); - for (Map.Entry> entry : partitionsByHost - .entrySet()) { - final HostInfo hostInfo = entry.getKey(); - out.writeUTF(hostInfo.host()); - out.writeInt(hostInfo.port()); - writeTopicPartitions(out, entry.getValue()); + try (final DataOutputStream out = new DataOutputStream(baos)) { + switch (usedVersion) { + case 1: + encodeVersionOne(out); + break; + case 2: + encodeVersionTwo(out); + break; + default: + throw new IllegalStateException("Unknown metadata version: " + usedVersion + + "; latest supported version: " + LATEST_SUPPORTED_VERSION); } out.flush(); out.close(); return ByteBuffer.wrap(baos.toByteArray()); - } catch (IOException ex) { + } catch (final IOException ex) { throw new TaskAssignmentException("Failed to encode AssignmentInfo", ex); } } - private void writeTopicPartitions(DataOutputStream out, Set partitions) throws IOException { + private void encodeVersionOne(final DataOutputStream out) throws IOException { + out.writeInt(1); // version + encodeActiveAndStandbyTaskAssignment(out); + } + + private void encodeActiveAndStandbyTaskAssignment(final DataOutputStream out) throws IOException { + // encode active tasks + out.writeInt(activeTasks.size()); + for (final TaskId id : activeTasks) { + id.writeTo(out); + } + + // encode standby tasks + out.writeInt(standbyTasks.size()); + for (final Map.Entry> entry : standbyTasks.entrySet()) { + final TaskId id = entry.getKey(); + id.writeTo(out); + + final Set partitions = entry.getValue(); + writeTopicPartitions(out, partitions); + } + } + + private void encodeVersionTwo(final DataOutputStream out) throws IOException { + out.writeInt(2); // version + encodeActiveAndStandbyTaskAssignment(out); + encodePartitionsByHost(out); + } + + private void encodePartitionsByHost(final DataOutputStream out) throws IOException { + // encode partitions by host + out.writeInt(partitionsByHost.size()); + for (final Map.Entry> entry : partitionsByHost.entrySet()) { + final HostInfo hostInfo = entry.getKey(); + out.writeUTF(hostInfo.host()); + out.writeInt(hostInfo.port()); + writeTopicPartitions(out, entry.getValue()); + } + } + + private void writeTopicPartitions(final DataOutputStream out, + final Set partitions) throws IOException { out.writeInt(partitions.size()); - for (TopicPartition partition : partitions) { + for (final TopicPartition partition : partitions) { out.writeUTF(partition.topic()); out.writeInt(partition.partition()); } @@ -117,52 +164,69 @@ public class AssignmentInfo { /** * @throws TaskAssignmentException if method fails to decode the data or if the data version is unknown */ - public static AssignmentInfo decode(ByteBuffer data) { + public static AssignmentInfo decode(final ByteBuffer data) { // ensure we are at the beginning of the ByteBuffer data.rewind(); - try (DataInputStream in = new DataInputStream(new ByteBufferInputStream(data))) { - // Decode version - int version = in.readInt(); - if (version < 0 || version > CURRENT_VERSION) { - TaskAssignmentException ex = new TaskAssignmentException("Unknown assignment data version: " + version); - log.error(ex.getMessage(), ex); - throw ex; + try (final DataInputStream in = new DataInputStream(new ByteBufferInputStream(data))) { + // decode used version + final int usedVersion = in.readInt(); + final AssignmentInfo assignmentInfo = new AssignmentInfo(usedVersion); + + switch (usedVersion) { + case 1: + decodeVersionOneData(assignmentInfo, in); + break; + case 2: + decodeVersionTwoData(assignmentInfo, in); + break; + default: + TaskAssignmentException fatalException = new TaskAssignmentException("Unable to decode subscription data: " + + "used version: " + usedVersion + "; latest supported version: " + LATEST_SUPPORTED_VERSION); + log.error(fatalException.getMessage(), fatalException); + throw fatalException; } - // Decode active tasks - int count = in.readInt(); - List activeTasks = new ArrayList<>(count); - for (int i = 0; i < count; i++) { - activeTasks.add(TaskId.readFrom(in)); - } - // Decode standby tasks - count = in.readInt(); - Map> standbyTasks = new HashMap<>(count); - for (int i = 0; i < count; i++) { - TaskId id = TaskId.readFrom(in); - standbyTasks.put(id, readTopicPartitions(in)); - } - - Map> hostStateToTopicPartitions = new HashMap<>(); - if (version == CURRENT_VERSION) { - int numEntries = in.readInt(); - for (int i = 0; i < numEntries; i++) { - HostInfo hostInfo = new HostInfo(in.readUTF(), in.readInt()); - hostStateToTopicPartitions.put(hostInfo, readTopicPartitions(in)); - } - } - - return new AssignmentInfo(activeTasks, standbyTasks, hostStateToTopicPartitions); - - } catch (IOException ex) { + return assignmentInfo; + } catch (final IOException ex) { throw new TaskAssignmentException("Failed to decode AssignmentInfo", ex); } } - private static Set readTopicPartitions(DataInputStream in) throws IOException { - int numPartitions = in.readInt(); - Set partitions = new HashSet<>(numPartitions); + private static void decodeVersionOneData(final AssignmentInfo assignmentInfo, + final DataInputStream in) throws IOException { + // decode active tasks + int count = in.readInt(); + assignmentInfo.activeTasks = new ArrayList<>(count); + for (int i = 0; i < count; i++) { + assignmentInfo.activeTasks.add(TaskId.readFrom(in)); + } + + // decode standby tasks + count = in.readInt(); + assignmentInfo.standbyTasks = new HashMap<>(count); + for (int i = 0; i < count; i++) { + TaskId id = TaskId.readFrom(in); + assignmentInfo.standbyTasks.put(id, readTopicPartitions(in)); + } + } + + private static void decodeVersionTwoData(final AssignmentInfo assignmentInfo, + final DataInputStream in) throws IOException { + decodeVersionOneData(assignmentInfo, in); + + // decode partitions by host + assignmentInfo.partitionsByHost = new HashMap<>(); + final int numEntries = in.readInt(); + for (int i = 0; i < numEntries; i++) { + final HostInfo hostInfo = new HostInfo(in.readUTF(), in.readInt()); + assignmentInfo.partitionsByHost.put(hostInfo, readTopicPartitions(in)); + } + } + + private static Set readTopicPartitions(final DataInputStream in) throws IOException { + final int numPartitions = in.readInt(); + final Set partitions = new HashSet<>(numPartitions); for (int j = 0; j < numPartitions; j++) { partitions.add(new TopicPartition(in.readUTF(), in.readInt())); } @@ -171,14 +235,14 @@ public class AssignmentInfo { @Override public int hashCode() { - return version ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHost.hashCode(); + return usedVersion ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHost.hashCode(); } @Override - public boolean equals(Object o) { + public boolean equals(final Object o) { if (o instanceof AssignmentInfo) { - AssignmentInfo other = (AssignmentInfo) o; - return this.version == other.version && + final AssignmentInfo other = (AssignmentInfo) o; + return this.usedVersion == other.usedVersion && this.activeTasks.equals(other.activeTasks) && this.standbyTasks.equals(other.standbyTasks) && this.partitionsByHost.equals(other.partitionsByHost); @@ -189,7 +253,7 @@ public class AssignmentInfo { @Override public String toString() { - return "[version=" + version + ", active tasks=" + activeTasks.size() + ", standby tasks=" + standbyTasks.size() + "]"; + return "[version=" + usedVersion + ", active tasks=" + activeTasks.size() + ", standby tasks=" + standbyTasks.size() + "]"; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java index f583dbafc94..7fee90b5402 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java @@ -31,42 +31,96 @@ public class SubscriptionInfo { private static final Logger log = LoggerFactory.getLogger(SubscriptionInfo.class); - private static final int CURRENT_VERSION = 2; + public static final int LATEST_SUPPORTED_VERSION = 2; - public final int version; - public final UUID processId; - public final Set prevTasks; - public final Set standbyTasks; - public final String userEndPoint; + private final int usedVersion; + private UUID processId; + private Set prevTasks; + private Set standbyTasks; + private String userEndPoint; - public SubscriptionInfo(UUID processId, Set prevTasks, Set standbyTasks, String userEndPoint) { - this(CURRENT_VERSION, processId, prevTasks, standbyTasks, userEndPoint); + private SubscriptionInfo(final int version) { + this.usedVersion = version; } - private SubscriptionInfo(int version, UUID processId, Set prevTasks, Set standbyTasks, String userEndPoint) { - this.version = version; + public SubscriptionInfo(final UUID processId, + final Set prevTasks, + final Set standbyTasks, + final String userEndPoint) { + this(LATEST_SUPPORTED_VERSION, processId, prevTasks, standbyTasks, userEndPoint); + } + + public SubscriptionInfo(final int version, + final UUID processId, + final Set prevTasks, + final Set standbyTasks, + final String userEndPoint) { + this.usedVersion = version; this.processId = processId; this.prevTasks = prevTasks; this.standbyTasks = standbyTasks; this.userEndPoint = userEndPoint; } + public int version() { + return usedVersion; + } + + public UUID processId() { + return processId; + } + + public Set prevTasks() { + return prevTasks; + } + + public Set standbyTasks() { + return standbyTasks; + } + + public String userEndPoint() { + return userEndPoint; + } + /** * @throws TaskAssignmentException if method fails to encode the data */ public ByteBuffer encode() { - byte[] endPointBytes; - if (userEndPoint == null) { - endPointBytes = new byte[0]; - } else { - endPointBytes = userEndPoint.getBytes(Charset.forName("UTF-8")); + final ByteBuffer buf; + + switch (usedVersion) { + case 1: + buf = encodeVersionOne(); + break; + case 2: + buf = encodeVersionTwo(prepareUserEndPoint()); + break; + default: + throw new IllegalStateException("Unknown metadata version: " + usedVersion + + "; latest supported version: " + LATEST_SUPPORTED_VERSION); } - ByteBuffer buf = ByteBuffer.allocate(4 /* version */ + 16 /* process id */ + 4 + - prevTasks.size() * 8 + 4 + standbyTasks.size() * 8 - + 4 /* length of bytes */ + endPointBytes.length - ); - // version - buf.putInt(version); + + buf.rewind(); + return buf; + } + + private ByteBuffer encodeVersionOne() { + final ByteBuffer buf = ByteBuffer.allocate(getVersionOneByteLength()); + + buf.putInt(1); // version + encodeVersionOneData(buf); + + return buf; + } + + private int getVersionOneByteLength() { + return 4 + // version + 16 + // client ID + 4 + prevTasks.size() * 8 + // length + prev tasks + 4 + standbyTasks.size() * 8; // length + standby tasks + } + + private void encodeVersionOneData(final ByteBuffer buf) { // encode client UUID buf.putLong(processId.getMostSignificantBits()); buf.putLong(processId.getLeastSignificantBits()); @@ -80,60 +134,104 @@ public class SubscriptionInfo { for (TaskId id : standbyTasks) { id.writeTo(buf); } - buf.putInt(endPointBytes.length); - buf.put(endPointBytes); - buf.rewind(); + } + + private byte[] prepareUserEndPoint() { + if (userEndPoint == null) { + return new byte[0]; + } else { + return userEndPoint.getBytes(Charset.forName("UTF-8")); + } + } + + private ByteBuffer encodeVersionTwo(final byte[] endPointBytes) { + final ByteBuffer buf = ByteBuffer.allocate(getVersionTwoByteLength(endPointBytes)); + + buf.putInt(2); // version + encodeVersionTwoData(buf, endPointBytes); + return buf; } + private int getVersionTwoByteLength(final byte[] endPointBytes) { + return getVersionOneByteLength() + + 4 + endPointBytes.length; // length + userEndPoint + } + + private void encodeVersionTwoData(final ByteBuffer buf, + final byte[] endPointBytes) { + encodeVersionOneData(buf); + if (endPointBytes != null) { + buf.putInt(endPointBytes.length); + buf.put(endPointBytes); + } + } + /** * @throws TaskAssignmentException if method fails to decode the data */ - public static SubscriptionInfo decode(ByteBuffer data) { + public static SubscriptionInfo decode(final ByteBuffer data) { // ensure we are at the beginning of the ByteBuffer data.rewind(); - // Decode version - int version = data.getInt(); - if (version == CURRENT_VERSION || version == 1) { - // Decode client UUID - UUID processId = new UUID(data.getLong(), data.getLong()); - // Decode previously active tasks - Set prevTasks = new HashSet<>(); - int numPrevs = data.getInt(); - for (int i = 0; i < numPrevs; i++) { - TaskId id = TaskId.readFrom(data); - prevTasks.add(id); - } - // Decode previously cached tasks - Set standbyTasks = new HashSet<>(); - int numCached = data.getInt(); - for (int i = 0; i < numCached; i++) { - standbyTasks.add(TaskId.readFrom(data)); - } + // decode used version + final int usedVersion = data.getInt(); + final SubscriptionInfo subscriptionInfo = new SubscriptionInfo(usedVersion); - String userEndPoint = null; - if (version == CURRENT_VERSION) { - int bytesLength = data.getInt(); - if (bytesLength != 0) { - byte[] bytes = new byte[bytesLength]; - data.get(bytes); - userEndPoint = new String(bytes, Charset.forName("UTF-8")); - } + switch (usedVersion) { + case 1: + decodeVersionOneData(subscriptionInfo, data); + break; + case 2: + decodeVersionTwoData(subscriptionInfo, data); + break; + default: + TaskAssignmentException fatalException = new TaskAssignmentException("Unable to decode subscription data: " + + "used version: " + usedVersion + "; latest supported version: " + LATEST_SUPPORTED_VERSION); + log.error(fatalException.getMessage(), fatalException); + throw fatalException; + } - } - return new SubscriptionInfo(version, processId, prevTasks, standbyTasks, userEndPoint); + return subscriptionInfo; + } - } else { - TaskAssignmentException ex = new TaskAssignmentException("unable to decode subscription data: version=" + version); - log.error(ex.getMessage(), ex); - throw ex; + private static void decodeVersionOneData(final SubscriptionInfo subscriptionInfo, + final ByteBuffer data) { + // decode client UUID + subscriptionInfo.processId = new UUID(data.getLong(), data.getLong()); + + // decode previously active tasks + final int numPrevs = data.getInt(); + subscriptionInfo.prevTasks = new HashSet<>(); + for (int i = 0; i < numPrevs; i++) { + TaskId id = TaskId.readFrom(data); + subscriptionInfo.prevTasks.add(id); + } + + // decode previously cached tasks + final int numCached = data.getInt(); + subscriptionInfo.standbyTasks = new HashSet<>(); + for (int i = 0; i < numCached; i++) { + subscriptionInfo.standbyTasks.add(TaskId.readFrom(data)); + } + } + + private static void decodeVersionTwoData(final SubscriptionInfo subscriptionInfo, + final ByteBuffer data) { + decodeVersionOneData(subscriptionInfo, data); + + // decode user end point (can be null) + int bytesLength = data.getInt(); + if (bytesLength != 0) { + final byte[] bytes = new byte[bytesLength]; + data.get(bytes); + subscriptionInfo.userEndPoint = new String(bytes, Charset.forName("UTF-8")); } } @Override public int hashCode() { - int hashCode = version ^ processId.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode(); + final int hashCode = usedVersion ^ processId.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode(); if (userEndPoint == null) { return hashCode; } @@ -141,10 +239,10 @@ public class SubscriptionInfo { } @Override - public boolean equals(Object o) { + public boolean equals(final Object o) { if (o instanceof SubscriptionInfo) { - SubscriptionInfo other = (SubscriptionInfo) o; - return this.version == other.version && + final SubscriptionInfo other = (SubscriptionInfo) o; + return this.usedVersion == other.usedVersion && this.processId.equals(other.processId) && this.prevTasks.equals(other.prevTasks) && this.standbyTasks.equals(other.standbyTasks) && diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index 8b4e8957ed5..bb06c72d080 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -376,7 +376,6 @@ public class QueryableStateIntegrationTest { } } - @Test public void queryOnRebalance() throws InterruptedException { final int numThreads = STREAM_TWO_PARTITIONS; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index bf3f1d1ac5e..b0c0d68287b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -239,17 +239,17 @@ public class StreamsPartitionAssignorTest { // the first consumer AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10")); - allActiveTasks.addAll(info10.activeTasks); + allActiveTasks.addAll(info10.activeTasks()); // the second consumer AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11")); - allActiveTasks.addAll(info11.activeTasks); + allActiveTasks.addAll(info11.activeTasks()); assertEquals(Utils.mkSet(task0, task1), allActiveTasks); // the third consumer AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20")); - allActiveTasks.addAll(info20.activeTasks); + allActiveTasks.addAll(info20.activeTasks()); assertEquals(3, allActiveTasks.size()); assertEquals(allTasks, new HashSet<>(allActiveTasks)); @@ -317,13 +317,13 @@ public class StreamsPartitionAssignorTest { final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData()); final List expectedInfo10TaskIds = Arrays.asList(taskIdA1, taskIdA3, taskIdB1, taskIdB3); - assertEquals(expectedInfo10TaskIds, info10.activeTasks); + assertEquals(expectedInfo10TaskIds, info10.activeTasks()); // the second consumer final AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData()); final List expectedInfo11TaskIds = Arrays.asList(taskIdA0, taskIdA2, taskIdB0, taskIdB2); - assertEquals(expectedInfo11TaskIds, info11.activeTasks); + assertEquals(expectedInfo11TaskIds, info11.activeTasks()); } @Test @@ -354,7 +354,7 @@ public class StreamsPartitionAssignorTest { // check assignment info Set allActiveTasks = new HashSet<>(); AssignmentInfo info10 = checkAssignment(Utils.mkSet("topic1"), assignments.get("consumer10")); - allActiveTasks.addAll(info10.activeTasks); + allActiveTasks.addAll(info10.activeTasks()); assertEquals(3, allActiveTasks.size()); assertEquals(allTasks, new HashSet<>(allActiveTasks)); @@ -394,7 +394,7 @@ public class StreamsPartitionAssignorTest { // check assignment info Set allActiveTasks = new HashSet<>(); AssignmentInfo info10 = checkAssignment(Collections.emptySet(), assignments.get("consumer10")); - allActiveTasks.addAll(info10.activeTasks); + allActiveTasks.addAll(info10.activeTasks()); assertEquals(0, allActiveTasks.size()); assertEquals(Collections.emptySet(), new HashSet<>(allActiveTasks)); @@ -407,7 +407,7 @@ public class StreamsPartitionAssignorTest { // the first consumer info10 = checkAssignment(allTopics, assignments.get("consumer10")); - allActiveTasks.addAll(info10.activeTasks); + allActiveTasks.addAll(info10.activeTasks()); assertEquals(3, allActiveTasks.size()); assertEquals(allTasks, new HashSet<>(allActiveTasks)); @@ -455,15 +455,15 @@ public class StreamsPartitionAssignorTest { AssignmentInfo info; info = AssignmentInfo.decode(assignments.get("consumer10").userData()); - allActiveTasks.addAll(info.activeTasks); + allActiveTasks.addAll(info.activeTasks()); allPartitions.addAll(assignments.get("consumer10").partitions()); info = AssignmentInfo.decode(assignments.get("consumer11").userData()); - allActiveTasks.addAll(info.activeTasks); + allActiveTasks.addAll(info.activeTasks()); allPartitions.addAll(assignments.get("consumer11").partitions()); info = AssignmentInfo.decode(assignments.get("consumer20").userData()); - allActiveTasks.addAll(info.activeTasks); + allActiveTasks.addAll(info.activeTasks()); allPartitions.addAll(assignments.get("consumer20").partitions()); assertEquals(allTasks, allActiveTasks); @@ -524,14 +524,14 @@ public class StreamsPartitionAssignorTest { AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData()); AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData()); - assertEquals(2, info10.activeTasks.size()); - assertEquals(2, info11.activeTasks.size()); - assertEquals(2, info20.activeTasks.size()); + assertEquals(2, info10.activeTasks().size()); + assertEquals(2, info11.activeTasks().size()); + assertEquals(2, info20.activeTasks().size()); Set allTasks = new HashSet<>(); - allTasks.addAll(info10.activeTasks); - allTasks.addAll(info11.activeTasks); - allTasks.addAll(info20.activeTasks); + allTasks.addAll(info10.activeTasks()); + allTasks.addAll(info11.activeTasks()); + allTasks.addAll(info20.activeTasks()); assertEquals(new HashSet<>(tasks), allTasks); // check tasks for state topics @@ -603,15 +603,15 @@ public class StreamsPartitionAssignorTest { // the first consumer AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10")); - allActiveTasks.addAll(info10.activeTasks); - allStandbyTasks.addAll(info10.standbyTasks.keySet()); + allActiveTasks.addAll(info10.activeTasks()); + allStandbyTasks.addAll(info10.standbyTasks().keySet()); // the second consumer AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11")); - allActiveTasks.addAll(info11.activeTasks); - allStandbyTasks.addAll(info11.standbyTasks.keySet()); + allActiveTasks.addAll(info11.activeTasks()); + allStandbyTasks.addAll(info11.standbyTasks().keySet()); - assertNotEquals("same processId has same set of standby tasks", info11.standbyTasks.keySet(), info10.standbyTasks.keySet()); + assertNotEquals("same processId has same set of standby tasks", info11.standbyTasks().keySet(), info10.standbyTasks().keySet()); // check active tasks assigned to the first client assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks)); @@ -619,8 +619,8 @@ public class StreamsPartitionAssignorTest { // the third consumer AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20")); - allActiveTasks.addAll(info20.activeTasks); - allStandbyTasks.addAll(info20.standbyTasks.keySet()); + allActiveTasks.addAll(info20.activeTasks()); + allStandbyTasks.addAll(info20.standbyTasks().keySet()); // all task ids are in the active tasks and also in the standby tasks @@ -847,7 +847,7 @@ public class StreamsPartitionAssignorTest { configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, (Object) userEndPoint)); final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("input")); final SubscriptionInfo subscriptionInfo = SubscriptionInfo.decode(subscription.userData()); - assertEquals("localhost:8080", subscriptionInfo.userEndPoint); + assertEquals("localhost:8080", subscriptionInfo.userEndPoint()); } @Test @@ -874,7 +874,7 @@ public class StreamsPartitionAssignorTest { final Map assignments = partitionAssignor.assign(metadata, subscriptions); final PartitionAssignor.Assignment consumerAssignment = assignments.get("consumer1"); final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumerAssignment.userData()); - final Set topicPartitions = assignmentInfo.partitionsByHost.get(new HostInfo("localhost", 8080)); + final Set topicPartitions = assignmentInfo.partitionsByHost().get(new HostInfo("localhost", 8080)); assertEquals(Utils.mkSet(new TopicPartition("topic1", 0), new TopicPartition("topic1", 1), new TopicPartition("topic1", 2)), topicPartitions); @@ -1072,8 +1072,8 @@ public class StreamsPartitionAssignorTest { final Map assign = partitionAssignor.assign(metadata, subscriptions); final PartitionAssignor.Assignment consumer1Assignment = assign.get("consumer1"); final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumer1Assignment.userData()); - final Set consumer1partitions = assignmentInfo.partitionsByHost.get(new HostInfo("localhost", 8080)); - final Set consumer2Partitions = assignmentInfo.partitionsByHost.get(new HostInfo("other", 9090)); + final Set consumer1partitions = assignmentInfo.partitionsByHost().get(new HostInfo("localhost", 8080)); + final Set consumer2Partitions = assignmentInfo.partitionsByHost().get(new HostInfo("other", 9090)); final HashSet allAssignedPartitions = new HashSet<>(consumer1partitions); allAssignedPartitions.addAll(consumer2Partitions); assertThat(consumer1partitions, not(allPartitions)); @@ -1095,6 +1095,37 @@ public class StreamsPartitionAssignorTest { partitionAssignor.configure(config); } + @Test + public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions() throws Exception { + final Map subscriptions = new HashMap<>(); + final Set emptyTasks = Collections.emptySet(); + subscriptions.put( + "consumer1", + new PartitionAssignor.Subscription( + Collections.singletonList("topic1"), + new SubscriptionInfo(1, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode() + ) + ); + subscriptions.put( + "consumer2", + new PartitionAssignor.Subscription( + Collections.singletonList("topic1"), + new SubscriptionInfo(2, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode() + ) + ); + + mockTaskManager(Collections.emptySet(), + Collections.emptySet(), + UUID.randomUUID(), + new InternalTopologyBuilder()); + partitionAssignor.configure(configProps()); + final Map assignment = partitionAssignor.assign(metadata, subscriptions); + + assertThat(assignment.size(), equalTo(2)); + assertThat(AssignmentInfo.decode(assignment.get("consumer1").userData()).version(), equalTo(1)); + assertThat(AssignmentInfo.decode(assignment.get("consumer2").userData()).version(), equalTo(1)); + } + private PartitionAssignor.Assignment createAssignment(final Map> firstHostState) { final AssignmentInfo info = new AssignmentInfo(Collections.emptyList(), Collections.>emptyMap(), @@ -1111,7 +1142,7 @@ public class StreamsPartitionAssignorTest { AssignmentInfo info = AssignmentInfo.decode(assignment.userData()); // check if the number of assigned partitions == the size of active task id list - assertEquals(assignment.partitions().size(), info.activeTasks.size()); + assertEquals(assignment.partitions().size(), info.activeTasks().size()); // check if active tasks are consistent List activeTasks = new ArrayList<>(); @@ -1121,14 +1152,14 @@ public class StreamsPartitionAssignorTest { activeTasks.add(new TaskId(0, partition.partition())); activeTopics.add(partition.topic()); } - assertEquals(activeTasks, info.activeTasks); + assertEquals(activeTasks, info.activeTasks()); // check if active partitions cover all topics assertEquals(expectedTopics, activeTopics); // check if standby tasks are consistent Set standbyTopics = new HashSet<>(); - for (Map.Entry> entry : info.standbyTasks.entrySet()) { + for (Map.Entry> entry : info.standbyTasks().entrySet()) { TaskId id = entry.getKey(); Set partitions = entry.getValue(); for (TopicPartition partition : partitions) { @@ -1139,7 +1170,7 @@ public class StreamsPartitionAssignorTest { } } - if (info.standbyTasks.size() > 0) { + if (info.standbyTasks().size() > 0) { // check if standby partitions cover all topics assertEquals(expectedTopics, standbyTopics); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java index ec94ad81acd..726a5623cd5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.Set; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; public class AssignmentInfoTest { @@ -61,10 +62,10 @@ public class AssignmentInfoTest { standbyTasks.put(new TaskId(2, 0), Utils.mkSet(new TopicPartition("t3", 0), new TopicPartition("t3", 0))); final AssignmentInfo oldVersion = new AssignmentInfo(1, activeTasks, standbyTasks, null); final AssignmentInfo decoded = AssignmentInfo.decode(encodeV1(oldVersion)); - assertEquals(oldVersion.activeTasks, decoded.activeTasks); - assertEquals(oldVersion.standbyTasks, decoded.standbyTasks); - assertEquals(0, decoded.partitionsByHost.size()); // should be empty as wasn't in V1 - assertEquals(2, decoded.version); // automatically upgraded to v2 on decode; + assertEquals(oldVersion.activeTasks(), decoded.activeTasks()); + assertEquals(oldVersion.standbyTasks(), decoded.standbyTasks()); + assertNull(decoded.partitionsByHost()); // should be null as wasn't in V1 + assertEquals(1, decoded.version()); } @@ -76,15 +77,15 @@ public class AssignmentInfoTest { ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream out = new DataOutputStream(baos); // Encode version - out.writeInt(oldVersion.version); + out.writeInt(oldVersion.version()); // Encode active tasks - out.writeInt(oldVersion.activeTasks.size()); - for (TaskId id : oldVersion.activeTasks) { + out.writeInt(oldVersion.activeTasks().size()); + for (TaskId id : oldVersion.activeTasks()) { id.writeTo(out); } // Encode standby tasks - out.writeInt(oldVersion.standbyTasks.size()); - for (Map.Entry> entry : oldVersion.standbyTasks.entrySet()) { + out.writeInt(oldVersion.standbyTasks().size()); + for (Map.Entry> entry : oldVersion.standbyTasks().entrySet()) { TaskId id = entry.getKey(); id.writeTo(out); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java index 9c011bb0cae..633285a2b4d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java @@ -65,14 +65,12 @@ public class SubscriptionInfoTest { final ByteBuffer v1Encoding = encodePreviousVersion(processId, activeTasks, standbyTasks); final SubscriptionInfo decode = SubscriptionInfo.decode(v1Encoding); - assertEquals(activeTasks, decode.prevTasks); - assertEquals(standbyTasks, decode.standbyTasks); - assertEquals(processId, decode.processId); - assertNull(decode.userEndPoint); - + assertEquals(activeTasks, decode.prevTasks()); + assertEquals(standbyTasks, decode.standbyTasks()); + assertEquals(processId, decode.processId()); + assertNull(decode.userEndPoint()); } - /** * This is a clone of what the V1 encoding did. The encode method has changed for V2 * so it is impossible to test compatibility without having this