From e974914ca52509a2396e3939a6db6a4e9f9c7dfa Mon Sep 17 00:00:00 2001 From: Sanskar Jhajharia <122860866+sjhajharia@users.noreply.github.com> Date: Fri, 31 May 2024 02:25:00 +0530 Subject: [PATCH] MINOR: Code Cleanup - Connect Module (#16066) Reviewers: Chia-Ping Tsai --- .../kafka/connect/data/SchemaBuilder.java | 2 +- .../ConnectorReconfigurationTest.java | 2 +- .../connect/data/SchemaProjectorTest.java | 2 +- .../connect/util/ConnectorUtilsTest.java | 24 +++--- .../connect/file/FileStreamSinkTaskTest.java | 5 +- ...eStreamSourceConnectorIntegrationTest.java | 4 +- .../connect/mirror/rest/MirrorRestServer.java | 3 +- .../clients/admin/FakeLocalMetadataStore.java | 28 +++---- ...hCustomForwardingAdminIntegrationTest.java | 6 +- .../connect/runtime/ConnectorConfig.java | 2 +- .../runtime/rest/ConnectRestServer.java | 3 +- .../runtime/rest/entities/ConfigInfos.java | 20 +++-- .../runtime/rest/entities/ConfigKeyInfo.java | 48 ++++++----- .../rest/entities/ConfigValueInfo.java | 24 +++--- .../ConnectorTopicsIntegrationTest.java | 4 +- .../kafka/connect/integration/TaskHandle.java | 2 +- .../runtime/ErrorHandlingTaskTest.java | 6 +- .../connect/runtime/MockConnectMetrics.java | 2 +- .../connect/runtime/WorkerSinkTaskTest.java | 18 ++--- .../kafka/connect/runtime/WorkerTest.java | 6 +- .../ConnectProtocolCompatibilityTest.java | 16 ++-- .../distributed/DistributedHerderTest.java | 80 +++++++++---------- .../IncrementalCooperativeAssignorTest.java | 6 +- .../WorkerCoordinatorIncrementalTest.java | 22 ++--- .../runtime/rest/ConnectRestServerTest.java | 2 +- .../runtime/rest/RestServerConfigTest.java | 3 +- .../storage/FileOffsetBackingStoreTest.java | 10 +-- .../KafkaConfigBackingStoreMockitoTest.java | 8 +- .../storage/KafkaConfigBackingStoreTest.java | 8 +- .../storage/KafkaOffsetBackingStoreTest.java | 2 +- .../util/ConvertingFutureCallbackTest.java | 6 +- .../apache/kafka/connect/util/TestFuture.java | 2 +- 32 files changed, 186 insertions(+), 190 deletions(-) diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java index b8838430899..8115675f5a5 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java @@ -88,7 +88,7 @@ public class SchemaBuilder implements Schema { @Override public boolean isOptional() { - return optional == null ? false : optional; + return optional != null && optional; } /** diff --git a/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorReconfigurationTest.java b/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorReconfigurationTest.java index b895ed39858..efa56aca469 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorReconfigurationTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorReconfigurationTest.java @@ -46,7 +46,7 @@ public class ConnectorReconfigurationTest { private static class TestConnector extends Connector { - private boolean stopException; + private final boolean stopException; private int order = 0; public int stopOrder = -1; public int configureOrder = -1; diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java index 3e0c9de8d4c..32e304c218a 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java @@ -82,7 +82,7 @@ public class SchemaProjectorTest { expectedProjected.put(values[2], Arrays.asList(32767, 32767L, 32767.F, 32767.)); expectedProjected.put(values[3], Arrays.asList(327890L, 327890.F, 327890.)); expectedProjected.put(values[4], Arrays.asList(1.2F, 1.2)); - expectedProjected.put(values[5], Arrays.asList(1.2345)); + expectedProjected.put(values[5], Collections.singletonList(1.2345)); Object promoted; for (int i = 0; i < promotableSchemas.length; ++i) { diff --git a/connect/api/src/test/java/org/apache/kafka/connect/util/ConnectorUtilsTest.java b/connect/api/src/test/java/org/apache/kafka/connect/util/ConnectorUtilsTest.java index b6f96bd4bc1..1972ff7a89d 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/util/ConnectorUtilsTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/util/ConnectorUtilsTest.java @@ -33,7 +33,7 @@ public class ConnectorUtilsTest { public void testGroupPartitions() { List> grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 1); - assertEquals(Arrays.asList(FIVE_ELEMENTS), grouped); + assertEquals(Collections.singletonList(FIVE_ELEMENTS), grouped); grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 2); assertEquals(Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5)), grouped); @@ -41,21 +41,21 @@ public class ConnectorUtilsTest { grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 3); assertEquals(Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4), - Arrays.asList(5)), grouped); + Collections.singletonList(5)), grouped); grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 5); - assertEquals(Arrays.asList(Arrays.asList(1), - Arrays.asList(2), - Arrays.asList(3), - Arrays.asList(4), - Arrays.asList(5)), grouped); + assertEquals(Arrays.asList(Collections.singletonList(1), + Collections.singletonList(2), + Collections.singletonList(3), + Collections.singletonList(4), + Collections.singletonList(5)), grouped); grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 7); - assertEquals(Arrays.asList(Arrays.asList(1), - Arrays.asList(2), - Arrays.asList(3), - Arrays.asList(4), - Arrays.asList(5), + assertEquals(Arrays.asList(Collections.singletonList(1), + Collections.singletonList(2), + Collections.singletonList(3), + Collections.singletonList(4), + Collections.singletonList(5), Collections.emptyList(), Collections.emptyList()), grouped); } diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java index 11106e5a179..23a28d8527c 100644 --- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java +++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java @@ -32,6 +32,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -60,7 +61,7 @@ public class FileStreamSinkTaskTest { // We do not call task.start() since it would override the output stream - task.put(Arrays.asList( + task.put(Collections.singletonList( new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line1", 1) )); offsets.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(1L)); @@ -85,7 +86,7 @@ public class FileStreamSinkTaskTest { task.start(props); HashMap offsets = new HashMap<>(); - task.put(Arrays.asList( + task.put(Collections.singletonList( new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line0", 1) )); offsets.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(1L)); diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSourceConnectorIntegrationTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSourceConnectorIntegrationTest.java index 95dabf703c5..698f4fcf8d3 100644 --- a/connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSourceConnectorIntegrationTest.java +++ b/connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSourceConnectorIntegrationTest.java @@ -94,7 +94,7 @@ public class FileStreamSourceConnectorIntegrationTest { // Append NUM_LINES more lines to the file try (PrintStream printStream = new PrintStream(Files.newOutputStream(sourceFile.toPath(), StandardOpenOption.APPEND))) { for (int i = NUM_LINES; i < 2 * NUM_LINES; i++) { - printStream.println(String.format(LINE_FORMAT, i)); + printStream.printf(LINE_FORMAT + "%n", i); } } @@ -197,7 +197,7 @@ public class FileStreamSourceConnectorIntegrationTest { try (PrintStream printStream = new PrintStream(Files.newOutputStream(sourceFile.toPath()))) { for (int i = 0; i < numLines; i++) { - printStream.println(String.format(LINE_FORMAT, i)); + printStream.printf(LINE_FORMAT + "%n", i); } } diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java index a5abeff40ce..7d24a5f14db 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java @@ -26,7 +26,6 @@ import org.glassfish.hk2.api.TypeLiteral; import org.glassfish.hk2.utilities.binding.AbstractBinder; import org.glassfish.jersey.server.ResourceConfig; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Map; @@ -48,7 +47,7 @@ public class MirrorRestServer extends RestServer { @Override protected Collection> regularResources() { - return Arrays.asList( + return Collections.singletonList( InternalMirrorResource.class ); } diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeLocalMetadataStore.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeLocalMetadataStore.java index 830ddb32eb1..99706d4eaea 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeLocalMetadataStore.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeLocalMetadataStore.java @@ -34,8 +34,8 @@ import java.util.concurrent.ConcurrentHashMap; public class FakeLocalMetadataStore { private static final Logger log = LoggerFactory.getLogger(FakeLocalMetadataStore.class); - private static ConcurrentHashMap> allTopics = new ConcurrentHashMap<>(); - private static ConcurrentHashMap> allAcls = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap> ALL_TOPICS = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap> ALL_ACLS = new ConcurrentHashMap<>(); /** * Add topic to allTopics. @@ -44,7 +44,7 @@ public class FakeLocalMetadataStore { public static void addTopicToLocalMetadataStore(NewTopic newTopic) { ConcurrentHashMap configs = new ConcurrentHashMap<>(newTopic.configs()); configs.putIfAbsent("partitions", String.valueOf(newTopic.numPartitions())); - allTopics.putIfAbsent(newTopic.name(), configs); + ALL_TOPICS.putIfAbsent(newTopic.name(), configs); } /** @@ -53,9 +53,9 @@ public class FakeLocalMetadataStore { * @param newPartitionCount new partition count. */ public static void updatePartitionCount(String topic, int newPartitionCount) { - ConcurrentHashMap configs = FakeLocalMetadataStore.allTopics.getOrDefault(topic, new ConcurrentHashMap<>()); + ConcurrentHashMap configs = FakeLocalMetadataStore.ALL_TOPICS.getOrDefault(topic, new ConcurrentHashMap<>()); configs.compute("partitions", (key, value) -> String.valueOf(newPartitionCount)); - FakeLocalMetadataStore.allTopics.putIfAbsent(topic, configs); + FakeLocalMetadataStore.ALL_TOPICS.putIfAbsent(topic, configs); } /** @@ -64,7 +64,7 @@ public class FakeLocalMetadataStore { * @param newConfig topic config */ public static void updateTopicConfig(String topic, Config newConfig) { - ConcurrentHashMap topicConfigs = FakeLocalMetadataStore.allTopics.getOrDefault(topic, new ConcurrentHashMap<>()); + ConcurrentHashMap topicConfigs = FakeLocalMetadataStore.ALL_TOPICS.getOrDefault(topic, new ConcurrentHashMap<>()); newConfig.entries().stream().forEach(configEntry -> { if (configEntry.name() != null) { if (configEntry.value() != null) { @@ -75,7 +75,7 @@ public class FakeLocalMetadataStore { } } }); - FakeLocalMetadataStore.allTopics.putIfAbsent(topic, topicConfigs); + FakeLocalMetadataStore.ALL_TOPICS.putIfAbsent(topic, topicConfigs); } /** @@ -84,7 +84,7 @@ public class FakeLocalMetadataStore { * @return true if topic name is a key in allTopics */ public static Boolean containsTopic(String topic) { - return allTopics.containsKey(topic); + return ALL_TOPICS.containsKey(topic); } /** @@ -93,7 +93,7 @@ public class FakeLocalMetadataStore { * @return topic configurations. */ public static Map topicConfig(String topic) { - return allTopics.getOrDefault(topic, new ConcurrentHashMap<>()); + return ALL_TOPICS.getOrDefault(topic, new ConcurrentHashMap<>()); } /** @@ -102,7 +102,7 @@ public class FakeLocalMetadataStore { * @return {@link List} */ public static List aclBindings(String aclPrinciple) { - return FakeLocalMetadataStore.allAcls.getOrDefault("User:" + aclPrinciple, new Vector<>()); + return FakeLocalMetadataStore.ALL_ACLS.getOrDefault("User:" + aclPrinciple, new Vector<>()); } /** @@ -111,16 +111,16 @@ public class FakeLocalMetadataStore { * @param aclBinding {@link AclBinding} */ public static void addACLs(String principal, AclBinding aclBinding) { - Vector aclBindings = FakeLocalMetadataStore.allAcls.getOrDefault(principal, new Vector<>()); + Vector aclBindings = FakeLocalMetadataStore.ALL_ACLS.getOrDefault(principal, new Vector<>()); aclBindings.add(aclBinding); - FakeLocalMetadataStore.allAcls.putIfAbsent(principal, aclBindings); + FakeLocalMetadataStore.ALL_ACLS.putIfAbsent(principal, aclBindings); } /** * clear allTopics and allAcls. */ public static void clear() { - allTopics.clear(); - allAcls.clear(); + ALL_TOPICS.clear(); + ALL_ACLS.clear(); } } diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java index b54aa7073ce..70f1cd6f6a3 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java @@ -164,7 +164,7 @@ public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest extends Mi startClusters(additionalConfig); try (Admin adminClient = primary.kafka().createAdminClient()) { - adminClient.createAcls(Arrays.asList( + adminClient.createAcls(Collections.singletonList( new AclBinding( new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL), new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW) @@ -172,7 +172,7 @@ public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest extends Mi )).all().get(); } try (Admin adminClient = backup.kafka().createAdminClient()) { - adminClient.createAcls(Arrays.asList( + adminClient.createAcls(Collections.singletonList( new AclBinding( new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL), new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW) @@ -293,7 +293,7 @@ public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest extends Mi public void testSyncTopicACLsUseProvidedForwardingAdmin() throws Exception { mm2Props.put("sync.topic.acls.enabled", "true"); mm2Config = new MirrorMakerConfig(mm2Props); - List aclBindings = Arrays.asList( + List aclBindings = Collections.singletonList( new AclBinding( new ResourcePattern(ResourceType.TOPIC, "test-topic-1", PatternType.LITERAL), new AccessControlEntry("User:dummy", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java index 93ea9499df2..c262fd076a4 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java @@ -324,7 +324,7 @@ public class ConnectorConfig extends AbstractConfig { @SuppressWarnings("unchecked") Predicate predicate = Utils.newInstance(getClass(predicatePrefix + "type"), Predicate.class); predicate.configure(originalsWithPrefix(predicatePrefix)); - transformations.add(new TransformationStage<>(predicate, negate == null ? false : Boolean.parseBoolean(negate.toString()), transformation)); + transformations.add(new TransformationStage<>(predicate, negate != null && Boolean.parseBoolean(negate.toString()), transformation)); } else { transformations.add(new TransformationStage<>(transformation)); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java index 3adbc0f14ec..fa25a5ea60d 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java @@ -27,6 +27,7 @@ import org.glassfish.jersey.server.ResourceConfig; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.Map; public class ConnectRestServer extends RestServer { @@ -56,7 +57,7 @@ public class ConnectRestServer extends RestServer { @Override protected Collection> adminResources() { - return Arrays.asList( + return Collections.singletonList( LoggingResource.class ); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfos.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfos.java index d0f67386de3..dd075b5f90d 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfos.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfos.java @@ -85,17 +85,15 @@ public class ConfigInfos { @Override public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("[") - .append(name) - .append(",") - .append(errorCount) - .append(",") - .append(groups) - .append(",") - .append(configs) - .append("]"); - return sb.toString(); + return "[" + + name + + "," + + errorCount + + "," + + groups + + "," + + configs + + "]"; } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java index 5cfdf2dd4f0..0b1a41c212e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java @@ -141,30 +141,28 @@ public class ConfigKeyInfo { @Override public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("[") - .append(name) - .append(",") - .append(type) - .append(",") - .append(required) - .append(",") - .append(defaultValue) - .append(",") - .append(importance) - .append(",") - .append(documentation) - .append(",") - .append(group) - .append(",") - .append(orderInGroup) - .append(",") - .append(width) - .append(",") - .append(displayName) - .append(",") - .append(dependents) - .append("]"); - return sb.toString(); + return "[" + + name + + "," + + type + + "," + + required + + "," + + defaultValue + + "," + + importance + + "," + + documentation + + "," + + group + + "," + + orderInGroup + + "," + + width + + "," + + displayName + + "," + + dependents + + "]"; } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java index ff4215d1122..a5528730e22 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java @@ -87,19 +87,17 @@ public class ConfigValueInfo { @Override public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("[") - .append(name) - .append(",") - .append(value) - .append(",") - .append(recommendedValues) - .append(",") - .append(errors) - .append(",") - .append(visible) - .append("]"); - return sb.toString(); + return "[" + + name + + "," + + value + + "," + + recommendedValues + + "," + + errors + + "," + + visible + + "]"; } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java index 61be2769f3b..eb055ab13fb 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java @@ -199,8 +199,8 @@ public class ConnectorTopicsIntegrationTest { connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(SINK_CONNECTOR, NUM_TASKS, "Connector tasks did not start in time."); - connect.assertions().assertConnectorActiveTopics(SINK_CONNECTOR, Arrays.asList(FOO_TOPIC), - "Active topic set is not: " + Arrays.asList(FOO_TOPIC) + " for connector: " + SINK_CONNECTOR); + connect.assertions().assertConnectorActiveTopics(SINK_CONNECTOR, Collections.singletonList(FOO_TOPIC), + "Active topic set is not: " + Collections.singletonList(FOO_TOPIC) + " for connector: " + SINK_CONNECTOR); // deleting a connector resets its active topics connect.deleteConnector(FOO_CONNECTOR); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TaskHandle.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TaskHandle.java index ab5b711af0f..fe63658a757 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TaskHandle.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TaskHandle.java @@ -48,7 +48,7 @@ public class TaskHandle { private CountDownLatch recordsRemainingLatch; private CountDownLatch recordsToCommitLatch; private int expectedRecords = -1; - private int expectedCommits = -1; + private final int expectedCommits = -1; public TaskHandle(ConnectorHandle connectorHandle, String taskId, Consumer consumer) { this.taskId = taskId; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java index a0b8cecc8f4..3aeecc1d757 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java @@ -134,8 +134,8 @@ public class ErrorHandlingTaskTest { private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS); - private ConnectorTaskId taskId = new ConnectorTaskId("job", 0); - private TargetState initialState = TargetState.STARTED; + private final ConnectorTaskId taskId = new ConnectorTaskId("job", 0); + private final TargetState initialState = TargetState.STARTED; private Time time; private MockConnectMetrics metrics; @SuppressWarnings("unused") @@ -179,7 +179,7 @@ public class ErrorHandlingTaskTest { private ErrorHandlingMetrics errorHandlingMetrics; - private boolean enableTopicCreation; + private final boolean enableTopicCreation; @Parameterized.Parameters public static Collection parameters() { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java index 597041715b5..c5f9f8314d9 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java @@ -150,7 +150,7 @@ public class MockConnectMetrics extends ConnectMetrics { } public static class MockMetricsReporter implements MetricsReporter { - private Map metricsByName = new HashMap<>(); + private final Map metricsByName = new HashMap<>(); private MetricsContext metricsContext; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index 21b2b10c16e..75f942a6871 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -141,14 +141,14 @@ public class WorkerSinkTaskTest { private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS); - private ConnectorTaskId taskId = new ConnectorTaskId("job", 0); - private ConnectorTaskId taskId1 = new ConnectorTaskId("job", 1); - private TargetState initialState = TargetState.STARTED; + private final ConnectorTaskId taskId = new ConnectorTaskId("job", 0); + private final ConnectorTaskId taskId1 = new ConnectorTaskId("job", 1); + private final TargetState initialState = TargetState.STARTED; private MockTime time; private WorkerSinkTask workerTask; @Mock private SinkTask sinkTask; - private ArgumentCaptor sinkTaskContext = ArgumentCaptor.forClass(WorkerSinkTaskContext.class); + private final ArgumentCaptor sinkTaskContext = ArgumentCaptor.forClass(WorkerSinkTaskContext.class); private WorkerConfig workerConfig; private MockConnectMetrics metrics; @Mock @@ -169,7 +169,7 @@ public class WorkerSinkTaskTest { private KafkaConsumer consumer; @Mock private ErrorHandlingMetrics errorHandlingMetrics; - private ArgumentCaptor rebalanceListener = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); + private final ArgumentCaptor rebalanceListener = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); @Rule public final MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS); @@ -684,9 +684,9 @@ public class WorkerSinkTaskTest { when(consumer.assignment()) .thenReturn(INITIAL_ASSIGNMENT, INITIAL_ASSIGNMENT) - .thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2))) - .thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2))) - .thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2))) + .thenReturn(new HashSet<>(Collections.singletonList(TOPIC_PARTITION2))) + .thenReturn(new HashSet<>(Collections.singletonList(TOPIC_PARTITION2))) + .thenReturn(new HashSet<>(Collections.singletonList(TOPIC_PARTITION2))) .thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, TOPIC_PARTITION3))) .thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, TOPIC_PARTITION3))) .thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, TOPIC_PARTITION3))); @@ -1788,7 +1788,7 @@ public class WorkerSinkTaskTest { } private void verifyInitializeTask() { - verify(consumer).subscribe(eq(asList(TOPIC)), rebalanceListener.capture()); + verify(consumer).subscribe(eq(Collections.singletonList(TOPIC)), rebalanceListener.capture()); verify(sinkTask).initialize(sinkTaskContext.capture()); verify(sinkTask).start(TASK_PROPS); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 4579794a2c4..b51b84d1ac6 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -2602,7 +2602,7 @@ public class WorkerTest { Map taskConfig = new HashMap<>(); // No warnings or exceptions when a connector generates an empty list of task configs - when(sourceConnector.taskConfigs(1)).thenReturn(Arrays.asList()); + when(sourceConnector.taskConfigs(1)).thenReturn(Collections.emptyList()); try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Worker.class)) { connectorProps.put(TASKS_MAX_CONFIG, "1"); List> taskConfigs = worker.connectorTaskConfigs(CONNECTOR_ID, new ConnectorConfig(plugins, connectorProps)); @@ -2611,7 +2611,7 @@ public class WorkerTest { } // No warnings or exceptions when a connector generates the maximum permitted number of task configs - when(sourceConnector.taskConfigs(1)).thenReturn(Arrays.asList(taskConfig)); + when(sourceConnector.taskConfigs(1)).thenReturn(Collections.singletonList(taskConfig)); when(sourceConnector.taskConfigs(2)).thenReturn(Arrays.asList(taskConfig, taskConfig)); when(sourceConnector.taskConfigs(3)).thenReturn(Arrays.asList(taskConfig, taskConfig, taskConfig)); try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Worker.class)) { @@ -2672,7 +2672,7 @@ public class WorkerTest { } // One last sanity check in case the connector is reconfigured and respects tasks.max - when(sourceConnector.taskConfigs(1)).thenReturn(Arrays.asList(taskConfig)); + when(sourceConnector.taskConfigs(1)).thenReturn(Collections.singletonList(taskConfig)); try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Worker.class)) { connectorProps.put(TASKS_MAX_CONFIG, "1"); List> taskConfigs = worker.connectorTaskConfigs(CONNECTOR_ID, new ConnectorConfig(plugins, connectorProps)); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocolCompatibilityTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocolCompatibilityTest.java index 6dcbe6c38e6..38084383e29 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocolCompatibilityTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocolCompatibilityTest.java @@ -98,7 +98,7 @@ public class ConnectProtocolCompatibilityTest { public void testEagerToEagerAssignment() { ConnectProtocol.Assignment assignment = new ConnectProtocol.Assignment( ConnectProtocol.Assignment.NO_ERROR, "leader", LEADER_URL, 1L, - Arrays.asList(connectorId1, connectorId3), Arrays.asList(taskId2x0)); + Arrays.asList(connectorId1, connectorId3), Collections.singletonList(taskId2x0)); ByteBuffer leaderBuf = ConnectProtocol.serializeAssignment(assignment); ConnectProtocol.Assignment leaderAssignment = ConnectProtocol.deserializeAssignment(leaderBuf); @@ -110,7 +110,7 @@ public class ConnectProtocolCompatibilityTest { ConnectProtocol.Assignment assignment2 = new ConnectProtocol.Assignment( ConnectProtocol.Assignment.NO_ERROR, "member", LEADER_URL, 1L, - Arrays.asList(connectorId2), Arrays.asList(taskId1x0, taskId3x0)); + Collections.singletonList(connectorId2), Arrays.asList(taskId1x0, taskId3x0)); ByteBuffer memberBuf = ConnectProtocol.serializeAssignment(assignment2); ConnectProtocol.Assignment memberAssignment = ConnectProtocol.deserializeAssignment(memberBuf); @@ -125,7 +125,7 @@ public class ConnectProtocolCompatibilityTest { public void testCoopToCoopAssignment() { ExtendedAssignment assignment = new ExtendedAssignment( CONNECT_PROTOCOL_V1, ConnectProtocol.Assignment.NO_ERROR, "leader", LEADER_URL, 1L, - Arrays.asList(connectorId1, connectorId3), Arrays.asList(taskId2x0), + Arrays.asList(connectorId1, connectorId3), Collections.singletonList(taskId2x0), Collections.emptyList(), Collections.emptyList(), 0); ByteBuffer leaderBuf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment, false); @@ -138,7 +138,7 @@ public class ConnectProtocolCompatibilityTest { ExtendedAssignment assignment2 = new ExtendedAssignment( CONNECT_PROTOCOL_V1, ConnectProtocol.Assignment.NO_ERROR, "member", LEADER_URL, 1L, - Arrays.asList(connectorId2), Arrays.asList(taskId1x0, taskId3x0), + Collections.singletonList(connectorId2), Arrays.asList(taskId1x0, taskId3x0), Collections.emptyList(), Collections.emptyList(), 0); ByteBuffer memberBuf = ConnectProtocol.serializeAssignment(assignment2); @@ -155,7 +155,7 @@ public class ConnectProtocolCompatibilityTest { public void testEagerToCoopAssignment() { ConnectProtocol.Assignment assignment = new ConnectProtocol.Assignment( ConnectProtocol.Assignment.NO_ERROR, "leader", LEADER_URL, 1L, - Arrays.asList(connectorId1, connectorId3), Arrays.asList(taskId2x0)); + Arrays.asList(connectorId1, connectorId3), Collections.singletonList(taskId2x0)); ByteBuffer leaderBuf = ConnectProtocol.serializeAssignment(assignment); ConnectProtocol.Assignment leaderAssignment = @@ -168,7 +168,7 @@ public class ConnectProtocolCompatibilityTest { ConnectProtocol.Assignment assignment2 = new ConnectProtocol.Assignment( ConnectProtocol.Assignment.NO_ERROR, "member", LEADER_URL, 1L, - Arrays.asList(connectorId2), Arrays.asList(taskId1x0, taskId3x0)); + Collections.singletonList(connectorId2), Arrays.asList(taskId1x0, taskId3x0)); ByteBuffer memberBuf = ConnectProtocol.serializeAssignment(assignment2); ConnectProtocol.Assignment memberAssignment = @@ -184,7 +184,7 @@ public class ConnectProtocolCompatibilityTest { public void testCoopToEagerAssignment() { ExtendedAssignment assignment = new ExtendedAssignment( CONNECT_PROTOCOL_V1, ConnectProtocol.Assignment.NO_ERROR, "leader", LEADER_URL, 1L, - Arrays.asList(connectorId1, connectorId3), Arrays.asList(taskId2x0), + Arrays.asList(connectorId1, connectorId3), Collections.singletonList(taskId2x0), Collections.emptyList(), Collections.emptyList(), 0); ByteBuffer leaderBuf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment, false); @@ -197,7 +197,7 @@ public class ConnectProtocolCompatibilityTest { ExtendedAssignment assignment2 = new ExtendedAssignment( CONNECT_PROTOCOL_V1, ConnectProtocol.Assignment.NO_ERROR, "member", LEADER_URL, 1L, - Arrays.asList(connectorId2), Arrays.asList(taskId1x0, taskId3x0), + Collections.singletonList(connectorId2), Arrays.asList(taskId1x0, taskId3x0), Collections.emptyList(), Collections.emptyList(), 0); ByteBuffer memberBuf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment2, false); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index f4a7cd247bd..f69f586bc90 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -331,7 +331,7 @@ public class DistributedHerderTest { // Join group and get assignment when(member.memberId()).thenReturn("member"); when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); - expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1)); + expectRebalance(1, singletonList(CONN1), singletonList(TASK1)); expectConfigRefreshAndSnapshot(SNAPSHOT); ArgumentCaptor> onStart = ArgumentCaptor.forClass(Callback.class); @@ -355,7 +355,7 @@ public class DistributedHerderTest { // Join group and get assignment when(member.memberId()).thenReturn("member"); when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); - expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1)); + expectRebalance(1, singletonList(CONN1), singletonList(TASK1)); expectConfigRefreshAndSnapshot(SNAPSHOT); ArgumentCaptor> onStart = ArgumentCaptor.forClass(Callback.class); @@ -380,8 +380,8 @@ public class DistributedHerderTest { verify(worker).startSourceTask(eq(TASK1), any(), any(), any(), eq(herder), eq(TargetState.STARTED)); // Rebalance and get a new assignment - expectRebalance(Arrays.asList(CONN1), Arrays.asList(TASK1), ConnectProtocol.Assignment.NO_ERROR, - 1, Arrays.asList(CONN1), Arrays.asList()); + expectRebalance(singletonList(CONN1), singletonList(TASK1), ConnectProtocol.Assignment.NO_ERROR, + 1, singletonList(CONN1), Collections.emptyList()); herder.tick(); time.sleep(3000L); assertStatistics(3, 2, 100, 3000); @@ -414,7 +414,7 @@ public class DistributedHerderTest { // The new member got its assignment expectRebalance(Collections.emptyList(), Collections.emptyList(), ConnectProtocol.Assignment.NO_ERROR, - 1, Arrays.asList(CONN1), Arrays.asList(TASK1), 0); + 1, singletonList(CONN1), singletonList(TASK1), 0); // and the new assignment started ArgumentCaptor> onStart = ArgumentCaptor.forClass(Callback.class); @@ -445,7 +445,7 @@ public class DistributedHerderTest { // Join group. First rebalance contains revocations because a new member joined. when(member.memberId()).thenReturn("member"); when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V1); - expectRebalance(Arrays.asList(CONN1), Arrays.asList(TASK1), + expectRebalance(singletonList(CONN1), singletonList(TASK1), ConnectProtocol.Assignment.NO_ERROR, 1, Collections.emptyList(), Collections.emptyList(), 0); doNothing().when(member).requestRejoin(); @@ -482,7 +482,7 @@ public class DistributedHerderTest { when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V1); expectRebalance(Collections.emptyList(), Collections.emptyList(), ConnectProtocol.Assignment.NO_ERROR, 1, - Collections.emptyList(), Arrays.asList(TASK2), + Collections.emptyList(), singletonList(TASK2), rebalanceDelay); expectConfigRefreshAndSnapshot(SNAPSHOT); @@ -503,7 +503,7 @@ public class DistributedHerderTest { // The member got its assignment and revocation expectRebalance(Collections.emptyList(), Collections.emptyList(), ConnectProtocol.Assignment.NO_ERROR, - 1, Arrays.asList(CONN1), Arrays.asList(TASK1), 0); + 1, singletonList(CONN1), singletonList(TASK1), 0); // and the new assignment started ArgumentCaptor> onStart = ArgumentCaptor.forClass(Callback.class); @@ -528,7 +528,7 @@ public class DistributedHerderTest { // Join group and get assignment when(member.memberId()).thenReturn("member"); when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); - expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1)); + expectRebalance(1, singletonList(CONN1), singletonList(TASK1)); expectConfigRefreshAndSnapshot(SNAPSHOT); ArgumentCaptor> onStart = ArgumentCaptor.forClass(Callback.class); @@ -550,8 +550,8 @@ public class DistributedHerderTest { verify(worker).startSourceTask(eq(TASK1), any(), any(), any(), eq(herder), eq(TargetState.STARTED)); // Rebalance and get a new assignment - expectRebalance(Arrays.asList(CONN1), Arrays.asList(TASK1), ConnectProtocol.Assignment.NO_ERROR, - 1, Arrays.asList(CONN1), Arrays.asList()); + expectRebalance(singletonList(CONN1), singletonList(TASK1), ConnectProtocol.Assignment.NO_ERROR, + 1, singletonList(CONN1), Collections.emptyList()); // worker is not running, so we should see no call to connectorTaskConfigs() expectExecuteTaskReconfiguration(false, null, null); @@ -606,7 +606,7 @@ public class DistributedHerderTest { // Perform a partial re-balance just prior to the revocation // bump the configOffset to trigger reading the config topic to the end configOffset++; - expectRebalance(configOffset, Arrays.asList(), Arrays.asList()); + expectRebalance(configOffset, Collections.emptyList(), Collections.emptyList()); // give it the wrong snapshot, as if we're out of sync/can't reach the broker expectConfigRefreshAndSnapshot(SNAPSHOT); doNothing().when(member).requestRejoin(); @@ -616,9 +616,9 @@ public class DistributedHerderTest { } // Revoke the connector in the next rebalance - expectRebalance(Arrays.asList(CONN1), Arrays.asList(), - ConnectProtocol.Assignment.NO_ERROR, configOffset, Arrays.asList(), - Arrays.asList()); + expectRebalance(singletonList(CONN1), Collections.emptyList(), + ConnectProtocol.Assignment.NO_ERROR, configOffset, Collections.emptyList(), + Collections.emptyList()); if (incompleteRebalance) { // Same as SNAPSHOT, except with an updated offset @@ -643,7 +643,7 @@ public class DistributedHerderTest { herder.tick(); // re-assign the connector back to the same worker to ensure state was cleaned up - expectRebalance(configOffset, Arrays.asList(CONN1), Arrays.asList()); + expectRebalance(configOffset, singletonList(CONN1), Collections.emptyList()); herder.tick(); @@ -973,7 +973,7 @@ public class DistributedHerderTest { when(member.memberId()).thenReturn("leader"); when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); // Start with one connector - expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true); + expectRebalance(1, singletonList(CONN1), Collections.emptyList(), true); expectConfigRefreshAndSnapshot(SNAPSHOT); ArgumentCaptor> onStart = ArgumentCaptor.forClass(Callback.class); @@ -1006,7 +1006,7 @@ public class DistributedHerderTest { doNothing().when(statusBackingStore).deleteTopic(eq(CONN1), eq(FOO_TOPIC)); doNothing().when(statusBackingStore).deleteTopic(eq(CONN1), eq(BAR_TOPIC)); - expectRebalance(Arrays.asList(CONN1), Arrays.asList(TASK1), + expectRebalance(singletonList(CONN1), singletonList(TASK1), ConnectProtocol.Assignment.NO_ERROR, 2, "leader", "leaderUrl", Collections.emptyList(), Collections.emptyList(), 0, true); expectConfigRefreshAndSnapshot(ClusterConfigState.EMPTY); @@ -1533,7 +1533,7 @@ public class DistributedHerderTest { // Performs rebalance and gets new assignment expectRebalance(Collections.emptyList(), Collections.emptyList(), - ConnectProtocol.Assignment.NO_ERROR, 1, Arrays.asList(CONN1), Collections.emptyList()); + ConnectProtocol.Assignment.NO_ERROR, 1, singletonList(CONN1), Collections.emptyList()); ArgumentCaptor> onStart = ArgumentCaptor.forClass(Callback.class); doAnswer(invocation -> { @@ -1556,7 +1556,7 @@ public class DistributedHerderTest { when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); // join - expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList()); + expectRebalance(1, singletonList(CONN1), Collections.emptyList()); expectConfigRefreshAndSnapshot(SNAPSHOT); expectMemberPoll(); @@ -1591,7 +1591,7 @@ public class DistributedHerderTest { WorkerConfigTransformer configTransformer = mock(WorkerConfigTransformer.class); // join - expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList()); + expectRebalance(1, singletonList(CONN1), Collections.emptyList()); expectConfigRefreshAndSnapshot(SNAPSHOT); expectMemberPoll(); @@ -1646,7 +1646,7 @@ public class DistributedHerderTest { when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); // join - expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList()); + expectRebalance(1, singletonList(CONN1), Collections.emptyList()); expectConfigRefreshAndSnapshot(SNAPSHOT); expectMemberPoll(); @@ -1683,7 +1683,7 @@ public class DistributedHerderTest { when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); // start with the connector paused - expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList()); + expectRebalance(1, singletonList(CONN1), Collections.emptyList()); expectConfigRefreshAndSnapshot(SNAPSHOT_PAUSED_CONN1); expectMemberPoll(); @@ -1723,7 +1723,7 @@ public class DistributedHerderTest { when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); // join - expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList()); + expectRebalance(1, singletonList(CONN1), Collections.emptyList()); expectConfigRefreshAndSnapshot(SNAPSHOT); expectMemberPoll(); @@ -1976,7 +1976,7 @@ public class DistributedHerderTest { // Performs rebalance and gets new assignment expectRebalance(Collections.emptyList(), Collections.emptyList(), ConnectProtocol.Assignment.NO_ERROR, 1, Collections.emptyList(), - Arrays.asList(TASK0)); + singletonList(TASK0)); expectConfigRefreshAndSnapshot(SNAPSHOT); when(worker.startSourceTask(eq(TASK0), any(), any(), any(), eq(herder), eq(TargetState.STARTED))).thenReturn(true); @@ -2014,7 +2014,7 @@ public class DistributedHerderTest { before = time.milliseconds(); // After backoff, restart the process and this time succeed - expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1), true); + expectRebalance(1, singletonList(CONN1), singletonList(TASK1), true); expectConfigRefreshAndSnapshot(SNAPSHOT); ArgumentCaptor> onStart = ArgumentCaptor.forClass(Callback.class); @@ -2051,7 +2051,7 @@ public class DistributedHerderTest { when(member.memberId()).thenReturn("leader"); when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V1); when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); - expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1), true); + expectRebalance(1, singletonList(CONN1), singletonList(TASK1), true); expectConfigRefreshAndSnapshot(SNAPSHOT); expectMemberPoll(); @@ -2072,7 +2072,7 @@ public class DistributedHerderTest { // The leader gets the same assignment after a rebalance is triggered expectRebalance(Collections.emptyList(), Collections.emptyList(), ConnectProtocol.Assignment.NO_ERROR, - 1, "leader", "leaderUrl", Arrays.asList(CONN1), Arrays.asList(TASK1), 0, true); + 1, "leader", "leaderUrl", singletonList(CONN1), singletonList(TASK1), 0, true); time.sleep(2000L); assertStatistics(3, 1, 100, 2000); @@ -2106,7 +2106,7 @@ public class DistributedHerderTest { // After a few retries succeed to read the log to the end expectRebalance(Collections.emptyList(), Collections.emptyList(), ConnectProtocol.Assignment.NO_ERROR, - 1, "leader", "leaderUrl", Arrays.asList(CONN1), Arrays.asList(TASK1), 0, true); + 1, "leader", "leaderUrl", singletonList(CONN1), singletonList(TASK1), 0, true); expectConfigRefreshAndSnapshot(SNAPSHOT); before = time.milliseconds(); @@ -2125,7 +2125,7 @@ public class DistributedHerderTest { when(member.memberId()).thenReturn("leader"); when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V1); when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); - expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1), true); + expectRebalance(1, singletonList(CONN1), singletonList(TASK1), true); expectConfigRefreshAndSnapshot(SNAPSHOT); expectMemberPoll(); @@ -2146,7 +2146,7 @@ public class DistributedHerderTest { // The leader gets the same assignment after a rebalance is triggered expectRebalance(Collections.emptyList(), Collections.emptyList(), ConnectProtocol.Assignment.NO_ERROR, 1, - "leader", "leaderUrl", Arrays.asList(CONN1), Arrays.asList(TASK1), 0, true); + "leader", "leaderUrl", singletonList(CONN1), singletonList(TASK1), 0, true); time.sleep(2000L); assertStatistics(3, 1, 100, 2000); @@ -2190,7 +2190,7 @@ public class DistributedHerderTest { // The worker gets back the assignment that had given up expectRebalance(Collections.emptyList(), Collections.emptyList(), ConnectProtocol.Assignment.NO_ERROR, - 1, "leader", "leaderUrl", Arrays.asList(CONN1), Arrays.asList(TASK1), + 1, "leader", "leaderUrl", singletonList(CONN1), singletonList(TASK1), 0, true); expectConfigRefreshAndSnapshot(SNAPSHOT); @@ -2267,7 +2267,7 @@ public class DistributedHerderTest { @Test public void testPutConnectorConfig() throws Exception { when(member.memberId()).thenReturn("leader"); - expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true); + expectRebalance(1, singletonList(CONN1), Collections.emptyList(), true); when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); expectConfigRefreshAndSnapshot(SNAPSHOT); when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); @@ -2388,7 +2388,7 @@ public class DistributedHerderTest { // Patch the connector config. expectMemberEnsureActive(); - expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), false); + expectRebalance(1, singletonList(CONN1), Collections.emptyList(), false); FutureCallback> patchCallback = new FutureCallback<>(); herder.patchConnectorConfig(CONN1, new HashMap<>(), patchCallback); @@ -2401,7 +2401,7 @@ public class DistributedHerderTest { @Test public void testPatchConnectorConfig() throws Exception { when(member.memberId()).thenReturn("leader"); - expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true); + expectRebalance(1, singletonList(CONN1), Collections.emptyList(), true); when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); Map originalConnConfig = new HashMap<>(CONN1_CONFIG); @@ -2440,7 +2440,7 @@ public class DistributedHerderTest { patchedConnConfig.put("foo3", "added"); expectMemberEnsureActive(); - expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true); + expectRebalance(1, singletonList(CONN1), Collections.emptyList(), true); ArgumentCaptor> validateCallback = ArgumentCaptor.forClass(Callback.class); doAnswer(invocation -> { @@ -2567,7 +2567,7 @@ public class DistributedHerderTest { verify(member).wakeup(); verifyNoMoreInteractions(member, taskConfigCb); assertEquals( - Arrays.asList("awaiting startup"), + singletonList("awaiting startup"), stages ); } @@ -2584,7 +2584,7 @@ public class DistributedHerderTest { verify(member).wakeup(); verifyNoMoreInteractions(member, taskConfigCb); assertEquals( - Arrays.asList("awaiting startup"), + singletonList("awaiting startup"), stages ); } @@ -2690,7 +2690,7 @@ public class DistributedHerderTest { verifyNoMoreInteractions(member, taskConfigCb); assertEquals( - Arrays.asList("awaiting startup"), + singletonList("awaiting startup"), stages ); } @@ -3369,7 +3369,7 @@ public class DistributedHerderTest { public void shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionAndHerderNotStopping() { when(member.memberId()).thenReturn("leader"); when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); - expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true); + expectRebalance(1, singletonList(CONN1), Collections.emptyList(), true); expectConfigRefreshAndSnapshot(SNAPSHOT); herder.startAndStopExecutor.shutdown(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java index 3edb4d52dc0..319bdc9f9f8 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java @@ -659,7 +659,7 @@ public class IncrementalCooperativeAssignorTest { .collect(Collectors.toList()); expectedAssignment.get(0).connectors().addAll(Arrays.asList("connector6", "connector9")); expectedAssignment.get(1).connectors().addAll(Arrays.asList("connector7", "connector10")); - expectedAssignment.get(2).connectors().addAll(Arrays.asList("connector8")); + expectedAssignment.get(2).connectors().add("connector8"); List newConnectors = newConnectors(6, 11); assignor.assignConnectors(existingAssignment, newConnectors); @@ -679,11 +679,11 @@ public class IncrementalCooperativeAssignorTest { expectedAssignment.get(0).connectors().addAll(Arrays.asList("connector6", "connector9")); expectedAssignment.get(1).connectors().addAll(Arrays.asList("connector7", "connector10")); - expectedAssignment.get(2).connectors().addAll(Arrays.asList("connector8")); + expectedAssignment.get(2).connectors().add("connector8"); expectedAssignment.get(0).tasks().addAll(Arrays.asList(new ConnectorTaskId("task", 6), new ConnectorTaskId("task", 9))); expectedAssignment.get(1).tasks().addAll(Arrays.asList(new ConnectorTaskId("task", 7), new ConnectorTaskId("task", 10))); - expectedAssignment.get(2).tasks().addAll(Arrays.asList(new ConnectorTaskId("task", 8))); + expectedAssignment.get(2).tasks().add(new ConnectorTaskId("task", 8)); List newConnectors = newConnectors(6, 11); assignor.assignConnectors(existingAssignment, newConnectors); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java index 8b28c37aca9..ca5c3bdc6f8 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java @@ -75,18 +75,18 @@ public class WorkerCoordinatorIncrementalTest { @Rule public MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS); - private String connectorId1 = "connector1"; - private String connectorId2 = "connector2"; - private ConnectorTaskId taskId1x0 = new ConnectorTaskId(connectorId1, 0); - private ConnectorTaskId taskId2x0 = new ConnectorTaskId(connectorId2, 0); + private final String connectorId1 = "connector1"; + private final String connectorId2 = "connector2"; + private final ConnectorTaskId taskId1x0 = new ConnectorTaskId(connectorId1, 0); + private final ConnectorTaskId taskId2x0 = new ConnectorTaskId(connectorId2, 0); - private String groupId = "test-group"; - private int sessionTimeoutMs = 10; - private int rebalanceTimeoutMs = 60; - private int heartbeatIntervalMs = 2; - private long retryBackoffMs = 100; - private long retryBackoffMaxMs = 1000; - private int requestTimeoutMs = 1000; + private final String groupId = "test-group"; + private final int sessionTimeoutMs = 10; + private final int rebalanceTimeoutMs = 60; + private final int heartbeatIntervalMs = 2; + private final long retryBackoffMs = 100; + private final long retryBackoffMaxMs = 1000; + private final int requestTimeoutMs = 1000; private MockTime time; private MockClient client; private Node node; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java index 72a51afd500..0494d002726 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java @@ -72,7 +72,7 @@ public class ConnectRestServerTest { @Mock private Plugins plugins; private ConnectRestServer server; private CloseableHttpClient httpClient; - private Collection responses = new ArrayList<>(); + private final Collection responses = new ArrayList<>(); protected static final String KAFKA_CLUSTER_ID = "Xbafgnagvar"; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerConfigTest.java index 28dd725afd4..4930c1a3ef2 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerConfigTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerConfigTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.junit.Test; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -68,7 +69,7 @@ public class RestServerConfigTest { props.put(RestServerConfig.LISTENERS_CONFIG, "http://a.b:9999"); config = RestServerConfig.forPublic(null, props); - assertEquals(Arrays.asList("http://a.b:9999"), config.listeners()); + assertEquals(Collections.singletonList("http://a.b:9999"), config.listeners()); props.put(RestServerConfig.LISTENERS_CONFIG, "http://a.b:9999, https://a.b:7812"); config = RestServerConfig.forPublic(null, props); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java index 07d48a6e01f..4b3f6e67323 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java @@ -60,13 +60,13 @@ public class FileOffsetBackingStoreTest { private Converter converter; - private static Map firstSet = new HashMap<>(); + private static final Map FIRST_SET = new HashMap<>(); private static final Runnable EMPTY_RUNNABLE = () -> { }; static { - firstSet.put(buffer("key"), buffer("value")); - firstSet.put(null, null); + FIRST_SET.put(buffer("key"), buffer("value")); + FIRST_SET.put(null, null); } @Before @@ -96,7 +96,7 @@ public class FileOffsetBackingStoreTest { @SuppressWarnings("unchecked") Callback setCallback = mock(Callback.class); - store.set(firstSet, setCallback).get(); + store.set(FIRST_SET, setCallback).get(); Map values = store.get(Arrays.asList(buffer("key"), buffer("bad"))).get(); assertEquals(buffer("value"), values.get(buffer("key"))); @@ -109,7 +109,7 @@ public class FileOffsetBackingStoreTest { @SuppressWarnings("unchecked") Callback setCallback = mock(Callback.class); - store.set(firstSet, setCallback).get(); + store.set(FIRST_SET, setCallback).get(); store.stop(); // Restore into a new store to ensure correct reload from scratch diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java index 6c9057a3517..6ebac341032 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java @@ -800,7 +800,7 @@ public class KafkaConfigBackingStoreMockitoTest { // Should see a single connector and its config should be the last one seen anywhere in the log ClusterConfigState configState = configStorage.snapshot(); assertEquals(8, configState.offset()); // Should always be next to be read, even if uncommitted - assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); + assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); // CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2] assertEquals(SAMPLE_CONFIGS.get(2), configState.connectorConfig(CONNECTOR_IDS.get(0))); // Should see 0 tasks for that connector. @@ -1053,7 +1053,7 @@ public class KafkaConfigBackingStoreMockitoTest { // After reading the log, it should have been in an inconsistent state ClusterConfigState configState = configStorage.snapshot(); assertEquals(6, configState.offset()); // Should always be next to be read, not last committed - assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); + assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); // Inconsistent data should leave us with no tasks listed for the connector and an entry in the inconsistent list assertEquals(Collections.emptyList(), configState.tasks(CONNECTOR_IDS.get(0))); // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0] @@ -1086,8 +1086,8 @@ public class KafkaConfigBackingStoreMockitoTest { // This is only two more ahead of the last one because multiple calls fail, and so their configs are not written // to the topic. Only the last call with 1 task config + 1 commit actually gets written. assertEquals(8, configState.offset()); - assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); - assertEquals(Arrays.asList(TASK_IDS.get(0)), configState.tasks(CONNECTOR_IDS.get(0))); + assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); + assertEquals(Collections.singletonList(TASK_IDS.get(0)), configState.tasks(CONNECTOR_IDS.get(0))); assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0))); assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors()); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java index fc2caf75d9b..ae5f82cd3ee 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java @@ -151,7 +151,7 @@ public class KafkaConfigBackingStoreTest { private Converter converter; @Mock private ConfigBackingStore.UpdateListener configUpdateListener; - private Map props = new HashMap<>(DEFAULT_CONFIG_STORAGE_PROPS); + private final Map props = new HashMap<>(DEFAULT_CONFIG_STORAGE_PROPS); private DistributedConfig config; @Mock KafkaBasedLog storeLog; @@ -328,7 +328,7 @@ public class KafkaConfigBackingStoreTest { configState = configStorage.snapshot(); assertEquals(3, configState.offset()); String connectorName = CONNECTOR_IDS.get(0); - assertEquals(Arrays.asList(connectorName), new ArrayList<>(configState.connectors())); + assertEquals(Collections.singletonList(connectorName), new ArrayList<>(configState.connectors())); assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), configState.tasks(connectorName)); assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0))); assertEquals(SAMPLE_CONFIGS.get(1), configState.taskConfig(TASK_IDS.get(1))); @@ -378,7 +378,7 @@ public class KafkaConfigBackingStoreTest { "tasks", 1); // Starts with 2 tasks, after update has 3 // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks - configUpdateListener.onTaskConfigUpdate(Arrays.asList(TASK_IDS.get(2))); + configUpdateListener.onTaskConfigUpdate(Collections.singletonList(TASK_IDS.get(2))); EasyMock.expectLastCall(); // Records to be read by consumer as it reads to the end of the log @@ -473,7 +473,7 @@ public class KafkaConfigBackingStoreTest { configState = configStorage.snapshot(); assertEquals(1, configState.offset()); String connectorName = CONNECTOR_IDS.get(0); - assertEquals(Arrays.asList(connectorName), new ArrayList<>(configState.connectors())); + assertEquals(Collections.singletonList(connectorName), new ArrayList<>(configState.connectors())); assertEquals(Collections.emptyList(), configState.tasks(connectorName)); assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors()); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java index a0e4d569f40..b8503ceb839 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java @@ -110,7 +110,7 @@ public class KafkaOffsetBackingStoreTest { private static final ByteBuffer TP0_VALUE_NEW = buffer("VAL0_NEW"); private static final ByteBuffer TP1_VALUE_NEW = buffer("VAL1_NEW"); - private Map props = new HashMap<>(DEFAULT_PROPS); + private final Map props = new HashMap<>(DEFAULT_PROPS); @Mock KafkaBasedLog storeLog; @Mock diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConvertingFutureCallbackTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConvertingFutureCallbackTest.java index 7977a291df6..b930cec34b5 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConvertingFutureCallbackTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConvertingFutureCallbackTest.java @@ -192,9 +192,9 @@ public class ConvertingFutureCallbackTest { } protected static class TestConvertingFutureCallback extends ConvertingFutureCallback { - private AtomicInteger numberOfConversions = new AtomicInteger(); - private CountDownLatch getInvoked = new CountDownLatch(1); - private CountDownLatch cancelInvoked = new CountDownLatch(1); + private final AtomicInteger numberOfConversions = new AtomicInteger(); + private final CountDownLatch getInvoked = new CountDownLatch(1); + private final CountDownLatch cancelInvoked = new CountDownLatch(1); public int numberOfConversions() { return numberOfConversions.get(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestFuture.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestFuture.java index 0883040a33f..9130d8badc9 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestFuture.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestFuture.java @@ -26,7 +26,7 @@ public class TestFuture implements Future { private volatile boolean resolved; private T result; private Throwable exception; - private CountDownLatch getCalledLatch; + private final CountDownLatch getCalledLatch; private volatile boolean resolveOnGet; private T resolveOnGetResult;