mirror of https://github.com/apache/kafka.git
MINOR: Cleanups in Connect (#20077)
A few cleanups, including Java 17 syntax, collections, and assertEquals() argument order.

Reviewers: Luke Chen <showuon@gmail.com>, Ken Huang <s7133700@gmail.com>, Jhen-Yung Hsu <jhenyunghsu@gmail.com>
Commit dd52058466 (parent 4271fd8c8b)
@@ -57,7 +57,7 @@ public class FileStreamSinkTaskTest {

     @Test
     public void testPutFlush() {
-        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         final String newLine = System.lineSeparator();

         // We do not call task.start() since it would override the output stream
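A note on the pattern: declaring the local against the Map interface rather than HashMap keeps the choice of implementation to a single line. A minimal sketch, with hypothetical names not taken from the commit:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.TreeMap;

    public class InterfaceTyping {
        public static void main(String[] args) {
            // Only this line changes if a different map is needed later.
            Map<String, Integer> counts = new HashMap<>();
            counts.put("records", 3);

            // e.g. swapping in a sorted map requires no other edits:
            Map<String, Integer> sorted = new TreeMap<>(counts);
            System.out.println(sorted);
        }
    }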
@@ -152,8 +152,6 @@ public class JsonConverter implements Converter, HeaderConverter, Versioned {
         return switch (config.decimalFormat()) {
             case NUMERIC -> JSON_NODE_FACTORY.numberNode(decimal);
             case BASE64 -> JSON_NODE_FACTORY.binaryNode(Decimal.fromLogical(schema, decimal));
-            default ->
-                throw new DataException("Unexpected " + JsonConverterConfig.DECIMAL_FORMAT_CONFIG + ": " + config.decimalFormat());
         };
     }

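The default arm can go because a switch expression over an enum must be exhaustive: the compiler rejects any uncovered constant, so the DataException branch was unreachable. A minimal sketch of the rule, using a hypothetical enum rather than Kafka's:

    public class ExhaustiveSwitch {
        enum DecimalFormat { NUMERIC, BASE64 }

        static String describe(DecimalFormat format) {
            // All constants are covered, so no default is needed; adding a new
            // constant later turns this switch into a compile-time error.
            return switch (format) {
                case NUMERIC -> "plain number";
                case BASE64 -> "base64-encoded bytes";
            };
        }

        public static void main(String[] args) {
            System.out.println(describe(DecimalFormat.BASE64));
        }
    }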
@@ -371,7 +371,7 @@ public class MirrorCheckpointTask extends SourceTask {
                 offsetToSync.put(topicPartition, convertedOffset);
             }

-            if (offsetToSync.size() == 0) {
+            if (offsetToSync.isEmpty()) {
                 log.trace("skip syncing the offset for consumer group: {}", consumerGroupId);
                 continue;
            }
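isEmpty() states the intent directly and never depends on how size is computed (for a few collection types, size() is not O(1)). A small sketch:

    import java.util.HashMap;
    import java.util.Map;

    public class EmptyCheck {
        public static void main(String[] args) {
            Map<String, Long> offsetToSync = new HashMap<>();
            // Reads as a predicate; equivalent here, but clearer than size() == 0.
            if (offsetToSync.isEmpty()) {
                System.out.println("nothing to sync");
            }
        }
    }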
@@ -231,7 +231,7 @@ public class MirrorMaker {
     }

     private void addHerder(SourceAndTarget sourceAndTarget) {
-        log.info("creating herder for " + sourceAndTarget.toString());
+        log.info("creating herder for {}", sourceAndTarget.toString());
         Map<String, String> workerProps = config.workerConfig(sourceAndTarget);
         String encodedSource = encodePath(sourceAndTarget.source());
         String encodedTarget = encodePath(sourceAndTarget.target());
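With a {} placeholder, SLF4J assembles the message only when the level is enabled; string concatenation always runs first. A sketch of the difference, assuming an SLF4J Logger as in the surrounding class:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LazyLogging {
        private static final Logger log = LoggerFactory.getLogger(LazyLogging.class);

        public static void main(String[] args) {
            Object herder = "primary->backup";
            // Eager: concatenation (and toString()) runs even if INFO is disabled.
            log.info("creating herder for " + herder);
            // Lazy: the argument is substituted only when the line is actually logged;
            // passing the object itself (no explicit toString()) defers that call too.
            log.info("creating herder for {}", herder);
        }
    }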
@@ -277,7 +277,7 @@ public final class MirrorMakerConfig extends AbstractConfig {
         return transformed;
     }

-    protected static ConfigDef config() {
+    private static ConfigDef config() {
         ConfigDef result = new ConfigDef()
             .define(CLUSTERS_CONFIG, Type.LIST, Importance.HIGH, CLUSTERS_DOC)
             .define(ENABLE_INTERNAL_REST_CONFIG, Type.BOOLEAN, false, Importance.HIGH, ENABLE_INTERNAL_REST_DOC)
@@ -25,7 +25,7 @@ import org.apache.kafka.common.protocol.types.Type;

 import java.nio.ByteBuffer;

-public class OffsetSync {
+public record OffsetSync(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) {
     public static final String TOPIC_KEY = "topic";
     public static final String PARTITION_KEY = "partition";
     public static final String UPSTREAM_OFFSET_KEY = "upstreamOffset";
@@ -39,28 +39,6 @@ public class OffsetSync {
             new Field(TOPIC_KEY, Type.STRING),
             new Field(PARTITION_KEY, Type.INT32));

-    private final TopicPartition topicPartition;
-    private final long upstreamOffset;
-    private final long downstreamOffset;
-
-    public OffsetSync(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) {
-        this.topicPartition = topicPartition;
-        this.upstreamOffset = upstreamOffset;
-        this.downstreamOffset = downstreamOffset;
-    }
-
-    public TopicPartition topicPartition() {
-        return topicPartition;
-    }
-
-    public long upstreamOffset() {
-        return upstreamOffset;
-    }
-
-    public long downstreamOffset() {
-        return downstreamOffset;
-    }
-
     @Override
     public String toString() {
         return String.format("OffsetSync{topicPartition=%s, upstreamOffset=%d, downstreamOffset=%d}",
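A record's canonical constructor, component accessors (topicPartition(), upstreamOffset(), downstreamOffset()), equals(), and hashCode() are all generated, which is why the hand-written members above could be deleted wholesale; note the class still overrides toString(), which records permit. A self-contained sketch of the equivalence, using a hypothetical type rather than the Kafka class:

    public class RecordDemo {
        // One line replaces three fields, a constructor, and three accessors.
        record Sync(String topic, long upstreamOffset, long downstreamOffset) { }

        public static void main(String[] args) {
            Sync a = new Sync("test-topic-1", 42L, 40L);
            Sync b = new Sync("test-topic-1", 42L, 40L);
            System.out.println(a.upstreamOffset()); // 42
            System.out.println(a.equals(b));        // true: value-based equality for free
            System.out.println(a);                  // Sync[topic=test-topic-1, ...]
        }
    }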
@@ -24,7 +24,6 @@ import org.apache.kafka.connect.util.Callback;
 import org.junit.jupiter.api.Test;

 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;

@@ -38,8 +37,7 @@ public class CheckpointStoreTest {

     @Test
     public void testReadCheckpointsTopic() {
-        Set<String> consumerGroups = new HashSet<>();
-        consumerGroups.add("group1");
+        Set<String> consumerGroups = Set.of("group1");

         MirrorCheckpointTaskConfig config = mock(MirrorCheckpointTaskConfig.class);
         when(config.checkpointsTopic()).thenReturn("checkpoint.topic");
@@ -70,8 +68,7 @@ public class CheckpointStoreTest {

     @Test
     public void testReadCheckpointsTopicError() {
-        Set<String> consumerGroups = new HashSet<>();
-        consumerGroups.add("group1");
+        Set<String> consumerGroups = Set.of("group1");

         MirrorCheckpointTaskConfig config = mock(MirrorCheckpointTaskConfig.class);
         when(config.checkpointsTopic()).thenReturn("checkpoint.topic");
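Set.of is a slightly stronger contract than the HashSet it replaces: the result is unmodifiable and rejects null elements and duplicate arguments, which suits a fixture that is only read. A sketch:

    import java.util.Set;

    public class ImmutableSetDemo {
        public static void main(String[] args) {
            Set<String> consumerGroups = Set.of("group1");
            System.out.println(consumerGroups.contains("group1")); // true
            try {
                consumerGroups.add("group2"); // mutation fails fast
            } catch (UnsupportedOperationException e) {
                System.out.println("immutable, as expected");
            }
        }
    }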
@@ -26,7 +26,6 @@ import org.apache.kafka.connect.errors.RetriableException;

 import org.junit.jupiter.api.Test;

-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -146,7 +145,7 @@ public class MirrorCheckpointConnectorTest {
         MirrorCheckpointConnector connector = new MirrorCheckpointConnector(Set.of(), config);
         connector = spy(connector);

-        Collection<GroupListing> groups = List.of(
+        List<GroupListing> groups = List.of(
             new GroupListing("g1", Optional.of(GroupType.CLASSIC), "", Optional.empty()),
             new GroupListing("g2", Optional.of(GroupType.CLASSIC), ConsumerProtocol.PROTOCOL_TYPE, Optional.empty()));
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
@@ -176,7 +175,7 @@ public class MirrorCheckpointConnectorTest {
         MirrorCheckpointConnector connector = new MirrorCheckpointConnector(Set.of(), config);
         connector = spy(connector);

-        Collection<GroupListing> groups = List.of(
+        List<GroupListing> groups = List.of(
             new GroupListing("g1", Optional.of(GroupType.CLASSIC), "", Optional.empty()),
             new GroupListing("g2", Optional.of(GroupType.CLASSIC), ConsumerProtocol.PROTOCOL_TYPE, Optional.empty()),
             new GroupListing("g3", Optional.of(GroupType.CLASSIC), ConsumerProtocol.PROTOCOL_TYPE, Optional.empty()),
@@ -142,7 +142,7 @@ public class DedicatedMirrorIntegrationTest {
         final String ba = b + "->" + a;
         final String testTopicPrefix = "test-topic-";

-        Map<String, String> mmProps = new HashMap<String, String>() {{
+        Map<String, String> mmProps = new HashMap<>() {{
             put("dedicated.mode.enable.internal.rest", "false");
             put("listeners", "http://localhost:0");
             // Refresh topics very frequently to quickly pick up on topics that are created
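Since Java 9 the diamond operator also works with anonymous classes, so the explicit <String, String> in the double-brace initializer is inferable from the target type. A sketch:

    import java.util.HashMap;
    import java.util.Map;

    public class DiamondAnonymous {
        public static void main(String[] args) {
            // Pre-Java 9 the anonymous subclass needed explicit type arguments;
            // the declared type now lets <> infer <String, String>.
            Map<String, String> mmProps = new HashMap<>() {{
                put("listeners", "http://localhost:0");
            }};
            System.out.println(mmProps);
        }
    }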
@@ -204,7 +204,7 @@ public class DedicatedMirrorIntegrationTest {
         final String ab = a + "->" + b;
         final String testTopicPrefix = "test-topic-";

-        Map<String, String> mmProps = new HashMap<String, String>() {{
+        Map<String, String> mmProps = new HashMap<>() {{
             put("dedicated.mode.enable.internal.rest", "false");
             put("listeners", "http://localhost:0");
             // Refresh topics very frequently to quickly pick up on topics that are created
@@ -288,7 +288,7 @@ public class DedicatedMirrorIntegrationTest {
         final String ba = b + "->" + a;
         final String testTopicPrefix = "test-topic-";

-        Map<String, String> mmProps = new HashMap<String, String>() {{
+        Map<String, String> mmProps = new HashMap<>() {{
             put("dedicated.mode.enable.internal.rest", "true");
             put("listeners", "http://localhost:0");
             // Refresh topics very frequently to quickly pick up on topics that are created
@@ -21,7 +21,7 @@ import org.apache.kafka.connect.mirror.IdentityReplicationPolicy;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;

-import java.util.HashMap;
+import java.util.Map;

 /**
  * Tests MM2 replication and failover logic for {@link IdentityReplicationPolicy}.
@@ -36,10 +36,10 @@ public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrat
     @BeforeEach
     public void startClusters() throws Exception {
         replicateBackupToPrimary = false;
-        super.startClusters(new HashMap<String, String>() {{
-                put("replication.policy.class", IdentityReplicationPolicy.class.getName());
-                put("topics", "test-topic-.*");
-            }});
+        super.startClusters(Map.of(
+            "replication.policy.class", IdentityReplicationPolicy.class.getName(),
+            "topics", "test-topic-.*"
+        ));
     }

     /*
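Where the map is a fixed literal, Map.of avoids the double-brace idiom entirely: the double-brace form defines an anonymous HashMap subclass per call site (and, in an inner-class context, captures the enclosing instance), while Map.of returns a compact immutable map. A sketch, assuming a callee that only reads the map:

    import java.util.HashMap;
    import java.util.Map;

    public class MapLiteralDemo {
        static void startClusters(Map<String, String> props) {
            System.out.println(props.get("topics"));
        }

        public static void main(String[] args) {
            // Double-brace: extra class, mutable; captures nothing here but often does.
            startClusters(new HashMap<>() {{
                put("topics", "test-topic-.*");
            }});
            // Factory method: no subclass, immutable, null-hostile.
            startClusters(Map.of("topics", "test-topic-.*"));
        }
    }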
@@ -152,7 +152,7 @@ public class MirrorConnectorsIntegrationBaseTest {

     @BeforeEach
     public void startClusters() throws Exception {
-        startClusters(new HashMap<String, String>() {{
+        startClusters(new HashMap<>() {{
             put("topics", "test-topic-.*, primary.test-topic-.*, backup.test-topic-.*");
         }});
     }
@@ -491,7 +491,7 @@ public class MirrorConnectorsIntegrationBaseTest {
         produceMessages(primaryProducer, "test-topic-1");
         String backupTopic1 = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
         String consumerGroupName = "consumer-group-testOneWayReplicationWithAutoOffsetSync";
-        Map<String, Object> consumerProps = new HashMap<String, Object>() {{
+        Map<String, Object> consumerProps = new HashMap<>() {{
             put("group.id", consumerGroupName);
             put("auto.offset.reset", "earliest");
         }};
@@ -81,7 +81,7 @@ public class MirrorConnectorsIntegrationExactlyOnceTest extends MirrorConnectors
         assertEquals(expectedRecordsTopic2, backup.kafka().consume(expectedRecordsTopic2, RECORD_TRANSFER_DURATION_MS, backupTopic2).count(),
             "New topic was not re-replicated to backup cluster after altering offsets.");

-        @SuppressWarnings({"unchecked", "rawtypes"})
+        @SuppressWarnings("unchecked")
         Class<? extends Connector>[] connectorsToReset = CONNECTOR_LIST.toArray(new Class[0]);
         stopMirrorMakerConnectors(backup, connectorsToReset);
         // Resetting the offsets for the heartbeat and checkpoint connectors doesn't have any effect
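Narrowing @SuppressWarnings to what the statement actually emits keeps the suppression honest; the surviving warning here is the unchecked conversion from the raw Class[] that toArray is seeded with to the generic array type. A hedged sketch of the same shape of cast, with a hypothetical list:

    import java.util.List;

    public class NarrowSuppression {
        @SuppressWarnings("unchecked")
        public static void main(String[] args) {
            List<Class<? extends Number>> types = List.of(Integer.class, Long.class);
            // Generic arrays cannot be created directly, so the raw Class[] seed
            // is converted, unchecked, to Class<? extends Number>[].
            Class<? extends Number>[] asArray = types.toArray(new Class[0]);
            System.out.println(asArray.length); // 2
        }
    }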
@@ -151,7 +151,7 @@ public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest extends Mi
         additionalBackupClusterClientsConfigs.putAll(superUserConfig());
         backupWorkerProps.putAll(superUserConfig());

-        HashMap<String, String> additionalConfig = new HashMap<String, String>(superUserConfig()) {{
+        Map<String, String> additionalConfig = new HashMap<>(superUserConfig()) {{
             put(FORWARDING_ADMIN_CLASS, FakeForwardingAdminWithLocalMetadata.class.getName());
         }};

@@ -719,19 +719,8 @@ public class WorkerSinkTaskThreadedTest {
     private abstract static class TestSinkTask extends SinkTask {
     }

-    private static class ExpectOffsetCommitCommand {
-        final long expectedMessages;
-        final RuntimeException error;
-        final Exception consumerCommitError;
-        final long consumerCommitDelayMs;
-        final boolean invokeCallback;
-
-        private ExpectOffsetCommitCommand(long expectedMessages, RuntimeException error, Exception consumerCommitError, long consumerCommitDelayMs, boolean invokeCallback) {
-            this.expectedMessages = expectedMessages;
-            this.error = error;
-            this.consumerCommitError = consumerCommitError;
-            this.consumerCommitDelayMs = consumerCommitDelayMs;
-            this.invokeCallback = invokeCallback;
-        }
-    }
+    private record ExpectOffsetCommitCommand(long expectedMessages, RuntimeException error,
+                                             Exception consumerCommitError, long consumerCommitDelayMs,
+                                             boolean invokeCallback) {
+    }
 }
@@ -86,18 +86,7 @@ public class HasHeaderKeyTest {
             Arrays.stream(headers).map(TestHeader::new).collect(Collectors.toList()));
     }

-    private static class TestHeader implements Header {
-
-        private final String key;
-
-        public TestHeader(String key) {
-            this.key = key;
-        }
-
-        @Override
-        public String key() {
-            return key;
-        }
-
+    private record TestHeader(String key) implements Header {
         @Override
         public Schema schema() {
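A record component whose name and type match an interface method automatically implements it: the generated key() accessor satisfies Header.key(), so only the remaining methods (such as schema()) need explicit bodies. A self-contained sketch with a hypothetical interface:

    public class RecordImplements {
        interface Header {
            String key();
        }

        // The generated accessor key() doubles as the Header.key() implementation.
        record TestHeader(String key) implements Header { }

        public static void main(String[] args) {
            Header h = new TestHeader("content-type");
            System.out.println(h.key()); // content-type
        }
    }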