MINOR: Cleanup Connect Module (5/n) (#20393)

This PR aims at cleaning up the`connect:runtime` module further by
getting rid of some extra code which can be replaced by record and the
relevant changes.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Sanskar Jhajharia 2025-09-06 07:38:56 +05:30 committed by GitHub
parent 9ba7dd68e6
commit 5e2f54e37a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 123 additions and 817 deletions

View File

@ -897,7 +897,7 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
for (ConfigInfos configInfos : configInfosList) {
if (configInfos != null) {
errorCount += configInfos.errorCount();
configInfoList.addAll(configInfos.values());
configInfoList.addAll(configInfos.configs());
groups.addAll(configInfos.groups());
}
}
@ -1073,7 +1073,7 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
StringBuilder messages = new StringBuilder();
messages.append("Connector configuration is invalid and contains the following ")
.append(errors).append(" error(s):");
for (ConfigInfo configInfo : configInfos.values()) {
for (ConfigInfo configInfo : configInfos.configs()) {
for (String msg : configInfo.configValue().errors()) {
messages.append('\n').append(msg);
}

View File

@ -16,50 +16,10 @@
*/
package org.apache.kafka.connect.runtime.rest.entities;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Objects;
public class ConfigInfo {
private final ConfigKeyInfo configKey;
private final ConfigValueInfo configValue;
@JsonCreator
public ConfigInfo(
@JsonProperty("definition") ConfigKeyInfo configKey,
@JsonProperty("value") ConfigValueInfo configValue) {
this.configKey = configKey;
this.configValue = configValue;
}
@JsonProperty("definition")
public ConfigKeyInfo configKey() {
return configKey;
}
@JsonProperty("value")
public ConfigValueInfo configValue() {
return configValue;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ConfigInfo that = (ConfigInfo) o;
return Objects.equals(configKey, that.configKey) &&
Objects.equals(configValue, that.configValue);
}
@Override
public int hashCode() {
return Objects.hash(configKey, configValue);
}
@Override
public String toString() {
return "[" + configKey + "," + configValue + "]";
}
public record ConfigInfo(
@JsonProperty("definition") ConfigKeyInfo configKey,
@JsonProperty("value") ConfigValueInfo configValue
) {
}

View File

@ -16,84 +16,14 @@
*/
package org.apache.kafka.connect.runtime.rest.entities;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.Objects;
public class ConfigInfos {
@JsonProperty("name")
private final String name;
@JsonProperty("error_count")
private final int errorCount;
@JsonProperty("groups")
private final List<String> groups;
@JsonProperty("configs")
private final List<ConfigInfo> configs;
@JsonCreator
public ConfigInfos(@JsonProperty("name") String name,
@JsonProperty("error_count") int errorCount,
@JsonProperty("groups") List<String> groups,
@JsonProperty("configs") List<ConfigInfo> configs) {
this.name = name;
this.groups = groups;
this.errorCount = errorCount;
this.configs = configs;
}
@JsonProperty
public String name() {
return name;
}
@JsonProperty
public List<String> groups() {
return groups;
}
@JsonProperty("error_count")
public int errorCount() {
return errorCount;
}
@JsonProperty("configs")
public List<ConfigInfo> values() {
return configs;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ConfigInfos that = (ConfigInfos) o;
return Objects.equals(name, that.name) &&
Objects.equals(errorCount, that.errorCount) &&
Objects.equals(groups, that.groups) &&
Objects.equals(configs, that.configs);
}
@Override
public int hashCode() {
return Objects.hash(name, errorCount, groups, configs);
}
@Override
public String toString() {
return "[" +
name +
"," +
errorCount +
"," +
groups +
"," +
configs +
"]";
}
}
public record ConfigInfos(
@JsonProperty("name") String name,
@JsonProperty("error_count") int errorCount,
@JsonProperty("groups") List<String> groups,
@JsonProperty("configs") List<ConfigInfo> configs
) {
}

View File

@ -16,153 +16,21 @@
*/
package org.apache.kafka.connect.runtime.rest.entities;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.Objects;
public class ConfigKeyInfo {
private final String name;
private final String type;
private final boolean required;
private final String defaultValue;
private final String importance;
private final String documentation;
private final String group;
private final int orderInGroup;
private final String width;
private final String displayName;
private final List<String> dependents;
@JsonCreator
public ConfigKeyInfo(@JsonProperty("name") String name,
@JsonProperty("type") String type,
@JsonProperty("required") boolean required,
@JsonProperty("default_value") String defaultValue,
@JsonProperty("importance") String importance,
@JsonProperty("documentation") String documentation,
@JsonProperty("group") String group,
@JsonProperty("order_in_group") int orderInGroup,
@JsonProperty("width") String width,
@JsonProperty("display_name") String displayName,
@JsonProperty("dependents") List<String> dependents) {
this.name = name;
this.type = type;
this.required = required;
this.defaultValue = defaultValue;
this.importance = importance;
this.documentation = documentation;
this.group = group;
this.orderInGroup = orderInGroup;
this.width = width;
this.displayName = displayName;
this.dependents = dependents;
}
@JsonProperty
public String name() {
return name;
}
@JsonProperty
public String type() {
return type;
}
@JsonProperty
public boolean required() {
return required;
}
@JsonProperty("default_value")
public String defaultValue() {
return defaultValue;
}
@JsonProperty
public String documentation() {
return documentation;
}
@JsonProperty
public String group() {
return group;
}
@JsonProperty("order")
public int orderInGroup() {
return orderInGroup;
}
@JsonProperty
public String width() {
return width;
}
@JsonProperty
public String importance() {
return importance;
}
@JsonProperty("display_name")
public String displayName() {
return displayName;
}
@JsonProperty
public List<String> dependents() {
return dependents;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ConfigKeyInfo that = (ConfigKeyInfo) o;
return Objects.equals(name, that.name) &&
Objects.equals(type, that.type) &&
Objects.equals(required, that.required) &&
Objects.equals(defaultValue, that.defaultValue) &&
Objects.equals(importance, that.importance) &&
Objects.equals(documentation, that.documentation) &&
Objects.equals(group, that.group) &&
Objects.equals(orderInGroup, that.orderInGroup) &&
Objects.equals(width, that.width) &&
Objects.equals(displayName, that.displayName) &&
Objects.equals(dependents, that.dependents);
}
@Override
public int hashCode() {
return Objects.hash(name, type, required, defaultValue, importance, documentation, group, orderInGroup, width, displayName, dependents);
}
@Override
public String toString() {
return "[" +
name +
"," +
type +
"," +
required +
"," +
defaultValue +
"," +
importance +
"," +
documentation +
"," +
group +
"," +
orderInGroup +
"," +
width +
"," +
displayName +
"," +
dependents +
"]";
}
public record ConfigKeyInfo(
@JsonProperty("name") String name,
@JsonProperty("type") String type,
@JsonProperty("required") boolean required,
@JsonProperty("default_value") String defaultValue,
@JsonProperty("importance") String importance,
@JsonProperty("documentation") String documentation,
@JsonProperty("group") String group,
@JsonProperty("order_in_group") int orderInGroup,
@JsonProperty("width") String width,
@JsonProperty("display_name") String displayName,
@JsonProperty("dependents") List<String> dependents
) {
}

View File

@ -16,88 +16,15 @@
*/
package org.apache.kafka.connect.runtime.rest.entities;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.Objects;
public class ConfigValueInfo {
private final String name;
private final String value;
private final List<String> recommendedValues;
private final List<String> errors;
private final boolean visible;
@JsonCreator
public ConfigValueInfo(
@JsonProperty("name") String name,
@JsonProperty("value") String value,
@JsonProperty("recommended_values") List<String> recommendedValues,
@JsonProperty("errors") List<String> errors,
@JsonProperty("visible") boolean visible) {
this.name = name;
this.value = value;
this.recommendedValues = recommendedValues;
this.errors = errors;
this.visible = visible;
}
@JsonProperty
public String name() {
return name;
}
@JsonProperty
public String value() {
return value;
}
@JsonProperty("recommended_values")
public List<String> recommendedValues() {
return recommendedValues;
}
@JsonProperty
public List<String> errors() {
return errors;
}
@JsonProperty
public boolean visible() {
return visible;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ConfigValueInfo that = (ConfigValueInfo) o;
return Objects.equals(name, that.name) &&
Objects.equals(value, that.value) &&
Objects.equals(recommendedValues, that.recommendedValues) &&
Objects.equals(errors, that.errors) &&
Objects.equals(visible, that.visible);
}
@Override
public int hashCode() {
return Objects.hash(name, value, recommendedValues, errors, visible);
}
@Override
public String toString() {
return "[" +
name +
"," +
value +
"," +
recommendedValues +
"," +
errors +
"," +
visible +
"]";
}
}
public record ConfigValueInfo(
@JsonProperty("name") String name,
@JsonProperty("value") String value,
@JsonProperty("recommended_values") List<String> recommendedValues,
@JsonProperty("errors") List<String> errors,
@JsonProperty("visible") boolean visible
) {
}

View File

@ -18,66 +18,15 @@ package org.apache.kafka.connect.runtime.rest.entities;
import org.apache.kafka.connect.util.ConnectorTaskId;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class ConnectorInfo {
private final String name;
private final Map<String, String> config;
private final List<ConnectorTaskId> tasks;
private final ConnectorType type;
@JsonCreator
public ConnectorInfo(@JsonProperty("name") String name,
@JsonProperty("config") Map<String, String> config,
@JsonProperty("tasks") List<ConnectorTaskId> tasks,
@JsonProperty("type") ConnectorType type) {
this.name = name;
this.config = config;
this.tasks = tasks;
this.type = type;
}
@JsonProperty
public String name() {
return name;
}
@JsonProperty
public ConnectorType type() {
return type;
}
@JsonProperty
public Map<String, String> config() {
return config;
}
@JsonProperty
public List<ConnectorTaskId> tasks() {
return tasks;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ConnectorInfo that = (ConnectorInfo) o;
return Objects.equals(name, that.name) &&
Objects.equals(config, that.config) &&
Objects.equals(tasks, that.tasks) &&
Objects.equals(type, that.type);
}
@Override
public int hashCode() {
return Objects.hash(name, config, tasks, type);
}
}
public record ConnectorInfo(
@JsonProperty("name") String name,
@JsonProperty("config") Map<String, String> config,
@JsonProperty("tasks") List<ConnectorTaskId> tasks,
@JsonProperty("type") ConnectorType type
) {
}

View File

@ -16,11 +16,9 @@
*/
package org.apache.kafka.connect.runtime.rest.entities;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Map;
import java.util.Objects;
/**
* Represents a single {partition, offset} pair for either a sink connector or a source connector. For source connectors,
@ -38,49 +36,15 @@ import java.util.Objects;
* }
* </pre>
*/
public class ConnectorOffset {
private final Map<String, ?> partition;
private final Map<String, ?> offset;
@JsonCreator
public ConnectorOffset(@JsonProperty("partition") Map<String, ?> partition, @JsonProperty("offset") Map<String, ?> offset) {
this.partition = partition;
this.offset = offset;
}
@JsonProperty
public Map<String, ?> partition() {
return partition;
}
@JsonProperty
public Map<String, ?> offset() {
return offset;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof ConnectorOffset that)) {
return false;
}
return Objects.equals(this.partition, that.partition) &&
Objects.equals(this.offset, that.offset);
}
@Override
public int hashCode() {
return Objects.hash(partition, offset);
}
public record ConnectorOffset(
@JsonProperty("partition") Map<String, ?> partition,
@JsonProperty("offset") Map<String, ?> offset
) {
@Override
public String toString() {
return "{" +
"partition=" + partition +
", offset=" + offset +
'}';
"partition=" + partition +
", offset=" + offset +
'}';
}
}
}

View File

@ -18,7 +18,6 @@ package org.apache.kafka.connect.runtime.rest.entities;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.HashMap;
@ -51,19 +50,9 @@ import java.util.Objects;
* @see ConnectorsResource#getOffsets
* @see ConnectorsResource#alterConnectorOffsets
*/
public class ConnectorOffsets {
private final List<ConnectorOffset> offsets;
@JsonCreator
public ConnectorOffsets(@JsonProperty("offsets") List<ConnectorOffset> offsets) {
this.offsets = offsets;
}
@JsonProperty
public List<ConnectorOffset> offsets() {
return offsets;
}
public record ConnectorOffsets(
@JsonProperty("offsets") List<ConnectorOffset> offsets
) {
public Map<Map<String, ?>, Map<String, ?>> toMap() {
Map<Map<String, ?>, Map<String, ?>> partitionOffsetMap = new HashMap<>();
for (ConnectorOffset offset : offsets) {
@ -72,24 +61,8 @@ public class ConnectorOffsets {
return partitionOffsetMap;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof ConnectorOffsets that)) {
return false;
}
return Objects.equals(this.offsets, that.offsets);
}
@Override
public int hashCode() {
return Objects.hashCode(offsets);
}
@Override
public String toString() {
return Objects.toString(offsets);
}
}
}

View File

@ -23,43 +23,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.Objects;
public class ConnectorStateInfo {
private final String name;
private final ConnectorState connector;
private final List<TaskState> tasks;
private final ConnectorType type;
@JsonCreator
public ConnectorStateInfo(@JsonProperty("name") String name,
@JsonProperty("connector") ConnectorState connector,
@JsonProperty("tasks") List<TaskState> tasks,
@JsonProperty("type") ConnectorType type) {
this.name = name;
this.connector = connector;
this.tasks = tasks;
this.type = type;
}
@JsonProperty
public String name() {
return name;
}
@JsonProperty
public ConnectorState connector() {
return connector;
}
@JsonProperty
public List<TaskState> tasks() {
return tasks;
}
@JsonProperty
public ConnectorType type() {
return type;
}
public record ConnectorStateInfo(
@JsonProperty String name,
@JsonProperty ConnectorState connector,
@JsonProperty List<TaskState> tasks,
@JsonProperty ConnectorType type
) {
public abstract static class AbstractState {
private final String state;
@ -98,7 +67,6 @@ public class ConnectorStateInfo {
}
public static class ConnectorState extends AbstractState {
@JsonCreator
public ConnectorState(@JsonProperty("state") String state,
@JsonProperty("worker_id") String worker,
@ -145,5 +113,4 @@ public class ConnectorStateInfo {
return Objects.hash(id);
}
}
}

View File

@ -23,57 +23,14 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
public class CreateConnectorRequest {
private final String name;
private final Map<String, String> config;
private final InitialState initialState;
@JsonCreator
public CreateConnectorRequest(@JsonProperty("name") String name, @JsonProperty("config") Map<String, String> config,
@JsonProperty("initial_state") InitialState initialState) {
this.name = name;
this.config = config;
this.initialState = initialState;
}
@JsonProperty
public String name() {
return name;
}
@JsonProperty
public Map<String, String> config() {
return config;
}
@JsonProperty("initial_state")
public InitialState initialState() {
return initialState;
}
public record CreateConnectorRequest(
@JsonProperty("name") String name,
@JsonProperty("config") Map<String, String> config,
@JsonProperty("initial_state") InitialState initialState
) {
public TargetState initialTargetState() {
if (initialState != null) {
return initialState.toTargetState();
} else {
return null;
}
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CreateConnectorRequest that = (CreateConnectorRequest) o;
return Objects.equals(name, that.name) &&
Objects.equals(config, that.config) &&
Objects.equals(initialState, that.initialState);
}
@Override
public int hashCode() {
return Objects.hash(name, config, initialState);
return initialState != null ? initialState.toTargetState() : null;
}
public enum InitialState {
@ -87,16 +44,11 @@ public class CreateConnectorRequest {
}
public TargetState toTargetState() {
switch (this) {
case RUNNING:
return TargetState.STARTED;
case PAUSED:
return TargetState.PAUSED;
case STOPPED:
return TargetState.STOPPED;
default:
throw new IllegalArgumentException("Unknown initial state: " + this);
}
return switch (this) {
case RUNNING -> TargetState.STARTED;
case PAUSED -> TargetState.PAUSED;
case STOPPED -> TargetState.STOPPED;
};
}
}
}
}

View File

@ -16,47 +16,15 @@
*/
package org.apache.kafka.connect.runtime.rest.entities;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Objects;
/**
* Standard error format for all REST API failures. These are generated automatically by
* {@link org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper} in response to uncaught
* {@link org.apache.kafka.connect.errors.ConnectException}s.
*/
public class ErrorMessage {
private final int errorCode;
private final String message;
@JsonCreator
public ErrorMessage(@JsonProperty("error_code") int errorCode, @JsonProperty("message") String message) {
this.errorCode = errorCode;
this.message = message;
}
@JsonProperty("error_code")
public int errorCode() {
return errorCode;
}
@JsonProperty
public String message() {
return message;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ErrorMessage that = (ErrorMessage) o;
return Objects.equals(errorCode, that.errorCode) &&
Objects.equals(message, that.message);
}
@Override
public int hashCode() {
return Objects.hash(errorCode, message);
}
}
public record ErrorMessage(
@JsonProperty("error_code") int errorCode,
@JsonProperty String message
) {
}

View File

@ -20,49 +20,11 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Objects;
public class LoggerLevel {
private final String level;
private final Long lastModified;
public LoggerLevel(
@JsonProperty("level") String level,
@JsonProperty("last_modified") Long lastModified
) {
this.level = Objects.requireNonNull(level, "level may not be null");
this.lastModified = lastModified;
public record LoggerLevel(
@JsonProperty String level,
@JsonProperty("last_modified") Long lastModified
) {
public LoggerLevel {
Objects.requireNonNull(level, "level may not be null");
}
@JsonProperty
public String level() {
return level;
}
@JsonProperty("last_modified")
public Long lastModified() {
return lastModified;
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
LoggerLevel that = (LoggerLevel) o;
return level.equals(that.level) && Objects.equals(lastModified, that.lastModified);
}
@Override
public int hashCode() {
return Objects.hash(level, lastModified);
}
@Override
public String toString() {
return "LoggerLevel{"
+ "level='" + level + '\''
+ ", lastModified=" + lastModified
+ '}';
}
}
}

View File

@ -16,11 +16,8 @@
*/
package org.apache.kafka.connect.runtime.rest.entities;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Objects;
/**
* Standard format for regular successful REST API responses that look like:
* <pre>
@ -29,32 +26,5 @@ import java.util.Objects;
* }
* </pre>
*/
public class Message {
private final String message;
@JsonCreator
public Message(@JsonProperty("message") String message) {
this.message = message;
}
@JsonProperty
public String message() {
return message;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof Message that)) {
return false;
}
return Objects.equals(this.message, that.message);
}
@Override
public int hashCode() {
return message.hashCode();
}
public record Message(@JsonProperty String message) {
}

View File

@ -19,77 +19,23 @@ package org.apache.kafka.connect.runtime.rest.entities;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.PluginType;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Objects;
public class PluginInfo {
private final String className;
private final PluginType type;
private final String version;
@JsonCreator
public PluginInfo(
@JsonProperty("class") String className,
@JsonProperty("type") PluginType type,
@JsonProperty("version") String version
) {
this.className = className;
this.type = type;
this.version = version;
}
public record PluginInfo(
@JsonProperty("class") String className,
@JsonProperty("type") PluginType type,
@JsonProperty("version")
@JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = NoVersionFilter.class)
String version
) {
public PluginInfo(PluginDesc<?> plugin) {
this(plugin.className(), plugin.type(), plugin.version());
}
@JsonProperty("class")
public String className() {
return className;
}
@JsonProperty("type")
public String type() {
return type.toString();
}
@JsonProperty("version")
@JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = NoVersionFilter.class)
public String version() {
return version;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PluginInfo that = (PluginInfo) o;
return Objects.equals(className, that.className) &&
Objects.equals(type, that.type) &&
Objects.equals(version, that.version);
}
@Override
public int hashCode() {
return Objects.hash(className, type, version);
}
@Override
public String toString() {
return "PluginInfo{" + "className='" + className + '\'' +
", type=" + type.toString() +
", version='" + version + '\'' +
'}';
}
public static final class NoVersionFilter {
// This method is used by Jackson to filter the version field for plugins that don't have a version
// Used by Jackson to filter out undefined versions
@Override
public boolean equals(Object obj) {
return PluginDesc.UNDEFINED_VERSION.equals(obj);
}
@ -100,4 +46,4 @@ public class PluginInfo {
return super.hashCode();
}
}
}
}

View File

@ -18,43 +18,12 @@ package org.apache.kafka.connect.runtime.rest.entities;
import org.apache.kafka.connect.util.ConnectorTaskId;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Map;
import java.util.Objects;
public class TaskInfo {
private final ConnectorTaskId id;
private final Map<String, String> config;
@JsonCreator
public TaskInfo(@JsonProperty("id") ConnectorTaskId id, @JsonProperty("config") Map<String, String> config) {
this.id = id;
this.config = config;
}
@JsonProperty
public ConnectorTaskId id() {
return id;
}
@JsonProperty
public Map<String, String> config() {
return config;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TaskInfo taskInfo = (TaskInfo) o;
return Objects.equals(id, taskInfo.id) &&
Objects.equals(config, taskInfo.config);
}
@Override
public int hashCode() {
return Objects.hash(id, config);
}
}
public record TaskInfo(
@JsonProperty("id") ConnectorTaskId id,
@JsonProperty("config") Map<String, String> config
) {
}

View File

@ -143,7 +143,8 @@ public class ConnectorPluginsResource {
synchronized (this) {
if (connectorsOnly) {
return connectorPlugins.stream()
.filter(p -> PluginType.SINK.toString().equals(p.type()) || PluginType.SOURCE.toString().equals(p.type())).toList();
.filter(p -> p.type() == PluginType.SINK || p.type() == PluginType.SOURCE)
.toList();
} else {
return List.copyOf(connectorPlugins);
}

View File

@ -990,7 +990,7 @@ public class ExactlyOnceSourceIntegrationTest {
}
private ConfigInfo findConfigInfo(String property, ConfigInfos validationResult) {
return validationResult.values().stream()
return validationResult.configs().stream()
.filter(info -> property.equals(info.configKey().name()))
.findAny()
.orElseThrow(() -> new AssertionError("Failed to find configuration validation result for property '" + property + "'"));

View File

@ -487,7 +487,7 @@ public class AbstractHerderTest {
SourceConnectorConfig.TOPIC_CREATION_GROUP, SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_GROUP,
SourceConnectorConfig.OFFSETS_TOPIC_GROUP), result.groups());
assertEquals(2, result.errorCount());
Map<String, ConfigInfo> infos = result.values().stream()
Map<String, ConfigInfo> infos = result.configs().stream()
.collect(Collectors.toMap(info -> info.configKey().name(), Function.identity()));
// Base connector config has 15 fields, connector's configs add 7
assertEquals(26, infos.size());
@ -602,7 +602,7 @@ public class AbstractHerderTest {
);
assertEquals(expectedGroups, result.groups());
assertEquals(1, result.errorCount());
Map<String, ConfigInfo> infos = result.values().stream()
Map<String, ConfigInfo> infos = result.configs().stream()
.collect(Collectors.toMap(info -> info.configKey().name(), Function.identity()));
assertEquals(33, infos.size());
// Should get 2 type fields from the transforms, first adds its own config since it has a valid class
@ -662,7 +662,7 @@ public class AbstractHerderTest {
);
assertEquals(expectedGroups, result.groups());
assertEquals(1, result.errorCount());
Map<String, ConfigInfo> infos = result.values().stream()
Map<String, ConfigInfo> infos = result.configs().stream()
.collect(Collectors.toMap(info -> info.configKey().name(), Function.identity()));
assertEquals(36, infos.size());
// Should get 2 type fields from the transforms, first adds its own config since it has a valid class
@ -726,10 +726,10 @@ public class AbstractHerderTest {
assertEquals(expectedGroups, result.groups());
assertEquals(1, result.errorCount());
// Base connector config has 19 fields, connector's configs add 7, and 2 producer overrides
assertEquals(28, result.values().size());
assertTrue(result.values().stream().anyMatch(
assertEquals(28, result.configs().size());
assertTrue(result.configs().stream().anyMatch(
configInfo -> ackConfigKey.equals(configInfo.configValue().name()) && !configInfo.configValue().errors().isEmpty()));
assertTrue(result.values().stream().anyMatch(
assertTrue(result.configs().stream().anyMatch(
configInfo -> saslConfigKey.equals(configInfo.configValue().name()) && configInfo.configValue().errors().isEmpty()));
verifyValidationIsolation();
@ -770,7 +770,7 @@ public class AbstractHerderTest {
assertEquals(ConnectorType.SOURCE, herder.connectorType(config));
Map<String, String> validatedOverriddenClientConfigs = new HashMap<>();
for (ConfigInfo configInfo : result.values()) {
for (ConfigInfo configInfo : result.configs()) {
String configName = configInfo.configKey().name();
if (overriddenClientConfigs.contains(configName)) {
validatedOverriddenClientConfigs.put(configName, configInfo.configValue().value());
@ -854,7 +854,7 @@ public class AbstractHerderTest {
}
private void assertErrorForKey(ConfigInfos configInfos, String testKey) {
final List<String> errorsForKey = configInfos.values().stream()
final List<String> errorsForKey = configInfos.configs().stream()
.map(ConfigInfo::configValue)
.filter(configValue -> configValue.name().equals(testKey))
.map(ConfigValueInfo::errors)
@ -899,7 +899,7 @@ public class AbstractHerderTest {
ConfigInfos infos = AbstractHerder.generateResult(name, keys, values, groups);
assertEquals(name, infos.name());
assertEquals(groups, infos.groups());
assertEquals(values.size(), infos.values().size());
assertEquals(values.size(), infos.configs().size());
assertEquals(0, infos.errorCount());
assertInfoKey(infos, "config.a1", null);
assertInfoKey(infos, "config.b1", "group B");
@ -930,7 +930,7 @@ public class AbstractHerderTest {
ConfigInfos infos = AbstractHerder.generateResult(name, keys, values, groups);
assertEquals(name, infos.name());
assertEquals(groups, infos.groups());
assertEquals(values.size(), infos.values().size());
assertEquals(values.size(), infos.configs().size());
assertEquals(1, infos.errorCount());
assertInfoKey(infos, "config.a1", null);
assertInfoKey(infos, "config.b1", "group B");
@ -963,7 +963,7 @@ public class AbstractHerderTest {
ConfigInfos infos = AbstractHerder.generateResult(name, keys, values, groups);
assertEquals(name, infos.name());
assertEquals(groups, infos.groups());
assertEquals(values.size(), infos.values().size());
assertEquals(values.size(), infos.configs().size());
assertEquals(2, infos.errorCount());
assertInfoKey(infos, "config.a1", null);
assertInfoKey(infos, "config.b1", "group B");
@ -996,7 +996,7 @@ public class AbstractHerderTest {
ConfigInfos infos = AbstractHerder.generateResult(name, keys, values, groups);
assertEquals(name, infos.name());
assertEquals(groups, infos.groups());
assertEquals(values.size(), infos.values().size());
assertEquals(values.size(), infos.configs().size());
assertEquals(2, infos.errorCount());
assertNoInfoKey(infos, "config.a1");
assertNoInfoKey(infos, "config.b1");
@ -1279,7 +1279,7 @@ public class AbstractHerderTest {
}
protected ConfigInfo findInfo(ConfigInfos infos, String name) {
return infos.values()
return infos.configs()
.stream()
.filter(i -> i.configValue().name().equals(name))
.findFirst()

View File

@ -167,8 +167,8 @@ public class ConnectorPluginsResourceTest {
List<ConfigValue> partialConnectorConfigValues = connectorConfigDef.validate(PARTIAL_PROPS);
ConfigInfos result = AbstractHerder.generateResult(ConnectorPluginsResourceTestConnector.class.getName(), connectorConfigDef.configKeys(), connectorConfigValues, List.of());
ConfigInfos partialResult = AbstractHerder.generateResult(ConnectorPluginsResourceTestConnector.class.getName(), connectorConfigDef.configKeys(), partialConnectorConfigValues, List.of());
List<ConfigInfo> configs = new LinkedList<>(result.values());
List<ConfigInfo> partialConfigs = new LinkedList<>(partialResult.values());
List<ConfigInfo> configs = new LinkedList<>(result.configs());
List<ConfigInfo> partialConfigs = new LinkedList<>(partialResult.configs());
ConfigKeyInfo configKeyInfo = new ConfigKeyInfo("test.string.config", "STRING", true, null, "HIGH", "Test configuration for string type.", null, -1, "NONE", "test.string.config", List.of());
ConfigValueInfo configValueInfo = new ConfigValueInfo("test.string.config", "testString", List.of(), List.of(), true);
@ -256,8 +256,8 @@ public class ConnectorPluginsResourceTest {
assertEquals(PARTIAL_CONFIG_INFOS.errorCount(), configInfos.errorCount());
assertEquals(PARTIAL_CONFIG_INFOS.groups(), configInfos.groups());
assertEquals(
new HashSet<>(PARTIAL_CONFIG_INFOS.values()),
new HashSet<>(configInfos.values())
new HashSet<>(PARTIAL_CONFIG_INFOS.configs()),
new HashSet<>(configInfos.configs())
);
verify(herder).validateConnectorConfig(eq(PARTIAL_PROPS), any(), anyBoolean());
}
@ -298,7 +298,7 @@ public class ConnectorPluginsResourceTest {
assertEquals(CONFIG_INFOS.name(), configInfos.name());
assertEquals(0, configInfos.errorCount());
assertEquals(CONFIG_INFOS.groups(), configInfos.groups());
assertEquals(new HashSet<>(CONFIG_INFOS.values()), new HashSet<>(configInfos.values()));
assertEquals(new HashSet<>(CONFIG_INFOS.configs()), new HashSet<>(configInfos.configs()));
verify(herder).validateConnectorConfig(eq(PROPS), any(), anyBoolean());
}
@ -338,7 +338,7 @@ public class ConnectorPluginsResourceTest {
assertEquals(CONFIG_INFOS.name(), configInfos.name());
assertEquals(0, configInfos.errorCount());
assertEquals(CONFIG_INFOS.groups(), configInfos.groups());
assertEquals(new HashSet<>(CONFIG_INFOS.values()), new HashSet<>(configInfos.values()));
assertEquals(new HashSet<>(CONFIG_INFOS.configs()), new HashSet<>(configInfos.configs()));
verify(herder).validateConnectorConfig(eq(PROPS), any(), anyBoolean());
}
@ -372,8 +372,8 @@ public class ConnectorPluginsResourceTest {
ClassLoader classLoader = ConnectorPluginsResourceTest.class.getClassLoader();
PluginInfo sinkInfo = new PluginInfo(new PluginDesc<>(SampleSinkConnector.class, SampleSinkConnector.VERSION, PluginType.SINK, classLoader));
PluginInfo sourceInfo = new PluginInfo(new PluginDesc<>(SampleSourceConnector.class, SampleSourceConnector.VERSION, PluginType.SOURCE, classLoader));
assertEquals(PluginType.SINK.toString(), sinkInfo.type());
assertEquals(PluginType.SOURCE.toString(), sourceInfo.type());
assertEquals(PluginType.SINK.toString(), sinkInfo.type().toString());
assertEquals(PluginType.SOURCE.toString(), sourceInfo.type().toString());
assertEquals(SampleSinkConnector.VERSION, sinkInfo.version());
assertEquals(SampleSourceConnector.VERSION, sourceInfo.version());
assertEquals(SampleSinkConnector.class.getName(), sinkInfo.className());