KAFKA-16770; [2/2] Coalesce records into bigger batches (#16215)

This patch is the continuation of https://github.com/apache/kafka/pull/15964. It introduces record coalescing in the CoordinatorRuntime and adds a new configuration, `group.coordinator.append.linger.ms`, which allows administrators to choose the linger time or to disable lingering by setting it to zero. The new configuration defaults to 10ms.
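For illustration, a hedged sketch of how an administrator might set the new property. The key name, the zero-disables semantics, and the 10ms default come from this patch; the Java Properties wiring below is only an example (in practice the entry usually lives in the broker's server.properties):

    import java.util.Properties;

    public class AppendLingerConfigExample {
        public static void main(String[] args) {
            Properties brokerProps = new Properties();
            // Wait up to 5 ms for coordinator writes to accumulate before flushing.
            // "0" disables lingering; the default introduced by this patch is 10 ms.
            brokerProps.setProperty("group.coordinator.append.linger.ms", "5");
            System.out.println(brokerProps);
        }
    }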

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
David Jacot 2024-06-12 08:29:50 +02:00
parent 8de153ebd6
commit 6016b15bea
10 changed files with 1263 additions and 182 deletions


@ -870,6 +870,18 @@ public class MemoryRecordsBuilder implements AutoCloseable {
return this.writeLimit >= estimatedBytesWritten() + recordSize;
}
/**
* Check if we have room for a given number of bytes.
*/
public boolean hasRoomFor(int estimatedRecordsSize) {
if (isFull()) return false;
return this.writeLimit >= estimatedBytesWritten() + estimatedRecordsSize;
}
public int maxAllowedBytes() {
return this.writeLimit - this.batchHeaderSizeInBytes;
}
public boolean isClosed() {
return builtRecords != null;
}
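The two new methods are meant to be used together by a caller that coalesces records: estimate the size of the records up front, reject anything that could never fit in an empty batch, and otherwise roll over to a fresh batch when the current one is out of room. Below is a hedged sketch of that pattern, mirroring the logic CoordinatorRuntime.append() uses further down in this commit; the holder class and the flush/allocate callbacks are hypothetical stand-ins for the runtime's currentBatch field, flushCurrentBatch() and maybeAllocateNewBatch():

    import java.util.List;
    import java.util.function.Supplier;

    import org.apache.kafka.common.compress.Compression;
    import org.apache.kafka.common.errors.RecordTooLargeException;
    import org.apache.kafka.common.record.AbstractRecords;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import org.apache.kafka.common.record.SimpleRecord;

    // Not Kafka code: a sketch of the batching pattern enabled by hasRoomFor(int) and
    // maxAllowedBytes(). flush/allocate stand in for flushCurrentBatch()/maybeAllocateNewBatch().
    final class BatchingSketch {
        private MemoryRecordsBuilder builder;                    // stand-in for currentBatch.builder
        private final Compression compression;
        private final Runnable flush;                            // writes the current batch to the log
        private final Supplier<MemoryRecordsBuilder> allocate;   // starts a fresh batch

        BatchingSketch(MemoryRecordsBuilder builder, Compression compression,
                       Runnable flush, Supplier<MemoryRecordsBuilder> allocate) {
            this.builder = builder;
            this.compression = compression;
            this.flush = flush;
            this.allocate = allocate;
        }

        void append(List<SimpleRecord> recordsToAppend) {
            // Estimate up front so nothing has to be rolled back from the builder later.
            int estimatedSize = AbstractRecords.estimateSizeInBytes(
                builder.magic(), compression.type(), recordsToAppend);

            // Larger than any empty batch could hold: fail fast.
            if (estimatedSize > builder.maxAllowedBytes()) {
                throw new RecordTooLargeException("Estimated size " + estimatedSize
                    + " bytes exceeds the maximum batch size.");
            }

            // Does not fit in the current batch: flush it and start a fresh one.
            if (!builder.hasRoomFor(estimatedSize)) {
                flush.run();
                builder = allocate.get();
            }

            recordsToAppend.forEach(builder::append);
        }
    }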


@ -572,6 +572,7 @@ class BrokerServer(
val serde = new CoordinatorRecordSerde
val groupCoordinatorConfig = new GroupCoordinatorConfig(
config.groupCoordinatorNumThreads,
config.groupCoordinatorAppendLingerMs,
config.consumerGroupSessionTimeoutMs,
config.consumerGroupHeartbeatIntervalMs,
config.consumerGroupMaxSize,


@ -278,6 +278,7 @@ object KafkaConfig {
.define(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, LIST, GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT,
ValidList.in(Utils.enumOptions(classOf[GroupType]):_*), MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC)
.define(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG, INT, GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_DEFAULT, atLeast(1), MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_DOC)
.define(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, INT, GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT, atLeast(0), MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_DOC)
// Internal configuration used by integration and system tests.
.defineInternal(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, BOOLEAN, GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_DEFAULT, null, MEDIUM, GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_DOC)
@ -948,6 +949,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val isNewGroupCoordinatorEnabled = getBoolean(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG) ||
groupCoordinatorRebalanceProtocols.contains(GroupType.CONSUMER)
val groupCoordinatorNumThreads = getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG)
val groupCoordinatorAppendLingerMs = getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG)
/** Consumer group configs */
val consumerGroupSessionTimeoutMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG)


@ -57,6 +57,10 @@ public class GroupCoordinatorConfig {
Arrays.stream(Group.GroupType.values()).map(Group.GroupType::toString).collect(Collectors.joining(",")) + ". " +
"The " + Group.GroupType.CONSUMER + " rebalance protocol is in early access and therefore must not be used in production.";
public static final List<String> GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = Collections.singletonList(Group.GroupType.CLASSIC.toString());
public final static String GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG = "group.coordinator.append.linger.ms";
public final static String GROUP_COORDINATOR_APPEND_LINGER_MS_DOC = "The duration in milliseconds that the coordinator will " +
"wait for writes to accumulate before flushing them to disk. Transactional writes are not accumulated.";
public final static int GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT = 10;
public final static String GROUP_COORDINATOR_NUM_THREADS_CONFIG = "group.coordinator.threads";
public final static String GROUP_COORDINATOR_NUM_THREADS_DOC = "The number of threads used by the group coordinator.";
@ -164,6 +168,12 @@ public class GroupCoordinatorConfig {
*/
public final int numThreads;
/**
* The duration in milliseconds that the coordinator will wait for writes to
* accumulate before flushing them to disk.
*/
public final int appendLingerMs;
/**
* The consumer group session timeout in milliseconds.
*/
@ -259,6 +269,7 @@ public class GroupCoordinatorConfig {
public GroupCoordinatorConfig(
int numThreads,
int appendLingerMs,
int consumerGroupSessionTimeoutMs,
int consumerGroupHeartbeatIntervalMs,
int consumerGroupMaxSize,
@ -277,6 +288,7 @@ public class GroupCoordinatorConfig {
CompressionType compressionType
) {
this.numThreads = numThreads;
this.appendLingerMs = appendLingerMs;
this.consumerGroupSessionTimeoutMs = consumerGroupSessionTimeoutMs;
this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs;
this.consumerGroupMaxSize = consumerGroupMaxSize;


@ -182,12 +182,12 @@ public class GroupCoordinatorService implements GroupCoordinator {
.withPartitionWriter(writer)
.withLoader(loader)
.withCoordinatorShardBuilderSupplier(supplier)
.withTime(time)
.withDefaultWriteTimeOut(Duration.ofMillis(config.offsetCommitTimeoutMs))
.withCoordinatorRuntimeMetrics(coordinatorRuntimeMetrics)
.withCoordinatorMetrics(groupCoordinatorMetrics)
.withSerializer(new CoordinatorRecordSerde())
.withCompression(Compression.of(config.compressionType).build())
.withAppendLingerMs(config.appendLingerMs)
.build();
return new GroupCoordinatorService(


@ -24,11 +24,13 @@ import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.BufferSupplier;
@ -50,10 +52,12 @@ import org.slf4j.Logger;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
@ -66,8 +70,6 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static org.apache.kafka.common.record.Record.EMPTY_HEADERS;
/**
* The CoordinatorRuntime provides a framework to implement coordinators such as the group coordinator
* or the transaction coordinator.
@ -115,6 +117,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
private CoordinatorMetrics coordinatorMetrics;
private Serializer<U> serializer;
private Compression compression;
private int appendLingerMs;
public Builder<S, U> withLogPrefix(String logPrefix) {
this.logPrefix = logPrefix;
@ -181,6 +184,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
return this;
}
public Builder<S, U> withAppendLingerMs(int appendLingerMs) {
this.appendLingerMs = appendLingerMs;
return this;
}
public CoordinatorRuntime<S, U> build() {
if (logPrefix == null)
logPrefix = "";
@ -206,6 +214,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
throw new IllegalArgumentException("Serializer must be set.");
if (compression == null)
compression = Compression.NONE;
if (appendLingerMs < 0)
throw new IllegalArgumentException("AppendLinger must be >= 0");
return new CoordinatorRuntime<>(
logPrefix,
@ -220,7 +230,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
runtimeMetrics,
coordinatorMetrics,
serializer,
compression
compression,
appendLingerMs
);
}
}
@ -275,7 +286,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
FAILED {
@Override
boolean canTransitionFrom(CoordinatorState state) {
return state == LOADING;
return state == LOADING || state == ACTIVE;
}
};
@ -434,6 +445,81 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
}
}
/**
* A simple container class to hold all the attributes
* related to a pending batch.
*/
private static class CoordinatorBatch {
/**
* The base (or first) offset of the batch. If the batch fails
* for any reason, the state machine is rolled back to it.
*/
final long baseOffset;
/**
* The time at which the batch was created.
*/
final long appendTimeMs;
/**
* The max batch size.
*/
final int maxBatchSize;
/**
* The verification guard associated with the batch if it is
* transactional.
*/
final VerificationGuard verificationGuard;
/**
* The byte buffer backing the records builder.
*/
final ByteBuffer buffer;
/**
* The records builder.
*/
final MemoryRecordsBuilder builder;
/**
* The timer used to enforce the append linger time if
* it is non-zero.
*/
final Optional<TimerTask> lingerTimeoutTask;
/**
* The list of deferred events associated with the batch.
*/
final List<DeferredEvent> deferredEvents;
/**
* The next offset. This is updated when records
* are added to the batch.
*/
long nextOffset;
CoordinatorBatch(
long baseOffset,
long appendTimeMs,
int maxBatchSize,
VerificationGuard verificationGuard,
ByteBuffer buffer,
MemoryRecordsBuilder builder,
Optional<TimerTask> lingerTimeoutTask
) {
this.baseOffset = baseOffset;
this.nextOffset = baseOffset;
this.appendTimeMs = appendTimeMs;
this.maxBatchSize = maxBatchSize;
this.verificationGuard = verificationGuard;
this.buffer = buffer;
this.builder = builder;
this.lingerTimeoutTask = lingerTimeoutTask;
this.deferredEvents = new ArrayList<>();
}
}
/**
* CoordinatorContext holds all the metadata around a coordinator state machine.
*/
@ -493,6 +579,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
*/
BufferSupplier bufferSupplier;
/**
* The current (or pending) batch.
*/
CoordinatorBatch currentBatch;
/**
* Constructor.
*
@ -547,6 +638,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
.build(),
tp
);
load();
break;
case ACTIVE:
@ -573,6 +665,46 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
runtimeMetrics.recordPartitionStateChange(oldState, state);
}
/**
* Loads the coordinator.
*/
private void load() {
if (state != CoordinatorState.LOADING) {
throw new IllegalStateException("Coordinator must be in loading state");
}
loader.load(tp, coordinator).whenComplete((summary, exception) -> {
scheduleInternalOperation("CompleteLoad(tp=" + tp + ", epoch=" + epoch + ")", tp, () -> {
CoordinatorContext context = coordinators.get(tp);
if (context != null) {
if (context.state != CoordinatorState.LOADING) {
log.info("Ignored load completion from {} because context is in {} state.",
context.tp, context.state);
return;
}
try {
if (exception != null) throw exception;
context.transitionTo(CoordinatorState.ACTIVE);
if (summary != null) {
runtimeMetrics.recordPartitionLoadSensor(summary.startTimeMs(), summary.endTimeMs());
log.info("Finished loading of metadata from {} with epoch {} in {}ms where {}ms " +
"was spent in the scheduler. Loaded {} records which total to {} bytes.",
tp, epoch, summary.endTimeMs() - summary.startTimeMs(),
summary.schedulerQueueTimeMs(), summary.numRecords(), summary.numBytes());
}
} catch (Throwable ex) {
log.error("Failed to load metadata from {} with epoch {} due to {}.",
tp, epoch, ex.toString());
context.transitionTo(CoordinatorState.FAILED);
}
} else {
log.debug("Failed to complete the loading of metadata for {} in epoch {} since the coordinator does not exist.",
tp, epoch);
}
});
});
}
/**
* Unloads the coordinator.
*/
@ -583,11 +715,352 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
}
timer.cancelAll();
deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
failCurrentBatch(Errors.NOT_COORDINATOR.exception());
if (coordinator != null) {
coordinator.onUnloaded();
}
coordinator = null;
}
/**
* Frees the current batch.
*/
private void freeCurrentBatch() {
// Cancel the linger timeout.
currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
// Release the buffer.
bufferSupplier.release(currentBatch.buffer);
currentBatch = null;
}
/**
* Flushes the current (or pending) batch to the log. When the batch is written
* locally, a new snapshot is created in the snapshot registry and the events
* associated with the batch are added to the deferred event queue.
*/
private void flushCurrentBatch() {
if (currentBatch != null) {
try {
// Write the records to the log and update the last written offset.
long offset = partitionWriter.append(
tp,
currentBatch.verificationGuard,
currentBatch.builder.build()
);
coordinator.updateLastWrittenOffset(offset);
if (offset != currentBatch.nextOffset) {
log.error("The state machine of the coordinator {} is out of sync with the underlying log. " +
"The last written offset returned is {} while the coordinator expected {}. The coordinator " +
"will be reloaded in order to re-synchronize the state machine.",
tp, offset, currentBatch.nextOffset);
// Transition to FAILED state to unload the state machine and complete
// exceptionally all the pending operations.
transitionTo(CoordinatorState.FAILED);
// Transition to LOADING to trigger the restoration of the state.
transitionTo(CoordinatorState.LOADING);
// Throw NotCoordinatorException to fail the operation that
// triggered the write. We use NotCoordinatorException to be
// consistent with the transition to FAILED.
throw Errors.NOT_COORDINATOR.exception();
}
// Add all the pending deferred events to the deferred event queue.
for (DeferredEvent event : currentBatch.deferredEvents) {
deferredEventQueue.add(offset, event);
}
// Free up the current batch.
freeCurrentBatch();
} catch (Throwable t) {
log.error("Writing records to {} failed due to: {}.", tp, t.getMessage());
failCurrentBatch(t);
// We rethrow the exception for the caller to handle it too.
throw t;
}
}
}
/**
* Flushes the current batch if it is transactional or if it has passed the append linger time.
*/
private void maybeFlushCurrentBatch(long currentTimeMs) {
if (currentBatch != null) {
if (currentBatch.builder.isTransactional() || (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs) {
flushCurrentBatch();
}
}
}
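One subtlety worth spelling out, as a hedged reading rather than a statement about the implementation: when this method is called from append(), currentTimeMs is the same timestamp that was used to allocate the batch, so appendTimeMs - currentTimeMs is zero. With appendLingerMs == 0 the comparison 0 >= 0 holds and the batch is flushed in the same event (lingering disabled); with a positive linger the condition stays false and the flush is instead driven by the linger TimerTask scheduled in maybeAllocateNewBatch below. A standalone restatement of that rule:

    // Not Kafka code: a restatement of the condition used by maybeFlushCurrentBatch, under the
    // assumption that appendTimeMs == currentTimeMs when the batch was allocated in this event.
    final class LingerRule {
        static boolean shouldFlushNow(boolean transactional, long appendTimeMs,
                                      long currentTimeMs, int appendLingerMs) {
            // Transactional batches are always flushed right away.
            // appendLingerMs == 0: (0 >= 0) is true, so the batch is flushed in the same event.
            // appendLingerMs > 0:  the expression stays false; the linger timer flushes it later.
            return transactional || (appendTimeMs - currentTimeMs) >= appendLingerMs;
        }
    }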
/**
* Fails the current batch, reverts the snapshot to the base/start offset of the
* batch, and fails all the associated events.
*/
private void failCurrentBatch(Throwable t) {
if (currentBatch != null) {
coordinator.revertLastWrittenOffset(currentBatch.baseOffset);
for (DeferredEvent event : currentBatch.deferredEvents) {
event.complete(t);
}
freeCurrentBatch();
}
}
/**
* Allocates a new batch if none already exists.
*/
private void maybeAllocateNewBatch(
long producerId,
short producerEpoch,
VerificationGuard verificationGuard,
long currentTimeMs
) {
if (currentBatch == null) {
LogConfig logConfig = partitionWriter.config(tp);
byte magic = logConfig.recordVersion().value;
int maxBatchSize = logConfig.maxMessageSize();
long prevLastWrittenOffset = coordinator.lastWrittenOffset();
ByteBuffer buffer = bufferSupplier.get(maxBatchSize);
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
buffer,
magic,
compression,
TimestampType.CREATE_TIME,
0L,
currentTimeMs,
producerId,
producerEpoch,
0,
producerId != RecordBatch.NO_PRODUCER_ID,
false,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
maxBatchSize
);
Optional<TimerTask> lingerTimeoutTask = Optional.empty();
if (appendLingerMs > 0) {
lingerTimeoutTask = Optional.of(new TimerTask(appendLingerMs) {
@Override
public void run() {
// An event to flush the batch is pushed to the front of the queue
// to ensure that the linger time is respected.
enqueueFirst(new CoordinatorInternalEvent("FlushBatch", tp, () -> {
if (this.isCancelled()) return;
withActiveContextOrThrow(tp, CoordinatorContext::flushCurrentBatch);
}));
}
});
CoordinatorRuntime.this.timer.add(lingerTimeoutTask.get());
}
currentBatch = new CoordinatorBatch(
prevLastWrittenOffset,
currentTimeMs,
maxBatchSize,
verificationGuard,
buffer,
builder,
lingerTimeoutTask
);
}
}
/**
* Appends records to the log and replay them to the state machine.
*
* @param producerId The producer id.
* @param producerEpoch The producer epoch.
* @param verificationGuard The verification guard.
* @param records The records to append.
* @param replay A boolean indicating whether the records
* must be replayed or not.
* @param event The event that must be completed when the
* records are written.
*/
private void append(
long producerId,
short producerEpoch,
VerificationGuard verificationGuard,
List<U> records,
boolean replay,
DeferredEvent event
) {
if (state != CoordinatorState.ACTIVE) {
throw new IllegalStateException("Coordinator must be active to append records");
}
if (records.isEmpty()) {
// If the records are empty, it was a read operation after all. In this case,
// the response can be returned directly iff there are no pending write operations;
// otherwise, the read needs to wait on the last write operation to be completed.
if (currentBatch != null) {
currentBatch.deferredEvents.add(event);
} else {
OptionalLong pendingOffset = deferredEventQueue.highestPendingOffset();
if (pendingOffset.isPresent()) {
deferredEventQueue.add(pendingOffset.getAsLong(), event);
} else {
event.complete(null);
}
}
} else {
// If the records are not empty, first, they are applied to the state machine,
// second, they are appended to the opened batch.
long currentTimeMs = time.milliseconds();
// If the current write operation is transactional, the current batch
// is written before proceeding with it.
if (producerId != RecordBatch.NO_PRODUCER_ID) {
// If flushing fails, we don't catch the exception in order to let
// the caller fail the current operation.
flushCurrentBatch();
}
// Allocate a new batch if none exists.
maybeAllocateNewBatch(
producerId,
producerEpoch,
verificationGuard,
currentTimeMs
);
// Prepare the records.
List<SimpleRecord> recordsToAppend = new ArrayList<>(records.size());
for (U record : records) {
recordsToAppend.add(new SimpleRecord(
currentTimeMs,
serializer.serializeKey(record),
serializer.serializeValue(record)
));
}
// Compute the estimated size of the records.
int estimatedSize = AbstractRecords.estimateSizeInBytes(
currentBatch.builder.magic(),
compression.type(),
recordsToAppend
);
// Check if the current batch has enough space. We check this before
// replaying the records in order to avoid having to revert the
// changes if the records do not fit within a batch.
if (estimatedSize > currentBatch.builder.maxAllowedBytes()) {
throw new RecordTooLargeException("Message batch size is " + estimatedSize +
" bytes in append to partition " + tp + " which exceeds the maximum " +
"configured size of " + currentBatch.maxBatchSize + ".");
}
if (!currentBatch.builder.hasRoomFor(estimatedSize)) {
// Otherwise, we write the current batch, allocate a new one and re-verify
// whether the records fit in it.
// If flushing fails, we don't catch the exception in order to let
// the caller fail the current operation.
flushCurrentBatch();
maybeAllocateNewBatch(
producerId,
producerEpoch,
verificationGuard,
currentTimeMs
);
}
// Add the event to the list of pending events associated with the batch.
currentBatch.deferredEvents.add(event);
try {
// Apply record to the state machine.
if (replay) {
for (int i = 0; i < records.size(); i++) {
// We compute the offset of the record based on the last written offset. The
// coordinator is the single writer to the underlying partition so we can
// deduce it like this.
coordinator.replay(
currentBatch.nextOffset + i,
producerId,
producerEpoch,
records.get(i)
);
}
}
// Append to the batch.
for (SimpleRecord record : recordsToAppend) {
currentBatch.builder.append(record);
currentBatch.nextOffset++;
}
} catch (Throwable t) {
log.error("Replaying records to {} failed due to: {}.", tp, t.getMessage());
// If an exception is thrown, we fail the entire batch. Exceptions should be
// truly exceptional in this code path and would usually be the result
// of bugs preventing records from being replayed.
failCurrentBatch(t);
}
// Write the current batch if it is transactional or if the linger timeout
// has expired.
// If flushing fails, we don't catch the exception in order to let
// the caller fail the current operation.
maybeFlushCurrentBatch(currentTimeMs);
}
}
/**
* Completes a transaction.
*
* @param producerId The producer id.
* @param producerEpoch The producer epoch.
* @param coordinatorEpoch The coordinator epoch of the transaction coordinator.
* @param result The transaction result.
* @param event The event that must be completed when the
* control record is written.
*/
private void completeTransaction(
long producerId,
short producerEpoch,
int coordinatorEpoch,
TransactionResult result,
DeferredEvent event
) {
if (state != CoordinatorState.ACTIVE) {
throw new IllegalStateException("Coordinator must be active to complete a transaction");
}
// The current batch must be written before the transaction marker is written
// in order to respect the order.
flushCurrentBatch();
long prevLastWrittenOffset = coordinator.lastWrittenOffset();
try {
coordinator.replayEndTransactionMarker(
producerId,
producerEpoch,
result
);
long offset = partitionWriter.append(
tp,
VerificationGuard.SENTINEL,
MemoryRecords.withEndTransactionMarker(
time.milliseconds(),
producerId,
producerEpoch,
new EndTransactionMarker(
result == TransactionResult.COMMIT ? ControlRecordType.COMMIT : ControlRecordType.ABORT,
coordinatorEpoch
)
)
);
coordinator.updateLastWrittenOffset(offset);
deferredEventQueue.add(offset, event);
} catch (Throwable t) {
coordinator.revertLastWrittenOffset(prevLastWrittenOffset);
event.complete(t);
}
}
}
class OperationTimeout extends TimerTask {
@ -781,100 +1254,20 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
// Execute the operation.
result = op.generateRecordsAndResult(context.coordinator.coordinator());
if (result.records().isEmpty()) {
// If the records are empty, it was a read operation after all. In this case,
// the response can be returned directly iff there are no pending write operations;
// otherwise, the read needs to wait on the last write operation to be completed.
OptionalLong pendingOffset = context.deferredEventQueue.highestPendingOffset();
if (pendingOffset.isPresent()) {
context.deferredEventQueue.add(pendingOffset.getAsLong(), this);
} else {
complete(null);
}
} else {
// If the records are not empty, first, they are applied to the state machine,
// second, then are written to the partition/log, and finally, the response
// is put into the deferred event queue.
long prevLastWrittenOffset = context.coordinator.lastWrittenOffset();
LogConfig logConfig = partitionWriter.config(tp);
byte magic = logConfig.recordVersion().value;
int maxBatchSize = logConfig.maxMessageSize();
long currentTimeMs = time.milliseconds();
ByteBuffer buffer = context.bufferSupplier.get(Math.min(MIN_BUFFER_SIZE, maxBatchSize));
// Append the records and replay them to the state machine.
context.append(
producerId,
producerEpoch,
verificationGuard,
result.records(),
result.replayRecords(),
this
);
try {
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
buffer,
magic,
compression,
TimestampType.CREATE_TIME,
0L,
currentTimeMs,
producerId,
producerEpoch,
0,
producerId != RecordBatch.NO_PRODUCER_ID,
false,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
maxBatchSize
);
// Apply the records to the state machine and add them to the batch.
for (int i = 0; i < result.records().size(); i++) {
U record = result.records().get(i);
if (result.replayRecords()) {
// We compute the offset of the record based on the last written offset. The
// coordinator is the single writer to the underlying partition so we can
// deduce it like this.
context.coordinator.replay(
prevLastWrittenOffset + i,
producerId,
producerEpoch,
record
);
}
byte[] keyBytes = serializer.serializeKey(record);
byte[] valBytes = serializer.serializeValue(record);
if (builder.hasRoomFor(currentTimeMs, keyBytes, valBytes, EMPTY_HEADERS)) {
builder.append(
currentTimeMs,
keyBytes,
valBytes,
EMPTY_HEADERS
);
} else {
throw new RecordTooLargeException("Message batch size is " + builder.estimatedSizeInBytes() +
" bytes in append to partition " + tp + " which exceeds the maximum " +
"configured size of " + maxBatchSize + ".");
}
}
// Write the records to the log and update the last written
// offset.
long offset = partitionWriter.append(
tp,
verificationGuard,
builder.build()
);
context.coordinator.updateLastWrittenOffset(offset);
// Add the response to the deferred queue.
if (!future.isDone()) {
context.deferredEventQueue.add(offset, this);
operationTimeout = new OperationTimeout(tp, this, writeTimeout.toMillis());
timer.add(operationTimeout);
} else {
complete(null);
}
} catch (Throwable t) {
context.coordinator.revertLastWrittenOffset(prevLastWrittenOffset);
complete(t);
} finally {
context.bufferSupplier.release(buffer);
}
// If the operation is not done, create an operation timeout.
if (!future.isDone()) {
operationTimeout = new OperationTimeout(tp, this, writeTimeout.toMillis());
timer.add(operationTimeout);
}
});
} catch (Throwable t) {
@ -1142,40 +1535,17 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
public void run() {
try {
withActiveContextOrThrow(tp, context -> {
long prevLastWrittenOffset = context.coordinator.lastWrittenOffset();
context.completeTransaction(
producerId,
producerEpoch,
coordinatorEpoch,
result,
this
);
try {
context.coordinator.replayEndTransactionMarker(
producerId,
producerEpoch,
result
);
long offset = partitionWriter.append(
tp,
VerificationGuard.SENTINEL,
MemoryRecords.withEndTransactionMarker(
time.milliseconds(),
producerId,
producerEpoch,
new EndTransactionMarker(
result == TransactionResult.COMMIT ? ControlRecordType.COMMIT : ControlRecordType.ABORT,
coordinatorEpoch
)
)
);
context.coordinator.updateLastWrittenOffset(offset);
if (!future.isDone()) {
context.deferredEventQueue.add(offset, this);
operationTimeout = new OperationTimeout(tp, this, writeTimeout.toMillis());
timer.add(operationTimeout);
} else {
complete(null);
}
} catch (Throwable t) {
context.coordinator.revertLastWrittenOffset(prevLastWrittenOffset);
complete(t);
if (!future.isDone()) {
operationTimeout = new OperationTimeout(tp, this, writeTimeout.toMillis());
timer.add(operationTimeout);
}
});
} catch (Throwable t) {
@ -1449,6 +1819,12 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
*/
private final Compression compression;
/**
* The duration in milliseconds that the coordinator will wait for writes to
* accumulate before flushing them to disk.
*/
private final int appendLingerMs;
/**
* Atomic boolean indicating whether the runtime is running.
*/
@ -1475,7 +1851,9 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
* @param coordinatorMetrics The coordinator metrics.
* @param serializer The serializer.
* @param compression The compression codec.
* @param appendLingerMs The append linger time in ms.
*/
@SuppressWarnings("checkstyle:ParameterNumber")
private CoordinatorRuntime(
String logPrefix,
LogContext logContext,
@ -1489,7 +1867,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
CoordinatorRuntimeMetrics runtimeMetrics,
CoordinatorMetrics coordinatorMetrics,
Serializer<U> serializer,
Compression compression
Compression compression,
int appendLingerMs
) {
this.logPrefix = logPrefix;
this.logContext = logContext;
@ -1506,6 +1885,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
this.coordinatorMetrics = coordinatorMetrics;
this.serializer = serializer;
this.compression = compression;
this.appendLingerMs = appendLingerMs;
}
/**
@ -1836,36 +2216,6 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
case FAILED:
case INITIAL:
context.transitionTo(CoordinatorState.LOADING);
loader.load(tp, context.coordinator).whenComplete((summary, exception) -> {
scheduleInternalOperation("CompleteLoad(tp=" + tp + ", epoch=" + partitionEpoch + ")", tp, () -> {
CoordinatorContext ctx = coordinators.get(tp);
if (ctx != null) {
if (ctx.state != CoordinatorState.LOADING) {
log.info("Ignored load completion from {} because context is in {} state.",
ctx.tp, ctx.state);
return;
}
try {
if (exception != null) throw exception;
ctx.transitionTo(CoordinatorState.ACTIVE);
if (summary != null) {
runtimeMetrics.recordPartitionLoadSensor(summary.startTimeMs(), summary.endTimeMs());
log.info("Finished loading of metadata from {} with epoch {} in {}ms where {}ms " +
"was spent in the scheduler. Loaded {} records which total to {} bytes.",
tp, partitionEpoch, summary.endTimeMs() - summary.startTimeMs(),
summary.schedulerQueueTimeMs(), summary.numRecords(), summary.numBytes());
}
} catch (Throwable ex) {
log.error("Failed to load metadata from {} with epoch {} due to {}.",
tp, partitionEpoch, ex.toString());
ctx.transitionTo(CoordinatorState.FAILED);
}
} else {
log.debug("Failed to complete the loading of metadata for {} in epoch {} since the coordinator does not exist.",
tp, partitionEpoch);
}
});
});
break;
case LOADING:


@ -30,6 +30,7 @@ public class GroupCoordinatorConfigTest {
public void testConfigs() {
ConsumerGroupPartitionAssignor assignor = new RangeAssignor();
GroupCoordinatorConfig config = new GroupCoordinatorConfig(
10,
10,
30,
10,
@ -65,6 +66,7 @@ public class GroupCoordinatorConfigTest {
assertEquals(24 * 60 * 60 * 1000L, config.offsetsRetentionMs);
assertEquals(5000, config.offsetCommitTimeoutMs);
assertEquals(CompressionType.GZIP, config.compressionType);
assertEquals(10, config.appendLingerMs);
}
public static GroupCoordinatorConfig createGroupCoordinatorConfig(
@ -74,6 +76,7 @@ public class GroupCoordinatorConfigTest {
) {
return new GroupCoordinatorConfig(
1,
10,
45,
5,
Integer.MAX_VALUE,


@ -118,6 +118,7 @@ public class GroupCoordinatorServiceTest {
private GroupCoordinatorConfig createConfig() {
return new GroupCoordinatorConfig(
1,
10,
45,
5,
Integer.MAX_VALUE,


@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.AbstractRecords;
@ -60,8 +61,8 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
@ -74,6 +75,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.ACTIVE;
@ -85,6 +87,8 @@ import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.MIN_
import static org.apache.kafka.test.TestUtils.assertFutureThrows;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -147,7 +151,7 @@ public class CoordinatorRuntimeTest {
* when poll() is called.
*/
private static class ManualEventProcessor implements CoordinatorEventProcessor {
private Deque<CoordinatorEvent> queue = new LinkedList<>();
private final Deque<CoordinatorEvent> queue = new LinkedList<>();
@Override
public void enqueueLast(CoordinatorEvent event) throws RejectedExecutionException {
@ -274,9 +278,72 @@ public class CoordinatorRuntimeTest {
* A simple Coordinator implementation that stores the records into a set.
*/
static class MockCoordinatorShard implements CoordinatorShard<String> {
static class RecordAndMetadata {
public final long offset;
public final long producerId;
public final short producerEpoch;
public final String record;
public RecordAndMetadata(
long offset,
String record
) {
this(
offset,
RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH,
record
);
}
public RecordAndMetadata(
long offset,
long producerId,
short producerEpoch,
String record
) {
this.offset = offset;
this.producerId = producerId;
this.producerEpoch = producerEpoch;
this.record = record;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RecordAndMetadata that = (RecordAndMetadata) o;
if (offset != that.offset) return false;
if (producerId != that.producerId) return false;
if (producerEpoch != that.producerEpoch) return false;
return Objects.equals(record, that.record);
}
@Override
public int hashCode() {
int result = (int) (offset ^ (offset >>> 32));
result = 31 * result + (int) (producerId ^ (producerId >>> 32));
result = 31 * result + (int) producerEpoch;
result = 31 * result + (record != null ? record.hashCode() : 0);
return result;
}
@Override
public String toString() {
return "RecordAndMetadata(" +
"offset=" + offset +
", producerId=" + producerId +
", producerEpoch=" + producerEpoch +
", record='" + record.substring(0, 10) + '\'' +
')';
}
}
private final SnapshotRegistry snapshotRegistry;
private final TimelineHashSet<String> records;
private final TimelineHashMap<Long, TimelineHashSet<String>> pendingRecords;
private final TimelineHashSet<RecordAndMetadata> records;
private final TimelineHashMap<Long, TimelineHashSet<RecordAndMetadata>> pendingRecords;
private final CoordinatorTimer<Void, String> timer;
MockCoordinatorShard(
@ -296,12 +363,19 @@ public class CoordinatorRuntimeTest {
short producerEpoch,
String record
) throws RuntimeException {
RecordAndMetadata recordAndMetadata = new RecordAndMetadata(
offset,
producerId,
producerEpoch,
record
);
if (producerId == RecordBatch.NO_PRODUCER_ID) {
records.add(record);
records.add(recordAndMetadata);
} else {
pendingRecords
.computeIfAbsent(producerId, __ -> new TimelineHashSet<>(snapshotRegistry, 0))
.add(record);
.add(recordAndMetadata);
}
}
@ -312,7 +386,7 @@ public class CoordinatorRuntimeTest {
TransactionResult result
) throws RuntimeException {
if (result == TransactionResult.COMMIT) {
TimelineHashSet<String> pending = pendingRecords.remove(producerId);
TimelineHashSet<RecordAndMetadata> pending = pendingRecords.remove(producerId);
if (pending == null) return;
records.addAll(pending);
} else {
@ -321,13 +395,26 @@ public class CoordinatorRuntimeTest {
}
Set<String> pendingRecords(long producerId) {
TimelineHashSet<String> pending = pendingRecords.get(producerId);
TimelineHashSet<RecordAndMetadata> pending = pendingRecords.get(producerId);
if (pending == null) return Collections.emptySet();
return Collections.unmodifiableSet(new HashSet<>(pending));
return Collections.unmodifiableSet(
pending.stream().map(record -> record.record).collect(Collectors.toSet())
);
}
Set<String> records() {
return Collections.unmodifiableSet(new HashSet<>(records));
return Collections.unmodifiableSet(
records.stream().map(record -> record.record).collect(Collectors.toSet())
);
}
List<RecordAndMetadata> fullRecords() {
return Collections.unmodifiableList(
records
.stream()
.sorted(Comparator.comparingLong(record -> record.offset))
.collect(Collectors.toList())
);
}
CoordinatorTimer<Void, String> timer() {
@ -407,10 +494,17 @@ public class CoordinatorRuntimeTest {
long timestamp,
String... records
) {
if (records.length == 0)
return records(timestamp, Arrays.stream(records).collect(Collectors.toList()));
}
private static MemoryRecords records(
long timestamp,
List<String> records
) {
if (records.isEmpty())
return MemoryRecords.EMPTY;
List<SimpleRecord> simpleRecords = Arrays.stream(records).map(record ->
List<SimpleRecord> simpleRecords = records.stream().map(record ->
new SimpleRecord(timestamp, record.getBytes(Charset.defaultCharset()))
).collect(Collectors.toList());
@ -447,10 +541,24 @@ public class CoordinatorRuntimeTest {
long timestamp,
String... records
) {
if (records.length == 0)
return transactionalRecords(
producerId,
producerEpoch,
timestamp,
Arrays.stream(records).collect(Collectors.toList())
);
}
private static MemoryRecords transactionalRecords(
long producerId,
short producerEpoch,
long timestamp,
List<String> records
) {
if (records.isEmpty())
return MemoryRecords.EMPTY;
List<SimpleRecord> simpleRecords = Arrays.stream(records).map(record ->
List<SimpleRecord> simpleRecords = records.stream().map(record ->
new SimpleRecord(timestamp, record.getBytes(Charset.defaultCharset()))
).collect(Collectors.toList());
@ -986,13 +1094,13 @@ public class CoordinatorRuntimeTest {
// Records have been replayed to the coordinator.
assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records());
// Records have been written to the log.
assertEquals(Arrays.asList(
assertEquals(Collections.singletonList(
records(timer.time().milliseconds(), "record1", "record2")
), writer.entries(TP));
// Write #2.
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT,
state -> new CoordinatorResult<>(Arrays.asList("record3"), "response2"));
state -> new CoordinatorResult<>(Collections.singletonList("record3"), "response2"));
// Verify that the write is not committed yet.
assertFalse(write2.isDone());
@ -1540,7 +1648,7 @@ public class CoordinatorRuntimeTest {
100L
));
// Records have been written to the log.
assertEquals(Arrays.asList(
assertEquals(Collections.singletonList(
transactionalRecords(100L, (short) 5, timer.time().milliseconds(), "record1", "record2")
), writer.entries(TP));
@ -1785,7 +1893,7 @@ public class CoordinatorRuntimeTest {
assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().pendingRecords(100L));
assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records());
assertEquals(Arrays.asList(
assertEquals(Collections.singletonList(
transactionalRecords(100L, (short) 5, timer.time().milliseconds(), "record1", "record2")
), writer.entries(TP));
@ -1807,7 +1915,7 @@ public class CoordinatorRuntimeTest {
assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().pendingRecords(100L));
assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records());
assertEquals(Arrays.asList(
assertEquals(Collections.singletonList(
transactionalRecords(100L, (short) 5, timer.time().milliseconds(), "record1", "record2")
), writer.entries(TP));
}
@ -1985,7 +2093,7 @@ public class CoordinatorRuntimeTest {
// Read.
List<CompletableFuture<List<String>>> responses = runtime.scheduleReadAllOperation(
"read",
(state, offset) -> new ArrayList<>(state.records)
(state, offset) -> new ArrayList<>(state.records())
);
assertEquals(
@ -3059,6 +3167,594 @@ public class CoordinatorRuntimeTest {
assertTrue(batchSize > MIN_BUFFER_SIZE && batchSize < maxBatchSize);
}
@Test
public void testScheduleWriteOperationWithBatching() throws ExecutionException, InterruptedException, TimeoutException {
MockTimer timer = new MockTimer();
MockPartitionWriter writer = new MockPartitionWriter();
CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time())
.withTimer(timer)
.withDefaultWriteTimeOut(Duration.ofMillis(20))
.withLoader(new MockCoordinatorLoader())
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withAppendLingerMs(10)
.build();
// Schedule the loading.
runtime.scheduleLoadOperation(TP, 10);
// Verify the initial state.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertNull(ctx.currentBatch);
// Get the max batch size.
int maxBatchSize = writer.config(TP).maxMessageSize();
// Create records with a quarter of the max batch size each. Keep in mind that
// each batch has a header so it is not possible to have those four records
// in one single batch.
List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
char[] payload = new char[maxBatchSize / 4];
Arrays.fill(payload, c);
return new String(payload);
}).collect(Collectors.toList());
// Write #1 with two records.
CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(records.subList(0, 2), "response1")
);
// Verify that the write is not committed yet.
assertFalse(write1.isDone());
// A batch has been created.
assertNotNull(ctx.currentBatch);
// Verify the state. Records are replayed but no batch written.
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Arrays.asList(
new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
new MockCoordinatorShard.RecordAndMetadata(1, records.get(1))
), ctx.coordinator.coordinator().fullRecords());
assertEquals(Collections.emptyList(), writer.entries(TP));
// Write #2 with one record.
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(records.subList(2, 3), "response2")
);
// Verify that the write is not committed yet.
assertFalse(write2.isDone());
// Verify the state. Records are replayed but no batch written.
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Arrays.asList(
new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
), ctx.coordinator.coordinator().fullRecords());
assertEquals(Collections.emptyList(), writer.entries(TP));
// Write #3 with one record. This one cannot go into the existing batch
// so the existing batch should be flushed and a new one should be created.
CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(records.subList(3, 4), "response3")
);
// Verify that the write is not committed yet.
assertFalse(write3.isDone());
// Verify the state. Records are replayed. The previous batch
// got flushed with all the records but the new one from #3.
assertEquals(3L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Arrays.asList(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Arrays.asList(
new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
), ctx.coordinator.coordinator().fullRecords());
assertEquals(Collections.singletonList(
records(timer.time().milliseconds(), records.subList(0, 3))
), writer.entries(TP));
// Advance past the linger time.
timer.advanceClock(11);
// Verify the state. The pending batch is flushed.
assertEquals(4L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Arrays.asList(0L, 3L, 4L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Arrays.asList(
new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
), ctx.coordinator.coordinator().fullRecords());
assertEquals(Arrays.asList(
records(timer.time().milliseconds() - 11, records.subList(0, 3)),
records(timer.time().milliseconds() - 11, records.subList(3, 4))
), writer.entries(TP));
// Commit and verify that writes are completed.
writer.commit(TP);
assertTrue(write1.isDone());
assertTrue(write2.isDone());
assertTrue(write3.isDone());
assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
assertEquals("response2", write2.get(5, TimeUnit.SECONDS));
assertEquals("response3", write3.get(5, TimeUnit.SECONDS));
}
@Test
public void testScheduleWriteOperationWithBatchingWhenRecordsTooLarge() {
MockTimer timer = new MockTimer();
MockPartitionWriter writer = new MockPartitionWriter();
CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time())
.withTimer(timer)
.withDefaultWriteTimeOut(Duration.ofMillis(20))
.withLoader(new MockCoordinatorLoader())
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withAppendLingerMs(10)
.build();
// Schedule the loading.
runtime.scheduleLoadOperation(TP, 10);
// Verify the initial state.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertNull(ctx.currentBatch);
// Get the max batch size.
int maxBatchSize = writer.config(TP).maxMessageSize();
// Create records with a quarter of the max batch size each. Keep in mind that
// each batch has a header so it is not possible to have those four records
// in one single batch.
List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
char[] payload = new char[maxBatchSize / 4];
Arrays.fill(payload, c);
return new String(payload);
}).collect(Collectors.toList());
// Write all the records.
CompletableFuture<String> write = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(records, "response1")
);
assertFutureThrows(write, RecordTooLargeException.class);
}
@Test
public void testScheduleWriteOperationWithBatchingWhenWriteFails() {
MockTimer timer = new MockTimer();
// The partition writer does not accept any writes.
MockPartitionWriter writer = new MockPartitionWriter(0);
CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time())
.withTimer(timer)
.withDefaultWriteTimeOut(Duration.ofMillis(20))
.withLoader(new MockCoordinatorLoader())
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withAppendLingerMs(10)
.build();
// Schedule the loading.
runtime.scheduleLoadOperation(TP, 10);
// Verify the initial state.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertNull(ctx.currentBatch);
// Get the max batch size.
int maxBatchSize = writer.config(TP).maxMessageSize();
// Create records with a quarter of the max batch size each. Keep in mind that
// each batch has a header so it is not possible to have those four records
// in one single batch.
List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
char[] payload = new char[maxBatchSize / 4];
Arrays.fill(payload, c);
return new String(payload);
}).collect(Collectors.toList());
// Write #1.
CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(records.subList(0, 1), "response1"));
// Write #2.
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(records.subList(1, 2), "response2"));
// Write #3.
CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(records.subList(2, 3), "response3"));
// Verify the state.
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Arrays.asList(
new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
), ctx.coordinator.coordinator().fullRecords());
assertEquals(Collections.emptyList(), writer.entries(TP));
// Write #4. This write cannot make it in the current batch. So the current batch
// is flushed. It will fail. So we expect all writes to fail.
CompletableFuture<String> write4 = runtime.scheduleWriteOperation("write#4", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(records.subList(3, 4), "response4"));
// Verify the futures.
assertFutureThrows(write1, KafkaException.class);
assertFutureThrows(write2, KafkaException.class);
assertFutureThrows(write3, KafkaException.class);
// Write #4 is also expected to fail.
assertFutureThrows(write4, KafkaException.class);
// Verify the state. The state should be reverted to the initial state.
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Collections.emptyList(), ctx.coordinator.coordinator().fullRecords());
assertEquals(Collections.emptyList(), writer.entries(TP));
}
@Test
public void testScheduleWriteOperationWithBatchingWhenReplayFails() {
MockTimer timer = new MockTimer();
MockPartitionWriter writer = new MockPartitionWriter();
CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time())
.withTimer(timer)
.withDefaultWriteTimeOut(Duration.ofMillis(20))
.withLoader(new MockCoordinatorLoader())
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withAppendLingerMs(10)
.build();
// Schedule the loading.
runtime.scheduleLoadOperation(TP, 10);
// Verify the initial state.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertNull(ctx.currentBatch);
// Override the coordinator with a coordinator that throws
// an exception when replay is called.
SnapshotRegistry snapshotRegistry = ctx.coordinator.snapshotRegistry();
ctx.coordinator = new SnapshottableCoordinator<>(
new LogContext(),
snapshotRegistry,
new MockCoordinatorShard(snapshotRegistry, ctx.timer) {
@Override
public void replay(
long offset,
long producerId,
short producerEpoch,
String record
) throws RuntimeException {
if (offset >= 1) {
throw new IllegalArgumentException("error");
}
super.replay(
offset,
producerId,
producerEpoch,
record
);
}
},
TP
);
// Get the max batch size.
int maxBatchSize = writer.config(TP).maxMessageSize();
// Create records with a quarter of the max batch size each.
List<String> records = Stream.of('1', '2').map(c -> {
char[] payload = new char[maxBatchSize / 4];
Arrays.fill(payload, c);
return new String(payload);
}).collect(Collectors.toList());
// Write #1.
CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(records.subList(0, 1), "response1"));
// Verify the state.
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Collections.singletonList(
new MockCoordinatorShard.RecordAndMetadata(0, records.get(0))
), ctx.coordinator.coordinator().fullRecords());
assertEquals(Collections.emptyList(), writer.entries(TP));
// Write #2. It should fail.
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(records.subList(1, 2), "response2"));
// Verify the futures.
assertFutureThrows(write1, IllegalArgumentException.class);
assertFutureThrows(write2, IllegalArgumentException.class);
// Verify the state.
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Collections.emptyList(), ctx.coordinator.coordinator().fullRecords());
assertEquals(Collections.emptyList(), writer.entries(TP));
}
@Test
public void testScheduleTransactionalWriteOperationWithBatching() throws ExecutionException, InterruptedException, TimeoutException {
MockTimer timer = new MockTimer();
MockPartitionWriter writer = new MockPartitionWriter();
CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time())
.withTimer(timer)
.withDefaultWriteTimeOut(Duration.ofMillis(20))
.withLoader(new MockCoordinatorLoader())
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withAppendLingerMs(10)
.build();
// Schedule the loading.
runtime.scheduleLoadOperation(TP, 10);
// Verify the initial state.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertNull(ctx.currentBatch);
// Write #1 with one record.
CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(Collections.singletonList("record#1"), "response1")
);
// Verify that the write is not committed yet.
assertFalse(write1.isDone());
// Verify the state. Records are replayed but no batch written.
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().pendingRecords(100L));
assertEquals(mkSet("record#1"), ctx.coordinator.coordinator().records());
assertEquals(Collections.emptyList(), writer.entries(TP));
// Transactional write #2 with one record. This will flush the current batch.
CompletableFuture<String> write2 = runtime.scheduleTransactionalWriteOperation(
"txn-write#1",
TP,
"transactional-id",
100L,
(short) 50,
Duration.ofMillis(20),
state -> new CoordinatorResult<>(Collections.singletonList("record#2"), "response2"),
TXN_OFFSET_COMMIT_LATEST_VERSION
);
// Verify that the write is not committed yet.
assertFalse(write2.isDone());
// Verify the state. The current batch and the transactional records are
// written to the log.
assertEquals(2L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Arrays.asList(0L, 1L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(mkSet("record#2"), ctx.coordinator.coordinator().pendingRecords(100L));
assertEquals(mkSet("record#1"), ctx.coordinator.coordinator().records());
assertEquals(Arrays.asList(
records(timer.time().milliseconds(), "record#1"),
transactionalRecords(100L, (short) 50, timer.time().milliseconds(), "record#2")
), writer.entries(TP));
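// Note that the transactional record is not coalesced with the non-transactional batch:
// the pending batch (record#1) is flushed first and record#2 is appended as its own
// transactional batch, hence the two entries in the writer.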
// Write #3 with one record.
CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(Collections.singletonList("record#3"), "response3")
);
// Verify that the write is not committed yet.
assertFalse(write3.isDone());
// Verify the state.
assertEquals(2L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Arrays.asList(0L, 1L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(mkSet("record#2"), ctx.coordinator.coordinator().pendingRecords(100L));
assertEquals(mkSet("record#1", "record#3"), ctx.coordinator.coordinator().records());
assertEquals(Arrays.asList(
records(timer.time().milliseconds(), "record#1"),
transactionalRecords(100L, (short) 50, timer.time().milliseconds(), "record#2")
), writer.entries(TP));
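// record#3 starts a new in-memory batch: it is replayed into the state machine but does
// not show up in the writer entries until that batch is flushed.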
// Complete transaction #1. It will flush the current batch if any.
CompletableFuture<Void> complete1 = runtime.scheduleTransactionCompletion(
"complete#1",
TP,
100L,
(short) 50,
10,
TransactionResult.COMMIT,
DEFAULT_WRITE_TIMEOUT
);
// Verify that the completion is not committed yet.
assertFalse(complete1.isDone());
// Verify the state.
assertEquals(4L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Arrays.asList(0L, 1L, 2L, 3L, 4L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().pendingRecords(100L));
assertEquals(mkSet("record#1", "record#2", "record#3"), ctx.coordinator.coordinator().records());
assertEquals(Arrays.asList(
records(timer.time().milliseconds(), "record#1"),
transactionalRecords(100L, (short) 50, timer.time().milliseconds(), "record#2"),
records(timer.time().milliseconds(), "record#3"),
endTransactionMarker(100L, (short) 50, timer.time().milliseconds(), 10, ControlRecordType.COMMIT)
), writer.entries(TP));
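// The completion flushed the pending batch holding record#3 (offset 3) before appending
// the COMMIT control marker (offset 4), hence the last written offset of 4.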
// Commit and verify that writes are completed.
writer.commit(TP);
assertTrue(write1.isDone());
assertTrue(write2.isDone());
assertTrue(write3.isDone());
assertTrue(complete1.isDone());
assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
assertEquals("response2", write2.get(5, TimeUnit.SECONDS));
assertEquals("response3", write3.get(5, TimeUnit.SECONDS));
assertNull(complete1.get(5, TimeUnit.SECONDS));
}
@Test
public void testStateMachineIsReloadedWhenOutOfSync() {
MockTimer timer = new MockTimer();
MockCoordinatorLoader loader = spy(new MockCoordinatorLoader());
MockPartitionWriter writer = new MockPartitionWriter() {
@Override
public long append(
TopicPartition tp,
VerificationGuard verificationGuard,
MemoryRecords batch
) {
// Add 1 to the returned offsets.
return super.append(tp, verificationGuard, batch) + 1;
}
};
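// Shifting the returned offsets makes the runtime's last written offset diverge from the
// offsets reported by the writer, which should force the coordinator to unload and
// reload the state machine.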
CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time())
.withTimer(timer)
.withDefaultWriteTimeOut(Duration.ofMillis(20))
.withLoader(loader)
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withAppendLingerMs(10)
.build();
// Schedule the loading.
runtime.scheduleLoadOperation(TP, 10);
// Verify the initial state.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(ACTIVE, ctx.state);
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertNull(ctx.currentBatch);
// Keep a reference to the current coordinator.
SnapshottableCoordinator<MockCoordinatorShard, String> coordinator = ctx.coordinator;
// Get the max batch size.
int maxBatchSize = writer.config(TP).maxMessageSize();
// Create records with a quarter of the max batch size each. Because each batch also
// carries a header, the four records cannot all fit into a single batch.
List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
char[] payload = new char[maxBatchSize / 4];
Arrays.fill(payload, c);
return new String(payload);
}).collect(Collectors.toList());
// Write #1.
CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(records.subList(0, 1), "response1"));
// Write #2.
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(records.subList(1, 2), "response2"));
// Write #3.
CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(records.subList(2, 3), "response3"));
// Write #4. This write does not fit into the current batch, so the current batch is
// flushed. The flush fails because the appended offsets are out of sync, so all
// writes are expected to fail.
CompletableFuture<String> write4 = runtime.scheduleWriteOperation("write#4", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(records.subList(3, 4), "response4"));
// Verify the futures.
assertFutureThrows(write1, NotCoordinatorException.class);
assertFutureThrows(write2, NotCoordinatorException.class);
assertFutureThrows(write3, NotCoordinatorException.class);
// Write #4 is also expected to fail.
assertFutureThrows(write4, NotCoordinatorException.class);
// Verify that the state machine was loaded twice.
verify(loader, times(2)).load(eq(TP), any());
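// Once for the initial load and once for the reload triggered by the out-of-sync flush.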
// Verify that the state is active and that the state machine
// is actually a new one.
assertEquals(ACTIVE, ctx.state);
assertNotEquals(coordinator, ctx.coordinator);
}
private static <S extends CoordinatorShard<U>, U> ArgumentMatcher<CoordinatorPlayback<U>> coordinatorMatcher(
CoordinatorRuntime<S, U> runtime,
TopicPartition tp
View File
@ -32,6 +32,10 @@ public abstract class TimerTask implements Runnable {
}
}
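// A task is considered cancelled when it is no longer linked to a timer task entry.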
public boolean isCancelled() {
return timerTaskEntry == null;
}
final void setTimerTaskEntry(TimerTaskEntry entry) {
synchronized (this) {
// if this timerTask is already held by an existing timer task entry,