KAFKA-2507 KAFKA-2959; Remove legacy ControlledShutdown request/response objects

This patch replaces the legacy ControlledShutdown objects in `kafka.api` with the alternatives in `org.apache.kafka.common.requests`. Since this was the last API that needed updating, we have also dropped the reference in `RequestChannel.Request` to the legacy object type.

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3612 from hachikuji/remove-old-controlled-shutdown-objects
This commit is contained in:
Jason Gustafson 2017-08-03 16:18:26 -07:00
parent 613971f8d4
commit 5a516fb28e
16 changed files with 244 additions and 428 deletions

View File

@ -42,15 +42,19 @@ import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
public class Protocol {
public static final Schema REQUEST_HEADER = new Schema(new Field("api_key", INT16, "The id of the request type."),
public static final Schema REQUEST_HEADER = new Schema(
new Field("api_key", INT16, "The id of the request type."),
new Field("api_version", INT16, "The version of the API."),
new Field("correlation_id",
INT32,
"A user-supplied integer value that will be passed back with the response"),
new Field("client_id",
NULLABLE_STRING,
"A user specified identifier for the client making the request.",
""));
new Field("correlation_id", INT32, "A user-supplied integer value that will be passed back with the response"),
new Field("client_id", NULLABLE_STRING, "A user specified identifier for the client making the request.", ""));
// Version 0 of the controlled shutdown API used a non-standard request header (the clientId is missing).
// This can be removed once we drop support for that version.
public static final Schema CONTROLLED_SHUTDOWN_REQUEST_V0_HEADER = new Schema(
new Field("api_key", INT16, "The id of the request type."),
new Field("api_version", INT16, "The version of the API."),
new Field("correlation_id", INT32, "A user-supplied integer value that will be passed back with the response"));
public static final Schema RESPONSE_HEADER = new Schema(new Field("correlation_id",
INT32,
@ -942,23 +946,25 @@ public class Protocol {
public static final Schema[] FIND_COORDINATOR_RESPONSE = {FIND_COORDINATOR_RESPONSE_V0, FIND_COORDINATOR_RESPONSE_V1};
/* Controlled shutdown api */
public static final Schema CONTROLLED_SHUTDOWN_REQUEST_V1 = new Schema(new Field("broker_id",
public static final Schema CONTROLLED_SHUTDOWN_REQUEST_V0 = new Schema(new Field("broker_id",
INT32,
"The id of the broker for which controlled shutdown has been requested."));
public static final Schema CONTROLLED_SHUTDOWN_PARTITION_V1 = new Schema(new Field("topic", STRING),
public static final Schema CONTROLLED_SHUTDOWN_PARTITION_V0 = new Schema(new Field("topic", STRING),
new Field("partition",
INT32,
"Topic partition id."));
public static final Schema CONTROLLED_SHUTDOWN_RESPONSE_V1 = new Schema(new Field("error_code", INT16),
public static final Schema CONTROLLED_SHUTDOWN_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
new Field("partitions_remaining",
new ArrayOf(CONTROLLED_SHUTDOWN_PARTITION_V1),
new ArrayOf(CONTROLLED_SHUTDOWN_PARTITION_V0),
"The partitions that the broker still leads."));
/* V0 is not supported as it would require changes to the request header not to include `clientId` */
public static final Schema[] CONTROLLED_SHUTDOWN_REQUEST = {null, CONTROLLED_SHUTDOWN_REQUEST_V1};
public static final Schema[] CONTROLLED_SHUTDOWN_RESPONSE = {null, CONTROLLED_SHUTDOWN_RESPONSE_V1};
public static final Schema CONTROLLED_SHUTDOWN_REQUEST_V1 = CONTROLLED_SHUTDOWN_REQUEST_V0;
public static final Schema CONTROLLED_SHUTDOWN_RESPONSE_V1 = CONTROLLED_SHUTDOWN_RESPONSE_V0;
public static final Schema[] CONTROLLED_SHUTDOWN_REQUEST = {CONTROLLED_SHUTDOWN_REQUEST_V0, CONTROLLED_SHUTDOWN_REQUEST_V1};
public static final Schema[] CONTROLLED_SHUTDOWN_RESPONSE = {CONTROLLED_SHUTDOWN_RESPONSE_V0, CONTROLLED_SHUTDOWN_RESPONSE_V1};
/* Join group api */
public static final Schema JOIN_GROUP_REQUEST_PROTOCOL_V0 = new Schema(new Field("protocol_name", STRING),
@ -1983,6 +1989,14 @@ public class Protocol {
return DELAYED_DEALLOCATION_REQUESTS.contains(ApiKeys.forId(apiKey));
}
public static Schema requestHeaderSchema(short apiKey, short version) {
if (apiKey == ApiKeys.CONTROLLED_SHUTDOWN_KEY.id && version == 0)
// This will be removed once we remove support for v0 of ControlledShutdownRequest, which
// depends on a non-standard request header (it does not have a clientId)
return CONTROLLED_SHUTDOWN_REQUEST_V0_HEADER;
return REQUEST_HEADER;
}
private static String indentString(int size) {
StringBuilder b = new StringBuilder(size);
for (int i = 0; i < size; i++)

View File

@ -31,7 +31,11 @@ public class ControlledShutdownRequest extends AbstractRequest {
private final int brokerId;
public Builder(int brokerId) {
super(ApiKeys.CONTROLLED_SHUTDOWN_KEY);
this(brokerId, null);
}
public Builder(int brokerId, Short desiredVersion) {
super(ApiKeys.CONTROLLED_SHUTDOWN_KEY, desiredVersion);
this.brokerId = brokerId;
}

View File

@ -239,7 +239,11 @@ public class FetchResponse extends AbstractResponse {
return toSend(toStruct(requestHeader.apiVersion()), throttleTimeMs, dest, requestHeader);
}
public Send toSend(Struct responseStruct, int throttleTimeMs, String dest, RequestHeader requestHeader) {
public Send toSend(int throttleTimeMs, String dest, RequestHeader requestHeader) {
return toSend(toStruct(requestHeader.apiVersion()), throttleTimeMs, dest, requestHeader);
}
private Send toSend(Struct responseStruct, int throttleTimeMs, String dest, RequestHeader requestHeader) {
Struct responseHeader = new ResponseHeader(requestHeader.correlationId()).toStruct();
// write the total size and the response header

View File

@ -16,23 +16,20 @@
*/
package org.apache.kafka.common.requests;
import static org.apache.kafka.common.protocol.Protocol.REQUEST_HEADER;
import org.apache.kafka.common.protocol.Protocol;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
import org.apache.kafka.common.protocol.Protocol;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Struct;
/**
* The header for a request in the Kafka protocol
*/
public class RequestHeader extends AbstractRequestResponse {
private static final Field API_KEY_FIELD = REQUEST_HEADER.get("api_key");
private static final Field API_VERSION_FIELD = REQUEST_HEADER.get("api_version");
private static final Field CLIENT_ID_FIELD = REQUEST_HEADER.get("client_id");
private static final Field CORRELATION_ID_FIELD = REQUEST_HEADER.get("correlation_id");
private static final String API_KEY_FIELD_NAME = "api_key";
private static final String API_VERSION_FIELD_NAME = "api_version";
private static final String CLIENT_ID_FIELD_NAME = "client_id";
private static final String CORRELATION_ID_FIELD_NAME = "correlation_id";
private final short apiKey;
private final short apiVersion;
@ -40,25 +37,34 @@ public class RequestHeader extends AbstractRequestResponse {
private final int correlationId;
public RequestHeader(Struct struct) {
apiKey = struct.getShort(API_KEY_FIELD);
apiVersion = struct.getShort(API_VERSION_FIELD);
clientId = struct.getString(CLIENT_ID_FIELD);
correlationId = struct.getInt(CORRELATION_ID_FIELD);
apiKey = struct.getShort(API_KEY_FIELD_NAME);
apiVersion = struct.getShort(API_VERSION_FIELD_NAME);
// only v0 of the controlled shutdown request is missing the clientId
if (struct.hasField(CLIENT_ID_FIELD_NAME))
clientId = struct.getString(CLIENT_ID_FIELD_NAME);
else
clientId = "";
correlationId = struct.getInt(CORRELATION_ID_FIELD_NAME);
}
public RequestHeader(short apiKey, short version, String client, int correlation) {
public RequestHeader(short apiKey, short version, String clientId, int correlation) {
this.apiKey = apiKey;
this.apiVersion = version;
this.clientId = client;
this.clientId = clientId;
this.correlationId = correlation;
}
public Struct toStruct() {
Struct struct = new Struct(Protocol.REQUEST_HEADER);
struct.set(API_KEY_FIELD, apiKey);
struct.set(API_VERSION_FIELD, apiVersion);
struct.set(CLIENT_ID_FIELD, clientId);
struct.set(CORRELATION_ID_FIELD, correlationId);
Schema schema = Protocol.requestHeaderSchema(apiKey, apiVersion);
Struct struct = new Struct(schema);
struct.set(API_KEY_FIELD_NAME, apiKey);
struct.set(API_VERSION_FIELD_NAME, apiVersion);
// only v0 of the controlled shutdown request is missing the clientId
if (struct.hasField(CLIENT_ID_FIELD_NAME))
struct.set(CLIENT_ID_FIELD_NAME, clientId);
struct.set(CORRELATION_ID_FIELD_NAME, correlationId);
return struct;
}
@ -83,11 +89,37 @@ public class RequestHeader extends AbstractRequestResponse {
}
public static RequestHeader parse(ByteBuffer buffer) {
return new RequestHeader(Protocol.REQUEST_HEADER.read(buffer));
short apiKey = buffer.getShort();
short apiVersion = buffer.getShort();
Schema schema = Protocol.requestHeaderSchema(apiKey, apiVersion);
buffer.rewind();
return new RequestHeader(schema.read(buffer));
}
@Override
public String toString() {
return toStruct().toString();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RequestHeader that = (RequestHeader) o;
return apiKey == that.apiKey &&
apiVersion == that.apiVersion &&
correlationId == that.correlationId &&
(clientId == null ? that.clientId == null : clientId.equals(that.clientId));
}
@Override
public int hashCode() {
int result = (int) apiKey;
result = 31 * result + (int) apiVersion;
result = 31 * result + (clientId != null ? clientId.hashCode() : 0);
result = 31 * result + correlationId;
return result;
}
}

View File

@ -20,7 +20,6 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
@ -58,9 +57,7 @@ public class NodeApiVersionsTest {
public void testVersionsToString() {
List<ApiVersion> versionList = new ArrayList<>();
for (ApiKeys apiKey : ApiKeys.values()) {
if (apiKey == ApiKeys.CONTROLLED_SHUTDOWN_KEY) {
versionList.add(new ApiVersion(apiKey.id, (short) 0, (short) 0));
} else if (apiKey == ApiKeys.DELETE_TOPICS) {
if (apiKey == ApiKeys.DELETE_TOPICS) {
versionList.add(new ApiVersion(apiKey.id, (short) 10000, (short) 10001));
} else {
versionList.add(new ApiVersion(apiKey));
@ -71,9 +68,7 @@ public class NodeApiVersionsTest {
String prefix = "(";
for (ApiKeys apiKey : ApiKeys.values()) {
bld.append(prefix);
if (apiKey == ApiKeys.CONTROLLED_SHUTDOWN_KEY) {
bld.append("ControlledShutdown(7): 0 [unusable: node too old]");
} else if (apiKey == ApiKeys.DELETE_TOPICS) {
if (apiKey == ApiKeys.DELETE_TOPICS) {
bld.append("DeleteTopics(20): 10000 to 10001 [unusable: node too new]");
} else {
bld.append(apiKey.name).append("(").
@ -95,21 +90,6 @@ public class NodeApiVersionsTest {
assertEquals(bld.toString(), versions.toString());
}
@Test
public void testUsableVersionCalculation() {
List<ApiVersion> versionList = new ArrayList<>();
versionList.add(new ApiVersion(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id, (short) 0, (short) 0));
versionList.add(new ApiVersion(ApiKeys.FETCH.id, (short) 1, (short) 2));
NodeApiVersions versions = new NodeApiVersions(versionList);
try {
versions.usableVersion(ApiKeys.CONTROLLED_SHUTDOWN_KEY);
Assert.fail("expected UnsupportedVersionException");
} catch (UnsupportedVersionException e) {
// pass
}
assertEquals(2, versions.usableVersion(ApiKeys.FETCH));
}
@Test
public void testUsableVersionNoDesiredVersionReturnsLatestUsable() {
NodeApiVersions apiVersions = NodeApiVersions.create(Collections.singleton(
@ -121,6 +101,7 @@ public class NodeApiVersionsTest {
public void testDesiredVersion() {
NodeApiVersions apiVersions = NodeApiVersions.create(Collections.singleton(
new ApiVersion(ApiKeys.PRODUCE.id, (short) 1, (short) 3)));
assertEquals(3, apiVersions.usableVersion(ApiKeys.PRODUCE));
assertEquals(1, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 1));
assertEquals(2, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 2));
assertEquals(3, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 3));
@ -147,6 +128,13 @@ public class NodeApiVersionsTest {
versions.usableVersion(ApiKeys.FETCH);
}
@Test(expected = UnsupportedVersionException.class)
public void testUsableVersionOutOfRange() {
NodeApiVersions apiVersions = NodeApiVersions.create(Collections.singleton(
new ApiVersion(ApiKeys.PRODUCE.id, (short) 300, (short) 300)));
apiVersions.usableVersion(ApiKeys.PRODUCE);
}
@Test
public void testUsableVersionLatestVersions() {
List<ApiVersion> versionList = new LinkedList<>();

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.Struct;
import org.junit.Test;
import java.nio.ByteBuffer;
import static org.apache.kafka.test.TestUtils.toBuffer;
import static org.junit.Assert.assertEquals;
public class RequestHeaderTest {
@Test
public void testSerdeControlledShutdownV0() {
// Verify that version 0 of controlled shutdown does not include the clientId field
int correlationId = 2342;
ByteBuffer rawBuffer = ByteBuffer.allocate(32);
rawBuffer.putShort(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id);
rawBuffer.putShort((short) 0);
rawBuffer.putInt(correlationId);
rawBuffer.flip();
RequestHeader deserialized = RequestHeader.parse(rawBuffer);
assertEquals(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id, deserialized.apiKey());
assertEquals(0, deserialized.apiVersion());
assertEquals(correlationId, deserialized.correlationId());
assertEquals("", deserialized.clientId());
Struct serialized = deserialized.toStruct();
ByteBuffer serializedBuffer = toBuffer(serialized);
assertEquals(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id, serializedBuffer.getShort(0));
assertEquals(0, serializedBuffer.getShort(2));
assertEquals(correlationId, serializedBuffer.getInt(4));
assertEquals(8, serializedBuffer.limit());
}
@Test
public void testRequestHeader() {
RequestHeader header = new RequestHeader((short) 10, (short) 1, "", 10);
ByteBuffer buffer = toBuffer(header.toStruct());
RequestHeader deserialized = RequestHeader.parse(buffer);
assertEquals(header, deserialized);
}
@Test
public void testRequestHeaderWithNullClientId() {
RequestHeader header = new RequestHeader((short) 10, (short) 1, null, 10);
Struct headerStruct = header.toStruct();
ByteBuffer buffer = toBuffer(headerStruct);
RequestHeader deserialized = RequestHeader.parse(buffer);
assertEquals(header.apiKey(), deserialized.apiKey());
assertEquals(header.apiVersion(), deserialized.apiVersion());
assertEquals(header.correlationId(), deserialized.correlationId());
assertEquals("", deserialized.clientId()); // null defaults to ""
}
}

View File

@ -68,6 +68,7 @@ import java.util.Set;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.apache.kafka.test.TestUtils.toBuffer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@ -241,17 +242,6 @@ public class RequestResponseTest {
checkResponse(createDescribeConfigsResponse(), 0);
}
@Test
public void testRequestHeader() {
RequestHeader header = createRequestHeader();
ByteBuffer buffer = toBuffer(header.toStruct());
RequestHeader deserialized = RequestHeader.parse(buffer);
assertEquals(header.apiVersion(), deserialized.apiVersion());
assertEquals(header.apiKey(), deserialized.apiKey());
assertEquals(header.clientId(), deserialized.clientId());
assertEquals(header.correlationId(), deserialized.correlationId());
}
@Test
public void testResponseHeader() {
ResponseHeader header = createResponseHeader();
@ -295,13 +285,6 @@ public class RequestResponseTest {
return (AbstractRequestResponse) deserializer.invoke(null, buffer, version);
}
private ByteBuffer toBuffer(Struct struct) {
ByteBuffer buffer = ByteBuffer.allocate(struct.sizeOf());
struct.writeTo(buffer);
buffer.rewind();
return buffer;
}
@Test(expected = UnsupportedVersionException.class)
public void cannotUseFindCoordinatorV0ToFindTransactionCoordinator() {
FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
@ -508,18 +491,6 @@ public class RequestResponseTest {
assertEquals(response.partitionsRemaining(), deserialized.partitionsRemaining());
}
@Test
public void testRequestHeaderWithNullClientId() {
RequestHeader header = new RequestHeader((short) 10, (short) 1, null, 10);
Struct headerStruct = header.toStruct();
ByteBuffer buffer = toBuffer(headerStruct);
RequestHeader deserialized = RequestHeader.parse(buffer);
assertEquals(header.apiKey(), deserialized.apiKey());
assertEquals(header.apiVersion(), deserialized.apiVersion());
assertEquals(header.correlationId(), deserialized.correlationId());
assertEquals("", deserialized.clientId()); // null is defaulted to ""
}
@Test(expected = UnsupportedVersionException.class)
public void testCreateTopicRequestV0FailsIfValidateOnly() {
createCreateTopicRequest(0, true);
@ -564,10 +535,6 @@ public class RequestResponseTest {
assertTrue(string.contains("group1"));
}
private RequestHeader createRequestHeader() {
return new RequestHeader((short) 10, (short) 1, "", 10);
}
private ResponseHeader createResponseHeader() {
return new ResponseHeader(10);
}

View File

@ -21,6 +21,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -323,4 +324,11 @@ public class TestUtils {
return list;
}
public static ByteBuffer toBuffer(Struct struct) {
ByteBuffer buffer = ByteBuffer.allocate(struct.sizeOf());
struct.writeTo(buffer);
buffer.rewind();
return buffer;
}
}

View File

@ -1,84 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api
import java.nio.ByteBuffer
import kafka.common.TopicAndPartition
import kafka.api.ApiUtils._
import kafka.network.{RequestOrResponseSend, RequestChannel}
import kafka.network.RequestChannel.Response
import kafka.utils.Logging
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
object ControlledShutdownRequest extends Logging {
val CurrentVersion = 1.shortValue
val DefaultClientId = ""
def readFrom(buffer: ByteBuffer): ControlledShutdownRequest = {
val versionId = buffer.getShort
val correlationId = buffer.getInt
val clientId = if (versionId > 0) Some(readShortString(buffer)) else None
val brokerId = buffer.getInt
new ControlledShutdownRequest(versionId, correlationId, clientId, brokerId)
}
}
case class ControlledShutdownRequest(versionId: Short,
correlationId: Int,
clientId: Option[String],
brokerId: Int)
extends RequestOrResponse(Some(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id)){
if (versionId > 0 && clientId.isEmpty)
throw new IllegalArgumentException("`clientId` must be defined if `versionId` > 0")
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
buffer.putInt(correlationId)
clientId.foreach(writeShortString(buffer, _))
buffer.putInt(brokerId)
}
def sizeInBytes: Int = {
2 + /* version id */
4 + /* correlation id */
clientId.fold(0)(shortStringLength) +
4 /* broker id */
}
override def toString: String = {
describe(true)
}
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
val errorResponse = ControlledShutdownResponse(correlationId, Errors.forException(e), Set.empty[TopicAndPartition])
requestChannel.sendResponse(Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
}
override def describe(details: Boolean = false): String = {
val controlledShutdownRequest = new StringBuilder
controlledShutdownRequest.append("Name: " + this.getClass.getSimpleName)
controlledShutdownRequest.append("; Version: " + versionId)
controlledShutdownRequest.append("; CorrelationId: " + correlationId)
controlledShutdownRequest.append(";ClientId:" + clientId)
controlledShutdownRequest.append("; BrokerId: " + brokerId)
controlledShutdownRequest.toString()
}
}

View File

@ -1,72 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api
import java.nio.ByteBuffer
import kafka.common.TopicAndPartition
import kafka.api.ApiUtils._
import org.apache.kafka.common.protocol.Errors
import collection.Set
object ControlledShutdownResponse {
def readFrom(buffer: ByteBuffer): ControlledShutdownResponse = {
val correlationId = buffer.getInt
val error = Errors.forCode(buffer.getShort)
val numEntries = buffer.getInt
var partitionsRemaining = Set[TopicAndPartition]()
for (_ <- 0 until numEntries){
val topic = readShortString(buffer)
val partition = buffer.getInt
partitionsRemaining += new TopicAndPartition(topic, partition)
}
new ControlledShutdownResponse(correlationId, error, partitionsRemaining)
}
}
case class ControlledShutdownResponse(correlationId: Int,
error: Errors = Errors.NONE,
partitionsRemaining: Set[TopicAndPartition])
extends RequestOrResponse() {
def sizeInBytes: Int = {
var size =
4 /* correlation id */ +
2 /* error code */ +
4 /* number of responses */
for (topicAndPartition <- partitionsRemaining) {
size +=
2 + topicAndPartition.topic.length /* topic */ +
4 /* partition */
}
size
}
def writeTo(buffer: ByteBuffer) {
buffer.putInt(correlationId)
buffer.putShort(error.code)
buffer.putInt(partitionsRemaining.size)
for (topicAndPartition:TopicAndPartition <- partitionsRemaining){
writeShortString(buffer, topicAndPartition.topic)
buffer.putInt(topicAndPartition.partition)
}
}
override def describe(details: Boolean):String = { toString }
}

View File

@ -35,5 +35,7 @@ case class TopicAndPartition(topic: String, partition: Int) {
def asTuple = (topic, partition)
def asTopicPartition = new TopicPartition(topic, partition)
override def toString = "[%s,%d]".format(topic, partition)
}

View File

@ -23,7 +23,6 @@ import java.util.Collections
import java.util.concurrent._
import com.yammer.metrics.core.Gauge
import kafka.api.{ControlledShutdownRequest, RequestOrResponse}
import kafka.metrics.KafkaMetricsGroup
import kafka.server.QuotaId
import kafka.utils.{Logging, NotNothing}
@ -69,29 +68,14 @@ object RequestChannel extends Logging {
@volatile var apiRemoteCompleteTimeNanos = -1L
@volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
val requestId = buffer.getShort()
// TODO: this will be removed once we remove support for v0 of ControlledShutdownRequest (which
// depends on a non-standard request header)
val requestObj: RequestOrResponse = if (requestId == ApiKeys.CONTROLLED_SHUTDOWN_KEY.id)
ControlledShutdownRequest.readFrom(buffer)
else
null
// if we failed to find a server-side mapping, then try using the
// client-side request / response format
val header: RequestHeader =
if (requestObj == null) {
buffer.rewind
try RequestHeader.parse(buffer)
catch {
val header: RequestHeader = try {
RequestHeader.parse(buffer)
} catch {
case ex: Throwable =>
throw new InvalidRequestException(s"Error parsing request header. Our best guess of the apiKey is: $requestId", ex)
throw new InvalidRequestException(s"Error parsing request header. Our best guess of the apiKey is: ${buffer.getShort(0)}", ex)
}
} else
null
val bodyAndSize: RequestAndSize =
if (requestObj == null)
try {
// For unsupported version of ApiVersionsRequest, create a dummy request to enable an error response to be returned later
if (header.apiKey == ApiKeys.API_VERSIONS.id && !Protocol.apiVersionSupported(header.apiKey, header.apiVersion)) {
@ -103,22 +87,15 @@ object RequestChannel extends Logging {
case ex: Throwable =>
throw new InvalidRequestException(s"Error getting request for apiKey: ${header.apiKey} and apiVersion: ${header.apiVersion}", ex)
}
else
null
//most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
//some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
//to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
if (!Protocol.requiresDelayedDeallocation(requestId)) {
if (!Protocol.requiresDelayedDeallocation(header.apiKey)) {
dispose()
}
def requestDesc(details: Boolean): String = {
if (requestObj != null)
requestObj.describe(details)
else
s"$header -- ${body[AbstractRequest].toString(details)}"
}
def requestDesc(details: Boolean): String = s"$header -- ${body[AbstractRequest].toString(details)}"
def body[T <: AbstractRequest](implicit classTag: ClassTag[T], nn: NotNothing[T]): T = {
bodyAndSize.request match {
@ -158,7 +135,7 @@ object RequestChannel extends Logging {
val responseSendTime = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
val totalTime = nanosToMs(endTimeNanos - startTimeNanos)
val fetchMetricNames =
if (requestId == ApiKeys.FETCH.id) {
if (header.apiKey == ApiKeys.FETCH.id) {
val isFromFollower = body[FetchRequest].isFromFollower
Seq(
if (isFromFollower) RequestMetrics.followFetchMetricName
@ -166,7 +143,7 @@ object RequestChannel extends Logging {
)
}
else Seq.empty
val metricNames = fetchMetricNames :+ ApiKeys.forId(requestId).name
val metricNames = fetchMetricNames :+ ApiKeys.forId(header.apiKey).name
metricNames.foreach { metricName =>
val m = RequestMetrics.metricsMap(metricName)
m.requestRate.mark()

View File

@ -25,7 +25,7 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import kafka.admin.{AdminUtils, RackAwareMode}
import kafka.api.{ApiVersion, ControlledShutdownRequest, ControlledShutdownResponse, KAFKA_0_11_0_IV0}
import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
import kafka.cluster.Partition
import kafka.common.{OffsetAndMetadata, OffsetMetadata, TopicAndPartition}
import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
@ -33,7 +33,7 @@ import kafka.controller.KafkaController
import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult}
import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
import kafka.log.{Log, LogManager, TimestampOffset}
import kafka.network.{RequestChannel, RequestOrResponseSend}
import kafka.network.RequestChannel
import kafka.security.SecurityUtils
import kafka.security.auth._
import kafka.utils.{CoreUtils, Logging, ZKGroupTopicDirs, ZkUtils}
@ -93,7 +93,7 @@ class KafkaApis(val requestChannel: RequestChannel,
try {
trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
ApiKeys.forId(request.requestId) match {
ApiKeys.forId(request.header.apiKey) match {
case ApiKeys.PRODUCE => handleProduceRequest(request)
case ApiKeys.FETCH => handleFetchRequest(request)
case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
@ -228,19 +228,17 @@ class KafkaApis(val requestChannel: RequestChannel,
// ensureTopicExists is only for client facing requests
// We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
// stop serving data to clients for the topic being deleted
val controlledShutdownRequest = request.requestObj.asInstanceOf[ControlledShutdownRequest]
val controlledShutdownRequest = request.body[ControlledShutdownRequest]
authorizeClusterAction(request)
def controlledShutdownCallback(controlledShutdownResult: Try[Set[TopicAndPartition]]): Unit = {
controlledShutdownResult match {
case Success(partitionsRemaining) =>
val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId,
Errors.NONE, partitionsRemaining)
sendResponseExemptThrottle(RequestChannel.Response(request,
new RequestOrResponseSend(request.connectionId, controlledShutdownResponse)))
val controlledShutdownResponse = new ControlledShutdownResponse(Errors.NONE,
partitionsRemaining.map(_.asTopicPartition).asJava)
sendResponseExemptThrottle(RequestChannel.Response(request, controlledShutdownResponse))
case Failure(throwable) =>
sendResponseExemptThrottle(request, () => controlledShutdownRequest.handleError(throwable, requestChannel, request))
sendResponseExemptThrottle(RequestChannel.Response(request, controlledShutdownRequest.getErrorResponse(throwable)))
}
}
controller.shutdownBroker(controlledShutdownRequest.brokerId, controlledShutdownCallback)
@ -552,16 +550,14 @@ class KafkaApis(val requestChannel: RequestChannel,
convertedData.put(tp, convertedPartitionData(tp, partitionData))
}
val response = new FetchResponse(convertedData, 0)
val responseStruct = response.toStruct(versionId)
val responseSend = response.toSend(bandwidthThrottleTimeMs + requestThrottleTimeMs, request.connectionId, request.header)
trace(s"Sending fetch response to client $clientId of ${responseStruct.sizeOf} bytes.")
trace(s"Sending fetch response to client $clientId of ${responseSend.size} bytes.")
response.responseData.asScala.foreach { case (topicPartition, data) =>
// record the bytes out metrics only when the response is being sent
brokerTopicStats.updateBytesOut(topicPartition.topic, fetchRequest.isFromFollower, data.records.sizeInBytes)
}
val responseSend = response.toSend(responseStruct, bandwidthThrottleTimeMs + requestThrottleTimeMs,
request.connectionId, request.header)
RequestChannel.Response(request, responseSend)
}
@ -1884,23 +1880,8 @@ class KafkaApis(val requestChannel: RequestChannel,
}
private def handleError(request: RequestChannel.Request, e: Throwable) {
val mayThrottle = e.isInstanceOf[ClusterAuthorizationException] || !ApiKeys.forId(request.requestId).clusterAction
if (request.requestObj != null) {
def sendResponseCallback(requestThrottleMs: Int) {
request.requestObj.handleError(e, requestChannel, request)
error("Error when handling request %s".format(request.requestObj), e)
}
val mayThrottle = e.isInstanceOf[ClusterAuthorizationException] || !ApiKeys.forId(request.header.apiKey).clusterAction
if (mayThrottle) {
val clientId: String = request.requestObj match {
case r: ControlledShutdownRequest => r.clientId.getOrElse("")
case _ =>
throw new IllegalStateException("Old style requests should only be used for ControlledShutdownRequest")
}
sendResponseMaybeThrottle(request, clientId, sendResponseCallback)
} else
sendResponseExemptThrottle(request, () => sendResponseCallback(0))
} else {
def createResponse(requestThrottleMs: Int): RequestChannel.Response = {
val response = request.body[AbstractRequest].getErrorResponse(requestThrottleMs, e)
/* If request doesn't have a default error response, we just close the connection.
@ -1917,7 +1898,6 @@ class KafkaApis(val requestChannel: RequestChannel,
else
sendResponseExemptThrottle(createResponse(0))
}
}
def handleAlterConfigsRequest(request: RequestChannel.Request): Unit = {
val alterConfigsRequest = request.body[AlterConfigsRequest]

View File

@ -363,7 +363,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
val socketTimeoutMs = config.controllerSocketTimeoutMs
def networkClientControlledShutdown(retries: Int): Boolean = {
def doControlledShutdown(retries: Int): Boolean = {
val metadataUpdater = new ManualMetadataUpdater()
val networkClient = {
val channelBuilder = ChannelBuilders.clientChannelBuilder(
@ -438,7 +438,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms")
// send the controlled shutdown request
val controlledShutdownRequest = new ControlledShutdownRequest.Builder(config.brokerId)
val controlledShutdownApiVersion: Short = if (config.interBrokerProtocolVersion < KAFKA_0_9_0) 0 else 1
val controlledShutdownRequest = new ControlledShutdownRequest.Builder(config.brokerId,
controlledShutdownApiVersion)
val request = networkClient.newClientRequest(node(prevController).idString, controlledShutdownRequest,
time.milliseconds(), true)
val clientResponse = NetworkClientUtils.sendAndReceive(networkClient, request, time)
@ -472,82 +474,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
shutdownSucceeded
}
def blockingChannelControlledShutdown(retries: Int): Boolean = {
var remainingRetries = retries
var channel: BlockingChannel = null
var prevController: Broker = null
var shutdownSucceeded: Boolean = false
try {
while (!shutdownSucceeded && remainingRetries > 0) {
remainingRetries = remainingRetries - 1
// 1. Find the controller and establish a connection to it.
// Get the current controller info. This is to ensure we use the most recent info to issue the
// controlled shutdown request
val controllerId = zkUtils.getController()
//If this method returns None ignore and try again
zkUtils.getBrokerInfo(controllerId).foreach { broker =>
if (channel == null || prevController == null || !prevController.equals(broker)) {
// if this is the first attempt or if the controller has changed, create a channel to the most recent
// controller
if (channel != null)
channel.disconnect()
val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerListenerName)
channel = new BlockingChannel(brokerEndPoint.host,
brokerEndPoint.port,
BlockingChannel.UseDefaultBufferSize,
BlockingChannel.UseDefaultBufferSize,
config.controllerSocketTimeoutMs)
channel.connect()
prevController = broker
}
}
// 2. issue a controlled shutdown to the controller
if (channel != null) {
var response: NetworkReceive = null
try {
// send the controlled shutdown request
val request = new kafka.api.ControlledShutdownRequest(0, correlationId.getAndIncrement, None, config.brokerId)
channel.send(request)
response = channel.receive()
val shutdownResponse = kafka.api.ControlledShutdownResponse.readFrom(response.payload())
if (shutdownResponse.error == Errors.NONE && shutdownResponse.partitionsRemaining != null &&
shutdownResponse.partitionsRemaining.isEmpty) {
shutdownSucceeded = true
info ("Controlled shutdown succeeded")
}
else {
info("Remaining partitions to move: %s".format(shutdownResponse.partitionsRemaining.mkString(",")))
info("Error code from controller: %d".format(shutdownResponse.error.code))
}
}
catch {
case ioe: java.io.IOException =>
channel.disconnect()
channel = null
warn("Error during controlled shutdown, possibly because leader movement took longer than the configured controller.socket.timeout.ms and/or request.timeout.ms: %s".format(ioe.getMessage))
// ignore and try again
}
}
if (!shutdownSucceeded) {
Thread.sleep(config.controlledShutdownRetryBackoffMs)
warn("Retrying controlled shutdown after the previous attempt failed...")
}
}
}
finally {
if (channel != null) {
channel.disconnect()
channel = null
}
}
shutdownSucceeded
}
if (startupComplete.get() && config.controlledShutdownEnable) {
// We request the controller to do a controlled shutdown. On failure, we backoff for a configured period
// of time and try again for a configured number of retries. If all the attempt fails, we simply force
@ -556,16 +482,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
brokerState.newState(PendingControlledShutdown)
val shutdownSucceeded =
// Before 0.9.0.0, `ControlledShutdownRequest` did not contain `client_id` and it's a mandatory field in
// `RequestHeader`, which is used by `NetworkClient`
if (config.interBrokerProtocolVersion >= KAFKA_0_9_0)
networkClientControlledShutdown(config.controlledShutdownMaxRetries.intValue)
else blockingChannelControlledShutdown(config.controlledShutdownMaxRetries.intValue)
val shutdownSucceeded = doControlledShutdown(config.controlledShutdownMaxRetries.intValue)
if (!shutdownSucceeded)
warn("Proceeding to do an unclean shutdown as all the controlled shutdown attempts failed")
}
}

View File

@ -348,7 +348,7 @@ class SocketServerTest extends JUnitSuite {
val channel = overrideServer.requestChannel
val request = channel.receiveRequest(2000)
val requestMetrics = RequestMetrics.metricsMap(ApiKeys.forId(request.requestId).name)
val requestMetrics = RequestMetrics.metricsMap(ApiKeys.forId(request.header.apiKey).name)
def totalTimeHistCount(): Long = requestMetrics.totalTimeHist.count
val expectedTotalTimeCount = totalTimeHistCount() + 1
@ -390,7 +390,7 @@ class SocketServerTest extends JUnitSuite {
TestUtils.waitUntilTrue(() => overrideServer.processor(request.processor).channel(request.connectionId).isEmpty,
s"Idle connection `${request.connectionId}` was not closed by selector")
val requestMetrics = RequestMetrics.metricsMap(ApiKeys.forId(request.requestId).name)
val requestMetrics = RequestMetrics.metricsMap(ApiKeys.forId(request.header.apiKey).name)
def totalTimeHistCount(): Long = requestMetrics.totalTimeHist.count
val expectedTotalTimeCount = totalTimeHistCount() + 1