diff --git a/build.gradle b/build.gradle
index 5bb893f4fa4..f68b130f2de 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1664,7 +1664,6 @@ project(':coordinator-common') {
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':server-common').sourceSets.test.output
- testImplementation project(':core')
testImplementation libs.junitJupiter
testImplementation libs.mockitoCore
testImplementation testLog4j2Libs
diff --git a/checkstyle/import-control-coordinator-common.xml b/checkstyle/import-control-coordinator-common.xml
index 09589d87901..7841697cf89 100644
--- a/checkstyle/import-control-coordinator-common.xml
+++ b/checkstyle/import-control-coordinator-common.xml
@@ -27,6 +27,8 @@
+
+
@@ -41,7 +43,6 @@
-
@@ -67,7 +68,6 @@
-
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java
index 8d741fa16e0..61c5d52c8a9 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java
@@ -16,8 +16,6 @@
*/
package org.apache.kafka.coordinator.common.runtime;
-import kafka.server.ReplicaManager;
-
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
@@ -51,8 +49,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
-import scala.Option;
-
import static org.apache.kafka.test.TestUtils.assertFutureThrows;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -65,10 +61,8 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import static scala.jdk.javaapi.OptionConverters.toJava;
-import static scala.jdk.javaapi.OptionConverters.toScala;
-@SuppressWarnings("unchecked")
+@SuppressWarnings({"unchecked", "resource"})
@Timeout(60)
class CoordinatorLoaderImplTest {
@@ -89,9 +83,8 @@ class CoordinatorLoaderImplTest {
@Test
void testNonexistentPartition() throws Exception {
TopicPartition tp = new TopicPartition("foo", 0);
- ReplicaManager replicaManager = mock(ReplicaManager.class);
- Function> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
- Function> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
+ Function> partitionLogSupplier = partition -> Optional.empty();
+ Function> partitionLogEndOffsetSupplier = partition -> Optional.empty();
Deserializer> serde = mock(Deserializer.class);
CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class);
@@ -102,8 +95,6 @@ class CoordinatorLoaderImplTest {
serde,
1000
)) {
- when(replicaManager.getLog(tp)).thenReturn(Option.empty());
-
assertFutureThrows(NotLeaderOrFollowerException.class, loader.load(tp, coordinator));
}
}
@@ -111,9 +102,8 @@ class CoordinatorLoaderImplTest {
@Test
void testLoadingIsRejectedWhenClosed() throws Exception {
TopicPartition tp = new TopicPartition("foo", 0);
- ReplicaManager replicaManager = mock(ReplicaManager.class);
- Function> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
- Function> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
+ Function> partitionLogSupplier = partition -> Optional.of(mock(UnifiedLog.class));
+ Function> partitionLogEndOffsetSupplier = partition -> Optional.empty();
Deserializer> serde = mock(Deserializer.class);
CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class);
@@ -132,11 +122,10 @@ class CoordinatorLoaderImplTest {
@Test
void testLoading() throws Exception {
TopicPartition tp = new TopicPartition("foo", 0);
- ReplicaManager replicaManager = mock(ReplicaManager.class);
- Function> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
- Function> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
- Deserializer> serde = new StringKeyValueDeserializer();
UnifiedLog log = mock(UnifiedLog.class);
+ Function> partitionLogSupplier = partition -> Optional.of(log);
+ Function> partitionLogEndOffsetSupplier = partition -> Optional.of(9L);
+ Deserializer> serde = new StringKeyValueDeserializer();
CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>(
@@ -146,9 +135,7 @@ class CoordinatorLoaderImplTest {
serde,
1000
)) {
- when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
when(log.logStartOffset()).thenReturn(0L);
- when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(9L)));
when(log.highWatermark()).thenReturn(0L);
FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
@@ -218,11 +205,10 @@ class CoordinatorLoaderImplTest {
@Test
void testLoadingStoppedWhenClosed() throws Exception {
TopicPartition tp = new TopicPartition("foo", 0);
- ReplicaManager replicaManager = mock(ReplicaManager.class);
- Function> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
- Function> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
- Deserializer> serde = new StringKeyValueDeserializer();
UnifiedLog log = mock(UnifiedLog.class);
+ Function> partitionLogSupplier = partition -> Optional.of(log);
+ Function> partitionLogEndOffsetSupplier = partition -> Optional.of(100L);
+ Deserializer> serde = new StringKeyValueDeserializer();
CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>(
@@ -232,9 +218,7 @@ class CoordinatorLoaderImplTest {
serde,
1000
)) {
- when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
when(log.logStartOffset()).thenReturn(0L);
- when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(100L)));
FetchDataInfo readResult = logReadResult(0, Arrays.asList(
new SimpleRecord("k1".getBytes(), "v1".getBytes()),
@@ -266,11 +250,10 @@ class CoordinatorLoaderImplTest {
@Test
void testUnknownRecordTypeAreIgnored() throws Exception {
TopicPartition tp = new TopicPartition("foo", 0);
- ReplicaManager replicaManager = mock(ReplicaManager.class);
- Function> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
- Function> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
- StringKeyValueDeserializer serde = mock(StringKeyValueDeserializer.class);
UnifiedLog log = mock(UnifiedLog.class);
+ Function> partitionLogSupplier = partition -> Optional.of(log);
+ Function> partitionLogEndOffsetSupplier = partition -> Optional.of(2L);
+ StringKeyValueDeserializer serde = mock(StringKeyValueDeserializer.class);
CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>(
@@ -280,9 +263,7 @@ class CoordinatorLoaderImplTest {
serde,
1000
)) {
- when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
when(log.logStartOffset()).thenReturn(0L);
- when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(2L)));
FetchDataInfo readResult = logReadResult(0, Arrays.asList(
new SimpleRecord("k1".getBytes(), "v1".getBytes()),
@@ -305,11 +286,10 @@ class CoordinatorLoaderImplTest {
@Test
void testDeserializationErrorFailsTheLoading() throws Exception {
TopicPartition tp = new TopicPartition("foo", 0);
- ReplicaManager replicaManager = mock(ReplicaManager.class);
- Function> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
- Function> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
- StringKeyValueDeserializer serde = mock(StringKeyValueDeserializer.class);
UnifiedLog log = mock(UnifiedLog.class);
+ Function> partitionLogSupplier = partition -> Optional.of(log);
+ Function> partitionLogEndOffsetSupplier = partition -> Optional.of(2L);
+ StringKeyValueDeserializer serde = mock(StringKeyValueDeserializer.class);
CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>(
@@ -319,9 +299,7 @@ class CoordinatorLoaderImplTest {
serde,
1000
)) {
- when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
when(log.logStartOffset()).thenReturn(0L);
- when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(2L)));
FetchDataInfo readResult = logReadResult(0, Arrays.asList(
new SimpleRecord("k1".getBytes(), "v1".getBytes()),
@@ -347,11 +325,10 @@ class CoordinatorLoaderImplTest {
// when all the records are expired and the active segment is truncated or when the partition
// is accidentally corrupted.
TopicPartition tp = new TopicPartition("foo", 0);
- ReplicaManager replicaManager = mock(ReplicaManager.class);
- Function> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
- Function> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
- StringKeyValueDeserializer serde = mock(StringKeyValueDeserializer.class);
UnifiedLog log = mock(UnifiedLog.class);
+ Function> partitionLogSupplier = partition -> Optional.of(log);
+ Function> partitionLogEndOffsetSupplier = partition -> Optional.of(10L);
+ StringKeyValueDeserializer serde = mock(StringKeyValueDeserializer.class);
CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>(
@@ -361,9 +338,7 @@ class CoordinatorLoaderImplTest {
serde,
1000
)) {
- when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
when(log.logStartOffset()).thenReturn(0L);
- when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(10L)));
FetchDataInfo readResult = logReadResult(0, List.of());
@@ -377,11 +352,10 @@ class CoordinatorLoaderImplTest {
@Test
void testLoadSummary() throws Exception {
TopicPartition tp = new TopicPartition("foo", 0);
- ReplicaManager replicaManager = mock(ReplicaManager.class);
- Function> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
- Function> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
- StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
UnifiedLog log = mock(UnifiedLog.class);
+ Function> partitionLogSupplier = partition -> Optional.of(log);
+ Function> partitionLogEndOffsetSupplier = partition -> Optional.of(5L);
+ StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class);
MockTime time = new MockTime();
@@ -393,9 +367,7 @@ class CoordinatorLoaderImplTest {
1000
)) {
long startTimeMs = time.milliseconds();
- when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
when(log.logStartOffset()).thenReturn(0L);
- when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(5L)));
FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
new SimpleRecord("k1".getBytes(), "v1".getBytes()),
@@ -428,11 +400,10 @@ class CoordinatorLoaderImplTest {
@Test
void testUpdateLastWrittenOffsetOnBatchLoaded() throws Exception {
TopicPartition tp = new TopicPartition("foo", 0);
- ReplicaManager replicaManager = mock(ReplicaManager.class);
- Function> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
- Function> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
- StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
UnifiedLog log = mock(UnifiedLog.class);
+ Function> partitionLogSupplier = partition -> Optional.of(log);
+ Function> partitionLogEndOffsetSupplier = partition -> Optional.of(7L);
+ StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>(
@@ -442,10 +413,8 @@ class CoordinatorLoaderImplTest {
serde,
1000
)) {
- when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
when(log.logStartOffset()).thenReturn(0L);
when(log.highWatermark()).thenReturn(0L, 0L, 2L);
- when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(7L)));
FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
new SimpleRecord("k1".getBytes(), "v1".getBytes()),
@@ -494,11 +463,10 @@ class CoordinatorLoaderImplTest {
@Test
void testUpdateLastWrittenOffsetAndUpdateLastCommittedOffsetNoRecordsRead() throws Exception {
TopicPartition tp = new TopicPartition("foo", 0);
- ReplicaManager replicaManager = mock(ReplicaManager.class);
- Function> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
- Function> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
- StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
UnifiedLog log = mock(UnifiedLog.class);
+ Function> partitionLogSupplier = partition -> Optional.of(log);
+ Function> partitionLogEndOffsetSupplier = partition -> Optional.of(0L);
+ StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>(
@@ -508,10 +476,8 @@ class CoordinatorLoaderImplTest {
serde,
1000
)) {
- when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
when(log.logStartOffset()).thenReturn(0L);
when(log.highWatermark()).thenReturn(0L);
- when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(0L)));
assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS));
@@ -523,11 +489,10 @@ class CoordinatorLoaderImplTest {
@Test
void testUpdateLastWrittenOffsetOnBatchLoadedWhileHighWatermarkAhead() throws Exception {
TopicPartition tp = new TopicPartition("foo", 0);
- ReplicaManager replicaManager = mock(ReplicaManager.class);
- Function> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
- Function> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
- StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
UnifiedLog log = mock(UnifiedLog.class);
+ Function> partitionLogSupplier = partition -> Optional.of(log);
+ Function> partitionLogEndOffsetSupplier = partition -> Optional.of(7L);
+ StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>(
@@ -537,10 +502,8 @@ class CoordinatorLoaderImplTest {
serde,
1000
)) {
- when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
when(log.logStartOffset()).thenReturn(0L);
when(log.highWatermark()).thenReturn(5L, 7L, 7L);
- when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(7L)));
FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
new SimpleRecord("k1".getBytes(), "v1".getBytes()),
@@ -590,11 +553,10 @@ class CoordinatorLoaderImplTest {
@Test
void testPartitionGoesOfflineDuringLoad() throws Exception {
TopicPartition tp = new TopicPartition("foo", 0);
- ReplicaManager replicaManager = mock(ReplicaManager.class);
- Function> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
- Function> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
- StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
UnifiedLog log = mock(UnifiedLog.class);
+ Function> partitionLogSupplier = partition -> Optional.of(log);
+ Function> partitionLogEndOffsetSupplier = mock(Function.class);
+ StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>(
@@ -604,10 +566,9 @@ class CoordinatorLoaderImplTest {
serde,
1000
)) {
- when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
when(log.logStartOffset()).thenReturn(0L);
when(log.highWatermark()).thenReturn(0L);
- when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(5L))).thenReturn(toScala(Optional.of(-1L)));
+ when(partitionLogEndOffsetSupplier.apply(tp)).thenReturn(Optional.of(5L)).thenReturn(Optional.of(-1L));
FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
new SimpleRecord("k1".getBytes(), "v1".getBytes()),