diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java index 6a256fc9c06..cda7a771c51 100644 --- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java +++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java @@ -57,7 +57,7 @@ public class FileStreamSinkTaskTest { @Test public void testPutFlush() { - HashMap offsets = new HashMap<>(); + Map offsets = new HashMap<>(); final String newLine = System.lineSeparator(); // We do not call task.start() since it would override the output stream diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java index 7fa5358f1c3..6c999e3e6a0 100644 --- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java +++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java @@ -152,8 +152,6 @@ public class JsonConverter implements Converter, HeaderConverter, Versioned { return switch (config.decimalFormat()) { case NUMERIC -> JSON_NODE_FACTORY.numberNode(decimal); case BASE64 -> JSON_NODE_FACTORY.binaryNode(Decimal.fromLogical(schema, decimal)); - default -> - throw new DataException("Unexpected " + JsonConverterConfig.DECIMAL_FORMAT_CONFIG + ": " + config.decimalFormat()); }; } diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java index ecaf6bd7bd7..db86fbdb40b 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java @@ -371,7 +371,7 @@ public class MirrorCheckpointTask extends SourceTask { offsetToSync.put(topicPartition, convertedOffset); } - if (offsetToSync.size() == 0) { + if (offsetToSync.isEmpty()) { log.trace("skip syncing the offset for consumer group: {}", consumerGroupId); continue; } diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java index c26d326b223..f0aab090bb2 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java @@ -231,7 +231,7 @@ public class MirrorMaker { } private void addHerder(SourceAndTarget sourceAndTarget) { - log.info("creating herder for " + sourceAndTarget.toString()); + log.info("creating herder for {}", sourceAndTarget.toString()); Map workerProps = config.workerConfig(sourceAndTarget); String encodedSource = encodePath(sourceAndTarget.source()); String encodedTarget = encodePath(sourceAndTarget.target()); diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java index 01e70e0af97..7bcb3b5f742 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java @@ -277,7 +277,7 @@ public final class MirrorMakerConfig extends AbstractConfig { return transformed; } - protected static ConfigDef config() { + private static ConfigDef config() { ConfigDef result = new ConfigDef() .define(CLUSTERS_CONFIG, Type.LIST, Importance.HIGH, CLUSTERS_DOC) .define(ENABLE_INTERNAL_REST_CONFIG, Type.BOOLEAN, false, Importance.HIGH, ENABLE_INTERNAL_REST_DOC) diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java index c46aac634fb..6e366573cfe 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java @@ -25,7 +25,7 @@ import org.apache.kafka.common.protocol.types.Type; import java.nio.ByteBuffer; -public class OffsetSync { +public record OffsetSync(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) { public static final String TOPIC_KEY = "topic"; public static final String PARTITION_KEY = "partition"; public static final String UPSTREAM_OFFSET_KEY = "upstreamOffset"; @@ -39,28 +39,6 @@ public class OffsetSync { new Field(TOPIC_KEY, Type.STRING), new Field(PARTITION_KEY, Type.INT32)); - private final TopicPartition topicPartition; - private final long upstreamOffset; - private final long downstreamOffset; - - public OffsetSync(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) { - this.topicPartition = topicPartition; - this.upstreamOffset = upstreamOffset; - this.downstreamOffset = downstreamOffset; - } - - public TopicPartition topicPartition() { - return topicPartition; - } - - public long upstreamOffset() { - return upstreamOffset; - } - - public long downstreamOffset() { - return downstreamOffset; - } - @Override public String toString() { return String.format("OffsetSync{topicPartition=%s, upstreamOffset=%d, downstreamOffset=%d}", diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/CheckpointStoreTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/CheckpointStoreTest.java index b11d1ffdc99..fb65a1162e2 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/CheckpointStoreTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/CheckpointStoreTest.java @@ -24,7 +24,6 @@ import org.apache.kafka.connect.util.Callback; import org.junit.jupiter.api.Test; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -38,8 +37,7 @@ public class CheckpointStoreTest { @Test public void testReadCheckpointsTopic() { - Set consumerGroups = new HashSet<>(); - consumerGroups.add("group1"); + Set consumerGroups = Set.of("group1"); MirrorCheckpointTaskConfig config = mock(MirrorCheckpointTaskConfig.class); when(config.checkpointsTopic()).thenReturn("checkpoint.topic"); @@ -70,8 +68,7 @@ public class CheckpointStoreTest { @Test public void testReadCheckpointsTopicError() { - Set consumerGroups = new HashSet<>(); - consumerGroups.add("group1"); + Set consumerGroups = Set.of("group1"); MirrorCheckpointTaskConfig config = mock(MirrorCheckpointTaskConfig.class); when(config.checkpointsTopic()).thenReturn("checkpoint.topic"); diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java index d800e74fbfc..ecb07dc529d 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java @@ -26,7 +26,6 @@ import org.apache.kafka.connect.errors.RetriableException; import org.junit.jupiter.api.Test; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -146,7 +145,7 @@ public class MirrorCheckpointConnectorTest { MirrorCheckpointConnector connector = new MirrorCheckpointConnector(Set.of(), config); connector = spy(connector); - Collection groups = List.of( + List groups = List.of( new GroupListing("g1", Optional.of(GroupType.CLASSIC), "", Optional.empty()), new GroupListing("g2", Optional.of(GroupType.CLASSIC), ConsumerProtocol.PROTOCOL_TYPE, Optional.empty())); Map offsets = new HashMap<>(); @@ -176,7 +175,7 @@ public class MirrorCheckpointConnectorTest { MirrorCheckpointConnector connector = new MirrorCheckpointConnector(Set.of(), config); connector = spy(connector); - Collection groups = List.of( + List groups = List.of( new GroupListing("g1", Optional.of(GroupType.CLASSIC), "", Optional.empty()), new GroupListing("g2", Optional.of(GroupType.CLASSIC), ConsumerProtocol.PROTOCOL_TYPE, Optional.empty()), new GroupListing("g3", Optional.of(GroupType.CLASSIC), ConsumerProtocol.PROTOCOL_TYPE, Optional.empty()), diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java index a8d5d520e39..1d1dd0feea3 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java @@ -142,7 +142,7 @@ public class DedicatedMirrorIntegrationTest { final String ba = b + "->" + a; final String testTopicPrefix = "test-topic-"; - Map mmProps = new HashMap() {{ + Map mmProps = new HashMap<>() {{ put("dedicated.mode.enable.internal.rest", "false"); put("listeners", "http://localhost:0"); // Refresh topics very frequently to quickly pick up on topics that are created @@ -204,7 +204,7 @@ public class DedicatedMirrorIntegrationTest { final String ab = a + "->" + b; final String testTopicPrefix = "test-topic-"; - Map mmProps = new HashMap() {{ + Map mmProps = new HashMap<>() {{ put("dedicated.mode.enable.internal.rest", "false"); put("listeners", "http://localhost:0"); // Refresh topics very frequently to quickly pick up on topics that are created @@ -288,7 +288,7 @@ public class DedicatedMirrorIntegrationTest { final String ba = b + "->" + a; final String testTopicPrefix = "test-topic-"; - Map mmProps = new HashMap() {{ + Map mmProps = new HashMap<>() {{ put("dedicated.mode.enable.internal.rest", "true"); put("listeners", "http://localhost:0"); // Refresh topics very frequently to quickly pick up on topics that are created diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java index 0a6ab4bab15..1d4339f3977 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java @@ -21,7 +21,7 @@ import org.apache.kafka.connect.mirror.IdentityReplicationPolicy; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; -import java.util.HashMap; +import java.util.Map; /** * Tests MM2 replication and failover logic for {@link IdentityReplicationPolicy}. @@ -36,10 +36,10 @@ public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrat @BeforeEach public void startClusters() throws Exception { replicateBackupToPrimary = false; - super.startClusters(new HashMap() {{ - put("replication.policy.class", IdentityReplicationPolicy.class.getName()); - put("topics", "test-topic-.*"); - }}); + super.startClusters(Map.of( + "replication.policy.class", IdentityReplicationPolicy.class.getName(), + "topics", "test-topic-.*" + )); } /* diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java index a34850c8b4c..6d1d50f558b 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java @@ -152,7 +152,7 @@ public class MirrorConnectorsIntegrationBaseTest { @BeforeEach public void startClusters() throws Exception { - startClusters(new HashMap() {{ + startClusters(new HashMap<>() {{ put("topics", "test-topic-.*, primary.test-topic-.*, backup.test-topic-.*"); }}); } @@ -491,7 +491,7 @@ public class MirrorConnectorsIntegrationBaseTest { produceMessages(primaryProducer, "test-topic-1"); String backupTopic1 = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS); String consumerGroupName = "consumer-group-testOneWayReplicationWithAutoOffsetSync"; - Map consumerProps = new HashMap() {{ + Map consumerProps = new HashMap<>() {{ put("group.id", consumerGroupName); put("auto.offset.reset", "earliest"); }}; diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java index f08e5688592..5578c2b2877 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java @@ -81,7 +81,7 @@ public class MirrorConnectorsIntegrationExactlyOnceTest extends MirrorConnectors assertEquals(expectedRecordsTopic2, backup.kafka().consume(expectedRecordsTopic2, RECORD_TRANSFER_DURATION_MS, backupTopic2).count(), "New topic was not re-replicated to backup cluster after altering offsets."); - @SuppressWarnings({"unchecked", "rawtypes"}) + @SuppressWarnings("unchecked") Class[] connectorsToReset = CONNECTOR_LIST.toArray(new Class[0]); stopMirrorMakerConnectors(backup, connectorsToReset); // Resetting the offsets for the heartbeat and checkpoint connectors doesn't have any effect diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java index a43af44329c..814a03d278b 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java @@ -151,7 +151,7 @@ public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest extends Mi additionalBackupClusterClientsConfigs.putAll(superUserConfig()); backupWorkerProps.putAll(superUserConfig()); - HashMap additionalConfig = new HashMap(superUserConfig()) {{ + Map additionalConfig = new HashMap<>(superUserConfig()) {{ put(FORWARDING_ADMIN_CLASS, FakeForwardingAdminWithLocalMetadata.class.getName()); }}; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java index 0b828d5a0b7..3f9cf168fe9 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java @@ -719,19 +719,8 @@ public class WorkerSinkTaskThreadedTest { private abstract static class TestSinkTask extends SinkTask { } - private static class ExpectOffsetCommitCommand { - final long expectedMessages; - final RuntimeException error; - final Exception consumerCommitError; - final long consumerCommitDelayMs; - final boolean invokeCallback; - - private ExpectOffsetCommitCommand(long expectedMessages, RuntimeException error, Exception consumerCommitError, long consumerCommitDelayMs, boolean invokeCallback) { - this.expectedMessages = expectedMessages; - this.error = error; - this.consumerCommitError = consumerCommitError; - this.consumerCommitDelayMs = consumerCommitDelayMs; - this.invokeCallback = invokeCallback; - } + private record ExpectOffsetCommitCommand(long expectedMessages, RuntimeException error, + Exception consumerCommitError, long consumerCommitDelayMs, + boolean invokeCallback) { } } diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKeyTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKeyTest.java index f6b98aab8c4..39654859edc 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKeyTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKeyTest.java @@ -86,18 +86,7 @@ public class HasHeaderKeyTest { Arrays.stream(headers).map(TestHeader::new).collect(Collectors.toList())); } - private static class TestHeader implements Header { - - private final String key; - - public TestHeader(String key) { - this.key = key; - } - - @Override - public String key() { - return key; - } + private record TestHeader(String key) implements Header { @Override public Schema schema() {