mirror of https://github.com/apache/kafka.git
MINOR: Remove outdated comment in Connect's WorkerCoordinator (#9805)
Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
fc68c0fc9b
commit
8bdab2e4cf
|
@ -50,9 +50,6 @@ import static org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompat
|
||||||
* to workers.
|
* to workers.
|
||||||
*/
|
*/
|
||||||
public class WorkerCoordinator extends AbstractCoordinator implements Closeable {
|
public class WorkerCoordinator extends AbstractCoordinator implements Closeable {
|
||||||
// Currently doesn't support multiple task assignment strategies, so we just fill in a default value
|
|
||||||
public static final String DEFAULT_SUBPROTOCOL = "default";
|
|
||||||
|
|
||||||
private final Logger log;
|
private final Logger log;
|
||||||
private final String restUrl;
|
private final String restUrl;
|
||||||
private final ConfigBackingStore configStorage;
|
private final ConfigBackingStore configStorage;
|
||||||
|
|
|
@ -59,6 +59,8 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
|
import static org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility.COMPATIBLE;
|
||||||
|
import static org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility.EAGER;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
@ -105,8 +107,8 @@ public class WorkerCoordinatorTest {
|
||||||
@Parameters
|
@Parameters
|
||||||
public static Iterable<?> mode() {
|
public static Iterable<?> mode() {
|
||||||
return Arrays.asList(new Object[][]{
|
return Arrays.asList(new Object[][]{
|
||||||
{ConnectProtocolCompatibility.EAGER, 1},
|
{EAGER, 1},
|
||||||
{ConnectProtocolCompatibility.COMPATIBLE, 2}});
|
{COMPATIBLE, 2}});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Parameter
|
@Parameter
|
||||||
|
@ -412,7 +414,7 @@ public class WorkerCoordinatorTest {
|
||||||
.setMemberId("member")
|
.setMemberId("member")
|
||||||
.setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, 1L)).array())
|
.setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, 1L)).array())
|
||||||
);
|
);
|
||||||
Map<String, ByteBuffer> result = coordinator.performAssignment("leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, responseMembers);
|
Map<String, ByteBuffer> result = coordinator.performAssignment("leader", EAGER.protocol(), responseMembers);
|
||||||
|
|
||||||
// configState1 has 1 connector, 1 task
|
// configState1 has 1 connector, 1 task
|
||||||
ConnectProtocol.Assignment leaderAssignment = ConnectProtocol.deserializeAssignment(result.get("leader"));
|
ConnectProtocol.Assignment leaderAssignment = ConnectProtocol.deserializeAssignment(result.get("leader"));
|
||||||
|
@ -455,7 +457,7 @@ public class WorkerCoordinatorTest {
|
||||||
.setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, 1L)).array())
|
.setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, 1L)).array())
|
||||||
);
|
);
|
||||||
|
|
||||||
Map<String, ByteBuffer> result = coordinator.performAssignment("leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, responseMembers);
|
Map<String, ByteBuffer> result = coordinator.performAssignment("leader", EAGER.protocol(), responseMembers);
|
||||||
|
|
||||||
// configState2 has 2 connector, 3 tasks and should trigger round robin assignment
|
// configState2 has 2 connector, 3 tasks and should trigger round robin assignment
|
||||||
ConnectProtocol.Assignment leaderAssignment = ConnectProtocol.deserializeAssignment(result.get("leader"));
|
ConnectProtocol.Assignment leaderAssignment = ConnectProtocol.deserializeAssignment(result.get("leader"));
|
||||||
|
@ -498,7 +500,7 @@ public class WorkerCoordinatorTest {
|
||||||
.setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, 1L)).array())
|
.setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, 1L)).array())
|
||||||
);
|
);
|
||||||
|
|
||||||
Map<String, ByteBuffer> result = coordinator.performAssignment("leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, responseMembers);
|
Map<String, ByteBuffer> result = coordinator.performAssignment("leader", EAGER.protocol(), responseMembers);
|
||||||
|
|
||||||
// Round robin assignment when there are the same number of connectors and tasks should result in each being
|
// Round robin assignment when there are the same number of connectors and tasks should result in each being
|
||||||
// evenly distributed across the workers, i.e. round robin assignment of connectors first, then followed by tasks
|
// evenly distributed across the workers, i.e. round robin assignment of connectors first, then followed by tasks
|
||||||
|
@ -535,7 +537,7 @@ public class WorkerCoordinatorTest {
|
||||||
return new JoinGroupResponse(
|
return new JoinGroupResponse(
|
||||||
new JoinGroupResponseData().setErrorCode(error.code())
|
new JoinGroupResponseData().setErrorCode(error.code())
|
||||||
.setGenerationId(generationId)
|
.setGenerationId(generationId)
|
||||||
.setProtocolName(WorkerCoordinator.DEFAULT_SUBPROTOCOL)
|
.setProtocolName(EAGER.protocol())
|
||||||
.setLeader(memberId)
|
.setLeader(memberId)
|
||||||
.setMemberId(memberId)
|
.setMemberId(memberId)
|
||||||
.setMembers(metadata)
|
.setMembers(metadata)
|
||||||
|
@ -546,7 +548,7 @@ public class WorkerCoordinatorTest {
|
||||||
return new JoinGroupResponse(
|
return new JoinGroupResponse(
|
||||||
new JoinGroupResponseData().setErrorCode(error.code())
|
new JoinGroupResponseData().setErrorCode(error.code())
|
||||||
.setGenerationId(generationId)
|
.setGenerationId(generationId)
|
||||||
.setProtocolName(WorkerCoordinator.DEFAULT_SUBPROTOCOL)
|
.setProtocolName(EAGER.protocol())
|
||||||
.setLeader(leaderId)
|
.setLeader(leaderId)
|
||||||
.setMemberId(memberId)
|
.setMemberId(memberId)
|
||||||
.setMembers(Collections.emptyList())
|
.setMembers(Collections.emptyList())
|
||||||
|
|
Loading…
Reference in New Issue