KAFKA-10647; Only serialize owned partitions when consumer protocol version >= 1 (#9506)

A regression got introduced by 466f8fd21c. The owned partition field must be ignored for version < 1 otherwise the serialization fails with an unsupported version exception.

Reviewers: Jason Gustafson <jason@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
David Jacot 2020-10-27 11:11:24 +01:00 committed by GitHub
parent c2737ee9eb
commit aa287acb2e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 38 additions and 1 deletions

View File

@ -26,7 +26,7 @@
{ "name": "Topics", "type": "[]string", "versions": "0+" }, { "name": "Topics", "type": "[]string", "versions": "0+" },
{ "name": "UserData", "type": "bytes", "versions": "0+", "nullableVersions": "0+", { "name": "UserData", "type": "bytes", "versions": "0+", "nullableVersions": "0+",
"default": "null", "zeroCopy": true }, "default": "null", "zeroCopy": true },
{ "name": "OwnedPartitions", "type": "[]TopicPartition", "versions": "1+", { "name": "OwnedPartitions", "type": "[]TopicPartition", "versions": "1+", "ignorable": true,
"fields": [ "fields": [
{ "name": "Topic", "type": "string", "versions": "1+" }, { "name": "Topic", "type": "string", "versions": "1+" },
{ "name": "Partitions", "type": "[]int32", "versions": "1+"} { "name": "Partitions", "type": "[]int32", "versions": "1+"}

View File

@ -46,6 +46,30 @@ public class ConsumerProtocolTest {
private final TopicPartition tp2 = new TopicPartition("bar", 2); private final TopicPartition tp2 = new TopicPartition("bar", 2);
private final Optional<String> groupInstanceId = Optional.of("instance.id"); private final Optional<String> groupInstanceId = Optional.of("instance.id");
@Test
public void serializeDeserializeSubscriptionAllVersions() {
List<TopicPartition> ownedPartitions = Arrays.asList(
new TopicPartition("foo", 0),
new TopicPartition("bar", 0));
Subscription subscription = new Subscription(Arrays.asList("foo", "bar"),
ByteBuffer.wrap("hello".getBytes()), ownedPartitions);
for (short version = ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION; version <= ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION; version++) {
ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription, version);
Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
assertEquals(subscription.topics(), parsedSubscription.topics());
assertEquals(subscription.userData(), parsedSubscription.userData());
assertFalse(parsedSubscription.groupInstanceId().isPresent());
if (version >= 1) {
assertEquals(toSet(subscription.ownedPartitions()), toSet(parsedSubscription.ownedPartitions()));
} else {
assertEquals(Collections.emptyList(), parsedSubscription.ownedPartitions());
}
}
}
@Test @Test
public void serializeDeserializeMetadata() { public void serializeDeserializeMetadata() {
Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), ByteBuffer.wrap(new byte[0])); Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), ByteBuffer.wrap(new byte[0]));
@ -137,6 +161,19 @@ public class ConsumerProtocolTest {
assertEquals(groupInstanceId, subscription.groupInstanceId()); assertEquals(groupInstanceId, subscription.groupInstanceId());
} }
@Test
public void serializeDeserializeAssignmentAllVersions() {
List<TopicPartition> partitions = Arrays.asList(tp1, tp2);
Assignment assignment = new Assignment(partitions, ByteBuffer.wrap("hello".getBytes()));
for (short version = ConsumerProtocolAssignment.LOWEST_SUPPORTED_VERSION; version <= ConsumerProtocolAssignment.HIGHEST_SUPPORTED_VERSION; version++) {
ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignment, version);
Assignment parsedAssignment = ConsumerProtocol.deserializeAssignment(buffer);
assertEquals(toSet(partitions), toSet(parsedAssignment.partitions()));
assertEquals(assignment.userData(), parsedAssignment.userData());
}
}
@Test @Test
public void serializeDeserializeAssignment() { public void serializeDeserializeAssignment() {
List<TopicPartition> partitions = Arrays.asList(tp1, tp2); List<TopicPartition> partitions = Arrays.asList(tp1, tp2);