diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index ffe24ca50a6..a0958e9a1be 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -68,6 +68,7 @@ import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicRe import org.apache.kafka.common.message.DescribeGroupsRequestData; import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup; import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember; +import org.apache.kafka.common.message.FindCoordinatorRequestData; import org.apache.kafka.common.message.MetadataRequestData; import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData; import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfigSet; @@ -103,6 +104,7 @@ import org.apache.kafka.common.requests.DeleteAclsRequest; import org.apache.kafka.common.requests.DeleteAclsResponse; import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult; import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse; +import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType; import org.apache.kafka.common.requests.DeleteGroupsRequest; import org.apache.kafka.common.requests.DeleteGroupsResponse; import org.apache.kafka.common.requests.DeleteRecordsRequest; @@ -2538,8 +2540,11 @@ public class KafkaAdminClient extends AdminClient { runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) { @Override - AbstractRequest.Builder createRequest(int timeoutMs) { - return new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, groupId); + FindCoordinatorRequest.Builder createRequest(int timeoutMs) { + return new FindCoordinatorRequest.Builder( + new FindCoordinatorRequestData() + .setKeyType(CoordinatorType.GROUP.id()) + .setKey(groupId)); } @Override @@ -2781,8 +2786,11 @@ public class KafkaAdminClient extends AdminClient { runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) { @Override - AbstractRequest.Builder createRequest(int timeoutMs) { - return new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, groupId); + FindCoordinatorRequest.Builder createRequest(int timeoutMs) { + return new FindCoordinatorRequest.Builder( + new FindCoordinatorRequestData() + .setKeyType(CoordinatorType.GROUP.id()) + .setKey(groupId)); } @Override @@ -2872,8 +2880,11 @@ public class KafkaAdminClient extends AdminClient { runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) { @Override - AbstractRequest.Builder createRequest(int timeoutMs) { - return new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, groupId); + FindCoordinatorRequest.Builder createRequest(int timeoutMs) { + return new FindCoordinatorRequest.Builder( + new FindCoordinatorRequestData() + .setKeyType(CoordinatorType.GROUP.id()) + .setKey(groupId)); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 012433857e6..2cf39103f2a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.errors.MemberIdRequiredException; import org.apache.kafka.common.errors.RebalanceInProgressException; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.message.FindCoordinatorRequestData; import org.apache.kafka.common.message.JoinGroupRequestData; import org.apache.kafka.common.message.JoinGroupResponseData; import org.apache.kafka.common.message.LeaveGroupRequestData; @@ -42,6 +43,7 @@ import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Meter; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType; import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.HeartbeatRequest; import org.apache.kafka.common.requests.HeartbeatResponse; @@ -661,7 +663,10 @@ public abstract class AbstractCoordinator implements Closeable { // initiate the group metadata request log.debug("Sending FindCoordinator request to broker {}", node); FindCoordinatorRequest.Builder requestBuilder = - new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, this.groupId); + new FindCoordinatorRequest.Builder( + new FindCoordinatorRequestData() + .setKeyType(CoordinatorType.GROUP.id()) + .setKey(this.groupId)); return client.send(node, requestBuilder) .compose(new FindCoordinatorResponseHandler()); } @@ -679,12 +684,12 @@ public abstract class AbstractCoordinator implements Closeable { synchronized (AbstractCoordinator.this) { // use MAX_VALUE - node.id as the coordinator id to allow separate connections // for the coordinator in the underlying network client layer - int coordinatorConnectionId = Integer.MAX_VALUE - findCoordinatorResponse.node().id(); + int coordinatorConnectionId = Integer.MAX_VALUE - findCoordinatorResponse.data().nodeId(); AbstractCoordinator.this.coordinator = new Node( coordinatorConnectionId, - findCoordinatorResponse.node().host(), - findCoordinatorResponse.node().port()); + findCoordinatorResponse.data().host(), + findCoordinatorResponse.data().port()); log.info("Discovered group coordinator {}", coordinator); client.tryConnect(coordinator); heartbeat.resetSessionTimeout(); @@ -693,7 +698,7 @@ public abstract class AbstractCoordinator implements Closeable { } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { future.raise(new GroupAuthorizationException(groupId)); } else { - log.debug("Group coordinator lookup failed: {}", error.message()); + log.debug("Group coordinator lookup failed: {}", findCoordinatorResponse.data().errorMessage()); future.raise(error); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 98fa153dc56..e24d69b7725 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.message.FindCoordinatorRequestData; import org.apache.kafka.common.message.InitProducerIdRequestData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.DefaultRecordBatch; @@ -38,6 +39,7 @@ import org.apache.kafka.common.requests.AddPartitionsToTxnResponse; import org.apache.kafka.common.requests.EndTxnRequest; import org.apache.kafka.common.requests.EndTxnResponse; import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType; import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.InitProducerIdRequest; import org.apache.kafka.common.requests.InitProducerIdResponse; @@ -866,7 +868,10 @@ public class TransactionManager { throw new IllegalStateException("Invalid coordinator type: " + type); } - FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(type, coordinatorKey); + FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder( + new FindCoordinatorRequestData() + .setKeyType(type.id()) + .setKey(coordinatorKey)); enqueueRequest(new FindCoordinatorHandler(builder)); } @@ -1193,10 +1198,11 @@ public class TransactionManager { public void handleResponse(AbstractResponse response) { FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) response; Errors error = findCoordinatorResponse.error(); + CoordinatorType coordinatorType = CoordinatorType.forId(builder.data().keyType()); if (error == Errors.NONE) { Node node = findCoordinatorResponse.node(); - switch (builder.coordinatorType()) { + switch (coordinatorType) { case GROUP: consumerGroupCoordinator = node; break; @@ -1209,11 +1215,11 @@ public class TransactionManager { } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) { fatalError(error.exception()); } else if (findCoordinatorResponse.error() == Errors.GROUP_AUTHORIZATION_FAILED) { - abortableError(new GroupAuthorizationException(builder.coordinatorKey())); + abortableError(new GroupAuthorizationException(builder.data().key())); } else { fatalError(new KafkaException(String.format("Could not find a coordinator with type %s with key %s due to" + - "unexpected error: %s", builder.coordinatorType(), builder.coordinatorKey(), - findCoordinatorResponse.error().message()))); + "unexpected error: %s", coordinatorType, builder.data().key(), + findCoordinatorResponse.data().errorMessage()))); } } } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 8109182c5c3..3e2a87aa439 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -26,6 +26,8 @@ import org.apache.kafka.common.message.DescribeGroupsRequestData; import org.apache.kafka.common.message.DescribeGroupsResponseData; import org.apache.kafka.common.message.ElectPreferredLeadersRequestData; import org.apache.kafka.common.message.ElectPreferredLeadersResponseData; +import org.apache.kafka.common.message.FindCoordinatorRequestData; +import org.apache.kafka.common.message.FindCoordinatorResponseData; import org.apache.kafka.common.message.InitProducerIdRequestData; import org.apache.kafka.common.message.InitProducerIdResponseData; import org.apache.kafka.common.message.JoinGroupRequestData; @@ -83,8 +85,6 @@ import org.apache.kafka.common.requests.ExpireDelegationTokenRequest; import org.apache.kafka.common.requests.ExpireDelegationTokenResponse; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.FetchResponse; -import org.apache.kafka.common.requests.FindCoordinatorRequest; -import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.HeartbeatRequest; import org.apache.kafka.common.requests.HeartbeatResponse; import org.apache.kafka.common.requests.LeaderAndIsrRequest; @@ -135,8 +135,8 @@ public enum ApiKeys { ControlledShutdownResponseData.SCHEMAS), OFFSET_COMMIT(8, "OffsetCommit", OffsetCommitRequestData.SCHEMAS, OffsetCommitResponseData.SCHEMAS), OFFSET_FETCH(9, "OffsetFetch", OffsetFetchRequest.schemaVersions(), OffsetFetchResponse.schemaVersions()), - FIND_COORDINATOR(10, "FindCoordinator", FindCoordinatorRequest.schemaVersions(), - FindCoordinatorResponse.schemaVersions()), + FIND_COORDINATOR(10, "FindCoordinator", FindCoordinatorRequestData.SCHEMAS, + FindCoordinatorResponseData.SCHEMAS), JOIN_GROUP(11, "JoinGroup", JoinGroupRequestData.SCHEMAS, JoinGroupResponseData.SCHEMAS), HEARTBEAT(12, "Heartbeat", HeartbeatRequest.schemaVersions(), HeartbeatResponse.schemaVersions()), LEAVE_GROUP(13, "LeaveGroup", LeaveGroupRequestData.SCHEMAS, LeaveGroupResponseData.SCHEMAS), diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index c21fa2ba653..9c747ec611f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -83,7 +83,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse { case OFFSET_FETCH: return new OffsetFetchResponse(struct); case FIND_COORDINATOR: - return new FindCoordinatorResponse(struct); + return new FindCoordinatorResponse(struct, version); case JOIN_GROUP: return new JoinGroupResponse(struct, version); case HEARTBEAT: diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java index 2d44ab3015b..0e728439169 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java @@ -18,122 +18,64 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.message.FindCoordinatorRequestData; +import org.apache.kafka.common.message.FindCoordinatorResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; -import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID; -import static org.apache.kafka.common.protocol.types.Type.INT8; -import static org.apache.kafka.common.protocol.types.Type.STRING; public class FindCoordinatorRequest extends AbstractRequest { - private static final String COORDINATOR_KEY_KEY_NAME = "coordinator_key"; - private static final String COORDINATOR_TYPE_KEY_NAME = "coordinator_type"; - - private static final Schema FIND_COORDINATOR_REQUEST_V0 = new Schema(GROUP_ID); - - private static final Schema FIND_COORDINATOR_REQUEST_V1 = new Schema( - new Field("coordinator_key", STRING, "Id to use for finding the coordinator (for groups, this is the groupId, " + - "for transactional producers, this is the transactional id)"), - new Field("coordinator_type", INT8, "The type of coordinator to find (0 = group, 1 = transaction)")); - - /** - * The version number is bumped to indicate that on quota violation brokers send out responses before throttling. - */ - private static final Schema FIND_COORDINATOR_REQUEST_V2 = FIND_COORDINATOR_REQUEST_V1; - - public static Schema[] schemaVersions() { - return new Schema[] {FIND_COORDINATOR_REQUEST_V0, FIND_COORDINATOR_REQUEST_V1, FIND_COORDINATOR_REQUEST_V2}; - } public static class Builder extends AbstractRequest.Builder { - private final String coordinatorKey; - private final CoordinatorType coordinatorType; - private final short minVersion; + private final FindCoordinatorRequestData data; - public Builder(CoordinatorType coordinatorType, String coordinatorKey) { + public Builder(FindCoordinatorRequestData data) { super(ApiKeys.FIND_COORDINATOR); - this.coordinatorType = coordinatorType; - this.coordinatorKey = coordinatorKey; - this.minVersion = coordinatorType == CoordinatorType.TRANSACTION ? (short) 1 : (short) 0; + this.data = data; } @Override public FindCoordinatorRequest build(short version) { - if (version < minVersion) + if (version < 1 && data.keyType() == CoordinatorType.TRANSACTION.id()) { throw new UnsupportedVersionException("Cannot create a v" + version + " FindCoordinator request " + - "because we require features supported only in " + minVersion + " or later."); - return new FindCoordinatorRequest(coordinatorType, coordinatorKey, version); - } - - public String coordinatorKey() { - return coordinatorKey; - } - - public CoordinatorType coordinatorType() { - return coordinatorType; + "because we require features supported only in 2 or later."); + } + return new FindCoordinatorRequest(data, version); } @Override public String toString() { - StringBuilder bld = new StringBuilder(); - bld.append("(type=FindCoordinatorRequest, coordinatorKey="); - bld.append(coordinatorKey); - bld.append(", coordinatorType="); - bld.append(coordinatorType); - bld.append(")"); - return bld.toString(); + return data.toString(); + } + + public FindCoordinatorRequestData data() { + return data; } } - private final String coordinatorKey; - private final CoordinatorType coordinatorType; + private final FindCoordinatorRequestData data; - private FindCoordinatorRequest(CoordinatorType coordinatorType, String coordinatorKey, short version) { + private FindCoordinatorRequest(FindCoordinatorRequestData data, short version) { super(ApiKeys.FIND_COORDINATOR, version); - this.coordinatorType = coordinatorType; - this.coordinatorKey = coordinatorKey; + this.data = data; } public FindCoordinatorRequest(Struct struct, short version) { super(ApiKeys.FIND_COORDINATOR, version); - - if (struct.hasField(COORDINATOR_TYPE_KEY_NAME)) - this.coordinatorType = CoordinatorType.forId(struct.getByte(COORDINATOR_TYPE_KEY_NAME)); - else - this.coordinatorType = CoordinatorType.GROUP; - if (struct.hasField(GROUP_ID)) - this.coordinatorKey = struct.get(GROUP_ID); - else - this.coordinatorKey = struct.getString(COORDINATOR_KEY_KEY_NAME); + this.data = new FindCoordinatorRequestData(struct, version); } @Override public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { - short versionId = version(); - switch (versionId) { - case 0: - return new FindCoordinatorResponse(Errors.forException(e), Node.noNode()); - case 1: - case 2: - return new FindCoordinatorResponse(throttleTimeMs, Errors.forException(e), Node.noNode()); - - default: - throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", - versionId, this.getClass().getSimpleName(), ApiKeys.FIND_COORDINATOR.latestVersion())); + FindCoordinatorResponseData response = new FindCoordinatorResponseData(); + if (version() >= 2) { + response.setThrottleTimeMs(throttleTimeMs); } - } - - public String coordinatorKey() { - return coordinatorKey; - } - - public CoordinatorType coordinatorType() { - return coordinatorType; + Errors error = Errors.forException(e); + return FindCoordinatorResponse.prepareResponse(error, Node.noNode()); } public static FindCoordinatorRequest parse(ByteBuffer buffer, short version) { @@ -142,14 +84,11 @@ public class FindCoordinatorRequest extends AbstractRequest { @Override protected Struct toStruct() { - Struct struct = new Struct(ApiKeys.FIND_COORDINATOR.requestSchema(version())); - if (struct.hasField(GROUP_ID)) - struct.set(GROUP_ID, coordinatorKey); - else - struct.set(COORDINATOR_KEY_KEY_NAME, coordinatorKey); - if (struct.hasField(COORDINATOR_TYPE_KEY_NAME)) - struct.set(COORDINATOR_TYPE_KEY_NAME, coordinatorType.id); - return struct; + return data.toStruct(version()); + } + + public FindCoordinatorRequestData data() { + return data; } public enum CoordinatorType { @@ -161,6 +100,10 @@ public class FindCoordinatorRequest extends AbstractRequest { this.id = id; } + public byte id() { + return id; + } + public static CoordinatorType forId(byte id) { switch (id) { case 0: diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java index bc7f654c0bb..c880408a8ed 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java @@ -17,53 +17,17 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.Node; +import org.apache.kafka.common.message.FindCoordinatorResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.Map; -import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; -import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE; -import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; -import static org.apache.kafka.common.protocol.types.Type.INT32; -import static org.apache.kafka.common.protocol.types.Type.STRING; public class FindCoordinatorResponse extends AbstractResponse { - private static final String COORDINATOR_KEY_NAME = "coordinator"; - - // coordinator level field names - private static final String NODE_ID_KEY_NAME = "node_id"; - private static final String HOST_KEY_NAME = "host"; - private static final String PORT_KEY_NAME = "port"; - - private static final Schema FIND_COORDINATOR_BROKER_V0 = new Schema( - new Field(NODE_ID_KEY_NAME, INT32, "The broker id."), - new Field(HOST_KEY_NAME, STRING, "The hostname of the broker."), - new Field(PORT_KEY_NAME, INT32, "The port on which the broker accepts requests.")); - - private static final Schema FIND_COORDINATOR_RESPONSE_V0 = new Schema( - ERROR_CODE, - new Field(COORDINATOR_KEY_NAME, FIND_COORDINATOR_BROKER_V0, "Host and port information for the coordinator " + - "for a consumer group.")); - - private static final Schema FIND_COORDINATOR_RESPONSE_V1 = new Schema( - THROTTLE_TIME_MS, - ERROR_CODE, - ERROR_MESSAGE, - new Field(COORDINATOR_KEY_NAME, FIND_COORDINATOR_BROKER_V0, "Host and port information for the coordinator")); - - /** - * The version number is bumped to indicate that on quota violation brokers send out responses before throttling. - */ - private static final Schema FIND_COORDINATOR_RESPONSE_V2 = FIND_COORDINATOR_RESPONSE_V1; - - public static Schema[] schemaVersions() { - return new Schema[] {FIND_COORDINATOR_RESPONSE_V0, FIND_COORDINATOR_RESPONSE_V1, FIND_COORDINATOR_RESPONSE_V2}; - } /** * Possible error codes: @@ -75,88 +39,68 @@ public class FindCoordinatorResponse extends AbstractResponse { * TRANSACTIONAL_ID_AUTHORIZATION_FAILED (53) */ + private final FindCoordinatorResponseData data; - private final int throttleTimeMs; - private final String errorMessage; - private final Errors error; - private final Node node; - - public FindCoordinatorResponse(Errors error, Node node) { - this(DEFAULT_THROTTLE_TIME, error, node); + public FindCoordinatorResponse(FindCoordinatorResponseData data) { + this.data = data; } - public FindCoordinatorResponse(int throttleTimeMs, Errors error, Node node) { - this.throttleTimeMs = throttleTimeMs; - this.error = error; - this.node = node; - this.errorMessage = null; + public FindCoordinatorResponse(Struct struct, short version) { + this.data = new FindCoordinatorResponseData(struct, version); } - public FindCoordinatorResponse(Struct struct) { - this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME); - error = Errors.forCode(struct.get(ERROR_CODE)); - errorMessage = struct.getOrElse(ERROR_MESSAGE, null); + public FindCoordinatorResponseData data() { + return data; + } - Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME); - int nodeId = broker.getInt(NODE_ID_KEY_NAME); - String host = broker.getString(HOST_KEY_NAME); - int port = broker.getInt(PORT_KEY_NAME); - node = new Node(nodeId, host, port); + public Node node() { + return new Node(data.nodeId(), data.host(), data.port()); } @Override public int throttleTimeMs() { - return throttleTimeMs; + return data.throttleTimeMs(); } public boolean hasError() { - return this.error != Errors.NONE; + return error() != Errors.NONE; } public Errors error() { - return error; + return Errors.forCode(data.errorCode()); } @Override public Map errorCounts() { - return errorCounts(error); - } - - public Node node() { - return node; + return Collections.singletonMap(error(), 1); } @Override protected Struct toStruct(short version) { - Struct struct = new Struct(ApiKeys.FIND_COORDINATOR.responseSchema(version)); - struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs); - struct.set(ERROR_CODE, error.code()); - struct.setIfExists(ERROR_MESSAGE, errorMessage); - - Struct coordinator = struct.instance(COORDINATOR_KEY_NAME); - coordinator.set(NODE_ID_KEY_NAME, node.id()); - coordinator.set(HOST_KEY_NAME, node.host()); - coordinator.set(PORT_KEY_NAME, node.port()); - struct.set(COORDINATOR_KEY_NAME, coordinator); - return struct; + return data.toStruct(version); } public static FindCoordinatorResponse parse(ByteBuffer buffer, short version) { - return new FindCoordinatorResponse(ApiKeys.FIND_COORDINATOR.responseSchema(version).read(buffer)); + return new FindCoordinatorResponse(ApiKeys.FIND_COORDINATOR.responseSchema(version).read(buffer), version); } @Override public String toString() { - return "FindCoordinatorResponse(" + - "throttleTimeMs=" + throttleTimeMs + - ", errorMessage='" + errorMessage + '\'' + - ", error=" + error + - ", node=" + node + - ')'; + return data.toString(); } @Override public boolean shouldClientThrottle(short version) { return version >= 2; } + + public static FindCoordinatorResponse prepareResponse(Errors error, Node node) { + FindCoordinatorResponseData data = new FindCoordinatorResponseData(); + data.setErrorCode(error.code()) + .setErrorMessage(error.message()) + .setNodeId(node.id()) + .setHost(node.host()) + .setPort(node.port()); + return new FindCoordinatorResponse(data); + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 1367b947bce..687dad2ed12 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -236,6 +236,10 @@ public class KafkaAdminClientTest { return new DeleteTopicsResponse(data); } + private static FindCoordinatorResponse prepareFindCoordinatorResponse(Errors error, Node node) { + return FindCoordinatorResponse.prepareResponse(error, node); + } + /** * Test that the client properly times out when we don't receive any metadata. */ @@ -1072,7 +1076,7 @@ public class KafkaAdminClientTest { try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); - env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); DescribeGroupsResponseData data = new DescribeGroupsResponseData(); TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0); @@ -1139,7 +1143,7 @@ public class KafkaAdminClientTest { try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); - env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0); TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1); @@ -1183,10 +1187,10 @@ public class KafkaAdminClientTest { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); //Retriable FindCoordinatorResponse errors should be retried - env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode())); - env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode())); + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode())); + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode())); - env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); final Map response = new HashMap<>(); response.put("group-0", Errors.NONE); @@ -1198,7 +1202,7 @@ public class KafkaAdminClientTest { assertNull(results.get()); //should throw error for non-retriable errors - env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode())); + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode())); final DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds); TestUtils.assertFutureError(errorResult.deletedGroups().get("group-0"), GroupAuthorizationException.class); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 8f23d41240f..ccd9e94aba9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -580,7 +580,7 @@ public class KafkaConsumerTest { true, groupId, groupInstanceId); consumer.assign(singletonList(tp0)); - client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node); + client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node); Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()); // lookup committed offset and find nothing @@ -604,7 +604,7 @@ public class KafkaConsumerTest { true, groupId, groupInstanceId); consumer.assign(singletonList(tp0)); - client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node); + client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node); Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()); client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, 539L), Errors.NONE), coordinator); @@ -629,7 +629,7 @@ public class KafkaConsumerTest { true, groupId, groupInstanceId); consumer.assign(singletonList(tp0)); - client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node); + client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node); Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()); client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, -1L), Errors.NONE), coordinator); @@ -679,7 +679,7 @@ public class KafkaConsumerTest { consumer.assign(singletonList(tp0)); // lookup coordinator - client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node); + client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node); Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()); // fetch offset for one topic @@ -1116,7 +1116,7 @@ public class KafkaConsumerTest { KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); // lookup coordinator - client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node); + client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node); Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()); // manual assignment @@ -1172,7 +1172,7 @@ public class KafkaConsumerTest { KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId); // lookup coordinator - client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node); + client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node); Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()); // manual assignment @@ -1226,7 +1226,7 @@ public class KafkaConsumerTest { KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); // lookup coordinator - client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node); + client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node); Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()); // manual assignment @@ -1420,7 +1420,7 @@ public class KafkaConsumerTest { KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); - client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node); + client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node); Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()); @@ -1465,7 +1465,7 @@ public class KafkaConsumerTest { client.prepareResponseFrom(syncGroupResponse(singletonList(tp0), Errors.NONE), coordinator, true); // should try and find the new coordinator - client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node); + client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node); // rejoin group client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE), coordinator); @@ -1663,7 +1663,7 @@ public class KafkaConsumerTest { private Node prepareRebalance(MockClient client, Node node, final Set subscribedTopics, PartitionAssignor assignor, List partitions, Node coordinator) { if (coordinator == null) { // lookup coordinator - client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node); + client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node); coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()); } @@ -1691,7 +1691,7 @@ public class KafkaConsumerTest { private Node prepareRebalance(MockClient client, Node node, PartitionAssignor assignor, List partitions, Node coordinator) { if (coordinator == null) { // lookup coordinator - client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node); + client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node); coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()); } @@ -1797,7 +1797,6 @@ public class KafkaConsumerTest { return new ListOffsetResponse(partitionData); } - private FetchResponse fetchResponse(Map fetches) { LinkedHashMap> tpResponses = new LinkedHashMap<>(); for (Map.Entry fetchEntry : fetches.entrySet()) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index c3f0ff5f15a..5aaf4764384 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -768,7 +768,7 @@ public class AbstractCoordinatorTest { } private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) { - return new FindCoordinatorResponse(error, node); + return FindCoordinatorResponse.prepareResponse(error, node); } private HeartbeatResponse heartbeatResponse(Errors error) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 3f9d89ffe4c..a60316f976d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -2184,7 +2184,7 @@ public class ConsumerCoordinatorTest { } private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) { - return new FindCoordinatorResponse(error, node); + return FindCoordinatorResponse.prepareResponse(error, node); } private HeartbeatResponse heartbeatResponse(Errors error) { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index d6b3d742091..170b75e4bfb 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -662,7 +662,7 @@ public class KafkaProducerTest { Node node = metadata.fetch().nodes().get(0); client.throttle(node, 5000); - client.prepareResponse(new FindCoordinatorResponse(Errors.NONE, host1)); + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, host1)); client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE)); try (Producer producer = new KafkaProducer<>(configs, new StringSerializer(), @@ -855,7 +855,7 @@ public class KafkaProducerTest { ExecutorService executorService = Executors.newSingleThreadExecutor(); CountDownLatch assertionDoneLatch = new CountDownLatch(1); - client.prepareResponse(new FindCoordinatorResponse(Errors.NONE, host1)); + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, host1)); executorService.submit(() -> { assertThrows(KafkaException.class, producer::initTransactions); assertionDoneLatch.countDown(); @@ -884,7 +884,7 @@ public class KafkaProducerTest { ExecutorService executorService = Executors.newSingleThreadExecutor(); CountDownLatch assertionDoneLatch = new CountDownLatch(1); - client.prepareResponse(new FindCoordinatorResponse(Errors.NONE, host1)); + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, host1)); executorService.submit(() -> { assertThrows(KafkaException.class, producer::initTransactions); assertionDoneLatch.countDown(); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 5e4e31e2697..04197d86ca4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -265,7 +265,6 @@ public class SenderTest { * Send multiple requests. Verify that the client side quota metrics have the right values */ @Test - @SuppressWarnings("deprecation") public void testQuotaMetrics() throws Exception { MockSelector selector = new MockSelector(time); Sensor throttleTimeSensor = Sender.throttleTimeSensor(this.senderMetricsRegistry); @@ -1811,7 +1810,6 @@ public class SenderTest { } @Test - @SuppressWarnings("deprecation") public void testAbortRetryWhenProducerIdChanges() throws InterruptedException { final long producerId = 343434L; TransactionManager transactionManager = new TransactionManager(); @@ -1904,7 +1902,6 @@ public class SenderTest { testSplitBatchAndSend(txnManager, producerIdAndEpoch, tp); } - @SuppressWarnings("deprecation") private void testSplitBatchAndSend(TransactionManager txnManager, ProducerIdAndEpoch producerIdAndEpoch, TopicPartition tp) throws Exception { @@ -2473,7 +2470,8 @@ public class SenderTest { } private void prepareFindCoordinatorResponse(Errors error) { - client.prepareResponse(new FindCoordinatorResponse(error, metadata.fetch().nodes().get(0))); + Node node = metadata.fetch().nodes().get(0); + client.prepareResponse(FindCoordinatorResponse.prepareResponse(error, node)); } private void prepareInitProducerResponse(Errors error, long producerId, short producerEpoch) { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 2d3487f5126..ca03b76364d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -687,8 +687,8 @@ public class TransactionManagerTest { transactionManager.initializeTransactions(); client.prepareUnsupportedVersionResponse(body -> { FindCoordinatorRequest findCoordinatorRequest = (FindCoordinatorRequest) body; - assertEquals(findCoordinatorRequest.coordinatorType(), CoordinatorType.TRANSACTION); - assertEquals(findCoordinatorRequest.coordinatorKey(), transactionalId); + assertEquals(CoordinatorType.forId(findCoordinatorRequest.data().keyType()), CoordinatorType.TRANSACTION); + assertEquals(findCoordinatorRequest.data().key(), transactionalId); return true; }); @@ -2381,10 +2381,10 @@ public class TransactionManagerTest { final String coordinatorKey) { client.prepareResponse(body -> { FindCoordinatorRequest findCoordinatorRequest = (FindCoordinatorRequest) body; - assertEquals(findCoordinatorRequest.coordinatorType(), coordinatorType); - assertEquals(findCoordinatorRequest.coordinatorKey(), coordinatorKey); + assertEquals(CoordinatorType.forId(findCoordinatorRequest.data().keyType()), coordinatorType); + assertEquals(findCoordinatorRequest.data().key(), coordinatorKey); return true; - }, new FindCoordinatorResponse(error, brokerNode), shouldDisconnect); + }, FindCoordinatorResponse.prepareResponse(error, brokerNode), shouldDisconnect); } private void prepareInitPidResponse(Errors error, boolean shouldDisconnect, long producerId, short producerEpoch) { diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index b6a2dadfd3c..61ad3b32968 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -52,6 +52,7 @@ import org.apache.kafka.common.message.ElectPreferredLeadersRequestData.TopicPar import org.apache.kafka.common.message.ElectPreferredLeadersResponseData; import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult; import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult; +import org.apache.kafka.common.message.FindCoordinatorRequestData; import org.apache.kafka.common.message.InitProducerIdRequestData; import org.apache.kafka.common.message.InitProducerIdResponseData; import org.apache.kafka.common.message.JoinGroupRequestData; @@ -84,6 +85,7 @@ import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse; import org.apache.kafka.common.requests.CreatePartitionsRequest.PartitionDetails; import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult; import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse; +import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType; import org.apache.kafka.common.resource.PatternType; import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.resource.ResourcePatternFilter; @@ -445,7 +447,10 @@ public class RequestResponseTest { @Test(expected = UnsupportedVersionException.class) public void cannotUseFindCoordinatorV0ToFindTransactionCoordinator() { - FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar"); + FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder( + new FindCoordinatorRequestData() + .setKeyType(CoordinatorType.TRANSACTION.id) + .setKey("foobar")); builder.build((short) 0); } @@ -697,12 +702,16 @@ public class RequestResponseTest { } private FindCoordinatorRequest createFindCoordinatorRequest(int version) { - return new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, "test-group") + return new FindCoordinatorRequest.Builder( + new FindCoordinatorRequestData() + .setKeyType(CoordinatorType.GROUP.id()) + .setKey("test-group")) .build((short) version); } private FindCoordinatorResponse createFindCoordinatorResponse() { - return new FindCoordinatorResponse(Errors.NONE, new Node(10, "host1", 2014)); + Node node = new Node(10, "host1", 2014); + return FindCoordinatorResponse.prepareResponse(Errors.NONE, node); } private FetchRequest createFetchRequest(int version, FetchMetadata metadata, List toForget) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java index c78c7a4756c..cda5f618bb2 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java @@ -212,7 +212,7 @@ public class WorkerCoordinatorTest { final String consumerId = "leader"; - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, node)); coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); // normal join group @@ -252,7 +252,7 @@ public class WorkerCoordinatorTest { final String memberId = "member"; - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, node)); coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); // normal join group @@ -293,7 +293,7 @@ public class WorkerCoordinatorTest { final String memberId = "member"; - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, node)); coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); // config mismatch results in assignment error @@ -324,7 +324,7 @@ public class WorkerCoordinatorTest { PowerMock.replayAll(); - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, node)); coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); // join the group once @@ -488,11 +488,6 @@ public class WorkerCoordinatorTest { PowerMock.verifyAll(); } - - private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) { - return new FindCoordinatorResponse(error, node); - } - private JoinGroupResponse joinGroupLeaderResponse(int generationId, String memberId, Map configOffsets, Errors error) { List metadata = new ArrayList<>(); diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index 17716d3762f..c78a4516ecc 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -28,7 +28,8 @@ import org.apache.kafka.common.config.ConfigDef.{Importance, Type} import org.apache.kafka.common.config.{AbstractConfig, ConfigDef} import org.apache.kafka.common.errors.{AuthenticationException, TimeoutException} import org.apache.kafka.common.internals.ClusterResourceListeners -import org.apache.kafka.common.message.{DescribeGroupsRequestData, DescribeGroupsResponseData} +import org.apache.kafka.common.message.{DescribeGroupsRequestData, DescribeGroupsResponseData, FindCoordinatorRequestData} + import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.Selector import org.apache.kafka.common.protocol.{ApiKeys, Errors} @@ -41,6 +42,7 @@ import org.apache.kafka.common.{Node, TopicPartition} import scala.collection.JavaConverters._ import scala.util.{Failure, Success, Try} +import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType /** * A Scala administrative client for Kafka which supports managing and inspecting topics, brokers, @@ -108,7 +110,10 @@ class AdminClient(val time: Time, } def findCoordinator(groupId: String, timeoutMs: Long = 0): Node = { - val requestBuilder = new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, groupId) + val requestBuilder = new FindCoordinatorRequest.Builder( + new FindCoordinatorRequestData() + .setKeyType(CoordinatorType.GROUP.id) + .setKey(groupId)) def sendRequest: Try[FindCoordinatorResponse] = Try(sendAnyNode(ApiKeys.FIND_COORDINATOR, requestBuilder).asInstanceOf[FindCoordinatorResponse]) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 4e4c225b80b..34ed7d7896f 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -57,6 +57,7 @@ import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, AclFilterResponse} import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo +import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests._ import org.apache.kafka.common.resource.PatternType.LITERAL @@ -1171,22 +1172,22 @@ class KafkaApis(val requestChannel: RequestChannel, def handleFindCoordinatorRequest(request: RequestChannel.Request) { val findCoordinatorRequest = request.body[FindCoordinatorRequest] - if (findCoordinatorRequest.coordinatorType == FindCoordinatorRequest.CoordinatorType.GROUP && - !authorize(request.session, Describe, Resource(Group, findCoordinatorRequest.coordinatorKey, LITERAL))) + if (findCoordinatorRequest.data.keyType == CoordinatorType.GROUP.id && + !authorize(request.session, Describe, Resource(Group, findCoordinatorRequest.data.key, LITERAL))) sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception) - else if (findCoordinatorRequest.coordinatorType == FindCoordinatorRequest.CoordinatorType.TRANSACTION && - !authorize(request.session, Describe, Resource(TransactionalId, findCoordinatorRequest.coordinatorKey, LITERAL))) + else if (findCoordinatorRequest.data.keyType == CoordinatorType.TRANSACTION.id && + !authorize(request.session, Describe, Resource(TransactionalId, findCoordinatorRequest.data.key, LITERAL))) sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception) else { // get metadata (and create the topic if necessary) - val (partition, topicMetadata) = findCoordinatorRequest.coordinatorType match { - case FindCoordinatorRequest.CoordinatorType.GROUP => - val partition = groupCoordinator.partitionFor(findCoordinatorRequest.coordinatorKey) + val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match { + case CoordinatorType.GROUP => + val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key) val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName) (partition, metadata) - case FindCoordinatorRequest.CoordinatorType.TRANSACTION => - val partition = txnCoordinator.partitionFor(findCoordinatorRequest.coordinatorKey) + case CoordinatorType.TRANSACTION => + val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key) val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName) (partition, metadata) @@ -1195,8 +1196,18 @@ class KafkaApis(val requestChannel: RequestChannel, } def createResponse(requestThrottleMs: Int): AbstractResponse = { + def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = { + new FindCoordinatorResponse( + new FindCoordinatorResponseData() + .setErrorCode(error.code) + .setErrorMessage(error.message) + .setNodeId(node.id) + .setHost(node.host) + .setPort(node.port) + .setThrottleTimeMs(requestThrottleMs)) + } val responseBody = if (topicMetadata.error != Errors.NONE) { - new FindCoordinatorResponse(requestThrottleMs, Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode) + createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode) } else { val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala .find(_.partition == partition) @@ -1205,9 +1216,9 @@ class KafkaApis(val requestChannel: RequestChannel, coordinatorEndpoint match { case Some(endpoint) if !endpoint.isEmpty => - new FindCoordinatorResponse(requestThrottleMs, Errors.NONE, endpoint) + createFindCoordinatorResponse(Errors.NONE, endpoint) case _ => - new FindCoordinatorResponse(requestThrottleMs, Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode) + createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode) } } trace("Sending FindCoordinator response %s for correlation id %d to client %s." diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index e7a094f937d..ee1c02b9b21 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -302,7 +302,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } private def createFindCoordinatorRequest = { - new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, group).build() + new FindCoordinatorRequest.Builder( + new FindCoordinatorRequestData() + .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id) + .setKey(group)).build() } private def createUpdateMetadataRequest = { diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala index 385eb8f0b0a..ae6fc00f1a6 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala @@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer._ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.GroupMaxSizeReachedException +import org.apache.kafka.common.message.FindCoordinatorRequestData import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{FindCoordinatorRequest, FindCoordinatorResponse} import org.junit.Assert._ @@ -254,7 +255,10 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging { } private def findCoordinator(group: String): Int = { - val request = new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, group).build() + val request = new FindCoordinatorRequest.Builder( + new FindCoordinatorRequestData() + .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id) + .setKey(group)).build() var nodeId = -1 TestUtils.waitUntilTrue(() => { val resp = connectAndSend(request, ApiKeys.FIND_COORDINATOR) diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index d9c8bb552b2..b4c1268ba8e 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -274,7 +274,10 @@ class RequestQuotaTest extends BaseRequestTest { new OffsetFetchRequest.Builder("test-group", List(tp).asJava) case ApiKeys.FIND_COORDINATOR => - new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, "test-group") + new FindCoordinatorRequest.Builder( + new FindCoordinatorRequestData() + .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id) + .setKey("test-group")) case ApiKeys.JOIN_GROUP => new JoinGroupRequest.Builder( @@ -489,7 +492,8 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.OFFSET_COMMIT => new OffsetCommitResponse(response, ApiKeys.OFFSET_COMMIT.latestVersion).throttleTimeMs case ApiKeys.OFFSET_FETCH => new OffsetFetchResponse(response).throttleTimeMs - case ApiKeys.FIND_COORDINATOR => new FindCoordinatorResponse(response).throttleTimeMs + case ApiKeys.FIND_COORDINATOR => + new FindCoordinatorResponse(response, ApiKeys.FIND_COORDINATOR.latestVersion).throttleTimeMs case ApiKeys.JOIN_GROUP => new JoinGroupResponse(response).throttleTimeMs case ApiKeys.HEARTBEAT => new HeartbeatResponse(response).throttleTimeMs case ApiKeys.LEAVE_GROUP => new LeaveGroupResponse(response).throttleTimeMs