diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index 33450766b42..230203796c3 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -58,7 +58,7 @@ import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilderSupplier; +import org.apache.kafka.coordinator.group.runtime.CoordinatorShardBuilderSupplier; import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor; import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader; import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime; @@ -134,8 +134,8 @@ public class GroupCoordinatorService implements GroupCoordinator { String logPrefix = String.format("GroupCoordinator id=%d", nodeId); LogContext logContext = new LogContext(String.format("[%s] ", logPrefix)); - CoordinatorBuilderSupplier supplier = () -> - new ReplicatedGroupCoordinator.Builder(config); + CoordinatorShardBuilderSupplier supplier = () -> + new GroupCoordinatorShard.Builder(config); CoordinatorEventProcessor processor = new MultiThreadedEventProcessor( logContext, @@ -143,8 +143,8 @@ public class GroupCoordinatorService implements GroupCoordinator { config.numThreads ); - CoordinatorRuntime runtime = - new CoordinatorRuntime.Builder() + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() .withTime(time) .withTimer(timer) .withLogPrefix(logPrefix) @@ -152,7 +152,7 @@ public class GroupCoordinatorService implements GroupCoordinator { .withEventProcessor(processor) .withPartitionWriter(writer) .withLoader(loader) - .withCoordinatorBuilderSupplier(supplier) + .withCoordinatorShardBuilderSupplier(supplier) .withTime(time) .build(); @@ -177,7 +177,7 @@ public class GroupCoordinatorService implements GroupCoordinator { /** * The coordinator runtime. */ - private final CoordinatorRuntime runtime; + private final CoordinatorRuntime runtime; /** * Boolean indicating whether the coordinator is active or not. @@ -199,7 +199,7 @@ public class GroupCoordinatorService implements GroupCoordinator { GroupCoordinatorService( LogContext logContext, GroupCoordinatorConfig config, - CoordinatorRuntime runtime + CoordinatorRuntime runtime ) { this.log = logContext.logger(CoordinatorLoader.class); this.config = config; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java similarity index 90% rename from group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java rename to group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index 392af291bc5..13537db6977 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.coordinator.group; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.message.HeartbeatRequestData; @@ -48,8 +47,8 @@ import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; -import org.apache.kafka.coordinator.group.runtime.Coordinator; -import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilder; +import org.apache.kafka.coordinator.group.runtime.CoordinatorShard; +import org.apache.kafka.coordinator.group.runtime.CoordinatorShardBuilder; import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; import org.apache.kafka.coordinator.group.runtime.CoordinatorTimer; import org.apache.kafka.image.MetadataDelta; @@ -60,22 +59,21 @@ import org.apache.kafka.timeline.SnapshotRegistry; import java.util.concurrent.CompletableFuture; /** - * The group coordinator replicated state machine that manages the metadata of all generic and - * consumer groups. It holds the hard and the soft state of the groups. This class has two kinds - * of methods: + * The group coordinator shard is a replicated state machine that manages the metadata of all + * generic and consumer groups. It holds the hard and the soft state of the groups. This class + * has two kinds of methods: * 1) The request handlers which handle the requests and generate a response and records to * mutate the hard state. Those records will be written by the runtime and applied to the * hard state via the replay methods. * 2) The replay methods which apply records to the hard state. Those are used in the request * handling as well as during the initial loading of the records from the partitions. */ -public class ReplicatedGroupCoordinator implements Coordinator { +public class GroupCoordinatorShard implements CoordinatorShard { - public static class Builder implements CoordinatorBuilder { + public static class Builder implements CoordinatorShardBuilder { private final GroupCoordinatorConfig config; private LogContext logContext; private SnapshotRegistry snapshotRegistry; - private TopicPartition topicPartition; private Time time; private CoordinatorTimer timer; @@ -86,7 +84,7 @@ public class ReplicatedGroupCoordinator implements Coordinator { } @Override - public CoordinatorBuilder withLogContext( + public CoordinatorShardBuilder withLogContext( LogContext logContext ) { this.logContext = logContext; @@ -94,7 +92,7 @@ public class ReplicatedGroupCoordinator implements Coordinator { } @Override - public CoordinatorBuilder withTime( + public CoordinatorShardBuilder withTime( Time time ) { this.time = time; @@ -102,7 +100,7 @@ public class ReplicatedGroupCoordinator implements Coordinator { } @Override - public CoordinatorBuilder withTimer( + public CoordinatorShardBuilder withTimer( CoordinatorTimer timer ) { this.timer = timer; @@ -110,7 +108,7 @@ public class ReplicatedGroupCoordinator implements Coordinator { } @Override - public CoordinatorBuilder withSnapshotRegistry( + public CoordinatorShardBuilder withSnapshotRegistry( SnapshotRegistry snapshotRegistry ) { this.snapshotRegistry = snapshotRegistry; @@ -118,15 +116,7 @@ public class ReplicatedGroupCoordinator implements Coordinator { } @Override - public CoordinatorBuilder withTopicPartition( - TopicPartition topicPartition - ) { - this.topicPartition = topicPartition; - return this; - } - - @Override - public ReplicatedGroupCoordinator build() { + public GroupCoordinatorShard build() { if (logContext == null) logContext = new LogContext(); if (config == null) throw new IllegalArgumentException("Config must be set."); @@ -136,16 +126,13 @@ public class ReplicatedGroupCoordinator implements Coordinator { throw new IllegalArgumentException("Time must be set."); if (timer == null) throw new IllegalArgumentException("Timer must be set."); - if (topicPartition == null) - throw new IllegalArgumentException("TopicPartition must be set."); GroupMetadataManager groupMetadataManager = new GroupMetadataManager.Builder() .withLogContext(logContext) .withSnapshotRegistry(snapshotRegistry) .withTime(time) .withTimer(timer) - .withTopicPartition(topicPartition) - .withAssignors(config.consumerGroupAssignors) + .withConsumerGroupAssignors(config.consumerGroupAssignors) .withConsumerGroupMaxSize(config.consumerGroupMaxSize) .withConsumerGroupHeartbeatInterval(config.consumerGroupHeartbeatIntervalMs) .withGenericGroupInitialRebalanceDelayMs(config.genericGroupInitialRebalanceDelayMs) @@ -162,7 +149,7 @@ public class ReplicatedGroupCoordinator implements Coordinator { .withOffsetMetadataMaxSize(config.offsetMetadataMaxSize) .build(); - return new ReplicatedGroupCoordinator( + return new GroupCoordinatorShard( groupMetadataManager, offsetMetadataManager ); @@ -185,7 +172,7 @@ public class ReplicatedGroupCoordinator implements Coordinator { * @param groupMetadataManager The group metadata manager. * @param offsetMetadataManager The offset metadata manager. */ - ReplicatedGroupCoordinator( + GroupCoordinatorShard( GroupMetadataManager groupMetadataManager, OffsetMetadataManager offsetMetadataManager ) { diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 09857750c48..ab37d760bc2 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -17,7 +17,6 @@ package org.apache.kafka.coordinator.group; import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.CoordinatorNotAvailableException; @@ -131,11 +130,10 @@ public class GroupMetadataManager { private SnapshotRegistry snapshotRegistry = null; private Time time = null; private CoordinatorTimer timer = null; - private List assignors = null; + private List consumerGroupAssignors = null; private int consumerGroupMaxSize = Integer.MAX_VALUE; private int consumerGroupHeartbeatIntervalMs = 5000; private int consumerGroupMetadataRefreshIntervalMs = Integer.MAX_VALUE; - private TopicPartition topicPartition = null; private MetadataImage metadataImage = null; private int consumerGroupSessionTimeoutMs = 45000; private int genericGroupMaxSize = Integer.MAX_VALUE; @@ -164,8 +162,8 @@ public class GroupMetadataManager { return this; } - Builder withAssignors(List assignors) { - this.assignors = assignors; + Builder withConsumerGroupAssignors(List consumerGroupAssignors) { + this.consumerGroupAssignors = consumerGroupAssignors; return this; } @@ -194,11 +192,6 @@ public class GroupMetadataManager { return this; } - Builder withTopicPartition(TopicPartition tp) { - this.topicPartition = tp; - return this; - } - Builder withGenericGroupMaxSize(int genericGroupMaxSize) { this.genericGroupMaxSize = genericGroupMaxSize; return this; @@ -232,20 +225,15 @@ public class GroupMetadataManager { if (timer == null) throw new IllegalArgumentException("Timer must be set."); - if (assignors == null || assignors.isEmpty()) + if (consumerGroupAssignors == null || consumerGroupAssignors.isEmpty()) throw new IllegalArgumentException("Assignors must be set before building."); - if (topicPartition == null) { - throw new IllegalStateException("TopicPartition must be set before building."); - } - return new GroupMetadataManager( - topicPartition, snapshotRegistry, logContext, time, timer, - assignors, + consumerGroupAssignors, metadataImage, consumerGroupMaxSize, consumerGroupSessionTimeoutMs, @@ -260,11 +248,6 @@ public class GroupMetadataManager { } } - /** - * The topic partition associated with the metadata manager. - */ - private final TopicPartition topicPartition; - /** * The log context. */ @@ -370,7 +353,6 @@ public class GroupMetadataManager { private final int genericGroupMaxSessionTimeoutMs; private GroupMetadataManager( - TopicPartition topicPartition, SnapshotRegistry snapshotRegistry, LogContext logContext, Time time, @@ -394,7 +376,6 @@ public class GroupMetadataManager { this.timer = timer; this.metadataImage = metadataImage; this.assignors = assignors.stream().collect(Collectors.toMap(PartitionAssignor::name, Function.identity())); - this.topicPartition = topicPartition; this.defaultAssignor = assignors.get(0); this.groups = new TimelineHashMap<>(snapshotRegistry, 0); this.groupsByTopics = new TimelineHashMap<>(snapshotRegistry, 0); @@ -1975,8 +1956,7 @@ public class GroupMetadataManager { } else { group.initNextGeneration(); if (group.isInState(EMPTY)) { - log.info("Group {} with generation {} is now empty ({}-{})", - groupId, group.generationId(), topicPartition.topic(), topicPartition.partition()); + log.info("Group {} with generation {} is now empty.", groupId, group.generationId()); CompletableFuture appendFuture = new CompletableFuture<>(); appendFuture.whenComplete((__, t) -> { @@ -1994,8 +1974,8 @@ public class GroupMetadataManager { return new CoordinatorResult<>(records, appendFuture); } else { - log.info("Stabilized group {} generation {} ({}) with {} members", - groupId, group.generationId(), topicPartition, group.size()); + log.info("Stabilized group {} generation {} with {} members.", + groupId, group.generationId(), group.size()); // Complete the awaiting join group response future for all the members after rebalancing group.allMembers().forEach(member -> { @@ -2274,9 +2254,8 @@ public class GroupMetadataManager { group.transitionTo(PREPARING_REBALANCE); - log.info("Preparing to rebalance group {} in state {} with old generation {} ({}-{}) (reason: {})", - group.groupId(), group.currentState(), group.generationId(), - topicPartition.topic(), topicPartition.partition(), reason); + log.info("Preparing to rebalance group {} in state {} with old generation {} (reason: {}).", + group.groupId(), group.currentState(), group.generationId(), reason); return isInitialRebalance ? EMPTY_RESULT : maybeCompleteJoinElseSchedule(group); } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java index acf2c232d2d..9ae0d1fc42a 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java @@ -72,7 +72,7 @@ import java.util.function.Consumer; * @param The type of the state machine. * @param The type of the record. */ -public class CoordinatorRuntime, U> implements AutoCloseable { +public class CoordinatorRuntime, U> implements AutoCloseable { /** * Builder to create a CoordinatorRuntime. @@ -80,13 +80,13 @@ public class CoordinatorRuntime, U> implements AutoClos * @param The type of the state machine. * @param The type of the record. */ - public static class Builder, U> { + public static class Builder, U> { private String logPrefix; private LogContext logContext; private CoordinatorEventProcessor eventProcessor; private PartitionWriter partitionWriter; private CoordinatorLoader loader; - private CoordinatorBuilderSupplier coordinatorBuilderSupplier; + private CoordinatorShardBuilderSupplier coordinatorShardBuilderSupplier; private Time time = Time.SYSTEM; private Timer timer; @@ -115,8 +115,8 @@ public class CoordinatorRuntime, U> implements AutoClos return this; } - public Builder withCoordinatorBuilderSupplier(CoordinatorBuilderSupplier coordinatorBuilderSupplier) { - this.coordinatorBuilderSupplier = coordinatorBuilderSupplier; + public Builder withCoordinatorShardBuilderSupplier(CoordinatorShardBuilderSupplier coordinatorShardBuilderSupplier) { + this.coordinatorShardBuilderSupplier = coordinatorShardBuilderSupplier; return this; } @@ -141,7 +141,7 @@ public class CoordinatorRuntime, U> implements AutoClos throw new IllegalArgumentException("Partition write must be set."); if (loader == null) throw new IllegalArgumentException("Loader must be set."); - if (coordinatorBuilderSupplier == null) + if (coordinatorShardBuilderSupplier == null) throw new IllegalArgumentException("State machine supplier must be set."); if (time == null) throw new IllegalArgumentException("Time must be set."); @@ -154,7 +154,7 @@ public class CoordinatorRuntime, U> implements AutoClos eventProcessor, partitionWriter, loader, - coordinatorBuilderSupplier, + coordinatorShardBuilderSupplier, time, timer ); @@ -508,13 +508,12 @@ public class CoordinatorRuntime, U> implements AutoClos snapshotRegistry = new SnapshotRegistry(logContext); lastWrittenOffset = 0L; lastCommittedOffset = 0L; - coordinator = coordinatorBuilderSupplier + coordinator = coordinatorShardBuilderSupplier .get() .withLogContext(logContext) .withSnapshotRegistry(snapshotRegistry) .withTime(time) .withTimer(timer) - .withTopicPartition(tp) .build(); break; @@ -994,7 +993,7 @@ public class CoordinatorRuntime, U> implements AutoClos * The coordinator state machine builder used by the runtime * to instantiate a coordinator. */ - private final CoordinatorBuilderSupplier coordinatorBuilderSupplier; + private final CoordinatorShardBuilderSupplier coordinatorShardBuilderSupplier; /** * Atomic boolean indicating whether the runtime is running. @@ -1009,14 +1008,14 @@ public class CoordinatorRuntime, U> implements AutoClos /** * Constructor. * - * @param logPrefix The log prefix. - * @param logContext The log context. - * @param processor The event processor. - * @param partitionWriter The partition writer. - * @param loader The coordinator loader. - * @param coordinatorBuilderSupplier The coordinator builder. - * @param time The system time. - * @param timer The system timer. + * @param logPrefix The log prefix. + * @param logContext The log context. + * @param processor The event processor. + * @param partitionWriter The partition writer. + * @param loader The coordinator loader. + * @param coordinatorShardBuilderSupplier The coordinator builder. + * @param time The system time. + * @param timer The system timer. */ private CoordinatorRuntime( String logPrefix, @@ -1024,7 +1023,7 @@ public class CoordinatorRuntime, U> implements AutoClos CoordinatorEventProcessor processor, PartitionWriter partitionWriter, CoordinatorLoader loader, - CoordinatorBuilderSupplier coordinatorBuilderSupplier, + CoordinatorShardBuilderSupplier coordinatorShardBuilderSupplier, Time time, Timer timer ) { @@ -1038,7 +1037,7 @@ public class CoordinatorRuntime, U> implements AutoClos this.partitionWriter = partitionWriter; this.highWatermarklistener = new HighWatermarkListener(); this.loader = loader; - this.coordinatorBuilderSupplier = coordinatorBuilderSupplier; + this.coordinatorShardBuilderSupplier = coordinatorShardBuilderSupplier; } /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Coordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShard.java similarity index 89% rename from group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Coordinator.java rename to group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShard.java index 8189e1ab89b..1dca9835098 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Coordinator.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShard.java @@ -20,10 +20,10 @@ import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; /** - * Coordinator is basically a replicated state machine managed by the + * CoordinatorShard is basically a replicated state machine managed by the * {@link CoordinatorRuntime}. */ -public interface Coordinator extends CoordinatorPlayback { +public interface CoordinatorShard extends CoordinatorPlayback { /** * The coordinator has been loaded. This is used to apply any @@ -34,7 +34,7 @@ public interface Coordinator extends CoordinatorPlayback { default void onLoaded(MetadataImage newImage) {} /** - * A new metadata image is available. This is only called after {@link Coordinator#onLoaded(MetadataImage)} + * A new metadata image is available. This is only called after {@link CoordinatorShard#onLoaded(MetadataImage)} * is called to signal that the coordinator has been fully loaded. * * @param newImage The new metadata image. diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShardBuilder.java similarity index 76% rename from group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorBuilder.java rename to group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShardBuilder.java index dae9c6d62a3..df2b514b63c 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShardBuilder.java @@ -16,19 +16,18 @@ */ package org.apache.kafka.coordinator.group.runtime; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.timeline.SnapshotRegistry; /** - * A builder to build a {@link Coordinator} replicated state machine. + * A builder to build a {@link CoordinatorShard} replicated state machine. * * @param The type of the coordinator. * @param The record type. */ -public interface CoordinatorBuilder, U> { +public interface CoordinatorShardBuilder, U> { /** * Sets the snapshot registry used to back all the timeline @@ -38,7 +37,7 @@ public interface CoordinatorBuilder, U> { * * @return The builder. */ - CoordinatorBuilder withSnapshotRegistry( + CoordinatorShardBuilder withSnapshotRegistry( SnapshotRegistry snapshotRegistry ); @@ -49,20 +48,10 @@ public interface CoordinatorBuilder, U> { * * @return The builder. */ - CoordinatorBuilder withLogContext( + CoordinatorShardBuilder withLogContext( LogContext logContext ); - /** - * Sets the topic partition. - * @param topicPartition The topic partition. - * - * @return The builder. - */ - CoordinatorBuilder withTopicPartition( - TopicPartition topicPartition - ); - /** * Sets the time. * @@ -70,7 +59,7 @@ public interface CoordinatorBuilder, U> { * * @return The builder. */ - CoordinatorBuilder withTime( + CoordinatorShardBuilder withTime( Time time ); @@ -81,7 +70,7 @@ public interface CoordinatorBuilder, U> { * * @return The builder. */ - CoordinatorBuilder withTimer( + CoordinatorShardBuilder withTimer( CoordinatorTimer timer ); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorBuilderSupplier.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShardBuilderSupplier.java similarity index 78% rename from group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorBuilderSupplier.java rename to group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShardBuilderSupplier.java index 98b7c54fca8..2c77d78aa79 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorBuilderSupplier.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShardBuilderSupplier.java @@ -17,14 +17,14 @@ package org.apache.kafka.coordinator.group.runtime; /** - * Supplies a {@link CoordinatorBuilder} to the {@link CoordinatorRuntime}. + * Supplies a {@link CoordinatorShardBuilder} to the {@link CoordinatorRuntime}. * * @param The type of the coordinator. * @param The record type. */ -public interface CoordinatorBuilderSupplier, U> { +public interface CoordinatorShardBuilderSupplier, U> { /** - * @return A {@link CoordinatorBuilder}. + * @return A {@link CoordinatorShardBuilder}. */ - CoordinatorBuilder get(); + CoordinatorShardBuilder get(); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index 36dc455f9b4..d1e3bea8c7a 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -83,8 +83,8 @@ import static org.mockito.Mockito.when; public class GroupCoordinatorServiceTest { @SuppressWarnings("unchecked") - private CoordinatorRuntime mockRuntime() { - return (CoordinatorRuntime) mock(CoordinatorRuntime.class); + private CoordinatorRuntime mockRuntime() { + return (CoordinatorRuntime) mock(CoordinatorRuntime.class); } private GroupCoordinatorConfig createConfig() { @@ -106,7 +106,7 @@ public class GroupCoordinatorServiceTest { @Test public void testStartupShutdown() throws Exception { - CoordinatorRuntime runtime = mockRuntime(); + CoordinatorRuntime runtime = mockRuntime(); GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), @@ -121,7 +121,7 @@ public class GroupCoordinatorServiceTest { @Test public void testConsumerGroupHeartbeatWhenNotStarted() { - CoordinatorRuntime runtime = mockRuntime(); + CoordinatorRuntime runtime = mockRuntime(); GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), @@ -142,7 +142,7 @@ public class GroupCoordinatorServiceTest { @Test public void testConsumerGroupHeartbeat() throws ExecutionException, InterruptedException, TimeoutException { - CoordinatorRuntime runtime = mockRuntime(); + CoordinatorRuntime runtime = mockRuntime(); GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), @@ -190,7 +190,7 @@ public class GroupCoordinatorServiceTest { short expectedErrorCode, String expectedErrorMessage ) throws ExecutionException, InterruptedException, TimeoutException { - CoordinatorRuntime runtime = mockRuntime(); + CoordinatorRuntime runtime = mockRuntime(); GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), @@ -223,7 +223,7 @@ public class GroupCoordinatorServiceTest { @Test public void testPartitionFor() { - CoordinatorRuntime runtime = mockRuntime(); + CoordinatorRuntime runtime = mockRuntime(); GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), @@ -240,7 +240,7 @@ public class GroupCoordinatorServiceTest { @Test public void testGroupMetadataTopicConfigs() { - CoordinatorRuntime runtime = mockRuntime(); + CoordinatorRuntime runtime = mockRuntime(); GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), @@ -257,7 +257,7 @@ public class GroupCoordinatorServiceTest { @Test public void testOnElection() { - CoordinatorRuntime runtime = mockRuntime(); + CoordinatorRuntime runtime = mockRuntime(); GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), @@ -278,7 +278,7 @@ public class GroupCoordinatorServiceTest { @Test public void testOnResignation() { - CoordinatorRuntime runtime = mockRuntime(); + CoordinatorRuntime runtime = mockRuntime(); GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), @@ -299,7 +299,7 @@ public class GroupCoordinatorServiceTest { @Test public void testJoinGroup() { - CoordinatorRuntime runtime = mockRuntime(); + CoordinatorRuntime runtime = mockRuntime(); GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), @@ -330,7 +330,7 @@ public class GroupCoordinatorServiceTest { @Test public void testJoinGroupWithException() throws Exception { - CoordinatorRuntime runtime = mockRuntime(); + CoordinatorRuntime runtime = mockRuntime(); GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), @@ -363,7 +363,7 @@ public class GroupCoordinatorServiceTest { @Test public void testJoinGroupInvalidGroupId() throws Exception { - CoordinatorRuntime runtime = mockRuntime(); + CoordinatorRuntime runtime = mockRuntime(); GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), @@ -408,7 +408,7 @@ public class GroupCoordinatorServiceTest { @Test public void testSyncGroup() { - CoordinatorRuntime runtime = mockRuntime(); + CoordinatorRuntime runtime = mockRuntime(); GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), @@ -439,7 +439,7 @@ public class GroupCoordinatorServiceTest { @Test public void testSyncGroupWithException() throws Exception { - CoordinatorRuntime runtime = mockRuntime(); + CoordinatorRuntime runtime = mockRuntime(); GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), @@ -473,7 +473,7 @@ public class GroupCoordinatorServiceTest { @Test public void testSyncGroupInvalidGroupId() throws Exception { - CoordinatorRuntime runtime = mockRuntime(); + CoordinatorRuntime runtime = mockRuntime(); GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), @@ -502,7 +502,7 @@ public class GroupCoordinatorServiceTest { @Test public void testHeartbeat() throws Exception { - CoordinatorRuntime runtime = mockRuntime(); + CoordinatorRuntime runtime = mockRuntime(); GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), @@ -533,7 +533,7 @@ public class GroupCoordinatorServiceTest { @Test public void testHeartbeatCoordinatorNotAvailableException() throws Exception { - CoordinatorRuntime runtime = mockRuntime(); + CoordinatorRuntime runtime = mockRuntime(); GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), @@ -564,7 +564,7 @@ public class GroupCoordinatorServiceTest { @Test public void testHeartbeatCoordinatorException() throws Exception { - CoordinatorRuntime runtime = mockRuntime(); + CoordinatorRuntime runtime = mockRuntime(); GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinatorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java similarity index 91% rename from group-coordinator/src/test/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinatorTest.java rename to group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java index 2f585d51ab6..a663147e7e3 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinatorTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java @@ -55,13 +55,13 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class ReplicatedGroupCoordinatorTest { +public class GroupCoordinatorShardTest { @Test public void testConsumerGroupHeartbeat() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); - ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( groupMetadataManager, offsetMetadataManager ); @@ -85,7 +85,7 @@ public class ReplicatedGroupCoordinatorTest { public void testCommitOffset() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); - ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( groupMetadataManager, offsetMetadataManager ); @@ -109,7 +109,7 @@ public class ReplicatedGroupCoordinatorTest { public void testReplayOffsetCommit() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); - ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( groupMetadataManager, offsetMetadataManager ); @@ -134,7 +134,7 @@ public class ReplicatedGroupCoordinatorTest { public void testReplayOffsetCommitWithNullValue() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); - ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( groupMetadataManager, offsetMetadataManager ); @@ -158,7 +158,7 @@ public class ReplicatedGroupCoordinatorTest { public void testReplayConsumerGroupMetadata() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); - ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( groupMetadataManager, offsetMetadataManager ); @@ -178,7 +178,7 @@ public class ReplicatedGroupCoordinatorTest { public void testReplayConsumerGroupMetadataWithNullValue() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); - ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( groupMetadataManager, offsetMetadataManager ); @@ -197,7 +197,7 @@ public class ReplicatedGroupCoordinatorTest { public void testReplayConsumerGroupPartitionMetadata() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); - ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( groupMetadataManager, offsetMetadataManager ); @@ -217,7 +217,7 @@ public class ReplicatedGroupCoordinatorTest { public void testReplayConsumerGroupPartitionMetadataWithNullValue() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); - ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( groupMetadataManager, offsetMetadataManager ); @@ -236,7 +236,7 @@ public class ReplicatedGroupCoordinatorTest { public void testReplayConsumerGroupMemberMetadata() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); - ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( groupMetadataManager, offsetMetadataManager ); @@ -256,7 +256,7 @@ public class ReplicatedGroupCoordinatorTest { public void testReplayConsumerGroupMemberMetadataWithNullValue() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); - ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( groupMetadataManager, offsetMetadataManager ); @@ -275,7 +275,7 @@ public class ReplicatedGroupCoordinatorTest { public void testReplayConsumerGroupTargetAssignmentMetadata() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); - ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( groupMetadataManager, offsetMetadataManager ); @@ -295,7 +295,7 @@ public class ReplicatedGroupCoordinatorTest { public void testReplayConsumerGroupTargetAssignmentMetadataWithNullValue() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); - ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( groupMetadataManager, offsetMetadataManager ); @@ -314,7 +314,7 @@ public class ReplicatedGroupCoordinatorTest { public void testReplayConsumerGroupTargetAssignmentMember() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); - ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( groupMetadataManager, offsetMetadataManager ); @@ -334,7 +334,7 @@ public class ReplicatedGroupCoordinatorTest { public void testReplayConsumerGroupTargetAssignmentMemberKeyWithNullValue() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); - ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( groupMetadataManager, offsetMetadataManager ); @@ -353,7 +353,7 @@ public class ReplicatedGroupCoordinatorTest { public void testReplayConsumerGroupCurrentMemberAssignment() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); - ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( groupMetadataManager, offsetMetadataManager ); @@ -373,7 +373,7 @@ public class ReplicatedGroupCoordinatorTest { public void testReplayConsumerGroupCurrentMemberAssignmentWithNullValue() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); - ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( groupMetadataManager, offsetMetadataManager ); @@ -392,7 +392,7 @@ public class ReplicatedGroupCoordinatorTest { public void testReplayKeyCannotBeNull() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); - ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( groupMetadataManager, offsetMetadataManager ); @@ -404,7 +404,7 @@ public class ReplicatedGroupCoordinatorTest { public void testReplayWithUnsupportedVersion() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); - ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( groupMetadataManager, offsetMetadataManager ); @@ -423,7 +423,7 @@ public class ReplicatedGroupCoordinatorTest { MetadataImage image = MetadataImage.EMPTY; GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); - ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( groupMetadataManager, offsetMetadataManager ); @@ -442,7 +442,7 @@ public class ReplicatedGroupCoordinatorTest { public void testReplayGroupMetadata() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); - ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( groupMetadataManager, offsetMetadataManager ); @@ -462,7 +462,7 @@ public class ReplicatedGroupCoordinatorTest { public void testReplayGroupMetadataWithNullValue() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); - ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( groupMetadataManager, offsetMetadataManager ); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index dd00d1f7ae8..3c5c873da66 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -302,7 +302,7 @@ public class GroupMetadataManagerTest { final private SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext); final private TopicPartition groupMetadataTopicPartition = new TopicPartition("topic", 0); private MetadataImage metadataImage; - private List assignors = Collections.singletonList(new MockPartitionAssignor("range")); + private List consumerGroupAssignors = Collections.singletonList(new MockPartitionAssignor("range")); private List consumerGroupBuilders = new ArrayList<>(); private int consumerGroupMaxSize = Integer.MAX_VALUE; private int consumerGroupMetadataRefreshIntervalMs = Integer.MAX_VALUE; @@ -318,7 +318,7 @@ public class GroupMetadataManagerTest { } public Builder withAssignors(List assignors) { - this.assignors = assignors; + this.consumerGroupAssignors = assignors; return this; } @@ -359,14 +359,13 @@ public class GroupMetadataManagerTest { public GroupMetadataManagerTestContext build() { if (metadataImage == null) metadataImage = MetadataImage.EMPTY; - if (assignors == null) assignors = Collections.emptyList(); + if (consumerGroupAssignors == null) consumerGroupAssignors = Collections.emptyList(); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext( time, timer, snapshotRegistry, new GroupMetadataManager.Builder() - .withTopicPartition(groupMetadataTopicPartition) .withSnapshotRegistry(snapshotRegistry) .withLogContext(logContext) .withTime(time) @@ -375,7 +374,7 @@ public class GroupMetadataManagerTest { .withConsumerGroupHeartbeatInterval(5000) .withConsumerGroupSessionTimeout(45000) .withConsumerGroupMaxSize(consumerGroupMaxSize) - .withAssignors(assignors) + .withConsumerGroupAssignors(consumerGroupAssignors) .withConsumerGroupMetadataRefreshIntervalMs(consumerGroupMetadataRefreshIntervalMs) .withGenericGroupMaxSize(genericGroupMaxSize) .withGenericGroupMinSessionTimeoutMs(genericGroupMinSessionTimeoutMs) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java index 496df95e2ee..5fdfcc4c01e 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java @@ -92,8 +92,7 @@ public class OffsetMetadataManagerTest { .withSnapshotRegistry(snapshotRegistry) .withLogContext(logContext) .withMetadataImage(metadataImage) - .withTopicPartition(new TopicPartition("__consumer_offsets", 0)) - .withAssignors(Collections.singletonList(new RangeAssignor())) + .withConsumerGroupAssignors(Collections.singletonList(new RangeAssignor())) .build(); OffsetMetadataManager offsetMetadataManager = new OffsetMetadataManager.Builder() diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java index 8a1b1511c86..a7eb0f6b709 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java @@ -166,11 +166,11 @@ public class CoordinatorRuntimeTest { /** * A simple Coordinator implementation that stores the records into a set. */ - private static class MockCoordinator implements Coordinator { + private static class MockCoordinatorShard implements CoordinatorShard { private final TimelineHashSet records; private final CoordinatorTimer timer; - MockCoordinator( + MockCoordinatorShard( SnapshotRegistry snapshotRegistry, CoordinatorTimer timer ) { @@ -195,12 +195,12 @@ public class CoordinatorRuntimeTest { /** * A CoordinatorBuilder that creates a MockCoordinator. */ - private static class MockCoordinatorBuilder implements CoordinatorBuilder { + private static class MockCoordinatorShardBuilder implements CoordinatorShardBuilder { private SnapshotRegistry snapshotRegistry; private CoordinatorTimer timer; @Override - public CoordinatorBuilder withSnapshotRegistry( + public CoordinatorShardBuilder withSnapshotRegistry( SnapshotRegistry snapshotRegistry ) { this.snapshotRegistry = snapshotRegistry; @@ -208,36 +208,36 @@ public class CoordinatorRuntimeTest { } @Override - public CoordinatorBuilder withLogContext( + public CoordinatorShardBuilder withLogContext( LogContext logContext ) { return this; } @Override - public CoordinatorBuilder withTime( + public CoordinatorShardBuilder withTime( Time time ) { return this; } @Override - public CoordinatorBuilder withTimer( + public CoordinatorShardBuilder withTimer( CoordinatorTimer timer ) { this.timer = timer; return this; } - public CoordinatorBuilder withTopicPartition( + public CoordinatorShardBuilder withTopicPartition( TopicPartition topicPartition ) { return this; } @Override - public MockCoordinator build() { - return new MockCoordinator( + public MockCoordinatorShard build() { + return new MockCoordinatorShard( Objects.requireNonNull(this.snapshotRegistry), Objects.requireNonNull(this.timer) ); @@ -247,10 +247,10 @@ public class CoordinatorRuntimeTest { /** * A CoordinatorBuilderSupplier that returns a MockCoordinatorBuilder. */ - private static class MockCoordinatorBuilderSupplier implements CoordinatorBuilderSupplier { + private static class MockCoordinatorShardBuilderSupplier implements CoordinatorShardBuilderSupplier { @Override - public CoordinatorBuilder get() { - return new MockCoordinatorBuilder(); + public CoordinatorShardBuilder get() { + return new MockCoordinatorShardBuilder(); } } @@ -259,18 +259,18 @@ public class CoordinatorRuntimeTest { MockTimer timer = new MockTimer(); MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class); MockPartitionWriter writer = mock(MockPartitionWriter.class); - MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class); - MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class); - MockCoordinator coordinator = mock(MockCoordinator.class); + MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class); + MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class); + MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class); - CoordinatorRuntime runtime = - new CoordinatorRuntime.Builder() + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() .withTime(timer.time()) .withTimer(timer) .withLoader(loader) .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(writer) - .withCoordinatorBuilderSupplier(supplier) + .withCoordinatorShardBuilderSupplier(supplier) .build(); when(builder.withSnapshotRegistry(any())).thenReturn(builder); @@ -291,7 +291,7 @@ public class CoordinatorRuntimeTest { runtime.scheduleLoadOperation(TP, 0); // Getting the coordinator context succeeds now. - CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); // The coordinator is loading. assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state); @@ -323,18 +323,18 @@ public class CoordinatorRuntimeTest { MockTimer timer = new MockTimer(); MockPartitionWriter writer = mock(MockPartitionWriter.class); MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class); - MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class); - MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class); - MockCoordinator coordinator = mock(MockCoordinator.class); + MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class); + MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class); + MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class); - CoordinatorRuntime runtime = - new CoordinatorRuntime.Builder() + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() .withTime(timer.time()) .withTimer(timer) .withLoader(loader) .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(writer) - .withCoordinatorBuilderSupplier(supplier) + .withCoordinatorShardBuilderSupplier(supplier) .build(); when(builder.withSnapshotRegistry(any())).thenReturn(builder); @@ -351,7 +351,7 @@ public class CoordinatorRuntimeTest { runtime.scheduleLoadOperation(TP, 0); // Getting the context succeeds and the coordinator should be in loading. - CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state); assertEquals(0, ctx.epoch); assertEquals(coordinator, ctx.coordinator); @@ -374,18 +374,18 @@ public class CoordinatorRuntimeTest { public void testScheduleLoadingWithStalePartitionEpoch() { MockTimer timer = new MockTimer(); MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class); - MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class); - MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class); - MockCoordinator coordinator = mock(MockCoordinator.class); + MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class); + MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class); + MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class); - CoordinatorRuntime runtime = - new CoordinatorRuntime.Builder() + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() .withTime(timer.time()) .withTimer(timer) .withLoader(loader) .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(new MockPartitionWriter()) - .withCoordinatorBuilderSupplier(supplier) + .withCoordinatorShardBuilderSupplier(supplier) .build(); when(builder.withSnapshotRegistry(any())).thenReturn(builder); @@ -402,7 +402,7 @@ public class CoordinatorRuntimeTest { runtime.scheduleLoadOperation(TP, 10); // Getting the context succeeds and the coordinator should be in loading. - CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state); assertEquals(10, ctx.epoch); assertEquals(coordinator, ctx.coordinator); @@ -423,18 +423,18 @@ public class CoordinatorRuntimeTest { public void testScheduleLoadingAfterLoadingFailure() { MockTimer timer = new MockTimer(); MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class); - MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class); - MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class); - MockCoordinator coordinator = mock(MockCoordinator.class); + MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class); + MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class); + MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class); - CoordinatorRuntime runtime = - new CoordinatorRuntime.Builder() + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() .withTime(timer.time()) .withTimer(timer) .withLoader(loader) .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(new MockPartitionWriter()) - .withCoordinatorBuilderSupplier(supplier) + .withCoordinatorShardBuilderSupplier(supplier) .build(); when(builder.withSnapshotRegistry(any())).thenReturn(builder); @@ -451,7 +451,7 @@ public class CoordinatorRuntimeTest { runtime.scheduleLoadOperation(TP, 10); // Getting the context succeeds and the coordinator should be in loading. - CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state); assertEquals(10, ctx.epoch); assertEquals(coordinator, ctx.coordinator); @@ -464,7 +464,7 @@ public class CoordinatorRuntimeTest { verify(coordinator, times(1)).onUnloaded(); // Create a new coordinator. - coordinator = mock(MockCoordinator.class); + coordinator = mock(MockCoordinatorShard.class); when(builder.build()).thenReturn(coordinator); // Schedule the reloading. @@ -489,18 +489,18 @@ public class CoordinatorRuntimeTest { public void testScheduleUnloading() { MockTimer timer = new MockTimer(); MockPartitionWriter writer = mock(MockPartitionWriter.class); - MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class); - MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class); - MockCoordinator coordinator = mock(MockCoordinator.class); + MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class); + MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class); + MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class); - CoordinatorRuntime runtime = - new CoordinatorRuntime.Builder() + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() .withTime(timer.time()) .withTimer(timer) .withLoader(new MockCoordinatorLoader()) .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(writer) - .withCoordinatorBuilderSupplier(supplier) + .withCoordinatorShardBuilderSupplier(supplier) .build(); when(builder.withSnapshotRegistry(any())).thenReturn(builder); @@ -513,7 +513,7 @@ public class CoordinatorRuntimeTest { // Loads the coordinator. It directly transitions to active. runtime.scheduleLoadOperation(TP, 10); - CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state); assertEquals(10, ctx.epoch); @@ -537,18 +537,18 @@ public class CoordinatorRuntimeTest { @Test public void testScheduleUnloadingWithStalePartitionEpoch() { MockTimer timer = new MockTimer(); - MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class); - MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class); - MockCoordinator coordinator = mock(MockCoordinator.class); + MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class); + MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class); + MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class); - CoordinatorRuntime runtime = - new CoordinatorRuntime.Builder() + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() .withTime(timer.time()) .withTimer(timer) .withLoader(new MockCoordinatorLoader()) .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(new MockPartitionWriter()) - .withCoordinatorBuilderSupplier(supplier) + .withCoordinatorShardBuilderSupplier(supplier) .build(); when(builder.withSnapshotRegistry(any())).thenReturn(builder); @@ -563,7 +563,7 @@ public class CoordinatorRuntimeTest { // Loads the coordinator. It directly transitions to active. runtime.scheduleLoadOperation(TP, 10); - CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state); assertEquals(10, ctx.epoch); @@ -579,21 +579,21 @@ public class CoordinatorRuntimeTest { MockTimer timer = new MockTimer(); MockPartitionWriter writer = new MockPartitionWriter(); - CoordinatorRuntime runtime = - new CoordinatorRuntime.Builder() + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() .withTime(timer.time()) .withTimer(timer) .withLoader(new MockCoordinatorLoader()) .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(writer) - .withCoordinatorBuilderSupplier(new MockCoordinatorBuilderSupplier()) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) .build(); // Schedule the loading. runtime.scheduleLoadOperation(TP, 10); // Verify the initial state. - CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(0L, ctx.lastWrittenOffset); assertEquals(0L, ctx.lastCommittedOffset); assertEquals(Collections.singletonList(0L), ctx.snapshotRegistry.epochsList()); @@ -687,14 +687,14 @@ public class CoordinatorRuntimeTest { @Test public void testScheduleWriteOpWhenInactive() { MockTimer timer = new MockTimer(); - CoordinatorRuntime runtime = - new CoordinatorRuntime.Builder() + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() .withTime(timer.time()) .withTimer(timer) .withLoader(new MockCoordinatorLoader()) .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(new MockPartitionWriter()) - .withCoordinatorBuilderSupplier(new MockCoordinatorBuilderSupplier()) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) .build(); // Scheduling a write fails with a NotCoordinatorException because the coordinator @@ -707,14 +707,14 @@ public class CoordinatorRuntimeTest { @Test public void testScheduleWriteOpWhenOpFails() { MockTimer timer = new MockTimer(); - CoordinatorRuntime runtime = - new CoordinatorRuntime.Builder() + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() .withTime(timer.time()) .withTimer(timer) .withLoader(new MockCoordinatorLoader()) .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(new MockPartitionWriter()) - .withCoordinatorBuilderSupplier(new MockCoordinatorBuilderSupplier()) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) .build(); // Loads the coordinator. @@ -731,28 +731,28 @@ public class CoordinatorRuntimeTest { @Test public void testScheduleWriteOpWhenReplayFails() { MockTimer timer = new MockTimer(); - CoordinatorRuntime runtime = - new CoordinatorRuntime.Builder() + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() .withTime(timer.time()) .withTimer(timer) .withLoader(new MockCoordinatorLoader()) .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(new MockPartitionWriter()) - .withCoordinatorBuilderSupplier(new MockCoordinatorBuilderSupplier()) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) .build(); // Loads the coordinator. runtime.scheduleLoadOperation(TP, 10); // Verify the initial state. - CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(0L, ctx.lastWrittenOffset); assertEquals(0L, ctx.lastCommittedOffset); assertEquals(Collections.singletonList(0L), ctx.snapshotRegistry.epochsList()); // Override the coordinator with a coordinator that throws // an exception when replay is called. - ctx.coordinator = new MockCoordinator(ctx.snapshotRegistry, ctx.timer) { + ctx.coordinator = new MockCoordinatorShard(ctx.snapshotRegistry, ctx.timer) { @Override public void replay(String record) throws RuntimeException { throw new IllegalArgumentException("error"); @@ -776,21 +776,21 @@ public class CoordinatorRuntimeTest { // The partition writer only accept on write. MockPartitionWriter writer = new MockPartitionWriter(2); - CoordinatorRuntime runtime = - new CoordinatorRuntime.Builder() + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() .withTime(timer.time()) .withTimer(timer) .withLoader(new MockCoordinatorLoader()) .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(writer) - .withCoordinatorBuilderSupplier(new MockCoordinatorBuilderSupplier()) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) .build(); // Loads the coordinator. runtime.scheduleLoadOperation(TP, 10); // Verify the initial state. - CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(0, ctx.lastWrittenOffset); assertEquals(0, ctx.lastCommittedOffset); assertEquals(Collections.singletonList(0L), ctx.snapshotRegistry.epochsList()); @@ -823,21 +823,21 @@ public class CoordinatorRuntimeTest { MockTimer timer = new MockTimer(); MockPartitionWriter writer = new MockPartitionWriter(); - CoordinatorRuntime runtime = - new CoordinatorRuntime.Builder() + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() .withTime(timer.time()) .withTimer(timer) .withLoader(new MockCoordinatorLoader()) .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(writer) - .withCoordinatorBuilderSupplier(new MockCoordinatorBuilderSupplier()) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) .build(); // Loads the coordinator. runtime.scheduleLoadOperation(TP, 10); // Verify the initial state. - CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(0, ctx.lastWrittenOffset); assertEquals(0, ctx.lastCommittedOffset); @@ -877,14 +877,14 @@ public class CoordinatorRuntimeTest { @Test public void testScheduleReadOpWhenPartitionInactive() { MockTimer timer = new MockTimer(); - CoordinatorRuntime runtime = - new CoordinatorRuntime.Builder() + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() .withTime(timer.time()) .withTimer(timer) .withLoader(new MockCoordinatorLoader()) .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(new MockPartitionWriter()) - .withCoordinatorBuilderSupplier(new MockCoordinatorBuilderSupplier()) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) .build(); // Schedule a read. It fails because the coordinator does not exist. @@ -898,21 +898,21 @@ public class CoordinatorRuntimeTest { MockTimer timer = new MockTimer(); MockPartitionWriter writer = new MockPartitionWriter(); - CoordinatorRuntime runtime = - new CoordinatorRuntime.Builder() + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() .withTime(timer.time()) .withTimer(timer) .withLoader(new MockCoordinatorLoader()) .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(writer) - .withCoordinatorBuilderSupplier(new MockCoordinatorBuilderSupplier()) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) .build(); // Loads the coordinator. runtime.scheduleLoadOperation(TP, 10); // Verify the initial state. - CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(0, ctx.lastWrittenOffset); assertEquals(0, ctx.lastCommittedOffset); @@ -939,21 +939,21 @@ public class CoordinatorRuntimeTest { public void testClose() throws Exception { MockCoordinatorLoader loader = spy(new MockCoordinatorLoader()); MockTimer timer = new MockTimer(); - CoordinatorRuntime runtime = - new CoordinatorRuntime.Builder() + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() .withTime(timer.time()) .withTimer(timer) .withLoader(loader) .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(new MockPartitionWriter()) - .withCoordinatorBuilderSupplier(new MockCoordinatorBuilderSupplier()) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) .build(); // Loads the coordinator. runtime.scheduleLoadOperation(TP, 10); // Check initial state. - CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(0, ctx.lastWrittenOffset); assertEquals(0, ctx.lastCommittedOffset); @@ -1001,21 +1001,21 @@ public class CoordinatorRuntimeTest { MockTimer timer = new MockTimer(); MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class); MockPartitionWriter writer = mock(MockPartitionWriter.class); - MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class); - MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class); + MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class); + MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class); - CoordinatorRuntime runtime = - new CoordinatorRuntime.Builder() + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() .withTime(timer.time()) .withTimer(timer) .withLoader(loader) .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(writer) - .withCoordinatorBuilderSupplier(supplier) + .withCoordinatorShardBuilderSupplier(supplier) .build(); - MockCoordinator coordinator0 = mock(MockCoordinator.class); - MockCoordinator coordinator1 = mock(MockCoordinator.class); + MockCoordinatorShard coordinator0 = mock(MockCoordinatorShard.class); + MockCoordinatorShard coordinator1 = mock(MockCoordinatorShard.class); when(supplier.get()).thenReturn(builder); when(builder.withSnapshotRegistry(any())).thenReturn(builder); @@ -1059,21 +1059,21 @@ public class CoordinatorRuntimeTest { @Test public void testScheduleTimer() throws InterruptedException { MockTimer timer = new MockTimer(); - CoordinatorRuntime runtime = - new CoordinatorRuntime.Builder() + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() .withTime(timer.time()) .withTimer(timer) .withLoader(new MockCoordinatorLoader()) .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(new MockPartitionWriter()) - .withCoordinatorBuilderSupplier(new MockCoordinatorBuilderSupplier()) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) .build(); // Loads the coordinator. runtime.scheduleLoadOperation(TP, 10); // Check initial state. - CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(0, ctx.lastWrittenOffset); assertEquals(0, ctx.lastCommittedOffset); @@ -1110,14 +1110,14 @@ public class CoordinatorRuntimeTest { public void testRescheduleTimer() throws InterruptedException { MockTimer timer = new MockTimer(); ManualEventProcessor processor = new ManualEventProcessor(); - CoordinatorRuntime runtime = - new CoordinatorRuntime.Builder() + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() .withTime(timer.time()) .withTimer(timer) .withLoader(new MockCoordinatorLoader()) .withEventProcessor(processor) .withPartitionWriter(new MockPartitionWriter()) - .withCoordinatorBuilderSupplier(new MockCoordinatorBuilderSupplier()) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) .build(); // Loads the coordinator. @@ -1128,7 +1128,7 @@ public class CoordinatorRuntimeTest { processor.poll(); // Check initial state. - CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(0, ctx.timer.size()); // The processor should be empty. @@ -1181,14 +1181,14 @@ public class CoordinatorRuntimeTest { public void testCancelTimer() throws InterruptedException { MockTimer timer = new MockTimer(); ManualEventProcessor processor = new ManualEventProcessor(); - CoordinatorRuntime runtime = - new CoordinatorRuntime.Builder() + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() .withTime(timer.time()) .withTimer(timer) .withLoader(new MockCoordinatorLoader()) .withEventProcessor(processor) .withPartitionWriter(new MockPartitionWriter()) - .withCoordinatorBuilderSupplier(new MockCoordinatorBuilderSupplier()) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) .build(); // Loads the coordinator. @@ -1199,7 +1199,7 @@ public class CoordinatorRuntimeTest { processor.poll(); // Check initial state. - CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(0, ctx.timer.size()); // The processor should be empty. @@ -1249,21 +1249,21 @@ public class CoordinatorRuntimeTest { @Test public void testRetryableTimer() throws InterruptedException { MockTimer timer = new MockTimer(); - CoordinatorRuntime runtime = - new CoordinatorRuntime.Builder() + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() .withTime(timer.time()) .withTimer(timer) .withLoader(new MockCoordinatorLoader()) .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(new MockPartitionWriter()) - .withCoordinatorBuilderSupplier(new MockCoordinatorBuilderSupplier()) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) .build(); // Loads the coordinator. runtime.scheduleLoadOperation(TP, 10); // Check initial state. - CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(0, ctx.timer.size()); // Timer #1. @@ -1305,21 +1305,21 @@ public class CoordinatorRuntimeTest { @Test public void testNonRetryableTimer() throws InterruptedException { MockTimer timer = new MockTimer(); - CoordinatorRuntime runtime = - new CoordinatorRuntime.Builder() + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() .withTime(timer.time()) .withTimer(timer) .withLoader(new MockCoordinatorLoader()) .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(new MockPartitionWriter()) - .withCoordinatorBuilderSupplier(new MockCoordinatorBuilderSupplier()) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) .build(); // Loads the coordinator. runtime.scheduleLoadOperation(TP, 10); // Check initial state. - CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(0, ctx.timer.size()); // Timer #1.