KAFKA-19140 ConnectAssignor#performAssignment parameter can be replace to ConnectProtocolCompatibility (#19476)
CI / build (push) Waiting to run Details

The protocol type; for Connect assignors this is "eager", "compatible",
or "sessioned"

Since `ConnectAssignor` is an interface and the protocol parameter is
restricted to "eager", "compatible", or "sessioned", it aligns with the
existing ConnectProtocolCompatibility enum. Therefore, we can update the
code to use `ConnectProtocolCompatibility` directly.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Ken Huang 2025-06-20 03:07:57 +08:00 committed by GitHub
parent 2c3ce72a05
commit 15ad3016b7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 11 additions and 10 deletions

View File

@ -32,12 +32,12 @@ public interface ConnectAssignor {
* method computes an assignment of connectors and tasks among the members of the worker group. * method computes an assignment of connectors and tasks among the members of the worker group.
* *
* @param leaderId the leader of the group * @param leaderId the leader of the group
* @param protocol the protocol type; for Connect assignors this is "eager", "compatible", or "sessioned" * @param protocol the protocol type
* @param allMemberMetadata the metadata of all the active workers of the group * @param allMemberMetadata the metadata of all the active workers of the group
* @param coordinator the worker coordinator that runs this assignor * @param coordinator the worker coordinator that runs this assignor
* @return the assignment of connectors and tasks to workers * @return the assignment of connectors and tasks to workers
*/ */
Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, Map<String, ByteBuffer> performAssignment(String leaderId, ConnectProtocolCompatibility protocol,
List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata, List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata,
WorkerCoordinator coordinator); WorkerCoordinator coordinator);
} }

View File

@ -52,7 +52,7 @@ public class EagerAssignor implements ConnectAssignor {
} }
@Override @Override
public Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, public Map<String, ByteBuffer> performAssignment(String leaderId, ConnectProtocolCompatibility protocol,
List<JoinGroupResponseMember> allMemberMetadata, List<JoinGroupResponseMember> allMemberMetadata,
WorkerCoordinator coordinator) { WorkerCoordinator coordinator) {
log.debug("Performing task assignment"); log.debug("Performing task assignment");

View File

@ -97,7 +97,7 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
} }
@Override @Override
public Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, public Map<String, ByteBuffer> performAssignment(String leaderId, ConnectProtocolCompatibility protocol,
List<JoinGroupResponseMember> allMemberMetadata, List<JoinGroupResponseMember> allMemberMetadata,
WorkerCoordinator coordinator) { WorkerCoordinator coordinator) {
log.debug("Performing task assignment"); log.debug("Performing task assignment");
@ -117,7 +117,7 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
log.debug("Max config offset root: {}, local snapshot config offsets root: {}", log.debug("Max config offset root: {}, local snapshot config offsets root: {}",
maxOffset, coordinator.configSnapshot().offset()); maxOffset, coordinator.configSnapshot().offset());
short protocolVersion = ConnectProtocolCompatibility.fromProtocol(protocol).protocolVersion(); short protocolVersion = protocol.protocolVersion();
Long leaderOffset = ensureLeaderConfig(maxOffset, coordinator); Long leaderOffset = ensureLeaderConfig(maxOffset, coordinator);
if (leaderOffset == null) { if (leaderOffset == null) {

View File

@ -229,9 +229,10 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
if (skipAssignment) if (skipAssignment)
throw new IllegalStateException("Can't skip assignment because Connect does not support static membership."); throw new IllegalStateException("Can't skip assignment because Connect does not support static membership.");
return ConnectProtocolCompatibility.fromProtocol(protocol) == EAGER ConnectProtocolCompatibility protocolCompatibility = ConnectProtocolCompatibility.fromProtocol(protocol);
? eagerAssignor.performAssignment(leaderId, protocol, allMemberMetadata, this) return protocolCompatibility == EAGER
: incrementalAssignor.performAssignment(leaderId, protocol, allMemberMetadata, this); ? eagerAssignor.performAssignment(leaderId, protocolCompatibility, allMemberMetadata, this)
: incrementalAssignor.performAssignment(leaderId, protocolCompatibility, allMemberMetadata, this);
} }
@Override @Override

View File

@ -1236,7 +1236,7 @@ public class IncrementalCooperativeAssignorTest {
when(coordinator.configSnapshot()).thenReturn(configState()); when(coordinator.configSnapshot()).thenReturn(configState());
Map<String, ByteBuffer> serializedAssignments = assignor.performAssignment( Map<String, ByteBuffer> serializedAssignments = assignor.performAssignment(
leader, leader,
ConnectProtocolCompatibility.COMPATIBLE.protocol(), ConnectProtocolCompatibility.COMPATIBLE,
memberMetadata, memberMetadata,
coordinator coordinator
); );
@ -1277,7 +1277,7 @@ public class IncrementalCooperativeAssignorTest {
when(coordinator.configSnapshot()).thenReturn(configState()); when(coordinator.configSnapshot()).thenReturn(configState());
Map<String, ByteBuffer> serializedAssignments = assignor.performAssignment( Map<String, ByteBuffer> serializedAssignments = assignor.performAssignment(
leader, leader,
ConnectProtocolCompatibility.SESSIONED.protocol(), ConnectProtocolCompatibility.SESSIONED,
memberMetadata, memberMetadata,
coordinator coordinator
); );