mirror of https://github.com/apache/kafka.git
MINOR: Remove unnecessary dependencies from coordinator-common (follow up to pr#20089) (#20194)
CI / build (push) Waiting to run
Details
CI / build (push) Waiting to run
Details
This PR removes the dependencies on `core` and `scala-library` from the `coordinator-common` module, as a follow-up to https://github.com/apache/kafka/pull/20089. These dependencies have been removed from tests, and the previously added import-control relaxations have been reverted accordingly. Reviewers: TengYao Chi <frankvicky@apache.org>, Ken Huang <s7133700@gmail.com>
This commit is contained in:
parent
93a88a940b
commit
1b351ad6e2
|
@ -1664,7 +1664,6 @@ project(':coordinator-common') {
|
|||
|
||||
testImplementation project(':clients').sourceSets.test.output
|
||||
testImplementation project(':server-common').sourceSets.test.output
|
||||
testImplementation project(':core')
|
||||
testImplementation libs.junitJupiter
|
||||
testImplementation libs.mockitoCore
|
||||
testImplementation testLog4j2Libs
|
||||
|
|
|
@ -27,6 +27,8 @@
|
|||
<allow pkg="org.slf4j" />
|
||||
<allow pkg="org.junit" />
|
||||
<allow pkg="org.mockito" />
|
||||
<!-- no one depends on the server -->
|
||||
<disallow pkg="kafka" />
|
||||
|
||||
<!-- anyone can use public classes -->
|
||||
<allow pkg="org.apache.kafka.common.errors" exact-match="true" />
|
||||
|
@ -41,7 +43,6 @@
|
|||
<subpackage name="common">
|
||||
<subpackage name="runtime">
|
||||
<allow pkg="com.yammer.metrics.core" />
|
||||
<allow pkg="kafka.server" />
|
||||
<allow pkg="org.apache.kafka.clients.consumer" />
|
||||
<allow pkg="org.apache.kafka.common.annotation" />
|
||||
<allow pkg="org.apache.kafka.common.compress" />
|
||||
|
@ -67,7 +68,6 @@
|
|||
<allow pkg="org.apache.kafka.test" />
|
||||
<allow pkg="org.apache.kafka.timeline" />
|
||||
<allow pkg="org.HdrHistogram" />
|
||||
<allow pkg="scala" />
|
||||
</subpackage>
|
||||
</subpackage>
|
||||
</subpackage>
|
||||
|
|
|
@ -16,8 +16,6 @@
|
|||
*/
|
||||
package org.apache.kafka.coordinator.common.runtime;
|
||||
|
||||
import kafka.server.ReplicaManager;
|
||||
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.compress.Compression;
|
||||
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
|
||||
|
@ -51,8 +49,6 @@ import java.util.concurrent.CountDownLatch;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Function;
|
||||
|
||||
import scala.Option;
|
||||
|
||||
import static org.apache.kafka.test.TestUtils.assertFutureThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
|
@ -65,10 +61,8 @@ import static org.mockito.Mockito.mock;
|
|||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static scala.jdk.javaapi.OptionConverters.toJava;
|
||||
import static scala.jdk.javaapi.OptionConverters.toScala;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@SuppressWarnings({"unchecked", "resource"})
|
||||
@Timeout(60)
|
||||
class CoordinatorLoaderImplTest {
|
||||
|
||||
|
@ -89,9 +83,8 @@ class CoordinatorLoaderImplTest {
|
|||
@Test
|
||||
void testNonexistentPartition() throws Exception {
|
||||
TopicPartition tp = new TopicPartition("foo", 0);
|
||||
ReplicaManager replicaManager = mock(ReplicaManager.class);
|
||||
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
|
||||
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
|
||||
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.empty();
|
||||
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.empty();
|
||||
Deserializer<Tuple<String, String>> serde = mock(Deserializer.class);
|
||||
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
|
||||
|
||||
|
@ -102,8 +95,6 @@ class CoordinatorLoaderImplTest {
|
|||
serde,
|
||||
1000
|
||||
)) {
|
||||
when(replicaManager.getLog(tp)).thenReturn(Option.empty());
|
||||
|
||||
assertFutureThrows(NotLeaderOrFollowerException.class, loader.load(tp, coordinator));
|
||||
}
|
||||
}
|
||||
|
@ -111,9 +102,8 @@ class CoordinatorLoaderImplTest {
|
|||
@Test
|
||||
void testLoadingIsRejectedWhenClosed() throws Exception {
|
||||
TopicPartition tp = new TopicPartition("foo", 0);
|
||||
ReplicaManager replicaManager = mock(ReplicaManager.class);
|
||||
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
|
||||
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
|
||||
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(mock(UnifiedLog.class));
|
||||
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.empty();
|
||||
Deserializer<Tuple<String, String>> serde = mock(Deserializer.class);
|
||||
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
|
||||
|
||||
|
@ -132,11 +122,10 @@ class CoordinatorLoaderImplTest {
|
|||
@Test
|
||||
void testLoading() throws Exception {
|
||||
TopicPartition tp = new TopicPartition("foo", 0);
|
||||
ReplicaManager replicaManager = mock(ReplicaManager.class);
|
||||
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
|
||||
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
|
||||
Deserializer<Tuple<String, String>> serde = new StringKeyValueDeserializer();
|
||||
UnifiedLog log = mock(UnifiedLog.class);
|
||||
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(log);
|
||||
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.of(9L);
|
||||
Deserializer<Tuple<String, String>> serde = new StringKeyValueDeserializer();
|
||||
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
|
||||
|
||||
try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>(
|
||||
|
@ -146,9 +135,7 @@ class CoordinatorLoaderImplTest {
|
|||
serde,
|
||||
1000
|
||||
)) {
|
||||
when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
|
||||
when(log.logStartOffset()).thenReturn(0L);
|
||||
when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(9L)));
|
||||
when(log.highWatermark()).thenReturn(0L);
|
||||
|
||||
FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
|
||||
|
@ -218,11 +205,10 @@ class CoordinatorLoaderImplTest {
|
|||
@Test
|
||||
void testLoadingStoppedWhenClosed() throws Exception {
|
||||
TopicPartition tp = new TopicPartition("foo", 0);
|
||||
ReplicaManager replicaManager = mock(ReplicaManager.class);
|
||||
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
|
||||
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
|
||||
Deserializer<Tuple<String, String>> serde = new StringKeyValueDeserializer();
|
||||
UnifiedLog log = mock(UnifiedLog.class);
|
||||
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(log);
|
||||
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.of(100L);
|
||||
Deserializer<Tuple<String, String>> serde = new StringKeyValueDeserializer();
|
||||
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
|
||||
|
||||
try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>(
|
||||
|
@ -232,9 +218,7 @@ class CoordinatorLoaderImplTest {
|
|||
serde,
|
||||
1000
|
||||
)) {
|
||||
when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
|
||||
when(log.logStartOffset()).thenReturn(0L);
|
||||
when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(100L)));
|
||||
|
||||
FetchDataInfo readResult = logReadResult(0, Arrays.asList(
|
||||
new SimpleRecord("k1".getBytes(), "v1".getBytes()),
|
||||
|
@ -266,11 +250,10 @@ class CoordinatorLoaderImplTest {
|
|||
@Test
|
||||
void testUnknownRecordTypeAreIgnored() throws Exception {
|
||||
TopicPartition tp = new TopicPartition("foo", 0);
|
||||
ReplicaManager replicaManager = mock(ReplicaManager.class);
|
||||
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
|
||||
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
|
||||
StringKeyValueDeserializer serde = mock(StringKeyValueDeserializer.class);
|
||||
UnifiedLog log = mock(UnifiedLog.class);
|
||||
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(log);
|
||||
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.of(2L);
|
||||
StringKeyValueDeserializer serde = mock(StringKeyValueDeserializer.class);
|
||||
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
|
||||
|
||||
try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>(
|
||||
|
@ -280,9 +263,7 @@ class CoordinatorLoaderImplTest {
|
|||
serde,
|
||||
1000
|
||||
)) {
|
||||
when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
|
||||
when(log.logStartOffset()).thenReturn(0L);
|
||||
when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(2L)));
|
||||
|
||||
FetchDataInfo readResult = logReadResult(0, Arrays.asList(
|
||||
new SimpleRecord("k1".getBytes(), "v1".getBytes()),
|
||||
|
@ -305,11 +286,10 @@ class CoordinatorLoaderImplTest {
|
|||
@Test
|
||||
void testDeserializationErrorFailsTheLoading() throws Exception {
|
||||
TopicPartition tp = new TopicPartition("foo", 0);
|
||||
ReplicaManager replicaManager = mock(ReplicaManager.class);
|
||||
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
|
||||
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
|
||||
StringKeyValueDeserializer serde = mock(StringKeyValueDeserializer.class);
|
||||
UnifiedLog log = mock(UnifiedLog.class);
|
||||
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(log);
|
||||
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.of(2L);
|
||||
StringKeyValueDeserializer serde = mock(StringKeyValueDeserializer.class);
|
||||
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
|
||||
|
||||
try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>(
|
||||
|
@ -319,9 +299,7 @@ class CoordinatorLoaderImplTest {
|
|||
serde,
|
||||
1000
|
||||
)) {
|
||||
when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
|
||||
when(log.logStartOffset()).thenReturn(0L);
|
||||
when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(2L)));
|
||||
|
||||
FetchDataInfo readResult = logReadResult(0, Arrays.asList(
|
||||
new SimpleRecord("k1".getBytes(), "v1".getBytes()),
|
||||
|
@ -347,11 +325,10 @@ class CoordinatorLoaderImplTest {
|
|||
// when all the records are expired and the active segment is truncated or when the partition
|
||||
// is accidentally corrupted.
|
||||
TopicPartition tp = new TopicPartition("foo", 0);
|
||||
ReplicaManager replicaManager = mock(ReplicaManager.class);
|
||||
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
|
||||
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
|
||||
StringKeyValueDeserializer serde = mock(StringKeyValueDeserializer.class);
|
||||
UnifiedLog log = mock(UnifiedLog.class);
|
||||
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(log);
|
||||
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.of(10L);
|
||||
StringKeyValueDeserializer serde = mock(StringKeyValueDeserializer.class);
|
||||
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
|
||||
|
||||
try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>(
|
||||
|
@ -361,9 +338,7 @@ class CoordinatorLoaderImplTest {
|
|||
serde,
|
||||
1000
|
||||
)) {
|
||||
when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
|
||||
when(log.logStartOffset()).thenReturn(0L);
|
||||
when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(10L)));
|
||||
|
||||
FetchDataInfo readResult = logReadResult(0, List.of());
|
||||
|
||||
|
@ -377,11 +352,10 @@ class CoordinatorLoaderImplTest {
|
|||
@Test
|
||||
void testLoadSummary() throws Exception {
|
||||
TopicPartition tp = new TopicPartition("foo", 0);
|
||||
ReplicaManager replicaManager = mock(ReplicaManager.class);
|
||||
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
|
||||
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
|
||||
StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
|
||||
UnifiedLog log = mock(UnifiedLog.class);
|
||||
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(log);
|
||||
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.of(5L);
|
||||
StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
|
||||
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
|
||||
MockTime time = new MockTime();
|
||||
|
||||
|
@ -393,9 +367,7 @@ class CoordinatorLoaderImplTest {
|
|||
1000
|
||||
)) {
|
||||
long startTimeMs = time.milliseconds();
|
||||
when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
|
||||
when(log.logStartOffset()).thenReturn(0L);
|
||||
when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(5L)));
|
||||
|
||||
FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
|
||||
new SimpleRecord("k1".getBytes(), "v1".getBytes()),
|
||||
|
@ -428,11 +400,10 @@ class CoordinatorLoaderImplTest {
|
|||
@Test
|
||||
void testUpdateLastWrittenOffsetOnBatchLoaded() throws Exception {
|
||||
TopicPartition tp = new TopicPartition("foo", 0);
|
||||
ReplicaManager replicaManager = mock(ReplicaManager.class);
|
||||
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
|
||||
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
|
||||
StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
|
||||
UnifiedLog log = mock(UnifiedLog.class);
|
||||
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(log);
|
||||
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.of(7L);
|
||||
StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
|
||||
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
|
||||
|
||||
try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>(
|
||||
|
@ -442,10 +413,8 @@ class CoordinatorLoaderImplTest {
|
|||
serde,
|
||||
1000
|
||||
)) {
|
||||
when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
|
||||
when(log.logStartOffset()).thenReturn(0L);
|
||||
when(log.highWatermark()).thenReturn(0L, 0L, 2L);
|
||||
when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(7L)));
|
||||
|
||||
FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
|
||||
new SimpleRecord("k1".getBytes(), "v1".getBytes()),
|
||||
|
@ -494,11 +463,10 @@ class CoordinatorLoaderImplTest {
|
|||
@Test
|
||||
void testUpdateLastWrittenOffsetAndUpdateLastCommittedOffsetNoRecordsRead() throws Exception {
|
||||
TopicPartition tp = new TopicPartition("foo", 0);
|
||||
ReplicaManager replicaManager = mock(ReplicaManager.class);
|
||||
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
|
||||
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
|
||||
StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
|
||||
UnifiedLog log = mock(UnifiedLog.class);
|
||||
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(log);
|
||||
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.of(0L);
|
||||
StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
|
||||
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
|
||||
|
||||
try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>(
|
||||
|
@ -508,10 +476,8 @@ class CoordinatorLoaderImplTest {
|
|||
serde,
|
||||
1000
|
||||
)) {
|
||||
when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
|
||||
when(log.logStartOffset()).thenReturn(0L);
|
||||
when(log.highWatermark()).thenReturn(0L);
|
||||
when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(0L)));
|
||||
|
||||
assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS));
|
||||
|
||||
|
@ -523,11 +489,10 @@ class CoordinatorLoaderImplTest {
|
|||
@Test
|
||||
void testUpdateLastWrittenOffsetOnBatchLoadedWhileHighWatermarkAhead() throws Exception {
|
||||
TopicPartition tp = new TopicPartition("foo", 0);
|
||||
ReplicaManager replicaManager = mock(ReplicaManager.class);
|
||||
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
|
||||
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
|
||||
StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
|
||||
UnifiedLog log = mock(UnifiedLog.class);
|
||||
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(log);
|
||||
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.of(7L);
|
||||
StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
|
||||
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
|
||||
|
||||
try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>(
|
||||
|
@ -537,10 +502,8 @@ class CoordinatorLoaderImplTest {
|
|||
serde,
|
||||
1000
|
||||
)) {
|
||||
when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
|
||||
when(log.logStartOffset()).thenReturn(0L);
|
||||
when(log.highWatermark()).thenReturn(5L, 7L, 7L);
|
||||
when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(7L)));
|
||||
|
||||
FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
|
||||
new SimpleRecord("k1".getBytes(), "v1".getBytes()),
|
||||
|
@ -590,11 +553,10 @@ class CoordinatorLoaderImplTest {
|
|||
@Test
|
||||
void testPartitionGoesOfflineDuringLoad() throws Exception {
|
||||
TopicPartition tp = new TopicPartition("foo", 0);
|
||||
ReplicaManager replicaManager = mock(ReplicaManager.class);
|
||||
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
|
||||
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
|
||||
StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
|
||||
UnifiedLog log = mock(UnifiedLog.class);
|
||||
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(log);
|
||||
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = mock(Function.class);
|
||||
StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
|
||||
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
|
||||
|
||||
try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>(
|
||||
|
@ -604,10 +566,9 @@ class CoordinatorLoaderImplTest {
|
|||
serde,
|
||||
1000
|
||||
)) {
|
||||
when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
|
||||
when(log.logStartOffset()).thenReturn(0L);
|
||||
when(log.highWatermark()).thenReturn(0L);
|
||||
when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(5L))).thenReturn(toScala(Optional.of(-1L)));
|
||||
when(partitionLogEndOffsetSupplier.apply(tp)).thenReturn(Optional.of(5L)).thenReturn(Optional.of(-1L));
|
||||
|
||||
FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
|
||||
new SimpleRecord("k1".getBytes(), "v1".getBytes()),
|
||||
|
|
Loading…
Reference in New Issue