HOTFIX: Fix recent protocol breakage from KIP-345 and KIP-392 (#6780)

KIP-345 and KIP-392 introduced a couple breaking changes for old versions of bumped protocols. This patch fixes them.

Reviewers: Colin Patrick McCabe <cmccabe@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Boyang Chen <bchen11@outlook.com>, Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
Jason Gustafson 2019-05-21 22:51:56 -07:00
parent d05b47112a
commit 42aaccec8b
8 changed files with 165 additions and 60 deletions

View File

@ -166,7 +166,7 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
@Override
protected Struct toStruct() {
Struct requestStruct = new Struct(ApiKeys.OFFSET_FOR_LEADER_EPOCH.requestSchema(version()));
requestStruct.set(REPLICA_ID, replicaId);
requestStruct.setIfExists(REPLICA_ID, replicaId);
Map<String, Map<Integer, PartitionData>> topicsToPartitionEpochs = CollectionUtils.groupPartitionDataByTopic(epochsByPartition);

View File

@ -27,7 +27,8 @@
"about": "The generation of the group." },
{ "name": "MemberId", "type": "string", "versions": "0+",
"about": "The member ID." },
{ "name": "GroupInstanceId", "type": "string", "versions": "3+", "nullableVersions": "3+",
{ "name": "GroupInstanceId", "type": "string", "versions": "3+",
"nullableVersions": "3+", "default": "null",
"about": "The unique identifier of the consumer instance provided by end user." }
]
}

View File

@ -34,7 +34,8 @@
"about": "The maximum time in milliseconds that the coordinator will wait for each member to rejoin when rebalancing the group." },
{ "name": "MemberId", "type": "string", "versions": "0+",
"about": "The member id assigned by the group coordinator." },
{ "name": "GroupInstanceId", "type": "string", "versions": "5+", "nullableVersions": "5+",
{ "name": "GroupInstanceId", "type": "string", "versions": "5+",
"nullableVersions": "5+", "default": "null",
"about": "The unique identifier of the consumer instance provided by end user." },
{ "name": "ProtocolType", "type": "string", "versions": "0+",
"about": "The unique name the for class of protocols implemented by the group we want to join." },

View File

@ -35,7 +35,8 @@
"about": "The generation of the group." },
{ "name": "MemberId", "type": "string", "versions": "1+", "ignorable": true,
"about": "The member ID assigned by the group coordinator." },
{ "name": "GroupInstanceId", "type": "string", "versions": "7+", "ignorable": true, "nullableVersions": "7+",
{ "name": "GroupInstanceId", "type": "string", "versions": "7+",
"nullableVersions": "7+", "default": "null",
"about": "The unique identifier of the consumer instance provided by end user." },
{ "name": "RetentionTimeMs", "type": "int64", "versions": "2-4", "default": "-1", "ignorable": true,
"about": "The time period in ms to retain the offset." },

View File

@ -27,7 +27,8 @@
"about": "The generation of the group." },
{ "name": "MemberId", "type": "string", "versions": "0+",
"about": "The member ID assigned by the group." },
{ "name": "GroupInstanceId", "type": "string", "versions": "3+", "nullableVersions": "3+",
{ "name": "GroupInstanceId", "type": "string", "versions": "3+",
"nullableVersions": "3+", "default": "null",
"about": "The unique identifier of the consumer instance provided by end user." },
{ "name": "Assignments", "type": "[]SyncGroupRequestAssignment", "versions": "0+",
"about": "Each assignment.", "fields": [

View File

@ -18,6 +18,8 @@
package org.apache.kafka.common.message;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Message;
@ -25,21 +27,19 @@ import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.BoundField;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.protocol.types.Type;
import org.apache.kafka.common.utils.Utils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.common.protocol.types.Type;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import java.util.function.Supplier;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -49,50 +49,127 @@ public final class MessageTest {
@Rule
final public Timeout globalTimeout = Timeout.millis(120000);
/**
* Test serializing and deserializing some messages.
*/
@Test
public void testRoundTrips() throws Exception {
testMessageRoundTrips(new MetadataRequestData().setTopics(
Arrays.asList(new MetadataRequestData.MetadataRequestTopic().setName("foo"),
new MetadataRequestData.MetadataRequestTopic().setName("bar")
)), (short) 6);
testMessageRoundTrips(new AddOffsetsToTxnRequestData().
setTransactionalId("foobar").
setProducerId(0xbadcafebadcafeL).
setProducerEpoch((short) 123).
setGroupId("baaz"), (short) 1);
testMessageRoundTrips(new AddOffsetsToTxnResponseData().
setThrottleTimeMs(42).
setErrorCode((short) 0), (short) 0);
testMessageRoundTrips(new AddPartitionsToTxnRequestData().
setTransactionalId("blah").
setProducerId(0xbadcafebadcafeL).
setProducerEpoch((short) 30000).
setTopics(new AddPartitionsToTxnTopicCollection(Collections.singletonList(
new AddPartitionsToTxnTopic().
setName("Topic").
setPartitions(Collections.singletonList(1))).iterator())));
testMessageRoundTrips(new CreateTopicsRequestData().
setTimeoutMs(1000).setTopics(new CreateTopicsRequestData.CreatableTopicCollection()));
testMessageRoundTrips(new DescribeAclsRequestData().
setResourceType((byte) 42).
setResourceNameFilter(null).
setResourcePatternType((byte) 3).
setPrincipalFilter("abc").
setHostFilter(null).
setOperation((byte) 0).
setPermissionType((byte) 0), (short) 0);
testMessageRoundTrips(new MetadataRequestData().
setTopics(null).
setAllowAutoTopicCreation(false).
setIncludeClusterAuthorizedOperations(false).
setIncludeTopicAuthorizedOperations(false));
public void testAddOffsetsToTxnVersions() throws Exception {
testAllMessageRoundTrips(new AddOffsetsToTxnRequestData().
setTransactionalId("foobar").
setProducerId(0xbadcafebadcafeL).
setProducerEpoch((short) 123).
setGroupId("baaz"));
testAllMessageRoundTrips(new AddOffsetsToTxnResponseData().
setThrottleTimeMs(42).
setErrorCode((short) 0));
}
private void testMessageRoundTrips(Message message) throws Exception {
testMessageRoundTrips(message, message.highestSupportedVersion());
@Test
public void testAddPartitionsToTxnVersions() throws Exception {
testAllMessageRoundTrips(new AddPartitionsToTxnRequestData().
setTransactionalId("blah").
setProducerId(0xbadcafebadcafeL).
setProducerEpoch((short) 30000).
setTopics(new AddPartitionsToTxnTopicCollection(Collections.singletonList(
new AddPartitionsToTxnTopic().
setName("Topic").
setPartitions(Collections.singletonList(1))).iterator())));
}
@Test
public void testCreateTopicsVersions() throws Exception {
testAllMessageRoundTrips(new CreateTopicsRequestData().
setTimeoutMs(1000).setTopics(new CreateTopicsRequestData.CreatableTopicCollection()));
}
@Test
public void testDescribeAclsRequest() throws Exception {
testAllMessageRoundTrips(new DescribeAclsRequestData().
setResourceType((byte) 42).
setResourceNameFilter(null).
setResourcePatternType((byte) 3).
setPrincipalFilter("abc").
setHostFilter(null).
setOperation((byte) 0).
setPermissionType((byte) 0));
}
@Test
public void testMetadataVersions() throws Exception {
testAllMessageRoundTrips(new MetadataRequestData().setTopics(
Arrays.asList(new MetadataRequestData.MetadataRequestTopic().setName("foo"),
new MetadataRequestData.MetadataRequestTopic().setName("bar")
)));
testAllMessageRoundTripsFromVersion(new MetadataRequestData().
setTopics(null).
setAllowAutoTopicCreation(true).
setIncludeClusterAuthorizedOperations(false).
setIncludeTopicAuthorizedOperations(false), (short) 1);
testAllMessageRoundTripsFromVersion(new MetadataRequestData().
setTopics(null).
setAllowAutoTopicCreation(false).
setIncludeClusterAuthorizedOperations(false).
setIncludeTopicAuthorizedOperations(false), (short) 4);
}
@Test
public void testHeartbeatVersions() throws Exception {
Supplier<HeartbeatRequestData> newRequest = () -> new HeartbeatRequestData()
.setGroupId("groupId")
.setMemberId("memberId")
.setGenerationId(15);
testAllMessageRoundTrips(newRequest.get());
testAllMessageRoundTrips(newRequest.get().setGroupInstanceId(null));
testAllMessageRoundTripsFromVersion(newRequest.get().setGroupInstanceId("instanceId"), (short) 3);
}
@Test
public void testJoinGroupVersions() throws Exception {
Supplier<JoinGroupRequestData> newRequest = () -> new JoinGroupRequestData()
.setGroupId("groupId")
.setMemberId("memberId")
.setProtocolType("consumer")
.setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection())
.setSessionTimeoutMs(10000);
testAllMessageRoundTrips(newRequest.get());
testAllMessageRoundTripsFromVersion(newRequest.get().setRebalanceTimeoutMs(20000), (short) 1);
testAllMessageRoundTrips(newRequest.get().setGroupInstanceId(null));
testAllMessageRoundTripsFromVersion(newRequest.get().setGroupInstanceId("instanceId"), (short) 5);
}
@Test
public void testSyncGroupDefaultGroupInstanceId() throws Exception {
Supplier<SyncGroupRequestData> request = () -> new SyncGroupRequestData()
.setGroupId("groupId")
.setMemberId("memberId")
.setGenerationId(15)
.setAssignments(new ArrayList<>());
testAllMessageRoundTrips(request.get());
testAllMessageRoundTrips(request.get().setGroupInstanceId(null));
testAllMessageRoundTripsFromVersion(request.get().setGroupInstanceId("instanceId"), (short) 3);
}
@Test
public void testOffsetCommitDefaultGroupInstanceId() throws Exception {
testAllMessageRoundTrips(new OffsetCommitRequestData()
.setTopics(new ArrayList<>())
.setGroupId("groupId"));
Supplier<OffsetCommitRequestData> request = () -> new OffsetCommitRequestData()
.setGroupId("groupId")
.setMemberId("memberId")
.setTopics(new ArrayList<>())
.setGenerationId(15);
testAllMessageRoundTripsFromVersion(request.get(), (short) 1);
testAllMessageRoundTripsFromVersion(request.get().setGroupInstanceId(null), (short) 1);
testAllMessageRoundTripsFromVersion(request.get().setGroupInstanceId("instanceId"), (short) 7);
}
private void testAllMessageRoundTrips(Message message) throws Exception {
testAllMessageRoundTripsFromVersion(message, message.lowestSupportedVersion());
}
private void testAllMessageRoundTripsFromVersion(Message message, short fromVersion) throws Exception {
for (short version = fromVersion; version < message.highestSupportedVersion(); version++) {
testMessageRoundTrips(message, version);
}
}
private void testMessageRoundTrips(Message message, short version) throws Exception {
@ -294,6 +371,25 @@ public final class MessageTest {
new FetchRequestData.ForgottenTopic().setName("foo"))));
}
@Test
public void testNonIgnorableFieldWithDefaultNull() throws Exception {
// Test non-ignorable string field `groupInstanceId` with default null
verifySizeRaisesUve((short) 0, "groupInstanceId", new HeartbeatRequestData()
.setGroupId("groupId")
.setGenerationId(15)
.setMemberId("memberId")
.setGroupInstanceId("instanceId"));
verifySizeSucceeds((short) 0, new HeartbeatRequestData()
.setGroupId("groupId")
.setGenerationId(15)
.setMemberId("memberId")
.setGroupInstanceId(null));
verifySizeSucceeds((short) 0, new HeartbeatRequestData()
.setGroupId("groupId")
.setGenerationId(15)
.setMemberId("memberId"));
}
private void verifySizeRaisesUve(short version, String problemFieldName,
Message message) throws Exception {
try {

View File

@ -290,9 +290,10 @@ public class RequestResponseTest {
checkRequest(createListOffsetRequest(0), true);
checkErrorResponse(createListOffsetRequest(0), new UnknownServerException(), true);
checkResponse(createListOffsetResponse(0), 0, true);
checkRequest(createLeaderEpochRequest(), true);
checkRequest(createLeaderEpochRequest(0), true);
checkRequest(createLeaderEpochRequest(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion()), true);
checkResponse(createLeaderEpochResponse(), 0, true);
checkErrorResponse(createLeaderEpochRequest(), new UnknownServerException(), true);
checkErrorResponse(createLeaderEpochRequest(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion()), new UnknownServerException(), true);
checkRequest(createAddPartitionsToTxnRequest(), true);
checkErrorResponse(createAddPartitionsToTxnRequest(), new UnknownServerException(), true);
checkResponse(createAddPartitionsToTxnResponse(), 0, true);
@ -1250,7 +1251,7 @@ public class RequestResponseTest {
return new InitProducerIdResponse(responseData);
}
private OffsetsForLeaderEpochRequest createLeaderEpochRequest() {
private OffsetsForLeaderEpochRequest createLeaderEpochRequest(int version) {
Map<TopicPartition, OffsetsForLeaderEpochRequest.PartitionData> epochs = new HashMap<>();
epochs.put(new TopicPartition("topic1", 0),
@ -1260,7 +1261,7 @@ public class RequestResponseTest {
epochs.put(new TopicPartition("topic2", 2),
new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 3));
return OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), epochs).build();
return OffsetsForLeaderEpochRequest.Builder.forConsumer((short) version, epochs).build();
}
private OffsetsForLeaderEpochResponse createLeaderEpochResponse() {

View File

@ -928,7 +928,11 @@ public final class MessageDataGenerator {
} else if (field.type().isBytes()) {
buffer.printf("if (%s.length != 0) {%n", field.camelCaseName());
} else if (field.type().isString()) {
buffer.printf("if (%s.equals(%s)) {%n", field.camelCaseName(), fieldDefault(field));
if (fieldDefault(field).equals("null")) {
buffer.printf("if (%s != null) {%n", field.camelCaseName());
} else {
buffer.printf("if (!%s.equals(%s)) {%n", field.camelCaseName(), fieldDefault(field));
}
} else if (field.type() instanceof FieldType.BoolFieldType) {
buffer.printf("if (%s%s) {%n",
fieldDefault(field).equals("true") ? "!" : "",