MINOR: Code cleanups in group-coordinator module (#14117)

This patch does a few code cleanups in the group-coordinator module.

It renames Coordinator to CoordinatorShard;
It renames ReplicatedGroupCoordinator to GroupCoordinatorShard. I was never really happy with this name. The new name makes more sense to me;
It removes TopicPartition from the GroupMetadataManager. It was only used in log messages. The log context already includes it so we don't have to log it again.
It renames assignors to consumerGroupAssignors.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
This commit is contained in:
David Jacot 2023-07-28 20:28:54 +02:00 committed by GitHub
parent 3709901c9e
commit 1574b9f16d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 228 additions and 276 deletions

View File

@ -58,7 +58,7 @@ import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilderSupplier; import org.apache.kafka.coordinator.group.runtime.CoordinatorShardBuilderSupplier;
import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor; import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor;
import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader; import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime; import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
@ -134,8 +134,8 @@ public class GroupCoordinatorService implements GroupCoordinator {
String logPrefix = String.format("GroupCoordinator id=%d", nodeId); String logPrefix = String.format("GroupCoordinator id=%d", nodeId);
LogContext logContext = new LogContext(String.format("[%s] ", logPrefix)); LogContext logContext = new LogContext(String.format("[%s] ", logPrefix));
CoordinatorBuilderSupplier<ReplicatedGroupCoordinator, Record> supplier = () -> CoordinatorShardBuilderSupplier<GroupCoordinatorShard, Record> supplier = () ->
new ReplicatedGroupCoordinator.Builder(config); new GroupCoordinatorShard.Builder(config);
CoordinatorEventProcessor processor = new MultiThreadedEventProcessor( CoordinatorEventProcessor processor = new MultiThreadedEventProcessor(
logContext, logContext,
@ -143,8 +143,8 @@ public class GroupCoordinatorService implements GroupCoordinator {
config.numThreads config.numThreads
); );
CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
new CoordinatorRuntime.Builder<ReplicatedGroupCoordinator, Record>() new CoordinatorRuntime.Builder<GroupCoordinatorShard, Record>()
.withTime(time) .withTime(time)
.withTimer(timer) .withTimer(timer)
.withLogPrefix(logPrefix) .withLogPrefix(logPrefix)
@ -152,7 +152,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
.withEventProcessor(processor) .withEventProcessor(processor)
.withPartitionWriter(writer) .withPartitionWriter(writer)
.withLoader(loader) .withLoader(loader)
.withCoordinatorBuilderSupplier(supplier) .withCoordinatorShardBuilderSupplier(supplier)
.withTime(time) .withTime(time)
.build(); .build();
@ -177,7 +177,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
/** /**
* The coordinator runtime. * The coordinator runtime.
*/ */
private final CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime; private final CoordinatorRuntime<GroupCoordinatorShard, Record> runtime;
/** /**
* Boolean indicating whether the coordinator is active or not. * Boolean indicating whether the coordinator is active or not.
@ -199,7 +199,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
GroupCoordinatorService( GroupCoordinatorService(
LogContext logContext, LogContext logContext,
GroupCoordinatorConfig config, GroupCoordinatorConfig config,
CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime CoordinatorRuntime<GroupCoordinatorShard, Record> runtime
) { ) {
this.log = logContext.logger(CoordinatorLoader.class); this.log = logContext.logger(CoordinatorLoader.class);
this.config = config; this.config = config;

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.kafka.coordinator.group; package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData; import org.apache.kafka.common.message.HeartbeatRequestData;
@ -48,8 +47,8 @@ import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.runtime.Coordinator; import org.apache.kafka.coordinator.group.runtime.CoordinatorShard;
import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilder; import org.apache.kafka.coordinator.group.runtime.CoordinatorShardBuilder;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
import org.apache.kafka.coordinator.group.runtime.CoordinatorTimer; import org.apache.kafka.coordinator.group.runtime.CoordinatorTimer;
import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataDelta;
@ -60,22 +59,21 @@ import org.apache.kafka.timeline.SnapshotRegistry;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
/** /**
* The group coordinator replicated state machine that manages the metadata of all generic and * The group coordinator shard is a replicated state machine that manages the metadata of all
* consumer groups. It holds the hard and the soft state of the groups. This class has two kinds * generic and consumer groups. It holds the hard and the soft state of the groups. This class
* of methods: * has two kinds of methods:
* 1) The request handlers which handle the requests and generate a response and records to * 1) The request handlers which handle the requests and generate a response and records to
* mutate the hard state. Those records will be written by the runtime and applied to the * mutate the hard state. Those records will be written by the runtime and applied to the
* hard state via the replay methods. * hard state via the replay methods.
* 2) The replay methods which apply records to the hard state. Those are used in the request * 2) The replay methods which apply records to the hard state. Those are used in the request
* handling as well as during the initial loading of the records from the partitions. * handling as well as during the initial loading of the records from the partitions.
*/ */
public class ReplicatedGroupCoordinator implements Coordinator<Record> { public class GroupCoordinatorShard implements CoordinatorShard<Record> {
public static class Builder implements CoordinatorBuilder<ReplicatedGroupCoordinator, Record> { public static class Builder implements CoordinatorShardBuilder<GroupCoordinatorShard, Record> {
private final GroupCoordinatorConfig config; private final GroupCoordinatorConfig config;
private LogContext logContext; private LogContext logContext;
private SnapshotRegistry snapshotRegistry; private SnapshotRegistry snapshotRegistry;
private TopicPartition topicPartition;
private Time time; private Time time;
private CoordinatorTimer<Void, Record> timer; private CoordinatorTimer<Void, Record> timer;
@ -86,7 +84,7 @@ public class ReplicatedGroupCoordinator implements Coordinator<Record> {
} }
@Override @Override
public CoordinatorBuilder<ReplicatedGroupCoordinator, Record> withLogContext( public CoordinatorShardBuilder<GroupCoordinatorShard, Record> withLogContext(
LogContext logContext LogContext logContext
) { ) {
this.logContext = logContext; this.logContext = logContext;
@ -94,7 +92,7 @@ public class ReplicatedGroupCoordinator implements Coordinator<Record> {
} }
@Override @Override
public CoordinatorBuilder<ReplicatedGroupCoordinator, Record> withTime( public CoordinatorShardBuilder<GroupCoordinatorShard, Record> withTime(
Time time Time time
) { ) {
this.time = time; this.time = time;
@ -102,7 +100,7 @@ public class ReplicatedGroupCoordinator implements Coordinator<Record> {
} }
@Override @Override
public CoordinatorBuilder<ReplicatedGroupCoordinator, Record> withTimer( public CoordinatorShardBuilder<GroupCoordinatorShard, Record> withTimer(
CoordinatorTimer<Void, Record> timer CoordinatorTimer<Void, Record> timer
) { ) {
this.timer = timer; this.timer = timer;
@ -110,7 +108,7 @@ public class ReplicatedGroupCoordinator implements Coordinator<Record> {
} }
@Override @Override
public CoordinatorBuilder<ReplicatedGroupCoordinator, Record> withSnapshotRegistry( public CoordinatorShardBuilder<GroupCoordinatorShard, Record> withSnapshotRegistry(
SnapshotRegistry snapshotRegistry SnapshotRegistry snapshotRegistry
) { ) {
this.snapshotRegistry = snapshotRegistry; this.snapshotRegistry = snapshotRegistry;
@ -118,15 +116,7 @@ public class ReplicatedGroupCoordinator implements Coordinator<Record> {
} }
@Override @Override
public CoordinatorBuilder<ReplicatedGroupCoordinator, Record> withTopicPartition( public GroupCoordinatorShard build() {
TopicPartition topicPartition
) {
this.topicPartition = topicPartition;
return this;
}
@Override
public ReplicatedGroupCoordinator build() {
if (logContext == null) logContext = new LogContext(); if (logContext == null) logContext = new LogContext();
if (config == null) if (config == null)
throw new IllegalArgumentException("Config must be set."); throw new IllegalArgumentException("Config must be set.");
@ -136,16 +126,13 @@ public class ReplicatedGroupCoordinator implements Coordinator<Record> {
throw new IllegalArgumentException("Time must be set."); throw new IllegalArgumentException("Time must be set.");
if (timer == null) if (timer == null)
throw new IllegalArgumentException("Timer must be set."); throw new IllegalArgumentException("Timer must be set.");
if (topicPartition == null)
throw new IllegalArgumentException("TopicPartition must be set.");
GroupMetadataManager groupMetadataManager = new GroupMetadataManager.Builder() GroupMetadataManager groupMetadataManager = new GroupMetadataManager.Builder()
.withLogContext(logContext) .withLogContext(logContext)
.withSnapshotRegistry(snapshotRegistry) .withSnapshotRegistry(snapshotRegistry)
.withTime(time) .withTime(time)
.withTimer(timer) .withTimer(timer)
.withTopicPartition(topicPartition) .withConsumerGroupAssignors(config.consumerGroupAssignors)
.withAssignors(config.consumerGroupAssignors)
.withConsumerGroupMaxSize(config.consumerGroupMaxSize) .withConsumerGroupMaxSize(config.consumerGroupMaxSize)
.withConsumerGroupHeartbeatInterval(config.consumerGroupHeartbeatIntervalMs) .withConsumerGroupHeartbeatInterval(config.consumerGroupHeartbeatIntervalMs)
.withGenericGroupInitialRebalanceDelayMs(config.genericGroupInitialRebalanceDelayMs) .withGenericGroupInitialRebalanceDelayMs(config.genericGroupInitialRebalanceDelayMs)
@ -162,7 +149,7 @@ public class ReplicatedGroupCoordinator implements Coordinator<Record> {
.withOffsetMetadataMaxSize(config.offsetMetadataMaxSize) .withOffsetMetadataMaxSize(config.offsetMetadataMaxSize)
.build(); .build();
return new ReplicatedGroupCoordinator( return new GroupCoordinatorShard(
groupMetadataManager, groupMetadataManager,
offsetMetadataManager offsetMetadataManager
); );
@ -185,7 +172,7 @@ public class ReplicatedGroupCoordinator implements Coordinator<Record> {
* @param groupMetadataManager The group metadata manager. * @param groupMetadataManager The group metadata manager.
* @param offsetMetadataManager The offset metadata manager. * @param offsetMetadataManager The offset metadata manager.
*/ */
ReplicatedGroupCoordinator( GroupCoordinatorShard(
GroupMetadataManager groupMetadataManager, GroupMetadataManager groupMetadataManager,
OffsetMetadataManager offsetMetadataManager OffsetMetadataManager offsetMetadataManager
) { ) {

View File

@ -17,7 +17,6 @@
package org.apache.kafka.coordinator.group; package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid; import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException; import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
@ -131,11 +130,10 @@ public class GroupMetadataManager {
private SnapshotRegistry snapshotRegistry = null; private SnapshotRegistry snapshotRegistry = null;
private Time time = null; private Time time = null;
private CoordinatorTimer<Void, Record> timer = null; private CoordinatorTimer<Void, Record> timer = null;
private List<PartitionAssignor> assignors = null; private List<PartitionAssignor> consumerGroupAssignors = null;
private int consumerGroupMaxSize = Integer.MAX_VALUE; private int consumerGroupMaxSize = Integer.MAX_VALUE;
private int consumerGroupHeartbeatIntervalMs = 5000; private int consumerGroupHeartbeatIntervalMs = 5000;
private int consumerGroupMetadataRefreshIntervalMs = Integer.MAX_VALUE; private int consumerGroupMetadataRefreshIntervalMs = Integer.MAX_VALUE;
private TopicPartition topicPartition = null;
private MetadataImage metadataImage = null; private MetadataImage metadataImage = null;
private int consumerGroupSessionTimeoutMs = 45000; private int consumerGroupSessionTimeoutMs = 45000;
private int genericGroupMaxSize = Integer.MAX_VALUE; private int genericGroupMaxSize = Integer.MAX_VALUE;
@ -164,8 +162,8 @@ public class GroupMetadataManager {
return this; return this;
} }
Builder withAssignors(List<PartitionAssignor> assignors) { Builder withConsumerGroupAssignors(List<PartitionAssignor> consumerGroupAssignors) {
this.assignors = assignors; this.consumerGroupAssignors = consumerGroupAssignors;
return this; return this;
} }
@ -194,11 +192,6 @@ public class GroupMetadataManager {
return this; return this;
} }
Builder withTopicPartition(TopicPartition tp) {
this.topicPartition = tp;
return this;
}
Builder withGenericGroupMaxSize(int genericGroupMaxSize) { Builder withGenericGroupMaxSize(int genericGroupMaxSize) {
this.genericGroupMaxSize = genericGroupMaxSize; this.genericGroupMaxSize = genericGroupMaxSize;
return this; return this;
@ -232,20 +225,15 @@ public class GroupMetadataManager {
if (timer == null) if (timer == null)
throw new IllegalArgumentException("Timer must be set."); throw new IllegalArgumentException("Timer must be set.");
if (assignors == null || assignors.isEmpty()) if (consumerGroupAssignors == null || consumerGroupAssignors.isEmpty())
throw new IllegalArgumentException("Assignors must be set before building."); throw new IllegalArgumentException("Assignors must be set before building.");
if (topicPartition == null) {
throw new IllegalStateException("TopicPartition must be set before building.");
}
return new GroupMetadataManager( return new GroupMetadataManager(
topicPartition,
snapshotRegistry, snapshotRegistry,
logContext, logContext,
time, time,
timer, timer,
assignors, consumerGroupAssignors,
metadataImage, metadataImage,
consumerGroupMaxSize, consumerGroupMaxSize,
consumerGroupSessionTimeoutMs, consumerGroupSessionTimeoutMs,
@ -260,11 +248,6 @@ public class GroupMetadataManager {
} }
} }
/**
* The topic partition associated with the metadata manager.
*/
private final TopicPartition topicPartition;
/** /**
* The log context. * The log context.
*/ */
@ -370,7 +353,6 @@ public class GroupMetadataManager {
private final int genericGroupMaxSessionTimeoutMs; private final int genericGroupMaxSessionTimeoutMs;
private GroupMetadataManager( private GroupMetadataManager(
TopicPartition topicPartition,
SnapshotRegistry snapshotRegistry, SnapshotRegistry snapshotRegistry,
LogContext logContext, LogContext logContext,
Time time, Time time,
@ -394,7 +376,6 @@ public class GroupMetadataManager {
this.timer = timer; this.timer = timer;
this.metadataImage = metadataImage; this.metadataImage = metadataImage;
this.assignors = assignors.stream().collect(Collectors.toMap(PartitionAssignor::name, Function.identity())); this.assignors = assignors.stream().collect(Collectors.toMap(PartitionAssignor::name, Function.identity()));
this.topicPartition = topicPartition;
this.defaultAssignor = assignors.get(0); this.defaultAssignor = assignors.get(0);
this.groups = new TimelineHashMap<>(snapshotRegistry, 0); this.groups = new TimelineHashMap<>(snapshotRegistry, 0);
this.groupsByTopics = new TimelineHashMap<>(snapshotRegistry, 0); this.groupsByTopics = new TimelineHashMap<>(snapshotRegistry, 0);
@ -1975,8 +1956,7 @@ public class GroupMetadataManager {
} else { } else {
group.initNextGeneration(); group.initNextGeneration();
if (group.isInState(EMPTY)) { if (group.isInState(EMPTY)) {
log.info("Group {} with generation {} is now empty ({}-{})", log.info("Group {} with generation {} is now empty.", groupId, group.generationId());
groupId, group.generationId(), topicPartition.topic(), topicPartition.partition());
CompletableFuture<Void> appendFuture = new CompletableFuture<>(); CompletableFuture<Void> appendFuture = new CompletableFuture<>();
appendFuture.whenComplete((__, t) -> { appendFuture.whenComplete((__, t) -> {
@ -1994,8 +1974,8 @@ public class GroupMetadataManager {
return new CoordinatorResult<>(records, appendFuture); return new CoordinatorResult<>(records, appendFuture);
} else { } else {
log.info("Stabilized group {} generation {} ({}) with {} members", log.info("Stabilized group {} generation {} with {} members.",
groupId, group.generationId(), topicPartition, group.size()); groupId, group.generationId(), group.size());
// Complete the awaiting join group response future for all the members after rebalancing // Complete the awaiting join group response future for all the members after rebalancing
group.allMembers().forEach(member -> { group.allMembers().forEach(member -> {
@ -2274,9 +2254,8 @@ public class GroupMetadataManager {
group.transitionTo(PREPARING_REBALANCE); group.transitionTo(PREPARING_REBALANCE);
log.info("Preparing to rebalance group {} in state {} with old generation {} ({}-{}) (reason: {})", log.info("Preparing to rebalance group {} in state {} with old generation {} (reason: {}).",
group.groupId(), group.currentState(), group.generationId(), group.groupId(), group.currentState(), group.generationId(), reason);
topicPartition.topic(), topicPartition.partition(), reason);
return isInitialRebalance ? EMPTY_RESULT : maybeCompleteJoinElseSchedule(group); return isInitialRebalance ? EMPTY_RESULT : maybeCompleteJoinElseSchedule(group);
} }

View File

@ -72,7 +72,7 @@ import java.util.function.Consumer;
* @param <S> The type of the state machine. * @param <S> The type of the state machine.
* @param <U> The type of the record. * @param <U> The type of the record.
*/ */
public class CoordinatorRuntime<S extends Coordinator<U>, U> implements AutoCloseable { public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements AutoCloseable {
/** /**
* Builder to create a CoordinatorRuntime. * Builder to create a CoordinatorRuntime.
@ -80,13 +80,13 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> implements AutoClos
* @param <S> The type of the state machine. * @param <S> The type of the state machine.
* @param <U> The type of the record. * @param <U> The type of the record.
*/ */
public static class Builder<S extends Coordinator<U>, U> { public static class Builder<S extends CoordinatorShard<U>, U> {
private String logPrefix; private String logPrefix;
private LogContext logContext; private LogContext logContext;
private CoordinatorEventProcessor eventProcessor; private CoordinatorEventProcessor eventProcessor;
private PartitionWriter<U> partitionWriter; private PartitionWriter<U> partitionWriter;
private CoordinatorLoader<U> loader; private CoordinatorLoader<U> loader;
private CoordinatorBuilderSupplier<S, U> coordinatorBuilderSupplier; private CoordinatorShardBuilderSupplier<S, U> coordinatorShardBuilderSupplier;
private Time time = Time.SYSTEM; private Time time = Time.SYSTEM;
private Timer timer; private Timer timer;
@ -115,8 +115,8 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> implements AutoClos
return this; return this;
} }
public Builder<S, U> withCoordinatorBuilderSupplier(CoordinatorBuilderSupplier<S, U> coordinatorBuilderSupplier) { public Builder<S, U> withCoordinatorShardBuilderSupplier(CoordinatorShardBuilderSupplier<S, U> coordinatorShardBuilderSupplier) {
this.coordinatorBuilderSupplier = coordinatorBuilderSupplier; this.coordinatorShardBuilderSupplier = coordinatorShardBuilderSupplier;
return this; return this;
} }
@ -141,7 +141,7 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> implements AutoClos
throw new IllegalArgumentException("Partition write must be set."); throw new IllegalArgumentException("Partition write must be set.");
if (loader == null) if (loader == null)
throw new IllegalArgumentException("Loader must be set."); throw new IllegalArgumentException("Loader must be set.");
if (coordinatorBuilderSupplier == null) if (coordinatorShardBuilderSupplier == null)
throw new IllegalArgumentException("State machine supplier must be set."); throw new IllegalArgumentException("State machine supplier must be set.");
if (time == null) if (time == null)
throw new IllegalArgumentException("Time must be set."); throw new IllegalArgumentException("Time must be set.");
@ -154,7 +154,7 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> implements AutoClos
eventProcessor, eventProcessor,
partitionWriter, partitionWriter,
loader, loader,
coordinatorBuilderSupplier, coordinatorShardBuilderSupplier,
time, time,
timer timer
); );
@ -508,13 +508,12 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> implements AutoClos
snapshotRegistry = new SnapshotRegistry(logContext); snapshotRegistry = new SnapshotRegistry(logContext);
lastWrittenOffset = 0L; lastWrittenOffset = 0L;
lastCommittedOffset = 0L; lastCommittedOffset = 0L;
coordinator = coordinatorBuilderSupplier coordinator = coordinatorShardBuilderSupplier
.get() .get()
.withLogContext(logContext) .withLogContext(logContext)
.withSnapshotRegistry(snapshotRegistry) .withSnapshotRegistry(snapshotRegistry)
.withTime(time) .withTime(time)
.withTimer(timer) .withTimer(timer)
.withTopicPartition(tp)
.build(); .build();
break; break;
@ -994,7 +993,7 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> implements AutoClos
* The coordinator state machine builder used by the runtime * The coordinator state machine builder used by the runtime
* to instantiate a coordinator. * to instantiate a coordinator.
*/ */
private final CoordinatorBuilderSupplier<S, U> coordinatorBuilderSupplier; private final CoordinatorShardBuilderSupplier<S, U> coordinatorShardBuilderSupplier;
/** /**
* Atomic boolean indicating whether the runtime is running. * Atomic boolean indicating whether the runtime is running.
@ -1009,14 +1008,14 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> implements AutoClos
/** /**
* Constructor. * Constructor.
* *
* @param logPrefix The log prefix. * @param logPrefix The log prefix.
* @param logContext The log context. * @param logContext The log context.
* @param processor The event processor. * @param processor The event processor.
* @param partitionWriter The partition writer. * @param partitionWriter The partition writer.
* @param loader The coordinator loader. * @param loader The coordinator loader.
* @param coordinatorBuilderSupplier The coordinator builder. * @param coordinatorShardBuilderSupplier The coordinator builder.
* @param time The system time. * @param time The system time.
* @param timer The system timer. * @param timer The system timer.
*/ */
private CoordinatorRuntime( private CoordinatorRuntime(
String logPrefix, String logPrefix,
@ -1024,7 +1023,7 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> implements AutoClos
CoordinatorEventProcessor processor, CoordinatorEventProcessor processor,
PartitionWriter<U> partitionWriter, PartitionWriter<U> partitionWriter,
CoordinatorLoader<U> loader, CoordinatorLoader<U> loader,
CoordinatorBuilderSupplier<S, U> coordinatorBuilderSupplier, CoordinatorShardBuilderSupplier<S, U> coordinatorShardBuilderSupplier,
Time time, Time time,
Timer timer Timer timer
) { ) {
@ -1038,7 +1037,7 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> implements AutoClos
this.partitionWriter = partitionWriter; this.partitionWriter = partitionWriter;
this.highWatermarklistener = new HighWatermarkListener(); this.highWatermarklistener = new HighWatermarkListener();
this.loader = loader; this.loader = loader;
this.coordinatorBuilderSupplier = coordinatorBuilderSupplier; this.coordinatorShardBuilderSupplier = coordinatorShardBuilderSupplier;
} }
/** /**

View File

@ -20,10 +20,10 @@ import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage; import org.apache.kafka.image.MetadataImage;
/** /**
* Coordinator is basically a replicated state machine managed by the * CoordinatorShard is basically a replicated state machine managed by the
* {@link CoordinatorRuntime}. * {@link CoordinatorRuntime}.
*/ */
public interface Coordinator<U> extends CoordinatorPlayback<U> { public interface CoordinatorShard<U> extends CoordinatorPlayback<U> {
/** /**
* The coordinator has been loaded. This is used to apply any * The coordinator has been loaded. This is used to apply any
@ -34,7 +34,7 @@ public interface Coordinator<U> extends CoordinatorPlayback<U> {
default void onLoaded(MetadataImage newImage) {} default void onLoaded(MetadataImage newImage) {}
/** /**
* A new metadata image is available. This is only called after {@link Coordinator#onLoaded(MetadataImage)} * A new metadata image is available. This is only called after {@link CoordinatorShard#onLoaded(MetadataImage)}
* is called to signal that the coordinator has been fully loaded. * is called to signal that the coordinator has been fully loaded.
* *
* @param newImage The new metadata image. * @param newImage The new metadata image.

View File

@ -16,19 +16,18 @@
*/ */
package org.apache.kafka.coordinator.group.runtime; package org.apache.kafka.coordinator.group.runtime;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.SnapshotRegistry;
/** /**
* A builder to build a {@link Coordinator} replicated state machine. * A builder to build a {@link CoordinatorShard} replicated state machine.
* *
* @param <S> The type of the coordinator. * @param <S> The type of the coordinator.
* @param <U> The record type. * @param <U> The record type.
*/ */
public interface CoordinatorBuilder<S extends Coordinator<U>, U> { public interface CoordinatorShardBuilder<S extends CoordinatorShard<U>, U> {
/** /**
* Sets the snapshot registry used to back all the timeline * Sets the snapshot registry used to back all the timeline
@ -38,7 +37,7 @@ public interface CoordinatorBuilder<S extends Coordinator<U>, U> {
* *
* @return The builder. * @return The builder.
*/ */
CoordinatorBuilder<S, U> withSnapshotRegistry( CoordinatorShardBuilder<S, U> withSnapshotRegistry(
SnapshotRegistry snapshotRegistry SnapshotRegistry snapshotRegistry
); );
@ -49,20 +48,10 @@ public interface CoordinatorBuilder<S extends Coordinator<U>, U> {
* *
* @return The builder. * @return The builder.
*/ */
CoordinatorBuilder<S, U> withLogContext( CoordinatorShardBuilder<S, U> withLogContext(
LogContext logContext LogContext logContext
); );
/**
* Sets the topic partition.
* @param topicPartition The topic partition.
*
* @return The builder.
*/
CoordinatorBuilder<S, U> withTopicPartition(
TopicPartition topicPartition
);
/** /**
* Sets the time. * Sets the time.
* *
@ -70,7 +59,7 @@ public interface CoordinatorBuilder<S extends Coordinator<U>, U> {
* *
* @return The builder. * @return The builder.
*/ */
CoordinatorBuilder<S, U> withTime( CoordinatorShardBuilder<S, U> withTime(
Time time Time time
); );
@ -81,7 +70,7 @@ public interface CoordinatorBuilder<S extends Coordinator<U>, U> {
* *
* @return The builder. * @return The builder.
*/ */
CoordinatorBuilder<S, U> withTimer( CoordinatorShardBuilder<S, U> withTimer(
CoordinatorTimer<Void, U> timer CoordinatorTimer<Void, U> timer
); );

View File

@ -17,14 +17,14 @@
package org.apache.kafka.coordinator.group.runtime; package org.apache.kafka.coordinator.group.runtime;
/** /**
* Supplies a {@link CoordinatorBuilder} to the {@link CoordinatorRuntime}. * Supplies a {@link CoordinatorShardBuilder} to the {@link CoordinatorRuntime}.
* *
* @param <S> The type of the coordinator. * @param <S> The type of the coordinator.
* @param <U> The record type. * @param <U> The record type.
*/ */
public interface CoordinatorBuilderSupplier<S extends Coordinator<U>, U> { public interface CoordinatorShardBuilderSupplier<S extends CoordinatorShard<U>, U> {
/** /**
* @return A {@link CoordinatorBuilder}. * @return A {@link CoordinatorShardBuilder}.
*/ */
CoordinatorBuilder<S, U> get(); CoordinatorShardBuilder<S, U> get();
} }

View File

@ -83,8 +83,8 @@ import static org.mockito.Mockito.when;
public class GroupCoordinatorServiceTest { public class GroupCoordinatorServiceTest {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private CoordinatorRuntime<ReplicatedGroupCoordinator, Record> mockRuntime() { private CoordinatorRuntime<GroupCoordinatorShard, Record> mockRuntime() {
return (CoordinatorRuntime<ReplicatedGroupCoordinator, Record>) mock(CoordinatorRuntime.class); return (CoordinatorRuntime<GroupCoordinatorShard, Record>) mock(CoordinatorRuntime.class);
} }
private GroupCoordinatorConfig createConfig() { private GroupCoordinatorConfig createConfig() {
@ -106,7 +106,7 @@ public class GroupCoordinatorServiceTest {
@Test @Test
public void testStartupShutdown() throws Exception { public void testStartupShutdown() throws Exception {
CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime(); CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService( GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(), new LogContext(),
createConfig(), createConfig(),
@ -121,7 +121,7 @@ public class GroupCoordinatorServiceTest {
@Test @Test
public void testConsumerGroupHeartbeatWhenNotStarted() { public void testConsumerGroupHeartbeatWhenNotStarted() {
CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime(); CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService( GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(), new LogContext(),
createConfig(), createConfig(),
@ -142,7 +142,7 @@ public class GroupCoordinatorServiceTest {
@Test @Test
public void testConsumerGroupHeartbeat() throws ExecutionException, InterruptedException, TimeoutException { public void testConsumerGroupHeartbeat() throws ExecutionException, InterruptedException, TimeoutException {
CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime(); CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService( GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(), new LogContext(),
createConfig(), createConfig(),
@ -190,7 +190,7 @@ public class GroupCoordinatorServiceTest {
short expectedErrorCode, short expectedErrorCode,
String expectedErrorMessage String expectedErrorMessage
) throws ExecutionException, InterruptedException, TimeoutException { ) throws ExecutionException, InterruptedException, TimeoutException {
CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime(); CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService( GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(), new LogContext(),
createConfig(), createConfig(),
@ -223,7 +223,7 @@ public class GroupCoordinatorServiceTest {
@Test @Test
public void testPartitionFor() { public void testPartitionFor() {
CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime(); CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService( GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(), new LogContext(),
createConfig(), createConfig(),
@ -240,7 +240,7 @@ public class GroupCoordinatorServiceTest {
@Test @Test
public void testGroupMetadataTopicConfigs() { public void testGroupMetadataTopicConfigs() {
CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime(); CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService( GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(), new LogContext(),
createConfig(), createConfig(),
@ -257,7 +257,7 @@ public class GroupCoordinatorServiceTest {
@Test @Test
public void testOnElection() { public void testOnElection() {
CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime(); CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService( GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(), new LogContext(),
createConfig(), createConfig(),
@ -278,7 +278,7 @@ public class GroupCoordinatorServiceTest {
@Test @Test
public void testOnResignation() { public void testOnResignation() {
CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime(); CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService( GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(), new LogContext(),
createConfig(), createConfig(),
@ -299,7 +299,7 @@ public class GroupCoordinatorServiceTest {
@Test @Test
public void testJoinGroup() { public void testJoinGroup() {
CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime(); CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService( GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(), new LogContext(),
createConfig(), createConfig(),
@ -330,7 +330,7 @@ public class GroupCoordinatorServiceTest {
@Test @Test
public void testJoinGroupWithException() throws Exception { public void testJoinGroupWithException() throws Exception {
CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime(); CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService( GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(), new LogContext(),
createConfig(), createConfig(),
@ -363,7 +363,7 @@ public class GroupCoordinatorServiceTest {
@Test @Test
public void testJoinGroupInvalidGroupId() throws Exception { public void testJoinGroupInvalidGroupId() throws Exception {
CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime(); CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService( GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(), new LogContext(),
createConfig(), createConfig(),
@ -408,7 +408,7 @@ public class GroupCoordinatorServiceTest {
@Test @Test
public void testSyncGroup() { public void testSyncGroup() {
CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime(); CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService( GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(), new LogContext(),
createConfig(), createConfig(),
@ -439,7 +439,7 @@ public class GroupCoordinatorServiceTest {
@Test @Test
public void testSyncGroupWithException() throws Exception { public void testSyncGroupWithException() throws Exception {
CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime(); CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService( GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(), new LogContext(),
createConfig(), createConfig(),
@ -473,7 +473,7 @@ public class GroupCoordinatorServiceTest {
@Test @Test
public void testSyncGroupInvalidGroupId() throws Exception { public void testSyncGroupInvalidGroupId() throws Exception {
CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime(); CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService( GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(), new LogContext(),
createConfig(), createConfig(),
@ -502,7 +502,7 @@ public class GroupCoordinatorServiceTest {
@Test @Test
public void testHeartbeat() throws Exception { public void testHeartbeat() throws Exception {
CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime(); CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService( GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(), new LogContext(),
createConfig(), createConfig(),
@ -533,7 +533,7 @@ public class GroupCoordinatorServiceTest {
@Test @Test
public void testHeartbeatCoordinatorNotAvailableException() throws Exception { public void testHeartbeatCoordinatorNotAvailableException() throws Exception {
CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime(); CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService( GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(), new LogContext(),
createConfig(), createConfig(),
@ -564,7 +564,7 @@ public class GroupCoordinatorServiceTest {
@Test @Test
public void testHeartbeatCoordinatorException() throws Exception { public void testHeartbeatCoordinatorException() throws Exception {
CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime(); CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService( GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(), new LogContext(),
createConfig(), createConfig(),

View File

@ -55,13 +55,13 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
public class ReplicatedGroupCoordinatorTest { public class GroupCoordinatorShardTest {
@Test @Test
public void testConsumerGroupHeartbeat() { public void testConsumerGroupHeartbeat() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
groupMetadataManager, groupMetadataManager,
offsetMetadataManager offsetMetadataManager
); );
@ -85,7 +85,7 @@ public class ReplicatedGroupCoordinatorTest {
public void testCommitOffset() { public void testCommitOffset() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
groupMetadataManager, groupMetadataManager,
offsetMetadataManager offsetMetadataManager
); );
@ -109,7 +109,7 @@ public class ReplicatedGroupCoordinatorTest {
public void testReplayOffsetCommit() { public void testReplayOffsetCommit() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
groupMetadataManager, groupMetadataManager,
offsetMetadataManager offsetMetadataManager
); );
@ -134,7 +134,7 @@ public class ReplicatedGroupCoordinatorTest {
public void testReplayOffsetCommitWithNullValue() { public void testReplayOffsetCommitWithNullValue() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
groupMetadataManager, groupMetadataManager,
offsetMetadataManager offsetMetadataManager
); );
@ -158,7 +158,7 @@ public class ReplicatedGroupCoordinatorTest {
public void testReplayConsumerGroupMetadata() { public void testReplayConsumerGroupMetadata() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
groupMetadataManager, groupMetadataManager,
offsetMetadataManager offsetMetadataManager
); );
@ -178,7 +178,7 @@ public class ReplicatedGroupCoordinatorTest {
public void testReplayConsumerGroupMetadataWithNullValue() { public void testReplayConsumerGroupMetadataWithNullValue() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
groupMetadataManager, groupMetadataManager,
offsetMetadataManager offsetMetadataManager
); );
@ -197,7 +197,7 @@ public class ReplicatedGroupCoordinatorTest {
public void testReplayConsumerGroupPartitionMetadata() { public void testReplayConsumerGroupPartitionMetadata() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
groupMetadataManager, groupMetadataManager,
offsetMetadataManager offsetMetadataManager
); );
@ -217,7 +217,7 @@ public class ReplicatedGroupCoordinatorTest {
public void testReplayConsumerGroupPartitionMetadataWithNullValue() { public void testReplayConsumerGroupPartitionMetadataWithNullValue() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
groupMetadataManager, groupMetadataManager,
offsetMetadataManager offsetMetadataManager
); );
@ -236,7 +236,7 @@ public class ReplicatedGroupCoordinatorTest {
public void testReplayConsumerGroupMemberMetadata() { public void testReplayConsumerGroupMemberMetadata() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
groupMetadataManager, groupMetadataManager,
offsetMetadataManager offsetMetadataManager
); );
@ -256,7 +256,7 @@ public class ReplicatedGroupCoordinatorTest {
public void testReplayConsumerGroupMemberMetadataWithNullValue() { public void testReplayConsumerGroupMemberMetadataWithNullValue() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
groupMetadataManager, groupMetadataManager,
offsetMetadataManager offsetMetadataManager
); );
@ -275,7 +275,7 @@ public class ReplicatedGroupCoordinatorTest {
public void testReplayConsumerGroupTargetAssignmentMetadata() { public void testReplayConsumerGroupTargetAssignmentMetadata() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
groupMetadataManager, groupMetadataManager,
offsetMetadataManager offsetMetadataManager
); );
@ -295,7 +295,7 @@ public class ReplicatedGroupCoordinatorTest {
public void testReplayConsumerGroupTargetAssignmentMetadataWithNullValue() { public void testReplayConsumerGroupTargetAssignmentMetadataWithNullValue() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
groupMetadataManager, groupMetadataManager,
offsetMetadataManager offsetMetadataManager
); );
@ -314,7 +314,7 @@ public class ReplicatedGroupCoordinatorTest {
public void testReplayConsumerGroupTargetAssignmentMember() { public void testReplayConsumerGroupTargetAssignmentMember() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
groupMetadataManager, groupMetadataManager,
offsetMetadataManager offsetMetadataManager
); );
@ -334,7 +334,7 @@ public class ReplicatedGroupCoordinatorTest {
public void testReplayConsumerGroupTargetAssignmentMemberKeyWithNullValue() { public void testReplayConsumerGroupTargetAssignmentMemberKeyWithNullValue() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
groupMetadataManager, groupMetadataManager,
offsetMetadataManager offsetMetadataManager
); );
@ -353,7 +353,7 @@ public class ReplicatedGroupCoordinatorTest {
public void testReplayConsumerGroupCurrentMemberAssignment() { public void testReplayConsumerGroupCurrentMemberAssignment() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
groupMetadataManager, groupMetadataManager,
offsetMetadataManager offsetMetadataManager
); );
@ -373,7 +373,7 @@ public class ReplicatedGroupCoordinatorTest {
public void testReplayConsumerGroupCurrentMemberAssignmentWithNullValue() { public void testReplayConsumerGroupCurrentMemberAssignmentWithNullValue() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
groupMetadataManager, groupMetadataManager,
offsetMetadataManager offsetMetadataManager
); );
@ -392,7 +392,7 @@ public class ReplicatedGroupCoordinatorTest {
public void testReplayKeyCannotBeNull() { public void testReplayKeyCannotBeNull() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
groupMetadataManager, groupMetadataManager,
offsetMetadataManager offsetMetadataManager
); );
@ -404,7 +404,7 @@ public class ReplicatedGroupCoordinatorTest {
public void testReplayWithUnsupportedVersion() { public void testReplayWithUnsupportedVersion() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
groupMetadataManager, groupMetadataManager,
offsetMetadataManager offsetMetadataManager
); );
@ -423,7 +423,7 @@ public class ReplicatedGroupCoordinatorTest {
MetadataImage image = MetadataImage.EMPTY; MetadataImage image = MetadataImage.EMPTY;
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
groupMetadataManager, groupMetadataManager,
offsetMetadataManager offsetMetadataManager
); );
@ -442,7 +442,7 @@ public class ReplicatedGroupCoordinatorTest {
public void testReplayGroupMetadata() { public void testReplayGroupMetadata() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
groupMetadataManager, groupMetadataManager,
offsetMetadataManager offsetMetadataManager
); );
@ -462,7 +462,7 @@ public class ReplicatedGroupCoordinatorTest {
public void testReplayGroupMetadataWithNullValue() { public void testReplayGroupMetadataWithNullValue() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
groupMetadataManager, groupMetadataManager,
offsetMetadataManager offsetMetadataManager
); );

View File

@ -302,7 +302,7 @@ public class GroupMetadataManagerTest {
final private SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext); final private SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
final private TopicPartition groupMetadataTopicPartition = new TopicPartition("topic", 0); final private TopicPartition groupMetadataTopicPartition = new TopicPartition("topic", 0);
private MetadataImage metadataImage; private MetadataImage metadataImage;
private List<PartitionAssignor> assignors = Collections.singletonList(new MockPartitionAssignor("range")); private List<PartitionAssignor> consumerGroupAssignors = Collections.singletonList(new MockPartitionAssignor("range"));
private List<ConsumerGroupBuilder> consumerGroupBuilders = new ArrayList<>(); private List<ConsumerGroupBuilder> consumerGroupBuilders = new ArrayList<>();
private int consumerGroupMaxSize = Integer.MAX_VALUE; private int consumerGroupMaxSize = Integer.MAX_VALUE;
private int consumerGroupMetadataRefreshIntervalMs = Integer.MAX_VALUE; private int consumerGroupMetadataRefreshIntervalMs = Integer.MAX_VALUE;
@ -318,7 +318,7 @@ public class GroupMetadataManagerTest {
} }
public Builder withAssignors(List<PartitionAssignor> assignors) { public Builder withAssignors(List<PartitionAssignor> assignors) {
this.assignors = assignors; this.consumerGroupAssignors = assignors;
return this; return this;
} }
@ -359,14 +359,13 @@ public class GroupMetadataManagerTest {
public GroupMetadataManagerTestContext build() { public GroupMetadataManagerTestContext build() {
if (metadataImage == null) metadataImage = MetadataImage.EMPTY; if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
if (assignors == null) assignors = Collections.emptyList(); if (consumerGroupAssignors == null) consumerGroupAssignors = Collections.emptyList();
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext( GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext(
time, time,
timer, timer,
snapshotRegistry, snapshotRegistry,
new GroupMetadataManager.Builder() new GroupMetadataManager.Builder()
.withTopicPartition(groupMetadataTopicPartition)
.withSnapshotRegistry(snapshotRegistry) .withSnapshotRegistry(snapshotRegistry)
.withLogContext(logContext) .withLogContext(logContext)
.withTime(time) .withTime(time)
@ -375,7 +374,7 @@ public class GroupMetadataManagerTest {
.withConsumerGroupHeartbeatInterval(5000) .withConsumerGroupHeartbeatInterval(5000)
.withConsumerGroupSessionTimeout(45000) .withConsumerGroupSessionTimeout(45000)
.withConsumerGroupMaxSize(consumerGroupMaxSize) .withConsumerGroupMaxSize(consumerGroupMaxSize)
.withAssignors(assignors) .withConsumerGroupAssignors(consumerGroupAssignors)
.withConsumerGroupMetadataRefreshIntervalMs(consumerGroupMetadataRefreshIntervalMs) .withConsumerGroupMetadataRefreshIntervalMs(consumerGroupMetadataRefreshIntervalMs)
.withGenericGroupMaxSize(genericGroupMaxSize) .withGenericGroupMaxSize(genericGroupMaxSize)
.withGenericGroupMinSessionTimeoutMs(genericGroupMinSessionTimeoutMs) .withGenericGroupMinSessionTimeoutMs(genericGroupMinSessionTimeoutMs)

View File

@ -92,8 +92,7 @@ public class OffsetMetadataManagerTest {
.withSnapshotRegistry(snapshotRegistry) .withSnapshotRegistry(snapshotRegistry)
.withLogContext(logContext) .withLogContext(logContext)
.withMetadataImage(metadataImage) .withMetadataImage(metadataImage)
.withTopicPartition(new TopicPartition("__consumer_offsets", 0)) .withConsumerGroupAssignors(Collections.singletonList(new RangeAssignor()))
.withAssignors(Collections.singletonList(new RangeAssignor()))
.build(); .build();
OffsetMetadataManager offsetMetadataManager = new OffsetMetadataManager.Builder() OffsetMetadataManager offsetMetadataManager = new OffsetMetadataManager.Builder()

View File

@ -166,11 +166,11 @@ public class CoordinatorRuntimeTest {
/** /**
* A simple Coordinator implementation that stores the records into a set. * A simple Coordinator implementation that stores the records into a set.
*/ */
private static class MockCoordinator implements Coordinator<String> { private static class MockCoordinatorShard implements CoordinatorShard<String> {
private final TimelineHashSet<String> records; private final TimelineHashSet<String> records;
private final CoordinatorTimer<Void, String> timer; private final CoordinatorTimer<Void, String> timer;
MockCoordinator( MockCoordinatorShard(
SnapshotRegistry snapshotRegistry, SnapshotRegistry snapshotRegistry,
CoordinatorTimer<Void, String> timer CoordinatorTimer<Void, String> timer
) { ) {
@ -195,12 +195,12 @@ public class CoordinatorRuntimeTest {
/** /**
* A CoordinatorBuilder that creates a MockCoordinator. * A CoordinatorBuilder that creates a MockCoordinator.
*/ */
private static class MockCoordinatorBuilder implements CoordinatorBuilder<MockCoordinator, String> { private static class MockCoordinatorShardBuilder implements CoordinatorShardBuilder<MockCoordinatorShard, String> {
private SnapshotRegistry snapshotRegistry; private SnapshotRegistry snapshotRegistry;
private CoordinatorTimer<Void, String> timer; private CoordinatorTimer<Void, String> timer;
@Override @Override
public CoordinatorBuilder<MockCoordinator, String> withSnapshotRegistry( public CoordinatorShardBuilder<MockCoordinatorShard, String> withSnapshotRegistry(
SnapshotRegistry snapshotRegistry SnapshotRegistry snapshotRegistry
) { ) {
this.snapshotRegistry = snapshotRegistry; this.snapshotRegistry = snapshotRegistry;
@ -208,36 +208,36 @@ public class CoordinatorRuntimeTest {
} }
@Override @Override
public CoordinatorBuilder<MockCoordinator, String> withLogContext( public CoordinatorShardBuilder<MockCoordinatorShard, String> withLogContext(
LogContext logContext LogContext logContext
) { ) {
return this; return this;
} }
@Override @Override
public CoordinatorBuilder<MockCoordinator, String> withTime( public CoordinatorShardBuilder<MockCoordinatorShard, String> withTime(
Time time Time time
) { ) {
return this; return this;
} }
@Override @Override
public CoordinatorBuilder<MockCoordinator, String> withTimer( public CoordinatorShardBuilder<MockCoordinatorShard, String> withTimer(
CoordinatorTimer<Void, String> timer CoordinatorTimer<Void, String> timer
) { ) {
this.timer = timer; this.timer = timer;
return this; return this;
} }
public CoordinatorBuilder<MockCoordinator, String> withTopicPartition( public CoordinatorShardBuilder<MockCoordinatorShard, String> withTopicPartition(
TopicPartition topicPartition TopicPartition topicPartition
) { ) {
return this; return this;
} }
@Override @Override
public MockCoordinator build() { public MockCoordinatorShard build() {
return new MockCoordinator( return new MockCoordinatorShard(
Objects.requireNonNull(this.snapshotRegistry), Objects.requireNonNull(this.snapshotRegistry),
Objects.requireNonNull(this.timer) Objects.requireNonNull(this.timer)
); );
@ -247,10 +247,10 @@ public class CoordinatorRuntimeTest {
/** /**
* A CoordinatorBuilderSupplier that returns a MockCoordinatorBuilder. * A CoordinatorBuilderSupplier that returns a MockCoordinatorBuilder.
*/ */
private static class MockCoordinatorBuilderSupplier implements CoordinatorBuilderSupplier<MockCoordinator, String> { private static class MockCoordinatorShardBuilderSupplier implements CoordinatorShardBuilderSupplier<MockCoordinatorShard, String> {
@Override @Override
public CoordinatorBuilder<MockCoordinator, String> get() { public CoordinatorShardBuilder<MockCoordinatorShard, String> get() {
return new MockCoordinatorBuilder(); return new MockCoordinatorShardBuilder();
} }
} }
@ -259,18 +259,18 @@ public class CoordinatorRuntimeTest {
MockTimer timer = new MockTimer(); MockTimer timer = new MockTimer();
MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class); MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
MockPartitionWriter writer = mock(MockPartitionWriter.class); MockPartitionWriter writer = mock(MockPartitionWriter.class);
MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class); MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class);
MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class); MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class);
MockCoordinator coordinator = mock(MockCoordinator.class); MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
CoordinatorRuntime<MockCoordinator, String> runtime = CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinator, String>() new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time()) .withTime(timer.time())
.withTimer(timer) .withTimer(timer)
.withLoader(loader) .withLoader(loader)
.withEventProcessor(new DirectEventProcessor()) .withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer) .withPartitionWriter(writer)
.withCoordinatorBuilderSupplier(supplier) .withCoordinatorShardBuilderSupplier(supplier)
.build(); .build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder); when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@ -291,7 +291,7 @@ public class CoordinatorRuntimeTest {
runtime.scheduleLoadOperation(TP, 0); runtime.scheduleLoadOperation(TP, 0);
// Getting the coordinator context succeeds now. // Getting the coordinator context succeeds now.
CoordinatorRuntime<MockCoordinator, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP); CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
// The coordinator is loading. // The coordinator is loading.
assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state); assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state);
@ -323,18 +323,18 @@ public class CoordinatorRuntimeTest {
MockTimer timer = new MockTimer(); MockTimer timer = new MockTimer();
MockPartitionWriter writer = mock(MockPartitionWriter.class); MockPartitionWriter writer = mock(MockPartitionWriter.class);
MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class); MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class); MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class);
MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class); MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class);
MockCoordinator coordinator = mock(MockCoordinator.class); MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
CoordinatorRuntime<MockCoordinator, String> runtime = CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinator, String>() new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time()) .withTime(timer.time())
.withTimer(timer) .withTimer(timer)
.withLoader(loader) .withLoader(loader)
.withEventProcessor(new DirectEventProcessor()) .withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer) .withPartitionWriter(writer)
.withCoordinatorBuilderSupplier(supplier) .withCoordinatorShardBuilderSupplier(supplier)
.build(); .build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder); when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@ -351,7 +351,7 @@ public class CoordinatorRuntimeTest {
runtime.scheduleLoadOperation(TP, 0); runtime.scheduleLoadOperation(TP, 0);
// Getting the context succeeds and the coordinator should be in loading. // Getting the context succeeds and the coordinator should be in loading.
CoordinatorRuntime<MockCoordinator, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP); CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state); assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state);
assertEquals(0, ctx.epoch); assertEquals(0, ctx.epoch);
assertEquals(coordinator, ctx.coordinator); assertEquals(coordinator, ctx.coordinator);
@ -374,18 +374,18 @@ public class CoordinatorRuntimeTest {
public void testScheduleLoadingWithStalePartitionEpoch() { public void testScheduleLoadingWithStalePartitionEpoch() {
MockTimer timer = new MockTimer(); MockTimer timer = new MockTimer();
MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class); MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class); MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class);
MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class); MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class);
MockCoordinator coordinator = mock(MockCoordinator.class); MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
CoordinatorRuntime<MockCoordinator, String> runtime = CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinator, String>() new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time()) .withTime(timer.time())
.withTimer(timer) .withTimer(timer)
.withLoader(loader) .withLoader(loader)
.withEventProcessor(new DirectEventProcessor()) .withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter()) .withPartitionWriter(new MockPartitionWriter())
.withCoordinatorBuilderSupplier(supplier) .withCoordinatorShardBuilderSupplier(supplier)
.build(); .build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder); when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@ -402,7 +402,7 @@ public class CoordinatorRuntimeTest {
runtime.scheduleLoadOperation(TP, 10); runtime.scheduleLoadOperation(TP, 10);
// Getting the context succeeds and the coordinator should be in loading. // Getting the context succeeds and the coordinator should be in loading.
CoordinatorRuntime<MockCoordinator, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP); CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state); assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state);
assertEquals(10, ctx.epoch); assertEquals(10, ctx.epoch);
assertEquals(coordinator, ctx.coordinator); assertEquals(coordinator, ctx.coordinator);
@ -423,18 +423,18 @@ public class CoordinatorRuntimeTest {
public void testScheduleLoadingAfterLoadingFailure() { public void testScheduleLoadingAfterLoadingFailure() {
MockTimer timer = new MockTimer(); MockTimer timer = new MockTimer();
MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class); MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class); MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class);
MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class); MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class);
MockCoordinator coordinator = mock(MockCoordinator.class); MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
CoordinatorRuntime<MockCoordinator, String> runtime = CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinator, String>() new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time()) .withTime(timer.time())
.withTimer(timer) .withTimer(timer)
.withLoader(loader) .withLoader(loader)
.withEventProcessor(new DirectEventProcessor()) .withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter()) .withPartitionWriter(new MockPartitionWriter())
.withCoordinatorBuilderSupplier(supplier) .withCoordinatorShardBuilderSupplier(supplier)
.build(); .build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder); when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@ -451,7 +451,7 @@ public class CoordinatorRuntimeTest {
runtime.scheduleLoadOperation(TP, 10); runtime.scheduleLoadOperation(TP, 10);
// Getting the context succeeds and the coordinator should be in loading. // Getting the context succeeds and the coordinator should be in loading.
CoordinatorRuntime<MockCoordinator, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP); CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state); assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state);
assertEquals(10, ctx.epoch); assertEquals(10, ctx.epoch);
assertEquals(coordinator, ctx.coordinator); assertEquals(coordinator, ctx.coordinator);
@ -464,7 +464,7 @@ public class CoordinatorRuntimeTest {
verify(coordinator, times(1)).onUnloaded(); verify(coordinator, times(1)).onUnloaded();
// Create a new coordinator. // Create a new coordinator.
coordinator = mock(MockCoordinator.class); coordinator = mock(MockCoordinatorShard.class);
when(builder.build()).thenReturn(coordinator); when(builder.build()).thenReturn(coordinator);
// Schedule the reloading. // Schedule the reloading.
@ -489,18 +489,18 @@ public class CoordinatorRuntimeTest {
public void testScheduleUnloading() { public void testScheduleUnloading() {
MockTimer timer = new MockTimer(); MockTimer timer = new MockTimer();
MockPartitionWriter writer = mock(MockPartitionWriter.class); MockPartitionWriter writer = mock(MockPartitionWriter.class);
MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class); MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class);
MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class); MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class);
MockCoordinator coordinator = mock(MockCoordinator.class); MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
CoordinatorRuntime<MockCoordinator, String> runtime = CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinator, String>() new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time()) .withTime(timer.time())
.withTimer(timer) .withTimer(timer)
.withLoader(new MockCoordinatorLoader()) .withLoader(new MockCoordinatorLoader())
.withEventProcessor(new DirectEventProcessor()) .withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer) .withPartitionWriter(writer)
.withCoordinatorBuilderSupplier(supplier) .withCoordinatorShardBuilderSupplier(supplier)
.build(); .build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder); when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@ -513,7 +513,7 @@ public class CoordinatorRuntimeTest {
// Loads the coordinator. It directly transitions to active. // Loads the coordinator. It directly transitions to active.
runtime.scheduleLoadOperation(TP, 10); runtime.scheduleLoadOperation(TP, 10);
CoordinatorRuntime<MockCoordinator, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP); CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state); assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
assertEquals(10, ctx.epoch); assertEquals(10, ctx.epoch);
@ -537,18 +537,18 @@ public class CoordinatorRuntimeTest {
@Test @Test
public void testScheduleUnloadingWithStalePartitionEpoch() { public void testScheduleUnloadingWithStalePartitionEpoch() {
MockTimer timer = new MockTimer(); MockTimer timer = new MockTimer();
MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class); MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class);
MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class); MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class);
MockCoordinator coordinator = mock(MockCoordinator.class); MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
CoordinatorRuntime<MockCoordinator, String> runtime = CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinator, String>() new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time()) .withTime(timer.time())
.withTimer(timer) .withTimer(timer)
.withLoader(new MockCoordinatorLoader()) .withLoader(new MockCoordinatorLoader())
.withEventProcessor(new DirectEventProcessor()) .withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter()) .withPartitionWriter(new MockPartitionWriter())
.withCoordinatorBuilderSupplier(supplier) .withCoordinatorShardBuilderSupplier(supplier)
.build(); .build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder); when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@ -563,7 +563,7 @@ public class CoordinatorRuntimeTest {
// Loads the coordinator. It directly transitions to active. // Loads the coordinator. It directly transitions to active.
runtime.scheduleLoadOperation(TP, 10); runtime.scheduleLoadOperation(TP, 10);
CoordinatorRuntime<MockCoordinator, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP); CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state); assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
assertEquals(10, ctx.epoch); assertEquals(10, ctx.epoch);
@ -579,21 +579,21 @@ public class CoordinatorRuntimeTest {
MockTimer timer = new MockTimer(); MockTimer timer = new MockTimer();
MockPartitionWriter writer = new MockPartitionWriter(); MockPartitionWriter writer = new MockPartitionWriter();
CoordinatorRuntime<MockCoordinator, String> runtime = CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinator, String>() new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time()) .withTime(timer.time())
.withTimer(timer) .withTimer(timer)
.withLoader(new MockCoordinatorLoader()) .withLoader(new MockCoordinatorLoader())
.withEventProcessor(new DirectEventProcessor()) .withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer) .withPartitionWriter(writer)
.withCoordinatorBuilderSupplier(new MockCoordinatorBuilderSupplier()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.build(); .build();
// Schedule the loading. // Schedule the loading.
runtime.scheduleLoadOperation(TP, 10); runtime.scheduleLoadOperation(TP, 10);
// Verify the initial state. // Verify the initial state.
CoordinatorRuntime<MockCoordinator, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP); CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0L, ctx.lastWrittenOffset); assertEquals(0L, ctx.lastWrittenOffset);
assertEquals(0L, ctx.lastCommittedOffset); assertEquals(0L, ctx.lastCommittedOffset);
assertEquals(Collections.singletonList(0L), ctx.snapshotRegistry.epochsList()); assertEquals(Collections.singletonList(0L), ctx.snapshotRegistry.epochsList());
@ -687,14 +687,14 @@ public class CoordinatorRuntimeTest {
@Test @Test
public void testScheduleWriteOpWhenInactive() { public void testScheduleWriteOpWhenInactive() {
MockTimer timer = new MockTimer(); MockTimer timer = new MockTimer();
CoordinatorRuntime<MockCoordinator, String> runtime = CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinator, String>() new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time()) .withTime(timer.time())
.withTimer(timer) .withTimer(timer)
.withLoader(new MockCoordinatorLoader()) .withLoader(new MockCoordinatorLoader())
.withEventProcessor(new DirectEventProcessor()) .withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter()) .withPartitionWriter(new MockPartitionWriter())
.withCoordinatorBuilderSupplier(new MockCoordinatorBuilderSupplier()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.build(); .build();
// Scheduling a write fails with a NotCoordinatorException because the coordinator // Scheduling a write fails with a NotCoordinatorException because the coordinator
@ -707,14 +707,14 @@ public class CoordinatorRuntimeTest {
@Test @Test
public void testScheduleWriteOpWhenOpFails() { public void testScheduleWriteOpWhenOpFails() {
MockTimer timer = new MockTimer(); MockTimer timer = new MockTimer();
CoordinatorRuntime<MockCoordinator, String> runtime = CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinator, String>() new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time()) .withTime(timer.time())
.withTimer(timer) .withTimer(timer)
.withLoader(new MockCoordinatorLoader()) .withLoader(new MockCoordinatorLoader())
.withEventProcessor(new DirectEventProcessor()) .withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter()) .withPartitionWriter(new MockPartitionWriter())
.withCoordinatorBuilderSupplier(new MockCoordinatorBuilderSupplier()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.build(); .build();
// Loads the coordinator. // Loads the coordinator.
@ -731,28 +731,28 @@ public class CoordinatorRuntimeTest {
@Test @Test
public void testScheduleWriteOpWhenReplayFails() { public void testScheduleWriteOpWhenReplayFails() {
MockTimer timer = new MockTimer(); MockTimer timer = new MockTimer();
CoordinatorRuntime<MockCoordinator, String> runtime = CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinator, String>() new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time()) .withTime(timer.time())
.withTimer(timer) .withTimer(timer)
.withLoader(new MockCoordinatorLoader()) .withLoader(new MockCoordinatorLoader())
.withEventProcessor(new DirectEventProcessor()) .withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter()) .withPartitionWriter(new MockPartitionWriter())
.withCoordinatorBuilderSupplier(new MockCoordinatorBuilderSupplier()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.build(); .build();
// Loads the coordinator. // Loads the coordinator.
runtime.scheduleLoadOperation(TP, 10); runtime.scheduleLoadOperation(TP, 10);
// Verify the initial state. // Verify the initial state.
CoordinatorRuntime<MockCoordinator, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP); CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0L, ctx.lastWrittenOffset); assertEquals(0L, ctx.lastWrittenOffset);
assertEquals(0L, ctx.lastCommittedOffset); assertEquals(0L, ctx.lastCommittedOffset);
assertEquals(Collections.singletonList(0L), ctx.snapshotRegistry.epochsList()); assertEquals(Collections.singletonList(0L), ctx.snapshotRegistry.epochsList());
// Override the coordinator with a coordinator that throws // Override the coordinator with a coordinator that throws
// an exception when replay is called. // an exception when replay is called.
ctx.coordinator = new MockCoordinator(ctx.snapshotRegistry, ctx.timer) { ctx.coordinator = new MockCoordinatorShard(ctx.snapshotRegistry, ctx.timer) {
@Override @Override
public void replay(String record) throws RuntimeException { public void replay(String record) throws RuntimeException {
throw new IllegalArgumentException("error"); throw new IllegalArgumentException("error");
@ -776,21 +776,21 @@ public class CoordinatorRuntimeTest {
// The partition writer only accept on write. // The partition writer only accept on write.
MockPartitionWriter writer = new MockPartitionWriter(2); MockPartitionWriter writer = new MockPartitionWriter(2);
CoordinatorRuntime<MockCoordinator, String> runtime = CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinator, String>() new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time()) .withTime(timer.time())
.withTimer(timer) .withTimer(timer)
.withLoader(new MockCoordinatorLoader()) .withLoader(new MockCoordinatorLoader())
.withEventProcessor(new DirectEventProcessor()) .withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer) .withPartitionWriter(writer)
.withCoordinatorBuilderSupplier(new MockCoordinatorBuilderSupplier()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.build(); .build();
// Loads the coordinator. // Loads the coordinator.
runtime.scheduleLoadOperation(TP, 10); runtime.scheduleLoadOperation(TP, 10);
// Verify the initial state. // Verify the initial state.
CoordinatorRuntime<MockCoordinator, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP); CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0, ctx.lastWrittenOffset); assertEquals(0, ctx.lastWrittenOffset);
assertEquals(0, ctx.lastCommittedOffset); assertEquals(0, ctx.lastCommittedOffset);
assertEquals(Collections.singletonList(0L), ctx.snapshotRegistry.epochsList()); assertEquals(Collections.singletonList(0L), ctx.snapshotRegistry.epochsList());
@ -823,21 +823,21 @@ public class CoordinatorRuntimeTest {
MockTimer timer = new MockTimer(); MockTimer timer = new MockTimer();
MockPartitionWriter writer = new MockPartitionWriter(); MockPartitionWriter writer = new MockPartitionWriter();
CoordinatorRuntime<MockCoordinator, String> runtime = CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinator, String>() new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time()) .withTime(timer.time())
.withTimer(timer) .withTimer(timer)
.withLoader(new MockCoordinatorLoader()) .withLoader(new MockCoordinatorLoader())
.withEventProcessor(new DirectEventProcessor()) .withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer) .withPartitionWriter(writer)
.withCoordinatorBuilderSupplier(new MockCoordinatorBuilderSupplier()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.build(); .build();
// Loads the coordinator. // Loads the coordinator.
runtime.scheduleLoadOperation(TP, 10); runtime.scheduleLoadOperation(TP, 10);
// Verify the initial state. // Verify the initial state.
CoordinatorRuntime<MockCoordinator, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP); CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0, ctx.lastWrittenOffset); assertEquals(0, ctx.lastWrittenOffset);
assertEquals(0, ctx.lastCommittedOffset); assertEquals(0, ctx.lastCommittedOffset);
@ -877,14 +877,14 @@ public class CoordinatorRuntimeTest {
@Test @Test
public void testScheduleReadOpWhenPartitionInactive() { public void testScheduleReadOpWhenPartitionInactive() {
MockTimer timer = new MockTimer(); MockTimer timer = new MockTimer();
CoordinatorRuntime<MockCoordinator, String> runtime = CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinator, String>() new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time()) .withTime(timer.time())
.withTimer(timer) .withTimer(timer)
.withLoader(new MockCoordinatorLoader()) .withLoader(new MockCoordinatorLoader())
.withEventProcessor(new DirectEventProcessor()) .withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter()) .withPartitionWriter(new MockPartitionWriter())
.withCoordinatorBuilderSupplier(new MockCoordinatorBuilderSupplier()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.build(); .build();
// Schedule a read. It fails because the coordinator does not exist. // Schedule a read. It fails because the coordinator does not exist.
@ -898,21 +898,21 @@ public class CoordinatorRuntimeTest {
MockTimer timer = new MockTimer(); MockTimer timer = new MockTimer();
MockPartitionWriter writer = new MockPartitionWriter(); MockPartitionWriter writer = new MockPartitionWriter();
CoordinatorRuntime<MockCoordinator, String> runtime = CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinator, String>() new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time()) .withTime(timer.time())
.withTimer(timer) .withTimer(timer)
.withLoader(new MockCoordinatorLoader()) .withLoader(new MockCoordinatorLoader())
.withEventProcessor(new DirectEventProcessor()) .withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer) .withPartitionWriter(writer)
.withCoordinatorBuilderSupplier(new MockCoordinatorBuilderSupplier()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.build(); .build();
// Loads the coordinator. // Loads the coordinator.
runtime.scheduleLoadOperation(TP, 10); runtime.scheduleLoadOperation(TP, 10);
// Verify the initial state. // Verify the initial state.
CoordinatorRuntime<MockCoordinator, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP); CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0, ctx.lastWrittenOffset); assertEquals(0, ctx.lastWrittenOffset);
assertEquals(0, ctx.lastCommittedOffset); assertEquals(0, ctx.lastCommittedOffset);
@ -939,21 +939,21 @@ public class CoordinatorRuntimeTest {
public void testClose() throws Exception { public void testClose() throws Exception {
MockCoordinatorLoader loader = spy(new MockCoordinatorLoader()); MockCoordinatorLoader loader = spy(new MockCoordinatorLoader());
MockTimer timer = new MockTimer(); MockTimer timer = new MockTimer();
CoordinatorRuntime<MockCoordinator, String> runtime = CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinator, String>() new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time()) .withTime(timer.time())
.withTimer(timer) .withTimer(timer)
.withLoader(loader) .withLoader(loader)
.withEventProcessor(new DirectEventProcessor()) .withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter()) .withPartitionWriter(new MockPartitionWriter())
.withCoordinatorBuilderSupplier(new MockCoordinatorBuilderSupplier()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.build(); .build();
// Loads the coordinator. // Loads the coordinator.
runtime.scheduleLoadOperation(TP, 10); runtime.scheduleLoadOperation(TP, 10);
// Check initial state. // Check initial state.
CoordinatorRuntime<MockCoordinator, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP); CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0, ctx.lastWrittenOffset); assertEquals(0, ctx.lastWrittenOffset);
assertEquals(0, ctx.lastCommittedOffset); assertEquals(0, ctx.lastCommittedOffset);
@ -1001,21 +1001,21 @@ public class CoordinatorRuntimeTest {
MockTimer timer = new MockTimer(); MockTimer timer = new MockTimer();
MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class); MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
MockPartitionWriter writer = mock(MockPartitionWriter.class); MockPartitionWriter writer = mock(MockPartitionWriter.class);
MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class); MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class);
MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class); MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class);
CoordinatorRuntime<MockCoordinator, String> runtime = CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinator, String>() new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time()) .withTime(timer.time())
.withTimer(timer) .withTimer(timer)
.withLoader(loader) .withLoader(loader)
.withEventProcessor(new DirectEventProcessor()) .withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer) .withPartitionWriter(writer)
.withCoordinatorBuilderSupplier(supplier) .withCoordinatorShardBuilderSupplier(supplier)
.build(); .build();
MockCoordinator coordinator0 = mock(MockCoordinator.class); MockCoordinatorShard coordinator0 = mock(MockCoordinatorShard.class);
MockCoordinator coordinator1 = mock(MockCoordinator.class); MockCoordinatorShard coordinator1 = mock(MockCoordinatorShard.class);
when(supplier.get()).thenReturn(builder); when(supplier.get()).thenReturn(builder);
when(builder.withSnapshotRegistry(any())).thenReturn(builder); when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@ -1059,21 +1059,21 @@ public class CoordinatorRuntimeTest {
@Test @Test
public void testScheduleTimer() throws InterruptedException { public void testScheduleTimer() throws InterruptedException {
MockTimer timer = new MockTimer(); MockTimer timer = new MockTimer();
CoordinatorRuntime<MockCoordinator, String> runtime = CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinator, String>() new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time()) .withTime(timer.time())
.withTimer(timer) .withTimer(timer)
.withLoader(new MockCoordinatorLoader()) .withLoader(new MockCoordinatorLoader())
.withEventProcessor(new DirectEventProcessor()) .withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter()) .withPartitionWriter(new MockPartitionWriter())
.withCoordinatorBuilderSupplier(new MockCoordinatorBuilderSupplier()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.build(); .build();
// Loads the coordinator. // Loads the coordinator.
runtime.scheduleLoadOperation(TP, 10); runtime.scheduleLoadOperation(TP, 10);
// Check initial state. // Check initial state.
CoordinatorRuntime<MockCoordinator, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP); CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0, ctx.lastWrittenOffset); assertEquals(0, ctx.lastWrittenOffset);
assertEquals(0, ctx.lastCommittedOffset); assertEquals(0, ctx.lastCommittedOffset);
@ -1110,14 +1110,14 @@ public class CoordinatorRuntimeTest {
public void testRescheduleTimer() throws InterruptedException { public void testRescheduleTimer() throws InterruptedException {
MockTimer timer = new MockTimer(); MockTimer timer = new MockTimer();
ManualEventProcessor processor = new ManualEventProcessor(); ManualEventProcessor processor = new ManualEventProcessor();
CoordinatorRuntime<MockCoordinator, String> runtime = CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinator, String>() new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time()) .withTime(timer.time())
.withTimer(timer) .withTimer(timer)
.withLoader(new MockCoordinatorLoader()) .withLoader(new MockCoordinatorLoader())
.withEventProcessor(processor) .withEventProcessor(processor)
.withPartitionWriter(new MockPartitionWriter()) .withPartitionWriter(new MockPartitionWriter())
.withCoordinatorBuilderSupplier(new MockCoordinatorBuilderSupplier()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.build(); .build();
// Loads the coordinator. // Loads the coordinator.
@ -1128,7 +1128,7 @@ public class CoordinatorRuntimeTest {
processor.poll(); processor.poll();
// Check initial state. // Check initial state.
CoordinatorRuntime<MockCoordinator, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP); CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0, ctx.timer.size()); assertEquals(0, ctx.timer.size());
// The processor should be empty. // The processor should be empty.
@ -1181,14 +1181,14 @@ public class CoordinatorRuntimeTest {
public void testCancelTimer() throws InterruptedException { public void testCancelTimer() throws InterruptedException {
MockTimer timer = new MockTimer(); MockTimer timer = new MockTimer();
ManualEventProcessor processor = new ManualEventProcessor(); ManualEventProcessor processor = new ManualEventProcessor();
CoordinatorRuntime<MockCoordinator, String> runtime = CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinator, String>() new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time()) .withTime(timer.time())
.withTimer(timer) .withTimer(timer)
.withLoader(new MockCoordinatorLoader()) .withLoader(new MockCoordinatorLoader())
.withEventProcessor(processor) .withEventProcessor(processor)
.withPartitionWriter(new MockPartitionWriter()) .withPartitionWriter(new MockPartitionWriter())
.withCoordinatorBuilderSupplier(new MockCoordinatorBuilderSupplier()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.build(); .build();
// Loads the coordinator. // Loads the coordinator.
@ -1199,7 +1199,7 @@ public class CoordinatorRuntimeTest {
processor.poll(); processor.poll();
// Check initial state. // Check initial state.
CoordinatorRuntime<MockCoordinator, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP); CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0, ctx.timer.size()); assertEquals(0, ctx.timer.size());
// The processor should be empty. // The processor should be empty.
@ -1249,21 +1249,21 @@ public class CoordinatorRuntimeTest {
@Test @Test
public void testRetryableTimer() throws InterruptedException { public void testRetryableTimer() throws InterruptedException {
MockTimer timer = new MockTimer(); MockTimer timer = new MockTimer();
CoordinatorRuntime<MockCoordinator, String> runtime = CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinator, String>() new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time()) .withTime(timer.time())
.withTimer(timer) .withTimer(timer)
.withLoader(new MockCoordinatorLoader()) .withLoader(new MockCoordinatorLoader())
.withEventProcessor(new DirectEventProcessor()) .withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter()) .withPartitionWriter(new MockPartitionWriter())
.withCoordinatorBuilderSupplier(new MockCoordinatorBuilderSupplier()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.build(); .build();
// Loads the coordinator. // Loads the coordinator.
runtime.scheduleLoadOperation(TP, 10); runtime.scheduleLoadOperation(TP, 10);
// Check initial state. // Check initial state.
CoordinatorRuntime<MockCoordinator, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP); CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0, ctx.timer.size()); assertEquals(0, ctx.timer.size());
// Timer #1. // Timer #1.
@ -1305,21 +1305,21 @@ public class CoordinatorRuntimeTest {
@Test @Test
public void testNonRetryableTimer() throws InterruptedException { public void testNonRetryableTimer() throws InterruptedException {
MockTimer timer = new MockTimer(); MockTimer timer = new MockTimer();
CoordinatorRuntime<MockCoordinator, String> runtime = CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinator, String>() new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time()) .withTime(timer.time())
.withTimer(timer) .withTimer(timer)
.withLoader(new MockCoordinatorLoader()) .withLoader(new MockCoordinatorLoader())
.withEventProcessor(new DirectEventProcessor()) .withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter()) .withPartitionWriter(new MockPartitionWriter())
.withCoordinatorBuilderSupplier(new MockCoordinatorBuilderSupplier()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.build(); .build();
// Loads the coordinator. // Loads the coordinator.
runtime.scheduleLoadOperation(TP, 10); runtime.scheduleLoadOperation(TP, 10);
// Check initial state. // Check initial state.
CoordinatorRuntime<MockCoordinator, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP); CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0, ctx.timer.size()); assertEquals(0, ctx.timer.size());
// Timer #1. // Timer #1.