mirror of https://github.com/apache/kafka.git
MINOR: Refactor Values class to fix checkstyle, add benchmark, optimize exceptions (#15469)
Signed-off-by: Greg Harris <greg.harris@aiven.io>
Reviewers: Mickael Maison <mickael.maison@gmail.com>
This commit is contained in:
parent 93238ae312
commit 11ad5e8bca
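
For orientation, here is a minimal sketch (not part of the commit; the class name ValuesApiSketch is illustrative only) of the org.apache.kafka.connect.data.Values API that the diff below tests and benchmarks. The expected behavior mirrors assertions added to ValuesTest in this change.

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaAndValue;
    import org.apache.kafka.connect.data.Values;

    public class ValuesApiSketch {
        public static void main(String[] args) {
            // parseString infers a schema from the text: "1.0" has no fractional part,
            // so it narrows to INT8 (see shouldParseFractionalPartsAsIntegerWhenNoFractionalPart).
            SchemaAndValue parsed = Values.parseString("1.0");
            System.out.println(parsed.schema().type() + " -> " + parsed.value()); // INT8 -> 1

            // Nested structures are inferred too: "[[]]" becomes an ARRAY whose elements are ARRAYs
            // (see shouldParseNestedArray).
            SchemaAndValue nested = Values.parseString("[[]]");
            System.out.println(nested.schema().type());               // ARRAY
            System.out.println(nested.schema().valueSchema().type()); // ARRAY

            // inferSchema derives a Connect schema from an in-memory value and returns null
            // when no schema can be inferred (see shouldInferNoSchemaForEmptyList).
            Schema inferred = Values.inferSchema(java.util.Collections.emptyList());
            System.out.println(inferred); // null
        }
    }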
@@ -99,7 +99,7 @@
              files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest|KafkaRaftClientTest).java"/>
    <suppress checks="NPathComplexity"
-             files="(ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|KafkaRaftClient|Authorizer|FetchSessionHandler|RecordAccumulator|Shell).java"/>
+             files="(ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|KafkaRaftClient|Authorizer|FetchSessionHandler|RecordAccumulator|Shell).java"/>
    <suppress checks="(JavaNCSS|CyclomaticComplexity|MethodLength)"
              files="CoordinatorClient.java"/>

@@ -143,7 +143,7 @@
    <suppress checks="ClassFanOutComplexity"
              files="Worker(|Test).java"/>
    <suppress checks="MethodLength"
-             files="(DistributedHerder|DistributedConfig|KafkaConfigBackingStore|Values|IncrementalCooperativeAssignor).java"/>
+             files="(DistributedHerder|DistributedConfig|KafkaConfigBackingStore|IncrementalCooperativeAssignor).java"/>
    <suppress checks="ParameterNumber"
              files="Worker(SinkTask|SourceTask|Coordinator)?.java"/>
    <suppress checks="ParameterNumber"

@@ -160,10 +160,10 @@
    <suppress checks="CyclomaticComplexity"
              files="(FileStreamSourceTask|DistributedHerder|KafkaConfigBackingStore).java"/>
    <suppress checks="CyclomaticComplexity"
-             files="(JsonConverter|Values|ConnectHeaders).java"/>
+             files="(JsonConverter|ConnectHeaders).java"/>

    <suppress checks="JavaNCSS"
-             files="(KafkaConfigBackingStore|Values|ConnectMetricsRegistry).java"/>
+             files="(KafkaConfigBackingStore|ConnectMetricsRegistry).java"/>

    <suppress checks="NPathComplexity"
              files="(DistributedHerder|AbstractHerder|RestClient|RestServer|JsonConverter|KafkaConfigBackingStore|FileStreamSourceTask|WorkerSourceTask|TopicAdmin).java"/>
File diff suppressed because it is too large
@@ -81,6 +81,20 @@ public class ValuesTest {
        INT_LIST.add(-987654321);
    }

    @Test
    public void shouldParseNullString() {
        SchemaAndValue schemaAndValue = Values.parseString(null);
        assertNull(schemaAndValue.schema());
        assertNull(schemaAndValue.value());
    }

    @Test
    public void shouldParseEmptyString() {
        SchemaAndValue schemaAndValue = Values.parseString("");
        assertEquals(Schema.STRING_SCHEMA, schemaAndValue.schema());
        assertEquals("", schemaAndValue.value());
    }

    @Test
    @Timeout(5)
    public void shouldNotEncounterInfiniteLoop() {
@@ -246,6 +260,20 @@ public class ValuesTest {
    @Test
    public void shouldConvertNullValue() {
        assertRoundTrip(Schema.INT8_SCHEMA, Schema.STRING_SCHEMA, null);
        assertRoundTrip(Schema.OPTIONAL_INT8_SCHEMA, Schema.STRING_SCHEMA, null);
        assertRoundTrip(Schema.INT16_SCHEMA, Schema.STRING_SCHEMA, null);
        assertRoundTrip(Schema.OPTIONAL_INT16_SCHEMA, Schema.STRING_SCHEMA, null);
        assertRoundTrip(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA, null);
        assertRoundTrip(Schema.OPTIONAL_INT32_SCHEMA, Schema.STRING_SCHEMA, null);
        assertRoundTrip(Schema.INT64_SCHEMA, Schema.STRING_SCHEMA, null);
        assertRoundTrip(Schema.OPTIONAL_INT64_SCHEMA, Schema.STRING_SCHEMA, null);
        assertRoundTrip(Schema.FLOAT32_SCHEMA, Schema.STRING_SCHEMA, null);
        assertRoundTrip(Schema.OPTIONAL_FLOAT32_SCHEMA, Schema.STRING_SCHEMA, null);
        assertRoundTrip(Schema.FLOAT64_SCHEMA, Schema.STRING_SCHEMA, null);
        assertRoundTrip(Schema.OPTIONAL_FLOAT64_SCHEMA, Schema.STRING_SCHEMA, null);
        assertRoundTrip(Schema.BOOLEAN_SCHEMA, Schema.STRING_SCHEMA, null);
        assertRoundTrip(Schema.OPTIONAL_BOOLEAN_SCHEMA, Schema.STRING_SCHEMA, null);
        assertRoundTrip(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA, null);
        assertRoundTrip(Schema.OPTIONAL_STRING_SCHEMA, Schema.STRING_SCHEMA, null);
    }
@@ -253,14 +281,22 @@
    @Test
    public void shouldConvertBooleanValues() {
        assertRoundTrip(Schema.BOOLEAN_SCHEMA, Schema.BOOLEAN_SCHEMA, Boolean.FALSE);
        assertShortCircuit(Schema.BOOLEAN_SCHEMA, Boolean.FALSE);
        SchemaAndValue resultFalse = roundTrip(Schema.BOOLEAN_SCHEMA, "false");
        assertEquals(Schema.BOOLEAN_SCHEMA, resultFalse.schema());
        assertEquals(Boolean.FALSE, resultFalse.value());
        resultFalse = roundTrip(Schema.BOOLEAN_SCHEMA, "0");
        assertEquals(Schema.BOOLEAN_SCHEMA, resultFalse.schema());
        assertEquals(Boolean.FALSE, resultFalse.value());

        assertRoundTrip(Schema.BOOLEAN_SCHEMA, Schema.BOOLEAN_SCHEMA, Boolean.TRUE);
        assertShortCircuit(Schema.BOOLEAN_SCHEMA, Boolean.TRUE);
        SchemaAndValue resultTrue = roundTrip(Schema.BOOLEAN_SCHEMA, "true");
        assertEquals(Schema.BOOLEAN_SCHEMA, resultTrue.schema());
        assertEquals(Boolean.TRUE, resultTrue.value());
        resultTrue = roundTrip(Schema.BOOLEAN_SCHEMA, "1");
        assertEquals(Schema.BOOLEAN_SCHEMA, resultTrue.schema());
        assertEquals(Boolean.TRUE, resultTrue.value());
    }

    @Test
@@ -268,6 +304,38 @@
        assertThrows(DataException.class, () -> Values.convertToBoolean(Schema.STRING_SCHEMA, "\"green\""));
    }

    @Test
    public void shouldConvertInt8() {
        assertRoundTrip(Schema.INT8_SCHEMA, Schema.INT8_SCHEMA, (byte) 0);
        assertRoundTrip(Schema.INT8_SCHEMA, Schema.INT8_SCHEMA, (byte) 1);
    }

    @Test
    public void shouldConvertInt64() {
        assertRoundTrip(Schema.INT64_SCHEMA, Schema.INT64_SCHEMA, (long) 1);
        assertShortCircuit(Schema.INT64_SCHEMA, (long) 1);
    }

    @Test
    public void shouldConvertFloat32() {
        assertRoundTrip(Schema.FLOAT32_SCHEMA, Schema.FLOAT32_SCHEMA, (float) 1);
        assertShortCircuit(Schema.FLOAT32_SCHEMA, (float) 1);
    }

    @Test
    public void shouldConvertFloat64() {
        assertRoundTrip(Schema.FLOAT64_SCHEMA, Schema.FLOAT64_SCHEMA, (double) 1);
        assertShortCircuit(Schema.FLOAT64_SCHEMA, (double) 1);
    }

    @Test
    public void shouldConvertEmptyStruct() {
        Struct struct = new Struct(SchemaBuilder.struct().build());
        assertThrows(DataException.class, () -> Values.convertToStruct(struct.schema(), null));
        assertThrows(DataException.class, () -> Values.convertToStruct(struct.schema(), ""));
        Values.convertToStruct(struct.schema(), struct);
    }

    @Test
    public void shouldConvertSimpleString() {
        assertRoundTrip(Schema.STRING_SCHEMA, "simple");
@@ -361,7 +429,27 @@
        assertEquals(3, list.size());
        assertEquals(1, ((Number) list.get(0)).intValue());
        assertEquals(2, ((Number) list.get(1)).intValue());
-       assertEquals(thirdValue, ((Number) list.get(2)).intValue());
+       assertEquals(thirdValue, list.get(2));
    }

    @Test
    public void shouldConvertIntegralTypesToFloat() {
        float thirdValue = Float.MAX_VALUE;
        List<?> list = Values.convertToList(Schema.STRING_SCHEMA, "[1, 2, " + thirdValue + "]");
        assertEquals(3, list.size());
        assertEquals(1, ((Number) list.get(0)).intValue());
        assertEquals(2, ((Number) list.get(1)).intValue());
        assertEquals(thirdValue, list.get(2));
    }

    @Test
    public void shouldConvertIntegralTypesToDouble() {
        double thirdValue = Double.MAX_VALUE;
        List<?> list = Values.convertToList(Schema.STRING_SCHEMA, "[1, 2, " + thirdValue + "]");
        assertEquals(3, list.size());
        assertEquals(1, ((Number) list.get(0)).intValue());
        assertEquals(2, ((Number) list.get(1)).intValue());
        assertEquals(thirdValue, list.get(2));
    }

    /**
@@ -434,6 +522,34 @@
        assertEquals(expected, list);
    }

    @Test
    public void shouldParseNestedArray() {
        SchemaAndValue schemaAndValue = Values.parseString("[[]]");
        assertEquals(Type.ARRAY, schemaAndValue.schema().type());
        assertEquals(Type.ARRAY, schemaAndValue.schema().valueSchema().type());
    }

    @Test
    public void shouldParseArrayContainingMap() {
        SchemaAndValue schemaAndValue = Values.parseString("[{}]");
        assertEquals(Type.ARRAY, schemaAndValue.schema().type());
        assertEquals(Type.MAP, schemaAndValue.schema().valueSchema().type());
    }

    @Test
    public void shouldParseNestedMap() {
        SchemaAndValue schemaAndValue = Values.parseString("{\"a\":{}}");
        assertEquals(Type.MAP, schemaAndValue.schema().type());
        assertEquals(Type.MAP, schemaAndValue.schema().valueSchema().type());
    }

    @Test
    public void shouldParseMapContainingArray() {
        SchemaAndValue schemaAndValue = Values.parseString("{\"a\":[]}");
        assertEquals(Type.MAP, schemaAndValue.schema().type());
        assertEquals(Type.ARRAY, schemaAndValue.schema().valueSchema().type());
    }

    /**
     * We can't infer or successfully parse into a different type, so this returns the same string.
     */
@@ -445,6 +561,22 @@
        assertEquals(str, result.value());
    }

    @Test
    public void shouldParseStringListWithNullLastAsString() {
        String str = "[1, null]";
        SchemaAndValue result = Values.parseString(str);
        assertEquals(Type.STRING, result.schema().type());
        assertEquals(str, result.value());
    }

    @Test
    public void shouldParseStringListWithNullFirstAsString() {
        String str = "[null, 1]";
        SchemaAndValue result = Values.parseString(str);
        assertEquals(Type.STRING, result.schema().type());
        assertEquals(str, result.value());
    }

    @Test
    public void shouldParseTimestampStringAsTimestamp() throws Exception {
        String str = "2019-08-23T14:34:54.346Z";
@@ -585,6 +717,13 @@
        assertEquals(Collections.singletonMap(keyStr, expected), result.value());
    }

    @Test
    public void shouldFailToConvertNullTime() {
        assertThrows(DataException.class, () -> Values.convertToTime(null, null));
        assertThrows(DataException.class, () -> Values.convertToDate(null, null));
        assertThrows(DataException.class, () -> Values.convertToTimestamp(null, null));
    }

    /**
     * This is technically invalid JSON, and we don't want to simply ignore the blank elements.
     */
@@ -802,6 +941,51 @@
        assertEquals(value, Values.convertToDecimal(null, buffer, 1));
    }

    @Test
    public void shouldFailToConvertNullToDecimal() {
        assertThrows(DataException.class, () -> Values.convertToDecimal(null, null, 1));
    }

    @Test
    public void shouldInferByteSchema() {
        byte[] bytes = new byte[1];
        Schema byteSchema = Values.inferSchema(bytes);
        assertEquals(Schema.BYTES_SCHEMA, byteSchema);
        Schema byteBufferSchema = Values.inferSchema(ByteBuffer.wrap(bytes));
        assertEquals(Schema.BYTES_SCHEMA, byteBufferSchema);
    }

    @Test
    public void shouldInferStructSchema() {
        Struct struct = new Struct(SchemaBuilder.struct().build());
        Schema structSchema = Values.inferSchema(struct);
        assertEquals(struct.schema(), structSchema);
    }

    @Test
    public void shouldInferNoSchemaForEmptyList() {
        Schema listSchema = Values.inferSchema(Collections.emptyList());
        assertNull(listSchema);
    }

    @Test
    public void shouldInferNoSchemaForListContainingObject() {
        Schema listSchema = Values.inferSchema(Collections.singletonList(new Object()));
        assertNull(listSchema);
    }

    @Test
    public void shouldInferNoSchemaForEmptyMap() {
        Schema listSchema = Values.inferSchema(Collections.emptyMap());
        assertNull(listSchema);
    }

    @Test
    public void shouldInferNoSchemaForMapContainingObject() {
        Schema listSchema = Values.inferSchema(Collections.singletonMap(new Object(), new Object()));
        assertNull(listSchema);
    }

    /**
     * Test parsing distinct number-like types (strings containing numbers, and logical Decimals) in the same list.
     * The parser does not convert Numbers to Decimals, or Strings containing numbers to Numbers automatically.
@@ -819,6 +1003,17 @@
        assertEquals(expected, schemaAndValue.value());
    }

    @Test
    public void shouldParseArrayOfOnlyDecimals() {
        List<Object> decimals = Arrays.asList(BigDecimal.valueOf(Long.MAX_VALUE).add(BigDecimal.ONE),
                BigDecimal.valueOf(Long.MIN_VALUE).subtract(BigDecimal.ONE));
        SchemaAndValue schemaAndValue = Values.parseString(decimals.toString());
        Schema schema = schemaAndValue.schema();
        assertEquals(Type.ARRAY, schema.type());
        assertEquals(Decimal.schema(0), schema.valueSchema());
        assertEquals(decimals, schemaAndValue.value());
    }

    @Test
    public void canConsume() {
    }
@@ -949,6 +1144,16 @@
        assertEquals(value, (Double) schemaAndValue.value(), 0);
    }

    @Test
    public void shouldParseFractionalPartsAsIntegerWhenNoFractionalPart() {
        assertEquals(new SchemaAndValue(Schema.INT8_SCHEMA, (byte) 1), Values.parseString("1.0"));
        assertEquals(new SchemaAndValue(Schema.FLOAT32_SCHEMA, 1.1f), Values.parseString("1.1"));
        assertEquals(new SchemaAndValue(Schema.INT16_SCHEMA, (short) 300), Values.parseString("300.0"));
        assertEquals(new SchemaAndValue(Schema.FLOAT32_SCHEMA, 300.01f), Values.parseString("300.01"));
        assertEquals(new SchemaAndValue(Schema.INT32_SCHEMA, 66000), Values.parseString("66000.0"));
        assertEquals(new SchemaAndValue(Schema.FLOAT32_SCHEMA, 66000.0008f), Values.parseString("66000.0008"));
    }

    protected void assertParsed(String input) {
        assertParsed(input, input);
    }
@@ -1011,47 +1216,48 @@
            desiredSchema = Values.inferSchema(input);
            assertNotNull(desiredSchema);
        }
        return convertTo(desiredSchema, serialized);
    }

    protected SchemaAndValue convertTo(Schema desiredSchema, Object value) {
        Object newValue = null;
        Schema newSchema = null;
        switch (desiredSchema.type()) {
            case STRING:
-               newValue = Values.convertToString(Schema.STRING_SCHEMA, serialized);
+               newValue = Values.convertToString(Schema.STRING_SCHEMA, value);
                break;
            case INT8:
-               newValue = Values.convertToByte(Schema.STRING_SCHEMA, serialized);
+               newValue = Values.convertToByte(Schema.STRING_SCHEMA, value);
                break;
            case INT16:
-               newValue = Values.convertToShort(Schema.STRING_SCHEMA, serialized);
+               newValue = Values.convertToShort(Schema.STRING_SCHEMA, value);
                break;
            case INT32:
-               newValue = Values.convertToInteger(Schema.STRING_SCHEMA, serialized);
+               newValue = Values.convertToInteger(Schema.STRING_SCHEMA, value);
                break;
            case INT64:
-               newValue = Values.convertToLong(Schema.STRING_SCHEMA, serialized);
+               newValue = Values.convertToLong(Schema.STRING_SCHEMA, value);
                break;
            case FLOAT32:
-               newValue = Values.convertToFloat(Schema.STRING_SCHEMA, serialized);
+               newValue = Values.convertToFloat(Schema.STRING_SCHEMA, value);
                break;
            case FLOAT64:
-               newValue = Values.convertToDouble(Schema.STRING_SCHEMA, serialized);
+               newValue = Values.convertToDouble(Schema.STRING_SCHEMA, value);
                break;
            case BOOLEAN:
-               newValue = Values.convertToBoolean(Schema.STRING_SCHEMA, serialized);
+               newValue = Values.convertToBoolean(Schema.STRING_SCHEMA, value);
                break;
            case ARRAY:
-               newValue = Values.convertToList(Schema.STRING_SCHEMA, serialized);
+               newValue = Values.convertToList(Schema.STRING_SCHEMA, value);
                break;
            case MAP:
-               newValue = Values.convertToMap(Schema.STRING_SCHEMA, serialized);
+               newValue = Values.convertToMap(Schema.STRING_SCHEMA, value);
                break;
            case STRUCT:
                newValue = Values.convertToStruct(Schema.STRING_SCHEMA, serialized);
                break;
            case BYTES:
                fail("unexpected schema type");
                break;
        }
-       newSchema = Values.inferSchema(newValue);
+       Schema newSchema = Values.inferSchema(newValue);
        return new SchemaAndValue(newSchema, newValue);
    }
@@ -1075,4 +1281,16 @@
            assertEquals(result, result2);
        }
    }

    protected void assertShortCircuit(Schema schema, Object value) {
        SchemaAndValue result = convertTo(schema, value);

        if (value == null) {
            assertNull(result.schema());
            assertNull(result.value());
        } else {
            assertEquals(value, result.value());
            assertEquals(schema, result.schema());
        }
    }
}
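
The null-handling rules exercised by the new tests above, and documented by the SpotBugs exclusion in the next hunk, can be summarized in a short sketch. This is illustrative only (the class name ValuesNullHandlingSketch is hypothetical), but the outcomes follow the assertions and the exclusion comment in this change.

    import org.apache.kafka.connect.data.Values;
    import org.apache.kafka.connect.errors.DataException;

    public class ValuesNullHandlingSketch {
        public static void main(String[] args) {
            // convertToBoolean deliberately returns null for a null input value, which is why the
            // NP_BOOLEAN_RETURN_NULL SpotBugs warning is excluded for Values.convertToBoolean below.
            Boolean converted = Values.convertToBoolean(null, null);
            System.out.println(converted == null); // true

            // Temporal and decimal conversions reject null instead, matching
            // shouldFailToConvertNullTime and shouldFailToConvertNullToDecimal above.
            try {
                Values.convertToTime(null, null);
            } catch (DataException expected) {
                System.out.println("convertToTime(null, null) throws DataException");
            }
            try {
                Values.convertToDecimal(null, null, 1);
            } catch (DataException expected) {
                System.out.println("convertToDecimal(null, null, 1) throws DataException");
            }
        }
    }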
@@ -298,6 +298,13 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
        <Bug pattern="SF_SWITCH_FALLTHROUGH" />
    </Match>

    <Match>
        <!-- Converting to an optional boolean intentionally returns null on null input. -->
        <Class name="org.apache.kafka.connect.data.Values"/>
        <Method name="convertToBoolean"/>
        <Bug pattern="NP_BOOLEAN_RETURN_NULL"/>
    </Match>

    <Match>
        <!-- Suppress some minor warnings about machine-generated code for benchmarking. -->
        <Package name="~org\.apache\.kafka\.jmh\..*\.jmh_generated"/>
@@ -0,0 +1,297 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.jmh.connect;

import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.data.Values;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
/**
 * This benchmark tests the performance of the {@link Values} data handling class.
 */
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 3)
@Measurement(iterations = 7)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class ValuesBenchmark {

    private static final Schema MAP_INT_STRING_SCHEMA = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build();
    private static final Schema FLAT_STRUCT_SCHEMA = SchemaBuilder.struct()
            .field("field", Schema.INT32_SCHEMA)
            .build();
    private static final Schema STRUCT_SCHEMA = SchemaBuilder.struct()
            .field("first", Schema.INT32_SCHEMA)
            .field("second", Schema.STRING_SCHEMA)
            .field("array", SchemaBuilder.array(Schema.INT32_SCHEMA).build())
            .field("map", MAP_INT_STRING_SCHEMA)
            .field("nested", FLAT_STRUCT_SCHEMA)
            .build();
    private static final SchemaAndValue[] TEST_VALUES = {
        SchemaAndValue.NULL,
        new SchemaAndValue(Schema.OPTIONAL_BOOLEAN_SCHEMA, null),
        new SchemaAndValue(Schema.BOOLEAN_SCHEMA, true),
        new SchemaAndValue(Schema.BOOLEAN_SCHEMA, false),
        new SchemaAndValue(Schema.OPTIONAL_INT8_SCHEMA, null),
        new SchemaAndValue(Schema.INT8_SCHEMA, (byte) 0),
        new SchemaAndValue(Schema.INT8_SCHEMA, Byte.MAX_VALUE),
        new SchemaAndValue(Schema.OPTIONAL_INT16_SCHEMA, null),
        new SchemaAndValue(Schema.INT16_SCHEMA, (short) 0),
        new SchemaAndValue(Schema.INT16_SCHEMA, Short.MAX_VALUE),
        new SchemaAndValue(Schema.OPTIONAL_INT32_SCHEMA, null),
        new SchemaAndValue(Schema.INT32_SCHEMA, 0),
        new SchemaAndValue(Schema.INT32_SCHEMA, Integer.MAX_VALUE),
        new SchemaAndValue(Schema.OPTIONAL_INT64_SCHEMA, null),
        new SchemaAndValue(Schema.INT64_SCHEMA, (long) 0),
        new SchemaAndValue(Schema.INT64_SCHEMA, Long.MAX_VALUE),
        new SchemaAndValue(Schema.OPTIONAL_FLOAT32_SCHEMA, null),
        new SchemaAndValue(Schema.FLOAT32_SCHEMA, (float) 0),
        new SchemaAndValue(Schema.FLOAT32_SCHEMA, 0.1f),
        new SchemaAndValue(Schema.FLOAT64_SCHEMA, 1.1f),
        new SchemaAndValue(Schema.FLOAT32_SCHEMA, Float.MAX_VALUE),
        new SchemaAndValue(Schema.OPTIONAL_FLOAT64_SCHEMA, null),
        new SchemaAndValue(Schema.FLOAT64_SCHEMA, (double) 0),
        new SchemaAndValue(Schema.FLOAT64_SCHEMA, (double) 0.1f),
        new SchemaAndValue(Schema.FLOAT64_SCHEMA, (double) 1.1f),
        new SchemaAndValue(Schema.FLOAT64_SCHEMA, Double.MAX_VALUE),
        new SchemaAndValue(Schema.OPTIONAL_STRING_SCHEMA, null),
        new SchemaAndValue(Date.SCHEMA, "2019-08-23"),
        new SchemaAndValue(Time.SCHEMA, "14:34:54.346Z"),
        new SchemaAndValue(Timestamp.SCHEMA, "2019-08-23T14:34:54.346Z"),
        new SchemaAndValue(Schema.STRING_SCHEMA, ""),
        new SchemaAndValue(Schema.STRING_SCHEMA, "a-random-string"),
        new SchemaAndValue(Schema.STRING_SCHEMA, "[]"),
        new SchemaAndValue(Schema.STRING_SCHEMA, "[1, 2, 3]"),
        new SchemaAndValue(Schema.STRING_SCHEMA, "{}"),
        new SchemaAndValue(Schema.STRING_SCHEMA, "{\"1\": 2, \"3\": 4}"),
        new SchemaAndValue(SchemaBuilder.array(Schema.INT16_SCHEMA), new short[]{1, 2, 3}),
        new SchemaAndValue(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.BOOLEAN_SCHEMA),
                Collections.singletonMap("key", true)),
        new SchemaAndValue(STRUCT_SCHEMA, new Struct(STRUCT_SCHEMA)
                .put("first", 1)
                .put("second", "foo")
                .put("array", Arrays.asList(1, 2, 3))
                .put("map", Collections.singletonMap(1, "value"))
                .put("nested", new Struct(FLAT_STRUCT_SCHEMA).put("field", 12))),
    };

    private SchemaAndValue[] convertToBooleanCases;
    private SchemaAndValue[] convertToByteCases;
    private SchemaAndValue[] convertToDateCases;
    private SchemaAndValue[] convertToDecimalCases;
    private SchemaAndValue[] convertToDoubleCases;
    private SchemaAndValue[] convertToFloatCases;
    private SchemaAndValue[] convertToShortCases;
    private SchemaAndValue[] convertToListCases;
    private SchemaAndValue[] convertToMapCases;
    private SchemaAndValue[] convertToLongCases;
    private SchemaAndValue[] convertToIntegerCases;
    private SchemaAndValue[] convertToStructCases;
    private SchemaAndValue[] convertToTimeCases;
    private SchemaAndValue[] convertToTimestampCases;
    private SchemaAndValue[] convertToStringCases;
    private String[] parseStringCases;
    private SchemaAndValue[] successfulCases(BiFunction<Schema, Object, Object> fn) {
        List<SchemaAndValue> successful = new ArrayList<>();
        for (SchemaAndValue testCase : TEST_VALUES) {
            try {
                fn.apply(testCase.schema(), testCase.value());
                successful.add(testCase);
            } catch (Throwable ignored) {
            }
        }
        return successful.toArray(new SchemaAndValue[]{});
    }

    private String[] casesToString(Function<String, Object> fn) {
        List<String> successful = new ArrayList<>();
        for (SchemaAndValue testCase : TEST_VALUES) {
            String v = String.valueOf(testCase.value());
            try {
                fn.apply(v);
                successful.add(v);
            } catch (Throwable ignored) {
            }
        }
        return successful.toArray(new String[]{});
    }

    @Setup
    public void setup() {
        convertToBooleanCases = successfulCases(Values::convertToBoolean);
        convertToByteCases = successfulCases(Values::convertToByte);
        convertToDateCases = successfulCases(Values::convertToDate);
        convertToDecimalCases = successfulCases((schema, object) -> Values.convertToDecimal(schema, object, 1));
        convertToDoubleCases = successfulCases(Values::convertToDouble);
        convertToFloatCases = successfulCases(Values::convertToFloat);
        convertToShortCases = successfulCases(Values::convertToShort);
        convertToListCases = successfulCases(Values::convertToList);
        convertToMapCases = successfulCases(Values::convertToMap);
        convertToLongCases = successfulCases(Values::convertToLong);
        convertToIntegerCases = successfulCases(Values::convertToInteger);
        convertToStructCases = successfulCases(Values::convertToStruct);
        convertToTimeCases = successfulCases(Values::convertToTime);
        convertToTimestampCases = successfulCases(Values::convertToTimestamp);
        convertToStringCases = successfulCases(Values::convertToString);
        parseStringCases = casesToString(Values::parseString);
    }
    @Benchmark
    public void testConvertToBoolean(Blackhole blackhole) {
        for (SchemaAndValue testCase : convertToBooleanCases) {
            blackhole.consume(Values.convertToBoolean(testCase.schema(), testCase.value()));
        }
    }

    @Benchmark
    public void testConvertToByte(Blackhole blackhole) {
        for (SchemaAndValue testCase : convertToByteCases) {
            blackhole.consume(Values.convertToByte(testCase.schema(), testCase.value()));
        }
    }

    @Benchmark
    public void testConvertToDate(Blackhole blackhole) {
        for (SchemaAndValue testCase : convertToDateCases) {
            blackhole.consume(Values.convertToDate(testCase.schema(), testCase.value()));
        }
    }

    @Benchmark
    public void testConvertToDecimal(Blackhole blackhole) {
        for (SchemaAndValue testCase : convertToDecimalCases) {
            blackhole.consume(Values.convertToDecimal(testCase.schema(), testCase.value(), 1));
        }
    }

    @Benchmark
    public void testConvertToDouble(Blackhole blackhole) {
        for (SchemaAndValue testCase : convertToDoubleCases) {
            blackhole.consume(Values.convertToDouble(testCase.schema(), testCase.value()));
        }
    }

    @Benchmark
    public void testConvertToFloat(Blackhole blackhole) {
        for (SchemaAndValue testCase : convertToFloatCases) {
            blackhole.consume(Values.convertToFloat(testCase.schema(), testCase.value()));
        }
    }

    @Benchmark
    public void testConvertToShort(Blackhole blackhole) {
        for (SchemaAndValue testCase : convertToShortCases) {
            blackhole.consume(Values.convertToShort(testCase.schema(), testCase.value()));
        }
    }

    @Benchmark
    public void testConvertToList(Blackhole blackhole) {
        for (SchemaAndValue testCase : convertToListCases) {
            blackhole.consume(Values.convertToList(testCase.schema(), testCase.value()));
        }
    }

    @Benchmark
    public void testConvertToMap(Blackhole blackhole) {
        for (SchemaAndValue testCase : convertToMapCases) {
            blackhole.consume(Values.convertToMap(testCase.schema(), testCase.value()));
        }
    }

    @Benchmark
    public void testConvertToLong(Blackhole blackhole) {
        for (SchemaAndValue testCase : convertToLongCases) {
            blackhole.consume(Values.convertToLong(testCase.schema(), testCase.value()));
        }
    }

    @Benchmark
    public void testConvertToInteger(Blackhole blackhole) {
        for (SchemaAndValue testCase : convertToIntegerCases) {
            blackhole.consume(Values.convertToInteger(testCase.schema(), testCase.value()));
        }
    }

    @Benchmark
    public void testConvertToStruct(Blackhole blackhole) {
        for (SchemaAndValue testCase : convertToStructCases) {
            blackhole.consume(Values.convertToStruct(testCase.schema(), testCase.value()));
        }
    }

    @Benchmark
    public void testConvertToTime(Blackhole blackhole) {
        for (SchemaAndValue testCase : convertToTimeCases) {
            blackhole.consume(Values.convertToTime(testCase.schema(), testCase.value()));
        }
    }

    @Benchmark
    public void testConvertToTimestamp(Blackhole blackhole) {
        for (SchemaAndValue testCase : convertToTimestampCases) {
            blackhole.consume(Values.convertToTimestamp(testCase.schema(), testCase.value()));
        }
    }

    @Benchmark
    public void testConvertToString(Blackhole blackhole) {
        for (SchemaAndValue testCase : convertToStringCases) {
            blackhole.consume(Values.convertToString(testCase.schema(), testCase.value()));
        }
    }

    @Benchmark
    public void testInferSchema(Blackhole blackhole) {
        for (SchemaAndValue testCase : TEST_VALUES) {
            blackhole.consume(Values.inferSchema(testCase.value()));
        }
    }

    @Benchmark
    public void testParseString(Blackhole blackhole) {
        for (String testCase : parseStringCases) {
            blackhole.consume(Values.parseString(testCase));
        }
    }
}
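
To try the new benchmark, the jmh-benchmarks module's usual JMH tooling applies; as a rough illustration only (the runner class below is hypothetical and not part of this commit), the benchmark can also be launched programmatically through the standard JMH Runner API. The fork, warmup, and measurement settings come from the annotations on ValuesBenchmark itself.

    import org.openjdk.jmh.runner.Runner;
    import org.openjdk.jmh.runner.RunnerException;
    import org.openjdk.jmh.runner.options.Options;
    import org.openjdk.jmh.runner.options.OptionsBuilder;

    public class ValuesBenchmarkRunner {
        public static void main(String[] args) throws RunnerException {
            // Select ValuesBenchmark by (regex) name and run it with its annotated settings.
            Options options = new OptionsBuilder()
                    .include("ValuesBenchmark")
                    .build();
            new Runner(options).run();
        }
    }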