diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java index 72f31b0a7d8..fb5b4a68572 100644 --- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java @@ -36,7 +36,6 @@ import org.apache.kafka.common.record.SimpleRecord; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.requests.TransactionResult; import org.apache.kafka.common.utils.LogContext; -import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; @@ -46,8 +45,6 @@ import org.apache.kafka.server.util.timer.MockTimer; import org.apache.kafka.storage.internals.log.LogConfig; import org.apache.kafka.storage.internals.log.VerificationGuard; import org.apache.kafka.timeline.SnapshotRegistry; -import org.apache.kafka.timeline.TimelineHashMap; -import org.apache.kafka.timeline.TimelineHashSet; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -60,18 +57,13 @@ import java.nio.charset.Charset; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; -import java.util.Comparator; -import java.util.Deque; -import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.OptionalInt; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -111,437 +103,6 @@ public class CoordinatorRuntimeTest { private static final short TXN_OFFSET_COMMIT_LATEST_VERSION = ApiKeys.TXN_OFFSET_COMMIT.latestVersion(); - private static class StringSerializer implements Serializer { - @Override - public byte[] serializeKey(String record) { - return null; - } - - @Override - public byte[] serializeValue(String record) { - return record.getBytes(Charset.defaultCharset()); - } - } - - private static class ThrowingSerializer implements Serializer { - private final Serializer serializer; - private boolean throwOnNextOperation; - - public ThrowingSerializer(Serializer serializer) { - this.serializer = serializer; - this.throwOnNextOperation = false; - } - - public void throwOnNextOperation() { - throwOnNextOperation = true; - } - - @Override - public byte[] serializeKey(T record) { - return serializer.serializeKey(record); - } - - @Override - public byte[] serializeValue(T record) { - if (throwOnNextOperation) { - throwOnNextOperation = false; - throw new BufferOverflowException(); - } - return serializer.serializeValue(record); - } - } - - /** - * A CoordinatorEventProcessor that directly executes the operations. This is - * useful in unit tests where execution in threads is not required. - */ - private static class DirectEventProcessor implements CoordinatorEventProcessor { - @Override - public void enqueueLast(CoordinatorEvent event) throws RejectedExecutionException { - try { - event.run(); - } catch (Throwable ex) { - event.complete(ex); - } - } - - @Override - public void enqueueFirst(CoordinatorEvent event) throws RejectedExecutionException { - try { - event.run(); - } catch (Throwable ex) { - event.complete(ex); - } - } - - @Override - public void close() {} - } - - /** - * A CoordinatorEventProcessor that queues event and execute the next one - * when poll() is called. - */ - private static class ManualEventProcessor implements CoordinatorEventProcessor { - private final Deque queue = new LinkedList<>(); - - @Override - public void enqueueLast(CoordinatorEvent event) throws RejectedExecutionException { - queue.addLast(event); - } - - @Override - public void enqueueFirst(CoordinatorEvent event) throws RejectedExecutionException { - queue.addFirst(event); - } - - public boolean poll() { - CoordinatorEvent event = queue.poll(); - if (event == null) return false; - - try { - event.run(); - } catch (Throwable ex) { - event.complete(ex); - } - - return true; - } - - public int size() { - return queue.size(); - } - - @Override - public void close() { - - } - } - - /** - * A CoordinatorLoader that always succeeds. - */ - private static class MockCoordinatorLoader implements CoordinatorLoader { - private final LoadSummary summary; - private final List lastWrittenOffsets; - private final List lastCommittedOffsets; - - public MockCoordinatorLoader( - LoadSummary summary, - List lastWrittenOffsets, - List lastCommittedOffsets - ) { - this.summary = summary; - this.lastWrittenOffsets = lastWrittenOffsets; - this.lastCommittedOffsets = lastCommittedOffsets; - } - - public MockCoordinatorLoader() { - this(null, List.of(), List.of()); - } - - @Override - public CompletableFuture load( - TopicPartition tp, - CoordinatorPlayback replayable - ) { - lastWrittenOffsets.forEach(replayable::updateLastWrittenOffset); - lastCommittedOffsets.forEach(replayable::updateLastCommittedOffset); - return CompletableFuture.completedFuture(summary); - } - - @Override - public void close() { } - } - - /** - * An in-memory partition writer that accepts a maximum number of writes. - */ - private static class MockPartitionWriter extends InMemoryPartitionWriter { - private final Time time; - private final int maxWrites; - private final boolean failEndMarker; - private final AtomicInteger writeCount = new AtomicInteger(0); - - public MockPartitionWriter() { - this(new MockTime(), Integer.MAX_VALUE, false); - } - - public MockPartitionWriter(int maxWrites) { - this(new MockTime(), maxWrites, false); - } - - public MockPartitionWriter(boolean failEndMarker) { - this(new MockTime(), Integer.MAX_VALUE, failEndMarker); - } - - public MockPartitionWriter(Time time, int maxWrites, boolean failEndMarker) { - super(false); - this.time = time; - this.maxWrites = maxWrites; - this.failEndMarker = failEndMarker; - } - - @Override - public void registerListener(TopicPartition tp, Listener listener) { - super.registerListener(tp, listener); - } - - @Override - public void deregisterListener(TopicPartition tp, Listener listener) { - super.deregisterListener(tp, listener); - } - - @Override - public long append( - TopicPartition tp, - VerificationGuard verificationGuard, - MemoryRecords batch - ) { - if (batch.sizeInBytes() > config(tp).maxMessageSize()) - throw new RecordTooLargeException("Batch is larger than the max message size"); - - // We don't want the coordinator to write empty batches. - if (batch.validBytes() <= 0) - throw new KafkaException("Coordinator tried to write an empty batch"); - - if (writeCount.incrementAndGet() > maxWrites) - throw new KafkaException("Maximum number of writes reached"); - - if (failEndMarker && batch.firstBatch().isControlBatch()) - throw new KafkaException("Couldn't write end marker."); - - time.sleep(10); - return super.append(tp, verificationGuard, batch); - } - } - - /** - * A simple Coordinator implementation that stores the records into a set. - */ - static class MockCoordinatorShard implements CoordinatorShard { - static class RecordAndMetadata { - public final long offset; - public final long producerId; - public final short producerEpoch; - public final String record; - - public RecordAndMetadata( - long offset, - String record - ) { - this( - offset, - RecordBatch.NO_PRODUCER_ID, - RecordBatch.NO_PRODUCER_EPOCH, - record - ); - } - - public RecordAndMetadata( - long offset, - long producerId, - short producerEpoch, - String record - ) { - this.offset = offset; - this.producerId = producerId; - this.producerEpoch = producerEpoch; - this.record = record; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - RecordAndMetadata that = (RecordAndMetadata) o; - - if (offset != that.offset) return false; - if (producerId != that.producerId) return false; - if (producerEpoch != that.producerEpoch) return false; - return Objects.equals(record, that.record); - } - - @Override - public int hashCode() { - int result = (int) (offset ^ (offset >>> 32)); - result = 31 * result + (int) (producerId ^ (producerId >>> 32)); - result = 31 * result + (int) producerEpoch; - result = 31 * result + (record != null ? record.hashCode() : 0); - return result; - } - - @Override - public String toString() { - return "RecordAndMetadata(" + - "offset=" + offset + - ", producerId=" + producerId + - ", producerEpoch=" + producerEpoch + - ", record='" + record.substring(0, Math.min(10, record.length())) + '\'' + - ')'; - } - } - - private final SnapshotRegistry snapshotRegistry; - private final TimelineHashSet records; - private final TimelineHashMap> pendingRecords; - private final CoordinatorTimer timer; - private final CoordinatorExecutor executor; - - MockCoordinatorShard( - SnapshotRegistry snapshotRegistry, - CoordinatorTimer timer - ) { - this(snapshotRegistry, timer, null); - } - - MockCoordinatorShard( - SnapshotRegistry snapshotRegistry, - CoordinatorTimer timer, - CoordinatorExecutor executor - ) { - this.snapshotRegistry = snapshotRegistry; - this.records = new TimelineHashSet<>(snapshotRegistry, 0); - this.pendingRecords = new TimelineHashMap<>(snapshotRegistry, 0); - this.timer = timer; - this.executor = executor; - } - - @Override - public void replay( - long offset, - long producerId, - short producerEpoch, - String record - ) throws RuntimeException { - RecordAndMetadata recordAndMetadata = new RecordAndMetadata( - offset, - producerId, - producerEpoch, - record - ); - - if (producerId == RecordBatch.NO_PRODUCER_ID) { - records.add(recordAndMetadata); - } else { - pendingRecords - .computeIfAbsent(producerId, __ -> new TimelineHashSet<>(snapshotRegistry, 0)) - .add(recordAndMetadata); - } - } - - @Override - public void replayEndTransactionMarker( - long producerId, - short producerEpoch, - TransactionResult result - ) throws RuntimeException { - if (result == TransactionResult.COMMIT) { - TimelineHashSet pending = pendingRecords.remove(producerId); - if (pending == null) return; - records.addAll(pending); - } else { - pendingRecords.remove(producerId); - } - } - - Set pendingRecords(long producerId) { - TimelineHashSet pending = pendingRecords.get(producerId); - if (pending == null) return Set.of(); - return pending.stream().map(record -> record.record).collect(Collectors.toUnmodifiableSet()); - } - - Set records() { - return records.stream().map(record -> record.record).collect(Collectors.toUnmodifiableSet()); - } - - List fullRecords() { - return records - .stream() - .sorted(Comparator.comparingLong(record -> record.offset)) - .collect(Collectors.toList()); - } - } - - /** - * A CoordinatorBuilder that creates a MockCoordinator. - */ - private static class MockCoordinatorShardBuilder implements CoordinatorShardBuilder { - private SnapshotRegistry snapshotRegistry; - private CoordinatorTimer timer; - private CoordinatorExecutor executor; - - @Override - public CoordinatorShardBuilder withSnapshotRegistry( - SnapshotRegistry snapshotRegistry - ) { - this.snapshotRegistry = snapshotRegistry; - return this; - } - - @Override - public CoordinatorShardBuilder withLogContext( - LogContext logContext - ) { - return this; - } - - @Override - public CoordinatorShardBuilder withTime( - Time time - ) { - return this; - } - - @Override - public CoordinatorShardBuilder withExecutor( - CoordinatorExecutor executor - ) { - this.executor = executor; - return this; - } - - @Override - public CoordinatorShardBuilder withTimer( - CoordinatorTimer timer - ) { - this.timer = timer; - return this; - } - - @Override - public CoordinatorShardBuilder withCoordinatorMetrics(CoordinatorMetrics coordinatorMetrics) { - return this; - } - - @Override - public CoordinatorShardBuilder withTopicPartition( - TopicPartition topicPartition - ) { - return this; - } - - @Override - public MockCoordinatorShard build() { - return new MockCoordinatorShard( - Objects.requireNonNull(this.snapshotRegistry), - Objects.requireNonNull(this.timer), - Objects.requireNonNull(this.executor) - ); - } - } - - /** - * A CoordinatorBuilderSupplier that returns a MockCoordinatorBuilder. - */ - private static class MockCoordinatorShardBuilderSupplier implements CoordinatorShardBuilderSupplier { - @Override - public CoordinatorShardBuilder get() { - return new MockCoordinatorShardBuilder(); - } - } - private static MemoryRecords records( long timestamp, String... records @@ -4917,7 +4478,7 @@ public class CoordinatorRuntimeTest { // Schedule a write which schedules an async tasks. CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, writeTimeout, state -> { - state.executor.schedule( + state.executor().schedule( "write#1#task", () -> "task result", (result, exception) -> { @@ -4935,7 +4496,7 @@ public class CoordinatorRuntimeTest { // We should have a new write event in the queue as a result of the // task being executed immediately. - assertEquals(1, processor.queue.size()); + assertEquals(1, processor.size()); // Verify the state. CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); @@ -4949,7 +4510,7 @@ public class CoordinatorRuntimeTest { processor.poll(); // The processor must be empty now. - assertEquals(0, processor.queue.size()); + assertEquals(0, processor.size()); // Verify the state. assertEquals(2L, ctx.coordinator.lastWrittenOffset()); diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/DirectEventProcessor.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/DirectEventProcessor.java new file mode 100644 index 00000000000..60b74c3a12f --- /dev/null +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/DirectEventProcessor.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.common.runtime; + +import java.util.concurrent.RejectedExecutionException; + +/** + * A CoordinatorEventProcessor that directly executes the operations. This is + * useful in unit tests where execution in threads is not required. + */ +public class DirectEventProcessor implements CoordinatorEventProcessor { + @Override + public void enqueueLast(CoordinatorEvent event) throws RejectedExecutionException { + try { + event.run(); + } catch (Throwable ex) { + event.complete(ex); + } + } + + @Override + public void enqueueFirst(CoordinatorEvent event) throws RejectedExecutionException { + try { + event.run(); + } catch (Throwable ex) { + event.complete(ex); + } + } + + @Override + public void close() {} +} diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/ManualEventProcessor.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/ManualEventProcessor.java new file mode 100644 index 00000000000..0a55bcfafa5 --- /dev/null +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/ManualEventProcessor.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.common.runtime; + +import java.util.Deque; +import java.util.LinkedList; +import java.util.concurrent.RejectedExecutionException; + +/** + * A CoordinatorEventProcessor that queues events and execute the next one + * when poll() is called. + */ +public class ManualEventProcessor implements CoordinatorEventProcessor { + private final Deque queue = new LinkedList<>(); + + @Override + public void enqueueLast(CoordinatorEvent event) throws RejectedExecutionException { + queue.addLast(event); + } + + @Override + public void enqueueFirst(CoordinatorEvent event) throws RejectedExecutionException { + queue.addFirst(event); + } + + public boolean poll() { + CoordinatorEvent event = queue.poll(); + if (event == null) return false; + + try { + event.run(); + } catch (Throwable ex) { + event.complete(ex); + } + + return true; + } + + public int size() { + return queue.size(); + } + + @Override + public void close() {} +} diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorLoader.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorLoader.java new file mode 100644 index 00000000000..54fae743274 --- /dev/null +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorLoader.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.common.runtime; + +import org.apache.kafka.common.TopicPartition; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * A CoordinatorLoader that always succeeds. + */ +public class MockCoordinatorLoader implements CoordinatorLoader { + private final LoadSummary summary; + private final List lastWrittenOffsets; + private final List lastCommittedOffsets; + + public MockCoordinatorLoader( + LoadSummary summary, + List lastWrittenOffsets, + List lastCommittedOffsets + ) { + this.summary = summary; + this.lastWrittenOffsets = lastWrittenOffsets; + this.lastCommittedOffsets = lastCommittedOffsets; + } + + public MockCoordinatorLoader() { + this(null, List.of(), List.of()); + } + + @Override + public CompletableFuture load( + TopicPartition tp, + CoordinatorPlayback replayable + ) { + lastWrittenOffsets.forEach(replayable::updateLastWrittenOffset); + lastCommittedOffsets.forEach(replayable::updateLastCommittedOffset); + return CompletableFuture.completedFuture(summary); + } + + @Override + public void close() {} +} diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShard.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShard.java new file mode 100644 index 00000000000..1fec7a9e000 --- /dev/null +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShard.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.common.runtime; + +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.requests.TransactionResult; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.apache.kafka.timeline.TimelineHashSet; + +import java.util.Comparator; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * A simple Coordinator implementation that stores the records into a set. + */ +public class MockCoordinatorShard implements CoordinatorShard { + static record RecordAndMetadata( + long offset, + long producerId, + short producerEpoch, + String record + ) { + public RecordAndMetadata( + long offset, + String record + ) { + this( + offset, + RecordBatch.NO_PRODUCER_ID, + RecordBatch.NO_PRODUCER_EPOCH, + record + ); + } + } + + private final SnapshotRegistry snapshotRegistry; + private final TimelineHashSet records; + private final TimelineHashMap> pendingRecords; + private final CoordinatorTimer timer; + private final CoordinatorExecutor executor; + + MockCoordinatorShard( + SnapshotRegistry snapshotRegistry, + CoordinatorTimer timer + ) { + this(snapshotRegistry, timer, null); + } + + MockCoordinatorShard( + SnapshotRegistry snapshotRegistry, + CoordinatorTimer timer, + CoordinatorExecutor executor + ) { + this.snapshotRegistry = snapshotRegistry; + this.records = new TimelineHashSet<>(snapshotRegistry, 0); + this.pendingRecords = new TimelineHashMap<>(snapshotRegistry, 0); + this.timer = timer; + this.executor = executor; + } + + @Override + public void replay( + long offset, + long producerId, + short producerEpoch, + String record + ) throws RuntimeException { + RecordAndMetadata recordAndMetadata = new RecordAndMetadata( + offset, + producerId, + producerEpoch, + record + ); + + if (producerId == RecordBatch.NO_PRODUCER_ID) { + records.add(recordAndMetadata); + } else { + pendingRecords + .computeIfAbsent(producerId, __ -> new TimelineHashSet<>(snapshotRegistry, 0)) + .add(recordAndMetadata); + } + } + + @Override + public void replayEndTransactionMarker( + long producerId, + short producerEpoch, + TransactionResult result + ) throws RuntimeException { + if (result == TransactionResult.COMMIT) { + TimelineHashSet pending = pendingRecords.remove(producerId); + if (pending == null) return; + records.addAll(pending); + } else { + pendingRecords.remove(producerId); + } + } + + Set pendingRecords(long producerId) { + TimelineHashSet pending = pendingRecords.get(producerId); + if (pending == null) return Set.of(); + return pending.stream().map(record -> record.record).collect(Collectors.toUnmodifiableSet()); + } + + Set records() { + return records.stream().map(record -> record.record).collect(Collectors.toUnmodifiableSet()); + } + + List fullRecords() { + return records + .stream() + .sorted(Comparator.comparingLong(record -> record.offset)) + .toList(); + } + + CoordinatorTimer timer() { + return timer; + } + + CoordinatorExecutor executor() { + return executor; + } +} diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShardBuilder.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShardBuilder.java new file mode 100644 index 00000000000..dea2dcc1c17 --- /dev/null +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShardBuilder.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.common.runtime; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.timeline.SnapshotRegistry; + +import java.util.Objects; + +/** + * A CoordinatorBuilder that creates a MockCoordinator. + */ +public class MockCoordinatorShardBuilder implements CoordinatorShardBuilder { + private SnapshotRegistry snapshotRegistry; + private CoordinatorTimer timer; + private CoordinatorExecutor executor; + + @Override + public CoordinatorShardBuilder withSnapshotRegistry( + SnapshotRegistry snapshotRegistry + ) { + this.snapshotRegistry = snapshotRegistry; + return this; + } + + @Override + public CoordinatorShardBuilder withLogContext( + LogContext logContext + ) { + return this; + } + + @Override + public CoordinatorShardBuilder withTime( + Time time + ) { + return this; + } + + @Override + public CoordinatorShardBuilder withExecutor( + CoordinatorExecutor executor + ) { + this.executor = executor; + return this; + } + + @Override + public CoordinatorShardBuilder withTimer( + CoordinatorTimer timer + ) { + this.timer = timer; + return this; + } + + @Override + public CoordinatorShardBuilder withCoordinatorMetrics(CoordinatorMetrics coordinatorMetrics) { + return this; + } + + @Override + public CoordinatorShardBuilder withTopicPartition( + TopicPartition topicPartition + ) { + return this; + } + + @Override + public MockCoordinatorShard build() { + return new MockCoordinatorShard( + Objects.requireNonNull(this.snapshotRegistry), + Objects.requireNonNull(this.timer), + Objects.requireNonNull(this.executor) + ); + } +} diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShardBuilderSupplier.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShardBuilderSupplier.java new file mode 100644 index 00000000000..09efdba5e81 --- /dev/null +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShardBuilderSupplier.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.common.runtime; + +/** + * A CoordinatorBuilderSupplier that returns a MockCoordinatorBuilder. + */ +public class MockCoordinatorShardBuilderSupplier implements CoordinatorShardBuilderSupplier { + @Override + public CoordinatorShardBuilder get() { + return new MockCoordinatorShardBuilder(); + } +} diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockPartitionWriter.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockPartitionWriter.java new file mode 100644 index 00000000000..19564093bb4 --- /dev/null +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockPartitionWriter.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.common.runtime; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.storage.internals.log.VerificationGuard; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * An in-memory partition writer that accepts a maximum number of writes. + */ +public class MockPartitionWriter extends InMemoryPartitionWriter { + private final Time time; + private final int maxWrites; + private final boolean failEndMarker; + private final AtomicInteger writeCount = new AtomicInteger(0); + + public MockPartitionWriter() { + this(new MockTime(), Integer.MAX_VALUE, false); + } + + public MockPartitionWriter(int maxWrites) { + this(new MockTime(), maxWrites, false); + } + + public MockPartitionWriter(boolean failEndMarker) { + this(new MockTime(), Integer.MAX_VALUE, failEndMarker); + } + + public MockPartitionWriter(Time time, int maxWrites, boolean failEndMarker) { + super(false); + this.time = time; + this.maxWrites = maxWrites; + this.failEndMarker = failEndMarker; + } + + @Override + public void registerListener(TopicPartition tp, Listener listener) { + super.registerListener(tp, listener); + } + + @Override + public void deregisterListener(TopicPartition tp, Listener listener) { + super.deregisterListener(tp, listener); + } + + @Override + public long append( + TopicPartition tp, + VerificationGuard verificationGuard, + MemoryRecords batch + ) { + if (batch.sizeInBytes() > config(tp).maxMessageSize()) + throw new RecordTooLargeException("Batch is larger than the max message size"); + + // We don't want the coordinator to write empty batches. + if (batch.validBytes() <= 0) + throw new KafkaException("Coordinator tried to write an empty batch"); + + if (writeCount.incrementAndGet() > maxWrites) + throw new KafkaException("Maximum number of writes reached"); + + if (failEndMarker && batch.firstBatch().isControlBatch()) + throw new KafkaException("Couldn't write end marker."); + + time.sleep(10); + return super.append(tp, verificationGuard, batch); + } +} diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/SnapshottableCoordinatorTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/SnapshottableCoordinatorTest.java index 40c23a2759a..b1436a3eff5 100644 --- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/SnapshottableCoordinatorTest.java +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/SnapshottableCoordinatorTest.java @@ -19,7 +19,6 @@ package org.apache.kafka.coordinator.common.runtime; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeTest.MockCoordinatorShard; import org.apache.kafka.timeline.SnapshotRegistry; import org.junit.jupiter.api.Test; diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/StringSerializer.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/StringSerializer.java new file mode 100644 index 00000000000..16b72f15882 --- /dev/null +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/StringSerializer.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.common.runtime; + +import java.nio.charset.Charset; + +/** + * A Serializer for strings. + */ +public class StringSerializer implements Serializer { + @Override + public byte[] serializeKey(String record) { + return null; + } + + @Override + public byte[] serializeValue(String record) { + return record.getBytes(Charset.defaultCharset()); + } +} diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/ThrowingSerializer.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/ThrowingSerializer.java new file mode 100644 index 00000000000..26124c65a13 --- /dev/null +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/ThrowingSerializer.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.common.runtime; + +import java.nio.BufferOverflowException; + +/** + * A Serializer that throws exceptions on demand. + */ +public class ThrowingSerializer implements Serializer { + private final Serializer serializer; + private boolean throwOnNextOperation; + + public ThrowingSerializer(Serializer serializer) { + this.serializer = serializer; + this.throwOnNextOperation = false; + } + + public void throwOnNextOperation() { + throwOnNextOperation = true; + } + + @Override + public byte[] serializeKey(T record) { + return serializer.serializeKey(record); + } + + @Override + public byte[] serializeValue(T record) { + if (throwOnNextOperation) { + throwOnNextOperation = false; + throw new BufferOverflowException(); + } + return serializer.serializeValue(record); + } +}