diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectAssignor.java index 1436460d1a9..d91e1fec85f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectAssignor.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectAssignor.java @@ -32,12 +32,12 @@ public interface ConnectAssignor { * method computes an assignment of connectors and tasks among the members of the worker group. * * @param leaderId the leader of the group - * @param protocol the protocol type; for Connect assignors this is "eager", "compatible", or "sessioned" + * @param protocol the protocol type * @param allMemberMetadata the metadata of all the active workers of the group * @param coordinator the worker coordinator that runs this assignor * @return the assignment of connectors and tasks to workers */ - Map performAssignment(String leaderId, String protocol, + Map performAssignment(String leaderId, ConnectProtocolCompatibility protocol, List allMemberMetadata, WorkerCoordinator coordinator); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/EagerAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/EagerAssignor.java index 0663d9e5710..2d8dba5d758 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/EagerAssignor.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/EagerAssignor.java @@ -52,7 +52,7 @@ public class EagerAssignor implements ConnectAssignor { } @Override - public Map performAssignment(String leaderId, String protocol, + public Map performAssignment(String leaderId, ConnectProtocolCompatibility protocol, List allMemberMetadata, WorkerCoordinator coordinator) { log.debug("Performing task assignment"); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java index 6be247cfc82..e5bd9097033 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java @@ -97,7 +97,7 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor { } @Override - public Map performAssignment(String leaderId, String protocol, + public Map performAssignment(String leaderId, ConnectProtocolCompatibility protocol, List allMemberMetadata, WorkerCoordinator coordinator) { log.debug("Performing task assignment"); @@ -117,7 +117,7 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor { log.debug("Max config offset root: {}, local snapshot config offsets root: {}", maxOffset, coordinator.configSnapshot().offset()); - short protocolVersion = ConnectProtocolCompatibility.fromProtocol(protocol).protocolVersion(); + short protocolVersion = protocol.protocolVersion(); Long leaderOffset = ensureLeaderConfig(maxOffset, coordinator); if (leaderOffset == null) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java index 99daf19d1d9..871fe3b33e4 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java @@ -229,9 +229,10 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable if (skipAssignment) throw new IllegalStateException("Can't skip assignment because Connect does not support static membership."); - return ConnectProtocolCompatibility.fromProtocol(protocol) == EAGER - ? eagerAssignor.performAssignment(leaderId, protocol, allMemberMetadata, this) - : incrementalAssignor.performAssignment(leaderId, protocol, allMemberMetadata, this); + ConnectProtocolCompatibility protocolCompatibility = ConnectProtocolCompatibility.fromProtocol(protocol); + return protocolCompatibility == EAGER + ? eagerAssignor.performAssignment(leaderId, protocolCompatibility, allMemberMetadata, this) + : incrementalAssignor.performAssignment(leaderId, protocolCompatibility, allMemberMetadata, this); } @Override diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java index 8e10a07a015..26ee2331b50 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java @@ -1236,7 +1236,7 @@ public class IncrementalCooperativeAssignorTest { when(coordinator.configSnapshot()).thenReturn(configState()); Map serializedAssignments = assignor.performAssignment( leader, - ConnectProtocolCompatibility.COMPATIBLE.protocol(), + ConnectProtocolCompatibility.COMPATIBLE, memberMetadata, coordinator ); @@ -1277,7 +1277,7 @@ public class IncrementalCooperativeAssignorTest { when(coordinator.configSnapshot()).thenReturn(configState()); Map serializedAssignments = assignor.performAssignment( leader, - ConnectProtocolCompatibility.SESSIONED.protocol(), + ConnectProtocolCompatibility.SESSIONED, memberMetadata, coordinator );