MINOR: Move inner test classes out of CoordinatorRuntimeTest (#19258)

Some of these classes are generally useful for testing.
MockCoordinatorShard is already shared by SnapshottableCoordinatorTest.

Also do some minor refactors.

Reviewers: TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <djacot@confluent.io>
Sean Quah authored on 2025-03-21 15:36:23 +00:00, committed by GitHub
parent ca20e9cd92
commit ec12d360a1
GPG signature: no known key found for this signature in database (GPG Key ID: B5690EEEBB952194)
11 changed files with 598 additions and 443 deletions
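The extracted helpers are ordinary classes in org.apache.kafka.coordinator.common.runtime, so other runtime tests can now compose them directly instead of reaching into CoordinatorRuntimeTest. A minimal wiring sketch (the enclosing test method and the runtime builder it would feed are hypothetical, not part of this commit):

    // Inside a hypothetical @Test method, composing the helpers extracted by this commit.
    MockTime time = new MockTime();
    DirectEventProcessor processor = new DirectEventProcessor();          // runs events inline
    MockCoordinatorLoader loader = new MockCoordinatorLoader();           // load() always succeeds
    MockPartitionWriter writer = new MockPartitionWriter(time, 5, false); // allows at most 5 appends
    MockCoordinatorShardBuilderSupplier supplier = new MockCoordinatorShardBuilderSupplier();
    Serializer<String> serializer = new StringSerializer();
    // These would then be handed to a CoordinatorRuntime builder; that API is unchanged by this commit.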

CoordinatorRuntimeTest.java

@@ -36,7 +36,6 @@ import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.TransactionResult;
 import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
@@ -46,8 +45,6 @@ import org.apache.kafka.server.util.timer.MockTimer;
 import org.apache.kafka.storage.internals.log.LogConfig;
 import org.apache.kafka.storage.internals.log.VerificationGuard;
 import org.apache.kafka.timeline.SnapshotRegistry;
-import org.apache.kafka.timeline.TimelineHashMap;
-import org.apache.kafka.timeline.TimelineHashSet;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -60,18 +57,13 @@ import java.nio.charset.Charset;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Comparator;
-import java.util.Deque;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.OptionalInt;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -111,437 +103,6 @@ public class CoordinatorRuntimeTest {
     private static final short TXN_OFFSET_COMMIT_LATEST_VERSION = ApiKeys.TXN_OFFSET_COMMIT.latestVersion();
-    private static class StringSerializer implements Serializer<String> {
-        @Override
-        public byte[] serializeKey(String record) {
-            return null;
-        }
-        @Override
-        public byte[] serializeValue(String record) {
-            return record.getBytes(Charset.defaultCharset());
-        }
-    }
-    private static class ThrowingSerializer<T> implements Serializer<T> {
-        private final Serializer<T> serializer;
-        private boolean throwOnNextOperation;
-        public ThrowingSerializer(Serializer<T> serializer) {
-            this.serializer = serializer;
-            this.throwOnNextOperation = false;
-        }
-        public void throwOnNextOperation() {
-            throwOnNextOperation = true;
-        }
-        @Override
-        public byte[] serializeKey(T record) {
-            return serializer.serializeKey(record);
-        }
-        @Override
-        public byte[] serializeValue(T record) {
-            if (throwOnNextOperation) {
-                throwOnNextOperation = false;
-                throw new BufferOverflowException();
-            }
-            return serializer.serializeValue(record);
-        }
-    }
-    /**
-     * A CoordinatorEventProcessor that directly executes the operations. This is
-     * useful in unit tests where execution in threads is not required.
-     */
-    private static class DirectEventProcessor implements CoordinatorEventProcessor {
-        @Override
-        public void enqueueLast(CoordinatorEvent event) throws RejectedExecutionException {
-            try {
-                event.run();
-            } catch (Throwable ex) {
-                event.complete(ex);
-            }
-        }
-        @Override
-        public void enqueueFirst(CoordinatorEvent event) throws RejectedExecutionException {
-            try {
-                event.run();
-            } catch (Throwable ex) {
-                event.complete(ex);
-            }
-        }
-        @Override
-        public void close() {}
-    }
-    /**
-     * A CoordinatorEventProcessor that queues event and execute the next one
-     * when poll() is called.
-     */
-    private static class ManualEventProcessor implements CoordinatorEventProcessor {
-        private final Deque<CoordinatorEvent> queue = new LinkedList<>();
-        @Override
-        public void enqueueLast(CoordinatorEvent event) throws RejectedExecutionException {
-            queue.addLast(event);
-        }
-        @Override
-        public void enqueueFirst(CoordinatorEvent event) throws RejectedExecutionException {
-            queue.addFirst(event);
-        }
-        public boolean poll() {
-            CoordinatorEvent event = queue.poll();
-            if (event == null) return false;
-            try {
-                event.run();
-            } catch (Throwable ex) {
-                event.complete(ex);
-            }
-            return true;
-        }
-        public int size() {
-            return queue.size();
-        }
-        @Override
-        public void close() {
-        }
-    }
-    /**
-     * A CoordinatorLoader that always succeeds.
-     */
-    private static class MockCoordinatorLoader implements CoordinatorLoader<String> {
-        private final LoadSummary summary;
-        private final List<Long> lastWrittenOffsets;
-        private final List<Long> lastCommittedOffsets;
-        public MockCoordinatorLoader(
-            LoadSummary summary,
-            List<Long> lastWrittenOffsets,
-            List<Long> lastCommittedOffsets
-        ) {
-            this.summary = summary;
-            this.lastWrittenOffsets = lastWrittenOffsets;
-            this.lastCommittedOffsets = lastCommittedOffsets;
-        }
-        public MockCoordinatorLoader() {
-            this(null, List.of(), List.of());
-        }
-        @Override
-        public CompletableFuture<LoadSummary> load(
-            TopicPartition tp,
-            CoordinatorPlayback<String> replayable
-        ) {
-            lastWrittenOffsets.forEach(replayable::updateLastWrittenOffset);
-            lastCommittedOffsets.forEach(replayable::updateLastCommittedOffset);
-            return CompletableFuture.completedFuture(summary);
-        }
-        @Override
-        public void close() { }
-    }
-    /**
-     * An in-memory partition writer that accepts a maximum number of writes.
-     */
-    private static class MockPartitionWriter extends InMemoryPartitionWriter {
-        private final Time time;
-        private final int maxWrites;
-        private final boolean failEndMarker;
-        private final AtomicInteger writeCount = new AtomicInteger(0);
-        public MockPartitionWriter() {
-            this(new MockTime(), Integer.MAX_VALUE, false);
-        }
-        public MockPartitionWriter(int maxWrites) {
-            this(new MockTime(), maxWrites, false);
-        }
-        public MockPartitionWriter(boolean failEndMarker) {
-            this(new MockTime(), Integer.MAX_VALUE, failEndMarker);
-        }
-        public MockPartitionWriter(Time time, int maxWrites, boolean failEndMarker) {
-            super(false);
-            this.time = time;
-            this.maxWrites = maxWrites;
-            this.failEndMarker = failEndMarker;
-        }
-        @Override
-        public void registerListener(TopicPartition tp, Listener listener) {
-            super.registerListener(tp, listener);
-        }
-        @Override
-        public void deregisterListener(TopicPartition tp, Listener listener) {
-            super.deregisterListener(tp, listener);
-        }
-        @Override
-        public long append(
-            TopicPartition tp,
-            VerificationGuard verificationGuard,
-            MemoryRecords batch
-        ) {
-            if (batch.sizeInBytes() > config(tp).maxMessageSize())
-                throw new RecordTooLargeException("Batch is larger than the max message size");
-            // We don't want the coordinator to write empty batches.
-            if (batch.validBytes() <= 0)
-                throw new KafkaException("Coordinator tried to write an empty batch");
-            if (writeCount.incrementAndGet() > maxWrites)
-                throw new KafkaException("Maximum number of writes reached");
-            if (failEndMarker && batch.firstBatch().isControlBatch())
-                throw new KafkaException("Couldn't write end marker.");
-            time.sleep(10);
-            return super.append(tp, verificationGuard, batch);
-        }
-    }
-    /**
-     * A simple Coordinator implementation that stores the records into a set.
-     */
-    static class MockCoordinatorShard implements CoordinatorShard<String> {
-        static class RecordAndMetadata {
-            public final long offset;
-            public final long producerId;
-            public final short producerEpoch;
-            public final String record;
-            public RecordAndMetadata(
-                long offset,
-                String record
-            ) {
-                this(
-                    offset,
-                    RecordBatch.NO_PRODUCER_ID,
-                    RecordBatch.NO_PRODUCER_EPOCH,
-                    record
-                );
-            }
-            public RecordAndMetadata(
-                long offset,
-                long producerId,
-                short producerEpoch,
-                String record
-            ) {
-                this.offset = offset;
-                this.producerId = producerId;
-                this.producerEpoch = producerEpoch;
-                this.record = record;
-            }
-            @Override
-            public boolean equals(Object o) {
-                if (this == o) return true;
-                if (o == null || getClass() != o.getClass()) return false;
-                RecordAndMetadata that = (RecordAndMetadata) o;
-                if (offset != that.offset) return false;
-                if (producerId != that.producerId) return false;
-                if (producerEpoch != that.producerEpoch) return false;
-                return Objects.equals(record, that.record);
-            }
-            @Override
-            public int hashCode() {
-                int result = (int) (offset ^ (offset >>> 32));
-                result = 31 * result + (int) (producerId ^ (producerId >>> 32));
-                result = 31 * result + (int) producerEpoch;
-                result = 31 * result + (record != null ? record.hashCode() : 0);
-                return result;
-            }
-            @Override
-            public String toString() {
-                return "RecordAndMetadata(" +
-                    "offset=" + offset +
-                    ", producerId=" + producerId +
-                    ", producerEpoch=" + producerEpoch +
-                    ", record='" + record.substring(0, Math.min(10, record.length())) + '\'' +
-                    ')';
-            }
-        }
-        private final SnapshotRegistry snapshotRegistry;
-        private final TimelineHashSet<RecordAndMetadata> records;
-        private final TimelineHashMap<Long, TimelineHashSet<RecordAndMetadata>> pendingRecords;
-        private final CoordinatorTimer<Void, String> timer;
-        private final CoordinatorExecutor<String> executor;
-        MockCoordinatorShard(
-            SnapshotRegistry snapshotRegistry,
-            CoordinatorTimer<Void, String> timer
-        ) {
-            this(snapshotRegistry, timer, null);
-        }
-        MockCoordinatorShard(
-            SnapshotRegistry snapshotRegistry,
-            CoordinatorTimer<Void, String> timer,
-            CoordinatorExecutor<String> executor
-        ) {
-            this.snapshotRegistry = snapshotRegistry;
-            this.records = new TimelineHashSet<>(snapshotRegistry, 0);
-            this.pendingRecords = new TimelineHashMap<>(snapshotRegistry, 0);
-            this.timer = timer;
-            this.executor = executor;
-        }
-        @Override
-        public void replay(
-            long offset,
-            long producerId,
-            short producerEpoch,
-            String record
-        ) throws RuntimeException {
-            RecordAndMetadata recordAndMetadata = new RecordAndMetadata(
-                offset,
-                producerId,
-                producerEpoch,
-                record
-            );
-            if (producerId == RecordBatch.NO_PRODUCER_ID) {
-                records.add(recordAndMetadata);
-            } else {
-                pendingRecords
-                    .computeIfAbsent(producerId, __ -> new TimelineHashSet<>(snapshotRegistry, 0))
-                    .add(recordAndMetadata);
-            }
-        }
-        @Override
-        public void replayEndTransactionMarker(
-            long producerId,
-            short producerEpoch,
-            TransactionResult result
-        ) throws RuntimeException {
-            if (result == TransactionResult.COMMIT) {
-                TimelineHashSet<RecordAndMetadata> pending = pendingRecords.remove(producerId);
-                if (pending == null) return;
-                records.addAll(pending);
-            } else {
-                pendingRecords.remove(producerId);
-            }
-        }
-        Set<String> pendingRecords(long producerId) {
-            TimelineHashSet<RecordAndMetadata> pending = pendingRecords.get(producerId);
-            if (pending == null) return Set.of();
-            return pending.stream().map(record -> record.record).collect(Collectors.toUnmodifiableSet());
-        }
-        Set<String> records() {
-            return records.stream().map(record -> record.record).collect(Collectors.toUnmodifiableSet());
-        }
-        List<RecordAndMetadata> fullRecords() {
-            return records
-                .stream()
-                .sorted(Comparator.comparingLong(record -> record.offset))
-                .collect(Collectors.toList());
-        }
-    }
-    /**
-     * A CoordinatorBuilder that creates a MockCoordinator.
-     */
-    private static class MockCoordinatorShardBuilder implements CoordinatorShardBuilder<MockCoordinatorShard, String> {
-        private SnapshotRegistry snapshotRegistry;
-        private CoordinatorTimer<Void, String> timer;
-        private CoordinatorExecutor<String> executor;
-        @Override
-        public CoordinatorShardBuilder<MockCoordinatorShard, String> withSnapshotRegistry(
-            SnapshotRegistry snapshotRegistry
-        ) {
-            this.snapshotRegistry = snapshotRegistry;
-            return this;
-        }
-        @Override
-        public CoordinatorShardBuilder<MockCoordinatorShard, String> withLogContext(
-            LogContext logContext
-        ) {
-            return this;
-        }
-        @Override
-        public CoordinatorShardBuilder<MockCoordinatorShard, String> withTime(
-            Time time
-        ) {
-            return this;
-        }
-        @Override
-        public CoordinatorShardBuilder<MockCoordinatorShard, String> withExecutor(
-            CoordinatorExecutor<String> executor
-        ) {
-            this.executor = executor;
-            return this;
-        }
-        @Override
-        public CoordinatorShardBuilder<MockCoordinatorShard, String> withTimer(
-            CoordinatorTimer<Void, String> timer
-        ) {
-            this.timer = timer;
-            return this;
-        }
-        @Override
-        public CoordinatorShardBuilder<MockCoordinatorShard, String> withCoordinatorMetrics(CoordinatorMetrics coordinatorMetrics) {
-            return this;
-        }
-        @Override
-        public CoordinatorShardBuilder<MockCoordinatorShard, String> withTopicPartition(
-            TopicPartition topicPartition
-        ) {
-            return this;
-        }
-        @Override
-        public MockCoordinatorShard build() {
-            return new MockCoordinatorShard(
-                Objects.requireNonNull(this.snapshotRegistry),
-                Objects.requireNonNull(this.timer),
-                Objects.requireNonNull(this.executor)
-            );
-        }
-    }
-    /**
-     * A CoordinatorBuilderSupplier that returns a MockCoordinatorBuilder.
-     */
-    private static class MockCoordinatorShardBuilderSupplier implements CoordinatorShardBuilderSupplier<MockCoordinatorShard, String> {
-        @Override
-        public CoordinatorShardBuilder<MockCoordinatorShard, String> get() {
-            return new MockCoordinatorShardBuilder();
-        }
-    }
     private static MemoryRecords records(
         long timestamp,
         String... records
@@ -4917,7 +4478,7 @@ public class CoordinatorRuntimeTest {
         // Schedule a write which schedules an async tasks.
         CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, writeTimeout,
             state -> {
-                state.executor.schedule(
+                state.executor().schedule(
                     "write#1#task",
                     () -> "task result",
                     (result, exception) -> {
@@ -4935,7 +4496,7 @@ public class CoordinatorRuntimeTest {
         // We should have a new write event in the queue as a result of the
         // task being executed immediately.
-        assertEquals(1, processor.queue.size());
+        assertEquals(1, processor.size());
         // Verify the state.
         CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
@@ -4949,7 +4510,7 @@ public class CoordinatorRuntimeTest {
         processor.poll();
         // The processor must be empty now.
-        assertEquals(0, processor.queue.size());
+        assertEquals(0, processor.size());
         // Verify the state.
         assertEquals(2L, ctx.coordinator.lastWrittenOffset());

DirectEventProcessor.java

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.common.runtime;
import java.util.concurrent.RejectedExecutionException;
/**
* A CoordinatorEventProcessor that directly executes the operations. This is
* useful in unit tests where execution in threads is not required.
*/
public class DirectEventProcessor implements CoordinatorEventProcessor {
@Override
public void enqueueLast(CoordinatorEvent event) throws RejectedExecutionException {
try {
event.run();
} catch (Throwable ex) {
event.complete(ex);
}
}
@Override
public void enqueueFirst(CoordinatorEvent event) throws RejectedExecutionException {
try {
event.run();
} catch (Throwable ex) {
event.complete(ex);
}
}
@Override
public void close() {}
}
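To illustrate the inline semantics: enqueueLast does not actually queue anything; the event runs before the call returns. A sketch using a Mockito mock for the event (Mockito is an assumption here; any CoordinatorEvent stub would do):

    // assumes: import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify;
    CoordinatorEvent event = mock(CoordinatorEvent.class);
    DirectEventProcessor processor = new DirectEventProcessor();
    processor.enqueueLast(event);
    verify(event).run();  // already executed, synchronously, on the caller's thread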

ManualEventProcessor.java

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.common.runtime;
import java.util.Deque;
import java.util.LinkedList;
import java.util.concurrent.RejectedExecutionException;
/**
* A CoordinatorEventProcessor that queues events and execute the next one
* when poll() is called.
*/
public class ManualEventProcessor implements CoordinatorEventProcessor {
private final Deque<CoordinatorEvent> queue = new LinkedList<>();
@Override
public void enqueueLast(CoordinatorEvent event) throws RejectedExecutionException {
queue.addLast(event);
}
@Override
public void enqueueFirst(CoordinatorEvent event) throws RejectedExecutionException {
queue.addFirst(event);
}
public boolean poll() {
CoordinatorEvent event = queue.poll();
if (event == null) return false;
try {
event.run();
} catch (Throwable ex) {
event.complete(ex);
}
return true;
}
public int size() {
return queue.size();
}
@Override
public void close() {}
}
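In contrast to DirectEventProcessor, nothing runs until the test calls poll(), which lets a test interleave assertions between events. A sketch (Mockito mocks assumed for the events):

    // assumes: import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*;
    ManualEventProcessor processor = new ManualEventProcessor();
    CoordinatorEvent first = mock(CoordinatorEvent.class);
    CoordinatorEvent second = mock(CoordinatorEvent.class);
    processor.enqueueLast(first);      // queue: [first]
    processor.enqueueLast(second);     // queue: [first, second]
    assertEquals(2, processor.size());
    assertTrue(processor.poll());      // runs exactly one event: first
    verify(first).run();
    processor.poll();                  // runs second
    assertFalse(processor.poll());     // queue is empty now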

MockCoordinatorLoader.java

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.common.runtime;
import org.apache.kafka.common.TopicPartition;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
* A CoordinatorLoader that always succeeds.
*/
public class MockCoordinatorLoader implements CoordinatorLoader<String> {
private final LoadSummary summary;
private final List<Long> lastWrittenOffsets;
private final List<Long> lastCommittedOffsets;
public MockCoordinatorLoader(
LoadSummary summary,
List<Long> lastWrittenOffsets,
List<Long> lastCommittedOffsets
) {
this.summary = summary;
this.lastWrittenOffsets = lastWrittenOffsets;
this.lastCommittedOffsets = lastCommittedOffsets;
}
public MockCoordinatorLoader() {
this(null, List.of(), List.of());
}
@Override
public CompletableFuture<LoadSummary> load(
TopicPartition tp,
CoordinatorPlayback<String> replayable
) {
lastWrittenOffsets.forEach(replayable::updateLastWrittenOffset);
lastCommittedOffsets.forEach(replayable::updateLastCommittedOffset);
return CompletableFuture.completedFuture(summary);
}
@Override
public void close() {}
}
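The offset lists let a test simulate a load that advances the coordinator's last written/committed offsets before the summary completes. A sketch with a mocked playback (the partition name is arbitrary):

    // assumes: import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify;
    CoordinatorPlayback<String> playback = mock(CoordinatorPlayback.class);
    MockCoordinatorLoader loader = new MockCoordinatorLoader(
        null,              // no LoadSummary
        List.of(5L, 10L),  // replayed as updateLastWrittenOffset(5), then (10)
        List.of(5L)        // replayed as updateLastCommittedOffset(5)
    );
    loader.load(new TopicPartition("__consumer_offsets", 0), playback).join();
    verify(playback).updateLastWrittenOffset(10L);
    verify(playback).updateLastCommittedOffset(5L);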

MockCoordinatorShard.java

@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.common.runtime;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* A simple Coordinator implementation that stores the records into a set.
*/
public class MockCoordinatorShard implements CoordinatorShard<String> {
static record RecordAndMetadata(
long offset,
long producerId,
short producerEpoch,
String record
) {
public RecordAndMetadata(
long offset,
String record
) {
this(
offset,
RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH,
record
);
}
}
private final SnapshotRegistry snapshotRegistry;
private final TimelineHashSet<RecordAndMetadata> records;
private final TimelineHashMap<Long, TimelineHashSet<RecordAndMetadata>> pendingRecords;
private final CoordinatorTimer<Void, String> timer;
private final CoordinatorExecutor<String> executor;
MockCoordinatorShard(
SnapshotRegistry snapshotRegistry,
CoordinatorTimer<Void, String> timer
) {
this(snapshotRegistry, timer, null);
}
MockCoordinatorShard(
SnapshotRegistry snapshotRegistry,
CoordinatorTimer<Void, String> timer,
CoordinatorExecutor<String> executor
) {
this.snapshotRegistry = snapshotRegistry;
this.records = new TimelineHashSet<>(snapshotRegistry, 0);
this.pendingRecords = new TimelineHashMap<>(snapshotRegistry, 0);
this.timer = timer;
this.executor = executor;
}
@Override
public void replay(
long offset,
long producerId,
short producerEpoch,
String record
) throws RuntimeException {
RecordAndMetadata recordAndMetadata = new RecordAndMetadata(
offset,
producerId,
producerEpoch,
record
);
if (producerId == RecordBatch.NO_PRODUCER_ID) {
records.add(recordAndMetadata);
} else {
pendingRecords
.computeIfAbsent(producerId, __ -> new TimelineHashSet<>(snapshotRegistry, 0))
.add(recordAndMetadata);
}
}
@Override
public void replayEndTransactionMarker(
long producerId,
short producerEpoch,
TransactionResult result
) throws RuntimeException {
if (result == TransactionResult.COMMIT) {
TimelineHashSet<RecordAndMetadata> pending = pendingRecords.remove(producerId);
if (pending == null) return;
records.addAll(pending);
} else {
pendingRecords.remove(producerId);
}
}
Set<String> pendingRecords(long producerId) {
TimelineHashSet<RecordAndMetadata> pending = pendingRecords.get(producerId);
if (pending == null) return Set.of();
return pending.stream().map(record -> record.record).collect(Collectors.toUnmodifiableSet());
}
Set<String> records() {
return records.stream().map(record -> record.record).collect(Collectors.toUnmodifiableSet());
}
List<RecordAndMetadata> fullRecords() {
return records
.stream()
.sorted(Comparator.comparingLong(record -> record.offset))
.toList();
}
CoordinatorTimer<Void, String> timer() {
return timer;
}
CoordinatorExecutor<String> executor() {
return executor;
}
}
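The pending/committed split mirrors transactional semantics: records replayed with a real producer id stay invisible to records() until a COMMIT marker arrives. A sketch (a null timer is fine here since replay never touches it; SnapshotRegistry's LogContext constructor is an assumption based on its use elsewhere in Kafka):

    // assumes: import static org.junit.jupiter.api.Assertions.assertEquals;
    SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
    MockCoordinatorShard shard = new MockCoordinatorShard(registry, null);
    shard.replay(0L, 100L, (short) 0, "txn-record");  // producerId 100 => pending
    assertEquals(Set.of("txn-record"), shard.pendingRecords(100L));
    assertEquals(Set.of(), shard.records());
    shard.replayEndTransactionMarker(100L, (short) 0, TransactionResult.COMMIT);
    assertEquals(Set.of(), shard.pendingRecords(100L));
    assertEquals(Set.of("txn-record"), shard.records());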

MockCoordinatorShardBuilder.java

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.common.runtime;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.timeline.SnapshotRegistry;
import java.util.Objects;
/**
* A CoordinatorBuilder that creates a MockCoordinator.
*/
public class MockCoordinatorShardBuilder implements CoordinatorShardBuilder<MockCoordinatorShard, String> {
private SnapshotRegistry snapshotRegistry;
private CoordinatorTimer<Void, String> timer;
private CoordinatorExecutor<String> executor;
@Override
public CoordinatorShardBuilder<MockCoordinatorShard, String> withSnapshotRegistry(
SnapshotRegistry snapshotRegistry
) {
this.snapshotRegistry = snapshotRegistry;
return this;
}
@Override
public CoordinatorShardBuilder<MockCoordinatorShard, String> withLogContext(
LogContext logContext
) {
return this;
}
@Override
public CoordinatorShardBuilder<MockCoordinatorShard, String> withTime(
Time time
) {
return this;
}
@Override
public CoordinatorShardBuilder<MockCoordinatorShard, String> withExecutor(
CoordinatorExecutor<String> executor
) {
this.executor = executor;
return this;
}
@Override
public CoordinatorShardBuilder<MockCoordinatorShard, String> withTimer(
CoordinatorTimer<Void, String> timer
) {
this.timer = timer;
return this;
}
@Override
public CoordinatorShardBuilder<MockCoordinatorShard, String> withCoordinatorMetrics(CoordinatorMetrics coordinatorMetrics) {
return this;
}
@Override
public CoordinatorShardBuilder<MockCoordinatorShard, String> withTopicPartition(
TopicPartition topicPartition
) {
return this;
}
@Override
public MockCoordinatorShard build() {
return new MockCoordinatorShard(
Objects.requireNonNull(this.snapshotRegistry),
Objects.requireNonNull(this.timer),
Objects.requireNonNull(this.executor)
);
}
}

MockCoordinatorShardBuilderSupplier.java

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.common.runtime;
/**
* A CoordinatorBuilderSupplier that returns a MockCoordinatorBuilder.
*/
public class MockCoordinatorShardBuilderSupplier implements CoordinatorShardBuilderSupplier<MockCoordinatorShard, String> {
@Override
public CoordinatorShardBuilder<MockCoordinatorShard, String> get() {
return new MockCoordinatorShardBuilder();
}
}
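Together the supplier and builder mimic how the runtime instantiates shards: take a fresh builder from the supplier, inject dependencies, call build(). A sketch with mocked timer and executor (build() null-checks all three required dependencies; the raw-typed mocks trade an unchecked warning for brevity):

    // assumes: import static org.mockito.Mockito.mock;
    MockCoordinatorShard shard = new MockCoordinatorShardBuilderSupplier()
        .get()
        .withSnapshotRegistry(new SnapshotRegistry(new LogContext()))
        .withTimer(mock(CoordinatorTimer.class))
        .withExecutor(mock(CoordinatorExecutor.class))
        .withLogContext(new LogContext())  // accepted but ignored by this mock
        .build();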

MockPartitionWriter.java

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.common.runtime;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.storage.internals.log.VerificationGuard;
import java.util.concurrent.atomic.AtomicInteger;
/**
* An in-memory partition writer that accepts a maximum number of writes.
*/
public class MockPartitionWriter extends InMemoryPartitionWriter {
private final Time time;
private final int maxWrites;
private final boolean failEndMarker;
private final AtomicInteger writeCount = new AtomicInteger(0);
public MockPartitionWriter() {
this(new MockTime(), Integer.MAX_VALUE, false);
}
public MockPartitionWriter(int maxWrites) {
this(new MockTime(), maxWrites, false);
}
public MockPartitionWriter(boolean failEndMarker) {
this(new MockTime(), Integer.MAX_VALUE, failEndMarker);
}
public MockPartitionWriter(Time time, int maxWrites, boolean failEndMarker) {
super(false);
this.time = time;
this.maxWrites = maxWrites;
this.failEndMarker = failEndMarker;
}
@Override
public void registerListener(TopicPartition tp, Listener listener) {
super.registerListener(tp, listener);
}
@Override
public void deregisterListener(TopicPartition tp, Listener listener) {
super.deregisterListener(tp, listener);
}
@Override
public long append(
TopicPartition tp,
VerificationGuard verificationGuard,
MemoryRecords batch
) {
if (batch.sizeInBytes() > config(tp).maxMessageSize())
throw new RecordTooLargeException("Batch is larger than the max message size");
// We don't want the coordinator to write empty batches.
if (batch.validBytes() <= 0)
throw new KafkaException("Coordinator tried to write an empty batch");
if (writeCount.incrementAndGet() > maxWrites)
throw new KafkaException("Maximum number of writes reached");
if (failEndMarker && batch.firstBatch().isControlBatch())
throw new KafkaException("Couldn't write end marker.");
time.sleep(10);
return super.append(tp, verificationGuard, batch);
}
}
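A sketch of the write limit. Hedged assumptions: the Compression-based MemoryRecords.withRecords overload, the VerificationGuard.SENTINEL constant, and that InMemoryPartitionWriter serves a usable default LogConfig for an unconfigured partition:

    // assumes: import static org.junit.jupiter.api.Assertions.assertThrows;
    MockPartitionWriter writer = new MockPartitionWriter(1);  // permit a single write
    TopicPartition tp = new TopicPartition("__consumer_offsets", 0);
    MemoryRecords batch = MemoryRecords.withRecords(
        Compression.NONE, new SimpleRecord("value".getBytes(Charset.defaultCharset())));
    writer.append(tp, VerificationGuard.SENTINEL, batch);  // first append succeeds
    assertThrows(KafkaException.class,
        () -> writer.append(tp, VerificationGuard.SENTINEL, batch));  // second exceeds maxWrites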

SnapshottableCoordinatorTest.java

@@ -19,7 +19,6 @@ package org.apache.kafka.coordinator.common.runtime;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeTest.MockCoordinatorShard;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.junit.jupiter.api.Test;

StringSerializer.java

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.common.runtime;
import java.nio.charset.Charset;
/**
* A Serializer for strings.
*/
public class StringSerializer implements Serializer<String> {
@Override
public byte[] serializeKey(String record) {
return null;
}
@Override
public byte[] serializeValue(String record) {
return record.getBytes(Charset.defaultCharset());
}
}

ThrowingSerializer.java

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.common.runtime;
import java.nio.BufferOverflowException;
/**
* A Serializer that throws exceptions on demand.
*/
public class ThrowingSerializer<T> implements Serializer<T> {
private final Serializer<T> serializer;
private boolean throwOnNextOperation;
public ThrowingSerializer(Serializer<T> serializer) {
this.serializer = serializer;
this.throwOnNextOperation = false;
}
public void throwOnNextOperation() {
throwOnNextOperation = true;
}
@Override
public byte[] serializeKey(T record) {
return serializer.serializeKey(record);
}
@Override
public byte[] serializeValue(T record) {
if (throwOnNextOperation) {
throwOnNextOperation = false;
throw new BufferOverflowException();
}
return serializer.serializeValue(record);
}
}
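The failure is one-shot: the flag clears as the exception is thrown, so the very next call succeeds. A short illustration grounded in the two serializer classes above:

    // assumes: import static org.junit.jupiter.api.Assertions.*;
    ThrowingSerializer<String> serializer = new ThrowingSerializer<>(new StringSerializer());
    serializer.throwOnNextOperation();
    assertThrows(BufferOverflowException.class, () -> serializer.serializeValue("a"));
    assertArrayEquals("a".getBytes(Charset.defaultCharset()), serializer.serializeValue("a"));
    // Note: serializeKey() never consults the flag, and StringSerializer keys are always null.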