MINOR: Various cleanups in CoordinatorRuntimeTest (#17829)

Reviewers: Mickael Maison <mickael.maison@gmail.com>
This commit is contained in:
David Jacot 2024-11-18 10:30:54 +01:00 committed by GitHub
parent 078d34f39d
commit bc68011b62
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 145 additions and 155 deletions

View File

@ -143,7 +143,7 @@ public class CoordinatorRuntimeTest {
}
@Override
public void close() throws Exception {}
public void close() {}
}
/**
@ -181,7 +181,7 @@ public class CoordinatorRuntimeTest {
}
@Override
public void close() throws Exception {
public void close() {
}
}
@ -219,7 +219,7 @@ public class CoordinatorRuntimeTest {
}
@Override
public void close() throws Exception { }
public void close() { }
}
/**
@ -403,28 +403,18 @@ public class CoordinatorRuntimeTest {
Set<String> pendingRecords(long producerId) {
TimelineHashSet<RecordAndMetadata> pending = pendingRecords.get(producerId);
if (pending == null) return Collections.emptySet();
return Collections.unmodifiableSet(
pending.stream().map(record -> record.record).collect(Collectors.toSet())
);
return pending.stream().map(record -> record.record).collect(Collectors.toUnmodifiableSet());
}
Set<String> records() {
return Collections.unmodifiableSet(
records.stream().map(record -> record.record).collect(Collectors.toSet())
);
return records.stream().map(record -> record.record).collect(Collectors.toUnmodifiableSet());
}
List<RecordAndMetadata> fullRecords() {
return Collections.unmodifiableList(
records
.stream()
.sorted(Comparator.comparingLong(record -> record.offset))
.collect(Collectors.toList())
);
}
CoordinatorTimer<Void, String> timer() {
return timer;
return records
.stream()
.sorted(Comparator.comparingLong(record -> record.offset))
.collect(Collectors.toList());
}
}
@ -1081,11 +1071,11 @@ public class CoordinatorRuntimeTest {
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
// Write #1.
CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1")
state -> new CoordinatorResult<>(List.of("record1", "record2"), "response1")
);
// Verify that the write is not committed yet.
@ -1096,17 +1086,17 @@ public class CoordinatorRuntimeTest {
// The last committed offset does not change.
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
// A new snapshot is created.
assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
// Records have been replayed to the coordinator.
assertEquals(Set.of("record1", "record2"), ctx.coordinator.coordinator().records());
// Records have been written to the log.
assertEquals(Collections.singletonList(
assertEquals(List.of(
records(timer.time().milliseconds(), "record1", "record2")
), writer.entries(TP));
// Write #2.
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT,
state -> new CoordinatorResult<>(Collections.singletonList("record3"), "response2"));
state -> new CoordinatorResult<>(List.of("record3"), "response2"));
// Verify that the write is not committed yet.
assertFalse(write2.isDone());
@ -1116,11 +1106,11 @@ public class CoordinatorRuntimeTest {
// The last committed offset does not change.
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
// A new snapshot is created.
assertEquals(Arrays.asList(0L, 2L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L, 2L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
// Records have been replayed to the coordinator.
assertEquals(Set.of("record1", "record2", "record3"), ctx.coordinator.coordinator().records());
// Records have been written to the log.
assertEquals(Arrays.asList(
assertEquals(List.of(
records(timer.time().milliseconds(), "record1", "record2"),
records(timer.time().milliseconds(), "record3")
), writer.entries(TP));
@ -1135,9 +1125,9 @@ public class CoordinatorRuntimeTest {
// The state does not change.
assertEquals(3L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Arrays.asList(0L, 2L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L, 2L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Set.of("record1", "record2", "record3"), ctx.coordinator.coordinator().records());
assertEquals(Arrays.asList(
assertEquals(List.of(
records(timer.time().milliseconds(), "record1", "record2"),
records(timer.time().milliseconds(), "record3")
), writer.entries(TP));
@ -1152,7 +1142,7 @@ public class CoordinatorRuntimeTest {
// The last committed offset is updated.
assertEquals(2L, ctx.coordinator.lastCommittedOffset());
// The snapshot is cleaned up.
assertEquals(Arrays.asList(2L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(2L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
// Commit write #2.
writer.commit(TP, 3);
@ -1166,7 +1156,7 @@ public class CoordinatorRuntimeTest {
// The last committed offset is updated.
assertEquals(3L, ctx.coordinator.lastCommittedOffset());
// The snapshot is cleaned up.
assertEquals(Collections.singletonList(3L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(3L), ctx.coordinator.snapshotRegistry().epochsList());
// Write #4 but without records.
CompletableFuture<String> write4 = runtime.scheduleWriteOperation("write#4", TP, DEFAULT_WRITE_TIMEOUT,
@ -1175,7 +1165,7 @@ public class CoordinatorRuntimeTest {
// It is completed immediately because the state is fully committed.
assertTrue(write4.isDone());
assertEquals("response4", write4.get(5, TimeUnit.SECONDS));
assertEquals(Collections.singletonList(3L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(3L), ctx.coordinator.snapshotRegistry().epochsList());
}
@Test
@ -1254,7 +1244,7 @@ public class CoordinatorRuntimeTest {
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
// Override the coordinator with a coordinator that throws
// an exception when replay is called.
@ -1278,13 +1268,13 @@ public class CoordinatorRuntimeTest {
// Write. It should fail.
CompletableFuture<String> write = runtime.scheduleWriteOperation("write", TP, DEFAULT_WRITE_TIMEOUT,
state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1"));
state -> new CoordinatorResult<>(List.of("record1", "record2"), "response1"));
assertFutureThrows(write, IllegalArgumentException.class);
// Verify that the state has not changed.
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
}
@Test
@ -1314,28 +1304,28 @@ public class CoordinatorRuntimeTest {
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0, ctx.coordinator.lastWrittenOffset());
assertEquals(0, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
// Write #1. It should succeed and be applied to the coordinator.
runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1"));
state -> new CoordinatorResult<>(List.of("record1", "record2"), "response1"));
// Verify that the state has been updated.
assertEquals(2L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Set.of("record1", "record2"), ctx.coordinator.coordinator().records());
// Write #2. It should fail because the writer is configured to only
// accept 1 write.
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT,
state -> new CoordinatorResult<>(Arrays.asList("record3", "record4", "record5"), "response2"));
state -> new CoordinatorResult<>(List.of("record3", "record4", "record5"), "response2"));
assertFutureThrows(write2, KafkaException.class);
// Verify that the state has not changed.
assertEquals(2L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Set.of("record1", "record2"), ctx.coordinator.coordinator().records());
}
@ -1366,11 +1356,11 @@ public class CoordinatorRuntimeTest {
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0, ctx.coordinator.lastWrittenOffset());
assertEquals(0, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
// Write #1. We should get a TimeoutException because the HWM will not advance.
CompletableFuture<String> timedOutWrite = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(3),
state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1"));
state -> new CoordinatorResult<>(List.of("record1", "record2"), "response1"));
timer.advanceClock(4);
@ -1410,8 +1400,8 @@ public class CoordinatorRuntimeTest {
List<CompletableFuture<List<String>>> writes = runtime.scheduleWriteAllOperation("write", DEFAULT_WRITE_TIMEOUT, state -> {
int counter = cnt.getAndIncrement();
return new CoordinatorResult<>(
Collections.singletonList("record#" + counter),
Collections.singletonList("response#" + counter)
List.of("record#" + counter),
List.of("response#" + counter)
);
});
@ -1419,9 +1409,9 @@ public class CoordinatorRuntimeTest {
assertEquals(1L, runtime.contextOrThrow(coordinator1).coordinator.lastWrittenOffset());
assertEquals(1L, runtime.contextOrThrow(coordinator2).coordinator.lastWrittenOffset());
assertEquals(Collections.singletonList(records(timer.time().milliseconds(), "record#0")), writer.entries(coordinator0));
assertEquals(Collections.singletonList(records(timer.time().milliseconds(), "record#1")), writer.entries(coordinator1));
assertEquals(Collections.singletonList(records(timer.time().milliseconds(), "record#2")), writer.entries(coordinator2));
assertEquals(List.of(records(timer.time().milliseconds(), "record#0")), writer.entries(coordinator0));
assertEquals(List.of(records(timer.time().milliseconds(), "record#1")), writer.entries(coordinator1));
assertEquals(List.of(records(timer.time().milliseconds(), "record#2")), writer.entries(coordinator2));
// Commit.
writer.commit(coordinator0);
@ -1430,7 +1420,7 @@ public class CoordinatorRuntimeTest {
// Verify.
assertEquals(
Arrays.asList("response#0", "response#1", "response#2"),
List.of("response#0", "response#1", "response#2"),
FutureUtils.combineFutures(writes, ArrayList::new, List::addAll).get(5, TimeUnit.SECONDS)
);
}
@ -1494,7 +1484,7 @@ public class CoordinatorRuntimeTest {
100L,
(short) 50,
Duration.ofMillis(5000),
state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response"),
state -> new CoordinatorResult<>(List.of("record1", "record2"), "response"),
TXN_OFFSET_COMMIT_LATEST_VERSION
);
@ -1583,7 +1573,7 @@ public class CoordinatorRuntimeTest {
100L,
(short) 50,
Duration.ofMillis(5000),
state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response"),
state -> new CoordinatorResult<>(List.of("record1", "record2"), "response"),
TXN_OFFSET_COMMIT_LATEST_VERSION
);
@ -1625,7 +1615,7 @@ public class CoordinatorRuntimeTest {
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
// Transactional write #1.
CompletableFuture<String> write1 = runtime.scheduleTransactionalWriteOperation(
@ -1635,7 +1625,7 @@ public class CoordinatorRuntimeTest {
100L,
(short) 5,
DEFAULT_WRITE_TIMEOUT,
state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1"),
state -> new CoordinatorResult<>(List.of("record1", "record2"), "response1"),
TXN_OFFSET_COMMIT_LATEST_VERSION
);
@ -1647,14 +1637,14 @@ public class CoordinatorRuntimeTest {
// The last committed offset does not change.
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
// A new snapshot is created.
assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
// Records have been replayed to the coordinator. They are stored in
// the pending set for now.
assertEquals(Set.of("record1", "record2"), ctx.coordinator.coordinator().pendingRecords(
100L
));
// Records have been written to the log.
assertEquals(Collections.singletonList(
assertEquals(List.of(
transactionalRecords(100L, (short) 5, timer.time().milliseconds(), "record1", "record2")
), writer.entries(TP));
@ -1677,7 +1667,7 @@ public class CoordinatorRuntimeTest {
// The last committed offset does not change.
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
// A new snapshot is created.
assertEquals(Arrays.asList(0L, 2L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L, 2L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
// Records have been replayed to the coordinator.
ControlRecordType expectedType;
if (result == TransactionResult.COMMIT) {
@ -1691,7 +1681,7 @@ public class CoordinatorRuntimeTest {
}
// Records have been written to the log.
assertEquals(Arrays.asList(
assertEquals(List.of(
transactionalRecords(100L, (short) 5, timer.time().milliseconds(), "record1", "record2"),
endTransactionMarker(100L, (short) 5, timer.time().milliseconds(), 10, expectedType)
), writer.entries(TP));
@ -1737,7 +1727,7 @@ public class CoordinatorRuntimeTest {
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0, ctx.coordinator.lastWrittenOffset());
assertEquals(0, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
// Complete #1. We should get a TimeoutException because the HWM will not advance.
CompletableFuture<Void> timedOutCompletion = runtime.scheduleTransactionCompletion(
@ -1753,7 +1743,7 @@ public class CoordinatorRuntimeTest {
// Verify that the state has been updated.
assertEquals(1L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Arrays.asList(0L, 1L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L, 1L), ctx.coordinator.snapshotRegistry().epochsList());
// Advance clock to timeout Complete #1.
timer.advanceClock(4);
@ -1764,7 +1754,7 @@ public class CoordinatorRuntimeTest {
// operation timeouts because the record has been written to the log.
assertEquals(1L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Arrays.asList(0L, 1L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L, 1L), ctx.coordinator.snapshotRegistry().epochsList());
}
@Test
@ -1794,7 +1784,7 @@ public class CoordinatorRuntimeTest {
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0, ctx.coordinator.lastWrittenOffset());
assertEquals(0, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
// Write #1. It should succeed and be applied to the coordinator.
runtime.scheduleTransactionalWriteOperation(
@ -1804,14 +1794,14 @@ public class CoordinatorRuntimeTest {
100L,
(short) 5,
DEFAULT_WRITE_TIMEOUT,
state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1"),
state -> new CoordinatorResult<>(List.of("record1", "record2"), "response1"),
TXN_OFFSET_COMMIT_LATEST_VERSION
);
// Verify that the state has been updated.
assertEquals(2L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Set.of("record1", "record2"), ctx.coordinator.coordinator().pendingRecords(100L));
assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records());
@ -1830,7 +1820,7 @@ public class CoordinatorRuntimeTest {
// Verify that the state has not changed.
assertEquals(2L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Set.of("record1", "record2"), ctx.coordinator.coordinator().pendingRecords(100L));
assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records());
}
@ -1860,7 +1850,7 @@ public class CoordinatorRuntimeTest {
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
// Override the coordinator with a coordinator that throws
// an exception when replayEndTransactionMarker is called.
@ -1889,17 +1879,17 @@ public class CoordinatorRuntimeTest {
100L,
(short) 5,
DEFAULT_WRITE_TIMEOUT,
state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1"),
state -> new CoordinatorResult<>(List.of("record1", "record2"), "response1"),
TXN_OFFSET_COMMIT_LATEST_VERSION
);
// Verify that the state has been updated.
assertEquals(2L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Set.of("record1", "record2"), ctx.coordinator.coordinator().pendingRecords(100L));
assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records());
assertEquals(Collections.singletonList(
assertEquals(List.of(
transactionalRecords(100L, (short) 5, timer.time().milliseconds(), "record1", "record2")
), writer.entries(TP));
@ -1918,10 +1908,10 @@ public class CoordinatorRuntimeTest {
// Verify that the state has not changed.
assertEquals(2L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Set.of("record1", "record2"), ctx.coordinator.coordinator().pendingRecords(100L));
assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records());
assertEquals(Collections.singletonList(
assertEquals(List.of(
transactionalRecords(100L, (short) 5, timer.time().milliseconds(), "record1", "record2")
), writer.entries(TP));
}
@ -1955,11 +1945,11 @@ public class CoordinatorRuntimeTest {
// Write #1.
CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1"));
state -> new CoordinatorResult<>(List.of("record1", "record2"), "response1"));
// Write #2.
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT,
state -> new CoordinatorResult<>(Arrays.asList("record3", "record4"), "response2"));
state -> new CoordinatorResult<>(List.of("record3", "record4"), "response2"));
// Commit write #1.
writer.commit(TP, 2);
@ -2038,11 +2028,11 @@ public class CoordinatorRuntimeTest {
// Write #1.
runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1"));
state -> new CoordinatorResult<>(List.of("record1", "record2"), "response1"));
// Write #2.
runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT,
state -> new CoordinatorResult<>(Arrays.asList("record3", "record4"), "response2"));
state -> new CoordinatorResult<>(List.of("record3", "record4"), "response2"));
// Commit write #1.
writer.commit(TP, 2);
@ -2085,11 +2075,11 @@ public class CoordinatorRuntimeTest {
// Writes
runtime.scheduleWriteOperation("write#0", coordinator0, DEFAULT_WRITE_TIMEOUT,
state -> new CoordinatorResult<>(Collections.singletonList("record0"), "response0"));
state -> new CoordinatorResult<>(List.of("record0"), "response0"));
runtime.scheduleWriteOperation("write#1", coordinator1, DEFAULT_WRITE_TIMEOUT,
state -> new CoordinatorResult<>(Collections.singletonList("record1"), "response1"));
state -> new CoordinatorResult<>(List.of("record1"), "response1"));
runtime.scheduleWriteOperation("write#2", coordinator2, DEFAULT_WRITE_TIMEOUT,
state -> new CoordinatorResult<>(Collections.singletonList("record2"), "response2"));
state -> new CoordinatorResult<>(List.of("record2"), "response2"));
// Commit writes.
writer.commit(coordinator0);
@ -2103,7 +2093,7 @@ public class CoordinatorRuntimeTest {
);
assertEquals(
Arrays.asList("record0", "record1", "record2"),
List.of("record0", "record1", "record2"),
FutureUtils.combineFutures(responses, ArrayList::new, List::addAll).get(5, TimeUnit.SECONDS)
);
}
@ -2136,11 +2126,11 @@ public class CoordinatorRuntimeTest {
// Write #1.
CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1"));
state -> new CoordinatorResult<>(List.of("record1", "record2"), "response1"));
// Write #2.
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT,
state -> new CoordinatorResult<>(Arrays.asList("record3", "record4"), "response2"));
state -> new CoordinatorResult<>(List.of("record3", "record4"), "response2"));
// Writes are inflight.
assertFalse(write1.isDone());
@ -2151,7 +2141,7 @@ public class CoordinatorRuntimeTest {
// Timer #1. This is never executed.
ctx.timer.schedule("timer-1", 10, TimeUnit.SECONDS, true,
() -> new CoordinatorResult<>(Arrays.asList("record5", "record6"), null));
() -> new CoordinatorResult<>(List.of("record5", "record6"), null));
// The coordinator timer should have one pending task.
assertEquals(1, ctx.timer.size());
@ -2271,11 +2261,11 @@ public class CoordinatorRuntimeTest {
// Timer #1.
ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true,
() -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), null));
() -> new CoordinatorResult<>(List.of("record1", "record2"), null));
// Timer #2.
ctx.timer.schedule("timer-2", 20, TimeUnit.MILLISECONDS, true,
() -> new CoordinatorResult<>(Arrays.asList("record3", "record4"), null));
() -> new CoordinatorResult<>(List.of("record3", "record4"), null));
// The coordinator timer should have two pending tasks.
assertEquals(2, ctx.timer.size());
@ -2329,7 +2319,7 @@ public class CoordinatorRuntimeTest {
// Timer #1.
ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true,
() -> new CoordinatorResult<>(Collections.singletonList("record1"), null));
() -> new CoordinatorResult<>(List.of("record1"), null));
// The coordinator timer should have one pending task.
assertEquals(1, ctx.timer.size());
@ -2342,14 +2332,14 @@ public class CoordinatorRuntimeTest {
// Schedule a second timer with the same key.
ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true,
() -> new CoordinatorResult<>(Collections.singletonList("record2"), null));
() -> new CoordinatorResult<>(List.of("record2"), null));
// The coordinator timer should still have one pending task.
assertEquals(1, ctx.timer.size());
// Schedule a third timer with the same key.
ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true,
() -> new CoordinatorResult<>(Collections.singletonList("record3"), null));
() -> new CoordinatorResult<>(List.of("record3"), null));
// The coordinator timer should still have one pending task.
assertEquals(1, ctx.timer.size());
@ -2404,7 +2394,7 @@ public class CoordinatorRuntimeTest {
// Timer #1.
ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true,
() -> new CoordinatorResult<>(Collections.singletonList("record1"), null));
() -> new CoordinatorResult<>(List.of("record1"), null));
// The coordinator timer should have one pending task.
assertEquals(1, ctx.timer.size());
@ -2417,7 +2407,7 @@ public class CoordinatorRuntimeTest {
// Schedule a second timer with the same key.
ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true,
() -> new CoordinatorResult<>(Collections.singletonList("record2"), null));
() -> new CoordinatorResult<>(List.of("record2"), null));
// The coordinator timer should still have one pending task.
assertEquals(1, ctx.timer.size());
@ -2819,8 +2809,8 @@ public class CoordinatorRuntimeTest {
1500,
30,
3000),
Arrays.asList(5L, 15L, 27L),
Arrays.asList(5L, 15L)))
List.of(5L, 15L, 27L),
List.of(5L, 15L)))
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(supplier)
@ -2937,18 +2927,18 @@ public class CoordinatorRuntimeTest {
// Write #1.
CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
state -> new CoordinatorResult<>(Collections.singletonList("record1"), "response1")
state -> new CoordinatorResult<>(List.of("record1"), "response1")
);
processor.poll();
// Write #2.
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT,
state -> new CoordinatorResult<>(Collections.singletonList("record2"), "response2")
state -> new CoordinatorResult<>(List.of("record2"), "response2")
);
processor.poll();
// Records have been written to the log.
assertEquals(Arrays.asList(
assertEquals(List.of(
records(timer.time().milliseconds(), "record1"),
records(timer.time().milliseconds(), "record2")
), writer.entries(TP));
@ -3007,18 +2997,18 @@ public class CoordinatorRuntimeTest {
// Write#1.
CompletableFuture<String> write1 = runtime.scheduleWriteOperation("Write#1", TP, DEFAULT_WRITE_TIMEOUT,
state -> new CoordinatorResult<>(Collections.singletonList("record1"), "response1")
state -> new CoordinatorResult<>(List.of("record1"), "response1")
);
processor.poll();
// Write#2.
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("Write#2", TP, DEFAULT_WRITE_TIMEOUT,
state -> new CoordinatorResult<>(Collections.singletonList("record2"), "response2")
state -> new CoordinatorResult<>(List.of("record2"), "response2")
);
processor.poll();
// Records have been written to the log.
assertEquals(Arrays.asList(
assertEquals(List.of(
records(timer.time().milliseconds(), "record1"),
records(timer.time().milliseconds(), "record2")
), writer.entries(TP));
@ -3092,7 +3082,7 @@ public class CoordinatorRuntimeTest {
processor.poll();
// Records have been written to the log.
assertEquals(Collections.singletonList(
assertEquals(List.of(
endTransactionMarker(100, (short) 50, timer.time().milliseconds(), 1, ControlRecordType.COMMIT)
), writer.entries(TP));
@ -3149,7 +3139,7 @@ public class CoordinatorRuntimeTest {
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
int maxBatchSize = writer.config(TP).maxMessageSize();
assertTrue(maxBatchSize > MIN_BUFFER_SIZE);
@ -3200,7 +3190,7 @@ public class CoordinatorRuntimeTest {
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertNull(ctx.currentBatch);
// Get the max batch size.
@ -3229,8 +3219,8 @@ public class CoordinatorRuntimeTest {
// Verify the state. Records are replayed but no batch written.
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Arrays.asList(
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(
new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
new MockCoordinatorShard.RecordAndMetadata(1, records.get(1))
), ctx.coordinator.coordinator().fullRecords());
@ -3247,8 +3237,8 @@ public class CoordinatorRuntimeTest {
// Verify the state. Records are replayed but no batch written.
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Arrays.asList(
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(
new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
@ -3268,14 +3258,14 @@ public class CoordinatorRuntimeTest {
// got flushed with all the records but the new one from #3.
assertEquals(3L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Arrays.asList(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Arrays.asList(
assertEquals(List.of(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(
new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
), ctx.coordinator.coordinator().fullRecords());
assertEquals(Collections.singletonList(
assertEquals(List.of(
records(timer.time().milliseconds(), records.subList(0, 3))
), writer.entries(TP));
@ -3285,14 +3275,14 @@ public class CoordinatorRuntimeTest {
// Verify the state. The pending batch is flushed.
assertEquals(4L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Arrays.asList(0L, 3L, 4L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Arrays.asList(
assertEquals(List.of(0L, 3L, 4L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(
new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
), ctx.coordinator.coordinator().fullRecords());
assertEquals(Arrays.asList(
assertEquals(List.of(
records(timer.time().milliseconds() - 11, records.subList(0, 3)),
records(timer.time().milliseconds() - 11, records.subList(3, 4))
), writer.entries(TP));
@ -3334,7 +3324,7 @@ public class CoordinatorRuntimeTest {
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertNull(ctx.currentBatch);
// Get the max batch size.
@ -3385,7 +3375,7 @@ public class CoordinatorRuntimeTest {
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertNull(ctx.currentBatch);
// Get the max batch size.
@ -3415,8 +3405,8 @@ public class CoordinatorRuntimeTest {
// Verify the state.
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Arrays.asList(
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(
new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
@ -3438,7 +3428,7 @@ public class CoordinatorRuntimeTest {
// Verify the state. The state should be reverted to the initial state.
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Collections.emptyList(), ctx.coordinator.coordinator().fullRecords());
assertEquals(Collections.emptyList(), writer.entries(TP));
}
@ -3470,7 +3460,7 @@ public class CoordinatorRuntimeTest {
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertNull(ctx.currentBatch);
// Override the coordinator with a coordinator that throws
@ -3518,8 +3508,8 @@ public class CoordinatorRuntimeTest {
// Verify the state.
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Collections.singletonList(
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(
new MockCoordinatorShard.RecordAndMetadata(0, records.get(0))
), ctx.coordinator.coordinator().fullRecords());
assertEquals(Collections.emptyList(), writer.entries(TP));
@ -3535,7 +3525,7 @@ public class CoordinatorRuntimeTest {
// Verify the state.
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Collections.emptyList(), ctx.coordinator.coordinator().fullRecords());
assertEquals(Collections.emptyList(), writer.entries(TP));
}
@ -3567,12 +3557,12 @@ public class CoordinatorRuntimeTest {
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertNull(ctx.currentBatch);
// Write #1 with one record.
CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(Collections.singletonList("record#1"), "response1")
state -> new CoordinatorResult<>(List.of("record#1"), "response1")
);
// Verify that the write is not committed yet.
@ -3581,7 +3571,7 @@ public class CoordinatorRuntimeTest {
// Verify the state. Records are replayed but no batch written.
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().pendingRecords(100L));
assertEquals(Set.of("record#1"), ctx.coordinator.coordinator().records());
assertEquals(Collections.emptyList(), writer.entries(TP));
@ -3594,7 +3584,7 @@ public class CoordinatorRuntimeTest {
100L,
(short) 50,
Duration.ofMillis(20),
state -> new CoordinatorResult<>(Collections.singletonList("record#2"), "response2"),
state -> new CoordinatorResult<>(List.of("record#2"), "response2"),
TXN_OFFSET_COMMIT_LATEST_VERSION
);
@ -3605,17 +3595,17 @@ public class CoordinatorRuntimeTest {
// written to the log.
assertEquals(2L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Arrays.asList(0L, 1L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L, 1L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Set.of("record#2"), ctx.coordinator.coordinator().pendingRecords(100L));
assertEquals(Set.of("record#1"), ctx.coordinator.coordinator().records());
assertEquals(Arrays.asList(
assertEquals(List.of(
records(timer.time().milliseconds(), "record#1"),
transactionalRecords(100L, (short) 50, timer.time().milliseconds(), "record#2")
), writer.entries(TP));
// Write #3 with one record.
CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(Collections.singletonList("record#3"), "response3")
state -> new CoordinatorResult<>(List.of("record#3"), "response3")
);
// Verify that the write is not committed yet.
@ -3624,10 +3614,10 @@ public class CoordinatorRuntimeTest {
// Verify the state.
assertEquals(2L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Arrays.asList(0L, 1L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L, 1L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Set.of("record#2"), ctx.coordinator.coordinator().pendingRecords(100L));
assertEquals(Set.of("record#1", "record#3"), ctx.coordinator.coordinator().records());
assertEquals(Arrays.asList(
assertEquals(List.of(
records(timer.time().milliseconds(), "record#1"),
transactionalRecords(100L, (short) 50, timer.time().milliseconds(), "record#2")
), writer.entries(TP));
@ -3649,10 +3639,10 @@ public class CoordinatorRuntimeTest {
// Verify the state.
assertEquals(4L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Arrays.asList(0L, 1L, 2L, 3L, 4L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L, 1L, 2L, 3L, 4L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().pendingRecords(100L));
assertEquals(Set.of("record#1", "record#2", "record#3"), ctx.coordinator.coordinator().records());
assertEquals(Arrays.asList(
assertEquals(List.of(
records(timer.time().milliseconds(), "record#1"),
transactionalRecords(100L, (short) 50, timer.time().milliseconds(), "record#2"),
records(timer.time().milliseconds(), "record#3"),
@ -3710,7 +3700,7 @@ public class CoordinatorRuntimeTest {
assertEquals(ACTIVE, ctx.state);
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertNull(ctx.currentBatch);
// Keep a reference to the current coordinator.
@ -3823,7 +3813,7 @@ public class CoordinatorRuntimeTest {
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(2L, ctx.coordinator.lastWrittenOffset());
assertEquals(1L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(2L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(2L), ctx.coordinator.snapshotRegistry().epochsList());
// Schedule a write operation that does not generate any records.
CompletableFuture<String> write = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
@ -3838,7 +3828,7 @@ public class CoordinatorRuntimeTest {
// Verify the state.
assertEquals(2L, ctx.coordinator.lastWrittenOffset());
assertEquals(2L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(2L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(2L), ctx.coordinator.snapshotRegistry().epochsList());
// The write operation should be completed.
assertEquals("response1", write.get(5, TimeUnit.SECONDS));
@ -3871,7 +3861,7 @@ public class CoordinatorRuntimeTest {
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertNull(ctx.currentBatch);
// Get the max batch size.
@ -3908,14 +3898,14 @@ public class CoordinatorRuntimeTest {
// the first three records. The 4th one is pending.
assertEquals(3L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Arrays.asList(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Arrays.asList(
assertEquals(List.of(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(
new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
), ctx.coordinator.coordinator().fullRecords());
assertEquals(Collections.singletonList(
assertEquals(List.of(
records(timer.time().milliseconds(), records.subList(0, 3))
), writer.entries(TP));
@ -3932,14 +3922,14 @@ public class CoordinatorRuntimeTest {
assertNull(ctx.currentBatch);
assertEquals(4L, ctx.coordinator.lastWrittenOffset());
assertEquals(3L, ctx.coordinator.lastCommittedOffset());
assertEquals(Arrays.asList(3L, 4L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Arrays.asList(
assertEquals(List.of(3L, 4L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(
new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
), ctx.coordinator.coordinator().fullRecords());
assertEquals(Arrays.asList(
assertEquals(List.of(
records(timer.time().milliseconds() - 11, records.subList(0, 3)),
records(timer.time().milliseconds() - 11, records.subList(3, 4))
), writer.entries(TP));
@ -3979,7 +3969,7 @@ public class CoordinatorRuntimeTest {
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertNull(ctx.currentBatch);
// Get the max batch size.
@ -4007,8 +3997,8 @@ public class CoordinatorRuntimeTest {
// Verify the state.
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Arrays.asList(
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(
new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
@ -4020,7 +4010,7 @@ public class CoordinatorRuntimeTest {
// Note that the batch will fail only when the batch is written because the
// MemoryBatchBuilder always accept one record.
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(Collections.singletonList(record), "write#2", null, true, false)
state -> new CoordinatorResult<>(List.of(record), "write#2", null, true, false)
);
// Advance past the linger time to flush the pending batch.
@ -4035,13 +4025,13 @@ public class CoordinatorRuntimeTest {
// Verify the state.
assertEquals(3L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Arrays.asList(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Arrays.asList(
assertEquals(List.of(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(
new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
), ctx.coordinator.coordinator().fullRecords());
assertEquals(Collections.singletonList(
assertEquals(List.of(
records(timer.time().milliseconds() - 11, records.subList(0, 3))
), writer.entries(TP));
}
@ -4074,7 +4064,7 @@ public class CoordinatorRuntimeTest {
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertNull(ctx.currentBatch);
// Get the max batch size.
@ -4104,8 +4094,8 @@ public class CoordinatorRuntimeTest {
// Verify the state.
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Arrays.asList(
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(
new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
@ -4127,7 +4117,7 @@ public class CoordinatorRuntimeTest {
// Verify the state. The state should be reverted to the initial state.
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Collections.emptyList(), ctx.coordinator.coordinator().fullRecords());
assertEquals(Collections.emptyList(), writer.entries(TP));
}
@ -4203,13 +4193,13 @@ public class CoordinatorRuntimeTest {
// got flushed with all the records but the new one from #3.
assertEquals(3L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Arrays.asList(
assertEquals(List.of(
new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
), ctx.coordinator.coordinator().fullRecords());
assertEquals(Collections.singletonList(
assertEquals(List.of(
records(firstBatchTimestamp, records.subList(0, 3))
), writer.entries(TP));
verify(runtimeMetrics, times(1)).recordFlushTime(10);
@ -4220,13 +4210,13 @@ public class CoordinatorRuntimeTest {
// Verify the state. The pending batch is flushed.
assertEquals(4L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Arrays.asList(
assertEquals(List.of(
new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
), ctx.coordinator.coordinator().fullRecords());
assertEquals(Arrays.asList(
assertEquals(List.of(
records(secondBatchTimestamp, records.subList(0, 3)),
records(secondBatchTimestamp, records.subList(3, 4))
), writer.entries(TP));
@ -4273,11 +4263,11 @@ public class CoordinatorRuntimeTest {
// write#1 will be committed and update the high watermark. Record time spent in purgatory.
CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, writeTimeout,
state -> new CoordinatorResult<>(Collections.singletonList("record1"), "response1")
state -> new CoordinatorResult<>(List.of("record1"), "response1")
);
// write#2 will time out sitting in the purgatory. Record time spent in purgatory.
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, writeTimeout,
state -> new CoordinatorResult<>(Collections.singletonList("record2"), "response2")
state -> new CoordinatorResult<>(List.of("record2"), "response2")
);
// write#3 will error while appending. Does not spend time in purgatory.
CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, writeTimeout,
@ -4295,7 +4285,7 @@ public class CoordinatorRuntimeTest {
// Records have been written to the log.
long writeTimestamp = timer.time().milliseconds();
assertEquals(Arrays.asList(
assertEquals(List.of(
records(writeTimestamp, "record1"),
records(writeTimestamp, "record2")
), writer.entries(TP));