MINOR: Use the automated protocol for the Consumer Protocol's subscriptions and assignments (#8897)

This PR moves the consumer protocol to the automated protocol instead of plain old structs.

Reviewers: Jason Gustafson <jason@confluent.io>
Author: David Jacot, 2020-09-25 18:21:22 +02:00 (committed by GitHub)
Parent: ac8acec653, commit: 466f8fd21c
5 changed files with 225 additions and 290 deletions
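A quick orientation before the diff: the public entry points on ConsumerProtocol keep their shape, and only the serialization machinery underneath changes. A minimal round-trip sketch against the post-commit API (the harness class is illustrative; the ConsumerProtocol calls are the ones introduced below):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;

public final class RoundTripSketch {
    public static void main(String[] args) {
        Subscription subscription = new Subscription(Arrays.asList("foo", "bar"));
        // Serialization now routes through the generated ConsumerProtocolSubscription message...
        ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
        // ...and deserialization reads the 2-byte version header, then the message body.
        Subscription parsed = ConsumerProtocol.deserializeSubscription(buffer);
        System.out.println(parsed.topics()); // [foo, bar]
    }
}
```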

ConsumerProtocol.java (org.apache.kafka.clients.consumer.internals)

@@ -16,16 +16,16 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.message.ConsumerProtocolAssignment;
+import org.apache.kafka.common.message.ConsumerProtocolSubscription;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Message;
+import org.apache.kafka.common.protocol.ObjectSerializationCache;
 import org.apache.kafka.common.protocol.types.SchemaException;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.protocol.types.Type;
 import org.apache.kafka.common.utils.CollectionUtils;
 
+import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
-import java.util.Collections;
@@ -35,287 +35,154 @@ import java.util.Map;
 /**
  * ConsumerProtocol contains the schemas for consumer subscriptions and assignments for use with
- * Kafka's generalized group management protocol. Below is the version 1 format:
- *
- * <pre>
- * Subscription => Version Topics
- *   Version    => Int16
- *   Topics     => [String]
- *   UserData   => Bytes
- *   OwnedPartitions => [Topic Partitions]
- *     Topic         => String
- *     Partitions    => [int32]
- *
- * Assignment => Version TopicPartitions
- *   Version            => int16
- *   AssignedPartitions => [Topic Partitions]
- *     Topic            => String
- *     Partitions       => [int32]
- *   UserData           => Bytes
- * </pre>
- *
- * Version 0 format:
- *
- * <pre>
- * Subscription => Version Topics
- *   Version    => Int16
- *   Topics     => [String]
- *   UserData   => Bytes
- *
- * Assignment => Version TopicPartitions
- *   Version            => int16
- *   AssignedPartitions => [Topic Partitions]
- *     Topic            => String
- *     Partitions       => [int32]
- *   UserData           => Bytes
- * </pre>
- *
+ * Kafka's generalized group management protocol.
+ *
  * The current implementation assumes that future versions will not break compatibility. When
  * it encounters a newer version, it parses it using the current format. This basically means
  * that new versions cannot remove or reorder any of the existing fields.
  */
 public class ConsumerProtocol {
 
     public static final String PROTOCOL_TYPE = "consumer";
 
-    public static final String VERSION_KEY_NAME = "version";
-    public static final String TOPICS_KEY_NAME = "topics";
-    public static final String TOPIC_KEY_NAME = "topic";
-    public static final String PARTITIONS_KEY_NAME = "partitions";
-    public static final String OWNED_PARTITIONS_KEY_NAME = "owned_partitions";
-    public static final String TOPIC_PARTITIONS_KEY_NAME = "topic_partitions";
-    public static final String USER_DATA_KEY_NAME = "user_data";
-
-    public static final short CONSUMER_PROTOCOL_V0 = 0;
-    public static final short CONSUMER_PROTOCOL_V1 = 1;
-    public static final short CONSUMER_PROTOCOL_LATEST_VERSION = CONSUMER_PROTOCOL_V1;
-
-    public static final Schema CONSUMER_PROTOCOL_HEADER_SCHEMA = new Schema(
-        new Field(VERSION_KEY_NAME, Type.INT16));
-    private static final Struct CONSUMER_PROTOCOL_HEADER_V0 = new Struct(CONSUMER_PROTOCOL_HEADER_SCHEMA)
-        .set(VERSION_KEY_NAME, CONSUMER_PROTOCOL_V0);
-    private static final Struct CONSUMER_PROTOCOL_HEADER_V1 = new Struct(CONSUMER_PROTOCOL_HEADER_SCHEMA)
-        .set(VERSION_KEY_NAME, CONSUMER_PROTOCOL_V1);
-
-    public static final Schema TOPIC_ASSIGNMENT_V0 = new Schema(
-        new Field(TOPIC_KEY_NAME, Type.STRING),
-        new Field(PARTITIONS_KEY_NAME, new ArrayOf(Type.INT32)));
-
-    public static final Schema SUBSCRIPTION_V0 = new Schema(
-        new Field(TOPICS_KEY_NAME, new ArrayOf(Type.STRING)),
-        new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES));
-
-    public static final Schema SUBSCRIPTION_V1 = new Schema(
-        new Field(TOPICS_KEY_NAME, new ArrayOf(Type.STRING)),
-        new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES),
-        new Field(OWNED_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT_V0)));
-
-    public static final Schema ASSIGNMENT_V0 = new Schema(
-        new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT_V0)),
-        new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES));
-
-    public static final Schema ASSIGNMENT_V1 = new Schema(
-        new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT_V0)),
-        new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES));
-
-    public static Short deserializeVersion(ByteBuffer buffer) {
-        Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
-        return header.getShort(VERSION_KEY_NAME);
-    }
+    static {
+        // Safety check to ensure that both parts of the consumer protocol remain in sync.
+        if (ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION
+                != ConsumerProtocolAssignment.LOWEST_SUPPORTED_VERSION)
+            throw new IllegalStateException("Subscription and Assignment schemas must have the " +
+                "same lowest version");
+
+        if (ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION
+                != ConsumerProtocolAssignment.HIGHEST_SUPPORTED_VERSION)
+            throw new IllegalStateException("Subscription and Assignment schemas must have the " +
+                "same highest version");
+    }
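The static guard above enforces what the class Javadoc promises: the subscription and assignment halves of the protocol evolve in lockstep. The companion promise, that an unknown newer version is parsed with the newest known format, looks like this from a caller's perspective (a hedged sketch; the helper name is illustrative):

```java
import java.nio.ByteBuffer;

import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;

final class ForwardCompatSketch {
    // Parse a subscription whose writer may be newer than this reader.
    static Subscription parseLeniently(ByteBuffer buffer) {
        short wireVersion = ConsumerProtocol.deserializeVersion(buffer); // consumes the 2-byte header
        // deserializeSubscription clamps an unknown high version down to
        // ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION and reads only the known fields.
        return ConsumerProtocol.deserializeSubscription(buffer, wireVersion);
    }
}
```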
-    public static ByteBuffer serializeSubscriptionV0(Subscription subscription) {
-        Struct struct = new Struct(SUBSCRIPTION_V0);
-        struct.set(USER_DATA_KEY_NAME, subscription.userData());
-        struct.set(TOPICS_KEY_NAME, subscription.topics().toArray());
-
-        ByteBuffer buffer = ByteBuffer.allocate(CONSUMER_PROTOCOL_HEADER_V0.sizeOf() + SUBSCRIPTION_V0.sizeOf(struct));
-        CONSUMER_PROTOCOL_HEADER_V0.writeTo(buffer);
-        SUBSCRIPTION_V0.write(buffer, struct);
-        buffer.flip();
-        return buffer;
-    }
+    public static short deserializeVersion(final ByteBuffer buffer) {
+        try {
+            return buffer.getShort();
+        } catch (BufferUnderflowException e) {
+            throw new SchemaException("Buffer underflow while parsing consumer protocol's header", e);
+        }
+    }
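The rewritten deserializeVersion makes the header format explicit: it is just the first two bytes of the payload, read big-endian as ByteBuffer does by default. A tiny standalone illustration:

```java
import java.nio.ByteBuffer;

final class VersionHeaderSketch {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(2);
        buffer.putShort((short) 1); // a version-1 header
        buffer.flip();
        // Equivalent to ConsumerProtocol.deserializeVersion(buffer): a plain two-byte read.
        System.out.println(buffer.getShort()); // 1
    }
}
```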
-    public static ByteBuffer serializeSubscriptionV1(Subscription subscription) {
-        Struct struct = new Struct(SUBSCRIPTION_V1);
-        struct.set(USER_DATA_KEY_NAME, subscription.userData());
-        struct.set(TOPICS_KEY_NAME, subscription.topics().toArray());
-
-        List<Struct> topicAssignments = new ArrayList<>();
-        Map<String, List<Integer>> partitionsByTopic = CollectionUtils.groupPartitionsByTopic(subscription.ownedPartitions());
-        for (Map.Entry<String, List<Integer>> topicEntry : partitionsByTopic.entrySet()) {
-            Struct topicAssignment = new Struct(TOPIC_ASSIGNMENT_V0);
-            topicAssignment.set(TOPIC_KEY_NAME, topicEntry.getKey());
-            topicAssignment.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray());
-            topicAssignments.add(topicAssignment);
-        }
-        struct.set(OWNED_PARTITIONS_KEY_NAME, topicAssignments.toArray());
-
-        ByteBuffer buffer = ByteBuffer.allocate(CONSUMER_PROTOCOL_HEADER_V1.sizeOf() + SUBSCRIPTION_V1.sizeOf(struct));
-        CONSUMER_PROTOCOL_HEADER_V1.writeTo(buffer);
-        SUBSCRIPTION_V1.write(buffer, struct);
-        buffer.flip();
-        return buffer;
-    }
-
-    public static ByteBuffer serializeSubscription(Subscription subscription) {
-        return serializeSubscription(subscription, CONSUMER_PROTOCOL_LATEST_VERSION);
-    }
+    public static ByteBuffer serializeSubscription(final Subscription subscription) {
+        return serializeSubscription(subscription, ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION);
+    }
+
+    public static ByteBuffer serializeSubscription(final Subscription subscription, short version) {
+        version = checkSubscriptionVersion(version);
+
+        ConsumerProtocolSubscription data = new ConsumerProtocolSubscription();
+        data.setTopics(subscription.topics());
+        data.setUserData(subscription.userData() != null ? subscription.userData().duplicate() : null);
+
+        Map<String, List<Integer>> partitionsByTopic = CollectionUtils.groupPartitionsByTopic(subscription.ownedPartitions());
+        for (Map.Entry<String, List<Integer>> topicEntry : partitionsByTopic.entrySet()) {
+            data.ownedPartitions().add(new ConsumerProtocolSubscription.TopicPartition()
+                .setTopic(topicEntry.getKey())
+                .setPartitions(topicEntry.getValue()));
+        }
+
+        return serializeMessage(version, data);
+    }
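One behavioral detail of the new serializeSubscription worth spelling out: owned partitions arrive as a flat list, and CollectionUtils.groupPartitionsByTopic collapses them into one entry per topic before they are written. A runnable sketch (topic names are illustrative):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.TopicPartition;

final class OwnedPartitionsSketch {
    public static void main(String[] args) {
        // Owned partitions arrive as a flat list; the serializer groups them per topic,
        // so ("foo",0) and ("foo",1) become one OwnedPartitions entry {topic=foo, partitions=[0,1]}.
        Subscription subscription = new Subscription(
            Arrays.asList("foo", "bar"),
            null,
            Arrays.asList(new TopicPartition("foo", 0),
                          new TopicPartition("foo", 1),
                          new TopicPartition("bar", 0)));
        ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
        System.out.println(ConsumerProtocol.deserializeSubscription(buffer).ownedPartitions());
    }
}
```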
-    public static ByteBuffer serializeSubscription(Subscription subscription, short version) {
-        switch (version) {
-            case CONSUMER_PROTOCOL_V0:
-                return serializeSubscriptionV0(subscription);
-            case CONSUMER_PROTOCOL_V1:
-                return serializeSubscriptionV1(subscription);
-            default:
-                // for any versions higher than known, try to serialize it as V1
-                return serializeSubscriptionV1(subscription);
-        }
-    }
-
-    public static Subscription deserializeSubscriptionV0(ByteBuffer buffer) {
-        Struct struct = SUBSCRIPTION_V0.read(buffer);
-        ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
-
-        List<String> topics = new ArrayList<>();
-        for (Object topicObj : struct.getArray(TOPICS_KEY_NAME))
-            topics.add((String) topicObj);
-
-        return new Subscription(topics, userData, Collections.emptyList());
-    }
-
-    public static Subscription deserializeSubscriptionV1(ByteBuffer buffer) {
-        Struct struct = SUBSCRIPTION_V1.read(buffer);
-        ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
-
-        List<String> topics = new ArrayList<>();
-        for (Object topicObj : struct.getArray(TOPICS_KEY_NAME))
-            topics.add((String) topicObj);
-
-        List<TopicPartition> ownedPartitions = new ArrayList<>();
-        for (Object structObj : struct.getArray(OWNED_PARTITIONS_KEY_NAME)) {
-            Struct assignment = (Struct) structObj;
-            String topic = assignment.getString(TOPIC_KEY_NAME);
-            for (Object partitionObj : assignment.getArray(PARTITIONS_KEY_NAME)) {
-                ownedPartitions.add(new TopicPartition(topic, (Integer) partitionObj));
-            }
-        }
-
-        return new Subscription(topics, userData, ownedPartitions);
-    }
+    public static Subscription deserializeSubscription(final ByteBuffer buffer, short version) {
+        version = checkSubscriptionVersion(version);
+
+        try {
+            ConsumerProtocolSubscription data =
+                new ConsumerProtocolSubscription(new ByteBufferAccessor(buffer), version);
+
+            List<TopicPartition> ownedPartitions = new ArrayList<>();
+            for (ConsumerProtocolSubscription.TopicPartition tp : data.ownedPartitions()) {
+                for (Integer partition : tp.partitions()) {
+                    ownedPartitions.add(new TopicPartition(tp.topic(), partition));
+                }
+            }
+
+            return new Subscription(
+                data.topics(),
+                data.userData() != null ? data.userData().duplicate() : null,
+                ownedPartitions);
+        } catch (BufferUnderflowException e) {
+            throw new SchemaException("Buffer underflow while parsing consumer protocol's subscription", e);
+        }
+    }
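Both directions pass user data through ByteBuffer.duplicate(), which shares the underlying bytes but not the read cursor, so consuming the parsed copy cannot disturb the caller's buffer position. A minimal JDK-only illustration:

```java
import java.nio.ByteBuffer;

final class DuplicateSketch {
    public static void main(String[] args) {
        ByteBuffer userData = ByteBuffer.wrap(new byte[]{1, 2, 3});
        ByteBuffer copy = userData.duplicate(); // shared contents, independent position/limit
        copy.get(); // advances only the duplicate's position
        System.out.println(userData.position()); // 0 - the original is untouched
    }
}
```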
-    public static Subscription deserializeSubscription(ByteBuffer buffer) {
-        Short version = deserializeVersion(buffer);
-
-        if (version < CONSUMER_PROTOCOL_V0)
-            throw new SchemaException("Unsupported subscription version: " + version);
-
-        switch (version) {
-            case CONSUMER_PROTOCOL_V0:
-                return deserializeSubscriptionV0(buffer);
-            case CONSUMER_PROTOCOL_V1:
-                return deserializeSubscriptionV1(buffer);
-            // assume all higher versions can be parsed as V1
-            default:
-                return deserializeSubscriptionV1(buffer);
-        }
-    }
+    public static Subscription deserializeSubscription(final ByteBuffer buffer) {
+        return deserializeSubscription(buffer, deserializeVersion(buffer));
+    }
+    public static ByteBuffer serializeAssignment(final Assignment assignment) {
+        return serializeAssignment(assignment, ConsumerProtocolAssignment.HIGHEST_SUPPORTED_VERSION);
+    }
+
+    public static ByteBuffer serializeAssignment(final Assignment assignment, short version) {
+        version = checkAssignmentVersion(version);
+
+        ConsumerProtocolAssignment data = new ConsumerProtocolAssignment();
+        data.setUserData(assignment.userData() != null ? assignment.userData().duplicate() : null);
+
+        Map<String, List<Integer>> partitionsByTopic = CollectionUtils.groupPartitionsByTopic(assignment.partitions());
+        for (Map.Entry<String, List<Integer>> topicEntry : partitionsByTopic.entrySet()) {
+            data.assignedPartitions().add(new ConsumerProtocolAssignment.TopicPartition()
+                .setTopic(topicEntry.getKey())
+                .setPartitions(topicEntry.getValue()));
+        }
+
+        return serializeMessage(version, data);
+    }
+
+    public static Assignment deserializeAssignment(final ByteBuffer buffer, short version) {
+        version = checkAssignmentVersion(version);
+
+        try {
+            ConsumerProtocolAssignment data =
+                new ConsumerProtocolAssignment(new ByteBufferAccessor(buffer), version);
+
+            List<TopicPartition> assignedPartitions = new ArrayList<>();
+            for (ConsumerProtocolAssignment.TopicPartition tp : data.assignedPartitions()) {
+                for (Integer partition : tp.partitions()) {
+                    assignedPartitions.add(new TopicPartition(tp.topic(), partition));
+                }
+            }
+
+            return new Assignment(
+                assignedPartitions,
+                data.userData() != null ? data.userData().duplicate() : null);
+        } catch (BufferUnderflowException e) {
+            throw new SchemaException("Buffer underflow while parsing consumer protocol's assignment", e);
+        }
+    }
+
+    public static Assignment deserializeAssignment(final ByteBuffer buffer) {
+        return deserializeAssignment(buffer, deserializeVersion(buffer));
+    }
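The assignment path mirrors the subscription path method for method. A round-trip sketch against the methods added above:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.TopicPartition;

final class AssignmentRoundTripSketch {
    public static void main(String[] args) {
        Assignment assignment = new Assignment(
            Arrays.asList(new TopicPartition("foo", 0), new TopicPartition("foo", 1)));
        ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignment);
        Assignment parsed = ConsumerProtocol.deserializeAssignment(buffer);
        System.out.println(parsed.partitions()); // [foo-0, foo-1]
    }
}
```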
+    private static short checkSubscriptionVersion(final short version) {
+        if (version < ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION)
+            throw new SchemaException("Unsupported subscription version: " + version);
+        else if (version > ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION)
+            return ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION;
+        else
+            return version;
+    }
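checkSubscriptionVersion encodes the asymmetry in the compatibility policy: a version below the supported floor is an error, while a version above the ceiling is silently clamped rather than rejected. A small sketch of the error branch (the empty buffer is never read, because the check throws first):

```java
import java.nio.ByteBuffer;

import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.protocol.types.SchemaException;

final class VersionClampSketch {
    public static void main(String[] args) {
        // A negative version can never be valid: the check throws SchemaException.
        try {
            ConsumerProtocol.deserializeSubscription(ByteBuffer.allocate(0), (short) -1);
        } catch (SchemaException e) {
            System.out.println(e.getMessage()); // Unsupported subscription version: -1
        }
        // A version above HIGHEST_SUPPORTED_VERSION is clamped, not rejected, which is
        // what makes "parse newer versions with the current format" work in practice.
    }
}
```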
-    public static ByteBuffer serializeAssignmentV0(Assignment assignment) {
-        Struct struct = new Struct(ASSIGNMENT_V0);
-        struct.set(USER_DATA_KEY_NAME, assignment.userData());
-
-        List<Struct> topicAssignments = new ArrayList<>();
-        Map<String, List<Integer>> partitionsByTopic = CollectionUtils.groupPartitionsByTopic(assignment.partitions());
-        for (Map.Entry<String, List<Integer>> topicEntry : partitionsByTopic.entrySet()) {
-            Struct topicAssignment = new Struct(TOPIC_ASSIGNMENT_V0);
-            topicAssignment.set(TOPIC_KEY_NAME, topicEntry.getKey());
-            topicAssignment.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray());
-            topicAssignments.add(topicAssignment);
-        }
-        struct.set(TOPIC_PARTITIONS_KEY_NAME, topicAssignments.toArray());
-
-        ByteBuffer buffer = ByteBuffer.allocate(CONSUMER_PROTOCOL_HEADER_V0.sizeOf() + ASSIGNMENT_V0.sizeOf(struct));
-        CONSUMER_PROTOCOL_HEADER_V0.writeTo(buffer);
-        ASSIGNMENT_V0.write(buffer, struct);
-        buffer.flip();
-        return buffer;
-    }
-
-    public static ByteBuffer serializeAssignmentV1(Assignment assignment) {
-        Struct struct = new Struct(ASSIGNMENT_V1);
-        struct.set(USER_DATA_KEY_NAME, assignment.userData());
-
-        List<Struct> topicAssignments = new ArrayList<>();
-        Map<String, List<Integer>> partitionsByTopic = CollectionUtils.groupPartitionsByTopic(assignment.partitions());
-        for (Map.Entry<String, List<Integer>> topicEntry : partitionsByTopic.entrySet()) {
-            Struct topicAssignment = new Struct(TOPIC_ASSIGNMENT_V0);
-            topicAssignment.set(TOPIC_KEY_NAME, topicEntry.getKey());
-            topicAssignment.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray());
-            topicAssignments.add(topicAssignment);
-        }
-        struct.set(TOPIC_PARTITIONS_KEY_NAME, topicAssignments.toArray());
-
-        ByteBuffer buffer = ByteBuffer.allocate(CONSUMER_PROTOCOL_HEADER_V1.sizeOf() + ASSIGNMENT_V1.sizeOf(struct));
-        CONSUMER_PROTOCOL_HEADER_V1.writeTo(buffer);
-        ASSIGNMENT_V1.write(buffer, struct);
-        buffer.flip();
-        return buffer;
-    }
-
-    public static ByteBuffer serializeAssignment(Assignment assignment) {
-        return serializeAssignment(assignment, CONSUMER_PROTOCOL_LATEST_VERSION);
-    }
-
-    public static ByteBuffer serializeAssignment(Assignment assignment, short version) {
-        switch (version) {
-            case CONSUMER_PROTOCOL_V0:
-                return serializeAssignmentV0(assignment);
-            case CONSUMER_PROTOCOL_V1:
-                return serializeAssignmentV1(assignment);
-            default:
-                // for any versions higher than known, try to serialize it as V1
-                return serializeAssignmentV1(assignment);
-        }
-    }
-
-    public static Assignment deserializeAssignmentV0(ByteBuffer buffer) {
-        Struct struct = ASSIGNMENT_V0.read(buffer);
-        ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
-
-        List<TopicPartition> partitions = new ArrayList<>();
-        for (Object structObj : struct.getArray(TOPIC_PARTITIONS_KEY_NAME)) {
-            Struct assignment = (Struct) structObj;
-            String topic = assignment.getString(TOPIC_KEY_NAME);
-            for (Object partitionObj : assignment.getArray(PARTITIONS_KEY_NAME)) {
-                partitions.add(new TopicPartition(topic, (Integer) partitionObj));
-            }
-        }
-
-        return new Assignment(partitions, userData);
-    }
-
-    public static Assignment deserializeAssignmentV1(ByteBuffer buffer) {
-        return deserializeAssignmentV0(buffer);
-    }
-    public static Assignment deserializeAssignment(ByteBuffer buffer) {
-        Short version = deserializeVersion(buffer);
-
-        if (version < CONSUMER_PROTOCOL_V0)
-            throw new SchemaException("Unsupported assignment version: " + version);
-
-        switch (version) {
-            case CONSUMER_PROTOCOL_V0:
-                return deserializeAssignmentV0(buffer);
-            case CONSUMER_PROTOCOL_V1:
-                return deserializeAssignmentV1(buffer);
-            default:
-                // assume all higher versions can be parsed as V1
-                return deserializeAssignmentV1(buffer);
-        }
-    }
+    private static short checkAssignmentVersion(final short version) {
+        if (version < ConsumerProtocolAssignment.LOWEST_SUPPORTED_VERSION)
+            throw new SchemaException("Unsupported assignment version: " + version);
+        else if (version > ConsumerProtocolAssignment.HIGHEST_SUPPORTED_VERSION)
+            return ConsumerProtocolAssignment.HIGHEST_SUPPORTED_VERSION;
+        else
+            return version;
+    }
+    private static ByteBuffer serializeMessage(final short version, final Message message) {
+        ObjectSerializationCache cache = new ObjectSerializationCache();
+        int size = message.size(cache, version);
+        ByteBuffer bytes = ByteBuffer.allocate(2 + size);
+        ByteBufferAccessor accessor = new ByteBufferAccessor(bytes);
+        accessor.writeShort(version);
+        message.write(accessor, cache, version);
+        bytes.flip();
+        return bytes;
+    }
 }
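serializeMessage pins down the wire layout shared by both message types: a 2-byte version header followed by the generated message body serialized at that version. A sketch that peels the two parts apart by hand:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;

final class WireLayoutSketch {
    public static void main(String[] args) {
        ByteBuffer buffer = ConsumerProtocol.serializeSubscription(
            new Subscription(Arrays.asList("topic")), (short) 1);
        // Layout produced by serializeMessage: 2-byte version header, then the message body.
        short version = buffer.getShort();
        System.out.println(version + ", " + buffer.remaining() + " body bytes");
    }
}
```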

ConsumerProtocolAssignment.json (new file)

@@ -0,0 +1,35 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"type": "data",
"name": "ConsumerProtocolAssignment",
// Assignment part of the Consumer Protocol.
//
// The current implementation assumes that future versions will not break compatibility. When
// it encounters a newer version, it parses it using the current format. This basically means
// that new versions cannot remove or reorder any of the existing fields.
"validVersions": "0-1",
"fields": [
{ "name": "AssignedPartitions", "type": "[]TopicPartition", "versions": "0+",
"fields": [
{ "name": "Topic", "type": "string", "versions": "0+" },
{ "name": "Partitions", "type": "[]int32", "versions": "0+" }
]
},
{ "name": "UserData", "type": "bytes", "versions": "0+", "nullableVersions": "0+",
"default": "null", "zeroCopy": true }
]
}
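For readers new to Kafka's message generator: this JSON definition is compiled into the ConsumerProtocolAssignment class used in the Java diff above, with each field becoming a fluent accessor. A hedged sketch of driving the generated type directly (assuming the generated collections start out empty, as the serializer above relies on):

```java
import java.util.Arrays;

import org.apache.kafka.common.message.ConsumerProtocolAssignment;

final class GeneratedMessageSketch {
    public static void main(String[] args) {
        // Field names in the JSON ("AssignedPartitions", "Topic", "Partitions")
        // become accessors on the generated class.
        ConsumerProtocolAssignment data = new ConsumerProtocolAssignment();
        data.assignedPartitions().add(new ConsumerProtocolAssignment.TopicPartition()
            .setTopic("foo")
            .setPartitions(Arrays.asList(0, 1)));
        System.out.println(data.assignedPartitions().size()); // 1
    }
}
```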

ConsumerProtocolSubscription.json (new file)

@@ -0,0 +1,36 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"type": "data",
"name": "ConsumerProtocolSubscription",
// Subscription part of the Consumer Protocol.
//
// The current implementation assumes that future versions will not break compatibility. When
// it encounters a newer version, it parses it using the current format. This basically means
// that new versions cannot remove or reorder any of the existing fields.
"validVersions": "0-1",
"fields": [
{ "name": "Topics", "type": "[]string", "versions": "0+" },
{ "name": "UserData", "type": "bytes", "versions": "0+", "nullableVersions": "0+",
"default": "null", "zeroCopy": true },
{ "name": "OwnedPartitions", "type": "[]TopicPartition", "versions": "1+",
"fields": [
{ "name": "Topic", "type": "string", "versions": "1+" },
{ "name": "Partitions", "type": "[]int32", "versions": "1+"}
]
}
]
}
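The "versions": "1+" tag on OwnedPartitions is what replaces the old hand-written V0/V1 split: at version 0 the field is simply never read or written. The test diff below exercises exactly this; a condensed, runnable form:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.TopicPartition;

final class VersionGatingSketch {
    public static void main(String[] args) {
        Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), null,
            Collections.singletonList(new TopicPartition("foo", 0)));
        ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription); // latest version
        ConsumerProtocol.deserializeVersion(buffer); // consume the header...
        // ...then force a version-0 read: OwnedPartitions is a 1+ field, so it is skipped.
        Subscription parsed = ConsumerProtocol.deserializeSubscription(buffer, (short) 0);
        System.out.println(parsed.ownedPartitions()); // []
    }
}
```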

ConsumerProtocolTest.java (org.apache.kafka.clients.consumer.internals)

@@ -19,6 +19,8 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.ConsumerProtocolAssignment;
+import org.apache.kafka.common.message.ConsumerProtocolSubscription;
 import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
@@ -32,13 +34,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 
-import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.CONSUMER_PROTOCOL_HEADER_SCHEMA;
-import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.OWNED_PARTITIONS_KEY_NAME;
-import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.TOPICS_KEY_NAME;
-import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.TOPIC_ASSIGNMENT_V0;
-import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.TOPIC_PARTITIONS_KEY_NAME;
-import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.USER_DATA_KEY_NAME;
-import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.VERSION_KEY_NAME;
 import static org.apache.kafka.test.TestUtils.toSet;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -85,7 +80,7 @@ public class ConsumerProtocolTest {
     @Test
     public void deserializeOldSubscriptionVersion() {
         Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), null);
-        ByteBuffer buffer = ConsumerProtocol.serializeSubscriptionV0(subscription);
+        ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription, (short) 0);
         Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
         assertEquals(subscription.topics(), parsedSubscription.topics());
         assertNull(parsedSubscription.userData());
@@ -97,9 +92,8 @@
         Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), null, Collections.singletonList(tp2));
         ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
 
         // ignore the version assuming it is the old byte code, as it will blindly deserialize as V0
-        Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
-        header.getShort(VERSION_KEY_NAME);
-        Subscription parsedSubscription = ConsumerProtocol.deserializeSubscriptionV0(buffer);
+        ConsumerProtocol.deserializeVersion(buffer);
+        Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer, (short) 0);
 
         assertEquals(subscription.topics(), parsedSubscription.topics());
         assertNull(parsedSubscription.userData());
         assertTrue(parsedSubscription.ownedPartitions().isEmpty());
@@ -112,21 +106,23 @@
         short version = 100;
 
         Schema subscriptionSchemaV100 = new Schema(
-            new Field(TOPICS_KEY_NAME, new ArrayOf(Type.STRING)),
-            new Field(USER_DATA_KEY_NAME, Type.BYTES),
-            new Field(OWNED_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT_V0)),
-            new Field("foo", Type.STRING));
+            new Field("topics", new ArrayOf(Type.STRING)),
+            new Field("user_data", Type.NULLABLE_BYTES),
+            new Field("owned_partitions", new ArrayOf(
+                ConsumerProtocolSubscription.TopicPartition.SCHEMA_1)),
+            new Field("foo", Type.STRING));
 
         Struct subscriptionV100 = new Struct(subscriptionSchemaV100);
-        subscriptionV100.set(TOPICS_KEY_NAME, new Object[]{"topic"});
-        subscriptionV100.set(USER_DATA_KEY_NAME, ByteBuffer.wrap(new byte[0]));
-        subscriptionV100.set(OWNED_PARTITIONS_KEY_NAME, new Object[]{new Struct(TOPIC_ASSIGNMENT_V0)
-            .set(ConsumerProtocol.TOPIC_KEY_NAME, tp2.topic())
-            .set(ConsumerProtocol.PARTITIONS_KEY_NAME, new Object[]{tp2.partition()})});
+        subscriptionV100.set("topics", new Object[]{"topic"});
+        subscriptionV100.set("user_data", ByteBuffer.wrap(new byte[0]));
+        subscriptionV100.set("owned_partitions", new Object[]{new Struct(
+            ConsumerProtocolSubscription.TopicPartition.SCHEMA_1)
+            .set("topic", tp2.topic())
+            .set("partitions", new Object[]{tp2.partition()})});
         subscriptionV100.set("foo", "bar");
 
-        Struct headerV100 = new Struct(CONSUMER_PROTOCOL_HEADER_SCHEMA);
-        headerV100.set(VERSION_KEY_NAME, version);
+        Struct headerV100 = new Struct(new Schema(new Field("version", Type.INT16)));
+        headerV100.set("version", version);
 
         ByteBuffer buffer = ByteBuffer.allocate(subscriptionV100.sizeOf() + headerV100.sizeOf());
         headerV100.writeTo(buffer);
@@ -165,20 +161,21 @@
         short version = 100;
 
         Schema assignmentSchemaV100 = new Schema(
-            new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT_V0)),
-            new Field(USER_DATA_KEY_NAME, Type.BYTES),
-            new Field("foo", Type.STRING));
+            new Field("assigned_partitions", new ArrayOf(
+                ConsumerProtocolAssignment.TopicPartition.SCHEMA_0)),
+            new Field("user_data", Type.BYTES),
+            new Field("foo", Type.STRING));
 
         Struct assignmentV100 = new Struct(assignmentSchemaV100);
-        assignmentV100.set(TOPIC_PARTITIONS_KEY_NAME,
-            new Object[]{new Struct(TOPIC_ASSIGNMENT_V0)
-                .set(ConsumerProtocol.TOPIC_KEY_NAME, tp1.topic())
-                .set(ConsumerProtocol.PARTITIONS_KEY_NAME, new Object[]{tp1.partition()})});
-        assignmentV100.set(USER_DATA_KEY_NAME, ByteBuffer.wrap(new byte[0]));
+        assignmentV100.set("assigned_partitions",
+            new Object[]{new Struct(ConsumerProtocolAssignment.TopicPartition.SCHEMA_0)
+                .set("topic", tp1.topic())
+                .set("partitions", new Object[]{tp1.partition()})});
+        assignmentV100.set("user_data", ByteBuffer.wrap(new byte[0]));
         assignmentV100.set("foo", "bar");
 
-        Struct headerV100 = new Struct(CONSUMER_PROTOCOL_HEADER_SCHEMA);
-        headerV100.set(VERSION_KEY_NAME, version);
+        Struct headerV100 = new Struct(new Schema(new Field("version", Type.INT16)));
+        headerV100.set("version", version);
 
         ByteBuffer buffer = ByteBuffer.allocate(assignmentV100.sizeOf() + headerV100.sizeOf());
         headerV100.writeTo(buffer);
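The two "version 100" tests share one trick: hand-build a schema that matches the known layout plus an unknown trailing field, stamp an inflated version on the header, and check that parsing still succeeds because the known fields come first. A condensed, self-contained rendering of the subscription case (structure borrowed from the test above; values are illustrative):

```java
import java.nio.ByteBuffer;

import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.message.ConsumerProtocolSubscription;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.protocol.types.Type;

final class FutureSchemaSketch {
    public static void main(String[] args) {
        Schema futureSubscription = new Schema(
            new Field("topics", new ArrayOf(Type.STRING)),
            new Field("user_data", Type.NULLABLE_BYTES),
            new Field("owned_partitions", new ArrayOf(
                ConsumerProtocolSubscription.TopicPartition.SCHEMA_1)),
            new Field("foo", Type.STRING)); // the unknown field "from the future"
        Struct struct = new Struct(futureSubscription)
            .set("topics", new Object[]{"topic"})
            .set("user_data", null)
            .set("owned_partitions", new Object[0])
            .set("foo", "bar");
        ByteBuffer buffer = ByteBuffer.allocate(2 + struct.sizeOf());
        buffer.putShort((short) 100); // claim a future version
        struct.writeTo(buffer);
        buffer.flip();
        // The version is clamped to the highest supported one; trailing bytes stay unread.
        Subscription parsed = ConsumerProtocol.deserializeSubscription(buffer);
        System.out.println(parsed.topics()); // [topic]
    }
}
```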

GroupMetadata.scala (kafka.coordinator.group)

@@ -476,7 +476,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
           // future versions of the consumer protocol. V0 must prefix all new versions.
           val buffer = ByteBuffer.wrap(member.metadata(protocolName.get))
           ConsumerProtocol.deserializeVersion(buffer)
-          ConsumerProtocol.deserializeSubscriptionV0(buffer).topics.asScala.toSet
+          ConsumerProtocol.deserializeSubscription(buffer, 0).topics.asScala.toSet
         }.reduceLeft(_ ++ _)
       )
     } catch {
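For non-Scala readers, the broker-side logic above rendered in Java (a translation sketch, not code from this commit): union every member's subscribed topics, always parsing at version 0, which works precisely because the version-0 fields must prefix all future versions.

```java
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;

final class SubscribedTopicsSketch {
    // Union the topics of every group member, parsing each metadata blob at version 0.
    static Set<String> subscribedTopics(List<ByteBuffer> memberMetadata) {
        Set<String> topics = new HashSet<>();
        for (ByteBuffer buffer : memberMetadata) {
            ConsumerProtocol.deserializeVersion(buffer); // skip the version header
            topics.addAll(ConsumerProtocol.deserializeSubscription(buffer, (short) 0).topics());
        }
        return topics;
    }
}
```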