From 1b351ad6e25094d9b38cc8cbe53a7a8caeec99e9 Mon Sep 17 00:00:00 2001 From: Logan Zhu Date: Sat, 19 Jul 2025 19:08:33 +0800 Subject: [PATCH] MINOR: Remove unnecessary dependencies from coordinator-common (follow up to pr#20089) (#20194) This PR removes the dependencies on `core` and `scala-library` from the `coordinator-common` module, as a follow-up to https://github.com/apache/kafka/pull/20089. These dependencies have been removed from tests, and the previously added import-control relaxations have been reverted accordingly. Reviewers: TengYao Chi , Ken Huang --- build.gradle | 1 - .../import-control-coordinator-common.xml | 4 +- .../runtime/CoordinatorLoaderImplTest.java | 111 ++++++------------ 3 files changed, 38 insertions(+), 78 deletions(-) diff --git a/build.gradle b/build.gradle index 5bb893f4fa4..f68b130f2de 100644 --- a/build.gradle +++ b/build.gradle @@ -1664,7 +1664,6 @@ project(':coordinator-common') { testImplementation project(':clients').sourceSets.test.output testImplementation project(':server-common').sourceSets.test.output - testImplementation project(':core') testImplementation libs.junitJupiter testImplementation libs.mockitoCore testImplementation testLog4j2Libs diff --git a/checkstyle/import-control-coordinator-common.xml b/checkstyle/import-control-coordinator-common.xml index 09589d87901..7841697cf89 100644 --- a/checkstyle/import-control-coordinator-common.xml +++ b/checkstyle/import-control-coordinator-common.xml @@ -27,6 +27,8 @@ + + @@ -41,7 +43,6 @@ - @@ -67,7 +68,6 @@ - diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java index 8d741fa16e0..61c5d52c8a9 100644 --- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java @@ -16,8 +16,6 @@ */ package org.apache.kafka.coordinator.common.runtime; -import kafka.server.ReplicaManager; - import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.compress.Compression; import org.apache.kafka.common.errors.NotLeaderOrFollowerException; @@ -51,8 +49,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Function; -import scala.Option; - import static org.apache.kafka.test.TestUtils.assertFutureThrows; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -65,10 +61,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static scala.jdk.javaapi.OptionConverters.toJava; -import static scala.jdk.javaapi.OptionConverters.toScala; -@SuppressWarnings("unchecked") +@SuppressWarnings({"unchecked", "resource"}) @Timeout(60) class CoordinatorLoaderImplTest { @@ -89,9 +83,8 @@ class CoordinatorLoaderImplTest { @Test void testNonexistentPartition() throws Exception { TopicPartition tp = new TopicPartition("foo", 0); - ReplicaManager replicaManager = mock(ReplicaManager.class); - Function> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition)); - Function> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get()); + Function> partitionLogSupplier = partition -> Optional.empty(); + Function> partitionLogEndOffsetSupplier = partition -> Optional.empty(); Deserializer> serde = mock(Deserializer.class); CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); @@ -102,8 +95,6 @@ class CoordinatorLoaderImplTest { serde, 1000 )) { - when(replicaManager.getLog(tp)).thenReturn(Option.empty()); - assertFutureThrows(NotLeaderOrFollowerException.class, loader.load(tp, coordinator)); } } @@ -111,9 +102,8 @@ class CoordinatorLoaderImplTest { @Test void testLoadingIsRejectedWhenClosed() throws Exception { TopicPartition tp = new TopicPartition("foo", 0); - ReplicaManager replicaManager = mock(ReplicaManager.class); - Function> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition)); - Function> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get()); + Function> partitionLogSupplier = partition -> Optional.of(mock(UnifiedLog.class)); + Function> partitionLogEndOffsetSupplier = partition -> Optional.empty(); Deserializer> serde = mock(Deserializer.class); CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); @@ -132,11 +122,10 @@ class CoordinatorLoaderImplTest { @Test void testLoading() throws Exception { TopicPartition tp = new TopicPartition("foo", 0); - ReplicaManager replicaManager = mock(ReplicaManager.class); - Function> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition)); - Function> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get()); - Deserializer> serde = new StringKeyValueDeserializer(); UnifiedLog log = mock(UnifiedLog.class); + Function> partitionLogSupplier = partition -> Optional.of(log); + Function> partitionLogEndOffsetSupplier = partition -> Optional.of(9L); + Deserializer> serde = new StringKeyValueDeserializer(); CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>( @@ -146,9 +135,7 @@ class CoordinatorLoaderImplTest { serde, 1000 )) { - when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log))); when(log.logStartOffset()).thenReturn(0L); - when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(9L))); when(log.highWatermark()).thenReturn(0L); FetchDataInfo readResult1 = logReadResult(0, Arrays.asList( @@ -218,11 +205,10 @@ class CoordinatorLoaderImplTest { @Test void testLoadingStoppedWhenClosed() throws Exception { TopicPartition tp = new TopicPartition("foo", 0); - ReplicaManager replicaManager = mock(ReplicaManager.class); - Function> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition)); - Function> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get()); - Deserializer> serde = new StringKeyValueDeserializer(); UnifiedLog log = mock(UnifiedLog.class); + Function> partitionLogSupplier = partition -> Optional.of(log); + Function> partitionLogEndOffsetSupplier = partition -> Optional.of(100L); + Deserializer> serde = new StringKeyValueDeserializer(); CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>( @@ -232,9 +218,7 @@ class CoordinatorLoaderImplTest { serde, 1000 )) { - when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log))); when(log.logStartOffset()).thenReturn(0L); - when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(100L))); FetchDataInfo readResult = logReadResult(0, Arrays.asList( new SimpleRecord("k1".getBytes(), "v1".getBytes()), @@ -266,11 +250,10 @@ class CoordinatorLoaderImplTest { @Test void testUnknownRecordTypeAreIgnored() throws Exception { TopicPartition tp = new TopicPartition("foo", 0); - ReplicaManager replicaManager = mock(ReplicaManager.class); - Function> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition)); - Function> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get()); - StringKeyValueDeserializer serde = mock(StringKeyValueDeserializer.class); UnifiedLog log = mock(UnifiedLog.class); + Function> partitionLogSupplier = partition -> Optional.of(log); + Function> partitionLogEndOffsetSupplier = partition -> Optional.of(2L); + StringKeyValueDeserializer serde = mock(StringKeyValueDeserializer.class); CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>( @@ -280,9 +263,7 @@ class CoordinatorLoaderImplTest { serde, 1000 )) { - when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log))); when(log.logStartOffset()).thenReturn(0L); - when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(2L))); FetchDataInfo readResult = logReadResult(0, Arrays.asList( new SimpleRecord("k1".getBytes(), "v1".getBytes()), @@ -305,11 +286,10 @@ class CoordinatorLoaderImplTest { @Test void testDeserializationErrorFailsTheLoading() throws Exception { TopicPartition tp = new TopicPartition("foo", 0); - ReplicaManager replicaManager = mock(ReplicaManager.class); - Function> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition)); - Function> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get()); - StringKeyValueDeserializer serde = mock(StringKeyValueDeserializer.class); UnifiedLog log = mock(UnifiedLog.class); + Function> partitionLogSupplier = partition -> Optional.of(log); + Function> partitionLogEndOffsetSupplier = partition -> Optional.of(2L); + StringKeyValueDeserializer serde = mock(StringKeyValueDeserializer.class); CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>( @@ -319,9 +299,7 @@ class CoordinatorLoaderImplTest { serde, 1000 )) { - when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log))); when(log.logStartOffset()).thenReturn(0L); - when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(2L))); FetchDataInfo readResult = logReadResult(0, Arrays.asList( new SimpleRecord("k1".getBytes(), "v1".getBytes()), @@ -347,11 +325,10 @@ class CoordinatorLoaderImplTest { // when all the records are expired and the active segment is truncated or when the partition // is accidentally corrupted. TopicPartition tp = new TopicPartition("foo", 0); - ReplicaManager replicaManager = mock(ReplicaManager.class); - Function> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition)); - Function> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get()); - StringKeyValueDeserializer serde = mock(StringKeyValueDeserializer.class); UnifiedLog log = mock(UnifiedLog.class); + Function> partitionLogSupplier = partition -> Optional.of(log); + Function> partitionLogEndOffsetSupplier = partition -> Optional.of(10L); + StringKeyValueDeserializer serde = mock(StringKeyValueDeserializer.class); CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>( @@ -361,9 +338,7 @@ class CoordinatorLoaderImplTest { serde, 1000 )) { - when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log))); when(log.logStartOffset()).thenReturn(0L); - when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(10L))); FetchDataInfo readResult = logReadResult(0, List.of()); @@ -377,11 +352,10 @@ class CoordinatorLoaderImplTest { @Test void testLoadSummary() throws Exception { TopicPartition tp = new TopicPartition("foo", 0); - ReplicaManager replicaManager = mock(ReplicaManager.class); - Function> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition)); - Function> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get()); - StringKeyValueDeserializer serde = new StringKeyValueDeserializer(); UnifiedLog log = mock(UnifiedLog.class); + Function> partitionLogSupplier = partition -> Optional.of(log); + Function> partitionLogEndOffsetSupplier = partition -> Optional.of(5L); + StringKeyValueDeserializer serde = new StringKeyValueDeserializer(); CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); MockTime time = new MockTime(); @@ -393,9 +367,7 @@ class CoordinatorLoaderImplTest { 1000 )) { long startTimeMs = time.milliseconds(); - when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log))); when(log.logStartOffset()).thenReturn(0L); - when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(5L))); FetchDataInfo readResult1 = logReadResult(0, Arrays.asList( new SimpleRecord("k1".getBytes(), "v1".getBytes()), @@ -428,11 +400,10 @@ class CoordinatorLoaderImplTest { @Test void testUpdateLastWrittenOffsetOnBatchLoaded() throws Exception { TopicPartition tp = new TopicPartition("foo", 0); - ReplicaManager replicaManager = mock(ReplicaManager.class); - Function> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition)); - Function> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get()); - StringKeyValueDeserializer serde = new StringKeyValueDeserializer(); UnifiedLog log = mock(UnifiedLog.class); + Function> partitionLogSupplier = partition -> Optional.of(log); + Function> partitionLogEndOffsetSupplier = partition -> Optional.of(7L); + StringKeyValueDeserializer serde = new StringKeyValueDeserializer(); CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>( @@ -442,10 +413,8 @@ class CoordinatorLoaderImplTest { serde, 1000 )) { - when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log))); when(log.logStartOffset()).thenReturn(0L); when(log.highWatermark()).thenReturn(0L, 0L, 2L); - when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(7L))); FetchDataInfo readResult1 = logReadResult(0, Arrays.asList( new SimpleRecord("k1".getBytes(), "v1".getBytes()), @@ -494,11 +463,10 @@ class CoordinatorLoaderImplTest { @Test void testUpdateLastWrittenOffsetAndUpdateLastCommittedOffsetNoRecordsRead() throws Exception { TopicPartition tp = new TopicPartition("foo", 0); - ReplicaManager replicaManager = mock(ReplicaManager.class); - Function> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition)); - Function> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get()); - StringKeyValueDeserializer serde = new StringKeyValueDeserializer(); UnifiedLog log = mock(UnifiedLog.class); + Function> partitionLogSupplier = partition -> Optional.of(log); + Function> partitionLogEndOffsetSupplier = partition -> Optional.of(0L); + StringKeyValueDeserializer serde = new StringKeyValueDeserializer(); CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>( @@ -508,10 +476,8 @@ class CoordinatorLoaderImplTest { serde, 1000 )) { - when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log))); when(log.logStartOffset()).thenReturn(0L); when(log.highWatermark()).thenReturn(0L); - when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(0L))); assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS)); @@ -523,11 +489,10 @@ class CoordinatorLoaderImplTest { @Test void testUpdateLastWrittenOffsetOnBatchLoadedWhileHighWatermarkAhead() throws Exception { TopicPartition tp = new TopicPartition("foo", 0); - ReplicaManager replicaManager = mock(ReplicaManager.class); - Function> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition)); - Function> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get()); - StringKeyValueDeserializer serde = new StringKeyValueDeserializer(); UnifiedLog log = mock(UnifiedLog.class); + Function> partitionLogSupplier = partition -> Optional.of(log); + Function> partitionLogEndOffsetSupplier = partition -> Optional.of(7L); + StringKeyValueDeserializer serde = new StringKeyValueDeserializer(); CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>( @@ -537,10 +502,8 @@ class CoordinatorLoaderImplTest { serde, 1000 )) { - when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log))); when(log.logStartOffset()).thenReturn(0L); when(log.highWatermark()).thenReturn(5L, 7L, 7L); - when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(7L))); FetchDataInfo readResult1 = logReadResult(0, Arrays.asList( new SimpleRecord("k1".getBytes(), "v1".getBytes()), @@ -590,11 +553,10 @@ class CoordinatorLoaderImplTest { @Test void testPartitionGoesOfflineDuringLoad() throws Exception { TopicPartition tp = new TopicPartition("foo", 0); - ReplicaManager replicaManager = mock(ReplicaManager.class); - Function> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition)); - Function> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get()); - StringKeyValueDeserializer serde = new StringKeyValueDeserializer(); UnifiedLog log = mock(UnifiedLog.class); + Function> partitionLogSupplier = partition -> Optional.of(log); + Function> partitionLogEndOffsetSupplier = mock(Function.class); + StringKeyValueDeserializer serde = new StringKeyValueDeserializer(); CoordinatorPlayback> coordinator = mock(CoordinatorPlayback.class); try (CoordinatorLoaderImpl> loader = new CoordinatorLoaderImpl<>( @@ -604,10 +566,9 @@ class CoordinatorLoaderImplTest { serde, 1000 )) { - when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log))); when(log.logStartOffset()).thenReturn(0L); when(log.highWatermark()).thenReturn(0L); - when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(5L))).thenReturn(toScala(Optional.of(-1L))); + when(partitionLogEndOffsetSupplier.apply(tp)).thenReturn(Optional.of(5L)).thenReturn(Optional.of(-1L)); FetchDataInfo readResult1 = logReadResult(0, Arrays.asList( new SimpleRecord("k1".getBytes(), "v1".getBytes()),