KAFKA-8056; Use automatic RPC generation for FindCoordinator (#6408)

Reviewers: Jason Gustafson <jason@confluent.io>
Mickael Maison authored on 2019-05-06 22:26:22 +01:00; committed by Jason Gustafson
parent 90043d5f7e
commit 407bcdf78e
21 changed files with 198 additions and 257 deletions
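In short, callers no longer pass a (CoordinatorType, key) pair to the request builder; they populate the generated FindCoordinatorRequestData message instead. A minimal before/after sketch of the client-side change (the group id "my-group" is a placeholder):

// Before this commit: the hand-written builder took the type and key directly.
FindCoordinatorRequest.Builder before =
    new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, "my-group");

// After this commit: the generated data class carries the fields; the key type
// travels as the enum's byte id.
FindCoordinatorRequest.Builder after =
    new FindCoordinatorRequest.Builder(
        new FindCoordinatorRequestData()
            .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())
            .setKey("my-group"));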

View File

@ -68,6 +68,7 @@ import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicRe
import org.apache.kafka.common.message.DescribeGroupsRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfigSet;
@ -103,6 +104,7 @@ import org.apache.kafka.common.requests.DeleteAclsRequest;
import org.apache.kafka.common.requests.DeleteAclsResponse;
import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
import org.apache.kafka.common.requests.DeleteGroupsRequest;
import org.apache.kafka.common.requests.DeleteGroupsResponse;
import org.apache.kafka.common.requests.DeleteRecordsRequest;
@ -2538,8 +2540,11 @@ public class KafkaAdminClient extends AdminClient {
runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) {
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
return new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
FindCoordinatorRequest.Builder createRequest(int timeoutMs) {
return new FindCoordinatorRequest.Builder(
new FindCoordinatorRequestData()
.setKeyType(CoordinatorType.GROUP.id())
.setKey(groupId));
}
@Override
@ -2781,8 +2786,11 @@ public class KafkaAdminClient extends AdminClient {
runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) {
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
return new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
FindCoordinatorRequest.Builder createRequest(int timeoutMs) {
return new FindCoordinatorRequest.Builder(
new FindCoordinatorRequestData()
.setKeyType(CoordinatorType.GROUP.id())
.setKey(groupId));
}
@Override
@ -2872,8 +2880,11 @@ public class KafkaAdminClient extends AdminClient {
runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) {
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
return new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
FindCoordinatorRequest.Builder createRequest(int timeoutMs) {
return new FindCoordinatorRequest.Builder(
new FindCoordinatorRequestData()
.setKeyType(CoordinatorType.GROUP.id())
.setKey(groupId));
}
@Override
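A side note on the createRequest changes above: each override narrows its declared return type from AbstractRequest.Builder to FindCoordinatorRequest.Builder, which Java permits (covariant return types). A self-contained sketch of the pattern, with Call as a simplified stand-in for the admin client's package-private class:

import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;

public class CovariantReturnSketch {
    // Simplified stand-in for KafkaAdminClient's Call class.
    abstract static class Call {
        abstract AbstractRequest.Builder<?> createRequest(int timeoutMs);
    }

    static final Call FIND_COORDINATOR = new Call() {
        @Override
        FindCoordinatorRequest.Builder createRequest(int timeoutMs) { // covariant return is legal
            return new FindCoordinatorRequest.Builder(
                new FindCoordinatorRequestData()
                    .setKeyType(CoordinatorType.GROUP.id())
                    .setKey("my-group")); // placeholder group id
        }
    };
}

Narrowing the return type lets code that holds the concrete Call use the typed builder without a cast.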

View File

@ -29,6 +29,7 @@ import org.apache.kafka.common.errors.MemberIdRequiredException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
@ -42,6 +43,7 @@ import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.HeartbeatResponse;
@ -661,7 +663,10 @@ public abstract class AbstractCoordinator implements Closeable {
// initiate the group metadata request
log.debug("Sending FindCoordinator request to broker {}", node);
FindCoordinatorRequest.Builder requestBuilder =
new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, this.groupId);
new FindCoordinatorRequest.Builder(
new FindCoordinatorRequestData()
.setKeyType(CoordinatorType.GROUP.id())
.setKey(this.groupId));
return client.send(node, requestBuilder)
.compose(new FindCoordinatorResponseHandler());
}
@ -679,12 +684,12 @@ public abstract class AbstractCoordinator implements Closeable {
synchronized (AbstractCoordinator.this) {
// use MAX_VALUE - node.id as the coordinator id to allow separate connections
// for the coordinator in the underlying network client layer
int coordinatorConnectionId = Integer.MAX_VALUE - findCoordinatorResponse.node().id();
int coordinatorConnectionId = Integer.MAX_VALUE - findCoordinatorResponse.data().nodeId();
AbstractCoordinator.this.coordinator = new Node(
coordinatorConnectionId,
findCoordinatorResponse.node().host(),
findCoordinatorResponse.node().port());
findCoordinatorResponse.data().host(),
findCoordinatorResponse.data().port());
log.info("Discovered group coordinator {}", coordinator);
client.tryConnect(coordinator);
heartbeat.resetSessionTimeout();
@ -693,7 +698,7 @@ public abstract class AbstractCoordinator implements Closeable {
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(groupId));
} else {
log.debug("Group coordinator lookup failed: {}", error.message());
log.debug("Group coordinator lookup failed: {}", findCoordinatorResponse.data().errorMessage());
future.raise(error);
}
}
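With the response also generated, the handler derives the coordinator Node from raw fields on FindCoordinatorResponseData rather than receiving a pre-built Node. A sketch of that derivation, including the connection-id offset the comment above describes:

import org.apache.kafka.common.Node;
import org.apache.kafka.common.message.FindCoordinatorResponseData;

public class CoordinatorNodeSketch {
    // Mirrors AbstractCoordinator: Integer.MAX_VALUE - nodeId gives the coordinator its own
    // connection id, so it gets a separate connection in the underlying network client layer.
    static Node coordinatorNode(FindCoordinatorResponseData data) {
        int coordinatorConnectionId = Integer.MAX_VALUE - data.nodeId();
        return new Node(coordinatorConnectionId, data.host(), data.port());
    }
}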

View File

@ -25,6 +25,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.InitProducerIdRequestData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.DefaultRecordBatch;
@ -38,6 +39,7 @@ import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
import org.apache.kafka.common.requests.EndTxnRequest;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
@ -866,7 +868,10 @@ public class TransactionManager {
throw new IllegalStateException("Invalid coordinator type: " + type);
}
FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(type, coordinatorKey);
FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(
new FindCoordinatorRequestData()
.setKeyType(type.id())
.setKey(coordinatorKey));
enqueueRequest(new FindCoordinatorHandler(builder));
}
@ -1193,10 +1198,11 @@ public class TransactionManager {
public void handleResponse(AbstractResponse response) {
FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) response;
Errors error = findCoordinatorResponse.error();
CoordinatorType coordinatorType = CoordinatorType.forId(builder.data().keyType());
if (error == Errors.NONE) {
Node node = findCoordinatorResponse.node();
switch (builder.coordinatorType()) {
switch (coordinatorType) {
case GROUP:
consumerGroupCoordinator = node;
break;
@ -1209,11 +1215,11 @@ public class TransactionManager {
} else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
fatalError(error.exception());
} else if (findCoordinatorResponse.error() == Errors.GROUP_AUTHORIZATION_FAILED) {
abortableError(new GroupAuthorizationException(builder.coordinatorKey()));
abortableError(new GroupAuthorizationException(builder.data().key()));
} else {
fatalError(new KafkaException(String.format("Could not find a coordinator with type %s with key %s due to " +
"unexpected error: %s", builder.coordinatorType(), builder.coordinatorKey(),
findCoordinatorResponse.error().message())));
"unexpected error: %s", coordinatorType, builder.data().key(),
findCoordinatorResponse.data().errorMessage())));
}
}
}
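Because the generated message stores the coordinator type as a raw byte, TransactionManager now round-trips it through the enum with forId. A minimal sketch of that mapping (0 = group, 1 = transaction, per the v1 schema):

import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;

public class CoordinatorTypeSketch {
    public static void main(String[] args) {
        byte keyType = CoordinatorType.TRANSACTION.id();        // enum -> wire byte (1)
        CoordinatorType type = CoordinatorType.forId(keyType);  // wire byte -> enum
        System.out.println(type);                               // TRANSACTION
    }
}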

View File

@ -26,6 +26,8 @@ import org.apache.kafka.common.message.DescribeGroupsRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
import org.apache.kafka.common.message.InitProducerIdRequestData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
@ -83,8 +85,6 @@ import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.LeaderAndIsrRequest;
@ -135,8 +135,8 @@ public enum ApiKeys {
ControlledShutdownResponseData.SCHEMAS),
OFFSET_COMMIT(8, "OffsetCommit", OffsetCommitRequestData.SCHEMAS, OffsetCommitResponseData.SCHEMAS),
OFFSET_FETCH(9, "OffsetFetch", OffsetFetchRequest.schemaVersions(), OffsetFetchResponse.schemaVersions()),
FIND_COORDINATOR(10, "FindCoordinator", FindCoordinatorRequest.schemaVersions(),
FindCoordinatorResponse.schemaVersions()),
FIND_COORDINATOR(10, "FindCoordinator", FindCoordinatorRequestData.SCHEMAS,
FindCoordinatorResponseData.SCHEMAS),
JOIN_GROUP(11, "JoinGroup", JoinGroupRequestData.SCHEMAS, JoinGroupResponseData.SCHEMAS),
HEARTBEAT(12, "Heartbeat", HeartbeatRequest.schemaVersions(), HeartbeatResponse.schemaVersions()),
LEAVE_GROUP(13, "LeaveGroup", LeaveGroupRequestData.SCHEMAS, LeaveGroupResponseData.SCHEMAS),
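The per-version schemas now come from the generated message classes' SCHEMAS constants rather than hand-maintained arrays in the request/response classes. A sketch of the lookup, assuming (as holds for FindCoordinator) that the oldest supported version is 0 so the array can be indexed by version directly:

import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.protocol.types.Schema;

public class SchemaLookupSketch {
    // SCHEMAS[v] is the request schema for API version v, matching what
    // ApiKeys.FIND_COORDINATOR.requestSchema(v) resolves to after this change.
    static Schema requestSchemaFor(short version) {
        return FindCoordinatorRequestData.SCHEMAS[version];
    }
}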

View File

@ -83,7 +83,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
case OFFSET_FETCH:
return new OffsetFetchResponse(struct);
case FIND_COORDINATOR:
return new FindCoordinatorResponse(struct);
return new FindCoordinatorResponse(struct, version);
case JOIN_GROUP:
return new JoinGroupResponse(struct, version);
case HEARTBEAT:

View File

@ -18,122 +18,64 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
import static org.apache.kafka.common.protocol.types.Type.INT8;
import static org.apache.kafka.common.protocol.types.Type.STRING;
public class FindCoordinatorRequest extends AbstractRequest {
private static final String COORDINATOR_KEY_KEY_NAME = "coordinator_key";
private static final String COORDINATOR_TYPE_KEY_NAME = "coordinator_type";
private static final Schema FIND_COORDINATOR_REQUEST_V0 = new Schema(GROUP_ID);
private static final Schema FIND_COORDINATOR_REQUEST_V1 = new Schema(
new Field("coordinator_key", STRING, "Id to use for finding the coordinator (for groups, this is the groupId, " +
"for transactional producers, this is the transactional id)"),
new Field("coordinator_type", INT8, "The type of coordinator to find (0 = group, 1 = transaction)"));
/**
* The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
*/
private static final Schema FIND_COORDINATOR_REQUEST_V2 = FIND_COORDINATOR_REQUEST_V1;
public static Schema[] schemaVersions() {
return new Schema[] {FIND_COORDINATOR_REQUEST_V0, FIND_COORDINATOR_REQUEST_V1, FIND_COORDINATOR_REQUEST_V2};
}
public static class Builder extends AbstractRequest.Builder<FindCoordinatorRequest> {
private final String coordinatorKey;
private final CoordinatorType coordinatorType;
private final short minVersion;
private final FindCoordinatorRequestData data;
public Builder(CoordinatorType coordinatorType, String coordinatorKey) {
public Builder(FindCoordinatorRequestData data) {
super(ApiKeys.FIND_COORDINATOR);
this.coordinatorType = coordinatorType;
this.coordinatorKey = coordinatorKey;
this.minVersion = coordinatorType == CoordinatorType.TRANSACTION ? (short) 1 : (short) 0;
this.data = data;
}
@Override
public FindCoordinatorRequest build(short version) {
if (version < minVersion)
if (version < 1 && data.keyType() == CoordinatorType.TRANSACTION.id()) {
throw new UnsupportedVersionException("Cannot create a v" + version + " FindCoordinator request " +
"because we require features supported only in " + minVersion + " or later.");
return new FindCoordinatorRequest(coordinatorType, coordinatorKey, version);
}
public String coordinatorKey() {
return coordinatorKey;
}
public CoordinatorType coordinatorType() {
return coordinatorType;
"because we require features supported only in 2 or later.");
}
return new FindCoordinatorRequest(data, version);
}
@Override
public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("(type=FindCoordinatorRequest, coordinatorKey=");
bld.append(coordinatorKey);
bld.append(", coordinatorType=");
bld.append(coordinatorType);
bld.append(")");
return bld.toString();
return data.toString();
}
public FindCoordinatorRequestData data() {
return data;
}
}
private final String coordinatorKey;
private final CoordinatorType coordinatorType;
private final FindCoordinatorRequestData data;
private FindCoordinatorRequest(CoordinatorType coordinatorType, String coordinatorKey, short version) {
private FindCoordinatorRequest(FindCoordinatorRequestData data, short version) {
super(ApiKeys.FIND_COORDINATOR, version);
this.coordinatorType = coordinatorType;
this.coordinatorKey = coordinatorKey;
this.data = data;
}
public FindCoordinatorRequest(Struct struct, short version) {
super(ApiKeys.FIND_COORDINATOR, version);
if (struct.hasField(COORDINATOR_TYPE_KEY_NAME))
this.coordinatorType = CoordinatorType.forId(struct.getByte(COORDINATOR_TYPE_KEY_NAME));
else
this.coordinatorType = CoordinatorType.GROUP;
if (struct.hasField(GROUP_ID))
this.coordinatorKey = struct.get(GROUP_ID);
else
this.coordinatorKey = struct.getString(COORDINATOR_KEY_KEY_NAME);
this.data = new FindCoordinatorRequestData(struct, version);
}
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
short versionId = version();
switch (versionId) {
case 0:
return new FindCoordinatorResponse(Errors.forException(e), Node.noNode());
case 1:
case 2:
return new FindCoordinatorResponse(throttleTimeMs, Errors.forException(e), Node.noNode());
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ApiKeys.FIND_COORDINATOR.latestVersion()));
FindCoordinatorResponseData response = new FindCoordinatorResponseData();
if (version() >= 2) {
response.setThrottleTimeMs(throttleTimeMs);
}
}
public String coordinatorKey() {
return coordinatorKey;
}
public CoordinatorType coordinatorType() {
return coordinatorType;
Errors error = Errors.forException(e);
return FindCoordinatorResponse.prepareResponse(error, Node.noNode());
}
public static FindCoordinatorRequest parse(ByteBuffer buffer, short version) {
@ -142,14 +84,11 @@ public class FindCoordinatorRequest extends AbstractRequest {
@Override
protected Struct toStruct() {
Struct struct = new Struct(ApiKeys.FIND_COORDINATOR.requestSchema(version()));
if (struct.hasField(GROUP_ID))
struct.set(GROUP_ID, coordinatorKey);
else
struct.set(COORDINATOR_KEY_KEY_NAME, coordinatorKey);
if (struct.hasField(COORDINATOR_TYPE_KEY_NAME))
struct.set(COORDINATOR_TYPE_KEY_NAME, coordinatorType.id);
return struct;
return data.toStruct(version());
}
public FindCoordinatorRequestData data() {
return data;
}
public enum CoordinatorType {
@ -161,6 +100,10 @@ public class FindCoordinatorRequest extends AbstractRequest {
this.id = id;
}
public byte id() {
return id;
}
public static CoordinatorType forId(byte id) {
switch (id) {
case 0:
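The rewritten build() keeps the old minimum-version guard but derives it from the data's keyType: transaction coordinators are only addressable from v1, so building a v0 request with a transaction key type still fails. For example (the transactional id is a placeholder):

import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;

public class VersionGuardSketch {
    public static void main(String[] args) {
        FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(
            new FindCoordinatorRequestData()
                .setKeyType(CoordinatorType.TRANSACTION.id())
                .setKey("my-txn-id"));
        builder.build((short) 1); // fine: v1 introduced the coordinator_type field
        builder.build((short) 0); // throws UnsupportedVersionException
    }
}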

View File

@ -17,53 +17,17 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
import static org.apache.kafka.common.protocol.types.Type.INT32;
import static org.apache.kafka.common.protocol.types.Type.STRING;
public class FindCoordinatorResponse extends AbstractResponse {
private static final String COORDINATOR_KEY_NAME = "coordinator";
// coordinator level field names
private static final String NODE_ID_KEY_NAME = "node_id";
private static final String HOST_KEY_NAME = "host";
private static final String PORT_KEY_NAME = "port";
private static final Schema FIND_COORDINATOR_BROKER_V0 = new Schema(
new Field(NODE_ID_KEY_NAME, INT32, "The broker id."),
new Field(HOST_KEY_NAME, STRING, "The hostname of the broker."),
new Field(PORT_KEY_NAME, INT32, "The port on which the broker accepts requests."));
private static final Schema FIND_COORDINATOR_RESPONSE_V0 = new Schema(
ERROR_CODE,
new Field(COORDINATOR_KEY_NAME, FIND_COORDINATOR_BROKER_V0, "Host and port information for the coordinator " +
"for a consumer group."));
private static final Schema FIND_COORDINATOR_RESPONSE_V1 = new Schema(
THROTTLE_TIME_MS,
ERROR_CODE,
ERROR_MESSAGE,
new Field(COORDINATOR_KEY_NAME, FIND_COORDINATOR_BROKER_V0, "Host and port information for the coordinator"));
/**
* The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
*/
private static final Schema FIND_COORDINATOR_RESPONSE_V2 = FIND_COORDINATOR_RESPONSE_V1;
public static Schema[] schemaVersions() {
return new Schema[] {FIND_COORDINATOR_RESPONSE_V0, FIND_COORDINATOR_RESPONSE_V1, FIND_COORDINATOR_RESPONSE_V2};
}
/**
* Possible error codes:
@ -75,88 +39,68 @@ public class FindCoordinatorResponse extends AbstractResponse {
* TRANSACTIONAL_ID_AUTHORIZATION_FAILED (53)
*/
private final FindCoordinatorResponseData data;
private final int throttleTimeMs;
private final String errorMessage;
private final Errors error;
private final Node node;
public FindCoordinatorResponse(Errors error, Node node) {
this(DEFAULT_THROTTLE_TIME, error, node);
public FindCoordinatorResponse(FindCoordinatorResponseData data) {
this.data = data;
}
public FindCoordinatorResponse(int throttleTimeMs, Errors error, Node node) {
this.throttleTimeMs = throttleTimeMs;
this.error = error;
this.node = node;
this.errorMessage = null;
public FindCoordinatorResponse(Struct struct, short version) {
this.data = new FindCoordinatorResponseData(struct, version);
}
public FindCoordinatorResponse(Struct struct) {
this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
error = Errors.forCode(struct.get(ERROR_CODE));
errorMessage = struct.getOrElse(ERROR_MESSAGE, null);
public FindCoordinatorResponseData data() {
return data;
}
Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME);
int nodeId = broker.getInt(NODE_ID_KEY_NAME);
String host = broker.getString(HOST_KEY_NAME);
int port = broker.getInt(PORT_KEY_NAME);
node = new Node(nodeId, host, port);
public Node node() {
return new Node(data.nodeId(), data.host(), data.port());
}
@Override
public int throttleTimeMs() {
return throttleTimeMs;
return data.throttleTimeMs();
}
public boolean hasError() {
return this.error != Errors.NONE;
return error() != Errors.NONE;
}
public Errors error() {
return error;
return Errors.forCode(data.errorCode());
}
@Override
public Map<Errors, Integer> errorCounts() {
return errorCounts(error);
}
public Node node() {
return node;
return Collections.singletonMap(error(), 1);
}
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.FIND_COORDINATOR.responseSchema(version));
struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
struct.set(ERROR_CODE, error.code());
struct.setIfExists(ERROR_MESSAGE, errorMessage);
Struct coordinator = struct.instance(COORDINATOR_KEY_NAME);
coordinator.set(NODE_ID_KEY_NAME, node.id());
coordinator.set(HOST_KEY_NAME, node.host());
coordinator.set(PORT_KEY_NAME, node.port());
struct.set(COORDINATOR_KEY_NAME, coordinator);
return struct;
return data.toStruct(version);
}
public static FindCoordinatorResponse parse(ByteBuffer buffer, short version) {
return new FindCoordinatorResponse(ApiKeys.FIND_COORDINATOR.responseSchema(version).read(buffer));
return new FindCoordinatorResponse(ApiKeys.FIND_COORDINATOR.responseSchema(version).read(buffer), version);
}
@Override
public String toString() {
return "FindCoordinatorResponse(" +
"throttleTimeMs=" + throttleTimeMs +
", errorMessage='" + errorMessage + '\'' +
", error=" + error +
", node=" + node +
')';
return data.toString();
}
@Override
public boolean shouldClientThrottle(short version) {
return version >= 2;
}
public static FindCoordinatorResponse prepareResponse(Errors error, Node node) {
FindCoordinatorResponseData data = new FindCoordinatorResponseData();
data.setErrorCode(error.code())
.setErrorMessage(error.message())
.setNodeId(node.id())
.setHost(node.host())
.setPort(node.port());
return new FindCoordinatorResponse(data);
}
}
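prepareResponse replaces the removed convenience constructors: it populates every field of the generated data object from an Errors and a Node, which is what the test updates below rely on. Typical usage (the broker address is a placeholder):

import org.apache.kafka.common.Node;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FindCoordinatorResponse;

public class PrepareResponseSketch {
    public static void main(String[] args) {
        Node coordinator = new Node(1, "localhost", 9092);
        FindCoordinatorResponse response =
            FindCoordinatorResponse.prepareResponse(Errors.NONE, coordinator);
        System.out.println(response.hasError()); // false
        System.out.println(response.node());     // Node rebuilt from the generated data fields
    }
}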

View File

@ -236,6 +236,10 @@ public class KafkaAdminClientTest {
return new DeleteTopicsResponse(data);
}
private static FindCoordinatorResponse prepareFindCoordinatorResponse(Errors error, Node node) {
return FindCoordinatorResponse.prepareResponse(error, node);
}
/**
* Test that the client properly times out when we don't receive any metadata.
*/
@ -1072,7 +1076,7 @@ public class KafkaAdminClientTest {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
DescribeGroupsResponseData data = new DescribeGroupsResponseData();
TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
@ -1139,7 +1143,7 @@ public class KafkaAdminClientTest {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
@ -1183,10 +1187,10 @@ public class KafkaAdminClientTest {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
//Retriable FindCoordinatorResponse errors should be retried
env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));
env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode()));
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode()));
env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
final Map<String, Errors> response = new HashMap<>();
response.put("group-0", Errors.NONE);
@ -1198,7 +1202,7 @@ public class KafkaAdminClientTest {
assertNull(results.get());
//should throw error for non-retriable errors
env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode()));
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode()));
final DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds);
TestUtils.assertFutureError(errorResult.deletedGroups().get("group-0"), GroupAuthorizationException.class);

View File

@ -580,7 +580,7 @@ public class KafkaConsumerTest {
true, groupId, groupInstanceId);
consumer.assign(singletonList(tp0));
client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
// lookup committed offset and find nothing
@ -604,7 +604,7 @@ public class KafkaConsumerTest {
true, groupId, groupInstanceId);
consumer.assign(singletonList(tp0));
client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, 539L), Errors.NONE), coordinator);
@ -629,7 +629,7 @@ public class KafkaConsumerTest {
true, groupId, groupInstanceId);
consumer.assign(singletonList(tp0));
client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, -1L), Errors.NONE), coordinator);
@ -679,7 +679,7 @@ public class KafkaConsumerTest {
consumer.assign(singletonList(tp0));
// lookup coordinator
client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
// fetch offset for one topic
@ -1116,7 +1116,7 @@ public class KafkaConsumerTest {
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
// lookup coordinator
client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
// manual assignment
@ -1172,7 +1172,7 @@ public class KafkaConsumerTest {
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
// lookup coordinator
client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
// manual assignment
@ -1226,7 +1226,7 @@ public class KafkaConsumerTest {
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
// lookup coordinator
client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
// manual assignment
@ -1420,7 +1420,7 @@ public class KafkaConsumerTest {
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
@ -1465,7 +1465,7 @@ public class KafkaConsumerTest {
client.prepareResponseFrom(syncGroupResponse(singletonList(tp0), Errors.NONE), coordinator, true);
// should try and find the new coordinator
client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
// rejoin group
client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE), coordinator);
@ -1663,7 +1663,7 @@ public class KafkaConsumerTest {
private Node prepareRebalance(MockClient client, Node node, final Set<String> subscribedTopics, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
if (coordinator == null) {
// lookup coordinator
client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
}
@ -1691,7 +1691,7 @@ public class KafkaConsumerTest {
private Node prepareRebalance(MockClient client, Node node, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
if (coordinator == null) {
// lookup coordinator
client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
}
@ -1797,7 +1797,6 @@ public class KafkaConsumerTest {
return new ListOffsetResponse(partitionData);
}
private FetchResponse<MemoryRecords> fetchResponse(Map<TopicPartition, FetchInfo> fetches) {
LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> tpResponses = new LinkedHashMap<>();
for (Map.Entry<TopicPartition, FetchInfo> fetchEntry : fetches.entrySet()) {

View File

@ -768,7 +768,7 @@ public class AbstractCoordinatorTest {
}
private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
return new FindCoordinatorResponse(error, node);
return FindCoordinatorResponse.prepareResponse(error, node);
}
private HeartbeatResponse heartbeatResponse(Errors error) {

View File

@ -2184,7 +2184,7 @@ public class ConsumerCoordinatorTest {
}
private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
return new FindCoordinatorResponse(error, node);
return FindCoordinatorResponse.prepareResponse(error, node);
}
private HeartbeatResponse heartbeatResponse(Errors error) {

View File

@ -662,7 +662,7 @@ public class KafkaProducerTest {
Node node = metadata.fetch().nodes().get(0);
client.throttle(node, 5000);
client.prepareResponse(new FindCoordinatorResponse(Errors.NONE, host1));
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, host1));
client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
try (Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(),
@ -855,7 +855,7 @@ public class KafkaProducerTest {
ExecutorService executorService = Executors.newSingleThreadExecutor();
CountDownLatch assertionDoneLatch = new CountDownLatch(1);
client.prepareResponse(new FindCoordinatorResponse(Errors.NONE, host1));
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, host1));
executorService.submit(() -> {
assertThrows(KafkaException.class, producer::initTransactions);
assertionDoneLatch.countDown();
@ -884,7 +884,7 @@ public class KafkaProducerTest {
ExecutorService executorService = Executors.newSingleThreadExecutor();
CountDownLatch assertionDoneLatch = new CountDownLatch(1);
client.prepareResponse(new FindCoordinatorResponse(Errors.NONE, host1));
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, host1));
executorService.submit(() -> {
assertThrows(KafkaException.class, producer::initTransactions);
assertionDoneLatch.countDown();

View File

@ -265,7 +265,6 @@ public class SenderTest {
* Send multiple requests. Verify that the client side quota metrics have the right values
*/
@Test
@SuppressWarnings("deprecation")
public void testQuotaMetrics() throws Exception {
MockSelector selector = new MockSelector(time);
Sensor throttleTimeSensor = Sender.throttleTimeSensor(this.senderMetricsRegistry);
@ -1811,7 +1810,6 @@ public class SenderTest {
}
@Test
@SuppressWarnings("deprecation")
public void testAbortRetryWhenProducerIdChanges() throws InterruptedException {
final long producerId = 343434L;
TransactionManager transactionManager = new TransactionManager();
@ -1904,7 +1902,6 @@ public class SenderTest {
testSplitBatchAndSend(txnManager, producerIdAndEpoch, tp);
}
@SuppressWarnings("deprecation")
private void testSplitBatchAndSend(TransactionManager txnManager,
ProducerIdAndEpoch producerIdAndEpoch,
TopicPartition tp) throws Exception {
@ -2473,7 +2470,8 @@ public class SenderTest {
}
private void prepareFindCoordinatorResponse(Errors error) {
client.prepareResponse(new FindCoordinatorResponse(error, metadata.fetch().nodes().get(0)));
Node node = metadata.fetch().nodes().get(0);
client.prepareResponse(FindCoordinatorResponse.prepareResponse(error, node));
}
private void prepareInitProducerResponse(Errors error, long producerId, short producerEpoch) {

View File

@ -687,8 +687,8 @@ public class TransactionManagerTest {
transactionManager.initializeTransactions();
client.prepareUnsupportedVersionResponse(body -> {
FindCoordinatorRequest findCoordinatorRequest = (FindCoordinatorRequest) body;
assertEquals(findCoordinatorRequest.coordinatorType(), CoordinatorType.TRANSACTION);
assertEquals(findCoordinatorRequest.coordinatorKey(), transactionalId);
assertEquals(CoordinatorType.forId(findCoordinatorRequest.data().keyType()), CoordinatorType.TRANSACTION);
assertEquals(findCoordinatorRequest.data().key(), transactionalId);
return true;
});
@ -2381,10 +2381,10 @@ public class TransactionManagerTest {
final String coordinatorKey) {
client.prepareResponse(body -> {
FindCoordinatorRequest findCoordinatorRequest = (FindCoordinatorRequest) body;
assertEquals(findCoordinatorRequest.coordinatorType(), coordinatorType);
assertEquals(findCoordinatorRequest.coordinatorKey(), coordinatorKey);
assertEquals(CoordinatorType.forId(findCoordinatorRequest.data().keyType()), coordinatorType);
assertEquals(findCoordinatorRequest.data().key(), coordinatorKey);
return true;
}, new FindCoordinatorResponse(error, brokerNode), shouldDisconnect);
}, FindCoordinatorResponse.prepareResponse(error, brokerNode), shouldDisconnect);
}
private void prepareInitPidResponse(Errors error, boolean shouldDisconnect, long producerId, short producerEpoch) {

View File

@ -52,6 +52,7 @@ import org.apache.kafka.common.message.ElectPreferredLeadersRequestData.TopicPar
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.InitProducerIdRequestData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
@ -84,6 +85,7 @@ import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
import org.apache.kafka.common.requests.CreatePartitionsRequest.PartitionDetails;
import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
@ -445,7 +447,10 @@ public class RequestResponseTest {
@Test(expected = UnsupportedVersionException.class)
public void cannotUseFindCoordinatorV0ToFindTransactionCoordinator() {
FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(
new FindCoordinatorRequestData()
.setKeyType(CoordinatorType.TRANSACTION.id)
.setKey("foobar"));
builder.build((short) 0);
}
@ -697,12 +702,16 @@ public class RequestResponseTest {
}
private FindCoordinatorRequest createFindCoordinatorRequest(int version) {
return new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, "test-group")
return new FindCoordinatorRequest.Builder(
new FindCoordinatorRequestData()
.setKeyType(CoordinatorType.GROUP.id())
.setKey("test-group"))
.build((short) version);
}
private FindCoordinatorResponse createFindCoordinatorResponse() {
return new FindCoordinatorResponse(Errors.NONE, new Node(10, "host1", 2014));
Node node = new Node(10, "host1", 2014);
return FindCoordinatorResponse.prepareResponse(Errors.NONE, node);
}
private FetchRequest createFetchRequest(int version, FetchMetadata metadata, List<TopicPartition> toForget) {

View File

@ -212,7 +212,7 @@ public class WorkerCoordinatorTest {
final String consumerId = "leader";
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, node));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
// normal join group
@ -252,7 +252,7 @@ public class WorkerCoordinatorTest {
final String memberId = "member";
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, node));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
// normal join group
@ -293,7 +293,7 @@ public class WorkerCoordinatorTest {
final String memberId = "member";
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, node));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
// config mismatch results in assignment error
@ -324,7 +324,7 @@ public class WorkerCoordinatorTest {
PowerMock.replayAll();
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, node));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
// join the group once
@ -488,11 +488,6 @@ public class WorkerCoordinatorTest {
PowerMock.verifyAll();
}
private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
return new FindCoordinatorResponse(error, node);
}
private JoinGroupResponse joinGroupLeaderResponse(int generationId, String memberId,
Map<String, Long> configOffsets, Errors error) {
List<JoinGroupResponseData.JoinGroupResponseMember> metadata = new ArrayList<>();

View File

@ -28,7 +28,8 @@ import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
import org.apache.kafka.common.errors.{AuthenticationException, TimeoutException}
import org.apache.kafka.common.internals.ClusterResourceListeners
import org.apache.kafka.common.message.{DescribeGroupsRequestData, DescribeGroupsResponseData}
import org.apache.kafka.common.message.{DescribeGroupsRequestData, DescribeGroupsResponseData, FindCoordinatorRequestData}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.Selector
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@ -41,6 +42,7 @@ import org.apache.kafka.common.{Node, TopicPartition}
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType
/**
* A Scala administrative client for Kafka which supports managing and inspecting topics, brokers,
@ -108,7 +110,10 @@ class AdminClient(val time: Time,
}
def findCoordinator(groupId: String, timeoutMs: Long = 0): Node = {
val requestBuilder = new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, groupId)
val requestBuilder = new FindCoordinatorRequest.Builder(
new FindCoordinatorRequestData()
.setKeyType(CoordinatorType.GROUP.id)
.setKey(groupId))
def sendRequest: Try[FindCoordinatorResponse] =
Try(sendAnyNode(ApiKeys.FIND_COORDINATOR, requestBuilder).asInstanceOf[FindCoordinatorResponse])

View File

@ -57,6 +57,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse
import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, AclFilterResponse}
import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests._
import org.apache.kafka.common.resource.PatternType.LITERAL
@ -1171,22 +1172,22 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleFindCoordinatorRequest(request: RequestChannel.Request) {
val findCoordinatorRequest = request.body[FindCoordinatorRequest]
if (findCoordinatorRequest.coordinatorType == FindCoordinatorRequest.CoordinatorType.GROUP &&
!authorize(request.session, Describe, Resource(Group, findCoordinatorRequest.coordinatorKey, LITERAL)))
if (findCoordinatorRequest.data.keyType == CoordinatorType.GROUP.id &&
!authorize(request.session, Describe, Resource(Group, findCoordinatorRequest.data.key, LITERAL)))
sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception)
else if (findCoordinatorRequest.coordinatorType == FindCoordinatorRequest.CoordinatorType.TRANSACTION &&
!authorize(request.session, Describe, Resource(TransactionalId, findCoordinatorRequest.coordinatorKey, LITERAL)))
else if (findCoordinatorRequest.data.keyType == CoordinatorType.TRANSACTION.id &&
!authorize(request.session, Describe, Resource(TransactionalId, findCoordinatorRequest.data.key, LITERAL)))
sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
else {
// get metadata (and create the topic if necessary)
val (partition, topicMetadata) = findCoordinatorRequest.coordinatorType match {
case FindCoordinatorRequest.CoordinatorType.GROUP =>
val partition = groupCoordinator.partitionFor(findCoordinatorRequest.coordinatorKey)
val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
case CoordinatorType.GROUP =>
val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
(partition, metadata)
case FindCoordinatorRequest.CoordinatorType.TRANSACTION =>
val partition = txnCoordinator.partitionFor(findCoordinatorRequest.coordinatorKey)
case CoordinatorType.TRANSACTION =>
val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
(partition, metadata)
@ -1195,8 +1196,18 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def createResponse(requestThrottleMs: Int): AbstractResponse = {
def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = {
new FindCoordinatorResponse(
new FindCoordinatorResponseData()
.setErrorCode(error.code)
.setErrorMessage(error.message)
.setNodeId(node.id)
.setHost(node.host)
.setPort(node.port)
.setThrottleTimeMs(requestThrottleMs))
}
val responseBody = if (topicMetadata.error != Errors.NONE) {
new FindCoordinatorResponse(requestThrottleMs, Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
} else {
val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala
.find(_.partition == partition)
@ -1205,9 +1216,9 @@ class KafkaApis(val requestChannel: RequestChannel,
coordinatorEndpoint match {
case Some(endpoint) if !endpoint.isEmpty =>
new FindCoordinatorResponse(requestThrottleMs, Errors.NONE, endpoint)
createFindCoordinatorResponse(Errors.NONE, endpoint)
case _ =>
new FindCoordinatorResponse(requestThrottleMs, Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
}
}
trace("Sending FindCoordinator response %s for correlation id %d to client %s."

View File

@ -302,7 +302,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
private def createFindCoordinatorRequest = {
new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, group).build()
new FindCoordinatorRequest.Builder(
new FindCoordinatorRequestData()
.setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id)
.setKey(group)).build()
}
private def createUpdateMetadataRequest = {

View File

@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.GroupMaxSizeReachedException
import org.apache.kafka.common.message.FindCoordinatorRequestData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{FindCoordinatorRequest, FindCoordinatorResponse}
import org.junit.Assert._
@ -254,7 +255,10 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
}
private def findCoordinator(group: String): Int = {
val request = new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, group).build()
val request = new FindCoordinatorRequest.Builder(
new FindCoordinatorRequestData()
.setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id)
.setKey(group)).build()
var nodeId = -1
TestUtils.waitUntilTrue(() => {
val resp = connectAndSend(request, ApiKeys.FIND_COORDINATOR)

View File

@ -274,7 +274,10 @@ class RequestQuotaTest extends BaseRequestTest {
new OffsetFetchRequest.Builder("test-group", List(tp).asJava)
case ApiKeys.FIND_COORDINATOR =>
new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, "test-group")
new FindCoordinatorRequest.Builder(
new FindCoordinatorRequestData()
.setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id)
.setKey("test-group"))
case ApiKeys.JOIN_GROUP =>
new JoinGroupRequest.Builder(
@ -489,7 +492,8 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.OFFSET_COMMIT =>
new OffsetCommitResponse(response, ApiKeys.OFFSET_COMMIT.latestVersion).throttleTimeMs
case ApiKeys.OFFSET_FETCH => new OffsetFetchResponse(response).throttleTimeMs
case ApiKeys.FIND_COORDINATOR => new FindCoordinatorResponse(response).throttleTimeMs
case ApiKeys.FIND_COORDINATOR =>
new FindCoordinatorResponse(response, ApiKeys.FIND_COORDINATOR.latestVersion).throttleTimeMs
case ApiKeys.JOIN_GROUP => new JoinGroupResponse(response).throttleTimeMs
case ApiKeys.HEARTBEAT => new HeartbeatResponse(response).throttleTimeMs
case ApiKeys.LEAVE_GROUP => new LeaveGroupResponse(response).throttleTimeMs