MINOR: Code Cleanup - Connect Module (#16066)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Sanskar Jhajharia 2024-05-31 02:25:00 +05:30 committed by GitHub
parent 33a292e4dd
commit e974914ca5
32 changed files with 186 additions and 190 deletions

View File

@@ -88,7 +88,7 @@ public class SchemaBuilder implements Schema {
    @Override
    public boolean isOptional() {
-        return optional == null ? false : optional;
+        return optional != null && optional;
    }

    /**
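The `isOptional()` hunk above replaces a null-guarding ternary on a boxed `Boolean` with an equivalent short-circuit expression. A minimal standalone sketch of the equivalence (the field name `optional` comes from the diff; the surrounding class is invented for illustration):

```java
public class OptionalFlagExample {
    private static Boolean optional; // boxed Boolean, so it may be null

    // Old form: the ternary exists only to avoid unboxing a null value.
    static boolean isOptionalOld() {
        return optional == null ? false : optional;
    }

    // New form: && short-circuits on null, so unboxing on the right-hand side is safe.
    static boolean isOptionalNew() {
        return optional != null && optional;
    }

    public static void main(String[] args) {
        for (Boolean value : new Boolean[]{null, Boolean.TRUE, Boolean.FALSE}) {
            optional = value;
            System.out.println(value + " -> old=" + isOptionalOld() + ", new=" + isOptionalNew());
        }
    }
}
```

Both forms return false for null and otherwise the unboxed value; the rewrite simply drops the redundant `? false :` branch. The same pattern is applied to the `negate` flag in ConnectorConfig further down.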

View File

@@ -46,7 +46,7 @@ public class ConnectorReconfigurationTest {
    private static class TestConnector extends Connector {
-        private boolean stopException;
+        private final boolean stopException;
        private int order = 0;
        public int stopOrder = -1;
        public int configureOrder = -1;

View File

@@ -82,7 +82,7 @@ public class SchemaProjectorTest {
        expectedProjected.put(values[2], Arrays.asList(32767, 32767L, 32767.F, 32767.));
        expectedProjected.put(values[3], Arrays.asList(327890L, 327890.F, 327890.));
        expectedProjected.put(values[4], Arrays.asList(1.2F, 1.2));
-        expectedProjected.put(values[5], Arrays.asList(1.2345));
+        expectedProjected.put(values[5], Collections.singletonList(1.2345));
        Object promoted;
        for (int i = 0; i < promotableSchemas.length; ++i) {

View File

@@ -33,7 +33,7 @@ public class ConnectorUtilsTest {
    public void testGroupPartitions() {
        List<List<Integer>> grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 1);
-        assertEquals(Arrays.asList(FIVE_ELEMENTS), grouped);
+        assertEquals(Collections.singletonList(FIVE_ELEMENTS), grouped);
        grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 2);
        assertEquals(Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5)), grouped);
@@ -41,21 +41,21 @@ public class ConnectorUtilsTest {
        grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 3);
        assertEquals(Arrays.asList(Arrays.asList(1, 2),
                Arrays.asList(3, 4),
-                Arrays.asList(5)), grouped);
+                Collections.singletonList(5)), grouped);
        grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 5);
-        assertEquals(Arrays.asList(Arrays.asList(1),
-                Arrays.asList(2),
-                Arrays.asList(3),
-                Arrays.asList(4),
-                Arrays.asList(5)), grouped);
+        assertEquals(Arrays.asList(Collections.singletonList(1),
+                Collections.singletonList(2),
+                Collections.singletonList(3),
+                Collections.singletonList(4),
+                Collections.singletonList(5)), grouped);
        grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 7);
-        assertEquals(Arrays.asList(Arrays.asList(1),
-                Arrays.asList(2),
-                Arrays.asList(3),
-                Arrays.asList(4),
-                Arrays.asList(5),
+        assertEquals(Arrays.asList(Collections.singletonList(1),
+                Collections.singletonList(2),
+                Collections.singletonList(3),
+                Collections.singletonList(4),
+                Collections.singletonList(5),
                Collections.emptyList(),
                Collections.emptyList()), grouped);
    }
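Most of this PR swaps single-element `Arrays.asList(x)` calls for `Collections.singletonList(x)`, as in the test above. A small standalone sketch of the practical difference (names are illustrative): both lists are equal by content and fixed-size, but the singleton list is fully immutable and avoids the varargs array:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SingletonListExample {
    public static void main(String[] args) {
        List<String> viaArrays = Arrays.asList("topic1");                // fixed-size, array-backed
        List<String> viaSingleton = Collections.singletonList("topic1"); // immutable single element

        System.out.println(viaArrays.equals(viaSingleton)); // true: List equality compares contents

        viaArrays.set(0, "topic2");            // allowed: Arrays.asList supports set()
        try {
            viaSingleton.set(0, "topic2");     // throws: singletonList rejects all mutation
        } catch (UnsupportedOperationException e) {
            System.out.println("singletonList is immutable");
        }
    }
}
```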

View File

@@ -32,6 +32,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -60,7 +61,7 @@ public class FileStreamSinkTaskTest {
        // We do not call task.start() since it would override the output stream
-        task.put(Arrays.asList(
+        task.put(Collections.singletonList(
                new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line1", 1)
        ));
        offsets.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(1L));
@@ -85,7 +86,7 @@ public class FileStreamSinkTaskTest {
        task.start(props);
        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
-        task.put(Arrays.asList(
+        task.put(Collections.singletonList(
                new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line0", 1)
        ));
        offsets.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(1L));

View File

@@ -94,7 +94,7 @@ public class FileStreamSourceConnectorIntegrationTest {
        // Append NUM_LINES more lines to the file
        try (PrintStream printStream = new PrintStream(Files.newOutputStream(sourceFile.toPath(), StandardOpenOption.APPEND))) {
            for (int i = NUM_LINES; i < 2 * NUM_LINES; i++) {
-                printStream.println(String.format(LINE_FORMAT, i));
+                printStream.printf(LINE_FORMAT + "%n", i);
            }
        }
@@ -197,7 +197,7 @@ public class FileStreamSourceConnectorIntegrationTest {
        try (PrintStream printStream = new PrintStream(Files.newOutputStream(sourceFile.toPath()))) {
            for (int i = 0; i < numLines; i++) {
-                printStream.println(String.format(LINE_FORMAT, i));
+                printStream.printf(LINE_FORMAT + "%n", i);
            }
        }
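The two hunks above collapse `println(String.format(...))` into a single `printf` call. A minimal sketch of the pattern (the format string is illustrative); `%n` inside the format produces the same platform line separator that `println` would have appended:

```java
import java.io.PrintStream;

public class PrintfExample {
    private static final String LINE_FORMAT = "line-%d";

    public static void main(String[] args) {
        PrintStream out = System.out;

        // Older pattern: format the line first, then print it.
        out.println(String.format(LINE_FORMAT, 1));

        // Equivalent single call: printf formats and writes; %n appends the line separator.
        out.printf(LINE_FORMAT + "%n", 1);
    }
}
```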

View File

@@ -26,7 +26,6 @@ import org.glassfish.hk2.api.TypeLiteral;
import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.glassfish.jersey.server.ResourceConfig;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
@@ -48,7 +47,7 @@ public class MirrorRestServer extends RestServer {
    @Override
    protected Collection<Class<?>> regularResources() {
-        return Arrays.asList(
+        return Collections.singletonList(
                InternalMirrorResource.class
        );
    }

View File

@@ -34,8 +34,8 @@ import java.util.concurrent.ConcurrentHashMap;
public class FakeLocalMetadataStore {
    private static final Logger log = LoggerFactory.getLogger(FakeLocalMetadataStore.class);
-    private static ConcurrentHashMap<String, ConcurrentHashMap<String, String>> allTopics = new ConcurrentHashMap<>();
-    private static ConcurrentHashMap<String, Vector<AclBinding>> allAcls = new ConcurrentHashMap<>();
+    private static final ConcurrentHashMap<String, ConcurrentHashMap<String, String>> ALL_TOPICS = new ConcurrentHashMap<>();
+    private static final ConcurrentHashMap<String, Vector<AclBinding>> ALL_ACLS = new ConcurrentHashMap<>();

    /**
     * Add topic to allTopics.
@@ -44,7 +44,7 @@ public class FakeLocalMetadataStore {
    public static void addTopicToLocalMetadataStore(NewTopic newTopic) {
        ConcurrentHashMap<String, String> configs = new ConcurrentHashMap<>(newTopic.configs());
        configs.putIfAbsent("partitions", String.valueOf(newTopic.numPartitions()));
-        allTopics.putIfAbsent(newTopic.name(), configs);
+        ALL_TOPICS.putIfAbsent(newTopic.name(), configs);
    }

    /**
@@ -53,9 +53,9 @@ public class FakeLocalMetadataStore {
     * @param newPartitionCount new partition count.
     */
    public static void updatePartitionCount(String topic, int newPartitionCount) {
-        ConcurrentHashMap<String, String> configs = FakeLocalMetadataStore.allTopics.getOrDefault(topic, new ConcurrentHashMap<>());
+        ConcurrentHashMap<String, String> configs = FakeLocalMetadataStore.ALL_TOPICS.getOrDefault(topic, new ConcurrentHashMap<>());
        configs.compute("partitions", (key, value) -> String.valueOf(newPartitionCount));
-        FakeLocalMetadataStore.allTopics.putIfAbsent(topic, configs);
+        FakeLocalMetadataStore.ALL_TOPICS.putIfAbsent(topic, configs);
    }

    /**
@@ -64,7 +64,7 @@ public class FakeLocalMetadataStore {
     * @param newConfig topic config
     */
    public static void updateTopicConfig(String topic, Config newConfig) {
-        ConcurrentHashMap<String, String> topicConfigs = FakeLocalMetadataStore.allTopics.getOrDefault(topic, new ConcurrentHashMap<>());
+        ConcurrentHashMap<String, String> topicConfigs = FakeLocalMetadataStore.ALL_TOPICS.getOrDefault(topic, new ConcurrentHashMap<>());
        newConfig.entries().stream().forEach(configEntry -> {
            if (configEntry.name() != null) {
                if (configEntry.value() != null) {
@@ -75,7 +75,7 @@ public class FakeLocalMetadataStore {
                }
            }
        });
-        FakeLocalMetadataStore.allTopics.putIfAbsent(topic, topicConfigs);
+        FakeLocalMetadataStore.ALL_TOPICS.putIfAbsent(topic, topicConfigs);
    }

    /**
@@ -84,7 +84,7 @@ public class FakeLocalMetadataStore {
     * @return true if topic name is a key in allTopics
     */
    public static Boolean containsTopic(String topic) {
-        return allTopics.containsKey(topic);
+        return ALL_TOPICS.containsKey(topic);
    }

    /**
@@ -93,7 +93,7 @@ public class FakeLocalMetadataStore {
     * @return topic configurations.
     */
    public static Map<String, String> topicConfig(String topic) {
-        return allTopics.getOrDefault(topic, new ConcurrentHashMap<>());
+        return ALL_TOPICS.getOrDefault(topic, new ConcurrentHashMap<>());
    }

    /**
@@ -102,7 +102,7 @@ public class FakeLocalMetadataStore {
     * @return {@link List<AclBinding>}
     */
    public static List<AclBinding> aclBindings(String aclPrinciple) {
-        return FakeLocalMetadataStore.allAcls.getOrDefault("User:" + aclPrinciple, new Vector<>());
+        return FakeLocalMetadataStore.ALL_ACLS.getOrDefault("User:" + aclPrinciple, new Vector<>());
    }

    /**
@@ -111,16 +111,16 @@ public class FakeLocalMetadataStore {
     * @param aclBinding {@link AclBinding}
     */
    public static void addACLs(String principal, AclBinding aclBinding) {
-        Vector<AclBinding> aclBindings = FakeLocalMetadataStore.allAcls.getOrDefault(principal, new Vector<>());
+        Vector<AclBinding> aclBindings = FakeLocalMetadataStore.ALL_ACLS.getOrDefault(principal, new Vector<>());
        aclBindings.add(aclBinding);
-        FakeLocalMetadataStore.allAcls.putIfAbsent(principal, aclBindings);
+        FakeLocalMetadataStore.ALL_ACLS.putIfAbsent(principal, aclBindings);
    }

    /**
     * clear allTopics and allAcls.
     */
    public static void clear() {
-        allTopics.clear();
-        allAcls.clear();
+        ALL_TOPICS.clear();
+        ALL_ACLS.clear();
    }
}
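The FakeLocalMetadataStore hunks make the two static maps `final` and rename them to the UPPER_SNAKE_CASE form conventionally used for static final fields, which is what checkstyle-style constant-name rules expect. A tiny illustrative sketch, with invented names, of what the convention does and does not guarantee:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ConstantNamingExample {
    // A static final field: the reference can never be reassigned, so it is named like a constant.
    private static final Map<String, String> ALL_TOPICS = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        // Only the reference is fixed; the map contents remain mutable,
        // which is why the clear() method in the diff above still works.
        ALL_TOPICS.put("test-topic", "3");
        System.out.println(ALL_TOPICS);
        ALL_TOPICS.clear();
    }
}
```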

View File

@@ -164,7 +164,7 @@ public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest extends Mi
        startClusters(additionalConfig);
        try (Admin adminClient = primary.kafka().createAdminClient()) {
-            adminClient.createAcls(Arrays.asList(
+            adminClient.createAcls(Collections.singletonList(
                    new AclBinding(
                            new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL),
                            new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW)
@@ -172,7 +172,7 @@ public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest extends Mi
            )).all().get();
        }
        try (Admin adminClient = backup.kafka().createAdminClient()) {
-            adminClient.createAcls(Arrays.asList(
+            adminClient.createAcls(Collections.singletonList(
                    new AclBinding(
                            new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL),
                            new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW)
@@ -293,7 +293,7 @@ public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest extends Mi
    public void testSyncTopicACLsUseProvidedForwardingAdmin() throws Exception {
        mm2Props.put("sync.topic.acls.enabled", "true");
        mm2Config = new MirrorMakerConfig(mm2Props);
-        List<AclBinding> aclBindings = Arrays.asList(
+        List<AclBinding> aclBindings = Collections.singletonList(
                new AclBinding(
                        new ResourcePattern(ResourceType.TOPIC, "test-topic-1", PatternType.LITERAL),
                        new AccessControlEntry("User:dummy", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)

View File

@@ -324,7 +324,7 @@ public class ConnectorConfig extends AbstractConfig {
                @SuppressWarnings("unchecked")
                Predicate<R> predicate = Utils.newInstance(getClass(predicatePrefix + "type"), Predicate.class);
                predicate.configure(originalsWithPrefix(predicatePrefix));
-                transformations.add(new TransformationStage<>(predicate, negate == null ? false : Boolean.parseBoolean(negate.toString()), transformation));
+                transformations.add(new TransformationStage<>(predicate, negate != null && Boolean.parseBoolean(negate.toString()), transformation));
            } else {
                transformations.add(new TransformationStage<>(transformation));
            }

View File

@@ -27,6 +27,7 @@ import org.glassfish.jersey.server.ResourceConfig;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.Map;

public class ConnectRestServer extends RestServer {
@@ -56,7 +57,7 @@ public class ConnectRestServer extends RestServer {
    @Override
    protected Collection<Class<?>> adminResources() {
-        return Arrays.asList(
+        return Collections.singletonList(
                LoggingResource.class
        );
    }

View File

@@ -85,17 +85,15 @@ public class ConfigInfos {
    @Override
    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("[")
-            .append(name)
-            .append(",")
-            .append(errorCount)
-            .append(",")
-            .append(groups)
-            .append(",")
-            .append(configs)
-            .append("]");
-        return sb.toString();
+        return "[" +
+                name +
+                "," +
+                errorCount +
+                "," +
+                groups +
+                "," +
+                configs +
+                "]";
    }
}
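The three `toString()` rewrites in this PR (ConfigInfos above, ConfigKeyInfo and ConfigValueInfo below) drop the explicit `StringBuilder` in favour of a single concatenation expression. A small illustrative sketch, with invented fields, of why this is a pure readability change: for one straight-line expression the compiler already generates an efficient single-pass concatenation:

```java
public class ToStringExample {
    private final String name = "my-connector";
    private final int errorCount = 0;

    // Old style: manual StringBuilder for a single, linear concatenation.
    public String toStringOld() {
        StringBuilder sb = new StringBuilder();
        sb.append("[").append(name).append(",").append(errorCount).append("]");
        return sb.toString();
    }

    // New style: one concatenation expression; the compiler emits equivalent code.
    public String toStringNew() {
        return "[" + name + "," + errorCount + "]";
    }

    public static void main(String[] args) {
        ToStringExample example = new ToStringExample();
        System.out.println(example.toStringOld().equals(example.toStringNew())); // true
    }
}
```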

View File

@@ -141,30 +141,28 @@ public class ConfigKeyInfo {
    @Override
    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("[")
-            .append(name)
-            .append(",")
-            .append(type)
-            .append(",")
-            .append(required)
-            .append(",")
-            .append(defaultValue)
-            .append(",")
-            .append(importance)
-            .append(",")
-            .append(documentation)
-            .append(",")
-            .append(group)
-            .append(",")
-            .append(orderInGroup)
-            .append(",")
-            .append(width)
-            .append(",")
-            .append(displayName)
-            .append(",")
-            .append(dependents)
-            .append("]");
-        return sb.toString();
+        return "[" +
+                name +
+                "," +
+                type +
+                "," +
+                required +
+                "," +
+                defaultValue +
+                "," +
+                importance +
+                "," +
+                documentation +
+                "," +
+                group +
+                "," +
+                orderInGroup +
+                "," +
+                width +
+                "," +
+                displayName +
+                "," +
+                dependents +
+                "]";
    }
}

View File

@@ -87,19 +87,17 @@ public class ConfigValueInfo {
    @Override
    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("[")
-            .append(name)
-            .append(",")
-            .append(value)
-            .append(",")
-            .append(recommendedValues)
-            .append(",")
-            .append(errors)
-            .append(",")
-            .append(visible)
-            .append("]");
-        return sb.toString();
+        return "[" +
+                name +
+                "," +
+                value +
+                "," +
+                recommendedValues +
+                "," +
+                errors +
+                "," +
+                visible +
+                "]";
    }
}

View File

@@ -199,8 +199,8 @@ public class ConnectorTopicsIntegrationTest {
        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(SINK_CONNECTOR, NUM_TASKS,
                "Connector tasks did not start in time.");
-        connect.assertions().assertConnectorActiveTopics(SINK_CONNECTOR, Arrays.asList(FOO_TOPIC),
-                "Active topic set is not: " + Arrays.asList(FOO_TOPIC) + " for connector: " + SINK_CONNECTOR);
+        connect.assertions().assertConnectorActiveTopics(SINK_CONNECTOR, Collections.singletonList(FOO_TOPIC),
+                "Active topic set is not: " + Collections.singletonList(FOO_TOPIC) + " for connector: " + SINK_CONNECTOR);
        // deleting a connector resets its active topics
        connect.deleteConnector(FOO_CONNECTOR);

View File

@@ -48,7 +48,7 @@ public class TaskHandle {
    private CountDownLatch recordsRemainingLatch;
    private CountDownLatch recordsToCommitLatch;
    private int expectedRecords = -1;
-    private int expectedCommits = -1;
+    private final int expectedCommits = -1;

    public TaskHandle(ConnectorHandle connectorHandle, String taskId, Consumer<SinkRecord> consumer) {
        this.taskId = taskId;

View File

@@ -134,8 +134,8 @@ public class ErrorHandlingTaskTest {
    private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
-    private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
-    private TargetState initialState = TargetState.STARTED;
+    private final ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+    private final TargetState initialState = TargetState.STARTED;
    private Time time;
    private MockConnectMetrics metrics;
    @SuppressWarnings("unused")
@@ -179,7 +179,7 @@ public class ErrorHandlingTaskTest {
    private ErrorHandlingMetrics errorHandlingMetrics;
-    private boolean enableTopicCreation;
+    private final boolean enableTopicCreation;

    @Parameterized.Parameters
    public static Collection<Boolean> parameters() {

View File

@@ -150,7 +150,7 @@ public class MockConnectMetrics extends ConnectMetrics {
    }

    public static class MockMetricsReporter implements MetricsReporter {
-        private Map<MetricName, KafkaMetric> metricsByName = new HashMap<>();
+        private final Map<MetricName, KafkaMetric> metricsByName = new HashMap<>();
        private MetricsContext metricsContext;

View File

@@ -141,14 +141,14 @@ public class WorkerSinkTaskTest {
    private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
-    private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
-    private ConnectorTaskId taskId1 = new ConnectorTaskId("job", 1);
-    private TargetState initialState = TargetState.STARTED;
+    private final ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+    private final ConnectorTaskId taskId1 = new ConnectorTaskId("job", 1);
+    private final TargetState initialState = TargetState.STARTED;
    private MockTime time;
    private WorkerSinkTask workerTask;
    @Mock
    private SinkTask sinkTask;
-    private ArgumentCaptor<WorkerSinkTaskContext> sinkTaskContext = ArgumentCaptor.forClass(WorkerSinkTaskContext.class);
+    private final ArgumentCaptor<WorkerSinkTaskContext> sinkTaskContext = ArgumentCaptor.forClass(WorkerSinkTaskContext.class);
    private WorkerConfig workerConfig;
    private MockConnectMetrics metrics;
    @Mock
@@ -169,7 +169,7 @@ public class WorkerSinkTaskTest {
    private KafkaConsumer<byte[], byte[]> consumer;
    @Mock
    private ErrorHandlingMetrics errorHandlingMetrics;
-    private ArgumentCaptor<ConsumerRebalanceListener> rebalanceListener = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+    private final ArgumentCaptor<ConsumerRebalanceListener> rebalanceListener = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
    @Rule
    public final MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
@@ -684,9 +684,9 @@ public class WorkerSinkTaskTest {
        when(consumer.assignment())
                .thenReturn(INITIAL_ASSIGNMENT, INITIAL_ASSIGNMENT)
-                .thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2)))
-                .thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2)))
-                .thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2)))
+                .thenReturn(new HashSet<>(Collections.singletonList(TOPIC_PARTITION2)))
+                .thenReturn(new HashSet<>(Collections.singletonList(TOPIC_PARTITION2)))
+                .thenReturn(new HashSet<>(Collections.singletonList(TOPIC_PARTITION2)))
                .thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, TOPIC_PARTITION3)))
                .thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, TOPIC_PARTITION3)))
                .thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, TOPIC_PARTITION3)));
@@ -1788,7 +1788,7 @@ public class WorkerSinkTaskTest {
    }

    private void verifyInitializeTask() {
-        verify(consumer).subscribe(eq(asList(TOPIC)), rebalanceListener.capture());
+        verify(consumer).subscribe(eq(Collections.singletonList(TOPIC)), rebalanceListener.capture());
        verify(sinkTask).initialize(sinkTaskContext.capture());
        verify(sinkTask).start(TASK_PROPS);
    }

View File

@@ -2602,7 +2602,7 @@ public class WorkerTest {
        Map<String, String> taskConfig = new HashMap<>();
        // No warnings or exceptions when a connector generates an empty list of task configs
-        when(sourceConnector.taskConfigs(1)).thenReturn(Arrays.asList());
+        when(sourceConnector.taskConfigs(1)).thenReturn(Collections.emptyList());
        try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Worker.class)) {
            connectorProps.put(TASKS_MAX_CONFIG, "1");
            List<Map<String, String>> taskConfigs = worker.connectorTaskConfigs(CONNECTOR_ID, new ConnectorConfig(plugins, connectorProps));
@@ -2611,7 +2611,7 @@ public class WorkerTest {
        }
        // No warnings or exceptions when a connector generates the maximum permitted number of task configs
-        when(sourceConnector.taskConfigs(1)).thenReturn(Arrays.asList(taskConfig));
+        when(sourceConnector.taskConfigs(1)).thenReturn(Collections.singletonList(taskConfig));
        when(sourceConnector.taskConfigs(2)).thenReturn(Arrays.asList(taskConfig, taskConfig));
        when(sourceConnector.taskConfigs(3)).thenReturn(Arrays.asList(taskConfig, taskConfig, taskConfig));
        try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Worker.class)) {
@@ -2672,7 +2672,7 @@ public class WorkerTest {
        }
        // One last sanity check in case the connector is reconfigured and respects tasks.max
-        when(sourceConnector.taskConfigs(1)).thenReturn(Arrays.asList(taskConfig));
+        when(sourceConnector.taskConfigs(1)).thenReturn(Collections.singletonList(taskConfig));
        try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Worker.class)) {
            connectorProps.put(TASKS_MAX_CONFIG, "1");
            List<Map<String, String>> taskConfigs = worker.connectorTaskConfigs(CONNECTOR_ID, new ConnectorConfig(plugins, connectorProps));
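Besides the singleton-list swaps, the WorkerTest hunk replaces an argument-less `Arrays.asList()` with `Collections.emptyList()`. A minimal sketch of the difference: `emptyList()` returns a shared immutable instance and states the intent directly, whereas `Arrays.asList()` builds a new fixed-size wrapper around an empty array on each call:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class EmptyListExample {
    public static void main(String[] args) {
        List<String> viaArrays = Arrays.asList();        // new fixed-size wrapper each call
        List<String> viaEmpty = Collections.emptyList(); // shared immutable singleton

        System.out.println(viaArrays.equals(viaEmpty));                          // true: both are empty
        System.out.println(Collections.emptyList() == Collections.emptyList());  // true: same instance
    }
}
```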

View File

@@ -98,7 +98,7 @@ public class ConnectProtocolCompatibilityTest {
    public void testEagerToEagerAssignment() {
        ConnectProtocol.Assignment assignment = new ConnectProtocol.Assignment(
                ConnectProtocol.Assignment.NO_ERROR, "leader", LEADER_URL, 1L,
-                Arrays.asList(connectorId1, connectorId3), Arrays.asList(taskId2x0));
+                Arrays.asList(connectorId1, connectorId3), Collections.singletonList(taskId2x0));
        ByteBuffer leaderBuf = ConnectProtocol.serializeAssignment(assignment);
        ConnectProtocol.Assignment leaderAssignment = ConnectProtocol.deserializeAssignment(leaderBuf);
@@ -110,7 +110,7 @@ public class ConnectProtocolCompatibilityTest {
        ConnectProtocol.Assignment assignment2 = new ConnectProtocol.Assignment(
                ConnectProtocol.Assignment.NO_ERROR, "member", LEADER_URL, 1L,
-                Arrays.asList(connectorId2), Arrays.asList(taskId1x0, taskId3x0));
+                Collections.singletonList(connectorId2), Arrays.asList(taskId1x0, taskId3x0));
        ByteBuffer memberBuf = ConnectProtocol.serializeAssignment(assignment2);
        ConnectProtocol.Assignment memberAssignment = ConnectProtocol.deserializeAssignment(memberBuf);
@@ -125,7 +125,7 @@ public class ConnectProtocolCompatibilityTest {
    public void testCoopToCoopAssignment() {
        ExtendedAssignment assignment = new ExtendedAssignment(
                CONNECT_PROTOCOL_V1, ConnectProtocol.Assignment.NO_ERROR, "leader", LEADER_URL, 1L,
-                Arrays.asList(connectorId1, connectorId3), Arrays.asList(taskId2x0),
+                Arrays.asList(connectorId1, connectorId3), Collections.singletonList(taskId2x0),
                Collections.emptyList(), Collections.emptyList(), 0);
        ByteBuffer leaderBuf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment, false);
@@ -138,7 +138,7 @@ public class ConnectProtocolCompatibilityTest {
        ExtendedAssignment assignment2 = new ExtendedAssignment(
                CONNECT_PROTOCOL_V1, ConnectProtocol.Assignment.NO_ERROR, "member", LEADER_URL, 1L,
-                Arrays.asList(connectorId2), Arrays.asList(taskId1x0, taskId3x0),
+                Collections.singletonList(connectorId2), Arrays.asList(taskId1x0, taskId3x0),
                Collections.emptyList(), Collections.emptyList(), 0);
        ByteBuffer memberBuf = ConnectProtocol.serializeAssignment(assignment2);
@@ -155,7 +155,7 @@ public class ConnectProtocolCompatibilityTest {
    public void testEagerToCoopAssignment() {
        ConnectProtocol.Assignment assignment = new ConnectProtocol.Assignment(
                ConnectProtocol.Assignment.NO_ERROR, "leader", LEADER_URL, 1L,
-                Arrays.asList(connectorId1, connectorId3), Arrays.asList(taskId2x0));
+                Arrays.asList(connectorId1, connectorId3), Collections.singletonList(taskId2x0));
        ByteBuffer leaderBuf = ConnectProtocol.serializeAssignment(assignment);
        ConnectProtocol.Assignment leaderAssignment =
@@ -168,7 +168,7 @@ public class ConnectProtocolCompatibilityTest {
        ConnectProtocol.Assignment assignment2 = new ConnectProtocol.Assignment(
                ConnectProtocol.Assignment.NO_ERROR, "member", LEADER_URL, 1L,
-                Arrays.asList(connectorId2), Arrays.asList(taskId1x0, taskId3x0));
+                Collections.singletonList(connectorId2), Arrays.asList(taskId1x0, taskId3x0));
        ByteBuffer memberBuf = ConnectProtocol.serializeAssignment(assignment2);
        ConnectProtocol.Assignment memberAssignment =
@@ -184,7 +184,7 @@ public class ConnectProtocolCompatibilityTest {
    public void testCoopToEagerAssignment() {
        ExtendedAssignment assignment = new ExtendedAssignment(
                CONNECT_PROTOCOL_V1, ConnectProtocol.Assignment.NO_ERROR, "leader", LEADER_URL, 1L,
-                Arrays.asList(connectorId1, connectorId3), Arrays.asList(taskId2x0),
+                Arrays.asList(connectorId1, connectorId3), Collections.singletonList(taskId2x0),
                Collections.emptyList(), Collections.emptyList(), 0);
        ByteBuffer leaderBuf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment, false);
@@ -197,7 +197,7 @@ public class ConnectProtocolCompatibilityTest {
        ExtendedAssignment assignment2 = new ExtendedAssignment(
                CONNECT_PROTOCOL_V1, ConnectProtocol.Assignment.NO_ERROR, "member", LEADER_URL, 1L,
-                Arrays.asList(connectorId2), Arrays.asList(taskId1x0, taskId3x0),
+                Collections.singletonList(connectorId2), Arrays.asList(taskId1x0, taskId3x0),
                Collections.emptyList(), Collections.emptyList(), 0);
        ByteBuffer memberBuf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment2, false);

View File

@ -331,7 +331,7 @@ public class DistributedHerderTest {
// Join group and get assignment // Join group and get assignment
when(member.memberId()).thenReturn("member"); when(member.memberId()).thenReturn("member");
when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1)); expectRebalance(1, singletonList(CONN1), singletonList(TASK1));
expectConfigRefreshAndSnapshot(SNAPSHOT); expectConfigRefreshAndSnapshot(SNAPSHOT);
ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class); ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class);
@ -355,7 +355,7 @@ public class DistributedHerderTest {
// Join group and get assignment // Join group and get assignment
when(member.memberId()).thenReturn("member"); when(member.memberId()).thenReturn("member");
when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1)); expectRebalance(1, singletonList(CONN1), singletonList(TASK1));
expectConfigRefreshAndSnapshot(SNAPSHOT); expectConfigRefreshAndSnapshot(SNAPSHOT);
ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class); ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class);
@ -380,8 +380,8 @@ public class DistributedHerderTest {
verify(worker).startSourceTask(eq(TASK1), any(), any(), any(), eq(herder), eq(TargetState.STARTED)); verify(worker).startSourceTask(eq(TASK1), any(), any(), any(), eq(herder), eq(TargetState.STARTED));
// Rebalance and get a new assignment // Rebalance and get a new assignment
expectRebalance(Arrays.asList(CONN1), Arrays.asList(TASK1), ConnectProtocol.Assignment.NO_ERROR, expectRebalance(singletonList(CONN1), singletonList(TASK1), ConnectProtocol.Assignment.NO_ERROR,
1, Arrays.asList(CONN1), Arrays.asList()); 1, singletonList(CONN1), Collections.emptyList());
herder.tick(); herder.tick();
time.sleep(3000L); time.sleep(3000L);
assertStatistics(3, 2, 100, 3000); assertStatistics(3, 2, 100, 3000);
@ -414,7 +414,7 @@ public class DistributedHerderTest {
// The new member got its assignment // The new member got its assignment
expectRebalance(Collections.emptyList(), Collections.emptyList(), expectRebalance(Collections.emptyList(), Collections.emptyList(),
ConnectProtocol.Assignment.NO_ERROR, ConnectProtocol.Assignment.NO_ERROR,
1, Arrays.asList(CONN1), Arrays.asList(TASK1), 0); 1, singletonList(CONN1), singletonList(TASK1), 0);
// and the new assignment started // and the new assignment started
ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class); ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class);
@ -445,7 +445,7 @@ public class DistributedHerderTest {
// Join group. First rebalance contains revocations because a new member joined. // Join group. First rebalance contains revocations because a new member joined.
when(member.memberId()).thenReturn("member"); when(member.memberId()).thenReturn("member");
when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V1); when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V1);
expectRebalance(Arrays.asList(CONN1), Arrays.asList(TASK1), expectRebalance(singletonList(CONN1), singletonList(TASK1),
ConnectProtocol.Assignment.NO_ERROR, 1, ConnectProtocol.Assignment.NO_ERROR, 1,
Collections.emptyList(), Collections.emptyList(), 0); Collections.emptyList(), Collections.emptyList(), 0);
doNothing().when(member).requestRejoin(); doNothing().when(member).requestRejoin();
@ -482,7 +482,7 @@ public class DistributedHerderTest {
when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V1); when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V1);
expectRebalance(Collections.emptyList(), Collections.emptyList(), expectRebalance(Collections.emptyList(), Collections.emptyList(),
ConnectProtocol.Assignment.NO_ERROR, 1, ConnectProtocol.Assignment.NO_ERROR, 1,
Collections.emptyList(), Arrays.asList(TASK2), Collections.emptyList(), singletonList(TASK2),
rebalanceDelay); rebalanceDelay);
expectConfigRefreshAndSnapshot(SNAPSHOT); expectConfigRefreshAndSnapshot(SNAPSHOT);
@ -503,7 +503,7 @@ public class DistributedHerderTest {
// The member got its assignment and revocation // The member got its assignment and revocation
expectRebalance(Collections.emptyList(), Collections.emptyList(), expectRebalance(Collections.emptyList(), Collections.emptyList(),
ConnectProtocol.Assignment.NO_ERROR, ConnectProtocol.Assignment.NO_ERROR,
1, Arrays.asList(CONN1), Arrays.asList(TASK1), 0); 1, singletonList(CONN1), singletonList(TASK1), 0);
// and the new assignment started // and the new assignment started
ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class); ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class);
@ -528,7 +528,7 @@ public class DistributedHerderTest {
// Join group and get assignment // Join group and get assignment
when(member.memberId()).thenReturn("member"); when(member.memberId()).thenReturn("member");
when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1)); expectRebalance(1, singletonList(CONN1), singletonList(TASK1));
expectConfigRefreshAndSnapshot(SNAPSHOT); expectConfigRefreshAndSnapshot(SNAPSHOT);
ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class); ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class);
@ -550,8 +550,8 @@ public class DistributedHerderTest {
verify(worker).startSourceTask(eq(TASK1), any(), any(), any(), eq(herder), eq(TargetState.STARTED)); verify(worker).startSourceTask(eq(TASK1), any(), any(), any(), eq(herder), eq(TargetState.STARTED));
// Rebalance and get a new assignment // Rebalance and get a new assignment
expectRebalance(Arrays.asList(CONN1), Arrays.asList(TASK1), ConnectProtocol.Assignment.NO_ERROR, expectRebalance(singletonList(CONN1), singletonList(TASK1), ConnectProtocol.Assignment.NO_ERROR,
1, Arrays.asList(CONN1), Arrays.asList()); 1, singletonList(CONN1), Collections.emptyList());
// worker is not running, so we should see no call to connectorTaskConfigs() // worker is not running, so we should see no call to connectorTaskConfigs()
expectExecuteTaskReconfiguration(false, null, null); expectExecuteTaskReconfiguration(false, null, null);
@ -606,7 +606,7 @@ public class DistributedHerderTest {
// Perform a partial re-balance just prior to the revocation // Perform a partial re-balance just prior to the revocation
// bump the configOffset to trigger reading the config topic to the end // bump the configOffset to trigger reading the config topic to the end
configOffset++; configOffset++;
expectRebalance(configOffset, Arrays.asList(), Arrays.asList()); expectRebalance(configOffset, Collections.emptyList(), Collections.emptyList());
// give it the wrong snapshot, as if we're out of sync/can't reach the broker // give it the wrong snapshot, as if we're out of sync/can't reach the broker
expectConfigRefreshAndSnapshot(SNAPSHOT); expectConfigRefreshAndSnapshot(SNAPSHOT);
doNothing().when(member).requestRejoin(); doNothing().when(member).requestRejoin();
@ -616,9 +616,9 @@ public class DistributedHerderTest {
} }
// Revoke the connector in the next rebalance // Revoke the connector in the next rebalance
expectRebalance(Arrays.asList(CONN1), Arrays.asList(), expectRebalance(singletonList(CONN1), Collections.emptyList(),
ConnectProtocol.Assignment.NO_ERROR, configOffset, Arrays.asList(), ConnectProtocol.Assignment.NO_ERROR, configOffset, Collections.emptyList(),
Arrays.asList()); Collections.emptyList());
if (incompleteRebalance) { if (incompleteRebalance) {
// Same as SNAPSHOT, except with an updated offset // Same as SNAPSHOT, except with an updated offset
@ -643,7 +643,7 @@ public class DistributedHerderTest {
herder.tick(); herder.tick();
// re-assign the connector back to the same worker to ensure state was cleaned up // re-assign the connector back to the same worker to ensure state was cleaned up
expectRebalance(configOffset, Arrays.asList(CONN1), Arrays.asList()); expectRebalance(configOffset, singletonList(CONN1), Collections.emptyList());
herder.tick(); herder.tick();
@ -973,7 +973,7 @@ public class DistributedHerderTest {
when(member.memberId()).thenReturn("leader"); when(member.memberId()).thenReturn("leader");
when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
// Start with one connector // Start with one connector
expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true); expectRebalance(1, singletonList(CONN1), Collections.emptyList(), true);
expectConfigRefreshAndSnapshot(SNAPSHOT); expectConfigRefreshAndSnapshot(SNAPSHOT);
ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class); ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class);
@ -1006,7 +1006,7 @@ public class DistributedHerderTest {
doNothing().when(statusBackingStore).deleteTopic(eq(CONN1), eq(FOO_TOPIC)); doNothing().when(statusBackingStore).deleteTopic(eq(CONN1), eq(FOO_TOPIC));
doNothing().when(statusBackingStore).deleteTopic(eq(CONN1), eq(BAR_TOPIC)); doNothing().when(statusBackingStore).deleteTopic(eq(CONN1), eq(BAR_TOPIC));
expectRebalance(Arrays.asList(CONN1), Arrays.asList(TASK1), expectRebalance(singletonList(CONN1), singletonList(TASK1),
ConnectProtocol.Assignment.NO_ERROR, 2, "leader", "leaderUrl", ConnectProtocol.Assignment.NO_ERROR, 2, "leader", "leaderUrl",
Collections.emptyList(), Collections.emptyList(), 0, true); Collections.emptyList(), Collections.emptyList(), 0, true);
expectConfigRefreshAndSnapshot(ClusterConfigState.EMPTY); expectConfigRefreshAndSnapshot(ClusterConfigState.EMPTY);
@ -1533,7 +1533,7 @@ public class DistributedHerderTest {
// Performs rebalance and gets new assignment // Performs rebalance and gets new assignment
expectRebalance(Collections.emptyList(), Collections.emptyList(), expectRebalance(Collections.emptyList(), Collections.emptyList(),
ConnectProtocol.Assignment.NO_ERROR, 1, Arrays.asList(CONN1), Collections.emptyList()); ConnectProtocol.Assignment.NO_ERROR, 1, singletonList(CONN1), Collections.emptyList());
ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class); ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class);
doAnswer(invocation -> { doAnswer(invocation -> {
@ -1556,7 +1556,7 @@ public class DistributedHerderTest {
when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
// join // join
expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList()); expectRebalance(1, singletonList(CONN1), Collections.emptyList());
expectConfigRefreshAndSnapshot(SNAPSHOT); expectConfigRefreshAndSnapshot(SNAPSHOT);
expectMemberPoll(); expectMemberPoll();
@ -1591,7 +1591,7 @@ public class DistributedHerderTest {
WorkerConfigTransformer configTransformer = mock(WorkerConfigTransformer.class); WorkerConfigTransformer configTransformer = mock(WorkerConfigTransformer.class);
// join // join
expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList()); expectRebalance(1, singletonList(CONN1), Collections.emptyList());
expectConfigRefreshAndSnapshot(SNAPSHOT); expectConfigRefreshAndSnapshot(SNAPSHOT);
expectMemberPoll(); expectMemberPoll();
@ -1646,7 +1646,7 @@ public class DistributedHerderTest {
when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
// join // join
expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList()); expectRebalance(1, singletonList(CONN1), Collections.emptyList());
expectConfigRefreshAndSnapshot(SNAPSHOT); expectConfigRefreshAndSnapshot(SNAPSHOT);
expectMemberPoll(); expectMemberPoll();
@ -1683,7 +1683,7 @@ public class DistributedHerderTest {
when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
// start with the connector paused // start with the connector paused
expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList()); expectRebalance(1, singletonList(CONN1), Collections.emptyList());
expectConfigRefreshAndSnapshot(SNAPSHOT_PAUSED_CONN1); expectConfigRefreshAndSnapshot(SNAPSHOT_PAUSED_CONN1);
expectMemberPoll(); expectMemberPoll();
@ -1723,7 +1723,7 @@ public class DistributedHerderTest {
when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
// join // join
expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList()); expectRebalance(1, singletonList(CONN1), Collections.emptyList());
expectConfigRefreshAndSnapshot(SNAPSHOT); expectConfigRefreshAndSnapshot(SNAPSHOT);
expectMemberPoll(); expectMemberPoll();
@ -1976,7 +1976,7 @@ public class DistributedHerderTest {
// Performs rebalance and gets new assignment // Performs rebalance and gets new assignment
expectRebalance(Collections.emptyList(), Collections.emptyList(), expectRebalance(Collections.emptyList(), Collections.emptyList(),
ConnectProtocol.Assignment.NO_ERROR, 1, Collections.emptyList(), ConnectProtocol.Assignment.NO_ERROR, 1, Collections.emptyList(),
Arrays.asList(TASK0)); singletonList(TASK0));
expectConfigRefreshAndSnapshot(SNAPSHOT); expectConfigRefreshAndSnapshot(SNAPSHOT);
when(worker.startSourceTask(eq(TASK0), any(), any(), any(), eq(herder), eq(TargetState.STARTED))).thenReturn(true); when(worker.startSourceTask(eq(TASK0), any(), any(), any(), eq(herder), eq(TargetState.STARTED))).thenReturn(true);
@ -2014,7 +2014,7 @@ public class DistributedHerderTest {
before = time.milliseconds(); before = time.milliseconds();
// After backoff, restart the process and this time succeed // After backoff, restart the process and this time succeed
expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1), true); expectRebalance(1, singletonList(CONN1), singletonList(TASK1), true);
expectConfigRefreshAndSnapshot(SNAPSHOT); expectConfigRefreshAndSnapshot(SNAPSHOT);
ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class); ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class);
@ -2051,7 +2051,7 @@ public class DistributedHerderTest {
when(member.memberId()).thenReturn("leader"); when(member.memberId()).thenReturn("leader");
when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V1); when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V1);
when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); when(statusBackingStore.connectors()).thenReturn(Collections.emptySet());
expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1), true); expectRebalance(1, singletonList(CONN1), singletonList(TASK1), true);
expectConfigRefreshAndSnapshot(SNAPSHOT); expectConfigRefreshAndSnapshot(SNAPSHOT);
expectMemberPoll(); expectMemberPoll();
@ -2072,7 +2072,7 @@ public class DistributedHerderTest {
// The leader gets the same assignment after a rebalance is triggered // The leader gets the same assignment after a rebalance is triggered
expectRebalance(Collections.emptyList(), Collections.emptyList(), expectRebalance(Collections.emptyList(), Collections.emptyList(),
ConnectProtocol.Assignment.NO_ERROR, ConnectProtocol.Assignment.NO_ERROR,
1, "leader", "leaderUrl", Arrays.asList(CONN1), Arrays.asList(TASK1), 0, true); 1, "leader", "leaderUrl", singletonList(CONN1), singletonList(TASK1), 0, true);
time.sleep(2000L); time.sleep(2000L);
assertStatistics(3, 1, 100, 2000); assertStatistics(3, 1, 100, 2000);
@ -2106,7 +2106,7 @@ public class DistributedHerderTest {
// After a few retries succeed to read the log to the end // After a few retries succeed to read the log to the end
expectRebalance(Collections.emptyList(), Collections.emptyList(), expectRebalance(Collections.emptyList(), Collections.emptyList(),
ConnectProtocol.Assignment.NO_ERROR, ConnectProtocol.Assignment.NO_ERROR,
1, "leader", "leaderUrl", Arrays.asList(CONN1), Arrays.asList(TASK1), 0, true); 1, "leader", "leaderUrl", singletonList(CONN1), singletonList(TASK1), 0, true);
expectConfigRefreshAndSnapshot(SNAPSHOT); expectConfigRefreshAndSnapshot(SNAPSHOT);
before = time.milliseconds(); before = time.milliseconds();
@ -2125,7 +2125,7 @@ public class DistributedHerderTest {
when(member.memberId()).thenReturn("leader"); when(member.memberId()).thenReturn("leader");
when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V1); when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V1);
when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); when(statusBackingStore.connectors()).thenReturn(Collections.emptySet());
expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1), true); expectRebalance(1, singletonList(CONN1), singletonList(TASK1), true);
expectConfigRefreshAndSnapshot(SNAPSHOT); expectConfigRefreshAndSnapshot(SNAPSHOT);
expectMemberPoll(); expectMemberPoll();
@ -2146,7 +2146,7 @@ public class DistributedHerderTest {
// The leader gets the same assignment after a rebalance is triggered // The leader gets the same assignment after a rebalance is triggered
expectRebalance(Collections.emptyList(), Collections.emptyList(), expectRebalance(Collections.emptyList(), Collections.emptyList(),
ConnectProtocol.Assignment.NO_ERROR, 1, ConnectProtocol.Assignment.NO_ERROR, 1,
"leader", "leaderUrl", Arrays.asList(CONN1), Arrays.asList(TASK1), 0, true); "leader", "leaderUrl", singletonList(CONN1), singletonList(TASK1), 0, true);
time.sleep(2000L); time.sleep(2000L);
assertStatistics(3, 1, 100, 2000); assertStatistics(3, 1, 100, 2000);
@ -2190,7 +2190,7 @@ public class DistributedHerderTest {
// The worker gets back the assignment that had given up // The worker gets back the assignment that had given up
expectRebalance(Collections.emptyList(), Collections.emptyList(), expectRebalance(Collections.emptyList(), Collections.emptyList(),
ConnectProtocol.Assignment.NO_ERROR, ConnectProtocol.Assignment.NO_ERROR,
1, "leader", "leaderUrl", Arrays.asList(CONN1), Arrays.asList(TASK1), 1, "leader", "leaderUrl", singletonList(CONN1), singletonList(TASK1),
0, true); 0, true);
expectConfigRefreshAndSnapshot(SNAPSHOT); expectConfigRefreshAndSnapshot(SNAPSHOT);
@ -2267,7 +2267,7 @@ public class DistributedHerderTest {
@Test @Test
public void testPutConnectorConfig() throws Exception { public void testPutConnectorConfig() throws Exception {
when(member.memberId()).thenReturn("leader"); when(member.memberId()).thenReturn("leader");
expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true); expectRebalance(1, singletonList(CONN1), Collections.emptyList(), true);
when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); when(statusBackingStore.connectors()).thenReturn(Collections.emptySet());
expectConfigRefreshAndSnapshot(SNAPSHOT); expectConfigRefreshAndSnapshot(SNAPSHOT);
when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
@ -2388,7 +2388,7 @@ public class DistributedHerderTest {
// Patch the connector config. // Patch the connector config.
expectMemberEnsureActive(); expectMemberEnsureActive();
expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), false); expectRebalance(1, singletonList(CONN1), Collections.emptyList(), false);
FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new FutureCallback<>(); FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new FutureCallback<>();
herder.patchConnectorConfig(CONN1, new HashMap<>(), patchCallback); herder.patchConnectorConfig(CONN1, new HashMap<>(), patchCallback);
@ -2401,7 +2401,7 @@ public class DistributedHerderTest {
@Test @Test
public void testPatchConnectorConfig() throws Exception { public void testPatchConnectorConfig() throws Exception {
when(member.memberId()).thenReturn("leader"); when(member.memberId()).thenReturn("leader");
expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true); expectRebalance(1, singletonList(CONN1), Collections.emptyList(), true);
when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); when(statusBackingStore.connectors()).thenReturn(Collections.emptySet());
Map<String, String> originalConnConfig = new HashMap<>(CONN1_CONFIG); Map<String, String> originalConnConfig = new HashMap<>(CONN1_CONFIG);
@ -2440,7 +2440,7 @@ public class DistributedHerderTest {
patchedConnConfig.put("foo3", "added"); patchedConnConfig.put("foo3", "added");
expectMemberEnsureActive(); expectMemberEnsureActive();
expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true); expectRebalance(1, singletonList(CONN1), Collections.emptyList(), true);
ArgumentCaptor<Callback<ConfigInfos>> validateCallback = ArgumentCaptor.forClass(Callback.class); ArgumentCaptor<Callback<ConfigInfos>> validateCallback = ArgumentCaptor.forClass(Callback.class);
doAnswer(invocation -> { doAnswer(invocation -> {
@ -2567,7 +2567,7 @@ public class DistributedHerderTest {
verify(member).wakeup(); verify(member).wakeup();
verifyNoMoreInteractions(member, taskConfigCb); verifyNoMoreInteractions(member, taskConfigCb);
assertEquals( assertEquals(
Arrays.asList("awaiting startup"), singletonList("awaiting startup"),
stages stages
); );
} }
@ -2584,7 +2584,7 @@ public class DistributedHerderTest {
verify(member).wakeup(); verify(member).wakeup();
verifyNoMoreInteractions(member, taskConfigCb); verifyNoMoreInteractions(member, taskConfigCb);
assertEquals( assertEquals(
Arrays.asList("awaiting startup"), singletonList("awaiting startup"),
stages stages
); );
} }
@ -2690,7 +2690,7 @@ public class DistributedHerderTest {
verifyNoMoreInteractions(member, taskConfigCb); verifyNoMoreInteractions(member, taskConfigCb);
assertEquals( assertEquals(
Arrays.asList("awaiting startup"), singletonList("awaiting startup"),
stages stages
); );
} }
@ -3369,7 +3369,7 @@ public class DistributedHerderTest {
public void shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionAndHerderNotStopping() { public void shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionAndHerderNotStopping() {
when(member.memberId()).thenReturn("leader"); when(member.memberId()).thenReturn("leader");
when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true); expectRebalance(1, singletonList(CONN1), Collections.emptyList(), true);
expectConfigRefreshAndSnapshot(SNAPSHOT); expectConfigRefreshAndSnapshot(SNAPSHOT);
herder.startAndStopExecutor.shutdown(); herder.startAndStopExecutor.shutdown();
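The DistributedHerderTest hunks above replace every one-argument Arrays.asList call with singletonList (presumably available via a static import of Collections.singletonList in that test). A minimal, self-contained sketch of the difference, using hypothetical names not taken from this commit:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SingletonListSketch {
    public static void main(String[] args) {
        // Arrays.asList wraps a freshly allocated one-element array in a fixed-size view.
        List<String> viaArrays = Arrays.asList("conn1");
        // Collections.singletonList returns a smaller immutable list and states the
        // single-element intent explicitly, which is why the cleanup prefers it.
        List<String> viaSingleton = Collections.singletonList("conn1");
        // List equality is content-based, so assertions comparing the two still pass.
        System.out.println(viaArrays.equals(viaSingleton)); // prints: true
    }
}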

View File

@ -659,7 +659,7 @@ public class IncrementalCooperativeAssignorTest {
.collect(Collectors.toList()); .collect(Collectors.toList());
expectedAssignment.get(0).connectors().addAll(Arrays.asList("connector6", "connector9")); expectedAssignment.get(0).connectors().addAll(Arrays.asList("connector6", "connector9"));
expectedAssignment.get(1).connectors().addAll(Arrays.asList("connector7", "connector10")); expectedAssignment.get(1).connectors().addAll(Arrays.asList("connector7", "connector10"));
expectedAssignment.get(2).connectors().addAll(Arrays.asList("connector8")); expectedAssignment.get(2).connectors().add("connector8");
List<String> newConnectors = newConnectors(6, 11); List<String> newConnectors = newConnectors(6, 11);
assignor.assignConnectors(existingAssignment, newConnectors); assignor.assignConnectors(existingAssignment, newConnectors);
@ -679,11 +679,11 @@ public class IncrementalCooperativeAssignorTest {
expectedAssignment.get(0).connectors().addAll(Arrays.asList("connector6", "connector9")); expectedAssignment.get(0).connectors().addAll(Arrays.asList("connector6", "connector9"));
expectedAssignment.get(1).connectors().addAll(Arrays.asList("connector7", "connector10")); expectedAssignment.get(1).connectors().addAll(Arrays.asList("connector7", "connector10"));
expectedAssignment.get(2).connectors().addAll(Arrays.asList("connector8")); expectedAssignment.get(2).connectors().add("connector8");
expectedAssignment.get(0).tasks().addAll(Arrays.asList(new ConnectorTaskId("task", 6), new ConnectorTaskId("task", 9))); expectedAssignment.get(0).tasks().addAll(Arrays.asList(new ConnectorTaskId("task", 6), new ConnectorTaskId("task", 9)));
expectedAssignment.get(1).tasks().addAll(Arrays.asList(new ConnectorTaskId("task", 7), new ConnectorTaskId("task", 10))); expectedAssignment.get(1).tasks().addAll(Arrays.asList(new ConnectorTaskId("task", 7), new ConnectorTaskId("task", 10)));
expectedAssignment.get(2).tasks().addAll(Arrays.asList(new ConnectorTaskId("task", 8))); expectedAssignment.get(2).tasks().add(new ConnectorTaskId("task", 8));
List<String> newConnectors = newConnectors(6, 11); List<String> newConnectors = newConnectors(6, 11);
assignor.assignConnectors(existingAssignment, newConnectors); assignor.assignConnectors(existingAssignment, newConnectors);
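The IncrementalCooperativeAssignorTest hunks above drop the Arrays.asList wrapper when a single element is appended and call Collection#add directly. A small hypothetical sketch of why the two forms are equivalent for one element:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AddSingleElementSketch {
    public static void main(String[] args) {
        List<String> before = new ArrayList<>();
        before.addAll(Arrays.asList("connector8")); // old form: allocates a one-element list first
        List<String> after = new ArrayList<>();
        after.add("connector8");                    // new form: adds the element directly
        System.out.println(before.equals(after));   // prints: true (same contents, one less allocation)
    }
}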

View File

@ -75,18 +75,18 @@ public class WorkerCoordinatorIncrementalTest {
@Rule @Rule
public MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS); public MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
private String connectorId1 = "connector1"; private final String connectorId1 = "connector1";
private String connectorId2 = "connector2"; private final String connectorId2 = "connector2";
private ConnectorTaskId taskId1x0 = new ConnectorTaskId(connectorId1, 0); private final ConnectorTaskId taskId1x0 = new ConnectorTaskId(connectorId1, 0);
private ConnectorTaskId taskId2x0 = new ConnectorTaskId(connectorId2, 0); private final ConnectorTaskId taskId2x0 = new ConnectorTaskId(connectorId2, 0);
private String groupId = "test-group"; private final String groupId = "test-group";
private int sessionTimeoutMs = 10; private final int sessionTimeoutMs = 10;
private int rebalanceTimeoutMs = 60; private final int rebalanceTimeoutMs = 60;
private int heartbeatIntervalMs = 2; private final int heartbeatIntervalMs = 2;
private long retryBackoffMs = 100; private final long retryBackoffMs = 100;
private long retryBackoffMaxMs = 1000; private final long retryBackoffMaxMs = 1000;
private int requestTimeoutMs = 1000; private final int requestTimeoutMs = 1000;
private MockTime time; private MockTime time;
private MockClient client; private MockClient client;
private Node node; private Node node;
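The WorkerCoordinatorIncrementalTest hunk above marks fixture fields that are assigned exactly once at declaration as final, while fields populated later in setup stay mutable. A compact hypothetical sketch of that rule (the field names here are placeholders, not the test's real members):

public class FixtureFieldsSketch {
    private final String groupId = "test-group"; // assigned once at declaration, so final
    private final int sessionTimeoutMs = 10;     // likewise never reassigned anywhere
    private Object time;                         // assigned in setUp(), so it stays non-final

    void setUp() {
        this.time = new Object();                // this later assignment is what prevents final
    }

    public static void main(String[] args) {
        FixtureFieldsSketch fixture = new FixtureFieldsSketch();
        fixture.setUp();
        System.out.println(fixture.groupId + " / " + fixture.sessionTimeoutMs + "ms");
    }
}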

View File

@ -72,7 +72,7 @@ public class ConnectRestServerTest {
@Mock private Plugins plugins; @Mock private Plugins plugins;
private ConnectRestServer server; private ConnectRestServer server;
private CloseableHttpClient httpClient; private CloseableHttpClient httpClient;
private Collection<CloseableHttpResponse> responses = new ArrayList<>(); private final Collection<CloseableHttpResponse> responses = new ArrayList<>();
protected static final String KAFKA_CLUSTER_ID = "Xbafgnagvar"; protected static final String KAFKA_CLUSTER_ID = "Xbafgnagvar";

View File

@ -21,6 +21,7 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.junit.Test; import org.junit.Test;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -68,7 +69,7 @@ public class RestServerConfigTest {
props.put(RestServerConfig.LISTENERS_CONFIG, "http://a.b:9999"); props.put(RestServerConfig.LISTENERS_CONFIG, "http://a.b:9999");
config = RestServerConfig.forPublic(null, props); config = RestServerConfig.forPublic(null, props);
assertEquals(Arrays.asList("http://a.b:9999"), config.listeners()); assertEquals(Collections.singletonList("http://a.b:9999"), config.listeners());
props.put(RestServerConfig.LISTENERS_CONFIG, "http://a.b:9999, https://a.b:7812"); props.put(RestServerConfig.LISTENERS_CONFIG, "http://a.b:9999, https://a.b:7812");
config = RestServerConfig.forPublic(null, props); config = RestServerConfig.forPublic(null, props);

View File

@ -60,13 +60,13 @@ public class FileOffsetBackingStoreTest {
private Converter converter; private Converter converter;
private static Map<ByteBuffer, ByteBuffer> firstSet = new HashMap<>(); private static final Map<ByteBuffer, ByteBuffer> FIRST_SET = new HashMap<>();
private static final Runnable EMPTY_RUNNABLE = () -> { private static final Runnable EMPTY_RUNNABLE = () -> {
}; };
static { static {
firstSet.put(buffer("key"), buffer("value")); FIRST_SET.put(buffer("key"), buffer("value"));
firstSet.put(null, null); FIRST_SET.put(null, null);
} }
@Before @Before
@ -96,7 +96,7 @@ public class FileOffsetBackingStoreTest {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Callback<Void> setCallback = mock(Callback.class); Callback<Void> setCallback = mock(Callback.class);
store.set(firstSet, setCallback).get(); store.set(FIRST_SET, setCallback).get();
Map<ByteBuffer, ByteBuffer> values = store.get(Arrays.asList(buffer("key"), buffer("bad"))).get(); Map<ByteBuffer, ByteBuffer> values = store.get(Arrays.asList(buffer("key"), buffer("bad"))).get();
assertEquals(buffer("value"), values.get(buffer("key"))); assertEquals(buffer("value"), values.get(buffer("key")));
@ -109,7 +109,7 @@ public class FileOffsetBackingStoreTest {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Callback<Void> setCallback = mock(Callback.class); Callback<Void> setCallback = mock(Callback.class);
store.set(firstSet, setCallback).get(); store.set(FIRST_SET, setCallback).get();
store.stop(); store.stop();
// Restore into a new store to ensure correct reload from scratch // Restore into a new store to ensure correct reload from scratch
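The FileOffsetBackingStoreTest hunks above rename a static map that is populated once in a static initializer and never reassigned into a final UPPER_SNAKE_CASE constant. A hypothetical, self-contained sketch of that pattern; buffer(...) here is a stand-in for the test's own helper, not its actual implementation:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class ConstantMapSketch {
    // final plus UPPER_SNAKE_CASE signals a class-level constant whose reference never changes,
    // even though its contents are filled in by the static block below.
    private static final Map<ByteBuffer, ByteBuffer> FIRST_SET = new HashMap<>();

    static {
        FIRST_SET.put(buffer("key"), buffer("value"));
        FIRST_SET.put(null, null); // HashMap permits a null key and null values
    }

    private static ByteBuffer buffer(String value) {
        return ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println(FIRST_SET.size()); // prints: 2
    }
}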

View File

@ -800,7 +800,7 @@ public class KafkaConfigBackingStoreMockitoTest {
// Should see a single connector and its config should be the last one seen anywhere in the log // Should see a single connector and its config should be the last one seen anywhere in the log
ClusterConfigState configState = configStorage.snapshot(); ClusterConfigState configState = configStorage.snapshot();
assertEquals(8, configState.offset()); // Should always be next to be read, even if uncommitted assertEquals(8, configState.offset()); // Should always be next to be read, even if uncommitted
assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
// CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2] // CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2]
assertEquals(SAMPLE_CONFIGS.get(2), configState.connectorConfig(CONNECTOR_IDS.get(0))); assertEquals(SAMPLE_CONFIGS.get(2), configState.connectorConfig(CONNECTOR_IDS.get(0)));
// Should see 0 tasks for that connector. // Should see 0 tasks for that connector.
@ -1053,7 +1053,7 @@ public class KafkaConfigBackingStoreMockitoTest {
// After reading the log, it should have been in an inconsistent state // After reading the log, it should have been in an inconsistent state
ClusterConfigState configState = configStorage.snapshot(); ClusterConfigState configState = configStorage.snapshot();
assertEquals(6, configState.offset()); // Should always be next to be read, not last committed assertEquals(6, configState.offset()); // Should always be next to be read, not last committed
assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
// Inconsistent data should leave us with no tasks listed for the connector and an entry in the inconsistent list // Inconsistent data should leave us with no tasks listed for the connector and an entry in the inconsistent list
assertEquals(Collections.emptyList(), configState.tasks(CONNECTOR_IDS.get(0))); assertEquals(Collections.emptyList(), configState.tasks(CONNECTOR_IDS.get(0)));
// Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0] // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
@ -1086,8 +1086,8 @@ public class KafkaConfigBackingStoreMockitoTest {
// This is only two more ahead of the last one because multiple calls fail, and so their configs are not written // This is only two more ahead of the last one because multiple calls fail, and so their configs are not written
// to the topic. Only the last call with 1 task config + 1 commit actually gets written. // to the topic. Only the last call with 1 task config + 1 commit actually gets written.
assertEquals(8, configState.offset()); assertEquals(8, configState.offset());
assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
assertEquals(Arrays.asList(TASK_IDS.get(0)), configState.tasks(CONNECTOR_IDS.get(0))); assertEquals(Collections.singletonList(TASK_IDS.get(0)), configState.tasks(CONNECTOR_IDS.get(0)));
assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0))); assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors()); assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
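The KafkaConfigBackingStoreMockitoTest assertions above compare expected singleton lists against collections produced by the store; since List equality is content-based, swapping Arrays.asList for Collections.singletonList changes nothing observable in those checks. One behavioral difference is still worth noting, shown in a hypothetical sketch: the Arrays.asList view permits set(), whereas singletonList rejects all mutation, which makes it the safer choice for expected values that must never change.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ImmutabilitySketch {
    public static void main(String[] args) {
        List<String> fixedSize = Arrays.asList("conn1");
        fixedSize.set(0, "conn2"); // allowed: the view is fixed-size but writes through to its backing array
        List<String> immutable = Collections.singletonList("conn1");
        try {
            immutable.set(0, "conn2"); // rejected: singletonList is fully immutable
        } catch (UnsupportedOperationException e) {
            System.out.println("singletonList cannot be modified");
        }
    }
}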

View File

@ -151,7 +151,7 @@ public class KafkaConfigBackingStoreTest {
private Converter converter; private Converter converter;
@Mock @Mock
private ConfigBackingStore.UpdateListener configUpdateListener; private ConfigBackingStore.UpdateListener configUpdateListener;
private Map<String, String> props = new HashMap<>(DEFAULT_CONFIG_STORAGE_PROPS); private final Map<String, String> props = new HashMap<>(DEFAULT_CONFIG_STORAGE_PROPS);
private DistributedConfig config; private DistributedConfig config;
@Mock @Mock
KafkaBasedLog<String, byte[]> storeLog; KafkaBasedLog<String, byte[]> storeLog;
@ -328,7 +328,7 @@ public class KafkaConfigBackingStoreTest {
configState = configStorage.snapshot(); configState = configStorage.snapshot();
assertEquals(3, configState.offset()); assertEquals(3, configState.offset());
String connectorName = CONNECTOR_IDS.get(0); String connectorName = CONNECTOR_IDS.get(0);
assertEquals(Arrays.asList(connectorName), new ArrayList<>(configState.connectors())); assertEquals(Collections.singletonList(connectorName), new ArrayList<>(configState.connectors()));
assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), configState.tasks(connectorName)); assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), configState.tasks(connectorName));
assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0))); assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
assertEquals(SAMPLE_CONFIGS.get(1), configState.taskConfig(TASK_IDS.get(1))); assertEquals(SAMPLE_CONFIGS.get(1), configState.taskConfig(TASK_IDS.get(1)));
@ -378,7 +378,7 @@ public class KafkaConfigBackingStoreTest {
"tasks", 1); // Starts with 2 tasks, after update has 3 "tasks", 1); // Starts with 2 tasks, after update has 3
// As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
configUpdateListener.onTaskConfigUpdate(Arrays.asList(TASK_IDS.get(2))); configUpdateListener.onTaskConfigUpdate(Collections.singletonList(TASK_IDS.get(2)));
EasyMock.expectLastCall(); EasyMock.expectLastCall();
// Records to be read by consumer as it reads to the end of the log // Records to be read by consumer as it reads to the end of the log
@ -473,7 +473,7 @@ public class KafkaConfigBackingStoreTest {
configState = configStorage.snapshot(); configState = configStorage.snapshot();
assertEquals(1, configState.offset()); assertEquals(1, configState.offset());
String connectorName = CONNECTOR_IDS.get(0); String connectorName = CONNECTOR_IDS.get(0);
assertEquals(Arrays.asList(connectorName), new ArrayList<>(configState.connectors())); assertEquals(Collections.singletonList(connectorName), new ArrayList<>(configState.connectors()));
assertEquals(Collections.emptyList(), configState.tasks(connectorName)); assertEquals(Collections.emptyList(), configState.tasks(connectorName));
assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors()); assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());

View File

@ -110,7 +110,7 @@ public class KafkaOffsetBackingStoreTest {
private static final ByteBuffer TP0_VALUE_NEW = buffer("VAL0_NEW"); private static final ByteBuffer TP0_VALUE_NEW = buffer("VAL0_NEW");
private static final ByteBuffer TP1_VALUE_NEW = buffer("VAL1_NEW"); private static final ByteBuffer TP1_VALUE_NEW = buffer("VAL1_NEW");
private Map<String, String> props = new HashMap<>(DEFAULT_PROPS); private final Map<String, String> props = new HashMap<>(DEFAULT_PROPS);
@Mock @Mock
KafkaBasedLog<byte[], byte[]> storeLog; KafkaBasedLog<byte[], byte[]> storeLog;
@Mock @Mock

View File

@ -192,9 +192,9 @@ public class ConvertingFutureCallbackTest {
} }
protected static class TestConvertingFutureCallback extends ConvertingFutureCallback<Object, Object> { protected static class TestConvertingFutureCallback extends ConvertingFutureCallback<Object, Object> {
private AtomicInteger numberOfConversions = new AtomicInteger(); private final AtomicInteger numberOfConversions = new AtomicInteger();
private CountDownLatch getInvoked = new CountDownLatch(1); private final CountDownLatch getInvoked = new CountDownLatch(1);
private CountDownLatch cancelInvoked = new CountDownLatch(1); private final CountDownLatch cancelInvoked = new CountDownLatch(1);
public int numberOfConversions() { public int numberOfConversions() {
return numberOfConversions.get(); return numberOfConversions.get();

View File

@ -26,7 +26,7 @@ public class TestFuture<T> implements Future<T> {
private volatile boolean resolved; private volatile boolean resolved;
private T result; private T result;
private Throwable exception; private Throwable exception;
private CountDownLatch getCalledLatch; private final CountDownLatch getCalledLatch;
private volatile boolean resolveOnGet; private volatile boolean resolveOnGet;
private T resolveOnGetResult; private T resolveOnGetResult;