KAFKA-9437; Make the Kafka Protocol Friendlier with L7 Proxies [KIP-559] (#7994)

This PR implements the KIP-559: https://cwiki.apache.org/confluence/display/KAFKA/KIP-559%3A+Make+the+Kafka+Protocol+Friendlier+with+L7+Proxies
- it adds the Protocol Type and the Protocol Name fields in JoinGroup and SyncGroup API;
- it validates that the fields are provided by the client when the new version of the API is used and ensure that they are consistent. it errors out otherwise;
- it validates that the fields are consistent in the client and errors out otherwise;
- it adds many tests related to the API changes but also extends the testing coverage of the requests/responses themselves.
- it standardises the naming in the coordinator. now, `ProtocolType` and `ProtocolName` are used across the board in the coordinator instead of having a mix of protocol type, protocol name, subprotocol, protocol, etc.

Reviewers: Jason Gustafson <jason@confluent.io>
This commit is contained in:
David Jacot 2020-01-31 16:54:07 -05:00 committed by GitHub
parent c8d97c6d51
commit 96c4ce4803
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 1098 additions and 399 deletions

View File

@ -436,7 +436,7 @@ public abstract class AbstractCoordinator implements Closeable {
// Duplicate the buffer in case `onJoinComplete` does not complete and needs to be retried.
ByteBuffer memberAssignment = future.value().duplicate();
onJoinComplete(generationSnapshot.generationId, generationSnapshot.memberId, generationSnapshot.protocol, memberAssignment);
onJoinComplete(generationSnapshot.generationId, generationSnapshot.memberId, generationSnapshot.protocolName, memberAssignment);
// Generally speaking we should always resetJoinGroupFuture once the future is done, but here
// we can only reset the join group future after the completion callback returns. This ensures
@ -575,21 +575,28 @@ public abstract class AbstractCoordinator implements Closeable {
public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
Errors error = joinResponse.error();
if (error == Errors.NONE) {
log.debug("Received successful JoinGroup response: {}", joinResponse);
sensors.joinSensor.record(response.requestLatencyMs());
if (isProtocolTypeInconsistent(joinResponse.data().protocolType())) {
log.debug("JoinGroup failed due to inconsistent Protocol Type, received {} but expected {}",
joinResponse.data().protocolType(), protocolType());
future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
} else {
log.debug("Received successful JoinGroup response: {}", joinResponse);
sensors.joinSensor.record(response.requestLatencyMs());
synchronized (AbstractCoordinator.this) {
if (state != MemberState.REBALANCING) {
// if the consumer was woken up before a rebalance completes, we may have already left
// the group. In this case, we do not want to continue with the sync group.
future.raise(new UnjoinedGroupException());
} else {
AbstractCoordinator.this.generation = new Generation(joinResponse.data().generationId(),
joinResponse.data().memberId(), joinResponse.data().protocolName());
if (joinResponse.isLeader()) {
onJoinLeader(joinResponse).chain(future);
synchronized (AbstractCoordinator.this) {
if (state != MemberState.REBALANCING) {
// if the consumer was woken up before a rebalance completes, we may have already left
// the group. In this case, we do not want to continue with the sync group.
future.raise(new UnjoinedGroupException());
} else {
onJoinFollower().chain(future);
AbstractCoordinator.this.generation = new Generation(
joinResponse.data().generationId(),
joinResponse.data().memberId(), joinResponse.data().protocolName());
if (joinResponse.isLeader()) {
onJoinLeader(joinResponse).chain(future);
} else {
onJoinFollower().chain(future);
}
}
}
}
@ -654,6 +661,8 @@ public abstract class AbstractCoordinator implements Closeable {
new SyncGroupRequestData()
.setGroupId(rebalanceConfig.groupId)
.setMemberId(generation.memberId)
.setProtocolType(protocolType())
.setProtocolName(generation.protocolName)
.setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
.setGenerationId(generation.generationId)
.setAssignments(Collections.emptyList())
@ -681,6 +690,8 @@ public abstract class AbstractCoordinator implements Closeable {
new SyncGroupRequestData()
.setGroupId(rebalanceConfig.groupId)
.setMemberId(generation.memberId)
.setProtocolType(protocolType())
.setProtocolName(generation.protocolName)
.setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
.setGenerationId(generation.generationId)
.setAssignments(groupAssignmentList)
@ -705,8 +716,18 @@ public abstract class AbstractCoordinator implements Closeable {
RequestFuture<ByteBuffer> future) {
Errors error = syncResponse.error();
if (error == Errors.NONE) {
sensors.syncSensor.record(response.requestLatencyMs());
future.complete(ByteBuffer.wrap(syncResponse.data.assignment()));
if (isProtocolTypeInconsistent(syncResponse.data.protocolType())) {
log.debug("SyncGroup failed due to inconsistent Protocol Type, received {} but expected {}",
syncResponse.data.protocolType(), protocolType());
future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
} else if (isProtocolNameInconsistent(syncResponse.data.protocolName())) {
log.debug("SyncGroup failed due to inconsistent Protocol Name, received {} but expected {}",
syncResponse.data.protocolName(), generation().protocolName);
future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
} else {
sensors.syncSensor.record(response.requestLatencyMs());
future.complete(ByteBuffer.wrap(syncResponse.data.assignment()));
}
} else {
requestRejoin();
@ -887,6 +908,14 @@ public abstract class AbstractCoordinator implements Closeable {
this.rejoinNeeded = true;
}
private boolean isProtocolTypeInconsistent(String protocolType) {
return protocolType != null && !protocolType.equals(protocolType());
}
private boolean isProtocolNameInconsistent(String protocolName) {
return protocolName != null && !protocolName.equals(generation().protocolName);
}
/**
* Close the coordinator, waiting if needed to send LeaveGroup.
*/
@ -1320,12 +1349,12 @@ public abstract class AbstractCoordinator implements Closeable {
public final int generationId;
public final String memberId;
public final String protocol;
public final String protocolName;
public Generation(int generationId, String memberId, String protocol) {
public Generation(int generationId, String memberId, String protocolName) {
this.generationId = generationId;
this.memberId = memberId;
this.protocol = protocol;
this.protocolName = protocolName;
}
/**
@ -1343,12 +1372,12 @@ public abstract class AbstractCoordinator implements Closeable {
final Generation that = (Generation) o;
return generationId == that.generationId &&
Objects.equals(memberId, that.memberId) &&
Objects.equals(protocol, that.protocol);
Objects.equals(protocolName, that.protocolName);
}
@Override
public int hashCode() {
return Objects.hash(generationId, memberId, protocol);
return Objects.hash(generationId, memberId, protocolName);
}
@Override
@ -1356,7 +1385,7 @@ public abstract class AbstractCoordinator implements Closeable {
return "Generation{" +
"generationId=" + generationId +
", memberId='" + memberId + '\'' +
", protocol='" + protocol + '\'' +
", protocol='" + protocolName + '\'' +
'}';
}
}

View File

@ -57,7 +57,7 @@ public class JoinGroupRequest extends AbstractRequest {
public static final String UNKNOWN_MEMBER_ID = "";
public static final int UNKNOWN_GENERATION_ID = -1;
public static final String UNKNOWN_PROTOCOL = "";
public static final String UNKNOWN_PROTOCOL_NAME = "";
private static final int MAX_GROUP_INSTANCE_ID_LENGTH = 249;
@ -119,14 +119,21 @@ public class JoinGroupRequest extends AbstractRequest {
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
return new JoinGroupResponse(new JoinGroupResponseData()
.setThrottleTimeMs(throttleTimeMs)
.setErrorCode(Errors.forException(e).code())
.setGenerationId(UNKNOWN_GENERATION_ID)
.setProtocolName(UNKNOWN_PROTOCOL)
.setLeader(UNKNOWN_MEMBER_ID)
.setMemberId(UNKNOWN_MEMBER_ID)
.setMembers(Collections.emptyList()));
JoinGroupResponseData data = new JoinGroupResponseData()
.setThrottleTimeMs(throttleTimeMs)
.setErrorCode(Errors.forException(e).code())
.setGenerationId(UNKNOWN_GENERATION_ID)
.setProtocolName(UNKNOWN_PROTOCOL_NAME)
.setLeader(UNKNOWN_MEMBER_ID)
.setMemberId(UNKNOWN_MEMBER_ID)
.setMembers(Collections.emptyList());
if (version() >= 7)
data.setProtocolName(null);
else
data.setProtocolName(UNKNOWN_PROTOCOL_NAME);
return new JoinGroupResponse(data);
}
public static JoinGroupRequest parse(ByteBuffer buffer, short version) {

View File

@ -81,6 +81,17 @@ public class SyncGroupRequest extends AbstractRequest {
return groupAssignments;
}
/**
* ProtocolType and ProtocolName are mandatory since version 5. This methods verifies that
* they are defined for version 5 or higher, or returns true otherwise for older versions.
*/
public boolean areMandatoryProtocolTypeAndNamePresent() {
if (version() >= 5)
return data.protocolType() != null && data.protocolName() != null;
else
return true;
}
public static SyncGroupRequest parse(ByteBuffer buffer, short version) {
return new SyncGroupRequest(ApiKeys.SYNC_GROUP.parseRequest(version, buffer), version);
}

View File

@ -62,7 +62,7 @@ public class SyncGroupResponse extends AbstractResponse {
}
public static SyncGroupResponse parse(ByteBuffer buffer, short version) {
return new SyncGroupResponse(ApiKeys.SYNC_GROUP.parseResponse(version, buffer));
return new SyncGroupResponse(ApiKeys.SYNC_GROUP.parseResponse(version, buffer), version);
}
@Override

View File

@ -27,7 +27,9 @@
// with assigned id.
//
// Version 6 is the first flexible version.
"validVersions": "0-6",
//
// Version 7 is the same as version 6.
"validVersions": "0-7",
"flexibleVersions": "6+",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",

View File

@ -29,7 +29,9 @@
// Version 5 is bumped to apply group.instance.id to identify member across restarts.
//
// Version 6 is the first flexible version.
"validVersions": "0-6",
//
// Starting from version 7, the broker sends back the Protocol Type to the client (KIP-599).
"validVersions": "0-7",
"flexibleVersions": "6+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true,
@ -38,7 +40,10 @@
"about": "The error code, or 0 if there was no error." },
{ "name": "GenerationId", "type": "int32", "versions": "0+", "default": "-1",
"about": "The generation ID of the group." },
{ "name": "ProtocolName", "type": "string", "versions": "0+",
{ "name": "ProtocolType", "type": "string", "versions": "7+",
"nullableVersions": "7+", "default": "null", "ignorable": true,
"about": "The group protocol name." },
{ "name": "ProtocolName", "type": "string", "versions": "0+", "nullableVersions": "7+",
"about": "The group protocol selected by the coordinator." },
{ "name": "Leader", "type": "string", "versions": "0+",
"about": "The leader of the group." },

View File

@ -22,7 +22,11 @@
// Starting from version 3, we add a new field called groupInstanceId to indicate member identity across restarts.
//
// Version 4 is the first flexible version.
"validVersions": "0-4",
//
// Starting from version 5, the client sends the Protocol Type and the Protocol Name
// to the broker (KIP-599). The broker will reject the request if they are inconsistent
// with the Type and Name known by the broker.
"validVersions": "0-5",
"flexibleVersions": "4+",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
@ -34,6 +38,12 @@
{ "name": "GroupInstanceId", "type": "string", "versions": "3+",
"nullableVersions": "3+", "default": "null",
"about": "The unique identifier of the consumer instance provided by end user." },
{ "name": "ProtocolType", "type": "string", "versions": "5+",
"nullableVersions": "5+", "default": "null", "ignorable": true,
"about": "The group protocol type." },
{ "name": "ProtocolName", "type": "string", "versions": "5+",
"nullableVersions": "5+", "default": "null", "ignorable": true,
"about": "The group protocol name." },
{ "name": "Assignments", "type": "[]SyncGroupRequestAssignment", "versions": "0+",
"about": "Each assignment.", "fields": [
{ "name": "MemberId", "type": "string", "versions": "0+",

View File

@ -24,13 +24,22 @@
// Starting from version 3, syncGroupRequest supports a new field called groupInstanceId to indicate member identity across restarts.
//
// Version 4 is the first flexible version.
"validVersions": "0-4",
//
// Starting from version 5, the broker sends back the Protocol Type and the Protocol Name
// to the client (KIP-599).
"validVersions": "0-5",
"flexibleVersions": "4+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
{ "name": "ProtocolType", "type": "string", "versions": "5+",
"nullableVersions": "5+", "default": "null", "ignorable": true,
"about": "The group protocol type." },
{ "name": "ProtocolName", "type": "string", "versions": "5+",
"nullableVersions": "5+", "default": "null", "ignorable": true,
"about": "The group protocol name." },
{ "name": "Assignment", "type": "bytes", "versions": "0+",
"about": "The member assignment." }
]

View File

@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
@ -85,6 +86,8 @@ public class AbstractCoordinatorTest {
private static final int REQUEST_TIMEOUT_MS = 40000;
private static final String GROUP_ID = "dummy-group";
private static final String METRIC_GROUP_PREFIX = "consumer";
private static final String PROTOCOL_TYPE = "dummy";
private static final String PROTOCOL_NAME = "dummy-subprotocol";
private Node node;
private Metrics metrics;
@ -360,6 +363,63 @@ public class AbstractCoordinatorTest {
assertFalse(future.isRetriable());
}
@Test
public void testJoinGroupProtocolTypeAndName() {
String wrongProtocolType = "wrong-type";
String wrongProtocolName = "wrong-name";
// No Protocol Type in both JoinGroup and SyncGroup responses
assertTrue(joinGroupWithProtocolTypeAndName(null, null, null));
// Protocol Type in both JoinGroup and SyncGroup responses
assertTrue(joinGroupWithProtocolTypeAndName(PROTOCOL_TYPE, PROTOCOL_TYPE, PROTOCOL_NAME));
// Wrong protocol type in the JoinGroupResponse
assertThrows(InconsistentGroupProtocolException.class,
() -> joinGroupWithProtocolTypeAndName("wrong", null, null));
// Correct protocol type in the JoinGroupResponse
// Wrong protocol type in the SyncGroupResponse
// Correct protocol name in the SyncGroupResponse
assertThrows(InconsistentGroupProtocolException.class,
() -> joinGroupWithProtocolTypeAndName(PROTOCOL_TYPE, wrongProtocolType, PROTOCOL_NAME));
// Correct protocol type in the JoinGroupResponse
// Correct protocol type in the SyncGroupResponse
// Wrong protocol name in the SyncGroupResponse
assertThrows(InconsistentGroupProtocolException.class,
() -> joinGroupWithProtocolTypeAndName(PROTOCOL_TYPE, PROTOCOL_TYPE, wrongProtocolName));
}
private boolean joinGroupWithProtocolTypeAndName(String joinGroupResponseProtocolType,
String syncGroupResponseProtocolType,
String syncGroupResponseProtocolName) {
setupCoordinator();
mockClient.reset();
mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(mockTime.timer(0));
mockClient.prepareResponse(body -> {
if (!(body instanceof JoinGroupRequest)) {
return false;
}
JoinGroupRequest joinGroupRequest = (JoinGroupRequest) body;
return joinGroupRequest.data().protocolType().equals(PROTOCOL_TYPE);
}, joinGroupFollowerResponse(defaultGeneration, memberId,
"memberid", Errors.NONE, joinGroupResponseProtocolType));
mockClient.prepareResponse(body -> {
if (!(body instanceof SyncGroupRequest)) {
return false;
}
SyncGroupRequest syncGroupRequest = (SyncGroupRequest) body;
return syncGroupRequest.data.protocolType().equals(PROTOCOL_TYPE)
&& syncGroupRequest.data.protocolName().equals(PROTOCOL_NAME);
}, syncGroupResponse(Errors.NONE, syncGroupResponseProtocolType, syncGroupResponseProtocolName));
return coordinator.joinGroupIfNeeded(mockTime.timer(5000L));
}
@Test
public void testSyncGroupRequestWithFencedInstanceIdException() {
setupCoordinator();
@ -946,12 +1006,24 @@ public class AbstractCoordinatorTest {
return new HeartbeatResponse(new HeartbeatResponseData().setErrorCode(error.code()));
}
private JoinGroupResponse joinGroupFollowerResponse(int generationId, String memberId, String leaderId, Errors error) {
private JoinGroupResponse joinGroupFollowerResponse(int generationId,
String memberId,
String leaderId,
Errors error) {
return joinGroupFollowerResponse(generationId, memberId, leaderId, error, null);
}
private JoinGroupResponse joinGroupFollowerResponse(int generationId,
String memberId,
String leaderId,
Errors error,
String protocolType) {
return new JoinGroupResponse(
new JoinGroupResponseData()
.setErrorCode(error.code())
.setGenerationId(generationId)
.setProtocolName("dummy-subprotocol")
.setProtocolType(protocolType)
.setProtocolName(PROTOCOL_NAME)
.setMemberId(memberId)
.setLeader(leaderId)
.setMembers(Collections.emptyList())
@ -964,9 +1036,17 @@ public class AbstractCoordinatorTest {
}
private SyncGroupResponse syncGroupResponse(Errors error) {
return syncGroupResponse(error, null, null);
}
private SyncGroupResponse syncGroupResponse(Errors error,
String protocolType,
String protocolName) {
return new SyncGroupResponse(
new SyncGroupResponseData()
.setErrorCode(error.code())
.setProtocolType(protocolType)
.setProtocolName(protocolName)
.setAssignment(new byte[0])
);
}
@ -992,14 +1072,14 @@ public class AbstractCoordinatorTest {
@Override
protected String protocolType() {
return "dummy";
return PROTOCOL_TYPE;
}
@Override
protected JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata() {
return new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
Collections.singleton(new JoinGroupRequestData.JoinGroupRequestProtocol()
.setName("dummy-subprotocol")
.setName(PROTOCOL_NAME)
.setMetadata(EMPTY_DATA.array())).iterator()
);
}

View File

@ -89,6 +89,7 @@ import org.apache.kafka.common.message.InitProducerIdRequestData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState;
import org.apache.kafka.common.message.LeaderAndIsrResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
@ -115,6 +116,9 @@ import org.apache.kafka.common.message.SaslAuthenticateResponseData;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
import org.apache.kafka.common.message.SaslHandshakeResponseData;
import org.apache.kafka.common.message.StopReplicaResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataBroker;
import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataEndpoint;
import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState;
@ -202,10 +206,19 @@ public class RequestResponseTest {
checkRequest(createHeartBeatRequest(), true);
checkErrorResponse(createHeartBeatRequest(), new UnknownServerException(), true);
checkResponse(createHeartBeatResponse(), 0, true);
checkRequest(createJoinGroupRequest(1), true);
checkErrorResponse(createJoinGroupRequest(0), new UnknownServerException(), true);
checkErrorResponse(createJoinGroupRequest(1), new UnknownServerException(), true);
checkResponse(createJoinGroupResponse(), 0, true);
for (int v = ApiKeys.JOIN_GROUP.oldestVersion(); v <= ApiKeys.JOIN_GROUP.latestVersion(); v++) {
checkRequest(createJoinGroupRequest(v), true);
checkErrorResponse(createJoinGroupRequest(v), new UnknownServerException(), true);
checkResponse(createJoinGroupResponse(v), v, true);
}
for (int v = ApiKeys.SYNC_GROUP.oldestVersion(); v <= ApiKeys.SYNC_GROUP.latestVersion(); v++) {
checkRequest(createSyncGroupRequest(v), true);
checkErrorResponse(createSyncGroupRequest(v), new UnknownServerException(), true);
checkResponse(createSyncGroupResponse(v), v, true);
}
checkRequest(createLeaveGroupRequest(), true);
checkErrorResponse(createLeaveGroupRequest(), new UnknownServerException(), true);
checkResponse(createLeaveGroupResponse(), 0, true);
@ -1003,56 +1016,104 @@ public class RequestResponseTest {
}
private JoinGroupRequest createJoinGroupRequest(int version) {
JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols =
new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
Collections.singleton(
new JoinGroupRequestData.JoinGroupRequestProtocol()
.setName("consumer-range")
.setMetadata(new byte[0])).iterator()
);
if (version <= 4) {
return new JoinGroupRequest.Builder(
new JoinGroupRequestData()
.setGroupId("group1")
.setSessionTimeoutMs(30000)
.setMemberId("consumer1")
.setGroupInstanceId(null)
.setProtocolType("consumer")
.setProtocols(protocols)
.setRebalanceTimeoutMs(60000)) // v1 and above contains rebalance timeout
.build((short) version);
} else {
return new JoinGroupRequest.Builder(
new JoinGroupRequestData()
.setGroupId("group1")
.setSessionTimeoutMs(30000)
.setMemberId("consumer1")
.setGroupInstanceId("groupInstanceId") // v5 and above could set group instance id
.setProtocolType("consumer")
.setProtocols(protocols)
.setRebalanceTimeoutMs(60000)) // v1 and above contains rebalance timeout
.build((short) version);
}
JoinGroupRequestData data = new JoinGroupRequestData()
.setGroupId("group1")
.setSessionTimeoutMs(30000)
.setMemberId("consumer1")
.setProtocolType("consumer")
.setProtocols(protocols);
// v1 and above contains rebalance timeout
if (version >= 1)
data.setRebalanceTimeoutMs(60000);
// v5 and above could set group instance id
if (version >= 5)
data.setGroupInstanceId("groupInstanceId");
return new JoinGroupRequest.Builder(data).build((short) version);
}
private JoinGroupResponse createJoinGroupResponse() {
List<JoinGroupResponseData.JoinGroupResponseMember> members = Arrays.asList(
new JoinGroupResponseData.JoinGroupResponseMember()
.setMemberId("consumer1")
.setMetadata(new byte[0]),
new JoinGroupResponseData.JoinGroupResponseMember()
.setMemberId("consumer2")
.setMetadata(new byte[0])
private JoinGroupResponse createJoinGroupResponse(int version) {
List<JoinGroupResponseData.JoinGroupResponseMember> members = new ArrayList<>();
for (int i = 0; i < 2; i++) {
JoinGroupResponseMember member = new JoinGroupResponseData.JoinGroupResponseMember()
.setMemberId("consumer" + i)
.setMetadata(new byte[0]);
if (version >= 5)
member.setGroupInstanceId("instance" + i);
members.add(member);
}
JoinGroupResponseData data = new JoinGroupResponseData()
.setErrorCode(Errors.NONE.code())
.setGenerationId(1)
.setProtocolType("consumer") // Added in v7 but ignorable
.setProtocolName("range")
.setLeader("leader")
.setMemberId("consumer1")
.setMembers(members);
// v1 and above could set throttle time
if (version >= 1)
data.setThrottleTimeMs(1000);
return new JoinGroupResponse(data);
}
private SyncGroupRequest createSyncGroupRequest(int version) {
List<SyncGroupRequestAssignment> assignments = Collections.singletonList(
new SyncGroupRequestAssignment()
.setMemberId("member")
.setAssignment(new byte[0])
);
return new JoinGroupResponse(
new JoinGroupResponseData()
.setErrorCode(Errors.NONE.code())
.setGenerationId(1)
.setProtocolName("range")
.setLeader("leader")
.setMemberId("consumer1")
.setMembers(members)
);
SyncGroupRequestData data = new SyncGroupRequestData()
.setGroupId("group1")
.setGenerationId(1)
.setMemberId("member")
.setProtocolType("consumer") // Added in v5 but ignorable
.setProtocolName("range") // Added in v5 but ignorable
.setAssignments(assignments);
JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols =
new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
Collections.singleton(
new JoinGroupRequestData.JoinGroupRequestProtocol()
.setName("consumer-range")
.setMetadata(new byte[0])).iterator()
);
// v3 and above could set group instance id
if (version >= 3)
data.setGroupInstanceId("groupInstanceId");
return new SyncGroupRequest.Builder(data).build((short) version);
}
private SyncGroupResponse createSyncGroupResponse(int version) {
SyncGroupResponseData data = new SyncGroupResponseData()
.setErrorCode(Errors.NONE.code())
.setProtocolType("consumer") // Added in v5 but ignorable
.setProtocolName("range") // Added in v5 but ignorable
.setAssignment(new byte[0]);
// v1 and above could set throttle time
if (version >= 1)
data.setThrottleTimeMs(1000);
return new SyncGroupResponse(data);
}
private ListGroupsRequest createListGroupsRequest() {

View File

@ -136,13 +136,13 @@ class GroupCoordinator(val brokerId: Int,
protocols: List[(String, Array[Byte])],
responseCallback: JoinCallback): Unit = {
validateGroupStatus(groupId, ApiKeys.JOIN_GROUP).foreach { error =>
responseCallback(joinError(memberId, error))
responseCallback(JoinGroupResult(memberId, error))
return
}
if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs ||
sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs) {
responseCallback(joinError(memberId, Errors.INVALID_SESSION_TIMEOUT))
responseCallback(JoinGroupResult(memberId, Errors.INVALID_SESSION_TIMEOUT))
} else {
val isUnknownMember = memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID
groupManager.getGroup(groupId) match {
@ -154,7 +154,7 @@ class GroupCoordinator(val brokerId: Int,
val group = groupManager.addGroup(new GroupMetadata(groupId, Empty, time))
doUnknownJoinGroup(group, groupInstanceId, requireKnownMemberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
} else {
responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
}
case Some(group) =>
group.inLock {
@ -163,7 +163,7 @@ class GroupCoordinator(val brokerId: Int,
|| (isUnknownMember && group.size >= groupConfig.groupMaxSize)) {
group.remove(memberId)
group.removeStaticMember(groupInstanceId)
responseCallback(joinError(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_MAX_SIZE_REACHED))
responseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_MAX_SIZE_REACHED))
} else if (isUnknownMember) {
doUnknownJoinGroup(group, groupInstanceId, requireKnownMemberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
} else {
@ -195,9 +195,9 @@ class GroupCoordinator(val brokerId: Int,
// from the coordinator metadata; it is likely that the group has migrated to some other
// coordinator OR the group is in a transient unstable phase. Let the member retry
// finding the correct coordinator and rejoin.
responseCallback(joinError(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.COORDINATOR_NOT_AVAILABLE))
responseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.COORDINATOR_NOT_AVAILABLE))
} else if (!group.supportsProtocols(protocolType, MemberMetadata.plainProtocolSet(protocols))) {
responseCallback(joinError(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.INCONSISTENT_GROUP_PROTOCOL))
responseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.INCONSISTENT_GROUP_PROTOCOL))
} else {
val newMemberId = group.generateMemberId(clientId, groupInstanceId)
@ -222,7 +222,8 @@ class GroupCoordinator(val brokerId: Int,
members = List.empty,
memberId = newMemberId,
generationId = group.generationId,
subProtocol = group.protocolOrNull,
protocolType = group.protocolType,
protocolName = group.protocolName,
// We want to avoid current leader performing trivial assignment while the group
// is in stable/awaiting sync stage, because the new assignment in leader's next sync call
// won't be broadcast by a stable/awaiting sync group. This could be guaranteed by
@ -243,7 +244,7 @@ class GroupCoordinator(val brokerId: Int,
s"${group.currentState} state. Created a new member id $newMemberId and request the member to rejoin with this id.")
group.addPendingMember(newMemberId)
addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
responseCallback(joinError(newMemberId, Errors.MEMBER_ID_REQUIRED))
responseCallback(JoinGroupResult(newMemberId, Errors.MEMBER_ID_REQUIRED))
} else {
debug(s"Dynamic member with unknown member id rejoins group ${group.groupId} in " +
s"${group.currentState} state. Created a new member id $newMemberId for this member and add to the group.")
@ -253,7 +254,7 @@ class GroupCoordinator(val brokerId: Int,
}
}
}
private def doJoinGroup(group: GroupMetadata,
memberId: String,
groupInstanceId: Option[String],
@ -270,9 +271,9 @@ class GroupCoordinator(val brokerId: Int,
// from the coordinator metadata; this is likely that the group has migrated to some other
// coordinator OR the group is in a transient unstable phase. Let the member retry
// finding the correct coordinator and rejoin.
responseCallback(joinError(memberId, Errors.COORDINATOR_NOT_AVAILABLE))
responseCallback(JoinGroupResult(memberId, Errors.COORDINATOR_NOT_AVAILABLE))
} else if (!group.supportsProtocols(protocolType, MemberMetadata.plainProtocolSet(protocols))) {
responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
responseCallback(JoinGroupResult(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
} else if (group.isPendingMember(memberId)) {
// A rejoining pending member will be accepted. Note that pending member will never be a static member.
if (groupInstanceId.isDefined) {
@ -286,12 +287,12 @@ class GroupCoordinator(val brokerId: Int,
val groupInstanceIdNotFound = groupInstanceId.isDefined && !group.hasStaticMember(groupInstanceId)
if (group.isStaticMemberFenced(memberId, groupInstanceId)) {
// given member id doesn't match with the groupInstanceId. Inform duplicate instance to shut down immediately.
responseCallback(joinError(memberId, Errors.FENCED_INSTANCE_ID))
responseCallback(JoinGroupResult(memberId, Errors.FENCED_INSTANCE_ID))
} else if (!group.has(memberId) || groupInstanceIdNotFound) {
// If the dynamic member trying to register with an unrecognized id, or
// the static member joins with unknown group instance id, send the response to let
// it reset its member id and retry.
responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
} else {
val member = group.get(memberId)
@ -312,7 +313,8 @@ class GroupCoordinator(val brokerId: Int,
},
memberId = memberId,
generationId = group.generationId,
subProtocol = group.protocolOrNull,
protocolType = group.protocolType,
protocolName = group.protocolName,
leaderId = group.leaderOrNull,
error = Errors.NONE))
} else {
@ -334,7 +336,8 @@ class GroupCoordinator(val brokerId: Int,
members = List.empty,
memberId = memberId,
generationId = group.generationId,
subProtocol = group.protocolOrNull,
protocolType = group.protocolType,
protocolName = group.protocolName,
leaderId = group.leaderOrNull,
error = Errors.NONE))
}
@ -343,7 +346,7 @@ class GroupCoordinator(val brokerId: Int,
// Group reaches unexpected state. Let the joining member reset their generation and rejoin.
warn(s"Attempt to add rejoining member $memberId of group ${group.groupId} in " +
s"unexpected group state ${group.currentState}")
responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
}
}
}
@ -353,6 +356,8 @@ class GroupCoordinator(val brokerId: Int,
def handleSyncGroup(groupId: String,
generation: Int,
memberId: String,
protocolType: Option[String],
protocolName: Option[String],
groupInstanceId: Option[String],
groupAssignment: Map[String, Array[Byte]],
responseCallback: SyncCallback): Unit = {
@ -362,14 +367,15 @@ class GroupCoordinator(val brokerId: Int,
// group will need to start over at JoinGroup. By returning rebalance in progress, the consumer
// will attempt to rejoin without needing to rediscover the coordinator. Note that we cannot
// return COORDINATOR_LOAD_IN_PROGRESS since older clients do not expect the error.
responseCallback(SyncGroupResult(Array.empty, Errors.REBALANCE_IN_PROGRESS))
responseCallback(SyncGroupResult(Errors.REBALANCE_IN_PROGRESS))
case Some(error) => responseCallback(SyncGroupResult(Array.empty, error))
case Some(error) => responseCallback(SyncGroupResult(error))
case None =>
groupManager.getGroup(groupId) match {
case None => responseCallback(SyncGroupResult(Array.empty, Errors.UNKNOWN_MEMBER_ID))
case Some(group) => doSyncGroup(group, generation, memberId, groupInstanceId, groupAssignment, responseCallback)
case None => responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
case Some(group) => doSyncGroup(group, generation, memberId, protocolType, protocolName,
groupInstanceId, groupAssignment, responseCallback)
}
}
}
@ -377,6 +383,8 @@ class GroupCoordinator(val brokerId: Int,
private def doSyncGroup(group: GroupMetadata,
generationId: Int,
memberId: String,
protocolType: Option[String],
protocolName: Option[String],
groupInstanceId: Option[String],
groupAssignment: Map[String, Array[Byte]],
responseCallback: SyncCallback): Unit = {
@ -386,20 +394,24 @@ class GroupCoordinator(val brokerId: Int,
// from the coordinator metadata; this is likely that the group has migrated to some other
// coordinator OR the group is in a transient unstable phase. Let the member retry
// finding the correct coordinator and rejoin.
responseCallback(SyncGroupResult(Array.empty, Errors.COORDINATOR_NOT_AVAILABLE))
responseCallback(SyncGroupResult(Errors.COORDINATOR_NOT_AVAILABLE))
} else if (group.isStaticMemberFenced(memberId, groupInstanceId)) {
responseCallback(SyncGroupResult(Array.empty, Errors.FENCED_INSTANCE_ID))
responseCallback(SyncGroupResult(Errors.FENCED_INSTANCE_ID))
} else if (!group.has(memberId)) {
responseCallback(SyncGroupResult(Array.empty, Errors.UNKNOWN_MEMBER_ID))
responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
} else if (generationId != group.generationId) {
responseCallback(SyncGroupResult(Array.empty, Errors.ILLEGAL_GENERATION))
responseCallback(SyncGroupResult(Errors.ILLEGAL_GENERATION))
} else if (protocolType.isDefined && !group.protocolType.contains(protocolType.get)) {
responseCallback(SyncGroupResult(Errors.INCONSISTENT_GROUP_PROTOCOL))
} else if (protocolName.isDefined && !group.protocolName.contains(protocolName.get)) {
responseCallback(SyncGroupResult(Errors.INCONSISTENT_GROUP_PROTOCOL))
} else {
group.currentState match {
case Empty =>
responseCallback(SyncGroupResult(Array.empty, Errors.UNKNOWN_MEMBER_ID))
responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
case PreparingRebalance =>
responseCallback(SyncGroupResult(Array.empty, Errors.REBALANCE_IN_PROGRESS))
responseCallback(SyncGroupResult(Errors.REBALANCE_IN_PROGRESS))
case CompletingRebalance =>
group.get(memberId).awaitingSyncCallback = responseCallback
@ -434,7 +446,7 @@ class GroupCoordinator(val brokerId: Int,
case Stable =>
// if the group is stable, we just return the current assignment
val memberMetadata = group.get(memberId)
responseCallback(SyncGroupResult(memberMetadata.assignment, Errors.NONE))
responseCallback(SyncGroupResult(group.protocolType, group.protocolName, memberMetadata.assignment, Errors.NONE))
completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
case Dead =>
@ -860,14 +872,14 @@ class GroupCoordinator(val brokerId: Int,
case Empty | Dead =>
case PreparingRebalance =>
for (member <- group.allMemberMetadata) {
group.maybeInvokeJoinCallback(member, joinError(member.memberId, Errors.NOT_COORDINATOR))
group.maybeInvokeJoinCallback(member, JoinGroupResult(member.memberId, Errors.NOT_COORDINATOR))
}
joinPurgatory.checkAndComplete(GroupKey(group.groupId))
case Stable | CompletingRebalance =>
for (member <- group.allMemberMetadata) {
group.maybeInvokeSyncCallback(member, SyncGroupResult(Array.empty, Errors.NOT_COORDINATOR))
group.maybeInvokeSyncCallback(member, SyncGroupResult(Errors.NOT_COORDINATOR))
heartbeatPurgatory.checkAndComplete(MemberKey(member.groupId, member.memberId))
}
}
@ -917,8 +929,12 @@ class GroupCoordinator(val brokerId: Int,
}
private def propagateAssignment(group: GroupMetadata, error: Errors): Unit = {
val (protocolType, protocolName) = if (error == Errors.NONE)
(group.protocolType, group.protocolName)
else
(None, None)
for (member <- group.allMemberMetadata) {
if (group.maybeInvokeSyncCallback(member, SyncGroupResult(member.assignment, error))) {
if (group.maybeInvokeSyncCallback(member, SyncGroupResult(protocolType, protocolName, member.assignment, error))) {
// reset the session timeout for members after propagating the member's assignment.
// This is because if any member's session expired while we were still awaiting either
// the leader sync group or the storage callback, its expiration will be ignored and no
@ -1043,7 +1059,7 @@ class GroupCoordinator(val brokerId: Int,
// New members may timeout with a pending JoinGroup while the group is still rebalancing, so we have
// to invoke the callback before removing the member. We return UNKNOWN_MEMBER_ID so that the consumer
// will retry the JoinGroup request if is still active.
group.maybeInvokeJoinCallback(member, joinError(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.UNKNOWN_MEMBER_ID))
group.maybeInvokeJoinCallback(member, JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.UNKNOWN_MEMBER_ID))
group.remove(member.memberId)
group.removeStaticMember(member.groupInstanceId)
@ -1123,7 +1139,8 @@ class GroupCoordinator(val brokerId: Int,
},
memberId = member.memberId,
generationId = group.generationId,
subProtocol = group.protocolOrNull,
protocolType = group.protocolType,
protocolName = group.protocolName,
leaderId = group.leaderOrNull,
error = Errors.NONE)
@ -1251,16 +1268,6 @@ object GroupCoordinator {
new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory, joinPurgatory, time, metrics)
}
def joinError(memberId: String, error: Errors): JoinGroupResult = {
JoinGroupResult(
members = List.empty,
memberId = memberId,
generationId = GroupCoordinator.NoGeneration,
subProtocol = GroupCoordinator.NoProtocol,
leaderId = GroupCoordinator.NoLeader,
error = error)
}
private def memberLeaveError(memberIdentity: MemberIdentity,
error: Errors): LeaveMemberResponse = {
LeaveMemberResponse(
@ -1285,13 +1292,35 @@ case class GroupConfig(groupMinSessionTimeoutMs: Int,
case class JoinGroupResult(members: List[JoinGroupResponseMember],
memberId: String,
generationId: Int,
subProtocol: String,
protocolType: Option[String],
protocolName: Option[String],
leaderId: String,
error: Errors)
case class SyncGroupResult(memberAssignment: Array[Byte],
object JoinGroupResult {
def apply(memberId: String, error: Errors): JoinGroupResult = {
JoinGroupResult(
members = List.empty,
memberId = memberId,
generationId = GroupCoordinator.NoGeneration,
protocolType = None,
protocolName = None,
leaderId = GroupCoordinator.NoLeader,
error = error)
}
}
case class SyncGroupResult(protocolType: Option[String],
protocolName: Option[String],
memberAssignment: Array[Byte],
error: Errors)
object SyncGroupResult {
def apply(error: Errors): SyncGroupResult = {
SyncGroupResult(None, None, Array.empty, error)
}
}
case class LeaveMemberResponse(memberId: String,
groupInstanceId: Option[String],
error: Errors)

View File

@ -129,7 +129,7 @@ private object GroupMetadata {
initialState: GroupState,
generationId: Int,
protocolType: String,
protocol: String,
protocolName: String,
leaderId: String,
currentStateTimestamp: Option[Long],
members: Iterable[MemberMetadata],
@ -137,7 +137,7 @@ private object GroupMetadata {
val group = new GroupMetadata(groupId, initialState, time)
group.generationId = generationId
group.protocolType = if (protocolType == null || protocolType.isEmpty) None else Some(protocolType)
group.protocol = Option(protocol)
group.protocolName = Option(protocolName)
group.leaderId = Option(leaderId)
group.currentStateTimestamp = currentStateTimestamp
members.foreach(member => {
@ -199,9 +199,9 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
private var state: GroupState = initialState
var currentStateTimestamp: Option[Long] = Some(time.milliseconds())
var protocolType: Option[String] = None
var protocolName: Option[String] = None
var generationId = 0
private var leaderId: Option[String] = None
private var protocol: Option[String] = None
private val members = new mutable.HashMap[String, MemberMetadata]
// Static membership mapping [key: group.instance.id, value: member.id]
@ -231,7 +231,6 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
def isLeader(memberId: String): Boolean = leaderId.contains(memberId)
def leaderOrNull: String = leaderId.orNull
def protocolOrNull: String = protocol.orNull
def currentStateTimestampOrDefault: Long = currentStateTimestamp.getOrElse(-1)
def isConsumerGroup: Boolean = protocolType.contains(ConsumerProtocol.PROTOCOL_TYPE)
@ -309,17 +308,9 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
.getOrElse(throw new IllegalArgumentException(s"Cannot replace non-existing member id $oldMemberId"))
// Fence potential duplicate member immediately if someone awaits join/sync callback.
maybeInvokeJoinCallback(oldMember, JoinGroupResult(
members = List.empty,
memberId = oldMemberId,
generationId = GroupCoordinator.NoGeneration,
subProtocol = GroupCoordinator.NoProtocol,
leaderId = GroupCoordinator.NoLeader,
error = Errors.FENCED_INSTANCE_ID))
maybeInvokeJoinCallback(oldMember, JoinGroupResult(oldMemberId, Errors.FENCED_INSTANCE_ID))
maybeInvokeSyncCallback(oldMember, SyncGroupResult(
Array.empty, Errors.FENCED_INSTANCE_ID
))
maybeInvokeSyncCallback(oldMember, SyncGroupResult(Errors.FENCED_INSTANCE_ID))
oldMember.memberId = newMemberId
members.put(newMemberId, oldMember)
@ -461,21 +452,21 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
*/
private[group] def computeSubscribedTopics(): Option[Set[String]] = {
protocolType match {
case Some(ConsumerProtocol.PROTOCOL_TYPE) if members.nonEmpty && protocol.isDefined =>
case Some(ConsumerProtocol.PROTOCOL_TYPE) if members.nonEmpty && protocolName.isDefined =>
try {
Some(
members.map { case (_, member) =>
// The consumer protocol is parsed with V0 which is the based prefix of all versions.
// This way the consumer group manager does not depend on any specific existing or
// future versions of the consumer protocol. VO must prefix all new versions.
val buffer = ByteBuffer.wrap(member.metadata(protocol.get))
val buffer = ByteBuffer.wrap(member.metadata(protocolName.get))
ConsumerProtocol.deserializeVersion(buffer)
ConsumerProtocol.deserializeSubscriptionV0(buffer).topics.asScala.toSet
}.reduceLeft(_ ++ _)
)
} catch {
case e: SchemaException => {
warn(s"Failed to parse Consumer Protocol ${ConsumerProtocol.PROTOCOL_TYPE}:${protocol.get} " +
warn(s"Failed to parse Consumer Protocol ${ConsumerProtocol.PROTOCOL_TYPE}:${protocolName.get} " +
s"of group $groupId. Consumer group coordinator is not aware of the subscribed topics.", e)
None
}
@ -529,12 +520,12 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
def initNextGeneration() = {
if (members.nonEmpty) {
generationId += 1
protocol = Some(selectProtocol)
protocolName = Some(selectProtocol)
subscribedTopics = computeSubscribedTopics()
transitionTo(CompletingRebalance)
} else {
generationId += 1
protocol = None
protocolName = None
subscribedTopics = computeSubscribedTopics()
transitionTo(Empty)
}
@ -548,13 +539,13 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
members.map{ case (memberId, memberMetadata) => new JoinGroupResponseMember()
.setMemberId(memberId)
.setGroupInstanceId(memberMetadata.groupInstanceId.orNull)
.setMetadata(memberMetadata.metadata(protocol.get))
.setMetadata(memberMetadata.metadata(protocolName.get))
}.toList
}
def summary: GroupSummary = {
if (is(Stable)) {
val protocol = protocolOrNull
val protocol = protocolName.orNull
if (protocol == null)
throw new IllegalStateException("Invalid null group protocol for stable group")

View File

@ -1244,7 +1244,7 @@ object GroupMetadataManager {
value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse(""))
value.set(GENERATION_KEY, groupMetadata.generationId)
value.set(PROTOCOL_KEY, groupMetadata.protocolOrNull)
value.set(PROTOCOL_KEY, groupMetadata.protocolName.orNull)
value.set(LEADER_KEY, groupMetadata.leaderOrNull)
if (version >= 2)
@ -1264,7 +1264,7 @@ object GroupMetadataManager {
memberStruct.set(GROUP_INSTANCE_ID_KEY, memberMetadata.groupInstanceId.orNull)
// The group is non-empty, so the current protocol must be defined
val protocol = groupMetadata.protocolOrNull
val protocol = groupMetadata.protocolName.orNull
if (protocol == null)
throw new IllegalStateException("Attempted to write non-empty group metadata with no defined protocol")
@ -1530,7 +1530,7 @@ object GroupMetadataManager {
Json.encodeAsString(Map(
"protocolType" -> protocolType,
"protocol" -> group.protocolOrNull,
"protocol" -> group.protocolName.orNull,
"generationId" -> group.generationId,
"assignment" -> assignment
).asJava)

View File

@ -1390,12 +1390,18 @@ class KafkaApis(val requestChannel: RequestChannel,
// the callback for sending a join-group response
def sendResponseCallback(joinResult: JoinGroupResult): Unit = {
def createResponse(requestThrottleMs: Int): AbstractResponse = {
val protocolName = if (request.context.apiVersion() >= 7)
joinResult.protocolName.orNull
else
joinResult.protocolName.getOrElse(GroupCoordinator.NoProtocol)
val responseBody = new JoinGroupResponse(
new JoinGroupResponseData()
.setThrottleTimeMs(requestThrottleMs)
.setErrorCode(joinResult.error.code)
.setGenerationId(joinResult.generationId)
.setProtocolName(joinResult.subProtocol)
.setProtocolType(joinResult.protocolType.orNull)
.setProtocolName(protocolName)
.setLeader(joinResult.leaderId)
.setMemberId(joinResult.memberId)
.setMembers(joinResult.members.asJava)
@ -1412,28 +1418,10 @@ class KafkaApis(val requestChannel: RequestChannel,
// Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
// until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
// the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
sendResponseCallback(JoinGroupResult(
List.empty,
JoinGroupRequest.UNKNOWN_MEMBER_ID,
JoinGroupRequest.UNKNOWN_GENERATION_ID,
JoinGroupRequest.UNKNOWN_PROTOCOL,
JoinGroupRequest.UNKNOWN_MEMBER_ID,
Errors.UNSUPPORTED_VERSION
))
sendResponseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.UNSUPPORTED_VERSION))
} else if (!authorize(request, READ, GROUP, joinGroupRequest.data.groupId)) {
sendResponseMaybeThrottle(request, requestThrottleMs =>
new JoinGroupResponse(
new JoinGroupResponseData()
.setThrottleTimeMs(requestThrottleMs)
.setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code)
.setGenerationId(JoinGroupRequest.UNKNOWN_GENERATION_ID)
.setProtocolName(JoinGroupRequest.UNKNOWN_PROTOCOL)
.setLeader(JoinGroupRequest.UNKNOWN_MEMBER_ID)
.setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
.setMembers(util.Collections.emptyList())
)
)
} else {
sendResponseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_AUTHORIZATION_FAILED))
} else {
val groupInstanceId = Option(joinGroupRequest.data.groupInstanceId)
// Only return MEMBER_ID_REQUIRED error if joinGroupRequest version is >= 4
@ -1443,6 +1431,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// let the coordinator handle join-group
val protocols = joinGroupRequest.data.protocols.valuesList.asScala.map(protocol =>
(protocol.name, protocol.metadata)).toList
groupCoordinator.handleJoinGroup(
joinGroupRequest.data.groupId,
joinGroupRequest.data.memberId,
@ -1466,6 +1455,8 @@ class KafkaApis(val requestChannel: RequestChannel,
new SyncGroupResponse(
new SyncGroupResponseData()
.setErrorCode(syncGroupResult.error.code)
.setProtocolType(syncGroupResult.protocolType.orNull)
.setProtocolName(syncGroupResult.protocolName.orNull)
.setAssignment(syncGroupResult.memberAssignment)
.setThrottleTimeMs(requestThrottleMs)
))
@ -1475,9 +1466,12 @@ class KafkaApis(val requestChannel: RequestChannel,
// Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
// until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
// the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
sendResponseCallback(SyncGroupResult(Array[Byte](), Errors.UNSUPPORTED_VERSION))
sendResponseCallback(SyncGroupResult(Errors.UNSUPPORTED_VERSION))
} else if (!syncGroupRequest.areMandatoryProtocolTypeAndNamePresent()) {
// Starting from version 5, ProtocolType and ProtocolName fields are mandatory.
sendResponseCallback(SyncGroupResult(Errors.INCONSISTENT_GROUP_PROTOCOL))
} else if (!authorize(request, READ, GROUP, syncGroupRequest.data.groupId)) {
sendResponseCallback(SyncGroupResult(Array[Byte](), Errors.GROUP_AUTHORIZATION_FAILED))
sendResponseCallback(SyncGroupResult(Errors.GROUP_AUTHORIZATION_FAILED))
} else {
val assignmentMap = immutable.Map.newBuilder[String, Array[Byte]]
syncGroupRequest.data.assignments.asScala.foreach { assignment =>
@ -1488,6 +1482,8 @@ class KafkaApis(val requestChannel: RequestChannel,
syncGroupRequest.data.groupId,
syncGroupRequest.data.generationId,
syncGroupRequest.data.memberId,
Option(syncGroupRequest.data.protocolType),
Option(syncGroupRequest.data.protocolName),
Option(syncGroupRequest.data.groupInstanceId),
assignmentMap.result,
sendResponseCallback

View File

@ -81,6 +81,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val logDir = "logDir"
val deleteRecordsPartition = new TopicPartition(deleteTopic, part)
val group = "my-group"
val protocolType = "consumer"
val protocolName = "consumer-range"
val clusterResource = new ResourcePattern(CLUSTER, Resource.CLUSTER_NAME, LITERAL)
val topicResource = new ResourcePattern(TOPIC, topic, LITERAL)
val groupResource = new ResourcePattern(GROUP, group, LITERAL)
@ -324,7 +326,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
private def createJoinGroupRequest = {
val protocolSet = new JoinGroupRequestProtocolCollection(
Collections.singletonList(new JoinGroupRequestData.JoinGroupRequestProtocol()
.setName("consumer-range")
.setName(protocolName)
.setMetadata("test".getBytes())
).iterator())
@ -334,7 +336,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
.setSessionTimeoutMs(10000)
.setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
.setGroupInstanceId(null)
.setProtocolType("consumer")
.setProtocolType(protocolType)
.setProtocols(protocolSet)
.setRebalanceTimeoutMs(60000)
).build()
@ -346,6 +348,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
.setGroupId(group)
.setGenerationId(1)
.setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
.setProtocolType(protocolType)
.setProtocolName(protocolName)
.setAssignments(Collections.emptyList())
).build()
}

View File

@ -42,8 +42,9 @@ import scala.concurrent.{Await, Future, Promise, TimeoutException}
class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest[GroupMember] {
private val protocolType = "consumer"
private val protocolName = "range"
private val metadata = Array[Byte]()
private val protocols = List(("range", metadata))
private val protocols = List((protocolName, metadata))
private val nGroups = nThreads * 10
private val nMembersPerGroup = nThreads * 5
@ -154,7 +155,6 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
}
}
class JoinGroupOperation extends GroupOperation[JoinGroupCallbackParams, JoinGroupCallback] {
override def responseCallback(responsePromise: Promise[JoinGroupCallbackParams]): JoinGroupCallback = {
val callback: JoinGroupCallback = responsePromise.success(_)
@ -182,10 +182,10 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
override def runWithCallback(member: GroupMember, responseCallback: SyncGroupCallback): Unit = {
if (member.leader) {
groupCoordinator.handleSyncGroup(member.groupId, member.generationId, member.memberId,
member.groupInstanceId, member.group.assignment, responseCallback)
Some(protocolType), Some(protocolName), member.groupInstanceId, member.group.assignment, responseCallback)
} else {
groupCoordinator.handleSyncGroup(member.groupId, member.generationId, member.memberId,
member.groupInstanceId, Map.empty[String, Array[Byte]], responseCallback)
Some(protocolType), Some(protocolName), member.groupInstanceId, Map.empty[String, Array[Byte]], responseCallback)
}
}
override def awaitAndVerify(member: GroupMember): Unit = {
@ -208,6 +208,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
assertEquals(Errors.NONE, error)
}
}
class CommitOffsetsOperation extends GroupOperation[CommitOffsetCallbackParams, CommitOffsetCallback] {
override def responseCallback(responsePromise: Promise[CommitOffsetCallbackParams]): CommitOffsetCallback = {
val callback: CommitOffsetCallback = offsets => responsePromise.success(offsets)
@ -313,7 +314,7 @@ object GroupCoordinatorConcurrencyTest {
val members = (0 until nMembers).map { i =>
new GroupMember(this, groupPartitionId, i == 0)
}
def assignment = members.map { m => (m.memberId, Array[Byte]()) }.toMap
def assignment: Map[String, Array[Byte]] = members.map { m => (m.memberId, Array[Byte]()) }.toMap
}
class GroupMember(val group: Group, val groupPartitionId: Int, val leader: Boolean) extends CoordinatorMember {

View File

@ -54,7 +54,6 @@ class GroupCoordinatorTest {
import GroupCoordinatorTest._
type JoinGroupCallback = JoinGroupResult => Unit
type SyncGroupCallbackParams = (Array[Byte], Errors)
type SyncGroupCallback = SyncGroupResult => Unit
type HeartbeatCallbackParams = Errors
type HeartbeatCallback = Errors => Unit
@ -78,14 +77,15 @@ class GroupCoordinatorTest {
private val groupId = "groupId"
private val protocolType = "consumer"
private val protocolName = "range"
private val memberId = "memberId"
private val groupInstanceId = Some("groupInstanceId")
private val leaderInstanceId = Some("leader")
private val followerInstanceId = Some("follower")
private val invalidMemberId = "invalidMember"
private val metadata = Array[Byte]()
private val protocols = List(("range", metadata))
private val protocolSuperset = List(("range", metadata), ("roundrobin", metadata))
private val protocols = List((protocolName, metadata))
private val protocolSuperset = List((protocolName, metadata), ("roundrobin", metadata))
private val requireStable = true
private var groupPartitionId: Int = -1
@ -153,7 +153,7 @@ class GroupCoordinatorTest {
// SyncGroup
var syncGroupResponse: Option[Errors] = None
groupCoordinator.handleSyncGroup(otherGroupId, 1, memberId, None, Map.empty[String, Array[Byte]],
groupCoordinator.handleSyncGroup(otherGroupId, 1, memberId, Some("consumer"), Some("range"), None, Map.empty[String, Array[Byte]],
syncGroupResult => syncGroupResponse = Some(syncGroupResult.error))
assertEquals(Some(Errors.REBALANCE_IN_PROGRESS), syncGroupResponse)
@ -241,8 +241,7 @@ class GroupCoordinatorTest {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = GroupMinSessionTimeout - 1)
val joinGroupError = joinGroupResult.error
assertEquals(Errors.INVALID_SESSION_TIMEOUT, joinGroupError)
assertEquals(Errors.INVALID_SESSION_TIMEOUT, joinGroupResult.error)
}
@Test
@ -250,8 +249,7 @@ class GroupCoordinatorTest {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = GroupMaxSessionTimeout + 1)
val joinGroupError = joinGroupResult.error
assertEquals(Errors.INVALID_SESSION_TIMEOUT, joinGroupError)
assertEquals(Errors.INVALID_SESSION_TIMEOUT, joinGroupResult.error)
}
@Test
@ -278,8 +276,7 @@ class GroupCoordinatorTest {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
val joinGroupError = joinGroupResult.error
assertEquals(Errors.NONE, joinGroupError)
assertEquals(Errors.NONE, joinGroupResult.error)
}
@Test
@ -331,9 +328,7 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
val syncGroupResult = syncGroupLeader(groupId, joinResult.generationId, memberId, Map(memberId -> Array[Byte]()))
val syncGroupError = syncGroupResult._2
assertEquals(Errors.NONE, syncGroupError)
assertEquals(Errors.NONE, syncGroupResult.error)
assertEquals(1, group.size)
timer.advanceClock(GroupCoordinator.NewMemberJoinTimeoutMs + 100)
@ -389,7 +384,6 @@ class GroupCoordinatorTest {
@Test
def testJoinGroupInconsistentGroupProtocol(): Unit = {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
val otherMemberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
val joinGroupFuture = sendJoinGroup(groupId, memberId, protocolType, List(("range", metadata)))
@ -419,10 +413,9 @@ class GroupCoordinatorTest {
@Test
def testJoinGroupUnknownConsumerNewDeadGroup(): Unit = {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
val deadGroupId = "deadGroupId"
groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId, Dead, new MockTime()))
groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId, Dead, new MockTime()))
val joinGroupResult = dynamicJoinGroup(deadGroupId, memberId, protocolType, protocols)
assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, joinGroupResult.error)
}
@ -430,12 +423,11 @@ class GroupCoordinatorTest {
@Test
def testSyncDeadGroup(): Unit = {
val memberId = "memberId"
val deadGroupId = "deadGroupId"
groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId, Dead, new MockTime()))
val syncGroupResult = syncGroupFollower(deadGroupId, 1, memberId)
assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, syncGroupResult._2)
assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, syncGroupResult.error)
}
@Test
@ -505,7 +497,8 @@ class GroupCoordinatorTest {
-1,
Set.empty,
groupId,
PreparingRebalance)
PreparingRebalance,
None)
verifyDelayedTaskNotCompleted(duplicateFollowerJoinFuture)
}
@ -532,7 +525,8 @@ class GroupCoordinatorTest {
rebalanceResult.generation + 1,
Set(leaderInstanceId, followerInstanceId),
groupId,
CompletingRebalance)
CompletingRebalance,
Some(protocolType))
assertEquals(leaderJoinGroupResult.leaderId, leaderJoinGroupResult.memberId)
assertEquals(rebalanceResult.leaderId, leaderJoinGroupResult.leaderId)
@ -544,11 +538,12 @@ class GroupCoordinatorTest {
Set.empty,
groupId,
CompletingRebalance,
Some(protocolType),
expectedLeaderId = leaderJoinGroupResult.memberId)
EasyMock.reset(replicaManager)
val oldFollowerSyncGroupFuture = sendSyncGroupFollower(groupId, oldFollowerJoinGroupResult.generationId,
oldFollowerJoinGroupResult.memberId, followerInstanceId)
oldFollowerJoinGroupResult.memberId, Some(protocolType), Some(protocolName), followerInstanceId)
// Duplicate follower joins group with unknown member id will trigger member.id replacement.
EasyMock.reset(replicaManager)
@ -558,7 +553,7 @@ class GroupCoordinatorTest {
// Old follower sync callback will return fenced exception while broker replaces the member identity.
val oldFollowerSyncGroupResult = Await.result(oldFollowerSyncGroupFuture, Duration(1, TimeUnit.MILLISECONDS))
assertEquals(oldFollowerSyncGroupResult._2, Errors.FENCED_INSTANCE_ID)
assertEquals(Errors.FENCED_INSTANCE_ID, oldFollowerSyncGroupResult.error)
// Duplicate follower will get the same response as old follower.
val duplicateFollowerJoinGroupResult = Await.result(duplicateFollowerJoinFuture, Duration(1, TimeUnit.MILLISECONDS))
@ -568,6 +563,7 @@ class GroupCoordinatorTest {
Set.empty,
groupId,
CompletingRebalance,
Some(protocolType),
expectedLeaderId = leaderJoinGroupResult.memberId)
}
@ -598,7 +594,8 @@ class GroupCoordinatorTest {
rebalanceResult.generation + 1,
Set(leaderInstanceId, followerInstanceId),
groupId,
CompletingRebalance)
CompletingRebalance,
Some(protocolType))
val duplicateFollowerJoinGroupResult = Await.result(duplicateFollowerJoinGroupFuture, Duration(1, TimeUnit.MILLISECONDS))
checkJoinGroupResult(duplicateFollowerJoinGroupResult,
@ -606,7 +603,8 @@ class GroupCoordinatorTest {
rebalanceResult.generation + 1,
Set.empty,
groupId,
CompletingRebalance)
CompletingRebalance,
Some(protocolType))
assertNotEquals(rebalanceResult.followerId, duplicateFollowerJoinGroupResult.memberId)
val oldFollowerJoinGroupResult = Await.result(oldFollowerJoinGroupFuture, Duration(1, TimeUnit.MILLISECONDS))
@ -615,7 +613,8 @@ class GroupCoordinatorTest {
-1,
Set.empty,
groupId,
CompletingRebalance)
CompletingRebalance,
None)
}
@Test
@ -633,10 +632,10 @@ class GroupCoordinatorTest {
assertTrue(getGroup(groupId).is(CompletingRebalance))
EasyMock.reset(replicaManager)
val syncGroupFuture = sendSyncGroupLeader(groupId, joinGroupResult.generationId, assignedMemberId, groupInstanceId, Map(assignedMemberId -> Array[Byte]()))
val syncGroupFuture = sendSyncGroupLeader(groupId, joinGroupResult.generationId, assignedMemberId, Some(protocolType), Some(protocolName), groupInstanceId, Map(assignedMemberId -> Array[Byte]()))
timer.advanceClock(1)
val syncGroupResult = Await.result(syncGroupFuture, Duration(1, TimeUnit.MILLISECONDS))
assertEquals(Errors.NONE, syncGroupResult._2)
assertEquals(Errors.NONE, syncGroupResult.error)
assertTrue(getGroup(groupId).is(Stable))
}
@ -653,6 +652,7 @@ class GroupCoordinatorTest {
Set.empty,
groupId,
Stable,
Some(protocolType),
rebalanceResult.leaderId)
EasyMock.reset(replicaManager)
@ -661,13 +661,13 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
// Old leader will get fenced.
val oldLeaderSyncGroupResult = syncGroupLeader(groupId, rebalanceResult.generation, rebalanceResult.leaderId, Map.empty, leaderInstanceId)
assertEquals(Errors.FENCED_INSTANCE_ID, oldLeaderSyncGroupResult._2)
val oldLeaderSyncGroupResult = syncGroupLeader(groupId, rebalanceResult.generation, rebalanceResult.leaderId, Map.empty, None, None, leaderInstanceId)
assertEquals(Errors.FENCED_INSTANCE_ID, oldLeaderSyncGroupResult.error)
// Calling sync on old leader.id will fail because that leader.id is no longer valid and replaced.
EasyMock.reset(replicaManager)
val newLeaderSyncGroupResult = syncGroupLeader(groupId, rebalanceResult.generation, joinGroupResult.leaderId, Map.empty)
assertEquals(Errors.UNKNOWN_MEMBER_ID, newLeaderSyncGroupResult._2)
assertEquals(Errors.UNKNOWN_MEMBER_ID, newLeaderSyncGroupResult.error)
}
@Test
@ -684,6 +684,7 @@ class GroupCoordinatorTest {
Set(leaderInstanceId),
groupId,
CompletingRebalance,
Some(protocolType),
rebalanceResult.leaderId,
rebalanceResult.leaderId)
}
@ -727,6 +728,7 @@ class GroupCoordinatorTest {
Set(leaderInstanceId, followerInstanceId),
groupId,
CompletingRebalance,
Some(protocolType),
rebalanceResult.followerId,
rebalanceResult.followerId)
}
@ -745,7 +747,8 @@ class GroupCoordinatorTest {
rebalanceResult.generation,
Set.empty,
groupId,
Stable)
Stable,
Some(protocolType))
EasyMock.reset(replicaManager)
// Join with old member id will fail because the member id is updated
@ -755,13 +758,13 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
// Sync with old member id will fail because the member id is updated
val syncGroupWithOldMemberIdResult = syncGroupFollower(groupId, rebalanceResult.generation, rebalanceResult.followerId, followerInstanceId)
assertEquals(Errors.FENCED_INSTANCE_ID, syncGroupWithOldMemberIdResult._2)
val syncGroupWithOldMemberIdResult = syncGroupFollower(groupId, rebalanceResult.generation, rebalanceResult.followerId, None, None, followerInstanceId)
assertEquals(Errors.FENCED_INSTANCE_ID, syncGroupWithOldMemberIdResult.error)
EasyMock.reset(replicaManager)
val syncGroupWithNewMemberIdResult = syncGroupFollower(groupId, rebalanceResult.generation, joinGroupResult.memberId, followerInstanceId)
assertEquals(Errors.NONE, syncGroupWithNewMemberIdResult._2)
assertEquals(rebalanceResult.followerAssignment, syncGroupWithNewMemberIdResult._1)
val syncGroupWithNewMemberIdResult = syncGroupFollower(groupId, rebalanceResult.generation, joinGroupResult.memberId, None, None, followerInstanceId)
assertEquals(Errors.NONE, syncGroupWithNewMemberIdResult.error)
assertEquals(rebalanceResult.followerAssignment, syncGroupWithNewMemberIdResult.memberAssignment)
}
@Test
@ -783,6 +786,7 @@ class GroupCoordinatorTest {
Set(leaderInstanceId, followerInstanceId),
groupId,
CompletingRebalance,
Some(protocolType),
rebalanceResult.leaderId,
rebalanceResult.leaderId)
@ -792,6 +796,7 @@ class GroupCoordinatorTest {
Set.empty,
groupId,
CompletingRebalance,
Some(protocolType),
rebalanceResult.leaderId,
rebalanceResult.followerId)
@ -808,6 +813,7 @@ class GroupCoordinatorTest {
Set(followerInstanceId),
groupId,
CompletingRebalance,
Some(protocolType),
rebalanceResult.followerId,
rebalanceResult.followerId)
}
@ -826,14 +832,15 @@ class GroupCoordinatorTest {
rebalanceResult.generation, // The group has no change.
Set.empty,
groupId,
Stable)
Stable,
Some(protocolType))
assertNotEquals(rebalanceResult.followerId, joinGroupResult.memberId)
EasyMock.reset(replicaManager)
val syncGroupResult = syncGroupFollower(groupId, rebalanceResult.generation, joinGroupResult.memberId)
assertEquals(Errors.NONE, syncGroupResult._2)
assertEquals(rebalanceResult.followerAssignment, syncGroupResult._1)
assertEquals(Errors.NONE, syncGroupResult.error)
assertEquals(rebalanceResult.followerAssignment, syncGroupResult.memberAssignment)
}
@Test
@ -851,6 +858,7 @@ class GroupCoordinatorTest {
Set.empty,
groupId,
Stable,
Some(protocolType),
rebalanceResult.leaderId,
rebalanceResult.followerId)
}
@ -875,8 +883,8 @@ class GroupCoordinatorTest {
def staticMemberSyncAsLeaderWithInvalidMemberId(): Unit = {
val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
val syncGroupResult = syncGroupLeader(groupId, rebalanceResult.generation, "invalid", Map.empty, leaderInstanceId)
assertEquals(Errors.FENCED_INSTANCE_ID, syncGroupResult._2)
val syncGroupResult = syncGroupLeader(groupId, rebalanceResult.generation, "invalid", Map.empty, None, None, leaderInstanceId)
assertEquals(Errors.FENCED_INSTANCE_ID, syncGroupResult.error)
}
@Test
@ -884,7 +892,7 @@ class GroupCoordinatorTest {
val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
val syncGroupResult = syncGroupLeader(groupId, rebalanceResult.generation, rebalanceResult.leaderId, Map.empty)
assertEquals(Errors.NONE, syncGroupResult._2)
assertEquals(Errors.NONE, syncGroupResult.error)
EasyMock.reset(replicaManager)
val validHeartbeatResult = heartbeat(groupId, rebalanceResult.leaderId, rebalanceResult.generation)
@ -930,7 +938,7 @@ class GroupCoordinatorTest {
val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
val syncGroupResult = syncGroupLeader(groupId, rebalanceResult.generation, rebalanceResult.leaderId, Map.empty)
assertEquals(Errors.NONE, syncGroupResult._2)
assertEquals(Errors.NONE, syncGroupResult.error)
val tp = new TopicPartition("topic", 0)
val offset = offsetAndMetadata(0)
@ -1048,7 +1056,8 @@ class GroupCoordinatorTest {
3,
Set(leaderInstanceId),
groupId,
CompletingRebalance
CompletingRebalance,
Some(protocolType)
)
assertEquals(1, getGroup(groupId).allMembers.size)
assertNotEquals(null, getGroup(groupId).leaderOrNull)
@ -1104,6 +1113,7 @@ class GroupCoordinatorTest {
Set(leaderInstanceId, followerInstanceId, newMemberInstanceId),
groupId,
CompletingRebalance,
Some(protocolType),
expectedLeaderId = leaderId,
expectedMemberId = leaderId)
@ -1115,6 +1125,7 @@ class GroupCoordinatorTest {
Set.empty,
groupId,
CompletingRebalance,
Some(protocolType),
expectedLeaderId = leaderId)
}
@ -1143,7 +1154,8 @@ class GroupCoordinatorTest {
initialRebalanceResult.generation + 1,
Set(leaderInstanceId, followerInstanceId, newMemberInstanceId),
groupId,
CompletingRebalance)
CompletingRebalance,
Some(protocolType))
checkJoinGroupResult(newFollowerResult,
Errors.NONE,
@ -1151,9 +1163,114 @@ class GroupCoordinatorTest {
Set.empty,
groupId,
CompletingRebalance,
Some(protocolType),
expectedLeaderId = newLeaderResult.memberId)
}
@Test
def testJoinGroupProtocolTypeIsNotProvidedWhenAnErrorOccurs(): Unit = {
// JoinGroup(leader)
EasyMock.reset(replicaManager)
val leaderResponseFuture = sendJoinGroup(groupId, "fake-id", protocolType,
protocolSuperset, leaderInstanceId, DefaultSessionTimeout)
// The Protocol Type is None when there is an error
val leaderJoinGroupResult = await(leaderResponseFuture, 1)
assertEquals(Errors.UNKNOWN_MEMBER_ID, leaderJoinGroupResult.error)
assertEquals(None, leaderJoinGroupResult.protocolType)
}
@Test
def testJoinGroupReturnsTheProtocolType(): Unit = {
// JoinGroup(leader)
EasyMock.reset(replicaManager)
val leaderResponseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType,
protocolSuperset, leaderInstanceId, DefaultSessionTimeout)
// JoinGroup(follower)
EasyMock.reset(replicaManager)
val followerResponseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType,
protocolSuperset, followerInstanceId, DefaultSessionTimeout)
timer.advanceClock(GroupInitialRebalanceDelay + 1)
timer.advanceClock(DefaultRebalanceTimeout + 1)
// The Protocol Type is Defined when there is not error
val leaderJoinGroupResult = await(leaderResponseFuture, 1)
assertEquals(Errors.NONE, leaderJoinGroupResult.error)
assertEquals(protocolType, leaderJoinGroupResult.protocolType.orNull)
// The Protocol Type is Defined when there is not error
val followerJoinGroupResult = await(followerResponseFuture, 1)
assertEquals(Errors.NONE, followerJoinGroupResult.error)
assertEquals(protocolType, followerJoinGroupResult.protocolType.orNull)
}
@Test
def testSyncGroupReturnsAnErrorWhenProtocolTypeIsInconsistent(): Unit = {
testSyncGroupProtocolTypeAndNameWith(Some("whatever"), None, Errors.INCONSISTENT_GROUP_PROTOCOL,
None, None)
}
@Test
def testSyncGroupReturnsAnErrorWhenProtocolNameIsInconsistent(): Unit = {
testSyncGroupProtocolTypeAndNameWith(None, Some("whatever"), Errors.INCONSISTENT_GROUP_PROTOCOL,
None, None)
}
@Test
def testSyncGroupSucceedWhenProtocolTypeAndNameAreNotProvided(): Unit = {
testSyncGroupProtocolTypeAndNameWith(None, None, Errors.NONE,
Some(protocolType), Some(protocolName))
}
@Test
def testSyncGroupSucceedWhenProtocolTypeAndNameAreConsistent(): Unit = {
testSyncGroupProtocolTypeAndNameWith(Some(protocolType), Some(protocolName),
Errors.NONE, Some(protocolType), Some(protocolName))
}
private def testSyncGroupProtocolTypeAndNameWith(protocolType: Option[String],
protocolName: Option[String],
expectedError: Errors,
expectedProtocolType: Option[String],
expectedProtocolName: Option[String]): Unit = {
// JoinGroup(leader) with the Protocol Type of the group
EasyMock.reset(replicaManager)
val leaderResponseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, this.protocolType,
protocolSuperset, leaderInstanceId, DefaultSessionTimeout)
// JoinGroup(follower) with the Protocol Type of the group
EasyMock.reset(replicaManager)
val followerResponseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, this.protocolType,
protocolSuperset, followerInstanceId, DefaultSessionTimeout)
timer.advanceClock(GroupInitialRebalanceDelay + 1)
timer.advanceClock(DefaultRebalanceTimeout + 1)
val leaderJoinGroupResult = await(leaderResponseFuture, 1)
val leaderId = leaderJoinGroupResult.memberId
val generationId = leaderJoinGroupResult.generationId
val followerJoinGroupResult = await(followerResponseFuture, 1)
val followerId = followerJoinGroupResult.memberId
// SyncGroup with the provided Protocol Type and Name
EasyMock.reset(replicaManager)
val leaderSyncGroupResult = syncGroupLeader(groupId, generationId, leaderId,
Map(leaderId -> Array.empty), protocolType, protocolName)
assertEquals(expectedError, leaderSyncGroupResult.error)
assertEquals(expectedProtocolType, leaderSyncGroupResult.protocolType)
assertEquals(expectedProtocolName, leaderSyncGroupResult.protocolName)
// SyncGroup with the provided Protocol Type and Name
EasyMock.reset(replicaManager)
val followerSyncGroupResult = syncGroupFollower(groupId, generationId, followerId,
protocolType, protocolName)
assertEquals(expectedError, followerSyncGroupResult.error)
assertEquals(expectedProtocolType, followerSyncGroupResult.protocolType)
assertEquals(expectedProtocolName, followerSyncGroupResult.protocolName)
}
private class RebalanceResult(val generation: Int,
val leaderId: String,
val leaderAssignment: Array[Byte],
@ -1193,21 +1310,21 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
val leaderId = leaderJoinGroupResult.memberId
val leaderSyncGroupResult = syncGroupLeader(groupId, leaderJoinGroupResult.generationId, leaderId, Map(leaderId -> Array[Byte]()))
assertEquals(Errors.NONE, leaderSyncGroupResult._2)
assertEquals(Errors.NONE, leaderSyncGroupResult.error)
assertTrue(getGroup(groupId).is(Stable))
EasyMock.reset(replicaManager)
val followerId = followerJoinGroupResult.memberId
val followerSyncGroupResult = syncGroupFollower(groupId, leaderJoinGroupResult.generationId, followerId)
assertEquals(Errors.NONE, followerSyncGroupResult._2)
assertEquals(Errors.NONE, followerSyncGroupResult.error)
assertTrue(getGroup(groupId).is(Stable))
EasyMock.reset(replicaManager)
new RebalanceResult(newGeneration,
leaderId,
leaderSyncGroupResult._1,
leaderSyncGroupResult.memberAssignment,
followerId,
followerSyncGroupResult._1)
followerSyncGroupResult.memberAssignment)
}
private def checkJoinGroupResult(joinGroupResult: JoinGroupResult,
@ -1216,6 +1333,7 @@ class GroupCoordinatorTest {
expectedGroupInstanceIds: Set[Option[String]],
groupId: String,
expectedGroupState: GroupState,
expectedProtocolType: Option[String],
expectedLeaderId: String = JoinGroupRequest.UNKNOWN_MEMBER_ID,
expectedMemberId: String = JoinGroupRequest.UNKNOWN_MEMBER_ID): Unit = {
assertEquals(expectedError, joinGroupResult.error)
@ -1224,6 +1342,7 @@ class GroupCoordinatorTest {
val resultedGroupInstanceIds = joinGroupResult.members.map(member => Some(member.groupInstanceId())).toSet
assertEquals(expectedGroupInstanceIds, resultedGroupInstanceIds)
assertGroupState(groupState = expectedGroupState)
assertEquals(expectedProtocolType, joinGroupResult.protocolType)
if (!expectedLeaderId.equals(JoinGroupRequest.UNKNOWN_MEMBER_ID)) {
assertEquals(expectedLeaderId, joinGroupResult.leaderId)
@ -1283,8 +1402,7 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
val syncGroupError = syncGroupResult._2
assertEquals(Errors.NONE, syncGroupError)
assertEquals(Errors.NONE, syncGroupResult.error)
EasyMock.reset(replicaManager)
val heartbeatResult = heartbeat(groupId, otherMemberId, 1)
@ -1316,8 +1434,7 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
val syncGroupError = syncGroupResult._2
assertEquals(Errors.NONE, syncGroupError)
assertEquals(Errors.NONE, syncGroupResult.error)
EasyMock.reset(replicaManager)
val heartbeatResult = heartbeat(groupId, assignedMemberId, 2)
@ -1336,8 +1453,7 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
val syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
val syncGroupError = syncGroupResult._2
assertEquals(Errors.NONE, syncGroupError)
assertEquals(Errors.NONE, syncGroupResult.error)
EasyMock.reset(replicaManager)
val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1)
@ -1355,8 +1471,8 @@ class GroupCoordinatorTest {
assertEquals(Errors.NONE, joinGroupError)
EasyMock.reset(replicaManager)
val (_, syncGroupError) = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
assertEquals(Errors.NONE, syncGroupError)
val syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
assertEquals(Errors.NONE, syncGroupResult.error)
EasyMock.reset(replicaManager)
EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)))
@ -1384,8 +1500,8 @@ class GroupCoordinatorTest {
assertEquals(Errors.NONE, joinGroupError)
EasyMock.reset(replicaManager)
val (_, syncGroupError) = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
assertEquals(Errors.NONE, syncGroupError)
val syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
assertEquals(Errors.NONE, syncGroupResult.error)
timer.advanceClock(sessionTimeout / 2)
@ -1415,8 +1531,8 @@ class GroupCoordinatorTest {
assertEquals(Errors.NONE, joinGroupError)
EasyMock.reset(replicaManager)
val (_, syncGroupError) = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
assertEquals(Errors.NONE, syncGroupError)
val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
assertEquals(Errors.NONE, syncGroupResult.error)
timer.advanceClock(sessionTimeout / 2)
@ -1443,7 +1559,7 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]()))
assertEquals(Errors.NONE, firstSyncResult._2)
assertEquals(Errors.NONE, firstSyncResult.error)
// now have a new member join to trigger a rebalance
EasyMock.reset(replicaManager)
@ -1479,7 +1595,7 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]()))
assertEquals(Errors.NONE, firstSyncResult._2)
assertEquals(Errors.NONE, firstSyncResult.error)
// now have a new member join to trigger a rebalance
EasyMock.reset(replicaManager)
@ -1502,7 +1618,7 @@ class GroupCoordinatorTest {
val otherGenerationId = otherJoinResult.generationId
EasyMock.reset(replicaManager)
val syncResult = syncGroupLeader(groupId, otherGenerationId, otherMemberId, Map(otherMemberId -> Array[Byte]()))
assertEquals(Errors.NONE, syncResult._2)
assertEquals(Errors.NONE, syncResult.error)
// the unjoined static member should be remained in the group before session timeout.
assertEquals(Errors.NONE, otherJoinResult.error)
@ -1528,7 +1644,7 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
val otherRejoinGenerationId = otherReJoinResult.generationId
val reSyncResult = syncGroupLeader(groupId, otherRejoinGenerationId, otherMemberId, Map(otherMemberId -> Array[Byte]()))
assertEquals(Errors.NONE, reSyncResult._2)
assertEquals(Errors.NONE, reSyncResult.error)
// the joined member should get heart beat response with no error. Let the new member keep heartbeating for a while
// to verify that no new rebalance is triggered unexpectedly
@ -1552,9 +1668,8 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
val syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map())
val syncGroupError = syncGroupResult._2
assertEquals(Errors.NONE, syncGroupError)
assertTrue(syncGroupResult._1.isEmpty)
assertEquals(Errors.NONE, syncGroupResult.error)
assertTrue(syncGroupResult.memberAssignment.isEmpty)
EasyMock.reset(replicaManager)
val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1)
@ -1566,13 +1681,13 @@ class GroupCoordinatorTest {
val generation = 1
val syncGroupResult = syncGroupFollower(otherGroupId, generation, memberId)
assertEquals(Errors.NOT_COORDINATOR, syncGroupResult._2)
assertEquals(Errors.NOT_COORDINATOR, syncGroupResult.error)
}
@Test
def testSyncGroupFromUnknownGroup(): Unit = {
val syncGroupResult = syncGroupFollower(groupId, 1, memberId)
assertEquals(Errors.UNKNOWN_MEMBER_ID, syncGroupResult._2)
assertEquals(Errors.UNKNOWN_MEMBER_ID, syncGroupResult.error)
}
@Test
@ -1586,13 +1701,13 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
val syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
val syncGroupError = syncGroupResult._2
val syncGroupError = syncGroupResult.error
assertEquals(Errors.NONE, syncGroupError)
EasyMock.reset(replicaManager)
val unknownMemberId = "blah"
val unknownMemberSyncResult = syncGroupFollower(groupId, generationId, unknownMemberId)
assertEquals(Errors.UNKNOWN_MEMBER_ID, unknownMemberSyncResult._2)
assertEquals(Errors.UNKNOWN_MEMBER_ID, unknownMemberSyncResult.error)
}
@Test
@ -1607,7 +1722,7 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
// send the sync group with an invalid generation
val syncGroupResult = syncGroupLeader(groupId, generationId+1, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
assertEquals(Errors.ILLEGAL_GENERATION, syncGroupResult._2)
assertEquals(Errors.ILLEGAL_GENERATION, syncGroupResult.error)
}
@Test
@ -1624,7 +1739,7 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]()))
assertEquals(Errors.NONE, firstSyncResult._2)
assertEquals(Errors.NONE, firstSyncResult.error)
EasyMock.reset(replicaManager)
val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
@ -1661,7 +1776,7 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]()))
assertEquals(Errors.NONE, firstSyncResult._2)
assertEquals(Errors.NONE, firstSyncResult.error)
// join groups from the leader should force the group to rebalance, which allows the
// leader to push new assignments when local metadata changes
@ -1690,7 +1805,7 @@ class GroupCoordinatorTest {
//Starting sync group leader
EasyMock.reset(replicaManager)
val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]()))
assertEquals(Errors.NONE, firstSyncResult._2)
assertEquals(Errors.NONE, firstSyncResult.error)
timer.advanceClock(100)
assertEquals(1, groupCoordinator.groupManager.getGroup(groupId).get.allMembers.size)
assertEquals(0, groupCoordinator.groupManager.getGroup(groupId).get.numPending)
@ -1743,7 +1858,7 @@ class GroupCoordinatorTest {
// now the group is stable, with the one member that joined above
EasyMock.reset(replicaManager)
val firstSyncResult = syncGroupLeader(groupId, joinResult1.generationId, joinResult1.memberId, Map(joinResult1.memberId -> Array[Byte]()))
assertEquals(Errors.NONE, firstSyncResult._2)
assertEquals(Errors.NONE, firstSyncResult.error)
assertGroupState(groupState = Stable)
// start the join for the second member
@ -1760,7 +1875,7 @@ class GroupCoordinatorTest {
// stabilize the group
EasyMock.reset(replicaManager)
val secondSyncResult = syncGroupLeader(groupId, firstMemberJoinResult.generationId, joinResult1.memberId, Map(joinResult1.memberId -> Array[Byte]()))
assertEquals(Errors.NONE, secondSyncResult._2)
assertEquals(Errors.NONE, secondSyncResult.error)
assertGroupState(groupState = Stable)
// re-join an existing member, to transition the group to PreparingRebalance state.
@ -1874,7 +1989,7 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]()))
assertEquals(Errors.NONE, firstSyncResult._2)
assertEquals(Errors.NONE, firstSyncResult.error)
EasyMock.reset(replicaManager)
val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
@ -1896,12 +2011,12 @@ class GroupCoordinatorTest {
// with no leader SyncGroup, the follower's request should fail with an error indicating
// that it should rejoin
EasyMock.reset(replicaManager)
val followerSyncFuture = sendSyncGroupFollower(groupId, nextGenerationId, otherJoinResult.memberId, None)
val followerSyncFuture = sendSyncGroupFollower(groupId, nextGenerationId, otherJoinResult.memberId, None, None, None)
timer.advanceClock(DefaultSessionTimeout + 100)
val followerSyncResult = await(followerSyncFuture, DefaultSessionTimeout+100)
assertEquals(Errors.REBALANCE_IN_PROGRESS, followerSyncResult._2)
assertEquals(Errors.REBALANCE_IN_PROGRESS, followerSyncResult.error)
}
@Test
@ -1918,7 +2033,7 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]()))
assertEquals(Errors.NONE, firstSyncResult._2)
assertEquals(Errors.NONE, firstSyncResult.error)
EasyMock.reset(replicaManager)
val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
@ -1944,13 +2059,13 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
val leaderSyncResult = syncGroupLeader(groupId, nextGenerationId, leaderId,
Map(leaderId -> leaderAssignment, followerId -> followerAssignment))
assertEquals(Errors.NONE, leaderSyncResult._2)
assertEquals(leaderAssignment, leaderSyncResult._1)
assertEquals(Errors.NONE, leaderSyncResult.error)
assertEquals(leaderAssignment, leaderSyncResult.memberAssignment)
EasyMock.reset(replicaManager)
val followerSyncResult = syncGroupFollower(groupId, nextGenerationId, otherJoinResult.memberId)
assertEquals(Errors.NONE, followerSyncResult._2)
assertEquals(followerAssignment, followerSyncResult._1)
assertEquals(Errors.NONE, followerSyncResult.error)
assertEquals(followerAssignment, followerSyncResult.memberAssignment)
}
@Test
@ -1967,8 +2082,7 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
val syncGroupResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]()))
val syncGroupError = syncGroupResult._2
assertEquals(Errors.NONE, syncGroupError)
assertEquals(Errors.NONE, syncGroupResult.error)
EasyMock.reset(replicaManager)
val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
@ -1992,17 +2106,17 @@ class GroupCoordinatorTest {
assertEquals(firstMemberId, otherJoinResult.leaderId)
EasyMock.reset(replicaManager)
val followerSyncFuture = sendSyncGroupFollower(groupId, nextGenerationId, followerId, None)
val followerSyncFuture = sendSyncGroupFollower(groupId, nextGenerationId, followerId, None, None, None)
EasyMock.reset(replicaManager)
val leaderSyncResult = syncGroupLeader(groupId, nextGenerationId, leaderId,
Map(leaderId -> leaderAssignment, followerId -> followerAssignment))
assertEquals(Errors.NONE, leaderSyncResult._2)
assertEquals(leaderAssignment, leaderSyncResult._1)
assertEquals(Errors.NONE, leaderSyncResult.error)
assertEquals(leaderAssignment, leaderSyncResult.memberAssignment)
val followerSyncResult = await(followerSyncFuture, DefaultSessionTimeout+100)
assertEquals(Errors.NONE, followerSyncResult._2)
assertEquals(followerAssignment, followerSyncResult._1)
assertEquals(Errors.NONE, followerSyncResult.error)
assertEquals(followerAssignment, followerSyncResult.memberAssignment)
}
@Test
@ -2606,8 +2720,7 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
val syncGroupResult = syncGroupLeader(groupId, initialGenerationId, memberId, Map(memberId -> Array[Byte]()))
val syncGroupError = syncGroupResult._2
assertEquals(Errors.NONE, syncGroupError)
assertEquals(Errors.NONE, syncGroupResult.error)
EasyMock.reset(replicaManager)
val joinGroupFuture = sendJoinGroup(groupId, memberId, protocolType, protocols)
@ -2794,8 +2907,7 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
val syncGroupError = syncGroupResult._2
assertEquals(Errors.NONE, syncGroupError)
assertEquals(Errors.NONE, syncGroupResult.error)
val (error, groups) = groupCoordinator.handleListGroups()
assertEquals(Errors.NONE, error)
@ -2840,9 +2952,7 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
val syncGroupError = syncGroupResult._2
assertEquals(Errors.NONE, syncGroupError)
assertEquals(Errors.NONE, syncGroupResult.error)
EasyMock.reset(replicaManager)
val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
@ -2862,9 +2972,7 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
val syncGroupError = syncGroupResult._2
assertEquals(Errors.NONE, syncGroupError)
assertEquals(Errors.NONE, syncGroupResult.error)
EasyMock.reset(replicaManager)
val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
@ -2948,8 +3056,7 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
val syncGroupError = syncGroupResult._2
assertEquals(Errors.NONE, syncGroupError)
assertEquals(Errors.NONE, syncGroupResult.error)
EasyMock.reset(replicaManager)
val tp = new TopicPartition("topic", 0)
@ -3010,7 +3117,7 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, joinGroupResult.leaderId, Map.empty)
assertEquals(Errors.NONE, syncGroupResult._2)
assertEquals(Errors.NONE, syncGroupResult.error)
val t1p0 = new TopicPartition("foo", 0)
val t2p0 = new TopicPartition("bar", 0)
@ -3057,7 +3164,7 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, joinGroupResult.leaderId, Map.empty)
assertEquals(Errors.NONE, syncGroupResult._2)
assertEquals(Errors.NONE, syncGroupResult.error)
val tp = new TopicPartition("foo", 0)
val offset = offsetAndMetadata(37)
@ -3096,7 +3203,7 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, joinGroupResult.leaderId, Map.empty)
assertEquals(Errors.NONE, syncGroupResult._2)
assertEquals(Errors.NONE, syncGroupResult.error)
val t1p0 = new TopicPartition("foo", 0)
val t2p0 = new TopicPartition("bar", 0)
@ -3148,7 +3255,7 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, joinGroupResult.leaderId, Map.empty)
assertEquals(Errors.NONE, syncGroupResult._2)
assertEquals(Errors.NONE, syncGroupResult.error)
val t1p0 = new TopicPartition("foo", 0)
val t2p0 = new TopicPartition("bar", 0)
@ -3282,18 +3389,18 @@ class GroupCoordinatorTest {
assertTrue(groupOpt.isDefined)
groupOpt.get
}
private def setupJoinGroupCallback: (Future[JoinGroupResult], JoinGroupCallback) = {
val responsePromise = Promise[JoinGroupResult]
val responseFuture = responsePromise.future
val responseCallback: JoinGroupCallback = responsePromise.success(_)
val responseCallback: JoinGroupCallback = responsePromise.success
(responseFuture, responseCallback)
}
private def setupSyncGroupCallback: (Future[SyncGroupCallbackParams], SyncGroupCallback) = {
val responsePromise = Promise[SyncGroupCallbackParams]
private def setupSyncGroupCallback: (Future[SyncGroupResult], SyncGroupCallback) = {
val responsePromise = Promise[SyncGroupResult]
val responseFuture = responsePromise.future
val responseCallback: SyncGroupCallback = syncGroupResult =>
responsePromise.success(syncGroupResult.memberAssignment, syncGroupResult.error)
val responseCallback: SyncGroupCallback = responsePromise.success
(responseFuture, responseCallback)
}
@ -3335,12 +3442,13 @@ class GroupCoordinatorTest {
responseFuture
}
private def sendSyncGroupLeader(groupId: String,
generation: Int,
leaderId: String,
protocolType: Option[String],
protocolName: Option[String],
groupInstanceId: Option[String],
assignment: Map[String, Array[Byte]]): Future[SyncGroupCallbackParams] = {
assignment: Map[String, Array[Byte]]): Future[SyncGroupResult] = {
val (responseFuture, responseCallback) = setupSyncGroupCallback
val capturedArgument: Capture[scala.collection.Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
@ -3361,19 +3469,22 @@ class GroupCoordinatorTest {
EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
EasyMock.replay(replicaManager)
groupCoordinator.handleSyncGroup(groupId, generation, leaderId, groupInstanceId, assignment, responseCallback)
groupCoordinator.handleSyncGroup(groupId, generation, leaderId, protocolType, protocolName,
groupInstanceId, assignment, responseCallback)
responseFuture
}
private def sendSyncGroupFollower(groupId: String,
generation: Int,
memberId: String,
groupInstanceId: Option[String]): Future[SyncGroupCallbackParams] = {
prototolType: Option[String],
prototolName: Option[String],
groupInstanceId: Option[String]): Future[SyncGroupResult] = {
val (responseFuture, responseCallback) = setupSyncGroupCallback
EasyMock.replay(replicaManager)
groupCoordinator.handleSyncGroup(groupId, generation, memberId, groupInstanceId, Map.empty[String, Array[Byte]], responseCallback)
groupCoordinator.handleSyncGroup(groupId, generation, memberId, prototolType, prototolName, groupInstanceId, Map.empty[String, Array[Byte]], responseCallback)
responseFuture
}
@ -3419,9 +3530,12 @@ class GroupCoordinatorTest {
private def syncGroupFollower(groupId: String,
generationId: Int,
memberId: String,
protocolType: Option[String] = None,
protocolName: Option[String] = None,
groupInstanceId: Option[String] = None,
sessionTimeout: Int = DefaultSessionTimeout): SyncGroupCallbackParams = {
val responseFuture = sendSyncGroupFollower(groupId, generationId, memberId, groupInstanceId)
sessionTimeout: Int = DefaultSessionTimeout): SyncGroupResult = {
val responseFuture = sendSyncGroupFollower(groupId, generationId, memberId, protocolType,
protocolName, groupInstanceId)
Await.result(responseFuture, Duration(sessionTimeout + 100, TimeUnit.MILLISECONDS))
}
@ -3429,9 +3543,12 @@ class GroupCoordinatorTest {
generationId: Int,
memberId: String,
assignment: Map[String, Array[Byte]],
protocolType: Option[String] = None,
protocolName: Option[String] = None,
groupInstanceId: Option[String] = None,
sessionTimeout: Int = DefaultSessionTimeout): SyncGroupCallbackParams = {
val responseFuture = sendSyncGroupLeader(groupId, generationId, memberId, groupInstanceId, assignment)
sessionTimeout: Int = DefaultSessionTimeout): SyncGroupResult = {
val responseFuture = sendSyncGroupLeader(groupId, generationId, memberId, protocolType,
protocolName, groupInstanceId, assignment)
Await.result(responseFuture, Duration(sessionTimeout + 100, TimeUnit.MILLISECONDS))
}

View File

@ -157,7 +157,7 @@ class GroupMetadataManagerTest {
assertEquals(generation, group.generationId)
assertEquals(Some(protocolType), group.protocolType)
assertNull(group.leaderOrNull)
assertNull(group.protocolOrNull)
assertNull(group.protocolName.orNull)
committedOffsets.foreach { case (topicPartition, offset) =>
assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
}
@ -634,7 +634,7 @@ class GroupMetadataManagerTest {
assertEquals(memberId, group.leaderOrNull)
assertEquals(generation, group.generationId)
assertEquals(Some(protocolType), group.protocolType)
assertEquals(protocol, group.protocolOrNull)
assertEquals(protocol, group.protocolName.orNull)
assertEquals(Set(memberId), group.allMembers)
assertEquals(committedOffsets.size, group.allOffsets.size)
committedOffsets.foreach { case (topicPartition, offset) =>
@ -863,7 +863,7 @@ class GroupMetadataManagerTest {
assertTrue(group.is(Stable))
assertEquals(generation, group.generationId)
assertEquals(Some(protocolType), group.protocolType)
assertEquals(protocol, group.protocolOrNull)
assertEquals(protocol, group.protocolName.orNull)
assertEquals(Some(Set(topic)), group.getSubscribedTopics)
assertTrue(group.has(memberId))
}
@ -879,7 +879,7 @@ class GroupMetadataManagerTest {
assertTrue(group.is(Empty))
assertEquals(generation, group.generationId)
assertEquals(Some(protocolType), group.protocolType)
assertNull(group.protocolOrNull)
assertNull(group.protocolName.orNull)
assertEquals(Some(Set.empty), group.getSubscribedTopics)
}
@ -903,7 +903,7 @@ class GroupMetadataManagerTest {
assertTrue(group.is(Stable))
assertEquals(generation, group.generationId)
assertEquals(Some(protocolType), group.protocolType)
assertEquals(protocol, group.protocolOrNull)
assertEquals(protocol, group.protocolName.orNull)
assertEquals(None, group.getSubscribedTopics)
assertTrue(group.has(memberId))
}
@ -922,7 +922,7 @@ class GroupMetadataManagerTest {
assertEquals(groupId, deserializedGroupMetadata.groupId)
assertEquals(generation, deserializedGroupMetadata.generationId)
assertEquals(protocolType, deserializedGroupMetadata.protocolType.get)
assertEquals(protocol, deserializedGroupMetadata.protocolOrNull)
assertEquals(protocol, deserializedGroupMetadata.protocolName.orNull)
assertEquals(1, deserializedGroupMetadata.allMembers.size)
assertTrue(deserializedGroupMetadata.allMembers.contains(memberId))
assertTrue(deserializedGroupMetadata.allStaticMembers.isEmpty)
@ -1945,7 +1945,7 @@ class GroupMetadataManagerTest {
assertEquals(memberId, group.leaderOrNull)
assertEquals(generation, group.generationId)
assertEquals(Some(protocolType), group.protocolType)
assertEquals(protocol, group.protocolOrNull)
assertEquals(protocol, group.protocolName.orNull)
assertEquals(Set(memberId), group.allMembers)
assertEquals(committedOffsets.size, group.allOffsets.size)
committedOffsets.foreach { case (topicPartition, offset) =>
@ -1985,7 +1985,7 @@ class GroupMetadataManagerTest {
assertEquals(memberId, group.leaderOrNull)
assertEquals(generation, group.generationId)
assertEquals(Some(protocolType), group.protocolType)
assertEquals(protocol, group.protocolOrNull)
assertEquals(protocol, group.protocolName.orNull)
assertEquals(Set(memberId), group.allMembers)
assertEquals(committedOffsets.size, group.allOffsets.size)
committedOffsets.foreach { case (topicPartition, offset) =>
@ -2137,7 +2137,7 @@ class GroupMetadataManagerTest {
assertEquals(generation, group.generationId)
assertEquals(Some(protocolType), group.protocolType)
assertNull(group.leaderOrNull)
assertNull(group.protocolOrNull)
assertNull(group.protocolName.orNull)
committedOffsets.foreach { case (topicPartition, offset) =>
assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
}

View File

@ -325,25 +325,25 @@ class GroupMetadataTest {
group.add(member, _ => ())
assertEquals(0, group.generationId)
assertNull(group.protocolOrNull)
assertNull(group.protocolName.orNull)
group.initNextGeneration()
assertEquals(1, group.generationId)
assertEquals("roundrobin", group.protocolOrNull)
assertEquals("roundrobin", group.protocolName.orNull)
}
@Test
def testInitNextGenerationEmptyGroup(): Unit = {
assertEquals(Empty, group.currentState)
assertEquals(0, group.generationId)
assertNull(group.protocolOrNull)
assertNull(group.protocolName.orNull)
group.transitionTo(PreparingRebalance)
group.initNextGeneration()
assertEquals(1, group.generationId)
assertNull(group.protocolOrNull)
assertNull(group.protocolName.orNull)
}
@Test
@ -568,7 +568,7 @@ class GroupMetadataTest {
})
assertTrue(group.hasAllMembersJoined)
group.maybeInvokeJoinCallback(member, GroupCoordinator.joinError(member.memberId, Errors.NONE))
group.maybeInvokeJoinCallback(member, JoinGroupResult(member.memberId, Errors.NONE))
assertTrue(invoked)
assertFalse(member.isAwaitingJoin)
}
@ -578,7 +578,7 @@ class GroupMetadataTest {
group.add(member)
assertFalse(member.isAwaitingJoin)
group.maybeInvokeJoinCallback(member, GroupCoordinator.joinError(member.memberId, Errors.NONE))
group.maybeInvokeJoinCallback(member, JoinGroupResult(member.memberId, Errors.NONE))
assertFalse(member.isAwaitingJoin)
}
@ -587,7 +587,7 @@ class GroupMetadataTest {
group.add(member)
member.awaitingSyncCallback = _ => {}
val invoked = group.maybeInvokeSyncCallback(member, SyncGroupResult(Array.empty, Errors.NONE))
val invoked = group.maybeInvokeSyncCallback(member, SyncGroupResult(Errors.NONE))
assertTrue(invoked)
assertFalse(member.isAwaitingSync)
}
@ -596,7 +596,7 @@ class GroupMetadataTest {
def testNotInvokeSyncCallback(): Unit = {
group.add(member)
val invoked = group.maybeInvokeSyncCallback(member, SyncGroupResult(Array.empty, Errors.NONE))
val invoked = group.maybeInvokeSyncCallback(member, SyncGroupResult(Errors.NONE))
assertFalse(invoked)
assertFalse(member.isAwaitingSync)
}

View File

@ -28,6 +28,10 @@ import java.util.concurrent.TimeUnit
import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_2_2_IV1}
import kafka.cluster.Partition
import kafka.controller.KafkaController
import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.JoinGroupCallback
import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.SyncGroupCallback
import kafka.coordinator.group.JoinGroupResult
import kafka.coordinator.group.SyncGroupResult
import kafka.coordinator.group.{GroupCoordinator, GroupSummary, MemberSummary}
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.log.AppendOrigin
@ -88,6 +92,7 @@ class KafkaApisTest {
private val brokerTopicStats = new BrokerTopicStats
private val clusterId = "clusterId"
private val time = new MockTime
private val clientId = ""
@After
def tearDown(): Unit = {
@ -129,7 +134,7 @@ class KafkaApisTest {
def checkInvalidPartition(invalidPartitionId: Int): Unit = {
EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel)
val (offsetCommitRequest, request) = buildRequest(new OffsetCommitRequest.Builder(
val offsetCommitRequest = new OffsetCommitRequest.Builder(
new OffsetCommitRequestData()
.setGroupId("groupId")
.setTopics(Collections.singletonList(
@ -142,9 +147,9 @@ class KafkaApisTest {
.setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
.setCommittedMetadata(""))
)
))
))
))).build()
val request = buildRequest(offsetCommitRequest)
val capturedResponse = expectNoThrottling()
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
createKafkaApis().handleOffsetCommitRequest(request)
@ -169,12 +174,14 @@ class KafkaApisTest {
val invalidTopicPartition = new TopicPartition(topic, invalidPartitionId)
val partitionOffsetCommitData = new TxnOffsetCommitRequest.CommittedOffset(15L, "", Optional.empty())
val (offsetCommitRequest, request) = buildRequest(new TxnOffsetCommitRequest.Builder(
val offsetCommitRequest = new TxnOffsetCommitRequest.Builder(
"txnId",
"groupId",
15L,
0.toShort,
Map(invalidTopicPartition -> partitionOffsetCommitData).asJava))
Map(invalidTopicPartition -> partitionOffsetCommitData).asJava
).build()
val request = buildRequest(offsetCommitRequest)
val capturedResponse = expectNoThrottling()
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
@ -198,9 +205,10 @@ class KafkaApisTest {
EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel)
val invalidTopicPartition = new TopicPartition(topic, invalidPartitionId)
val (addPartitionsToTxnRequest, request) = buildRequest(new AddPartitionsToTxnRequest.Builder(
"txnlId", 15L, 0.toShort, List(invalidTopicPartition).asJava))
val addPartitionsToTxnRequest = new AddPartitionsToTxnRequest.Builder(
"txnlId", 15L, 0.toShort, List(invalidTopicPartition).asJava
).build()
val request = buildRequest(addPartitionsToTxnRequest)
val capturedResponse = expectNoThrottling()
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
@ -323,13 +331,15 @@ class KafkaApisTest {
val groupMetadataPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
val txnStatePartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, 0)
val (_, request) = buildRequest(new StopReplicaRequest.Builder(
val stopReplicaRequest = new StopReplicaRequest.Builder(
ApiKeys.STOP_REPLICA.latestVersion,
controllerId,
controllerEpoch,
brokerEpoch,
true,
Set(groupMetadataPartition, txnStatePartition).asJava))
Set(groupMetadataPartition, txnStatePartition).asJava
).build()
val request = buildRequest(stopReplicaRequest)
EasyMock.expect(replicaManager.stopReplicas(anyObject())).andReturn(
(mutable.Map(groupMetadataPartition -> Errors.NONE, txnStatePartition -> Errors.NONE), Errors.NONE))
@ -440,9 +450,10 @@ class KafkaApisTest {
EasyMock.reset(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
val (describeGroupsRequest, request) = buildRequest(new DescribeGroupsRequest.Builder(
val describeGroupsRequest = new DescribeGroupsRequest.Builder(
new DescribeGroupsRequestData().setGroups(List(groupId).asJava)
))
).build()
val request = buildRequest(describeGroupsRequest)
val capturedResponse = expectNoThrottling()
EasyMock.expect(groupCoordinator.handleDescribeGroup(EasyMock.eq(groupId)))
@ -485,11 +496,12 @@ class KafkaApisTest {
.setName(topic)
.setPartitions(Collections.singletonList(
new OffsetDeleteRequestPartition().setPartitionIndex(invalidPartitionId))))
val (offsetDeleteRequest, request) = buildRequest(new OffsetDeleteRequest.Builder(
val offsetDeleteRequest = new OffsetDeleteRequest.Builder(
new OffsetDeleteRequestData()
.setGroupId(group)
.setTopics(topics)
))
).build()
val request = buildRequest(offsetDeleteRequest)
val capturedResponse = expectNoThrottling()
EasyMock.expect(groupCoordinator.handleDeleteOffsets(EasyMock.eq(group), EasyMock.eq(Seq.empty)))
@ -515,10 +527,11 @@ class KafkaApisTest {
EasyMock.reset(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
val (offsetDeleteRequest, request) = buildRequest(new OffsetDeleteRequest.Builder(
val offsetDeleteRequest = new OffsetDeleteRequest.Builder(
new OffsetDeleteRequestData()
.setGroupId(group)
))
).build()
val request = buildRequest(offsetDeleteRequest)
val capturedResponse = expectNoThrottling()
EasyMock.expect(groupCoordinator.handleDeleteOffsets(EasyMock.eq(group), EasyMock.eq(Seq.empty)))
@ -551,9 +564,9 @@ class KafkaApisTest {
val targetTimes = Map(tp -> new ListOffsetRequest.PartitionData(ListOffsetRequest.EARLIEST_TIMESTAMP,
currentLeaderEpoch))
val builder = ListOffsetRequest.Builder.forConsumer(true, isolationLevel)
.setTargetTimes(targetTimes.asJava)
val (listOffsetRequest, request) = buildRequest(builder)
val listOffsetRequest = ListOffsetRequest.Builder.forConsumer(true, isolationLevel)
.setTargetTimes(targetTimes.asJava).build()
val request = buildRequest(listOffsetRequest)
createKafkaApis().handleListOffsetRequest(request)
val response = readResponse(ApiKeys.LIST_OFFSETS, listOffsetRequest, capturedResponse)
@ -641,8 +654,9 @@ class KafkaApisTest {
EasyMock.replay(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, fetchManager)
val builder = new FetchRequest.Builder(9, 9, -1, 100, 0, fetchData)
val (fetchRequest, request) = buildRequest(builder)
val fetchRequest = new FetchRequest.Builder(9, 9, -1, 100, 0, fetchData)
.build()
val request = buildRequest(fetchRequest)
createKafkaApis().handleFetchRequest(request)
val response = readResponse(ApiKeys.FETCH, fetchRequest, capturedResponse)
@ -662,36 +676,347 @@ class KafkaApisTest {
@Test
def testJoinGroupProtocolsOrder(): Unit = {
val protocols = List(
new JoinGroupRequestProtocol().setName("first").setMetadata("first".getBytes()),
new JoinGroupRequestProtocol().setName("second").setMetadata("second".getBytes())
("first", "first".getBytes()),
("second", "second".getBytes())
)
val groupId = "group"
val memberId = "member1"
val protocolType = "consumer"
val rebalanceTimeoutMs = 10
val sessionTimeoutMs = 5
val capturedProtocols = EasyMock.newCapture[List[(String, Array[Byte])]]()
EasyMock.expect(groupCoordinator.handleJoinGroup(
anyString,
anyString,
anyObject(classOf[Option[String]]),
anyBoolean,
anyString,
anyString,
anyInt,
anyInt,
anyString,
EasyMock.eq(protocols.map(protocol => (protocol.name, protocol.metadata))),
EasyMock.eq(groupId),
EasyMock.eq(memberId),
EasyMock.eq(None),
EasyMock.eq(true),
EasyMock.eq(clientId),
EasyMock.eq(InetAddress.getLocalHost.toString),
EasyMock.eq(rebalanceTimeoutMs),
EasyMock.eq(sessionTimeoutMs),
EasyMock.eq(protocolType),
EasyMock.capture(capturedProtocols),
anyObject()
))
EasyMock.replay(groupCoordinator)
createKafkaApis().handleJoinGroupRequest(
buildRequest(
new JoinGroupRequest.Builder(
new JoinGroupRequestData()
.setGroupId("test")
.setMemberId("test")
.setProtocolType("consumer")
.setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection(protocols.iterator.asJava))
)
)._2)
.setGroupId(groupId)
.setMemberId(memberId)
.setProtocolType(protocolType)
.setRebalanceTimeoutMs(rebalanceTimeoutMs)
.setSessionTimeoutMs(sessionTimeoutMs)
.setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
protocols.map { case (name, protocol) => new JoinGroupRequestProtocol()
.setName(name).setMetadata(protocol)
}.iterator.asJava))
).build()
))
EasyMock.replay(groupCoordinator)
EasyMock.verify(groupCoordinator)
val capturedProtocolsList = capturedProtocols.getValue
assertEquals(protocols.size, capturedProtocolsList.size)
protocols.zip(capturedProtocolsList).foreach { case ((expectedName, expectedBytes), (name, bytes)) =>
assertEquals(expectedName, name)
assertArrayEquals(expectedBytes, bytes)
}
}
@Test
def testJoinGroupWhenAnErrorOccurs(): Unit = {
for (version <- ApiKeys.JOIN_GROUP.oldestVersion to ApiKeys.JOIN_GROUP.latestVersion) {
testJoinGroupWhenAnErrorOccurs(version.asInstanceOf[Short])
}
}
def testJoinGroupWhenAnErrorOccurs(version: Short): Unit = {
EasyMock.reset(groupCoordinator, clientRequestQuotaManager, requestChannel)
val capturedResponse = expectNoThrottling()
val groupId = "group"
val memberId = "member1"
val protocolType = "consumer"
val rebalanceTimeoutMs = 10
val sessionTimeoutMs = 5
val capturedCallback = EasyMock.newCapture[JoinGroupCallback]()
EasyMock.expect(groupCoordinator.handleJoinGroup(
EasyMock.eq(groupId),
EasyMock.eq(memberId),
EasyMock.eq(None),
EasyMock.eq(if (version >= 4) true else false),
EasyMock.eq(clientId),
EasyMock.eq(InetAddress.getLocalHost.toString),
EasyMock.eq(if (version >= 1) rebalanceTimeoutMs else sessionTimeoutMs),
EasyMock.eq(sessionTimeoutMs),
EasyMock.eq(protocolType),
EasyMock.eq(List.empty),
EasyMock.capture(capturedCallback)
))
val joinGroupRequest = new JoinGroupRequest.Builder(
new JoinGroupRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setProtocolType(protocolType)
.setRebalanceTimeoutMs(rebalanceTimeoutMs)
.setSessionTimeoutMs(sessionTimeoutMs)
).build(version)
val requestChannelRequest = buildRequest(joinGroupRequest)
EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel)
createKafkaApis().handleJoinGroupRequest(requestChannelRequest)
EasyMock.verify(groupCoordinator)
capturedCallback.getValue.apply(JoinGroupResult(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
val response = readResponse(ApiKeys.JOIN_GROUP, joinGroupRequest, capturedResponse)
.asInstanceOf[JoinGroupResponse]
assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, response.error)
assertEquals(0, response.data.members.size)
assertEquals(memberId, response.data.memberId)
assertEquals(GroupCoordinator.NoGeneration, response.data.generationId)
assertEquals(GroupCoordinator.NoLeader, response.data.leader)
assertNull(response.data.protocolType)
if (version >= 7) {
assertNull(response.data.protocolName)
} else {
assertEquals(GroupCoordinator.NoProtocol, response.data.protocolName)
}
EasyMock.verify(clientRequestQuotaManager, requestChannel)
}
@Test
def testJoinGroupProtocolType(): Unit = {
for (version <- ApiKeys.JOIN_GROUP.oldestVersion to ApiKeys.JOIN_GROUP.latestVersion) {
testJoinGroupProtocolType(version.asInstanceOf[Short])
}
}
def testJoinGroupProtocolType(version: Short): Unit = {
EasyMock.reset(groupCoordinator, clientRequestQuotaManager, requestChannel)
val capturedResponse = expectNoThrottling()
val groupId = "group"
val memberId = "member1"
val protocolType = "consumer"
val protocolName = "range"
val rebalanceTimeoutMs = 10
val sessionTimeoutMs = 5
val capturedCallback = EasyMock.newCapture[JoinGroupCallback]()
EasyMock.expect(groupCoordinator.handleJoinGroup(
EasyMock.eq(groupId),
EasyMock.eq(memberId),
EasyMock.eq(None),
EasyMock.eq(if (version >= 4) true else false),
EasyMock.eq(clientId),
EasyMock.eq(InetAddress.getLocalHost.toString),
EasyMock.eq(if (version >= 1) rebalanceTimeoutMs else sessionTimeoutMs),
EasyMock.eq(sessionTimeoutMs),
EasyMock.eq(protocolType),
EasyMock.eq(List.empty),
EasyMock.capture(capturedCallback)
))
val joinGroupRequest = new JoinGroupRequest.Builder(
new JoinGroupRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setProtocolType(protocolType)
.setRebalanceTimeoutMs(rebalanceTimeoutMs)
.setSessionTimeoutMs(sessionTimeoutMs)
).build(version)
val requestChannelRequest = buildRequest(joinGroupRequest)
EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel)
createKafkaApis().handleJoinGroupRequest(requestChannelRequest)
EasyMock.verify(groupCoordinator)
capturedCallback.getValue.apply(JoinGroupResult(
members = List.empty,
memberId = memberId,
generationId = 0,
protocolType = Some(protocolType),
protocolName = Some(protocolName),
leaderId = memberId,
error = Errors.NONE
))
val response = readResponse(ApiKeys.JOIN_GROUP, joinGroupRequest, capturedResponse)
.asInstanceOf[JoinGroupResponse]
assertEquals(Errors.NONE, response.error)
assertEquals(0, response.data.members.size)
assertEquals(memberId, response.data.memberId)
assertEquals(0, response.data.generationId)
assertEquals(memberId, response.data.leader)
assertEquals(protocolName, response.data.protocolName)
if (version >= 7) {
assertEquals(protocolType, response.data.protocolType)
} else {
assertNull(response.data.protocolType)
}
EasyMock.verify(clientRequestQuotaManager, requestChannel)
}
@Test
def testSyncGroupProtocolTypeAndName(): Unit = {
for (version <- ApiKeys.SYNC_GROUP.oldestVersion to ApiKeys.SYNC_GROUP.latestVersion) {
testSyncGroupProtocolTypeAndName(version.asInstanceOf[Short])
}
}
def testSyncGroupProtocolTypeAndName(version: Short): Unit = {
EasyMock.reset(groupCoordinator, clientRequestQuotaManager, requestChannel)
val capturedResponse = expectNoThrottling()
val groupId = "group"
val memberId = "member1"
val protocolType = "consumer"
val protocolName = "range"
val capturedCallback = EasyMock.newCapture[SyncGroupCallback]()
EasyMock.expect(groupCoordinator.handleSyncGroup(
EasyMock.eq(groupId),
EasyMock.eq(0),
EasyMock.eq(memberId),
EasyMock.eq(if (version >= 5) Some(protocolType) else None),
EasyMock.eq(if (version >= 5) Some(protocolName) else None),
EasyMock.eq(None),
EasyMock.eq(Map.empty),
EasyMock.capture(capturedCallback)
))
val syncGroupRequest = new SyncGroupRequest.Builder(
new SyncGroupRequestData()
.setGroupId(groupId)
.setGenerationId(0)
.setMemberId(memberId)
.setProtocolType(protocolType)
.setProtocolName(protocolName)
).build(version)
val requestChannelRequest = buildRequest(syncGroupRequest)
EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel)
createKafkaApis().handleSyncGroupRequest(requestChannelRequest)
EasyMock.verify(groupCoordinator)
capturedCallback.getValue.apply(SyncGroupResult(
protocolType = Some(protocolType),
protocolName = Some(protocolName),
memberAssignment = Array.empty,
error = Errors.NONE
))
val response = readResponse(ApiKeys.SYNC_GROUP, syncGroupRequest, capturedResponse)
.asInstanceOf[SyncGroupResponse]
assertEquals(Errors.NONE, response.error)
assertArrayEquals(Array.empty[Byte], response.data.assignment)
if (version >= 5) {
assertEquals(protocolType, response.data.protocolType)
} else {
assertNull(response.data.protocolType)
}
EasyMock.verify(clientRequestQuotaManager, requestChannel)
}
@Test
def testSyncGroupProtocolTypeAndNameAreMandatorySinceV5(): Unit = {
for (version <- ApiKeys.SYNC_GROUP.oldestVersion to ApiKeys.SYNC_GROUP.latestVersion) {
testSyncGroupProtocolTypeAndNameAreMandatorySinceV5(version.asInstanceOf[Short])
}
}
def testSyncGroupProtocolTypeAndNameAreMandatorySinceV5(version: Short): Unit = {
EasyMock.reset(groupCoordinator, clientRequestQuotaManager, requestChannel)
val capturedResponse = expectNoThrottling()
val groupId = "group"
val memberId = "member1"
val protocolType = "consumer"
val protocolName = "range"
val capturedCallback = EasyMock.newCapture[SyncGroupCallback]()
if (version < 5) {
EasyMock.expect(groupCoordinator.handleSyncGroup(
EasyMock.eq(groupId),
EasyMock.eq(0),
EasyMock.eq(memberId),
EasyMock.eq(None),
EasyMock.eq(None),
EasyMock.eq(None),
EasyMock.eq(Map.empty),
EasyMock.capture(capturedCallback)
))
}
val syncGroupRequest = new SyncGroupRequest.Builder(
new SyncGroupRequestData()
.setGroupId(groupId)
.setGenerationId(0)
.setMemberId(memberId)
).build(version)
val requestChannelRequest = buildRequest(syncGroupRequest)
EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel)
createKafkaApis().handleSyncGroupRequest(requestChannelRequest)
EasyMock.verify(groupCoordinator)
if (version < 5) {
capturedCallback.getValue.apply(SyncGroupResult(
protocolType = Some(protocolType),
protocolName = Some(protocolName),
memberAssignment = Array.empty,
error = Errors.NONE
))
}
val response = readResponse(ApiKeys.SYNC_GROUP, syncGroupRequest, capturedResponse)
.asInstanceOf[SyncGroupResponse]
if (version < 5) {
assertEquals(Errors.NONE, response.error)
} else {
assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, response.error)
}
EasyMock.verify(clientRequestQuotaManager, requestChannel)
}
@Test
@ -699,14 +1024,16 @@ class KafkaApisTest {
val capturedResponse = expectNoThrottling()
EasyMock.replay(clientRequestQuotaManager, requestChannel)
val (joinGroupRequest, requestChannelRequest) = buildRequest(new JoinGroupRequest.Builder(
val joinGroupRequest = new JoinGroupRequest.Builder(
new JoinGroupRequestData()
.setGroupId("test")
.setMemberId("test")
.setGroupInstanceId("instanceId")
.setProtocolType("consumer")
.setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection)
))
).build()
val requestChannelRequest = buildRequest(joinGroupRequest)
createKafkaApis(KAFKA_2_2_IV1).handleJoinGroupRequest(requestChannelRequest)
val response = readResponse(ApiKeys.JOIN_GROUP, joinGroupRequest, capturedResponse).asInstanceOf[JoinGroupResponse]
@ -719,17 +1046,19 @@ class KafkaApisTest {
val capturedResponse = expectNoThrottling()
EasyMock.replay(clientRequestQuotaManager, requestChannel)
val (syncGroupRequest, requestChannelRequest) = buildRequest(new SyncGroupRequest.Builder(
val syncGroupRequest = new SyncGroupRequest.Builder(
new SyncGroupRequestData()
.setGroupId("test")
.setMemberId("test")
.setGroupInstanceId("instanceId")
.setGenerationId(1)
))
).build()
val requestChannelRequest = buildRequest(syncGroupRequest)
createKafkaApis(KAFKA_2_2_IV1).handleSyncGroupRequest(requestChannelRequest)
val response = readResponse(ApiKeys.SYNC_GROUP, syncGroupRequest, capturedResponse).asInstanceOf[SyncGroupResponse]
assertEquals(Errors.UNSUPPORTED_VERSION, response.error())
assertEquals(Errors.UNSUPPORTED_VERSION, response.error)
EasyMock.replay(groupCoordinator)
}
@ -738,13 +1067,14 @@ class KafkaApisTest {
val capturedResponse = expectNoThrottling()
EasyMock.replay(clientRequestQuotaManager, requestChannel)
val (heartbeatRequest, requestChannelRequest) = buildRequest(new HeartbeatRequest.Builder(
val heartbeatRequest = new HeartbeatRequest.Builder(
new HeartbeatRequestData()
.setGroupId("test")
.setMemberId("test")
.setGroupInstanceId("instanceId")
.setGenerationId(1)
))
).build()
val requestChannelRequest = buildRequest(heartbeatRequest)
createKafkaApis(KAFKA_2_2_IV1).handleHeartbeatRequest(requestChannelRequest)
val response = readResponse(ApiKeys.HEARTBEAT, heartbeatRequest, capturedResponse).asInstanceOf[HeartbeatResponse]
@ -757,7 +1087,7 @@ class KafkaApisTest {
val capturedResponse = expectNoThrottling()
EasyMock.replay(clientRequestQuotaManager, requestChannel)
val (offsetCommitRequest, requestChannelRequest) = buildRequest(new OffsetCommitRequest.Builder(
val offsetCommitRequest = new OffsetCommitRequest.Builder(
new OffsetCommitRequestData()
.setGroupId("test")
.setMemberId("test")
@ -774,7 +1104,9 @@ class KafkaApisTest {
.setCommittedMetadata("")
))
))
))
).build()
val requestChannelRequest = buildRequest(offsetCommitRequest)
createKafkaApis(KAFKA_2_2_IV1).handleOffsetCommitRequest(requestChannelRequest)
val expectedTopicErrors = Collections.singletonList(
@ -810,10 +1142,11 @@ class KafkaApisTest {
anyObject()
))
val (_, leaveRequest) = buildRequest(
val leaveRequest = buildRequest(
new LeaveGroupRequest.Builder(
groupId,
leaveMemberList.asJava)
leaveMemberList.asJava
).build()
)
createKafkaApis().handleLeaveGroupRequest(leaveRequest)
@ -837,10 +1170,11 @@ class KafkaApisTest {
anyObject()
))
val (_, leaveRequest) = buildRequest(
val leaveRequest = buildRequest(
new LeaveGroupRequest.Builder(
groupId,
singleLeaveMember.asJava)
singleLeaveMember.asJava
).build()
)
createKafkaApis().handleLeaveGroupRequest(leaveRequest)
@ -863,8 +1197,9 @@ class KafkaApisTest {
val tp0 = new TopicPartition("tp", 0)
val fetchData = Collections.singletonMap(tp0, new FetchRequest.PartitionData(0,0, Int.MaxValue, Optional.of(leaderEpoch)))
val (_, fetchFromFollower) = buildRequest(new FetchRequest.Builder(
ApiKeys.FETCH.oldestVersion(), ApiKeys.FETCH.latestVersion(), 1, 1000, 0, fetchData))
val fetchFromFollower = buildRequest(new FetchRequest.Builder(
ApiKeys.FETCH.oldestVersion(), ApiKeys.FETCH.latestVersion(), 1, 1000, 0, fetchData
).build())
setupBasicMetadataCache(tp0.topic, numPartitions = 1)
val hw = 3
@ -913,16 +1248,19 @@ class KafkaApisTest {
val capturedResponse = expectNoThrottling()
EasyMock.replay(clientRequestQuotaManager, requestChannel)
val (initProducerIdRequest, requestChannelRequest) = buildRequest(new InitProducerIdRequest.Builder(
val initProducerIdRequest = new InitProducerIdRequest.Builder(
new InitProducerIdRequestData()
.setTransactionalId("known")
.setTransactionTimeoutMs(TimeUnit.MINUTES.toMillis(15).toInt)
.setProducerId(10)
.setProducerEpoch(RecordBatch.NO_PRODUCER_EPOCH)
))
).build()
val requestChannelRequest = buildRequest(initProducerIdRequest)
createKafkaApis(KAFKA_2_2_IV1).handleInitProducerIdRequest(requestChannelRequest)
val response = readResponse(ApiKeys.INIT_PRODUCER_ID, initProducerIdRequest, capturedResponse).asInstanceOf[InitProducerIdResponse]
val response = readResponse(ApiKeys.INIT_PRODUCER_ID, initProducerIdRequest, capturedResponse)
.asInstanceOf[InitProducerIdResponse]
assertEquals(Errors.INVALID_REQUEST, response.error)
}
@ -931,13 +1269,14 @@ class KafkaApisTest {
val capturedResponse = expectNoThrottling()
EasyMock.replay(clientRequestQuotaManager, requestChannel)
val (initProducerIdRequest, requestChannelRequest) = buildRequest(new InitProducerIdRequest.Builder(
val initProducerIdRequest = new InitProducerIdRequest.Builder(
new InitProducerIdRequestData()
.setTransactionalId("known")
.setTransactionTimeoutMs(TimeUnit.MINUTES.toMillis(15).toInt)
.setProducerId(RecordBatch.NO_PRODUCER_ID)
.setProducerEpoch(2)
))
).build()
val requestChannelRequest = buildRequest(initProducerIdRequest)
createKafkaApis(KAFKA_2_2_IV1).handleInitProducerIdRequest(requestChannelRequest)
val response = readResponse(ApiKeys.INIT_PRODUCER_ID, initProducerIdRequest, capturedResponse).asInstanceOf[InitProducerIdResponse]
@ -986,7 +1325,8 @@ class KafkaApisTest {
val capturedResponse = expectNoThrottling()
EasyMock.replay(clientRequestQuotaManager, requestChannel)
val (metadataRequest, requestChannelRequest) = buildRequest(MetadataRequest.Builder.allTopics, requestListener)
val metadataRequest = MetadataRequest.Builder.allTopics.build()
val requestChannelRequest = buildRequest(metadataRequest, requestListener)
createKafkaApis().handleTopicMetadataRequest(requestChannelRequest)
readResponse(ApiKeys.METADATA, metadataRequest, capturedResponse).asInstanceOf[MetadataResponse]
@ -1010,9 +1350,9 @@ class KafkaApisTest {
val targetTimes = Map(tp -> new ListOffsetRequest.PartitionData(ListOffsetRequest.LATEST_TIMESTAMP,
currentLeaderEpoch))
val builder = ListOffsetRequest.Builder.forConsumer(true, isolationLevel)
.setTargetTimes(targetTimes.asJava)
val (listOffsetRequest, request) = buildRequest(builder)
val listOffsetRequest = ListOffsetRequest.Builder.forConsumer(true, isolationLevel)
.setTargetTimes(targetTimes.asJava).build()
val request = buildRequest(listOffsetRequest)
createKafkaApis().handleListOffsetRequest(request)
val response = readResponse(ApiKeys.LIST_OFFSETS, listOffsetRequest, capturedResponse).asInstanceOf[ListOffsetResponse]
@ -1025,23 +1365,23 @@ class KafkaApisTest {
}
private def createWriteTxnMarkersRequest(partitions: util.List[TopicPartition]) = {
val requestBuilder = new WriteTxnMarkersRequest.Builder(asList(
new TxnMarkerEntry(1, 1.toShort, 0, TransactionResult.COMMIT, partitions)))
buildRequest(requestBuilder)
val writeTxnMarkersRequest = new WriteTxnMarkersRequest.Builder(asList(
new TxnMarkerEntry(1, 1.toShort, 0, TransactionResult.COMMIT, partitions))
).build()
(writeTxnMarkersRequest, buildRequest(writeTxnMarkersRequest))
}
private def buildRequest[T <: AbstractRequest](builder: AbstractRequest.Builder[T],
listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)): (T, RequestChannel.Request) = {
private def buildRequest[T <: AbstractRequest](request: AbstractRequest,
listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)): RequestChannel.Request = {
val request = builder.build()
val buffer = request.serialize(new RequestHeader(builder.apiKey, request.version, "", 0))
val buffer = request.serialize(new RequestHeader(request.api, request.version, clientId, 0))
// read the header from the buffer first so that the body can be read next from the Request constructor
val header = RequestHeader.parse(buffer)
val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS,
listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY)
(request, new RequestChannel.Request(processor = 1, context = context, startTimeNanos = 0, MemoryPool.NONE, buffer,
requestChannelMetrics))
new RequestChannel.Request(processor = 1, context = context, startTimeNanos = 0, MemoryPool.NONE, buffer,
requestChannelMetrics)
}
private def readResponse(api: ApiKeys, request: AbstractRequest, capturedResponse: Capture[RequestChannel.Response]): AbstractResponse = {
@ -1053,8 +1393,9 @@ class KafkaApisTest {
send.writeTo(channel)
channel.close()
channel.buffer.getInt() // read the size
ResponseHeader.parse(channel.buffer, api.responseHeaderVersion(request.version()))
ResponseHeader.parse(channel.buffer, api.responseHeaderVersion(request.version))
val struct = api.responseSchema(request.version).read(channel.buffer)
println(struct)
AbstractResponse.parseResponse(api, struct, request.version)
}

View File

@ -222,6 +222,12 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
<Bug pattern="MS_MUTABLE_ARRAY"/>
</Match>
<Match>
<!-- The code generator generates useless condition. Disable the check temporarily. -->
<Class name="org.apache.kafka.common.message.JoinGroupResponseData"/>
<Bug pattern="UC_USELESS_CONDITION"/>
</Match>
<Match>
<!-- Suppress warnings about ignoring the return value of await.
This is done intentionally because we use other clues to determine