From 0fad944ca777e788e21ff577a1f1ab3b5a7f1266 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dezhi=20=E2=80=9CAndy=E2=80=9D=20Fang?= Date: Wed, 8 Apr 2020 23:46:45 -0700 Subject: [PATCH 1/7] KAFKA-9583; Use topic-partitions grouped by node to send OffsetsForLeaderEpoch requests (#8077) In `validateOffsetsAsync` in t he consumer, we group the requests by leader node for efficiency. The list of topic-partitions are grouped from `partitionsToValidate` (all partitions) to `node` => `fetchPostitions` (partitions by node). However, when actually sending the request with `OffsetsForLeaderEpochClient`, we use `partitionsToValidate`, which is the list of all topic-partitions passed into `validateOffsetsAsync`. This results in extra partitions being included in the request sent to brokers that are potentially not the leader for those partitions. This PR fixes the issue by using `fetchPositions`, which is the proper list of partitions that we should send in the request. Additionally, a small typo of API name in `OffsetsForLeaderEpochClient` is corrected (it originally referenced `LisfOffsets` as the API name). Reviewers: David Arthur , Jason Gustafson --- .../org/apache/kafka/clients/consumer/internals/Fetcher.java | 2 +- .../clients/consumer/internals/OffsetsForLeaderEpochClient.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 2268617337c..9f0ec23cd8c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -796,7 +796,7 @@ public class Fetcher implements Closeable { subscriptions.setNextAllowedRetry(fetchPostitions.keySet(), time.milliseconds() + requestTimeoutMs); - RequestFuture future = offsetsForLeaderEpochClient.sendAsyncRequest(node, partitionsToValidate); + RequestFuture future = offsetsForLeaderEpochClient.sendAsyncRequest(node, fetchPostitions); future.addListener(new RequestFutureListener() { @Override public void onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetsResult) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java index 480d0eab9b1..7e372c76416 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java @@ -96,7 +96,7 @@ public class OffsetsForLeaderEpochClient extends AsyncClient< partitionsToRetry.add(topicPartition); break; case UNKNOWN_TOPIC_OR_PARTITION: - logger().warn("Received unknown topic or partition error in ListOffset request for partition {}", + logger().warn("Received unknown topic or partition error in OffsetsForLeaderEpoch request for partition {}", topicPartition); partitionsToRetry.add(topicPartition); break; From c84e6ab491f5344554333c6ce021a4564e9dc64b Mon Sep 17 00:00:00 2001 From: Tom Bentley Date: Thu, 9 Apr 2020 14:59:25 +0100 Subject: [PATCH 2/7] KAFKA-9433: Use automated protocol for AlterConfigs request and response (#8315) Reviewers: Mickael Maison , Boyang Chen --- .../apache/kafka/common/protocol/ApiKeys.java | 8 +- .../common/requests/AbstractResponse.java | 2 +- .../common/requests/AlterConfigsRequest.java | 146 +++++------------- .../common/requests/AlterConfigsResponse.java | 86 ++--------- .../common/requests/RequestResponseTest.java | 20 ++- .../main/scala/kafka/server/KafkaApis.scala | 18 ++- 6 files changed, 94 insertions(+), 186 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 571b0aa3f43..80d93838f0a 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -23,6 +23,8 @@ import org.apache.kafka.common.message.ApiVersionsRequestData; import org.apache.kafka.common.message.ApiVersionsResponseData; import org.apache.kafka.common.message.AlterClientQuotasRequestData; import org.apache.kafka.common.message.AlterClientQuotasResponseData; +import org.apache.kafka.common.message.AlterConfigsRequestData; +import org.apache.kafka.common.message.AlterConfigsResponseData; import org.apache.kafka.common.message.ControlledShutdownRequestData; import org.apache.kafka.common.message.ControlledShutdownResponseData; import org.apache.kafka.common.message.CreateAclsRequestData; @@ -108,8 +110,6 @@ import org.apache.kafka.common.requests.AddOffsetsToTxnRequest; import org.apache.kafka.common.requests.AddOffsetsToTxnResponse; import org.apache.kafka.common.requests.AddPartitionsToTxnRequest; import org.apache.kafka.common.requests.AddPartitionsToTxnResponse; -import org.apache.kafka.common.requests.AlterConfigsRequest; -import org.apache.kafka.common.requests.AlterConfigsResponse; import org.apache.kafka.common.requests.AlterReplicaLogDirsRequest; import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse; import org.apache.kafka.common.requests.DescribeConfigsRequest; @@ -186,8 +186,8 @@ public enum ApiKeys { DELETE_ACLS(31, "DeleteAcls", DeleteAclsRequestData.SCHEMAS, DeleteAclsResponseData.SCHEMAS), DESCRIBE_CONFIGS(32, "DescribeConfigs", DescribeConfigsRequest.schemaVersions(), DescribeConfigsResponse.schemaVersions()), - ALTER_CONFIGS(33, "AlterConfigs", AlterConfigsRequest.schemaVersions(), - AlterConfigsResponse.schemaVersions()), + ALTER_CONFIGS(33, "AlterConfigs", AlterConfigsRequestData.SCHEMAS, + AlterConfigsResponseData.SCHEMAS), ALTER_REPLICA_LOG_DIRS(34, "AlterReplicaLogDirs", AlterReplicaLogDirsRequest.schemaVersions(), AlterReplicaLogDirsResponse.schemaVersions()), DESCRIBE_LOG_DIRS(35, "DescribeLogDirs", DescribeLogDirsRequestData.SCHEMAS, diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index b9fbfbdcecf..7f2f4bc3f59 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -147,7 +147,7 @@ public abstract class AbstractResponse implements AbstractRequestResponse { case DESCRIBE_CONFIGS: return new DescribeConfigsResponse(struct); case ALTER_CONFIGS: - return new AlterConfigsResponse(struct); + return new AlterConfigsResponse(struct, version); case ALTER_REPLICA_LOG_DIRS: return new AlterReplicaLogDirsResponse(struct); case DESCRIBE_LOG_DIRS: diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java index eecb2f90225..784d9d4595b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java @@ -18,59 +18,19 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.message.AlterConfigsRequestData; +import org.apache.kafka.common.message.AlterConfigsResponseData; import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.types.ArrayOf; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Objects; - -import static org.apache.kafka.common.protocol.types.Type.BOOLEAN; -import static org.apache.kafka.common.protocol.types.Type.INT8; -import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING; -import static org.apache.kafka.common.protocol.types.Type.STRING; +import java.util.stream.Collectors; public class AlterConfigsRequest extends AbstractRequest { - private static final String RESOURCES_KEY_NAME = "resources"; - private static final String RESOURCE_TYPE_KEY_NAME = "resource_type"; - private static final String RESOURCE_NAME_KEY_NAME = "resource_name"; - private static final String VALIDATE_ONLY_KEY_NAME = "validate_only"; - - private static final String CONFIG_ENTRIES_KEY_NAME = "config_entries"; - private static final String CONFIG_NAME = "config_name"; - private static final String CONFIG_VALUE = "config_value"; - - private static final Schema CONFIG_ENTRY = new Schema( - new Field(CONFIG_NAME, STRING, "Configuration name"), - new Field(CONFIG_VALUE, NULLABLE_STRING, "Configuration value")); - - private static final Schema ALTER_CONFIGS_REQUEST_RESOURCE_V0 = new Schema( - new Field(RESOURCE_TYPE_KEY_NAME, INT8), - new Field(RESOURCE_NAME_KEY_NAME, STRING), - new Field(CONFIG_ENTRIES_KEY_NAME, new ArrayOf(CONFIG_ENTRY))); - - private static final Schema ALTER_CONFIGS_REQUEST_V0 = new Schema( - new Field(RESOURCES_KEY_NAME, new ArrayOf(ALTER_CONFIGS_REQUEST_RESOURCE_V0), - "An array of resources to update with the provided configs."), - new Field(VALIDATE_ONLY_KEY_NAME, BOOLEAN)); - - /** - * The version number is bumped to indicate that on quota violation brokers send out responses before throttling. - */ - private static final Schema ALTER_CONFIGS_REQUEST_V1 = ALTER_CONFIGS_REQUEST_V0; - - public static Schema[] schemaVersions() { - return new Schema[] {ALTER_CONFIGS_REQUEST_V0, ALTER_CONFIGS_REQUEST_V1}; - } - public static class Config { private final Collection entries; @@ -104,98 +64,76 @@ public class AlterConfigsRequest extends AbstractRequest { public static class Builder extends AbstractRequest.Builder { - private final Map configs; - private final boolean validateOnly; + private final AlterConfigsRequestData data = new AlterConfigsRequestData(); public Builder(Map configs, boolean validateOnly) { super(ApiKeys.ALTER_CONFIGS); - this.configs = Objects.requireNonNull(configs, "configs"); - this.validateOnly = validateOnly; + Objects.requireNonNull(configs, "configs"); + for (Map.Entry entry : configs.entrySet()) { + AlterConfigsRequestData.AlterConfigsResource resource = new AlterConfigsRequestData.AlterConfigsResource() + .setResourceName(entry.getKey().name()) + .setResourceType(entry.getKey().type().id()); + for (ConfigEntry x : entry.getValue().entries) { + resource.configs().add(new AlterConfigsRequestData.AlterableConfig() + .setName(x.name()) + .setValue(x.value())); + } + this.data.resources().add(resource); + } + this.data.setValidateOnly(validateOnly); } @Override public AlterConfigsRequest build(short version) { - return new AlterConfigsRequest(version, configs, validateOnly); + return new AlterConfigsRequest(data, version); } } - private final Map configs; - private final boolean validateOnly; + private final AlterConfigsRequestData data; - public AlterConfigsRequest(short version, Map configs, boolean validateOnly) { + public AlterConfigsRequest(AlterConfigsRequestData data, short version) { super(ApiKeys.ALTER_CONFIGS, version); - this.configs = Objects.requireNonNull(configs, "configs"); - this.validateOnly = validateOnly; + this.data = data; } public AlterConfigsRequest(Struct struct, short version) { super(ApiKeys.ALTER_CONFIGS, version); - validateOnly = struct.getBoolean(VALIDATE_ONLY_KEY_NAME); - Object[] resourcesArray = struct.getArray(RESOURCES_KEY_NAME); - configs = new HashMap<>(resourcesArray.length); - for (Object resourcesObj : resourcesArray) { - Struct resourcesStruct = (Struct) resourcesObj; - - ConfigResource.Type resourceType = ConfigResource.Type.forId(resourcesStruct.getByte(RESOURCE_TYPE_KEY_NAME)); - String resourceName = resourcesStruct.getString(RESOURCE_NAME_KEY_NAME); - ConfigResource resource = new ConfigResource(resourceType, resourceName); - - Object[] configEntriesArray = resourcesStruct.getArray(CONFIG_ENTRIES_KEY_NAME); - List configEntries = new ArrayList<>(configEntriesArray.length); - for (Object configEntriesObj: configEntriesArray) { - Struct configEntriesStruct = (Struct) configEntriesObj; - String configName = configEntriesStruct.getString(CONFIG_NAME); - String configValue = configEntriesStruct.getString(CONFIG_VALUE); - configEntries.add(new ConfigEntry(configName, configValue)); - } - Config config = new Config(configEntries); - configs.put(resource, config); - } + this.data = new AlterConfigsRequestData(struct, version); } public Map configs() { - return configs; + return data.resources().stream().collect(Collectors.toMap( + resource -> new ConfigResource( + ConfigResource.Type.forId(resource.resourceType()), + resource.resourceName()), + resource -> new Config(resource.configs().stream() + .map(entry -> new ConfigEntry(entry.name(), entry.value())) + .collect(Collectors.toList())))); } public boolean validateOnly() { - return validateOnly; + return data.validateOnly(); } @Override protected Struct toStruct() { - Struct struct = new Struct(ApiKeys.ALTER_CONFIGS.requestSchema(version())); - struct.set(VALIDATE_ONLY_KEY_NAME, validateOnly); - List resourceStructs = new ArrayList<>(configs.size()); - for (Map.Entry entry : configs.entrySet()) { - Struct resourceStruct = struct.instance(RESOURCES_KEY_NAME); - - ConfigResource resource = entry.getKey(); - resourceStruct.set(RESOURCE_TYPE_KEY_NAME, resource.type().id()); - resourceStruct.set(RESOURCE_NAME_KEY_NAME, resource.name()); - - Config config = entry.getValue(); - List configEntryStructs = new ArrayList<>(config.entries.size()); - for (ConfigEntry configEntry : config.entries) { - Struct configEntriesStruct = resourceStruct.instance(CONFIG_ENTRIES_KEY_NAME); - configEntriesStruct.set(CONFIG_NAME, configEntry.name); - configEntriesStruct.set(CONFIG_VALUE, configEntry.value); - configEntryStructs.add(configEntriesStruct); - } - resourceStruct.set(CONFIG_ENTRIES_KEY_NAME, configEntryStructs.toArray(new Struct[0])); - - resourceStructs.add(resourceStruct); - } - struct.set(RESOURCES_KEY_NAME, resourceStructs.toArray(new Struct[0])); - return struct; + return data.toStruct(version()); } @Override public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { ApiError error = ApiError.fromThrowable(e); - Map errors = new HashMap<>(configs.size()); - for (ConfigResource resource : configs.keySet()) - errors.put(resource, error); - return new AlterConfigsResponse(throttleTimeMs, errors); + AlterConfigsResponseData data = new AlterConfigsResponseData() + .setThrottleTimeMs(throttleTimeMs); + for (AlterConfigsRequestData.AlterConfigsResource resource : this.data.resources()) { + data.responses().add(new AlterConfigsResponseData.AlterConfigsResourceResponse() + .setResourceType(resource.resourceType()) + .setResourceName(resource.resourceName()) + .setErrorMessage(error.message()) + .setErrorCode(error.error().code())); + } + return new AlterConfigsResponse(data); + } public static AlterConfigsRequest parse(ByteBuffer buffer, short version) { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java index bf805df5158..ba36fd42c9f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java @@ -18,105 +18,53 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.message.AlterConfigsResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.ArrayOf; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.Objects; - -import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; -import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE; -import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; -import static org.apache.kafka.common.protocol.types.Type.INT8; -import static org.apache.kafka.common.protocol.types.Type.STRING; +import java.util.stream.Collectors; public class AlterConfigsResponse extends AbstractResponse { - private static final String RESOURCES_KEY_NAME = "resources"; - private static final String RESOURCE_TYPE_KEY_NAME = "resource_type"; - private static final String RESOURCE_NAME_KEY_NAME = "resource_name"; + private final AlterConfigsResponseData data; - private static final Schema ALTER_CONFIGS_RESPONSE_ENTITY_V0 = new Schema( - ERROR_CODE, - ERROR_MESSAGE, - new Field(RESOURCE_TYPE_KEY_NAME, INT8), - new Field(RESOURCE_NAME_KEY_NAME, STRING)); - - private static final Schema ALTER_CONFIGS_RESPONSE_V0 = new Schema( - THROTTLE_TIME_MS, - new Field(RESOURCES_KEY_NAME, new ArrayOf(ALTER_CONFIGS_RESPONSE_ENTITY_V0))); - - /** - * The version number is bumped to indicate that on quota violation brokers send out responses before throttling. - */ - private static final Schema ALTER_CONFIGS_RESPONSE_V1 = ALTER_CONFIGS_RESPONSE_V0; - - public static Schema[] schemaVersions() { - return new Schema[]{ALTER_CONFIGS_RESPONSE_V0, ALTER_CONFIGS_RESPONSE_V1}; + public AlterConfigsResponse(AlterConfigsResponseData data) { + this.data = data; } - private final int throttleTimeMs; - private final Map errors; - - public AlterConfigsResponse(int throttleTimeMs, Map errors) { - this.throttleTimeMs = throttleTimeMs; - this.errors = Objects.requireNonNull(errors, "errors"); - } - - public AlterConfigsResponse(Struct struct) { - throttleTimeMs = struct.get(THROTTLE_TIME_MS); - Object[] resourcesArray = struct.getArray(RESOURCES_KEY_NAME); - errors = new HashMap<>(resourcesArray.length); - for (Object resourceObj : resourcesArray) { - Struct resourceStruct = (Struct) resourceObj; - ApiError error = new ApiError(resourceStruct); - ConfigResource.Type resourceType = ConfigResource.Type.forId(resourceStruct.getByte(RESOURCE_TYPE_KEY_NAME)); - String resourceName = resourceStruct.getString(RESOURCE_NAME_KEY_NAME); - errors.put(new ConfigResource(resourceType, resourceName), error); - } + public AlterConfigsResponse(Struct struct, short version) { + this.data = new AlterConfigsResponseData(struct, version); } public Map errors() { - return errors; + return data.responses().stream().collect(Collectors.toMap( + response -> new ConfigResource( + ConfigResource.Type.forId(response.resourceType()), + response.resourceName()), + response -> new ApiError(Errors.forCode(response.errorCode()), response.errorMessage()) + )); } @Override public Map errorCounts() { - return apiErrorCounts(errors); + return apiErrorCounts(errors()); } @Override public int throttleTimeMs() { - return throttleTimeMs; + return data.throttleTimeMs(); } @Override protected Struct toStruct(short version) { - Struct struct = new Struct(ApiKeys.ALTER_CONFIGS.responseSchema(version)); - struct.set(THROTTLE_TIME_MS, throttleTimeMs); - List resourceStructs = new ArrayList<>(errors.size()); - for (Map.Entry entry : errors.entrySet()) { - Struct resourceStruct = struct.instance(RESOURCES_KEY_NAME); - ConfigResource resource = entry.getKey(); - entry.getValue().write(resourceStruct); - resourceStruct.set(RESOURCE_TYPE_KEY_NAME, resource.type().id()); - resourceStruct.set(RESOURCE_NAME_KEY_NAME, resource.name()); - resourceStructs.add(resourceStruct); - } - struct.set(RESOURCES_KEY_NAME, resourceStructs.toArray(new Struct[0])); - return struct; + return data.toStruct(version); } public static AlterConfigsResponse parse(ByteBuffer buffer, short version) { - return new AlterConfigsResponse(ApiKeys.ALTER_CONFIGS.parseResponse(version, buffer)); + return new AlterConfigsResponse(ApiKeys.ALTER_CONFIGS.parseResponse(version, buffer), version); } @Override diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 9503662f0d3..7a6ce7f3aef 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.errors.NotEnoughReplicasException; import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.message.AlterConfigsResponseData; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData; import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData; import org.apache.kafka.common.message.ApiVersionsRequestData; @@ -1853,14 +1854,23 @@ public class RequestResponseTest { configs.put(new ConfigResource(ConfigResource.Type.BROKER, "0"), new AlterConfigsRequest.Config(configEntries)); configs.put(new ConfigResource(ConfigResource.Type.TOPIC, "topic"), new AlterConfigsRequest.Config(Collections.emptyList())); - return new AlterConfigsRequest((short) 0, configs, false); + return new AlterConfigsRequest.Builder(configs, false).build((short) 0); } private AlterConfigsResponse createAlterConfigsResponse() { - Map errors = new HashMap<>(); - errors.put(new ConfigResource(ConfigResource.Type.BROKER, "0"), ApiError.NONE); - errors.put(new ConfigResource(ConfigResource.Type.TOPIC, "topic"), new ApiError(Errors.INVALID_REQUEST, "This request is invalid")); - return new AlterConfigsResponse(20, errors); + AlterConfigsResponseData data = new AlterConfigsResponseData() + .setThrottleTimeMs(20); + data.responses().add(new AlterConfigsResponseData.AlterConfigsResourceResponse() + .setErrorCode(Errors.NONE.code()) + .setErrorMessage(null) + .setResourceName("0") + .setResourceType(ConfigResource.Type.BROKER.id())); + data.responses().add(new AlterConfigsResponseData.AlterConfigsResourceResponse() + .setErrorCode(Errors.INVALID_REQUEST.code()) + .setErrorMessage("This request is invalid") + .setResourceName("topic") + .setResourceType(ConfigResource.Type.TOPIC.id())); + return new AlterConfigsResponse(data); } private CreatePartitionsRequest createCreatePartitionsRequest() { diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 4a6082e310b..43ab2c5d9e1 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -48,9 +48,10 @@ import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.FatalExitError import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal} +import org.apache.kafka.common.message.AlterConfigsResponseData.AlterConfigsResourceResponse import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult -import org.apache.kafka.common.message.{AlterPartitionReassignmentsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeGroupsResponseData, DescribeLogDirsResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, HeartbeatResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsResponseData, ListPartitionReassignmentsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteResponseData, RenewDelegationTokenResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, StopReplicaResponseData, SyncGroupResponseData, UpdateMetadataResponseData} +import org.apache.kafka.common.message.{AlterConfigsResponseData, AlterPartitionReassignmentsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeGroupsResponseData, DescribeLogDirsResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, HeartbeatResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsResponseData, ListPartitionReassignmentsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteResponseData, RenewDelegationTokenResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, StopReplicaResponseData, SyncGroupResponseData, UpdateMetadataResponseData} import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultCollection} import org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupResult, DeletableGroupResultCollection} import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.{ReassignablePartitionResponse, ReassignableTopicResponse} @@ -2404,8 +2405,19 @@ class KafkaApis(val requestChannel: RequestChannel, val unauthorizedResult = unauthorizedResources.keys.map { resource => resource -> configsAuthorizationApiError(resource) } - sendResponseMaybeThrottle(request, requestThrottleMs => - new AlterConfigsResponse(requestThrottleMs, (authorizedResult ++ unauthorizedResult).asJava)) + def responseCallback(requestThrottleMs: Int): AlterConfigsResponse = { + val data = new AlterConfigsResponseData() + .setThrottleTimeMs(requestThrottleMs) + (authorizedResult ++ unauthorizedResult).foreach{case (resource, error) => + data.responses().add(new AlterConfigsResourceResponse() + .setErrorCode(error.error.code) + .setErrorMessage(error.message) + .setResourceName(resource.name) + .setResourceType(resource.`type`.id)) + } + new AlterConfigsResponse(data) + } + sendResponseMaybeThrottle(request, responseCallback) } def handleAlterPartitionReassignmentsRequest(request: RequestChannel.Request): Unit = { From 371ad143a6bb973927c89c0788d048a17ebac91a Mon Sep 17 00:00:00 2001 From: Tom Bentley Date: Thu, 9 Apr 2020 15:24:44 +0100 Subject: [PATCH 3/7] KAFKA-9691: Fix NPE by waiting for reassignment request (#8317) Reviewers: Mickael Maison , Stanislav Kozlovski , Chia-Ping Tsai --- .../unit/kafka/admin/TopicCommandWithAdminClientTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala index 3f95038e7d9..78a4a6f3adc 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala @@ -665,7 +665,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin val targetReplica = brokerIds.diff(replicasOfFirstPartition).head adminClient.alterPartitionReassignments(Collections.singletonMap(tp, - Optional.of(new NewPartitionReassignment(Collections.singletonList(targetReplica))))) + Optional.of(new NewPartitionReassignment(Collections.singletonList(targetReplica))))).all().get() // let's wait until the LAIR is propagated TestUtils.waitUntilTrue(() => { From 179be72e3003183b0472a888f5f2396423bb031d Mon Sep 17 00:00:00 2001 From: SoontaekLim Date: Thu, 9 Apr 2020 19:22:23 +0200 Subject: [PATCH 4/7] KAFKA-9642: Change "BigDecimal(double)" constructor to "BigDecimal.valueOf(double)" (#8212) Co-authored-by: Soontaek Lim Reviewers: Guozhang Wang --- .../api/src/main/java/org/apache/kafka/connect/data/Values.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java index d99fbcabf86..d4085f8b283 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java @@ -421,7 +421,7 @@ public class Values { if (value instanceof Number) { // Not already a decimal, so treat it as a double ... double converted = ((Number) value).doubleValue(); - return new BigDecimal(converted); + return BigDecimal.valueOf(converted); } if (value instanceof String) { return new BigDecimal(value.toString()).doubleValue(); From 73ec7304b9e13cf4b9da05f215bd356a84efe0e7 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Thu, 9 Apr 2020 10:57:10 -0700 Subject: [PATCH 5/7] KAFKA-9748: Extend Streams integration tests for EOS beta (#8441) Reviewers: Boyang Chen , Guozhang Wang --- .../EOSUncleanShutdownIntegrationTest.java | 21 +++++++++++--- .../GlobalKTableEOSIntegrationTest.java | 29 ++++++++++++------- ...ableSourceTopicRestartIntegrationTest.java | 14 +++++++-- .../ResetPartitionTimeIntegrationTest.java | 25 +++++++--------- .../RocksDBMetricsIntegrationTest.java | 3 +- .../SuppressionDurabilityIntegrationTest.java | 25 +++++++--------- .../StreamThreadStateStoreProviderTest.java | 3 +- 7 files changed, 73 insertions(+), 47 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java index d24eabcffb1..76c12c0acea 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java @@ -32,15 +32,18 @@ import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.test.IntegrationTest; - import org.apache.kafka.test.TestUtils; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.File; +import java.util.Arrays; +import java.util.Collection; import java.util.Locale; import java.util.Optional; import java.util.Properties; @@ -51,7 +54,6 @@ import static java.util.Collections.singletonList; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; -import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest; import static org.junit.Assert.assertFalse; @@ -60,9 +62,21 @@ import static org.junit.Assert.assertFalse; /** * Test the unclean shutdown behavior around state store cleanup. */ +@RunWith(Parameterized.class) @Category(IntegrationTest.class) public class EOSUncleanShutdownIntegrationTest { + @Parameterized.Parameters(name = "{0}") + public static Collection data() { + return Arrays.asList(new String[][] { + {StreamsConfig.EXACTLY_ONCE}, + {StreamsConfig.EXACTLY_ONCE_BETA} + }); + } + + @Parameterized.Parameter + public String eosConfig; + @ClassRule public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3); @@ -82,8 +96,6 @@ public class EOSUncleanShutdownIntegrationTest { STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL); - - STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE); STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TEST_FOLDER.getRoot().getPath()); } @@ -91,6 +103,7 @@ public class EOSUncleanShutdownIntegrationTest { public void shouldWorkWithUncleanShutdownWipeOutStateStore() throws InterruptedException { final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + "-test"; STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); + STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig); final String input = "input-topic"; cleanStateBeforeTest(CLUSTER, input); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java index 6dec94b373e..a87bbd3c132 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java @@ -48,14 +48,18 @@ import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Properties; import java.util.concurrent.atomic.AtomicInteger; +@RunWith(Parameterized.class) @Category({IntegrationTest.class}) public class GlobalKTableEOSIntegrationTest { private static final int NUM_BROKERS = 1; @@ -70,6 +74,17 @@ public class GlobalKTableEOSIntegrationTest { public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG); + @Parameterized.Parameters(name = "{0}") + public static Collection data() { + return Arrays.asList(new String[][] { + {StreamsConfig.EXACTLY_ONCE}, + {StreamsConfig.EXACTLY_ONCE_BETA} + }); + } + + @Parameterized.Parameter + public String eosConfig; + private static volatile AtomicInteger testNo = new AtomicInteger(0); private final MockTime mockTime = CLUSTER.time; private final KeyValueMapper keyMapper = (key, value) -> value; @@ -97,7 +112,7 @@ public class GlobalKTableEOSIntegrationTest { streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); - streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once"); + streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig); globalTable = builder.globalTable(globalTableTopic, Consumed.with(Serdes.Long(), Serdes.String()), Materialized.>as(globalStore) .withKeySerde(Serdes.Long()) @@ -319,15 +334,9 @@ public class GlobalKTableEOSIntegrationTest { } private void produceInitialGlobalTableValues() throws Exception { - produceInitialGlobalTableValues(true); - } - - private void produceInitialGlobalTableValues(final boolean enableTransactions) throws Exception { final Properties properties = new Properties(); - if (enableTransactions) { - properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid"); - properties.put(ProducerConfig.RETRIES_CONFIG, 1); - } + properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid"); + properties.put(ProducerConfig.RETRIES_CONFIG, 1); IntegrationTestUtils.produceKeyValuesSynchronously( globalTableTopic, Arrays.asList( @@ -342,7 +351,7 @@ public class GlobalKTableEOSIntegrationTest { StringSerializer.class, properties), mockTime, - enableTransactions); + true); } private void produceGlobalTableValues() throws Exception { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java index 3ec239fab91..693a7182ba2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java @@ -130,9 +130,19 @@ public class KTableSourceTopicRestartIntegrationTest { } @Test - public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled() throws Exception { + public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled() throws Exception { + STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); + shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled(); + } + + @Test + public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosBetaEnabled() throws Exception { + STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA); + shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled(); + } + + private void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled() throws Exception { try { - STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG); streamsOne.start(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java index 955106c789f..8aa39354095 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java @@ -40,8 +40,8 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; +import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Optional; @@ -51,8 +51,6 @@ import static java.util.Arrays.asList; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; -import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE; -import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStartedStreams; @@ -77,20 +75,19 @@ public class ResetPartitionTimeIntegrationTest { private static final StringSerializer STRING_SERIALIZER = new StringSerializer(); private static final Serde STRING_SERDE = Serdes.String(); private static final int DEFAULT_TIMEOUT = 100; - private final boolean eosEnabled; private static long lastRecordedTimestamp = -2L; - @Parameters(name = "{index}: eosEnabled={0}") - public static Collection parameters() { - return asList( - new Object[] {false}, - new Object[] {true} - ); + @Parameterized.Parameters(name = "{0}") + public static Collection data() { + return Arrays.asList(new String[][] { + {StreamsConfig.AT_LEAST_ONCE}, + {StreamsConfig.EXACTLY_ONCE}, + {StreamsConfig.EXACTLY_ONCE_BETA} + }); } - public ResetPartitionTimeIntegrationTest(final boolean eosEnabled) { - this.eosEnabled = eosEnabled; - } + @Parameterized.Parameter + public String processingGuarantee; @Test public void shouldPreservePartitionTimeOnKafkaStreamRestart() { @@ -112,7 +109,7 @@ public class ResetPartitionTimeIntegrationTest { streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfig.put(StreamsConfig.POLL_MS_CONFIG, Integer.toString(DEFAULT_TIMEOUT)); streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.toString(DEFAULT_TIMEOUT)); - streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosEnabled ? EXACTLY_ONCE : AT_LEAST_ONCE); + streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee); streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); KafkaStreams kafkaStreams = getStartedStreams(streamsConfig, builder, true); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java index 34cc428bb95..0849bdd68b9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java @@ -101,8 +101,9 @@ public class RocksDBMetricsIntegrationTest { @Parameters(name = "{0}") public static Collection data() { return Arrays.asList(new Object[][] { + {StreamsConfig.AT_LEAST_ONCE}, {StreamsConfig.EXACTLY_ONCE}, - {StreamsConfig.AT_LEAST_ONCE} + {StreamsConfig.EXACTLY_ONCE_BETA} }); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java index a95b4e9a3ad..beb9ec7fe60 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java @@ -49,10 +49,10 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.List; @@ -69,8 +69,6 @@ import static java.util.Arrays.asList; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; -import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE; -import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStartedStreams; @@ -94,19 +92,18 @@ public class SuppressionDurabilityIntegrationTest { private static final Serde STRING_SERDE = Serdes.String(); private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer(); private static final int COMMIT_INTERVAL = 100; - private final boolean eosEnabled; - @Parameters(name = "{index}: eosEnabled={0}") - public static Collection parameters() { - return asList( - new Object[] {false}, - new Object[] {true} - ); + @Parameterized.Parameters(name = "{0}") + public static Collection data() { + return Arrays.asList(new String[][] { + {StreamsConfig.AT_LEAST_ONCE}, + {StreamsConfig.EXACTLY_ONCE}, + {StreamsConfig.EXACTLY_ONCE_BETA} + }); } - public SuppressionDurabilityIntegrationTest(final boolean eosEnabled) { - this.eosEnabled = eosEnabled; - } + @Parameterized.Parameter + public String processingGuaranteee; @Test public void shouldRecoverBufferAfterShutdown() { @@ -153,7 +150,7 @@ public class SuppressionDurabilityIntegrationTest { mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), mkEntry(StreamsConfig.POLL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)), mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)), - mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosEnabled ? EXACTLY_ONCE : AT_LEAST_ONCE), + mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuaranteee), mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()) )); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index da8f434c21d..7aad8d6d6d1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -70,7 +70,6 @@ import java.util.Properties; import java.util.Set; import java.util.UUID; -import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; @@ -361,7 +360,7 @@ public class StreamThreadStateStoreProviderTest { final ProcessorStateManager stateManager = new ProcessorStateManager( taskId, Task.TaskType.ACTIVE, - EXACTLY_ONCE.equals(streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)), + StreamThread.eosEnabled(streamsConfig), logContext, stateDirectory, new StoreChangelogReader( From bf6dffe93bbe0fe33ad076ebccebb840d66b936d Mon Sep 17 00:00:00 2001 From: Colin Patrick McCabe Date: Thu, 9 Apr 2020 13:11:36 -0700 Subject: [PATCH 6/7] KAFKA-9309: Add the ability to translate Message classes to and from JSON (#7844) Reviewers: David Arthur , Ron Dagostino --- checkstyle/import-control.xml | 5 + .../apache/kafka/common/protocol/Message.java | 48 ++- .../kafka/common/protocol/MessageUtil.java | 103 +++++ .../kafka/common/message/MessageTest.java | 11 + .../org/apache/kafka/message/FieldSpec.java | 5 + .../org/apache/kafka/message/FieldType.java | 12 + .../kafka/message/IsNullConditional.java | 47 ++- .../kafka/message/MessageDataGenerator.java | 398 ++++++++++++++++-- .../kafka/message/MessageGenerator.java | 24 ++ .../java/org/apache/kafka/message/Target.java | 93 ++++ .../kafka/message/IsNullConditionalTest.java | 8 +- 11 files changed, 699 insertions(+), 55 deletions(-) create mode 100644 generator/src/main/java/org/apache/kafka/message/Target.java diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 3e41e9778f5..3fd5ea7f141 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -123,6 +123,7 @@ + @@ -243,6 +244,10 @@ + + + + diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Message.java b/clients/src/main/java/org/apache/kafka/common/protocol/Message.java index b31593f1e6f..149fe11cfe2 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Message.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Message.java @@ -17,6 +17,8 @@ package org.apache.kafka.common.protocol; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.JsonNode; import org.apache.kafka.common.protocol.types.RawTaggedField; import org.apache.kafka.common.protocol.types.Struct; @@ -77,11 +79,15 @@ public interface Message { void read(Readable readable, short version); /** - * Reads this message from the a Struct object. This will overwrite all + * Reads this message from a Struct object. This will overwrite all * relevant fields with information from the Struct. * * @param struct The source struct. * @param version The version to use. + * + * @throws {@see org.apache.kafka.common.errors.UnsupportedVersionException} + * If the specified struct can't be processed with the + * specified message version. */ void fromStruct(Struct struct, short version); @@ -96,6 +102,46 @@ public interface Message { */ Struct toStruct(short version); + /** + * Reads this message from a Jackson JsonNode object. This will overwrite + * all relevant fields with information from the Struct. + * + * For the most part, we expect every JSON object in the input to be the + * correct type. There is one exception: we will deserialize numbers + * represented as strings. If the numeric string begins with 0x, we will + * treat the number as hexadecimal. + * + * Note that we expect to see NullNode objects created for null entries. + * Therefore, please configure your Jackson ObjectMapper with + * setSerializationInclusion({@link JsonInclude.Include#ALWAYS}). + * Other settings may silently omit the nulls, which is not the + * semantic that Kafka RPC uses. (Including a field and setting it to + * null is different than not including the field.) + * + * @param node The source node. + * @param version The version to use. + * + * @throws {@see org.apache.kafka.common.errors.UnsupportedVersionException} + * If the specified JSON can't be processed with the + * specified message version. + */ + void fromJson(JsonNode node, short version); + + /** + * Convert this message to a JsonNode. + * + * Note that 64-bit numbers will be serialized as strings rather than as integers. + * The reason is because JavaScript can't represent numbers above 2**52 accurately. + * Therefore, for maximum interoperability, we represent these numbers as strings. + * + * @param version The version to use. + * + * @throws {@see org.apache.kafka.common.errors.UnsupportedVersionException} + * If the specified version is too new to be supported + * by this software. + */ + JsonNode toJson(short version); + /** * Returns a list of tagged fields which this software can't understand. * diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/MessageUtil.java b/clients/src/main/java/org/apache/kafka/common/protocol/MessageUtil.java index 6e473e780f3..1f09c53c471 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/MessageUtil.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/MessageUtil.java @@ -17,6 +17,9 @@ package org.apache.kafka.common.protocol; +import com.fasterxml.jackson.databind.JsonNode; + +import java.io.IOException; import java.nio.ByteBuffer; import java.util.Iterator; import java.util.UUID; @@ -51,4 +54,104 @@ public final class MessageUtil { bld.append("]"); return bld.toString(); } + + public static byte jsonNodeToByte(JsonNode node, String about) { + int value = jsonNodeToInt(node, about); + if (value > Byte.MAX_VALUE) { + if (value <= 256) { + // It's more traditional to refer to bytes as unsigned, + // so we support that here. + value -= 128; + } else { + throw new RuntimeException(about + ": value " + value + + " does not fit in an 8-bit signed integer."); + } + } + if (value < Byte.MIN_VALUE) { + throw new RuntimeException(about + ": value " + value + + " does not fit in an 8-bit signed integer."); + } + return (byte) value; + } + + public static short jsonNodeToShort(JsonNode node, String about) { + int value = jsonNodeToInt(node, about); + if ((value < Short.MIN_VALUE) || (value > Short.MAX_VALUE)) { + throw new RuntimeException(about + ": value " + value + + " does not fit in a 16-bit signed integer."); + } + return (short) value; + } + + public static int jsonNodeToInt(JsonNode node, String about) { + if (node.isInt()) { + return node.asInt(); + } + if (node.isTextual()) { + throw new NumberFormatException(about + ": expected an integer or " + + "string type, but got " + node.getNodeType()); + } + String text = node.asText(); + if (text.startsWith("0x")) { + try { + return Integer.parseInt(text.substring(2), 16); + } catch (NumberFormatException e) { + throw new NumberFormatException(about + ": failed to " + + "parse hexadecimal number: " + e.getMessage()); + } + } else { + try { + return Integer.parseInt(text); + } catch (NumberFormatException e) { + throw new NumberFormatException(about + ": failed to " + + "parse number: " + e.getMessage()); + } + } + } + + public static long jsonNodeToLong(JsonNode node, String about) { + if (node.isLong()) { + return node.asLong(); + } + if (node.isTextual()) { + throw new NumberFormatException(about + ": expected an integer or " + + "string type, but got " + node.getNodeType()); + } + String text = node.asText(); + if (text.startsWith("0x")) { + try { + return Long.parseLong(text.substring(2), 16); + } catch (NumberFormatException e) { + throw new NumberFormatException(about + ": failed to " + + "parse hexadecimal number: " + e.getMessage()); + } + } else { + try { + return Long.parseLong(text); + } catch (NumberFormatException e) { + throw new NumberFormatException(about + ": failed to " + + "parse number: " + e.getMessage()); + } + } + } + + public static byte[] jsonNodeToBinary(JsonNode node, String about) { + if (!node.isBinary()) { + throw new RuntimeException(about + ": expected Base64-encoded binary data."); + } + try { + byte[] value = node.binaryValue(); + return value; + } catch (IOException e) { + throw new RuntimeException(about + ": unable to retrieve Base64-encoded binary data", e); + } + } + + public static double jsonNodeToDouble(JsonNode node, String about) { + if (!node.isFloatingPointNumber()) { + throw new NumberFormatException(about + ": expected a floating point " + + "type, but got " + node.getNodeType()); + } + return node.asDouble(); + } } diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java index 8100d0c28f6..8599fb6ebda 100644 --- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java +++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.message; +import com.fasterxml.jackson.databind.JsonNode; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic; import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection; @@ -638,6 +639,7 @@ public final class MessageTest { private void testEquivalentMessageRoundTrip(short version, Message message) throws Exception { testStructRoundTrip(version, message, message); testByteBufferRoundTrip(version, message, message); + testJsonRoundTrip(version, message, message); } private void testByteBufferRoundTrip(short version, Message message, Message expected) throws Exception { @@ -668,6 +670,15 @@ public final class MessageTest { assertEquals(expected.toString(), message2.toString()); } + private void testJsonRoundTrip(short version, Message message, Message expected) throws Exception { + JsonNode jsonNode = message.toJson(version); + Message message2 = message.getClass().newInstance(); + message2.fromJson(jsonNode, version); + assertEquals(expected, message2); + assertEquals(expected.hashCode(), message2.hashCode()); + assertEquals(expected.toString(), message2.toString()); + } + /** * Verify that the JSON files support the same message versions as the * schemas accessible through the ApiKey class. diff --git a/generator/src/main/java/org/apache/kafka/message/FieldSpec.java b/generator/src/main/java/org/apache/kafka/message/FieldSpec.java index f8028caab5a..b3bf5bfbb13 100644 --- a/generator/src/main/java/org/apache/kafka/message/FieldSpec.java +++ b/generator/src/main/java/org/apache/kafka/message/FieldSpec.java @@ -227,6 +227,11 @@ public final class FieldSpec { return ignorable; } + @JsonProperty("entityType") + public EntityType entityType() { + return entityType; + } + @JsonProperty("about") public String about() { return about; diff --git a/generator/src/main/java/org/apache/kafka/message/FieldType.java b/generator/src/main/java/org/apache/kafka/message/FieldType.java index 0b4880d9940..af5a5a70809 100644 --- a/generator/src/main/java/org/apache/kafka/message/FieldType.java +++ b/generator/src/main/java/org/apache/kafka/message/FieldType.java @@ -121,6 +121,11 @@ public interface FieldType { return Optional.of(8); } + @Override + public boolean isFloat() { + return true; + } + @Override public String toString() { return NAME; @@ -318,6 +323,13 @@ public interface FieldType { return false; } + /** + * Returns true if this is a floating point type. + */ + default boolean isFloat() { + return false; + } + /** * Returns true if this is a struct type. */ diff --git a/generator/src/main/java/org/apache/kafka/message/IsNullConditional.java b/generator/src/main/java/org/apache/kafka/message/IsNullConditional.java index 5ff51dacc66..88a3598fbe1 100644 --- a/generator/src/main/java/org/apache/kafka/message/IsNullConditional.java +++ b/generator/src/main/java/org/apache/kafka/message/IsNullConditional.java @@ -18,9 +18,26 @@ package org.apache.kafka.message; /** - * Creates an if statement based on whether or not a particular field is null. + * For versions of a field that are nullable, IsNullCondition creates a null check. */ public final class IsNullConditional { + interface ConditionalGenerator { + String generate(String name, boolean negated); + } + + private static class PrimitiveConditionalGenerator implements ConditionalGenerator { + final static PrimitiveConditionalGenerator INSTANCE = new PrimitiveConditionalGenerator(); + + @Override + public String generate(String name, boolean negated) { + if (negated) { + return String.format("%s != null", name); + } else { + return String.format("%s == null", name); + } + } + } + static IsNullConditional forName(String name) { return new IsNullConditional(name); } @@ -35,8 +52,9 @@ public final class IsNullConditional { private Versions nullableVersions = Versions.ALL; private Versions possibleVersions = Versions.ALL; private Runnable ifNull = null; - private Runnable ifNotNull = null; + private Runnable ifShouldNotBeNull = null; private boolean alwaysEmitBlockScope = false; + private ConditionalGenerator conditionalGenerator = PrimitiveConditionalGenerator.INSTANCE; private IsNullConditional(String name) { this.name = name; @@ -57,8 +75,8 @@ public final class IsNullConditional { return this; } - IsNullConditional ifNotNull(Runnable ifNotNull) { - this.ifNotNull = ifNotNull; + IsNullConditional ifShouldNotBeNull(Runnable ifShouldNotBeNull) { + this.ifShouldNotBeNull = ifShouldNotBeNull; return this; } @@ -67,14 +85,19 @@ public final class IsNullConditional { return this; } + IsNullConditional conditionalGenerator(ConditionalGenerator conditionalGenerator) { + this.conditionalGenerator = conditionalGenerator; + return this; + } + void generate(CodeBuffer buffer) { if (nullableVersions.intersect(possibleVersions).empty()) { - if (ifNotNull != null) { + if (ifShouldNotBeNull != null) { if (alwaysEmitBlockScope) { buffer.printf("{%n"); buffer.incrementIndent(); } - ifNotNull.run(); + ifShouldNotBeNull.run(); if (alwaysEmitBlockScope) { buffer.decrementIndent(); buffer.printf("}%n"); @@ -82,21 +105,21 @@ public final class IsNullConditional { } } else { if (ifNull != null) { - buffer.printf("if (%s == null) {%n", name); + buffer.printf("if (%s) {%n", conditionalGenerator.generate(name, false)); buffer.incrementIndent(); ifNull.run(); buffer.decrementIndent(); - if (ifNotNull != null) { + if (ifShouldNotBeNull != null) { buffer.printf("} else {%n"); buffer.incrementIndent(); - ifNotNull.run(); + ifShouldNotBeNull.run(); buffer.decrementIndent(); } buffer.printf("}%n"); - } else if (ifNotNull != null) { - buffer.printf("if (%s != null) {%n", name); + } else if (ifShouldNotBeNull != null) { + buffer.printf("if (%s) {%n", conditionalGenerator.generate(name, true)); buffer.incrementIndent(); - ifNotNull.run(); + ifShouldNotBeNull.run(); buffer.decrementIndent(); buffer.printf("}%n"); } diff --git a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java index e1bd59aeea1..9e9ae40ed13 100644 --- a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java +++ b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java @@ -98,6 +98,10 @@ public final class MessageDataGenerator { buffer.printf("%n"); generateClassToStruct(className, struct, parentVersions); buffer.printf("%n"); + generateClassFromJson(className, struct, parentVersions); + buffer.printf("%n"); + generateClassToJson(className, struct, parentVersions); + buffer.printf("%n"); generateClassSize(className, struct, parentVersions); buffer.printf("%n"); generateClassEquals(className, struct, isSetElement); @@ -392,39 +396,46 @@ public final class MessageDataGenerator { buffer.printf("public %s(Readable _readable, short _version) {%n", className); buffer.incrementIndent(); buffer.printf("read(_readable, _version);%n"); - if (isSetElement) { - headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_COLLECTION_CLASS); - buffer.printf("this.prev = ImplicitLinkedHashCollection.INVALID_INDEX;%n"); - buffer.printf("this.next = ImplicitLinkedHashCollection.INVALID_INDEX;%n"); - } + generateConstructorEpilogue(isSetElement); buffer.decrementIndent(); buffer.printf("}%n"); buffer.printf("%n"); + headerGenerator.addImport(MessageGenerator.STRUCT_CLASS); - buffer.printf("public %s(Struct struct, short _version) {%n", className); + buffer.printf("public %s(Struct _struct, short _version) {%n", className); buffer.incrementIndent(); - buffer.printf("fromStruct(struct, _version);%n"); - if (isSetElement) { - headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_COLLECTION_CLASS); - buffer.printf("this.prev = ImplicitLinkedHashCollection.INVALID_INDEX;%n"); - buffer.printf("this.next = ImplicitLinkedHashCollection.INVALID_INDEX;%n"); - } + buffer.printf("fromStruct(_struct, _version);%n"); + generateConstructorEpilogue(isSetElement); buffer.decrementIndent(); buffer.printf("}%n"); buffer.printf("%n"); + + headerGenerator.addImport(MessageGenerator.JSON_NODE_CLASS); + buffer.printf("public %s(JsonNode _node, short _version) {%n", className); + buffer.incrementIndent(); + buffer.printf("fromJson(_node, _version);%n"); + generateConstructorEpilogue(isSetElement); + buffer.decrementIndent(); + buffer.printf("}%n"); + buffer.printf("%n"); + buffer.printf("public %s() {%n", className); buffer.incrementIndent(); for (FieldSpec field : struct.fields()) { buffer.printf("this.%s = %s;%n", field.camelCaseName(), fieldDefault(field)); } + generateConstructorEpilogue(isSetElement); + buffer.decrementIndent(); + buffer.printf("}%n"); + } + + private void generateConstructorEpilogue(boolean isSetElement) { if (isSetElement) { headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_COLLECTION_CLASS); buffer.printf("this.prev = ImplicitLinkedHashCollection.INVALID_INDEX;%n"); buffer.printf("this.next = ImplicitLinkedHashCollection.INVALID_INDEX;%n"); } - buffer.decrementIndent(); - buffer.printf("}%n"); } private void generateShortAccessor(String name, short val) { @@ -782,6 +793,315 @@ public final class MessageDataGenerator { buffer.printf("}%n"); } + private void generateClassFromJson(String className, StructSpec struct, + Versions parentVersions) { + headerGenerator.addImport(MessageGenerator.JSON_NODE_CLASS); + buffer.printf("@Override%n"); + buffer.printf("public void fromJson(JsonNode _node, short _version) {%n"); + buffer.incrementIndent(); + VersionConditional.forVersions(struct.versions(), parentVersions). + allowMembershipCheckAlwaysFalse(false). + ifNotMember(__ -> { + headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS); + buffer.printf("throw new UnsupportedVersionException(\"Can't read " + + "version \" + _version + \" of %s\");%n", className); + }). + generate(buffer); + Versions curVersions = parentVersions.intersect(struct.versions()); + for (FieldSpec field : struct.fields()) { + String sourceVariable = String.format("_%sNode", field.camelCaseName()); + buffer.printf("JsonNode %s = _node.get(\"%s\");%n", + sourceVariable, + field.camelCaseName()); + buffer.printf("if (%s == null) {%n", sourceVariable); + buffer.incrementIndent(); + Versions mandatoryVersions = field.versions().subtract(field.taggedVersions()); + VersionConditional.forVersions(mandatoryVersions, curVersions). + ifMember(__ -> { + buffer.printf("throw new RuntimeException(\"%s: unable to locate " + + "field \'%s\', which is mandatory in version \" + _version);%n", + className, field.camelCaseName()); + }). + ifNotMember(__ -> { + buffer.printf("this.%s = %s;%n", field.camelCaseName(), fieldDefault(field)); + }). + generate(buffer); + buffer.decrementIndent(); + buffer.printf("} else {%n"); + buffer.incrementIndent(); + VersionConditional.forVersions(struct.versions(), curVersions). + ifMember(presentVersions -> { + generateTargetFromJson(new Target(field, + sourceVariable, + className, + input -> String.format("this.%s = %s", field.camelCaseName(), input)), + curVersions); + }).ifNotMember(__ -> { + buffer.printf("throw new RuntimeException(\"%s: field \'%s\' is not " + + "supported in version \" + _version);%n", + className, field.camelCaseName()); + }). + generate(buffer); + buffer.decrementIndent(); + buffer.printf("}%n"); + } + buffer.decrementIndent(); + buffer.printf("}%n"); + } + + private void generateTargetFromJson(Target target, Versions curVersions) { + if (target.field().type() instanceof FieldType.BoolFieldType) { + buffer.printf("if (!%s.isBoolean()) {%n", target.sourceVariable()); + buffer.incrementIndent(); + buffer.printf("throw new RuntimeException(\"%s expected Boolean type, " + + "but got \" + _node.getNodeType());%n", target.humanReadableName()); + buffer.decrementIndent(); + buffer.printf("}%n"); + buffer.printf("%s;%n", target.assignmentStatement( + target.sourceVariable() + ".asBoolean()")); + } else if (target.field().type() instanceof FieldType.Int8FieldType) { + headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS); + buffer.printf("%s;%n", target.assignmentStatement( + String.format("MessageUtil.jsonNodeToByte(%s, \"%s\")", + target.sourceVariable(), target.humanReadableName()))); + } else if (target.field().type() instanceof FieldType.Int16FieldType) { + headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS); + buffer.printf("%s;%n", target.assignmentStatement( + String.format("MessageUtil.jsonNodeToShort(%s, \"%s\")", + target.sourceVariable(), target.humanReadableName()))); + } else if (target.field().type() instanceof FieldType.Int32FieldType) { + headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS); + buffer.printf("%s;%n", target.assignmentStatement( + String.format("MessageUtil.jsonNodeToInt(%s, \"%s\")", + target.sourceVariable(), target.humanReadableName()))); + } else if (target.field().type() instanceof FieldType.Int64FieldType) { + headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS); + buffer.printf("%s;%n", target.assignmentStatement( + String.format("MessageUtil.jsonNodeToLong(%s, \"%s\")", + target.sourceVariable(), target.humanReadableName()))); + } else if (target.field().type() instanceof FieldType.UUIDFieldType) { + buffer.printf("if (!%s.isTextual()) {%n", target.sourceVariable()); + buffer.incrementIndent(); + buffer.printf("throw new RuntimeException(\"%s expected a JSON string " + + "type, but got \" + _node.getNodeType());%n", target.humanReadableName()); + buffer.decrementIndent(); + buffer.printf("}%n"); + headerGenerator.addImport(MessageGenerator.UUID_CLASS); + buffer.printf("%s;%n", target.assignmentStatement(String.format( + "UUID.fromString(%s.asText())", target.sourceVariable()))); + } else if (target.field().type() instanceof FieldType.Float64FieldType) { + headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS); + buffer.printf("%s;%n", target.assignmentStatement( + String.format("MessageUtil.jsonNodeToDouble(%s, \"%s\")", + target.sourceVariable(), target.humanReadableName()))); + } else { + // Handle the variable length types. All of them are potentially + // nullable, so handle that here. + IsNullConditional.forName(target.sourceVariable()). + nullableVersions(target.field().nullableVersions()). + possibleVersions(curVersions). + conditionalGenerator((name, negated) -> + String.format("%s%s.isNull()", negated ? "!" : "", name)). + ifNull(() -> { + buffer.printf("%s;%n", target.assignmentStatement("null")); + }). + ifShouldNotBeNull(() -> { + generateVariableLengthTargetFromJson(target, curVersions); + }). + generate(buffer); + } + } + + private void generateVariableLengthTargetFromJson(Target target, Versions curVersions) { + if (target.field().type().isString()) { + buffer.printf("if (!%s.isTextual()) {%n", target.sourceVariable()); + buffer.incrementIndent(); + buffer.printf("throw new RuntimeException(\"%s expected a string " + + "type, but got \" + _node.getNodeType());%n", target.humanReadableName()); + buffer.decrementIndent(); + buffer.printf("}%n"); + buffer.printf("%s;%n", target.assignmentStatement( + String.format("%s.asText()", target.sourceVariable()))); + } else if (target.field().type().isBytes()) { + headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS); + if (target.field().zeroCopy()) { + headerGenerator.addImport(MessageGenerator.BYTE_BUFFER_CLASS); + buffer.printf("%s;%n", target.assignmentStatement( + String.format("ByteBuffer.wrap(MessageUtil.jsonNodeToBinary(%s, \"%s\"))", + target.sourceVariable(), target.humanReadableName()))); + } else { + buffer.printf("%s;%n", target.assignmentStatement( + String.format("MessageUtil.jsonNodeToBinary(%s, \"%s\")", + target.sourceVariable(), target.humanReadableName()))); + } + } else if (target.field().type().isArray()) { + buffer.printf("if (!%s.isArray()) {%n", target.sourceVariable()); + buffer.incrementIndent(); + buffer.printf("throw new RuntimeException(\"%s expected a JSON " + + "array, but got \" + _node.getNodeType());%n", target.humanReadableName()); + buffer.decrementIndent(); + buffer.printf("}%n"); + buffer.printf("%s;%n", target.assignmentStatement( + String.format("new %s()", fieldConcreteJavaType(target.field())))); + headerGenerator.addImport(MessageGenerator.JSON_NODE_CLASS); + buffer.printf("for (JsonNode _element : %s) {%n", target.sourceVariable()); + buffer.incrementIndent(); + generateTargetFromJson(target.arrayElementTarget( + input -> String.format("%s.add(%s)", target.field().camelCaseName(), input)), + curVersions); + buffer.decrementIndent(); + buffer.printf("}%n"); + } else if (target.field().type().isStruct()) { + buffer.printf("%s;%n", target.assignmentStatement(String.format("new %s(%s, _version)", + target.field().type().toString(), + target.sourceVariable()))); + } else { + throw new RuntimeException("Unexpected type " + target.field().type()); + } + } + + private void generateClassToJson(String className, StructSpec struct, + Versions parentVersions) { + headerGenerator.addImport(MessageGenerator.JSON_NODE_CLASS); + buffer.printf("@Override%n"); + buffer.printf("public JsonNode toJson(short _version) {%n"); + buffer.incrementIndent(); + VersionConditional.forVersions(struct.versions(), parentVersions). + allowMembershipCheckAlwaysFalse(false). + ifNotMember(__ -> { + headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS); + buffer.printf("throw new UnsupportedVersionException(\"Can't write " + + "version \" + _version + \" of %s\");%n", className); + }). + generate(buffer); + Versions curVersions = parentVersions.intersect(struct.versions()); + headerGenerator.addImport(MessageGenerator.OBJECT_NODE_CLASS); + headerGenerator.addImport(MessageGenerator.JSON_NODE_FACTORY_CLASS); + buffer.printf("ObjectNode _node = new ObjectNode(JsonNodeFactory.instance);%n"); + for (FieldSpec field : struct.fields()) { + Target target = new Target(field, + String.format("this.%s", field.camelCaseName()), + field.camelCaseName(), + input -> String.format("_node.set(\"%s\", %s)", field.camelCaseName(), input)); + VersionConditional cond = VersionConditional.forVersions(field.versions(), curVersions). + ifMember(presentVersions -> { + VersionConditional.forVersions(field.taggedVersions(), presentVersions). + ifMember(presentAndTaggedVersions -> { + generateNonDefaultValueCheck(field, field.nullableVersions()); + buffer.incrementIndent(); + if (field.defaultString().equals("null")) { + // If the default was null, and we already checked that this field was not + // the default, we can omit further null checks. + generateTargetToJson(target.nonNullableCopy(), presentAndTaggedVersions); + } else { + generateTargetToJson(target, presentAndTaggedVersions); + } + buffer.decrementIndent(); + buffer.printf("}%n"); + }). + ifNotMember(presentAndNotTaggedVersions -> { + generateTargetToJson(target, presentAndNotTaggedVersions); + }). + generate(buffer); + }); + if (!field.ignorable()) { + cond.ifNotMember(__ -> { + generateNonIgnorableFieldCheck(field); + }); + } + cond.generate(buffer); + } + buffer.printf("return _node;%n"); + buffer.decrementIndent(); + buffer.printf("}%n"); + } + + private void generateTargetToJson(Target target, Versions versions) { + if (target.field().type() instanceof FieldType.BoolFieldType) { + headerGenerator.addImport(MessageGenerator.BOOLEAN_NODE_CLASS); + buffer.printf("%s;%n", target.assignmentStatement( + String.format("BooleanNode.valueOf(%s)", target.sourceVariable()))); + } else if ((target.field().type() instanceof FieldType.Int8FieldType) || + (target.field().type() instanceof FieldType.Int16FieldType)) { + headerGenerator.addImport(MessageGenerator.SHORT_NODE_CLASS); + buffer.printf("%s;%n", target.assignmentStatement( + String.format("new ShortNode(%s)", target.sourceVariable()))); + } else if (target.field().type() instanceof FieldType.Int32FieldType) { + headerGenerator.addImport(MessageGenerator.INT_NODE_CLASS); + buffer.printf("%s;%n", target.assignmentStatement( + String.format("new IntNode(%s)", target.sourceVariable()))); + } else if (target.field().type() instanceof FieldType.Int64FieldType) { + headerGenerator.addImport(MessageGenerator.LONG_NODE_CLASS); + buffer.printf("%s;%n", target.assignmentStatement( + String.format("new LongNode(%s)", target.sourceVariable()))); + } else if (target.field().type() instanceof FieldType.UUIDFieldType) { + headerGenerator.addImport(MessageGenerator.TEXT_NODE_CLASS); + buffer.printf("%s;%n", target.assignmentStatement( + String.format("new TextNode(%s.toString())", target.sourceVariable()))); + } else if (target.field().type() instanceof FieldType.Float64FieldType) { + headerGenerator.addImport(MessageGenerator.DOUBLE_NODE_CLASS); + buffer.printf("%s;%n", target.assignmentStatement( + String.format("new DoubleNode(%s)", target.sourceVariable()))); + } else { + // Handle the variable length types. All of them are potentially + // nullable, so handle that here. + IsNullConditional.forName(target.sourceVariable()). + nullableVersions(target.field().nullableVersions()). + possibleVersions(versions). + conditionalGenerator((name, negated) -> + String.format("%s %s= null", name, negated ? "!" : "=")). + ifNull(() -> { + headerGenerator.addImport(MessageGenerator.NULL_NODE_CLASS); + buffer.printf("%s;%n", target.assignmentStatement("NullNode.instance")); + }). + ifShouldNotBeNull(() -> { + generateVariableLengthTargetToJson(target, versions); + }). + generate(buffer); + } + } + + private void generateVariableLengthTargetToJson(Target target, Versions versions) { + if (target.field().type().isString()) { + headerGenerator.addImport(MessageGenerator.TEXT_NODE_CLASS); + buffer.printf("%s;%n", target.assignmentStatement( + String.format("new TextNode(%s)", target.sourceVariable()))); + } else if (target.field().type().isBytes()) { + headerGenerator.addImport(MessageGenerator.BINARY_NODE_CLASS); + if (target.field().zeroCopy()) { + headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS); + buffer.printf("%s;%n", target.assignmentStatement( + String.format("new BinaryNode(MessageUtil.byteBufferToArray(%s))", + target.sourceVariable()))); + } else { + headerGenerator.addImport(MessageGenerator.ARRAYS_CLASS); + buffer.printf("%s;%n", target.assignmentStatement( + String.format("new BinaryNode(Arrays.copyOf(%s, %s.length))", + target.sourceVariable(), target.sourceVariable()))); + } + } else if (target.field().type().isArray()) { + headerGenerator.addImport(MessageGenerator.ARRAY_NODE_CLASS); + headerGenerator.addImport(MessageGenerator.JSON_NODE_FACTORY_CLASS); + FieldType.ArrayType arrayType = (FieldType.ArrayType) target.field().type(); + FieldType elementType = arrayType.elementType(); + String arrayInstanceName = String.format("_%sArray", target.field().camelCaseName()); + buffer.printf("ArrayNode %s = new ArrayNode(JsonNodeFactory.instance);%n", arrayInstanceName); + buffer.printf("for (%s _element : %s) {%n", + getBoxedJavaType(elementType), target.sourceVariable()); + buffer.incrementIndent(); + generateTargetToJson(target.arrayElementTarget( + input -> String.format("%s.add(%s)", arrayInstanceName, input)), + versions); + buffer.decrementIndent(); + buffer.printf("}%n"); + buffer.printf("%s;%n", target.assignmentStatement(arrayInstanceName)); + } else if (target.field().type().isStruct()) { + buffer.printf("%s;%n", target.assignmentStatement( + String.format("%s.toJson(_version)", target.sourceVariable()))); + } else { + throw new RuntimeException("unknown type " + target.field().type()); + } + } private void generateArrayFromStruct(FieldSpec field, Versions versions) { IsNullConditional.forName("_nestedObjects"). @@ -790,7 +1110,7 @@ public final class MessageDataGenerator { ifNull(() -> { buffer.printf("this.%s = null;%n", field.camelCaseName()); }). - ifNotNull(() -> { + ifShouldNotBeNull(() -> { FieldType.ArrayType arrayType = (FieldType.ArrayType) field.type(); FieldType elementType = arrayType.elementType(); buffer.printf("this.%s = new %s(_nestedObjects.length);%n", @@ -866,19 +1186,15 @@ public final class MessageDataGenerator { } } - private void maybeGenerateNonIgnorableFieldCheck(FieldSpec field, VersionConditional cond) { - if (!field.ignorable()) { - cond.ifNotMember(__ -> { - generateNonDefaultValueCheck(field, field.nullableVersions()); - buffer.incrementIndent(); - headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS); - buffer.printf("throw new UnsupportedVersionException(" + - "\"Attempted to write a non-default %s at version \" + _version);%n", - field.camelCaseName()); - buffer.decrementIndent(); - buffer.printf("}%n"); - }); - } + private void generateNonIgnorableFieldCheck(FieldSpec field) { + generateNonDefaultValueCheck(field, field.nullableVersions()); + buffer.incrementIndent(); + headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS); + buffer.printf("throw new UnsupportedVersionException(" + + "\"Attempted to write a non-default %s at version \" + _version);%n", + field.camelCaseName()); + buffer.decrementIndent(); + buffer.printf("}%n"); } private void generateClassWriter(String className, StructSpec struct, @@ -888,7 +1204,7 @@ public final class MessageDataGenerator { buffer.printf("@Override%n"); buffer.printf("public void write(Writable _writable, ObjectSerializationCache _cache, short _version) {%n"); buffer.incrementIndent(); - VersionConditional.forVersions(parentVersions, struct.versions()). + VersionConditional.forVersions(struct.versions(), parentVersions). allowMembershipCheckAlwaysFalse(false). ifNotMember(__ -> { headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS); @@ -946,8 +1262,11 @@ public final class MessageDataGenerator { }). generate(buffer); }); - - maybeGenerateNonIgnorableFieldCheck(field, cond); + if (!field.ignorable()) { + cond.ifNotMember(__ -> { + generateNonIgnorableFieldCheck(field); + }); + } cond.generate(buffer); } headerGenerator.addImport(MessageGenerator.RAW_TAGGED_FIELD_WRITER_CLASS); @@ -972,7 +1291,7 @@ public final class MessageDataGenerator { nullableVersions(field.nullableVersions()). possibleVersions(presentAndTaggedVersions). alwaysEmitBlockScope(true). - ifNotNull(() -> { + ifShouldNotBeNull(() -> { if (!field.defaultString().equals("null")) { generateNonDefaultValueCheck(field, Versions.NONE); buffer.incrementIndent(); @@ -1100,7 +1419,7 @@ public final class MessageDataGenerator { }). generate(buffer); }). - ifNotNull(() -> { + ifShouldNotBeNull(() -> { final String lengthExpression; if (type.isString()) { buffer.printf("byte[] _stringBytes = _cache.getSerializedValue(%s);%n", @@ -1262,8 +1581,11 @@ public final class MessageDataGenerator { }). generate(buffer); }); - - maybeGenerateNonIgnorableFieldCheck(field, cond); + if (!field.ignorable()) { + cond.ifNotMember(__ -> { + generateNonIgnorableFieldCheck(field); + }); + } cond.generate(buffer); } VersionConditional.forVersions(messageFlexibleVersions, curVersions). @@ -1306,7 +1628,7 @@ public final class MessageDataGenerator { ifNull(() -> { buffer.printf("struct.set(\"%s\", null);%n", field.snakeCaseName()); }). - ifNotNull(() -> { + ifShouldNotBeNull(() -> { generateFieldToObjectArray(field); buffer.printf("struct.set(\"%s\", (Object[]) _nestedObjects);%n", field.snakeCaseName()); @@ -1347,7 +1669,7 @@ public final class MessageDataGenerator { ifNull(() -> { buffer.printf("_taggedFields.put(%d, null);%n", field.tag().get()); }). - ifNotNull(() -> { + ifShouldNotBeNull(() -> { generateFieldToObjectArray(field); buffer.printf("_taggedFields.put(%d, _nestedObjects);%n", field.tag().get()); }).generate(buffer); @@ -1535,7 +1857,7 @@ public final class MessageDataGenerator { generate(buffer); } }). - ifNotNull(() -> { + ifShouldNotBeNull(() -> { if (tagged) { if (!field.defaultString().equals("null")) { generateNonDefaultValueCheck(field, Versions.NONE); diff --git a/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java index 275b2c88c00..f25a0e8e9ac 100644 --- a/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java +++ b/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java @@ -112,6 +112,30 @@ public final class MessageGenerator { static final String MAP_ENTRY_CLASS = "java.util.Map.Entry"; + static final String JSON_NODE_CLASS = "com.fasterxml.jackson.databind.JsonNode"; + + static final String OBJECT_NODE_CLASS = "com.fasterxml.jackson.databind.node.ObjectNode"; + + static final String JSON_NODE_FACTORY_CLASS = "com.fasterxml.jackson.databind.node.JsonNodeFactory"; + + static final String BOOLEAN_NODE_CLASS = "com.fasterxml.jackson.databind.node.BooleanNode"; + + static final String SHORT_NODE_CLASS = "com.fasterxml.jackson.databind.node.ShortNode"; + + static final String INT_NODE_CLASS = "com.fasterxml.jackson.databind.node.IntNode"; + + static final String LONG_NODE_CLASS = "com.fasterxml.jackson.databind.node.LongNode"; + + static final String TEXT_NODE_CLASS = "com.fasterxml.jackson.databind.node.TextNode"; + + static final String BINARY_NODE_CLASS = "com.fasterxml.jackson.databind.node.BinaryNode"; + + static final String NULL_NODE_CLASS = "com.fasterxml.jackson.databind.node.NullNode"; + + static final String ARRAY_NODE_CLASS = "com.fasterxml.jackson.databind.node.ArrayNode"; + + static final String DOUBLE_NODE_CLASS = "com.fasterxml.jackson.databind.node.DoubleNode"; + /** * The Jackson serializer we use for JSON objects. */ diff --git a/generator/src/main/java/org/apache/kafka/message/Target.java b/generator/src/main/java/org/apache/kafka/message/Target.java new file mode 100644 index 00000000000..a43cf2c3e9f --- /dev/null +++ b/generator/src/main/java/org/apache/kafka/message/Target.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.message; + +import java.util.Collections; +import java.util.function.Function; + +public final class Target { + private final FieldSpec field; + private final String sourceVariable; + private final String humanReadableName; + private final Function assignmentStatementGenerator; + + Target(FieldSpec field, String sourceVariable, String humanReadableName, + Function assignmentStatementGenerator) { + this.field = field; + this.sourceVariable = sourceVariable; + this.humanReadableName = humanReadableName; + this.assignmentStatementGenerator = assignmentStatementGenerator; + } + + public String assignmentStatement(String rightHandSide) { + return assignmentStatementGenerator.apply(rightHandSide); + } + + public Target nonNullableCopy() { + FieldSpec nonNullableField = new FieldSpec(field.name(), + field.versionsString(), + field.fields(), + field.typeString(), + field.mapKey(), + Versions.NONE.toString(), + field.defaultString(), + field.ignorable(), + field.entityType(), + field.about(), + field.taggedVersionsString(), + field.flexibleVersionsString(), + field.tagInteger(), + field.zeroCopy()); + return new Target(nonNullableField, sourceVariable, humanReadableName, assignmentStatementGenerator); + } + + public Target arrayElementTarget(Function assignmentStatementGenerator) { + if (!field.type().isArray()) { + throw new RuntimeException("Field " + field + " is not an array."); + } + FieldType.ArrayType arrayType = (FieldType.ArrayType) field.type(); + FieldSpec elementField = new FieldSpec(field.name() + "Element", + field.versions().toString(), + Collections.emptyList(), + arrayType.elementType().toString(), + false, + Versions.NONE.toString(), + "", + false, + EntityType.UNKNOWN, + "", + Versions.NONE.toString(), + field.flexibleVersionsString(), + null, + field.zeroCopy()); + return new Target(elementField, "_element", humanReadableName + " element", + assignmentStatementGenerator); + } + + public FieldSpec field() { + return field; + } + + public String sourceVariable() { + return sourceVariable; + } + + public String humanReadableName() { + return humanReadableName; + } +} diff --git a/generator/src/test/java/org/apache/kafka/message/IsNullConditionalTest.java b/generator/src/test/java/org/apache/kafka/message/IsNullConditionalTest.java index a8f7bc68c99..18dd111ea3a 100644 --- a/generator/src/test/java/org/apache/kafka/message/IsNullConditionalTest.java +++ b/generator/src/test/java/org/apache/kafka/message/IsNullConditionalTest.java @@ -52,7 +52,7 @@ public class IsNullConditionalTest { ifNull(() -> { buffer.printf("System.out.println(\"null\");%n"); }). - ifNotNull(() -> { + ifShouldNotBeNull(() -> { buffer.printf("System.out.println(\"not null\");%n"); }). generate(buffer); @@ -71,7 +71,7 @@ public class IsNullConditionalTest { forName("foobar"). nullableVersions(Versions.parse("0+", null)). possibleVersions(Versions.parse("2+", null)). - ifNotNull(() -> { + ifShouldNotBeNull(() -> { buffer.printf("System.out.println(\"not null\");%n"); }). generate(buffer); @@ -91,7 +91,7 @@ public class IsNullConditionalTest { ifNull(() -> { buffer.printf("System.out.println(\"null\");%n"); }). - ifNotNull(() -> { + ifShouldNotBeNull(() -> { buffer.printf("System.out.println(\"not null\");%n"); }). generate(buffer); @@ -109,7 +109,7 @@ public class IsNullConditionalTest { ifNull(() -> { buffer.printf("System.out.println(\"null\");%n"); }). - ifNotNull(() -> { + ifShouldNotBeNull(() -> { buffer.printf("System.out.println(\"not null\");%n"); }). alwaysEmitBlockScope(true). From 0470e2bc9585acb64eb19a64c4099e495890d355 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Thu, 9 Apr 2020 15:28:44 -0700 Subject: [PATCH 7/7] KAFKA-6145: KIP-441: fix flaky shouldEnforceRebalance test in StreamThreadTest (#8452) Reviewers: Boyang Chen , John Roesler --- .../kafka/streams/processor/internals/StreamThreadTest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index dbd43488bfe..d0488f62d5b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -508,13 +508,16 @@ public class StreamThreadTest { 10 * 1000, "Thread never started."); + TestUtils.retryOnExceptionWithTimeout( + () -> EasyMock.verify(mockConsumer) + ); + thread.shutdown(); TestUtils.waitForCondition( () -> thread.state() == StreamThread.State.DEAD, 10 * 1000, "Thread never shut down."); - EasyMock.verify(mockConsumer); } private static class EasyMockConsumerClientSupplier extends MockClientSupplier {