Merge branch 'trunk' into temp-8436

This commit is contained in:
John Roesler 2020-04-09 17:32:32 -05:00
commit 872694be4e
29 changed files with 874 additions and 293 deletions

View File

@ -123,6 +123,7 @@
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.common.resource" />
<allow pkg="com.fasterxml.jackson" />
</subpackage>
<subpackage name="record">
@ -243,6 +244,10 @@
<allow pkg="org.apache.kafka.connect.json" />
</subpackage>
<subpackage name="internals">
<allow pkg="com.fasterxml.jackson" />
</subpackage>
<subpackage name="perf">
<allow pkg="com.fasterxml.jackson.databind" />
</subpackage>

View File

@ -796,7 +796,7 @@ public class Fetcher<K, V> implements Closeable {
subscriptions.setNextAllowedRetry(fetchPostitions.keySet(), time.milliseconds() + requestTimeoutMs);
RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future = offsetsForLeaderEpochClient.sendAsyncRequest(node, partitionsToValidate);
RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future = offsetsForLeaderEpochClient.sendAsyncRequest(node, fetchPostitions);
future.addListener(new RequestFutureListener<OffsetsForLeaderEpochClient.OffsetForEpochResult>() {
@Override
public void onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetsResult) {

View File

@ -96,7 +96,7 @@ public class OffsetsForLeaderEpochClient extends AsyncClient<
partitionsToRetry.add(topicPartition);
break;
case UNKNOWN_TOPIC_OR_PARTITION:
logger().warn("Received unknown topic or partition error in ListOffset request for partition {}",
logger().warn("Received unknown topic or partition error in OffsetsForLeaderEpoch request for partition {}",
topicPartition);
partitionsToRetry.add(topicPartition);
break;

View File

@ -23,6 +23,8 @@ import org.apache.kafka.common.message.ApiVersionsRequestData;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.AlterClientQuotasRequestData;
import org.apache.kafka.common.message.AlterClientQuotasResponseData;
import org.apache.kafka.common.message.AlterConfigsRequestData;
import org.apache.kafka.common.message.AlterConfigsResponseData;
import org.apache.kafka.common.message.ControlledShutdownRequestData;
import org.apache.kafka.common.message.ControlledShutdownResponseData;
import org.apache.kafka.common.message.CreateAclsRequestData;
@ -108,8 +110,6 @@ import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
import org.apache.kafka.common.requests.AlterConfigsRequest;
import org.apache.kafka.common.requests.AlterConfigsResponse;
import org.apache.kafka.common.requests.AlterReplicaLogDirsRequest;
import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse;
import org.apache.kafka.common.requests.DescribeConfigsRequest;
@ -186,8 +186,8 @@ public enum ApiKeys {
DELETE_ACLS(31, "DeleteAcls", DeleteAclsRequestData.SCHEMAS, DeleteAclsResponseData.SCHEMAS),
DESCRIBE_CONFIGS(32, "DescribeConfigs", DescribeConfigsRequest.schemaVersions(),
DescribeConfigsResponse.schemaVersions()),
ALTER_CONFIGS(33, "AlterConfigs", AlterConfigsRequest.schemaVersions(),
AlterConfigsResponse.schemaVersions()),
ALTER_CONFIGS(33, "AlterConfigs", AlterConfigsRequestData.SCHEMAS,
AlterConfigsResponseData.SCHEMAS),
ALTER_REPLICA_LOG_DIRS(34, "AlterReplicaLogDirs", AlterReplicaLogDirsRequest.schemaVersions(),
AlterReplicaLogDirsResponse.schemaVersions()),
DESCRIBE_LOG_DIRS(35, "DescribeLogDirs", DescribeLogDirsRequestData.SCHEMAS,

View File

@ -17,6 +17,8 @@
package org.apache.kafka.common.protocol;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.common.protocol.types.RawTaggedField;
import org.apache.kafka.common.protocol.types.Struct;
@ -77,11 +79,15 @@ public interface Message {
void read(Readable readable, short version);
/**
* Reads this message from the a Struct object. This will overwrite all
* Reads this message from a Struct object. This will overwrite all
* relevant fields with information from the Struct.
*
* @param struct The source struct.
* @param version The version to use.
*
* @throws {@see org.apache.kafka.common.errors.UnsupportedVersionException}
* If the specified struct can't be processed with the
* specified message version.
*/
void fromStruct(Struct struct, short version);
@ -96,6 +102,46 @@ public interface Message {
*/
Struct toStruct(short version);
/**
* Reads this message from a Jackson JsonNode object. This will overwrite
* all relevant fields with information from the Struct.
*
* For the most part, we expect every JSON object in the input to be the
* correct type. There is one exception: we will deserialize numbers
* represented as strings. If the numeric string begins with 0x, we will
* treat the number as hexadecimal.
*
* Note that we expect to see NullNode objects created for null entries.
* Therefore, please configure your Jackson ObjectMapper with
* setSerializationInclusion({@link JsonInclude.Include#ALWAYS}).
* Other settings may silently omit the nulls, which is not the
* semantic that Kafka RPC uses. (Including a field and setting it to
* null is different than not including the field.)
*
* @param node The source node.
* @param version The version to use.
*
* @throws {@see org.apache.kafka.common.errors.UnsupportedVersionException}
* If the specified JSON can't be processed with the
* specified message version.
*/
void fromJson(JsonNode node, short version);
/**
* Convert this message to a JsonNode.
*
* Note that 64-bit numbers will be serialized as strings rather than as integers.
* The reason is because JavaScript can't represent numbers above 2**52 accurately.
* Therefore, for maximum interoperability, we represent these numbers as strings.
*
* @param version The version to use.
*
* @throws {@see org.apache.kafka.common.errors.UnsupportedVersionException}
* If the specified version is too new to be supported
* by this software.
*/
JsonNode toJson(short version);
/**
* Returns a list of tagged fields which this software can't understand.
*

View File

@ -17,6 +17,9 @@
package org.apache.kafka.common.protocol;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.UUID;
@ -51,4 +54,104 @@ public final class MessageUtil {
bld.append("]");
return bld.toString();
}
public static byte jsonNodeToByte(JsonNode node, String about) {
int value = jsonNodeToInt(node, about);
if (value > Byte.MAX_VALUE) {
if (value <= 256) {
// It's more traditional to refer to bytes as unsigned,
// so we support that here.
value -= 128;
} else {
throw new RuntimeException(about + ": value " + value +
" does not fit in an 8-bit signed integer.");
}
}
if (value < Byte.MIN_VALUE) {
throw new RuntimeException(about + ": value " + value +
" does not fit in an 8-bit signed integer.");
}
return (byte) value;
}
public static short jsonNodeToShort(JsonNode node, String about) {
int value = jsonNodeToInt(node, about);
if ((value < Short.MIN_VALUE) || (value > Short.MAX_VALUE)) {
throw new RuntimeException(about + ": value " + value +
" does not fit in a 16-bit signed integer.");
}
return (short) value;
}
public static int jsonNodeToInt(JsonNode node, String about) {
if (node.isInt()) {
return node.asInt();
}
if (node.isTextual()) {
throw new NumberFormatException(about + ": expected an integer or " +
"string type, but got " + node.getNodeType());
}
String text = node.asText();
if (text.startsWith("0x")) {
try {
return Integer.parseInt(text.substring(2), 16);
} catch (NumberFormatException e) {
throw new NumberFormatException(about + ": failed to " +
"parse hexadecimal number: " + e.getMessage());
}
} else {
try {
return Integer.parseInt(text);
} catch (NumberFormatException e) {
throw new NumberFormatException(about + ": failed to " +
"parse number: " + e.getMessage());
}
}
}
public static long jsonNodeToLong(JsonNode node, String about) {
if (node.isLong()) {
return node.asLong();
}
if (node.isTextual()) {
throw new NumberFormatException(about + ": expected an integer or " +
"string type, but got " + node.getNodeType());
}
String text = node.asText();
if (text.startsWith("0x")) {
try {
return Long.parseLong(text.substring(2), 16);
} catch (NumberFormatException e) {
throw new NumberFormatException(about + ": failed to " +
"parse hexadecimal number: " + e.getMessage());
}
} else {
try {
return Long.parseLong(text);
} catch (NumberFormatException e) {
throw new NumberFormatException(about + ": failed to " +
"parse number: " + e.getMessage());
}
}
}
public static byte[] jsonNodeToBinary(JsonNode node, String about) {
if (!node.isBinary()) {
throw new RuntimeException(about + ": expected Base64-encoded binary data.");
}
try {
byte[] value = node.binaryValue();
return value;
} catch (IOException e) {
throw new RuntimeException(about + ": unable to retrieve Base64-encoded binary data", e);
}
}
public static double jsonNodeToDouble(JsonNode node, String about) {
if (!node.isFloatingPointNumber()) {
throw new NumberFormatException(about + ": expected a floating point " +
"type, but got " + node.getNodeType());
}
return node.asDouble();
}
}

View File

@ -147,7 +147,7 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
case DESCRIBE_CONFIGS:
return new DescribeConfigsResponse(struct);
case ALTER_CONFIGS:
return new AlterConfigsResponse(struct);
return new AlterConfigsResponse(struct, version);
case ALTER_REPLICA_LOG_DIRS:
return new AlterReplicaLogDirsResponse(struct);
case DESCRIBE_LOG_DIRS:

View File

@ -18,59 +18,19 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.message.AlterConfigsRequestData;
import org.apache.kafka.common.message.AlterConfigsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
import static org.apache.kafka.common.protocol.types.Type.INT8;
import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
import static org.apache.kafka.common.protocol.types.Type.STRING;
import java.util.stream.Collectors;
public class AlterConfigsRequest extends AbstractRequest {
private static final String RESOURCES_KEY_NAME = "resources";
private static final String RESOURCE_TYPE_KEY_NAME = "resource_type";
private static final String RESOURCE_NAME_KEY_NAME = "resource_name";
private static final String VALIDATE_ONLY_KEY_NAME = "validate_only";
private static final String CONFIG_ENTRIES_KEY_NAME = "config_entries";
private static final String CONFIG_NAME = "config_name";
private static final String CONFIG_VALUE = "config_value";
private static final Schema CONFIG_ENTRY = new Schema(
new Field(CONFIG_NAME, STRING, "Configuration name"),
new Field(CONFIG_VALUE, NULLABLE_STRING, "Configuration value"));
private static final Schema ALTER_CONFIGS_REQUEST_RESOURCE_V0 = new Schema(
new Field(RESOURCE_TYPE_KEY_NAME, INT8),
new Field(RESOURCE_NAME_KEY_NAME, STRING),
new Field(CONFIG_ENTRIES_KEY_NAME, new ArrayOf(CONFIG_ENTRY)));
private static final Schema ALTER_CONFIGS_REQUEST_V0 = new Schema(
new Field(RESOURCES_KEY_NAME, new ArrayOf(ALTER_CONFIGS_REQUEST_RESOURCE_V0),
"An array of resources to update with the provided configs."),
new Field(VALIDATE_ONLY_KEY_NAME, BOOLEAN));
/**
* The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
*/
private static final Schema ALTER_CONFIGS_REQUEST_V1 = ALTER_CONFIGS_REQUEST_V0;
public static Schema[] schemaVersions() {
return new Schema[] {ALTER_CONFIGS_REQUEST_V0, ALTER_CONFIGS_REQUEST_V1};
}
public static class Config {
private final Collection<ConfigEntry> entries;
@ -104,98 +64,76 @@ public class AlterConfigsRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<AlterConfigsRequest> {
private final Map<ConfigResource, Config> configs;
private final boolean validateOnly;
private final AlterConfigsRequestData data = new AlterConfigsRequestData();
public Builder(Map<ConfigResource, Config> configs, boolean validateOnly) {
super(ApiKeys.ALTER_CONFIGS);
this.configs = Objects.requireNonNull(configs, "configs");
this.validateOnly = validateOnly;
Objects.requireNonNull(configs, "configs");
for (Map.Entry<ConfigResource, Config> entry : configs.entrySet()) {
AlterConfigsRequestData.AlterConfigsResource resource = new AlterConfigsRequestData.AlterConfigsResource()
.setResourceName(entry.getKey().name())
.setResourceType(entry.getKey().type().id());
for (ConfigEntry x : entry.getValue().entries) {
resource.configs().add(new AlterConfigsRequestData.AlterableConfig()
.setName(x.name())
.setValue(x.value()));
}
this.data.resources().add(resource);
}
this.data.setValidateOnly(validateOnly);
}
@Override
public AlterConfigsRequest build(short version) {
return new AlterConfigsRequest(version, configs, validateOnly);
return new AlterConfigsRequest(data, version);
}
}
private final Map<ConfigResource, Config> configs;
private final boolean validateOnly;
private final AlterConfigsRequestData data;
public AlterConfigsRequest(short version, Map<ConfigResource, Config> configs, boolean validateOnly) {
public AlterConfigsRequest(AlterConfigsRequestData data, short version) {
super(ApiKeys.ALTER_CONFIGS, version);
this.configs = Objects.requireNonNull(configs, "configs");
this.validateOnly = validateOnly;
this.data = data;
}
public AlterConfigsRequest(Struct struct, short version) {
super(ApiKeys.ALTER_CONFIGS, version);
validateOnly = struct.getBoolean(VALIDATE_ONLY_KEY_NAME);
Object[] resourcesArray = struct.getArray(RESOURCES_KEY_NAME);
configs = new HashMap<>(resourcesArray.length);
for (Object resourcesObj : resourcesArray) {
Struct resourcesStruct = (Struct) resourcesObj;
ConfigResource.Type resourceType = ConfigResource.Type.forId(resourcesStruct.getByte(RESOURCE_TYPE_KEY_NAME));
String resourceName = resourcesStruct.getString(RESOURCE_NAME_KEY_NAME);
ConfigResource resource = new ConfigResource(resourceType, resourceName);
Object[] configEntriesArray = resourcesStruct.getArray(CONFIG_ENTRIES_KEY_NAME);
List<ConfigEntry> configEntries = new ArrayList<>(configEntriesArray.length);
for (Object configEntriesObj: configEntriesArray) {
Struct configEntriesStruct = (Struct) configEntriesObj;
String configName = configEntriesStruct.getString(CONFIG_NAME);
String configValue = configEntriesStruct.getString(CONFIG_VALUE);
configEntries.add(new ConfigEntry(configName, configValue));
}
Config config = new Config(configEntries);
configs.put(resource, config);
}
this.data = new AlterConfigsRequestData(struct, version);
}
public Map<ConfigResource, Config> configs() {
return configs;
return data.resources().stream().collect(Collectors.toMap(
resource -> new ConfigResource(
ConfigResource.Type.forId(resource.resourceType()),
resource.resourceName()),
resource -> new Config(resource.configs().stream()
.map(entry -> new ConfigEntry(entry.name(), entry.value()))
.collect(Collectors.toList()))));
}
public boolean validateOnly() {
return validateOnly;
return data.validateOnly();
}
@Override
protected Struct toStruct() {
Struct struct = new Struct(ApiKeys.ALTER_CONFIGS.requestSchema(version()));
struct.set(VALIDATE_ONLY_KEY_NAME, validateOnly);
List<Struct> resourceStructs = new ArrayList<>(configs.size());
for (Map.Entry<ConfigResource, Config> entry : configs.entrySet()) {
Struct resourceStruct = struct.instance(RESOURCES_KEY_NAME);
ConfigResource resource = entry.getKey();
resourceStruct.set(RESOURCE_TYPE_KEY_NAME, resource.type().id());
resourceStruct.set(RESOURCE_NAME_KEY_NAME, resource.name());
Config config = entry.getValue();
List<Struct> configEntryStructs = new ArrayList<>(config.entries.size());
for (ConfigEntry configEntry : config.entries) {
Struct configEntriesStruct = resourceStruct.instance(CONFIG_ENTRIES_KEY_NAME);
configEntriesStruct.set(CONFIG_NAME, configEntry.name);
configEntriesStruct.set(CONFIG_VALUE, configEntry.value);
configEntryStructs.add(configEntriesStruct);
}
resourceStruct.set(CONFIG_ENTRIES_KEY_NAME, configEntryStructs.toArray(new Struct[0]));
resourceStructs.add(resourceStruct);
}
struct.set(RESOURCES_KEY_NAME, resourceStructs.toArray(new Struct[0]));
return struct;
return data.toStruct(version());
}
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
ApiError error = ApiError.fromThrowable(e);
Map<ConfigResource, ApiError> errors = new HashMap<>(configs.size());
for (ConfigResource resource : configs.keySet())
errors.put(resource, error);
return new AlterConfigsResponse(throttleTimeMs, errors);
AlterConfigsResponseData data = new AlterConfigsResponseData()
.setThrottleTimeMs(throttleTimeMs);
for (AlterConfigsRequestData.AlterConfigsResource resource : this.data.resources()) {
data.responses().add(new AlterConfigsResponseData.AlterConfigsResourceResponse()
.setResourceType(resource.resourceType())
.setResourceName(resource.resourceName())
.setErrorMessage(error.message())
.setErrorCode(error.error().code()));
}
return new AlterConfigsResponse(data);
}
public static AlterConfigsRequest parse(ByteBuffer buffer, short version) {

View File

@ -18,105 +18,53 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.message.AlterConfigsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
import static org.apache.kafka.common.protocol.types.Type.INT8;
import static org.apache.kafka.common.protocol.types.Type.STRING;
import java.util.stream.Collectors;
public class AlterConfigsResponse extends AbstractResponse {
private static final String RESOURCES_KEY_NAME = "resources";
private static final String RESOURCE_TYPE_KEY_NAME = "resource_type";
private static final String RESOURCE_NAME_KEY_NAME = "resource_name";
private final AlterConfigsResponseData data;
private static final Schema ALTER_CONFIGS_RESPONSE_ENTITY_V0 = new Schema(
ERROR_CODE,
ERROR_MESSAGE,
new Field(RESOURCE_TYPE_KEY_NAME, INT8),
new Field(RESOURCE_NAME_KEY_NAME, STRING));
private static final Schema ALTER_CONFIGS_RESPONSE_V0 = new Schema(
THROTTLE_TIME_MS,
new Field(RESOURCES_KEY_NAME, new ArrayOf(ALTER_CONFIGS_RESPONSE_ENTITY_V0)));
/**
* The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
*/
private static final Schema ALTER_CONFIGS_RESPONSE_V1 = ALTER_CONFIGS_RESPONSE_V0;
public static Schema[] schemaVersions() {
return new Schema[]{ALTER_CONFIGS_RESPONSE_V0, ALTER_CONFIGS_RESPONSE_V1};
public AlterConfigsResponse(AlterConfigsResponseData data) {
this.data = data;
}
private final int throttleTimeMs;
private final Map<ConfigResource, ApiError> errors;
public AlterConfigsResponse(int throttleTimeMs, Map<ConfigResource, ApiError> errors) {
this.throttleTimeMs = throttleTimeMs;
this.errors = Objects.requireNonNull(errors, "errors");
}
public AlterConfigsResponse(Struct struct) {
throttleTimeMs = struct.get(THROTTLE_TIME_MS);
Object[] resourcesArray = struct.getArray(RESOURCES_KEY_NAME);
errors = new HashMap<>(resourcesArray.length);
for (Object resourceObj : resourcesArray) {
Struct resourceStruct = (Struct) resourceObj;
ApiError error = new ApiError(resourceStruct);
ConfigResource.Type resourceType = ConfigResource.Type.forId(resourceStruct.getByte(RESOURCE_TYPE_KEY_NAME));
String resourceName = resourceStruct.getString(RESOURCE_NAME_KEY_NAME);
errors.put(new ConfigResource(resourceType, resourceName), error);
}
public AlterConfigsResponse(Struct struct, short version) {
this.data = new AlterConfigsResponseData(struct, version);
}
public Map<ConfigResource, ApiError> errors() {
return errors;
return data.responses().stream().collect(Collectors.toMap(
response -> new ConfigResource(
ConfigResource.Type.forId(response.resourceType()),
response.resourceName()),
response -> new ApiError(Errors.forCode(response.errorCode()), response.errorMessage())
));
}
@Override
public Map<Errors, Integer> errorCounts() {
return apiErrorCounts(errors);
return apiErrorCounts(errors());
}
@Override
public int throttleTimeMs() {
return throttleTimeMs;
return data.throttleTimeMs();
}
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.ALTER_CONFIGS.responseSchema(version));
struct.set(THROTTLE_TIME_MS, throttleTimeMs);
List<Struct> resourceStructs = new ArrayList<>(errors.size());
for (Map.Entry<ConfigResource, ApiError> entry : errors.entrySet()) {
Struct resourceStruct = struct.instance(RESOURCES_KEY_NAME);
ConfigResource resource = entry.getKey();
entry.getValue().write(resourceStruct);
resourceStruct.set(RESOURCE_TYPE_KEY_NAME, resource.type().id());
resourceStruct.set(RESOURCE_NAME_KEY_NAME, resource.name());
resourceStructs.add(resourceStruct);
}
struct.set(RESOURCES_KEY_NAME, resourceStructs.toArray(new Struct[0]));
return struct;
return data.toStruct(version);
}
public static AlterConfigsResponse parse(ByteBuffer buffer, short version) {
return new AlterConfigsResponse(ApiKeys.ALTER_CONFIGS.parseResponse(version, buffer));
return new AlterConfigsResponse(ApiKeys.ALTER_CONFIGS.parseResponse(version, buffer), version);
}
@Override

View File

@ -17,6 +17,7 @@
package org.apache.kafka.common.message;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
@ -638,6 +639,7 @@ public final class MessageTest {
private void testEquivalentMessageRoundTrip(short version, Message message) throws Exception {
testStructRoundTrip(version, message, message);
testByteBufferRoundTrip(version, message, message);
testJsonRoundTrip(version, message, message);
}
private void testByteBufferRoundTrip(short version, Message message, Message expected) throws Exception {
@ -668,6 +670,15 @@ public final class MessageTest {
assertEquals(expected.toString(), message2.toString());
}
private void testJsonRoundTrip(short version, Message message, Message expected) throws Exception {
JsonNode jsonNode = message.toJson(version);
Message message2 = message.getClass().newInstance();
message2.fromJson(jsonNode, version);
assertEquals(expected, message2);
assertEquals(expected.hashCode(), message2.hashCode());
assertEquals(expected.toString(), message2.toString());
}
/**
* Verify that the JSON files support the same message versions as the
* schemas accessible through the ApiKey class.

View File

@ -33,6 +33,7 @@ import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.AlterConfigsResponseData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.ApiVersionsRequestData;
@ -1853,14 +1854,23 @@ public class RequestResponseTest {
configs.put(new ConfigResource(ConfigResource.Type.BROKER, "0"), new AlterConfigsRequest.Config(configEntries));
configs.put(new ConfigResource(ConfigResource.Type.TOPIC, "topic"),
new AlterConfigsRequest.Config(Collections.<AlterConfigsRequest.ConfigEntry>emptyList()));
return new AlterConfigsRequest((short) 0, configs, false);
return new AlterConfigsRequest.Builder(configs, false).build((short) 0);
}
private AlterConfigsResponse createAlterConfigsResponse() {
Map<ConfigResource, ApiError> errors = new HashMap<>();
errors.put(new ConfigResource(ConfigResource.Type.BROKER, "0"), ApiError.NONE);
errors.put(new ConfigResource(ConfigResource.Type.TOPIC, "topic"), new ApiError(Errors.INVALID_REQUEST, "This request is invalid"));
return new AlterConfigsResponse(20, errors);
AlterConfigsResponseData data = new AlterConfigsResponseData()
.setThrottleTimeMs(20);
data.responses().add(new AlterConfigsResponseData.AlterConfigsResourceResponse()
.setErrorCode(Errors.NONE.code())
.setErrorMessage(null)
.setResourceName("0")
.setResourceType(ConfigResource.Type.BROKER.id()));
data.responses().add(new AlterConfigsResponseData.AlterConfigsResourceResponse()
.setErrorCode(Errors.INVALID_REQUEST.code())
.setErrorMessage("This request is invalid")
.setResourceName("topic")
.setResourceType(ConfigResource.Type.TOPIC.id()));
return new AlterConfigsResponse(data);
}
private CreatePartitionsRequest createCreatePartitionsRequest() {

View File

@ -421,7 +421,7 @@ public class Values {
if (value instanceof Number) {
// Not already a decimal, so treat it as a double ...
double converted = ((Number) value).doubleValue();
return new BigDecimal(converted);
return BigDecimal.valueOf(converted);
}
if (value instanceof String) {
return new BigDecimal(value.toString()).doubleValue();

View File

@ -48,9 +48,10 @@ import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
import org.apache.kafka.common.message.AlterConfigsResponseData.AlterConfigsResourceResponse
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
import org.apache.kafka.common.message.{AlterPartitionReassignmentsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeGroupsResponseData, DescribeLogDirsResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, HeartbeatResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsResponseData, ListPartitionReassignmentsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteResponseData, RenewDelegationTokenResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, StopReplicaResponseData, SyncGroupResponseData, UpdateMetadataResponseData}
import org.apache.kafka.common.message.{AlterConfigsResponseData, AlterPartitionReassignmentsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeGroupsResponseData, DescribeLogDirsResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, HeartbeatResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsResponseData, ListPartitionReassignmentsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteResponseData, RenewDelegationTokenResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, StopReplicaResponseData, SyncGroupResponseData, UpdateMetadataResponseData}
import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultCollection}
import org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupResult, DeletableGroupResultCollection}
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.{ReassignablePartitionResponse, ReassignableTopicResponse}
@ -2404,8 +2405,19 @@ class KafkaApis(val requestChannel: RequestChannel,
val unauthorizedResult = unauthorizedResources.keys.map { resource =>
resource -> configsAuthorizationApiError(resource)
}
sendResponseMaybeThrottle(request, requestThrottleMs =>
new AlterConfigsResponse(requestThrottleMs, (authorizedResult ++ unauthorizedResult).asJava))
def responseCallback(requestThrottleMs: Int): AlterConfigsResponse = {
val data = new AlterConfigsResponseData()
.setThrottleTimeMs(requestThrottleMs)
(authorizedResult ++ unauthorizedResult).foreach{case (resource, error) =>
data.responses().add(new AlterConfigsResourceResponse()
.setErrorCode(error.error.code)
.setErrorMessage(error.message)
.setResourceName(resource.name)
.setResourceType(resource.`type`.id))
}
new AlterConfigsResponse(data)
}
sendResponseMaybeThrottle(request, responseCallback)
}
def handleAlterPartitionReassignmentsRequest(request: RequestChannel.Request): Unit = {

View File

@ -665,7 +665,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
val targetReplica = brokerIds.diff(replicasOfFirstPartition).head
adminClient.alterPartitionReassignments(Collections.singletonMap(tp,
Optional.of(new NewPartitionReassignment(Collections.singletonList(targetReplica)))))
Optional.of(new NewPartitionReassignment(Collections.singletonList(targetReplica))))).all().get()
// let's wait until the LAIR is propagated
TestUtils.waitUntilTrue(() => {

View File

@ -227,6 +227,11 @@ public final class FieldSpec {
return ignorable;
}
@JsonProperty("entityType")
public EntityType entityType() {
return entityType;
}
@JsonProperty("about")
public String about() {
return about;

View File

@ -121,6 +121,11 @@ public interface FieldType {
return Optional.of(8);
}
@Override
public boolean isFloat() {
return true;
}
@Override
public String toString() {
return NAME;
@ -318,6 +323,13 @@ public interface FieldType {
return false;
}
/**
* Returns true if this is a floating point type.
*/
default boolean isFloat() {
return false;
}
/**
* Returns true if this is a struct type.
*/

View File

@ -18,9 +18,26 @@
package org.apache.kafka.message;
/**
* Creates an if statement based on whether or not a particular field is null.
* For versions of a field that are nullable, IsNullCondition creates a null check.
*/
public final class IsNullConditional {
interface ConditionalGenerator {
String generate(String name, boolean negated);
}
private static class PrimitiveConditionalGenerator implements ConditionalGenerator {
final static PrimitiveConditionalGenerator INSTANCE = new PrimitiveConditionalGenerator();
@Override
public String generate(String name, boolean negated) {
if (negated) {
return String.format("%s != null", name);
} else {
return String.format("%s == null", name);
}
}
}
static IsNullConditional forName(String name) {
return new IsNullConditional(name);
}
@ -35,8 +52,9 @@ public final class IsNullConditional {
private Versions nullableVersions = Versions.ALL;
private Versions possibleVersions = Versions.ALL;
private Runnable ifNull = null;
private Runnable ifNotNull = null;
private Runnable ifShouldNotBeNull = null;
private boolean alwaysEmitBlockScope = false;
private ConditionalGenerator conditionalGenerator = PrimitiveConditionalGenerator.INSTANCE;
private IsNullConditional(String name) {
this.name = name;
@ -57,8 +75,8 @@ public final class IsNullConditional {
return this;
}
IsNullConditional ifNotNull(Runnable ifNotNull) {
this.ifNotNull = ifNotNull;
IsNullConditional ifShouldNotBeNull(Runnable ifShouldNotBeNull) {
this.ifShouldNotBeNull = ifShouldNotBeNull;
return this;
}
@ -67,14 +85,19 @@ public final class IsNullConditional {
return this;
}
IsNullConditional conditionalGenerator(ConditionalGenerator conditionalGenerator) {
this.conditionalGenerator = conditionalGenerator;
return this;
}
void generate(CodeBuffer buffer) {
if (nullableVersions.intersect(possibleVersions).empty()) {
if (ifNotNull != null) {
if (ifShouldNotBeNull != null) {
if (alwaysEmitBlockScope) {
buffer.printf("{%n");
buffer.incrementIndent();
}
ifNotNull.run();
ifShouldNotBeNull.run();
if (alwaysEmitBlockScope) {
buffer.decrementIndent();
buffer.printf("}%n");
@ -82,21 +105,21 @@ public final class IsNullConditional {
}
} else {
if (ifNull != null) {
buffer.printf("if (%s == null) {%n", name);
buffer.printf("if (%s) {%n", conditionalGenerator.generate(name, false));
buffer.incrementIndent();
ifNull.run();
buffer.decrementIndent();
if (ifNotNull != null) {
if (ifShouldNotBeNull != null) {
buffer.printf("} else {%n");
buffer.incrementIndent();
ifNotNull.run();
ifShouldNotBeNull.run();
buffer.decrementIndent();
}
buffer.printf("}%n");
} else if (ifNotNull != null) {
buffer.printf("if (%s != null) {%n", name);
} else if (ifShouldNotBeNull != null) {
buffer.printf("if (%s) {%n", conditionalGenerator.generate(name, true));
buffer.incrementIndent();
ifNotNull.run();
ifShouldNotBeNull.run();
buffer.decrementIndent();
buffer.printf("}%n");
}

View File

@ -98,6 +98,10 @@ public final class MessageDataGenerator {
buffer.printf("%n");
generateClassToStruct(className, struct, parentVersions);
buffer.printf("%n");
generateClassFromJson(className, struct, parentVersions);
buffer.printf("%n");
generateClassToJson(className, struct, parentVersions);
buffer.printf("%n");
generateClassSize(className, struct, parentVersions);
buffer.printf("%n");
generateClassEquals(className, struct, isSetElement);
@ -392,39 +396,46 @@ public final class MessageDataGenerator {
buffer.printf("public %s(Readable _readable, short _version) {%n", className);
buffer.incrementIndent();
buffer.printf("read(_readable, _version);%n");
if (isSetElement) {
headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_COLLECTION_CLASS);
buffer.printf("this.prev = ImplicitLinkedHashCollection.INVALID_INDEX;%n");
buffer.printf("this.next = ImplicitLinkedHashCollection.INVALID_INDEX;%n");
}
generateConstructorEpilogue(isSetElement);
buffer.decrementIndent();
buffer.printf("}%n");
buffer.printf("%n");
headerGenerator.addImport(MessageGenerator.STRUCT_CLASS);
buffer.printf("public %s(Struct struct, short _version) {%n", className);
buffer.printf("public %s(Struct _struct, short _version) {%n", className);
buffer.incrementIndent();
buffer.printf("fromStruct(struct, _version);%n");
if (isSetElement) {
headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_COLLECTION_CLASS);
buffer.printf("this.prev = ImplicitLinkedHashCollection.INVALID_INDEX;%n");
buffer.printf("this.next = ImplicitLinkedHashCollection.INVALID_INDEX;%n");
}
buffer.printf("fromStruct(_struct, _version);%n");
generateConstructorEpilogue(isSetElement);
buffer.decrementIndent();
buffer.printf("}%n");
buffer.printf("%n");
headerGenerator.addImport(MessageGenerator.JSON_NODE_CLASS);
buffer.printf("public %s(JsonNode _node, short _version) {%n", className);
buffer.incrementIndent();
buffer.printf("fromJson(_node, _version);%n");
generateConstructorEpilogue(isSetElement);
buffer.decrementIndent();
buffer.printf("}%n");
buffer.printf("%n");
buffer.printf("public %s() {%n", className);
buffer.incrementIndent();
for (FieldSpec field : struct.fields()) {
buffer.printf("this.%s = %s;%n",
field.camelCaseName(), fieldDefault(field));
}
generateConstructorEpilogue(isSetElement);
buffer.decrementIndent();
buffer.printf("}%n");
}
private void generateConstructorEpilogue(boolean isSetElement) {
if (isSetElement) {
headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_COLLECTION_CLASS);
buffer.printf("this.prev = ImplicitLinkedHashCollection.INVALID_INDEX;%n");
buffer.printf("this.next = ImplicitLinkedHashCollection.INVALID_INDEX;%n");
}
buffer.decrementIndent();
buffer.printf("}%n");
}
private void generateShortAccessor(String name, short val) {
@ -782,6 +793,315 @@ public final class MessageDataGenerator {
buffer.printf("}%n");
}
private void generateClassFromJson(String className, StructSpec struct,
Versions parentVersions) {
headerGenerator.addImport(MessageGenerator.JSON_NODE_CLASS);
buffer.printf("@Override%n");
buffer.printf("public void fromJson(JsonNode _node, short _version) {%n");
buffer.incrementIndent();
VersionConditional.forVersions(struct.versions(), parentVersions).
allowMembershipCheckAlwaysFalse(false).
ifNotMember(__ -> {
headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS);
buffer.printf("throw new UnsupportedVersionException(\"Can't read " +
"version \" + _version + \" of %s\");%n", className);
}).
generate(buffer);
Versions curVersions = parentVersions.intersect(struct.versions());
for (FieldSpec field : struct.fields()) {
String sourceVariable = String.format("_%sNode", field.camelCaseName());
buffer.printf("JsonNode %s = _node.get(\"%s\");%n",
sourceVariable,
field.camelCaseName());
buffer.printf("if (%s == null) {%n", sourceVariable);
buffer.incrementIndent();
Versions mandatoryVersions = field.versions().subtract(field.taggedVersions());
VersionConditional.forVersions(mandatoryVersions, curVersions).
ifMember(__ -> {
buffer.printf("throw new RuntimeException(\"%s: unable to locate " +
"field \'%s\', which is mandatory in version \" + _version);%n",
className, field.camelCaseName());
}).
ifNotMember(__ -> {
buffer.printf("this.%s = %s;%n", field.camelCaseName(), fieldDefault(field));
}).
generate(buffer);
buffer.decrementIndent();
buffer.printf("} else {%n");
buffer.incrementIndent();
VersionConditional.forVersions(struct.versions(), curVersions).
ifMember(presentVersions -> {
generateTargetFromJson(new Target(field,
sourceVariable,
className,
input -> String.format("this.%s = %s", field.camelCaseName(), input)),
curVersions);
}).ifNotMember(__ -> {
buffer.printf("throw new RuntimeException(\"%s: field \'%s\' is not " +
"supported in version \" + _version);%n",
className, field.camelCaseName());
}).
generate(buffer);
buffer.decrementIndent();
buffer.printf("}%n");
}
buffer.decrementIndent();
buffer.printf("}%n");
}
private void generateTargetFromJson(Target target, Versions curVersions) {
if (target.field().type() instanceof FieldType.BoolFieldType) {
buffer.printf("if (!%s.isBoolean()) {%n", target.sourceVariable());
buffer.incrementIndent();
buffer.printf("throw new RuntimeException(\"%s expected Boolean type, " +
"but got \" + _node.getNodeType());%n", target.humanReadableName());
buffer.decrementIndent();
buffer.printf("}%n");
buffer.printf("%s;%n", target.assignmentStatement(
target.sourceVariable() + ".asBoolean()"));
} else if (target.field().type() instanceof FieldType.Int8FieldType) {
headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("MessageUtil.jsonNodeToByte(%s, \"%s\")",
target.sourceVariable(), target.humanReadableName())));
} else if (target.field().type() instanceof FieldType.Int16FieldType) {
headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("MessageUtil.jsonNodeToShort(%s, \"%s\")",
target.sourceVariable(), target.humanReadableName())));
} else if (target.field().type() instanceof FieldType.Int32FieldType) {
headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("MessageUtil.jsonNodeToInt(%s, \"%s\")",
target.sourceVariable(), target.humanReadableName())));
} else if (target.field().type() instanceof FieldType.Int64FieldType) {
headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("MessageUtil.jsonNodeToLong(%s, \"%s\")",
target.sourceVariable(), target.humanReadableName())));
} else if (target.field().type() instanceof FieldType.UUIDFieldType) {
buffer.printf("if (!%s.isTextual()) {%n", target.sourceVariable());
buffer.incrementIndent();
buffer.printf("throw new RuntimeException(\"%s expected a JSON string " +
"type, but got \" + _node.getNodeType());%n", target.humanReadableName());
buffer.decrementIndent();
buffer.printf("}%n");
headerGenerator.addImport(MessageGenerator.UUID_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(String.format(
"UUID.fromString(%s.asText())", target.sourceVariable())));
} else if (target.field().type() instanceof FieldType.Float64FieldType) {
headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("MessageUtil.jsonNodeToDouble(%s, \"%s\")",
target.sourceVariable(), target.humanReadableName())));
} else {
// Handle the variable length types. All of them are potentially
// nullable, so handle that here.
IsNullConditional.forName(target.sourceVariable()).
nullableVersions(target.field().nullableVersions()).
possibleVersions(curVersions).
conditionalGenerator((name, negated) ->
String.format("%s%s.isNull()", negated ? "!" : "", name)).
ifNull(() -> {
buffer.printf("%s;%n", target.assignmentStatement("null"));
}).
ifShouldNotBeNull(() -> {
generateVariableLengthTargetFromJson(target, curVersions);
}).
generate(buffer);
}
}
private void generateVariableLengthTargetFromJson(Target target, Versions curVersions) {
if (target.field().type().isString()) {
buffer.printf("if (!%s.isTextual()) {%n", target.sourceVariable());
buffer.incrementIndent();
buffer.printf("throw new RuntimeException(\"%s expected a string " +
"type, but got \" + _node.getNodeType());%n", target.humanReadableName());
buffer.decrementIndent();
buffer.printf("}%n");
buffer.printf("%s;%n", target.assignmentStatement(
String.format("%s.asText()", target.sourceVariable())));
} else if (target.field().type().isBytes()) {
headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
if (target.field().zeroCopy()) {
headerGenerator.addImport(MessageGenerator.BYTE_BUFFER_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("ByteBuffer.wrap(MessageUtil.jsonNodeToBinary(%s, \"%s\"))",
target.sourceVariable(), target.humanReadableName())));
} else {
buffer.printf("%s;%n", target.assignmentStatement(
String.format("MessageUtil.jsonNodeToBinary(%s, \"%s\")",
target.sourceVariable(), target.humanReadableName())));
}
} else if (target.field().type().isArray()) {
buffer.printf("if (!%s.isArray()) {%n", target.sourceVariable());
buffer.incrementIndent();
buffer.printf("throw new RuntimeException(\"%s expected a JSON " +
"array, but got \" + _node.getNodeType());%n", target.humanReadableName());
buffer.decrementIndent();
buffer.printf("}%n");
buffer.printf("%s;%n", target.assignmentStatement(
String.format("new %s()", fieldConcreteJavaType(target.field()))));
headerGenerator.addImport(MessageGenerator.JSON_NODE_CLASS);
buffer.printf("for (JsonNode _element : %s) {%n", target.sourceVariable());
buffer.incrementIndent();
generateTargetFromJson(target.arrayElementTarget(
input -> String.format("%s.add(%s)", target.field().camelCaseName(), input)),
curVersions);
buffer.decrementIndent();
buffer.printf("}%n");
} else if (target.field().type().isStruct()) {
buffer.printf("%s;%n", target.assignmentStatement(String.format("new %s(%s, _version)",
target.field().type().toString(),
target.sourceVariable())));
} else {
throw new RuntimeException("Unexpected type " + target.field().type());
}
}
private void generateClassToJson(String className, StructSpec struct,
Versions parentVersions) {
headerGenerator.addImport(MessageGenerator.JSON_NODE_CLASS);
buffer.printf("@Override%n");
buffer.printf("public JsonNode toJson(short _version) {%n");
buffer.incrementIndent();
VersionConditional.forVersions(struct.versions(), parentVersions).
allowMembershipCheckAlwaysFalse(false).
ifNotMember(__ -> {
headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS);
buffer.printf("throw new UnsupportedVersionException(\"Can't write " +
"version \" + _version + \" of %s\");%n", className);
}).
generate(buffer);
Versions curVersions = parentVersions.intersect(struct.versions());
headerGenerator.addImport(MessageGenerator.OBJECT_NODE_CLASS);
headerGenerator.addImport(MessageGenerator.JSON_NODE_FACTORY_CLASS);
buffer.printf("ObjectNode _node = new ObjectNode(JsonNodeFactory.instance);%n");
for (FieldSpec field : struct.fields()) {
Target target = new Target(field,
String.format("this.%s", field.camelCaseName()),
field.camelCaseName(),
input -> String.format("_node.set(\"%s\", %s)", field.camelCaseName(), input));
VersionConditional cond = VersionConditional.forVersions(field.versions(), curVersions).
ifMember(presentVersions -> {
VersionConditional.forVersions(field.taggedVersions(), presentVersions).
ifMember(presentAndTaggedVersions -> {
generateNonDefaultValueCheck(field, field.nullableVersions());
buffer.incrementIndent();
if (field.defaultString().equals("null")) {
// If the default was null, and we already checked that this field was not
// the default, we can omit further null checks.
generateTargetToJson(target.nonNullableCopy(), presentAndTaggedVersions);
} else {
generateTargetToJson(target, presentAndTaggedVersions);
}
buffer.decrementIndent();
buffer.printf("}%n");
}).
ifNotMember(presentAndNotTaggedVersions -> {
generateTargetToJson(target, presentAndNotTaggedVersions);
}).
generate(buffer);
});
if (!field.ignorable()) {
cond.ifNotMember(__ -> {
generateNonIgnorableFieldCheck(field);
});
}
cond.generate(buffer);
}
buffer.printf("return _node;%n");
buffer.decrementIndent();
buffer.printf("}%n");
}
private void generateTargetToJson(Target target, Versions versions) {
if (target.field().type() instanceof FieldType.BoolFieldType) {
headerGenerator.addImport(MessageGenerator.BOOLEAN_NODE_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("BooleanNode.valueOf(%s)", target.sourceVariable())));
} else if ((target.field().type() instanceof FieldType.Int8FieldType) ||
(target.field().type() instanceof FieldType.Int16FieldType)) {
headerGenerator.addImport(MessageGenerator.SHORT_NODE_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("new ShortNode(%s)", target.sourceVariable())));
} else if (target.field().type() instanceof FieldType.Int32FieldType) {
headerGenerator.addImport(MessageGenerator.INT_NODE_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("new IntNode(%s)", target.sourceVariable())));
} else if (target.field().type() instanceof FieldType.Int64FieldType) {
headerGenerator.addImport(MessageGenerator.LONG_NODE_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("new LongNode(%s)", target.sourceVariable())));
} else if (target.field().type() instanceof FieldType.UUIDFieldType) {
headerGenerator.addImport(MessageGenerator.TEXT_NODE_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("new TextNode(%s.toString())", target.sourceVariable())));
} else if (target.field().type() instanceof FieldType.Float64FieldType) {
headerGenerator.addImport(MessageGenerator.DOUBLE_NODE_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("new DoubleNode(%s)", target.sourceVariable())));
} else {
// Handle the variable length types. All of them are potentially
// nullable, so handle that here.
IsNullConditional.forName(target.sourceVariable()).
nullableVersions(target.field().nullableVersions()).
possibleVersions(versions).
conditionalGenerator((name, negated) ->
String.format("%s %s= null", name, negated ? "!" : "=")).
ifNull(() -> {
headerGenerator.addImport(MessageGenerator.NULL_NODE_CLASS);
buffer.printf("%s;%n", target.assignmentStatement("NullNode.instance"));
}).
ifShouldNotBeNull(() -> {
generateVariableLengthTargetToJson(target, versions);
}).
generate(buffer);
}
}
private void generateVariableLengthTargetToJson(Target target, Versions versions) {
if (target.field().type().isString()) {
headerGenerator.addImport(MessageGenerator.TEXT_NODE_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("new TextNode(%s)", target.sourceVariable())));
} else if (target.field().type().isBytes()) {
headerGenerator.addImport(MessageGenerator.BINARY_NODE_CLASS);
if (target.field().zeroCopy()) {
headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("new BinaryNode(MessageUtil.byteBufferToArray(%s))",
target.sourceVariable())));
} else {
headerGenerator.addImport(MessageGenerator.ARRAYS_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("new BinaryNode(Arrays.copyOf(%s, %s.length))",
target.sourceVariable(), target.sourceVariable())));
}
} else if (target.field().type().isArray()) {
headerGenerator.addImport(MessageGenerator.ARRAY_NODE_CLASS);
headerGenerator.addImport(MessageGenerator.JSON_NODE_FACTORY_CLASS);
FieldType.ArrayType arrayType = (FieldType.ArrayType) target.field().type();
FieldType elementType = arrayType.elementType();
String arrayInstanceName = String.format("_%sArray", target.field().camelCaseName());
buffer.printf("ArrayNode %s = new ArrayNode(JsonNodeFactory.instance);%n", arrayInstanceName);
buffer.printf("for (%s _element : %s) {%n",
getBoxedJavaType(elementType), target.sourceVariable());
buffer.incrementIndent();
generateTargetToJson(target.arrayElementTarget(
input -> String.format("%s.add(%s)", arrayInstanceName, input)),
versions);
buffer.decrementIndent();
buffer.printf("}%n");
buffer.printf("%s;%n", target.assignmentStatement(arrayInstanceName));
} else if (target.field().type().isStruct()) {
buffer.printf("%s;%n", target.assignmentStatement(
String.format("%s.toJson(_version)", target.sourceVariable())));
} else {
throw new RuntimeException("unknown type " + target.field().type());
}
}
private void generateArrayFromStruct(FieldSpec field, Versions versions) {
IsNullConditional.forName("_nestedObjects").
@ -790,7 +1110,7 @@ public final class MessageDataGenerator {
ifNull(() -> {
buffer.printf("this.%s = null;%n", field.camelCaseName());
}).
ifNotNull(() -> {
ifShouldNotBeNull(() -> {
FieldType.ArrayType arrayType = (FieldType.ArrayType) field.type();
FieldType elementType = arrayType.elementType();
buffer.printf("this.%s = new %s(_nestedObjects.length);%n",
@ -866,19 +1186,15 @@ public final class MessageDataGenerator {
}
}
private void maybeGenerateNonIgnorableFieldCheck(FieldSpec field, VersionConditional cond) {
if (!field.ignorable()) {
cond.ifNotMember(__ -> {
generateNonDefaultValueCheck(field, field.nullableVersions());
buffer.incrementIndent();
headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS);
buffer.printf("throw new UnsupportedVersionException(" +
"\"Attempted to write a non-default %s at version \" + _version);%n",
field.camelCaseName());
buffer.decrementIndent();
buffer.printf("}%n");
});
}
private void generateNonIgnorableFieldCheck(FieldSpec field) {
generateNonDefaultValueCheck(field, field.nullableVersions());
buffer.incrementIndent();
headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS);
buffer.printf("throw new UnsupportedVersionException(" +
"\"Attempted to write a non-default %s at version \" + _version);%n",
field.camelCaseName());
buffer.decrementIndent();
buffer.printf("}%n");
}
private void generateClassWriter(String className, StructSpec struct,
@ -888,7 +1204,7 @@ public final class MessageDataGenerator {
buffer.printf("@Override%n");
buffer.printf("public void write(Writable _writable, ObjectSerializationCache _cache, short _version) {%n");
buffer.incrementIndent();
VersionConditional.forVersions(parentVersions, struct.versions()).
VersionConditional.forVersions(struct.versions(), parentVersions).
allowMembershipCheckAlwaysFalse(false).
ifNotMember(__ -> {
headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS);
@ -946,8 +1262,11 @@ public final class MessageDataGenerator {
}).
generate(buffer);
});
maybeGenerateNonIgnorableFieldCheck(field, cond);
if (!field.ignorable()) {
cond.ifNotMember(__ -> {
generateNonIgnorableFieldCheck(field);
});
}
cond.generate(buffer);
}
headerGenerator.addImport(MessageGenerator.RAW_TAGGED_FIELD_WRITER_CLASS);
@ -972,7 +1291,7 @@ public final class MessageDataGenerator {
nullableVersions(field.nullableVersions()).
possibleVersions(presentAndTaggedVersions).
alwaysEmitBlockScope(true).
ifNotNull(() -> {
ifShouldNotBeNull(() -> {
if (!field.defaultString().equals("null")) {
generateNonDefaultValueCheck(field, Versions.NONE);
buffer.incrementIndent();
@ -1100,7 +1419,7 @@ public final class MessageDataGenerator {
}).
generate(buffer);
}).
ifNotNull(() -> {
ifShouldNotBeNull(() -> {
final String lengthExpression;
if (type.isString()) {
buffer.printf("byte[] _stringBytes = _cache.getSerializedValue(%s);%n",
@ -1262,8 +1581,11 @@ public final class MessageDataGenerator {
}).
generate(buffer);
});
maybeGenerateNonIgnorableFieldCheck(field, cond);
if (!field.ignorable()) {
cond.ifNotMember(__ -> {
generateNonIgnorableFieldCheck(field);
});
}
cond.generate(buffer);
}
VersionConditional.forVersions(messageFlexibleVersions, curVersions).
@ -1306,7 +1628,7 @@ public final class MessageDataGenerator {
ifNull(() -> {
buffer.printf("struct.set(\"%s\", null);%n", field.snakeCaseName());
}).
ifNotNull(() -> {
ifShouldNotBeNull(() -> {
generateFieldToObjectArray(field);
buffer.printf("struct.set(\"%s\", (Object[]) _nestedObjects);%n",
field.snakeCaseName());
@ -1347,7 +1669,7 @@ public final class MessageDataGenerator {
ifNull(() -> {
buffer.printf("_taggedFields.put(%d, null);%n", field.tag().get());
}).
ifNotNull(() -> {
ifShouldNotBeNull(() -> {
generateFieldToObjectArray(field);
buffer.printf("_taggedFields.put(%d, _nestedObjects);%n", field.tag().get());
}).generate(buffer);
@ -1535,7 +1857,7 @@ public final class MessageDataGenerator {
generate(buffer);
}
}).
ifNotNull(() -> {
ifShouldNotBeNull(() -> {
if (tagged) {
if (!field.defaultString().equals("null")) {
generateNonDefaultValueCheck(field, Versions.NONE);

View File

@ -112,6 +112,30 @@ public final class MessageGenerator {
static final String MAP_ENTRY_CLASS = "java.util.Map.Entry";
static final String JSON_NODE_CLASS = "com.fasterxml.jackson.databind.JsonNode";
static final String OBJECT_NODE_CLASS = "com.fasterxml.jackson.databind.node.ObjectNode";
static final String JSON_NODE_FACTORY_CLASS = "com.fasterxml.jackson.databind.node.JsonNodeFactory";
static final String BOOLEAN_NODE_CLASS = "com.fasterxml.jackson.databind.node.BooleanNode";
static final String SHORT_NODE_CLASS = "com.fasterxml.jackson.databind.node.ShortNode";
static final String INT_NODE_CLASS = "com.fasterxml.jackson.databind.node.IntNode";
static final String LONG_NODE_CLASS = "com.fasterxml.jackson.databind.node.LongNode";
static final String TEXT_NODE_CLASS = "com.fasterxml.jackson.databind.node.TextNode";
static final String BINARY_NODE_CLASS = "com.fasterxml.jackson.databind.node.BinaryNode";
static final String NULL_NODE_CLASS = "com.fasterxml.jackson.databind.node.NullNode";
static final String ARRAY_NODE_CLASS = "com.fasterxml.jackson.databind.node.ArrayNode";
static final String DOUBLE_NODE_CLASS = "com.fasterxml.jackson.databind.node.DoubleNode";
/**
* The Jackson serializer we use for JSON objects.
*/

View File

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.message;
import java.util.Collections;
import java.util.function.Function;
public final class Target {
private final FieldSpec field;
private final String sourceVariable;
private final String humanReadableName;
private final Function<String, String> assignmentStatementGenerator;
Target(FieldSpec field, String sourceVariable, String humanReadableName,
Function<String, String> assignmentStatementGenerator) {
this.field = field;
this.sourceVariable = sourceVariable;
this.humanReadableName = humanReadableName;
this.assignmentStatementGenerator = assignmentStatementGenerator;
}
public String assignmentStatement(String rightHandSide) {
return assignmentStatementGenerator.apply(rightHandSide);
}
public Target nonNullableCopy() {
FieldSpec nonNullableField = new FieldSpec(field.name(),
field.versionsString(),
field.fields(),
field.typeString(),
field.mapKey(),
Versions.NONE.toString(),
field.defaultString(),
field.ignorable(),
field.entityType(),
field.about(),
field.taggedVersionsString(),
field.flexibleVersionsString(),
field.tagInteger(),
field.zeroCopy());
return new Target(nonNullableField, sourceVariable, humanReadableName, assignmentStatementGenerator);
}
public Target arrayElementTarget(Function<String, String> assignmentStatementGenerator) {
if (!field.type().isArray()) {
throw new RuntimeException("Field " + field + " is not an array.");
}
FieldType.ArrayType arrayType = (FieldType.ArrayType) field.type();
FieldSpec elementField = new FieldSpec(field.name() + "Element",
field.versions().toString(),
Collections.emptyList(),
arrayType.elementType().toString(),
false,
Versions.NONE.toString(),
"",
false,
EntityType.UNKNOWN,
"",
Versions.NONE.toString(),
field.flexibleVersionsString(),
null,
field.zeroCopy());
return new Target(elementField, "_element", humanReadableName + " element",
assignmentStatementGenerator);
}
public FieldSpec field() {
return field;
}
public String sourceVariable() {
return sourceVariable;
}
public String humanReadableName() {
return humanReadableName;
}
}

View File

@ -52,7 +52,7 @@ public class IsNullConditionalTest {
ifNull(() -> {
buffer.printf("System.out.println(\"null\");%n");
}).
ifNotNull(() -> {
ifShouldNotBeNull(() -> {
buffer.printf("System.out.println(\"not null\");%n");
}).
generate(buffer);
@ -71,7 +71,7 @@ public class IsNullConditionalTest {
forName("foobar").
nullableVersions(Versions.parse("0+", null)).
possibleVersions(Versions.parse("2+", null)).
ifNotNull(() -> {
ifShouldNotBeNull(() -> {
buffer.printf("System.out.println(\"not null\");%n");
}).
generate(buffer);
@ -91,7 +91,7 @@ public class IsNullConditionalTest {
ifNull(() -> {
buffer.printf("System.out.println(\"null\");%n");
}).
ifNotNull(() -> {
ifShouldNotBeNull(() -> {
buffer.printf("System.out.println(\"not null\");%n");
}).
generate(buffer);
@ -109,7 +109,7 @@ public class IsNullConditionalTest {
ifNull(() -> {
buffer.printf("System.out.println(\"null\");%n");
}).
ifNotNull(() -> {
ifShouldNotBeNull(() -> {
buffer.printf("System.out.println(\"not null\");%n");
}).
alwaysEmitBlockScope(true).

View File

@ -32,15 +32,18 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.Locale;
import java.util.Optional;
import java.util.Properties;
@ -51,7 +54,6 @@ import static java.util.Collections.singletonList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
import static org.junit.Assert.assertFalse;
@ -60,9 +62,21 @@ import static org.junit.Assert.assertFalse;
/**
* Test the unclean shutdown behavior around state store cleanup.
*/
@RunWith(Parameterized.class)
@Category(IntegrationTest.class)
public class EOSUncleanShutdownIntegrationTest {
@Parameterized.Parameters(name = "{0}")
public static Collection<String[]> data() {
return Arrays.asList(new String[][] {
{StreamsConfig.EXACTLY_ONCE},
{StreamsConfig.EXACTLY_ONCE_BETA}
});
}
@Parameterized.Parameter
public String eosConfig;
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
@ -82,8 +96,6 @@ public class EOSUncleanShutdownIntegrationTest {
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL);
STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TEST_FOLDER.getRoot().getPath());
}
@ -91,6 +103,7 @@ public class EOSUncleanShutdownIntegrationTest {
public void shouldWorkWithUncleanShutdownWipeOutStateStore() throws InterruptedException {
final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + "-test";
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
final String input = "input-topic";
cleanStateBeforeTest(CLUSTER, input);

View File

@ -48,14 +48,18 @@ import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
public class GlobalKTableEOSIntegrationTest {
private static final int NUM_BROKERS = 1;
@ -70,6 +74,17 @@ public class GlobalKTableEOSIntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER =
new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);
@Parameterized.Parameters(name = "{0}")
public static Collection<String[]> data() {
return Arrays.asList(new String[][] {
{StreamsConfig.EXACTLY_ONCE},
{StreamsConfig.EXACTLY_ONCE_BETA}
});
}
@Parameterized.Parameter
public String eosConfig;
private static volatile AtomicInteger testNo = new AtomicInteger(0);
private final MockTime mockTime = CLUSTER.time;
private final KeyValueMapper<String, Long, Long> keyMapper = (key, value) -> value;
@ -97,7 +112,7 @@ public class GlobalKTableEOSIntegrationTest {
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
globalTable = builder.globalTable(globalTableTopic, Consumed.with(Serdes.Long(), Serdes.String()),
Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as(globalStore)
.withKeySerde(Serdes.Long())
@ -319,15 +334,9 @@ public class GlobalKTableEOSIntegrationTest {
}
private void produceInitialGlobalTableValues() throws Exception {
produceInitialGlobalTableValues(true);
}
private void produceInitialGlobalTableValues(final boolean enableTransactions) throws Exception {
final Properties properties = new Properties();
if (enableTransactions) {
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid");
properties.put(ProducerConfig.RETRIES_CONFIG, 1);
}
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid");
properties.put(ProducerConfig.RETRIES_CONFIG, 1);
IntegrationTestUtils.produceKeyValuesSynchronously(
globalTableTopic,
Arrays.asList(
@ -342,7 +351,7 @@ public class GlobalKTableEOSIntegrationTest {
StringSerializer.class,
properties),
mockTime,
enableTransactions);
true);
}
private void produceGlobalTableValues() throws Exception {

View File

@ -130,9 +130,19 @@ public class KTableSourceTopicRestartIntegrationTest {
}
@Test
public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled() throws Exception {
public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled();
}
@Test
public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosBetaEnabled() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA);
shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled();
}
private void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled() throws Exception {
try {
STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
streamsOne.start();

View File

@ -40,8 +40,8 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
@ -51,8 +51,6 @@ import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStartedStreams;
@ -77,20 +75,19 @@ public class ResetPartitionTimeIntegrationTest {
private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
private static final Serde<String> STRING_SERDE = Serdes.String();
private static final int DEFAULT_TIMEOUT = 100;
private final boolean eosEnabled;
private static long lastRecordedTimestamp = -2L;
@Parameters(name = "{index}: eosEnabled={0}")
public static Collection<Object[]> parameters() {
return asList(
new Object[] {false},
new Object[] {true}
);
@Parameterized.Parameters(name = "{0}")
public static Collection<String[]> data() {
return Arrays.asList(new String[][] {
{StreamsConfig.AT_LEAST_ONCE},
{StreamsConfig.EXACTLY_ONCE},
{StreamsConfig.EXACTLY_ONCE_BETA}
});
}
public ResetPartitionTimeIntegrationTest(final boolean eosEnabled) {
this.eosEnabled = eosEnabled;
}
@Parameterized.Parameter
public String processingGuarantee;
@Test
public void shouldPreservePartitionTimeOnKafkaStreamRestart() {
@ -112,7 +109,7 @@ public class ResetPartitionTimeIntegrationTest {
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfig.put(StreamsConfig.POLL_MS_CONFIG, Integer.toString(DEFAULT_TIMEOUT));
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.toString(DEFAULT_TIMEOUT));
streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosEnabled ? EXACTLY_ONCE : AT_LEAST_ONCE);
streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee);
streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
KafkaStreams kafkaStreams = getStartedStreams(streamsConfig, builder, true);

View File

@ -101,8 +101,9 @@ public class RocksDBMetricsIntegrationTest {
@Parameters(name = "{0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{StreamsConfig.AT_LEAST_ONCE},
{StreamsConfig.EXACTLY_ONCE},
{StreamsConfig.AT_LEAST_ONCE}
{StreamsConfig.EXACTLY_ONCE_BETA}
});
}

View File

@ -49,10 +49,10 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
@ -69,8 +69,6 @@ import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStartedStreams;
@ -94,19 +92,18 @@ public class SuppressionDurabilityIntegrationTest {
private static final Serde<String> STRING_SERDE = Serdes.String();
private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
private static final int COMMIT_INTERVAL = 100;
private final boolean eosEnabled;
@Parameters(name = "{index}: eosEnabled={0}")
public static Collection<Object[]> parameters() {
return asList(
new Object[] {false},
new Object[] {true}
);
@Parameterized.Parameters(name = "{0}")
public static Collection<String[]> data() {
return Arrays.asList(new String[][] {
{StreamsConfig.AT_LEAST_ONCE},
{StreamsConfig.EXACTLY_ONCE},
{StreamsConfig.EXACTLY_ONCE_BETA}
});
}
public SuppressionDurabilityIntegrationTest(final boolean eosEnabled) {
this.eosEnabled = eosEnabled;
}
@Parameterized.Parameter
public String processingGuaranteee;
@Test
public void shouldRecoverBufferAfterShutdown() {
@ -153,7 +150,7 @@ public class SuppressionDurabilityIntegrationTest {
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
mkEntry(StreamsConfig.POLL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)),
mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)),
mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosEnabled ? EXACTLY_ONCE : AT_LEAST_ONCE),
mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuaranteee),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath())
));

View File

@ -508,13 +508,16 @@ public class StreamThreadTest {
10 * 1000,
"Thread never started.");
TestUtils.retryOnExceptionWithTimeout(
() -> EasyMock.verify(mockConsumer)
);
thread.shutdown();
TestUtils.waitForCondition(
() -> thread.state() == StreamThread.State.DEAD,
10 * 1000,
"Thread never shut down.");
EasyMock.verify(mockConsumer);
}
private static class EasyMockConsumerClientSupplier extends MockClientSupplier {

View File

@ -70,7 +70,6 @@ import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
@ -361,7 +360,7 @@ public class StreamThreadStateStoreProviderTest {
final ProcessorStateManager stateManager = new ProcessorStateManager(
taskId,
Task.TaskType.ACTIVE,
EXACTLY_ONCE.equals(streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)),
StreamThread.eosEnabled(streamsConfig),
logContext,
stateDirectory,
new StoreChangelogReader(