mirror of https://github.com/apache/kafka.git
MINOR: Fix format in CoordinatorLoaderImplTest (#20548)
Fix indentation in `CoordinatorLoaderImplTest` to be consistent with the rest of the code in the package.

Reviewers: TengYao Chi <kitingiao@gmail.com>, David Jacot <djacot@confluent.io>
parent 8c8e93c4a1
commit dbd2b527d0
@@ -72,8 +72,8 @@ class CoordinatorLoaderImplTest {
        @Override
        public Map.Entry<String, String> deserialize(ByteBuffer key, ByteBuffer value) throws RuntimeException {
            return Map.entry(
                StandardCharsets.UTF_8.decode(key).toString(),
                StandardCharsets.UTF_8.decode(value).toString()
            );
        }
    }
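The serde in this hunk simply decodes the record key and value buffers as UTF-8 text and pairs them up. As a dependency-free sketch of that behavior only (the class and method names below are illustrative; the test itself appears to override `deserialize` from the package's `Deserializer` interface), the whole idea is:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Dependency-free sketch of what the test serde above does: decode the record key and
// value as UTF-8 and pair them. The class name is illustrative only.
public final class StringPairDecoderSketch {

    public static Map.Entry<String, String> decode(ByteBuffer key, ByteBuffer value) {
        return Map.entry(
            StandardCharsets.UTF_8.decode(key).toString(),
            StandardCharsets.UTF_8.decode(value).toString()
        );
    }

    public static void main(String[] args) {
        ByteBuffer key = StandardCharsets.UTF_8.encode("k1");
        ByteBuffer value = StandardCharsets.UTF_8.encode("v1");
        // Prints: k1=v1
        System.out.println(decode(key, value));
    }
}
```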
@@ -87,11 +87,11 @@ class CoordinatorLoaderImplTest {
        CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);

        try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
            Time.SYSTEM,
            partitionLogSupplier,
            partitionLogEndOffsetSupplier,
            serde,
            1000
        )) {
            assertFutureThrows(NotLeaderOrFollowerException.class, loader.load(tp, coordinator));
        }
@@ -106,11 +106,11 @@ class CoordinatorLoaderImplTest {
        CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);

        try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
            Time.SYSTEM,
            partitionLogSupplier,
            partitionLogEndOffsetSupplier,
            serde,
            1000
        )) {
            loader.close();
            assertFutureThrows(RuntimeException.class, loader.load(tp, coordinator));
@@ -127,59 +127,59 @@ class CoordinatorLoaderImplTest {
        CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);

        try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
            Time.SYSTEM,
            partitionLogSupplier,
            partitionLogEndOffsetSupplier,
            serde,
            1000
        )) {
            when(log.logStartOffset()).thenReturn(0L);
            when(log.highWatermark()).thenReturn(0L);

            FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
                new SimpleRecord("k1".getBytes(), "v1".getBytes()),
                new SimpleRecord("k2".getBytes(), "v2".getBytes())
            ));

            when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
                .thenReturn(readResult1);

            FetchDataInfo readResult2 = logReadResult(2, Arrays.asList(
                new SimpleRecord("k3".getBytes(), "v3".getBytes()),
                new SimpleRecord("k4".getBytes(), "v4".getBytes()),
                new SimpleRecord("k5".getBytes(), "v5".getBytes())
            ));

            when(log.read(2L, 1000, FetchIsolation.LOG_END, true))
                .thenReturn(readResult2);

            FetchDataInfo readResult3 = logReadResult(5, 100L, (short) 5, Arrays.asList(
                new SimpleRecord("k6".getBytes(), "v6".getBytes()),
                new SimpleRecord("k7".getBytes(), "v7".getBytes())
            ));

            when(log.read(5L, 1000, FetchIsolation.LOG_END, true))
                .thenReturn(readResult3);

            FetchDataInfo readResult4 = logReadResult(
                7,
                100L,
                (short) 5,
                ControlRecordType.COMMIT
            );

            when(log.read(7L, 1000, FetchIsolation.LOG_END, true))
                .thenReturn(readResult4);

            FetchDataInfo readResult5 = logReadResult(
                8,
                500L,
                (short) 10,
                ControlRecordType.ABORT
            );

            when(log.read(8L, 1000, FetchIsolation.LOG_END, true))
                .thenReturn(readResult5);

            CoordinatorLoader.LoadSummary summary = loader.load(tp, coordinator).get(10, TimeUnit.SECONDS);
            assertNotNull(summary);
@@ -213,25 +213,25 @@ class CoordinatorLoaderImplTest {
        CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);

        try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
            Time.SYSTEM,
            partitionLogSupplier,
            partitionLogEndOffsetSupplier,
            serde,
            1000
        )) {
            when(log.logStartOffset()).thenReturn(0L);

            FetchDataInfo readResult = logReadResult(0, Arrays.asList(
                new SimpleRecord("k1".getBytes(), "v1".getBytes()),
                new SimpleRecord("k2".getBytes(), "v2".getBytes())
            ));

            CountDownLatch latch = new CountDownLatch(1);
            when(log.read(
                anyLong(),
                eq(1000),
                eq(FetchIsolation.LOG_END),
                eq(true)
            )).thenAnswer((InvocationOnMock invocation) -> {
                latch.countDown();
                return readResult;
@@ -258,25 +258,25 @@ class CoordinatorLoaderImplTest {
        CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);

        try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
            Time.SYSTEM,
            partitionLogSupplier,
            partitionLogEndOffsetSupplier,
            serde,
            1000
        )) {
            when(log.logStartOffset()).thenReturn(0L);

            FetchDataInfo readResult = logReadResult(0, Arrays.asList(
                new SimpleRecord("k1".getBytes(), "v1".getBytes()),
                new SimpleRecord("k2".getBytes(), "v2".getBytes())
            ));

            when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
                .thenReturn(readResult);

            when(serde.deserialize(any(ByteBuffer.class), any(ByteBuffer.class)))
                .thenThrow(new Deserializer.UnknownRecordTypeException((short) 1))
                .thenReturn(Map.entry("k2", "v2"));

            loader.load(tp, coordinator).get(10, TimeUnit.SECONDS);
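This hunk relies on Mockito's consecutive stubbing: the first `deserialize` call throws `UnknownRecordTypeException` and every later call returns a fixed entry. A minimal, self-contained sketch of that stubbing pattern (using a throwaway `Serde` interface and a plain `RuntimeException`, not the test's own types) is:

```java
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.nio.ByteBuffer;
import java.util.Map;

// Sketch of Mockito consecutive stubbing as used above: the first call throws,
// every later call returns a fixed entry. The Serde interface here is a throwaway
// stand-in, not the package's Deserializer.
public class ConsecutiveStubbingSketch {

    public interface Serde {
        Map.Entry<String, String> deserialize(ByteBuffer key, ByteBuffer value);
    }

    public static void main(String[] args) {
        Serde serde = mock(Serde.class);
        when(serde.deserialize(any(ByteBuffer.class), any(ByteBuffer.class)))
            .thenThrow(new RuntimeException("unreadable record"))
            .thenReturn(Map.entry("k2", "v2"));

        ByteBuffer empty = ByteBuffer.allocate(0);
        try {
            serde.deserialize(empty, empty); // first call throws
        } catch (RuntimeException expected) {
            System.out.println("first call threw: " + expected.getMessage());
        }
        // second call returns the stubbed entry, printed as k2=v2
        System.out.println(serde.deserialize(empty, empty));
    }
}
```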
@@ -294,24 +294,24 @@ class CoordinatorLoaderImplTest {
        CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);

        try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
            Time.SYSTEM,
            partitionLogSupplier,
            partitionLogEndOffsetSupplier,
            serde,
            1000
        )) {
            when(log.logStartOffset()).thenReturn(0L);

            FetchDataInfo readResult = logReadResult(0, Arrays.asList(
                new SimpleRecord("k1".getBytes(), "v1".getBytes()),
                new SimpleRecord("k2".getBytes(), "v2".getBytes())
            ));

            when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
                .thenReturn(readResult);

            when(serde.deserialize(any(ByteBuffer.class), any(ByteBuffer.class)))
                .thenThrow(new RuntimeException("Error!"));

            RuntimeException ex = assertFutureThrows(RuntimeException.class, loader.load(tp, coordinator));
@@ -333,18 +333,18 @@ class CoordinatorLoaderImplTest {
        CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);

        try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
            Time.SYSTEM,
            partitionLogSupplier,
            partitionLogEndOffsetSupplier,
            serde,
            1000
        )) {
            when(log.logStartOffset()).thenReturn(0L);

            FetchDataInfo readResult = logReadResult(0, List.of());

            when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
                .thenReturn(readResult);

            assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS));
        }
@@ -361,34 +361,34 @@ class CoordinatorLoaderImplTest {
        MockTime time = new MockTime();

        try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
            time,
            partitionLogSupplier,
            partitionLogEndOffsetSupplier,
            serde,
            1000
        )) {
            long startTimeMs = time.milliseconds();
            when(log.logStartOffset()).thenReturn(0L);

            FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
                new SimpleRecord("k1".getBytes(), "v1".getBytes()),
                new SimpleRecord("k2".getBytes(), "v2".getBytes())
            ));

            when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
                .thenAnswer((InvocationOnMock invocation) -> {
                    time.sleep(1000);
                    return readResult1;
                });

            FetchDataInfo readResult2 = logReadResult(2, Arrays.asList(
                new SimpleRecord("k3".getBytes(), "v3".getBytes()),
                new SimpleRecord("k4".getBytes(), "v4".getBytes()),
                new SimpleRecord("k5".getBytes(), "v5".getBytes())
            ));

            when(log.read(2L, 1000, FetchIsolation.LOG_END, true))
                .thenReturn(readResult2);

            CoordinatorLoader.LoadSummary summary = loader.load(tp, coordinator).get(10, TimeUnit.SECONDS);
            assertEquals(startTimeMs, summary.startTimeMs());
@@ -408,39 +408,39 @@ class CoordinatorLoaderImplTest {
        CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);

        try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
            Time.SYSTEM,
            partitionLogSupplier,
            partitionLogEndOffsetSupplier,
            serde,
            1000
        )) {
            when(log.logStartOffset()).thenReturn(0L);
            when(log.highWatermark()).thenReturn(0L, 0L, 2L);

            FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
                new SimpleRecord("k1".getBytes(), "v1".getBytes()),
                new SimpleRecord("k2".getBytes(), "v2".getBytes())
            ));

            when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
                .thenReturn(readResult1);

            FetchDataInfo readResult2 = logReadResult(2, Arrays.asList(
                new SimpleRecord("k3".getBytes(), "v3".getBytes()),
                new SimpleRecord("k4".getBytes(), "v4".getBytes()),
                new SimpleRecord("k5".getBytes(), "v5".getBytes())
            ));

            when(log.read(2L, 1000, FetchIsolation.LOG_END, true))
                .thenReturn(readResult2);

            FetchDataInfo readResult3 = logReadResult(5, Arrays.asList(
                new SimpleRecord("k6".getBytes(), "v6".getBytes()),
                new SimpleRecord("k7".getBytes(), "v7".getBytes())
            ));

            when(log.read(5L, 1000, FetchIsolation.LOG_END, true))
                .thenReturn(readResult3);

            assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS));
@@ -471,11 +471,11 @@ class CoordinatorLoaderImplTest {
        CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);

        try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
            Time.SYSTEM,
            partitionLogSupplier,
            partitionLogEndOffsetSupplier,
            serde,
            1000
        )) {
            when(log.logStartOffset()).thenReturn(0L);
            when(log.highWatermark()).thenReturn(0L);
@@ -497,39 +497,39 @@ class CoordinatorLoaderImplTest {
        CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);

        try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
            Time.SYSTEM,
            partitionLogSupplier,
            partitionLogEndOffsetSupplier,
            serde,
            1000
        )) {
            when(log.logStartOffset()).thenReturn(0L);
            when(log.highWatermark()).thenReturn(5L, 7L, 7L);

            FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
                new SimpleRecord("k1".getBytes(), "v1".getBytes()),
                new SimpleRecord("k2".getBytes(), "v2".getBytes())
            ));

            when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
                .thenReturn(readResult1);

            FetchDataInfo readResult2 = logReadResult(2, Arrays.asList(
                new SimpleRecord("k3".getBytes(), "v3".getBytes()),
                new SimpleRecord("k4".getBytes(), "v4".getBytes()),
                new SimpleRecord("k5".getBytes(), "v5".getBytes())
            ));

            when(log.read(2L, 1000, FetchIsolation.LOG_END, true))
                .thenReturn(readResult2);

            FetchDataInfo readResult3 = logReadResult(5, Arrays.asList(
                new SimpleRecord("k6".getBytes(), "v6".getBytes()),
                new SimpleRecord("k7".getBytes(), "v7".getBytes())
            ));

            when(log.read(5L, 1000, FetchIsolation.LOG_END, true))
                .thenReturn(readResult3);

            assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS));
@@ -561,32 +561,32 @@ class CoordinatorLoaderImplTest {
        CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);

        try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
            Time.SYSTEM,
            partitionLogSupplier,
            partitionLogEndOffsetSupplier,
            serde,
            1000
        )) {
            when(log.logStartOffset()).thenReturn(0L);
            when(log.highWatermark()).thenReturn(0L);
            when(partitionLogEndOffsetSupplier.apply(tp)).thenReturn(Optional.of(5L)).thenReturn(Optional.of(-1L));

            FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
                new SimpleRecord("k1".getBytes(), "v1".getBytes()),
                new SimpleRecord("k2".getBytes(), "v2".getBytes())
            ));

            when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
                .thenReturn(readResult1);

            FetchDataInfo readResult2 = logReadResult(2, Arrays.asList(
                new SimpleRecord("k3".getBytes(), "v3".getBytes()),
                new SimpleRecord("k4".getBytes(), "v4".getBytes()),
                new SimpleRecord("k5".getBytes(), "v5".getBytes())
            ));

            when(log.read(2L, 1000, FetchIsolation.LOG_END, true))
                .thenReturn(readResult2);

            assertFutureThrows(NotLeaderOrFollowerException.class, loader.load(tp, coordinator));
        }
@@ -597,28 +597,28 @@ class CoordinatorLoaderImplTest {
    }

    private FetchDataInfo logReadResult(
        long startOffset,
        long producerId,
        short producerEpoch,
        List<SimpleRecord> records
    ) throws IOException {
        FileRecords fileRecords = mock(FileRecords.class);
        MemoryRecords memoryRecords;
        if (producerId == RecordBatch.NO_PRODUCER_ID) {
            memoryRecords = MemoryRecords.withRecords(
                startOffset,
                Compression.NONE,
                records.toArray(new SimpleRecord[0])
            );
        } else {
            memoryRecords = MemoryRecords.withTransactionalRecords(
                startOffset,
                Compression.NONE,
                producerId,
                producerEpoch,
                0,
                RecordBatch.NO_PARTITION_LEADER_EPOCH,
                records.toArray(new SimpleRecord[0])
            );
        }
@@ -635,19 +635,19 @@ class CoordinatorLoaderImplTest {
    }

    private FetchDataInfo logReadResult(
        long startOffset,
        long producerId,
        short producerEpoch,
        ControlRecordType controlRecordType
    ) throws IOException {
        FileRecords fileRecords = mock(FileRecords.class);
        MemoryRecords memoryRecords = MemoryRecords.withEndTransactionMarker(
            startOffset,
            0L,
            RecordBatch.NO_PARTITION_LEADER_EPOCH,
            producerId,
            producerEpoch,
            new EndTransactionMarker(controlRecordType, 0)
        );

        when(fileRecords.sizeInBytes()).thenReturn(memoryRecords.sizeInBytes());