diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java index 7172fb2e899..ace0d0f7aac 100644 --- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java @@ -143,7 +143,7 @@ public class CoordinatorRuntimeTest { } @Override - public void close() throws Exception {} + public void close() {} } /** @@ -181,7 +181,7 @@ public class CoordinatorRuntimeTest { } @Override - public void close() throws Exception { + public void close() { } } @@ -219,7 +219,7 @@ public class CoordinatorRuntimeTest { } @Override - public void close() throws Exception { } + public void close() { } } /** @@ -403,28 +403,18 @@ public class CoordinatorRuntimeTest { Set pendingRecords(long producerId) { TimelineHashSet pending = pendingRecords.get(producerId); if (pending == null) return Collections.emptySet(); - return Collections.unmodifiableSet( - pending.stream().map(record -> record.record).collect(Collectors.toSet()) - ); + return pending.stream().map(record -> record.record).collect(Collectors.toUnmodifiableSet()); } Set records() { - return Collections.unmodifiableSet( - records.stream().map(record -> record.record).collect(Collectors.toSet()) - ); + return records.stream().map(record -> record.record).collect(Collectors.toUnmodifiableSet()); } List fullRecords() { - return Collections.unmodifiableList( - records - .stream() - .sorted(Comparator.comparingLong(record -> record.offset)) - .collect(Collectors.toList()) - ); - } - - CoordinatorTimer timer() { - return timer; + return records + .stream() + .sorted(Comparator.comparingLong(record -> record.offset)) + .collect(Collectors.toList()); } } @@ -1081,11 +1071,11 @@ public class CoordinatorRuntimeTest { CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(0L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList()); // Write #1. CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT, - state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1") + state -> new CoordinatorResult<>(List.of("record1", "record2"), "response1") ); // Verify that the write is not committed yet. @@ -1096,17 +1086,17 @@ public class CoordinatorRuntimeTest { // The last committed offset does not change. assertEquals(0L, ctx.coordinator.lastCommittedOffset()); // A new snapshot is created. - assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList()); // Records have been replayed to the coordinator. assertEquals(Set.of("record1", "record2"), ctx.coordinator.coordinator().records()); // Records have been written to the log. - assertEquals(Collections.singletonList( + assertEquals(List.of( records(timer.time().milliseconds(), "record1", "record2") ), writer.entries(TP)); // Write #2. 
CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT, - state -> new CoordinatorResult<>(Collections.singletonList("record3"), "response2")); + state -> new CoordinatorResult<>(List.of("record3"), "response2")); // Verify that the write is not committed yet. assertFalse(write2.isDone()); @@ -1116,11 +1106,11 @@ public class CoordinatorRuntimeTest { // The last committed offset does not change. assertEquals(0L, ctx.coordinator.lastCommittedOffset()); // A new snapshot is created. - assertEquals(Arrays.asList(0L, 2L, 3L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L, 2L, 3L), ctx.coordinator.snapshotRegistry().epochsList()); // Records have been replayed to the coordinator. assertEquals(Set.of("record1", "record2", "record3"), ctx.coordinator.coordinator().records()); // Records have been written to the log. - assertEquals(Arrays.asList( + assertEquals(List.of( records(timer.time().milliseconds(), "record1", "record2"), records(timer.time().milliseconds(), "record3") ), writer.entries(TP)); @@ -1135,9 +1125,9 @@ public class CoordinatorRuntimeTest { // The state does not change. assertEquals(3L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Arrays.asList(0L, 2L, 3L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L, 2L, 3L), ctx.coordinator.snapshotRegistry().epochsList()); assertEquals(Set.of("record1", "record2", "record3"), ctx.coordinator.coordinator().records()); - assertEquals(Arrays.asList( + assertEquals(List.of( records(timer.time().milliseconds(), "record1", "record2"), records(timer.time().milliseconds(), "record3") ), writer.entries(TP)); @@ -1152,7 +1142,7 @@ public class CoordinatorRuntimeTest { // The last committed offset is updated. assertEquals(2L, ctx.coordinator.lastCommittedOffset()); // The snapshot is cleaned up. - assertEquals(Arrays.asList(2L, 3L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(2L, 3L), ctx.coordinator.snapshotRegistry().epochsList()); // Commit write #2. writer.commit(TP, 3); @@ -1166,7 +1156,7 @@ public class CoordinatorRuntimeTest { // The last committed offset is updated. assertEquals(3L, ctx.coordinator.lastCommittedOffset()); // The snapshot is cleaned up. - assertEquals(Collections.singletonList(3L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(3L), ctx.coordinator.snapshotRegistry().epochsList()); // Write #4 but without records. CompletableFuture write4 = runtime.scheduleWriteOperation("write#4", TP, DEFAULT_WRITE_TIMEOUT, @@ -1175,7 +1165,7 @@ public class CoordinatorRuntimeTest { // It is completed immediately because the state is fully committed. assertTrue(write4.isDone()); assertEquals("response4", write4.get(5, TimeUnit.SECONDS)); - assertEquals(Collections.singletonList(3L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(3L), ctx.coordinator.snapshotRegistry().epochsList()); } @Test @@ -1254,7 +1244,7 @@ public class CoordinatorRuntimeTest { CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(0L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList()); // Override the coordinator with a coordinator that throws // an exception when replay is called. 
@@ -1278,13 +1268,13 @@ public class CoordinatorRuntimeTest { // Write. It should fail. CompletableFuture write = runtime.scheduleWriteOperation("write", TP, DEFAULT_WRITE_TIMEOUT, - state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1")); + state -> new CoordinatorResult<>(List.of("record1", "record2"), "response1")); assertFutureThrows(write, IllegalArgumentException.class); // Verify that the state has not changed. assertEquals(0L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList()); } @Test @@ -1314,28 +1304,28 @@ public class CoordinatorRuntimeTest { CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(0, ctx.coordinator.lastWrittenOffset()); assertEquals(0, ctx.coordinator.lastCommittedOffset()); - assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList()); // Write #1. It should succeed and be applied to the coordinator. runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT, - state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1")); + state -> new CoordinatorResult<>(List.of("record1", "record2"), "response1")); // Verify that the state has been updated. assertEquals(2L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList()); assertEquals(Set.of("record1", "record2"), ctx.coordinator.coordinator().records()); // Write #2. It should fail because the writer is configured to only // accept 1 write. CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT, - state -> new CoordinatorResult<>(Arrays.asList("record3", "record4", "record5"), "response2")); + state -> new CoordinatorResult<>(List.of("record3", "record4", "record5"), "response2")); assertFutureThrows(write2, KafkaException.class); // Verify that the state has not changed. assertEquals(2L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList()); assertEquals(Set.of("record1", "record2"), ctx.coordinator.coordinator().records()); } @@ -1366,11 +1356,11 @@ public class CoordinatorRuntimeTest { CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(0, ctx.coordinator.lastWrittenOffset()); assertEquals(0, ctx.coordinator.lastCommittedOffset()); - assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList()); // Write #1. We should get a TimeoutException because the HWM will not advance. 
CompletableFuture timedOutWrite = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(3), - state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1")); + state -> new CoordinatorResult<>(List.of("record1", "record2"), "response1")); timer.advanceClock(4); @@ -1410,8 +1400,8 @@ public class CoordinatorRuntimeTest { List>> writes = runtime.scheduleWriteAllOperation("write", DEFAULT_WRITE_TIMEOUT, state -> { int counter = cnt.getAndIncrement(); return new CoordinatorResult<>( - Collections.singletonList("record#" + counter), - Collections.singletonList("response#" + counter) + List.of("record#" + counter), + List.of("response#" + counter) ); }); @@ -1419,9 +1409,9 @@ public class CoordinatorRuntimeTest { assertEquals(1L, runtime.contextOrThrow(coordinator1).coordinator.lastWrittenOffset()); assertEquals(1L, runtime.contextOrThrow(coordinator2).coordinator.lastWrittenOffset()); - assertEquals(Collections.singletonList(records(timer.time().milliseconds(), "record#0")), writer.entries(coordinator0)); - assertEquals(Collections.singletonList(records(timer.time().milliseconds(), "record#1")), writer.entries(coordinator1)); - assertEquals(Collections.singletonList(records(timer.time().milliseconds(), "record#2")), writer.entries(coordinator2)); + assertEquals(List.of(records(timer.time().milliseconds(), "record#0")), writer.entries(coordinator0)); + assertEquals(List.of(records(timer.time().milliseconds(), "record#1")), writer.entries(coordinator1)); + assertEquals(List.of(records(timer.time().milliseconds(), "record#2")), writer.entries(coordinator2)); // Commit. writer.commit(coordinator0); @@ -1430,7 +1420,7 @@ public class CoordinatorRuntimeTest { // Verify. assertEquals( - Arrays.asList("response#0", "response#1", "response#2"), + List.of("response#0", "response#1", "response#2"), FutureUtils.combineFutures(writes, ArrayList::new, List::addAll).get(5, TimeUnit.SECONDS) ); } @@ -1494,7 +1484,7 @@ public class CoordinatorRuntimeTest { 100L, (short) 50, Duration.ofMillis(5000), - state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response"), + state -> new CoordinatorResult<>(List.of("record1", "record2"), "response"), TXN_OFFSET_COMMIT_LATEST_VERSION ); @@ -1583,7 +1573,7 @@ public class CoordinatorRuntimeTest { 100L, (short) 50, Duration.ofMillis(5000), - state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response"), + state -> new CoordinatorResult<>(List.of("record1", "record2"), "response"), TXN_OFFSET_COMMIT_LATEST_VERSION ); @@ -1625,7 +1615,7 @@ public class CoordinatorRuntimeTest { CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(0L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList()); // Transactional write #1. CompletableFuture write1 = runtime.scheduleTransactionalWriteOperation( @@ -1635,7 +1625,7 @@ public class CoordinatorRuntimeTest { 100L, (short) 5, DEFAULT_WRITE_TIMEOUT, - state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1"), + state -> new CoordinatorResult<>(List.of("record1", "record2"), "response1"), TXN_OFFSET_COMMIT_LATEST_VERSION ); @@ -1647,14 +1637,14 @@ public class CoordinatorRuntimeTest { // The last committed offset does not change. 
assertEquals(0L, ctx.coordinator.lastCommittedOffset()); // A new snapshot is created. - assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList()); // Records have been replayed to the coordinator. They are stored in // the pending set for now. assertEquals(Set.of("record1", "record2"), ctx.coordinator.coordinator().pendingRecords( 100L )); // Records have been written to the log. - assertEquals(Collections.singletonList( + assertEquals(List.of( transactionalRecords(100L, (short) 5, timer.time().milliseconds(), "record1", "record2") ), writer.entries(TP)); @@ -1677,7 +1667,7 @@ public class CoordinatorRuntimeTest { // The last committed offset does not change. assertEquals(0L, ctx.coordinator.lastCommittedOffset()); // A new snapshot is created. - assertEquals(Arrays.asList(0L, 2L, 3L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L, 2L, 3L), ctx.coordinator.snapshotRegistry().epochsList()); // Records have been replayed to the coordinator. ControlRecordType expectedType; if (result == TransactionResult.COMMIT) { @@ -1691,7 +1681,7 @@ public class CoordinatorRuntimeTest { } // Records have been written to the log. - assertEquals(Arrays.asList( + assertEquals(List.of( transactionalRecords(100L, (short) 5, timer.time().milliseconds(), "record1", "record2"), endTransactionMarker(100L, (short) 5, timer.time().milliseconds(), 10, expectedType) ), writer.entries(TP)); @@ -1737,7 +1727,7 @@ public class CoordinatorRuntimeTest { CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(0, ctx.coordinator.lastWrittenOffset()); assertEquals(0, ctx.coordinator.lastCommittedOffset()); - assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList()); // Complete #1. We should get a TimeoutException because the HWM will not advance. CompletableFuture timedOutCompletion = runtime.scheduleTransactionCompletion( @@ -1753,7 +1743,7 @@ public class CoordinatorRuntimeTest { // Verify that the state has been updated. assertEquals(1L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Arrays.asList(0L, 1L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L, 1L), ctx.coordinator.snapshotRegistry().epochsList()); // Advance clock to timeout Complete #1. timer.advanceClock(4); @@ -1764,7 +1754,7 @@ public class CoordinatorRuntimeTest { // operation timeouts because the record has been written to the log. assertEquals(1L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Arrays.asList(0L, 1L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L, 1L), ctx.coordinator.snapshotRegistry().epochsList()); } @Test @@ -1794,7 +1784,7 @@ public class CoordinatorRuntimeTest { CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(0, ctx.coordinator.lastWrittenOffset()); assertEquals(0, ctx.coordinator.lastCommittedOffset()); - assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList()); // Write #1. It should succeed and be applied to the coordinator. 
runtime.scheduleTransactionalWriteOperation( @@ -1804,14 +1794,14 @@ public class CoordinatorRuntimeTest { 100L, (short) 5, DEFAULT_WRITE_TIMEOUT, - state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1"), + state -> new CoordinatorResult<>(List.of("record1", "record2"), "response1"), TXN_OFFSET_COMMIT_LATEST_VERSION ); // Verify that the state has been updated. assertEquals(2L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList()); assertEquals(Set.of("record1", "record2"), ctx.coordinator.coordinator().pendingRecords(100L)); assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records()); @@ -1830,7 +1820,7 @@ public class CoordinatorRuntimeTest { // Verify that the state has not changed. assertEquals(2L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList()); assertEquals(Set.of("record1", "record2"), ctx.coordinator.coordinator().pendingRecords(100L)); assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records()); } @@ -1860,7 +1850,7 @@ public class CoordinatorRuntimeTest { CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(0L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList()); // Override the coordinator with a coordinator that throws // an exception when replayEndTransactionMarker is called. @@ -1889,17 +1879,17 @@ public class CoordinatorRuntimeTest { 100L, (short) 5, DEFAULT_WRITE_TIMEOUT, - state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1"), + state -> new CoordinatorResult<>(List.of("record1", "record2"), "response1"), TXN_OFFSET_COMMIT_LATEST_VERSION ); // Verify that the state has been updated. assertEquals(2L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList()); assertEquals(Set.of("record1", "record2"), ctx.coordinator.coordinator().pendingRecords(100L)); assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records()); - assertEquals(Collections.singletonList( + assertEquals(List.of( transactionalRecords(100L, (short) 5, timer.time().milliseconds(), "record1", "record2") ), writer.entries(TP)); @@ -1918,10 +1908,10 @@ public class CoordinatorRuntimeTest { // Verify that the state has not changed. 
assertEquals(2L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList()); assertEquals(Set.of("record1", "record2"), ctx.coordinator.coordinator().pendingRecords(100L)); assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records()); - assertEquals(Collections.singletonList( + assertEquals(List.of( transactionalRecords(100L, (short) 5, timer.time().milliseconds(), "record1", "record2") ), writer.entries(TP)); } @@ -1955,11 +1945,11 @@ public class CoordinatorRuntimeTest { // Write #1. CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT, - state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1")); + state -> new CoordinatorResult<>(List.of("record1", "record2"), "response1")); // Write #2. CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT, - state -> new CoordinatorResult<>(Arrays.asList("record3", "record4"), "response2")); + state -> new CoordinatorResult<>(List.of("record3", "record4"), "response2")); // Commit write #1. writer.commit(TP, 2); @@ -2038,11 +2028,11 @@ public class CoordinatorRuntimeTest { // Write #1. runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT, - state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1")); + state -> new CoordinatorResult<>(List.of("record1", "record2"), "response1")); // Write #2. runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT, - state -> new CoordinatorResult<>(Arrays.asList("record3", "record4"), "response2")); + state -> new CoordinatorResult<>(List.of("record3", "record4"), "response2")); // Commit write #1. writer.commit(TP, 2); @@ -2085,11 +2075,11 @@ public class CoordinatorRuntimeTest { // Writes runtime.scheduleWriteOperation("write#0", coordinator0, DEFAULT_WRITE_TIMEOUT, - state -> new CoordinatorResult<>(Collections.singletonList("record0"), "response0")); + state -> new CoordinatorResult<>(List.of("record0"), "response0")); runtime.scheduleWriteOperation("write#1", coordinator1, DEFAULT_WRITE_TIMEOUT, - state -> new CoordinatorResult<>(Collections.singletonList("record1"), "response1")); + state -> new CoordinatorResult<>(List.of("record1"), "response1")); runtime.scheduleWriteOperation("write#2", coordinator2, DEFAULT_WRITE_TIMEOUT, - state -> new CoordinatorResult<>(Collections.singletonList("record2"), "response2")); + state -> new CoordinatorResult<>(List.of("record2"), "response2")); // Commit writes. writer.commit(coordinator0); @@ -2103,7 +2093,7 @@ public class CoordinatorRuntimeTest { ); assertEquals( - Arrays.asList("record0", "record1", "record2"), + List.of("record0", "record1", "record2"), FutureUtils.combineFutures(responses, ArrayList::new, List::addAll).get(5, TimeUnit.SECONDS) ); } @@ -2136,11 +2126,11 @@ public class CoordinatorRuntimeTest { // Write #1. CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT, - state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1")); + state -> new CoordinatorResult<>(List.of("record1", "record2"), "response1")); // Write #2. 
CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT, - state -> new CoordinatorResult<>(Arrays.asList("record3", "record4"), "response2")); + state -> new CoordinatorResult<>(List.of("record3", "record4"), "response2")); // Writes are inflight. assertFalse(write1.isDone()); @@ -2151,7 +2141,7 @@ public class CoordinatorRuntimeTest { // Timer #1. This is never executed. ctx.timer.schedule("timer-1", 10, TimeUnit.SECONDS, true, - () -> new CoordinatorResult<>(Arrays.asList("record5", "record6"), null)); + () -> new CoordinatorResult<>(List.of("record5", "record6"), null)); // The coordinator timer should have one pending task. assertEquals(1, ctx.timer.size()); @@ -2271,11 +2261,11 @@ public class CoordinatorRuntimeTest { // Timer #1. ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, - () -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), null)); + () -> new CoordinatorResult<>(List.of("record1", "record2"), null)); // Timer #2. ctx.timer.schedule("timer-2", 20, TimeUnit.MILLISECONDS, true, - () -> new CoordinatorResult<>(Arrays.asList("record3", "record4"), null)); + () -> new CoordinatorResult<>(List.of("record3", "record4"), null)); // The coordinator timer should have two pending tasks. assertEquals(2, ctx.timer.size()); @@ -2329,7 +2319,7 @@ public class CoordinatorRuntimeTest { // Timer #1. ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, - () -> new CoordinatorResult<>(Collections.singletonList("record1"), null)); + () -> new CoordinatorResult<>(List.of("record1"), null)); // The coordinator timer should have one pending task. assertEquals(1, ctx.timer.size()); @@ -2342,14 +2332,14 @@ public class CoordinatorRuntimeTest { // Schedule a second timer with the same key. ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, - () -> new CoordinatorResult<>(Collections.singletonList("record2"), null)); + () -> new CoordinatorResult<>(List.of("record2"), null)); // The coordinator timer should still have one pending task. assertEquals(1, ctx.timer.size()); // Schedule a third timer with the same key. ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, - () -> new CoordinatorResult<>(Collections.singletonList("record3"), null)); + () -> new CoordinatorResult<>(List.of("record3"), null)); // The coordinator timer should still have one pending task. assertEquals(1, ctx.timer.size()); @@ -2404,7 +2394,7 @@ public class CoordinatorRuntimeTest { // Timer #1. ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, - () -> new CoordinatorResult<>(Collections.singletonList("record1"), null)); + () -> new CoordinatorResult<>(List.of("record1"), null)); // The coordinator timer should have one pending task. assertEquals(1, ctx.timer.size()); @@ -2417,7 +2407,7 @@ public class CoordinatorRuntimeTest { // Schedule a second timer with the same key. ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, - () -> new CoordinatorResult<>(Collections.singletonList("record2"), null)); + () -> new CoordinatorResult<>(List.of("record2"), null)); // The coordinator timer should still have one pending task. 
assertEquals(1, ctx.timer.size()); @@ -2819,8 +2809,8 @@ public class CoordinatorRuntimeTest { 1500, 30, 3000), - Arrays.asList(5L, 15L, 27L), - Arrays.asList(5L, 15L))) + List.of(5L, 15L, 27L), + List.of(5L, 15L))) .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(writer) .withCoordinatorShardBuilderSupplier(supplier) @@ -2937,18 +2927,18 @@ public class CoordinatorRuntimeTest { // Write #1. CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT, - state -> new CoordinatorResult<>(Collections.singletonList("record1"), "response1") + state -> new CoordinatorResult<>(List.of("record1"), "response1") ); processor.poll(); // Write #2. CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT, - state -> new CoordinatorResult<>(Collections.singletonList("record2"), "response2") + state -> new CoordinatorResult<>(List.of("record2"), "response2") ); processor.poll(); // Records have been written to the log. - assertEquals(Arrays.asList( + assertEquals(List.of( records(timer.time().milliseconds(), "record1"), records(timer.time().milliseconds(), "record2") ), writer.entries(TP)); @@ -3007,18 +2997,18 @@ public class CoordinatorRuntimeTest { // Write#1. CompletableFuture write1 = runtime.scheduleWriteOperation("Write#1", TP, DEFAULT_WRITE_TIMEOUT, - state -> new CoordinatorResult<>(Collections.singletonList("record1"), "response1") + state -> new CoordinatorResult<>(List.of("record1"), "response1") ); processor.poll(); // Write#2. CompletableFuture write2 = runtime.scheduleWriteOperation("Write#2", TP, DEFAULT_WRITE_TIMEOUT, - state -> new CoordinatorResult<>(Collections.singletonList("record2"), "response2") + state -> new CoordinatorResult<>(List.of("record2"), "response2") ); processor.poll(); // Records have been written to the log. - assertEquals(Arrays.asList( + assertEquals(List.of( records(timer.time().milliseconds(), "record1"), records(timer.time().milliseconds(), "record2") ), writer.entries(TP)); @@ -3092,7 +3082,7 @@ public class CoordinatorRuntimeTest { processor.poll(); // Records have been written to the log. - assertEquals(Collections.singletonList( + assertEquals(List.of( endTransactionMarker(100, (short) 50, timer.time().milliseconds(), 1, ControlRecordType.COMMIT) ), writer.entries(TP)); @@ -3149,7 +3139,7 @@ public class CoordinatorRuntimeTest { CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(0L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList()); int maxBatchSize = writer.config(TP).maxMessageSize(); assertTrue(maxBatchSize > MIN_BUFFER_SIZE); @@ -3200,7 +3190,7 @@ public class CoordinatorRuntimeTest { CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(0L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList()); assertNull(ctx.currentBatch); // Get the max batch size. @@ -3229,8 +3219,8 @@ public class CoordinatorRuntimeTest { // Verify the state. Records are replayed but no batch written. 
assertEquals(0L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); - assertEquals(Arrays.asList( + assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of( new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)) ), ctx.coordinator.coordinator().fullRecords()); @@ -3247,8 +3237,8 @@ public class CoordinatorRuntimeTest { // Verify the state. Records are replayed but no batch written. assertEquals(0L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); - assertEquals(Arrays.asList( + assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of( new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)), new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)) @@ -3268,14 +3258,14 @@ public class CoordinatorRuntimeTest { // got flushed with all the records but the new one from #3. assertEquals(3L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Arrays.asList(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList()); - assertEquals(Arrays.asList( + assertEquals(List.of(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of( new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)), new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)), new MockCoordinatorShard.RecordAndMetadata(3, records.get(3)) ), ctx.coordinator.coordinator().fullRecords()); - assertEquals(Collections.singletonList( + assertEquals(List.of( records(timer.time().milliseconds(), records.subList(0, 3)) ), writer.entries(TP)); @@ -3285,14 +3275,14 @@ public class CoordinatorRuntimeTest { // Verify the state. The pending batch is flushed. assertEquals(4L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Arrays.asList(0L, 3L, 4L), ctx.coordinator.snapshotRegistry().epochsList()); - assertEquals(Arrays.asList( + assertEquals(List.of(0L, 3L, 4L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of( new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)), new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)), new MockCoordinatorShard.RecordAndMetadata(3, records.get(3)) ), ctx.coordinator.coordinator().fullRecords()); - assertEquals(Arrays.asList( + assertEquals(List.of( records(timer.time().milliseconds() - 11, records.subList(0, 3)), records(timer.time().milliseconds() - 11, records.subList(3, 4)) ), writer.entries(TP)); @@ -3334,7 +3324,7 @@ public class CoordinatorRuntimeTest { CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(0L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList()); assertNull(ctx.currentBatch); // Get the max batch size. 
@@ -3385,7 +3375,7 @@ public class CoordinatorRuntimeTest { CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(0L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList()); assertNull(ctx.currentBatch); // Get the max batch size. @@ -3415,8 +3405,8 @@ public class CoordinatorRuntimeTest { // Verify the state. assertEquals(0L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); - assertEquals(Arrays.asList( + assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of( new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)), new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)) @@ -3438,7 +3428,7 @@ public class CoordinatorRuntimeTest { // Verify the state. The state should be reverted to the initial state. assertEquals(0L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList()); assertEquals(Collections.emptyList(), ctx.coordinator.coordinator().fullRecords()); assertEquals(Collections.emptyList(), writer.entries(TP)); } @@ -3470,7 +3460,7 @@ public class CoordinatorRuntimeTest { CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(0L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList()); assertNull(ctx.currentBatch); // Override the coordinator with a coordinator that throws @@ -3518,8 +3508,8 @@ public class CoordinatorRuntimeTest { // Verify the state. assertEquals(0L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); - assertEquals(Collections.singletonList( + assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of( new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)) ), ctx.coordinator.coordinator().fullRecords()); assertEquals(Collections.emptyList(), writer.entries(TP)); @@ -3535,7 +3525,7 @@ public class CoordinatorRuntimeTest { // Verify the state. 
assertEquals(0L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList()); assertEquals(Collections.emptyList(), ctx.coordinator.coordinator().fullRecords()); assertEquals(Collections.emptyList(), writer.entries(TP)); } @@ -3567,12 +3557,12 @@ public class CoordinatorRuntimeTest { CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(0L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList()); assertNull(ctx.currentBatch); // Write #1 with one record. CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20), - state -> new CoordinatorResult<>(Collections.singletonList("record#1"), "response1") + state -> new CoordinatorResult<>(List.of("record#1"), "response1") ); // Verify that the write is not committed yet. @@ -3581,7 +3571,7 @@ public class CoordinatorRuntimeTest { // Verify the state. Records are replayed but no batch written. assertEquals(0L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList()); assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().pendingRecords(100L)); assertEquals(Set.of("record#1"), ctx.coordinator.coordinator().records()); assertEquals(Collections.emptyList(), writer.entries(TP)); @@ -3594,7 +3584,7 @@ public class CoordinatorRuntimeTest { 100L, (short) 50, Duration.ofMillis(20), - state -> new CoordinatorResult<>(Collections.singletonList("record#2"), "response2"), + state -> new CoordinatorResult<>(List.of("record#2"), "response2"), TXN_OFFSET_COMMIT_LATEST_VERSION ); @@ -3605,17 +3595,17 @@ public class CoordinatorRuntimeTest { // written to the log. assertEquals(2L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Arrays.asList(0L, 1L, 2L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L, 1L, 2L), ctx.coordinator.snapshotRegistry().epochsList()); assertEquals(Set.of("record#2"), ctx.coordinator.coordinator().pendingRecords(100L)); assertEquals(Set.of("record#1"), ctx.coordinator.coordinator().records()); - assertEquals(Arrays.asList( + assertEquals(List.of( records(timer.time().milliseconds(), "record#1"), transactionalRecords(100L, (short) 50, timer.time().milliseconds(), "record#2") ), writer.entries(TP)); // Write #3 with one record. CompletableFuture write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20), - state -> new CoordinatorResult<>(Collections.singletonList("record#3"), "response3") + state -> new CoordinatorResult<>(List.of("record#3"), "response3") ); // Verify that the write is not committed yet. @@ -3624,10 +3614,10 @@ public class CoordinatorRuntimeTest { // Verify the state. 
assertEquals(2L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Arrays.asList(0L, 1L, 2L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L, 1L, 2L), ctx.coordinator.snapshotRegistry().epochsList()); assertEquals(Set.of("record#2"), ctx.coordinator.coordinator().pendingRecords(100L)); assertEquals(Set.of("record#1", "record#3"), ctx.coordinator.coordinator().records()); - assertEquals(Arrays.asList( + assertEquals(List.of( records(timer.time().milliseconds(), "record#1"), transactionalRecords(100L, (short) 50, timer.time().milliseconds(), "record#2") ), writer.entries(TP)); @@ -3649,10 +3639,10 @@ public class CoordinatorRuntimeTest { // Verify the state. assertEquals(4L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Arrays.asList(0L, 1L, 2L, 3L, 4L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L, 1L, 2L, 3L, 4L), ctx.coordinator.snapshotRegistry().epochsList()); assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().pendingRecords(100L)); assertEquals(Set.of("record#1", "record#2", "record#3"), ctx.coordinator.coordinator().records()); - assertEquals(Arrays.asList( + assertEquals(List.of( records(timer.time().milliseconds(), "record#1"), transactionalRecords(100L, (short) 50, timer.time().milliseconds(), "record#2"), records(timer.time().milliseconds(), "record#3"), @@ -3710,7 +3700,7 @@ public class CoordinatorRuntimeTest { assertEquals(ACTIVE, ctx.state); assertEquals(0L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList()); assertNull(ctx.currentBatch); // Keep a reference to the current coordinator. @@ -3823,7 +3813,7 @@ public class CoordinatorRuntimeTest { CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(2L, ctx.coordinator.lastWrittenOffset()); assertEquals(1L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Collections.singletonList(2L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(2L), ctx.coordinator.snapshotRegistry().epochsList()); // Schedule a write operation that does not generate any records. CompletableFuture write = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20), @@ -3838,7 +3828,7 @@ public class CoordinatorRuntimeTest { // Verify the state. assertEquals(2L, ctx.coordinator.lastWrittenOffset()); assertEquals(2L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Collections.singletonList(2L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(2L), ctx.coordinator.snapshotRegistry().epochsList()); // The write operation should be completed. assertEquals("response1", write.get(5, TimeUnit.SECONDS)); @@ -3871,7 +3861,7 @@ public class CoordinatorRuntimeTest { CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(0L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList()); assertNull(ctx.currentBatch); // Get the max batch size. @@ -3908,14 +3898,14 @@ public class CoordinatorRuntimeTest { // the first three records. 
The 4th one is pending. assertEquals(3L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Arrays.asList(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList()); - assertEquals(Arrays.asList( + assertEquals(List.of(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of( new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)), new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)), new MockCoordinatorShard.RecordAndMetadata(3, records.get(3)) ), ctx.coordinator.coordinator().fullRecords()); - assertEquals(Collections.singletonList( + assertEquals(List.of( records(timer.time().milliseconds(), records.subList(0, 3)) ), writer.entries(TP)); @@ -3932,14 +3922,14 @@ public class CoordinatorRuntimeTest { assertNull(ctx.currentBatch); assertEquals(4L, ctx.coordinator.lastWrittenOffset()); assertEquals(3L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Arrays.asList(3L, 4L), ctx.coordinator.snapshotRegistry().epochsList()); - assertEquals(Arrays.asList( + assertEquals(List.of(3L, 4L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of( new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)), new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)), new MockCoordinatorShard.RecordAndMetadata(3, records.get(3)) ), ctx.coordinator.coordinator().fullRecords()); - assertEquals(Arrays.asList( + assertEquals(List.of( records(timer.time().milliseconds() - 11, records.subList(0, 3)), records(timer.time().milliseconds() - 11, records.subList(3, 4)) ), writer.entries(TP)); @@ -3979,7 +3969,7 @@ public class CoordinatorRuntimeTest { CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(0L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList()); assertNull(ctx.currentBatch); // Get the max batch size. @@ -4007,8 +3997,8 @@ public class CoordinatorRuntimeTest { // Verify the state. assertEquals(0L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); - assertEquals(Arrays.asList( + assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of( new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)), new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)) @@ -4020,7 +4010,7 @@ public class CoordinatorRuntimeTest { // Note that the batch will fail only when the batch is written because the // MemoryBatchBuilder always accept one record. CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20), - state -> new CoordinatorResult<>(Collections.singletonList(record), "write#2", null, true, false) + state -> new CoordinatorResult<>(List.of(record), "write#2", null, true, false) ); // Advance past the linger time to flush the pending batch. @@ -4035,13 +4025,13 @@ public class CoordinatorRuntimeTest { // Verify the state. 
assertEquals(3L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Arrays.asList(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList()); - assertEquals(Arrays.asList( + assertEquals(List.of(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of( new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)), new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)) ), ctx.coordinator.coordinator().fullRecords()); - assertEquals(Collections.singletonList( + assertEquals(List.of( records(timer.time().milliseconds() - 11, records.subList(0, 3)) ), writer.entries(TP)); } @@ -4074,7 +4064,7 @@ public class CoordinatorRuntimeTest { CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(0L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList()); assertNull(ctx.currentBatch); // Get the max batch size. @@ -4104,8 +4094,8 @@ public class CoordinatorRuntimeTest { // Verify the state. assertEquals(0L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); - assertEquals(Arrays.asList( + assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of( new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)), new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)) @@ -4127,7 +4117,7 @@ public class CoordinatorRuntimeTest { // Verify the state. The state should be reverted to the initial state. assertEquals(0L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList()); assertEquals(Collections.emptyList(), ctx.coordinator.coordinator().fullRecords()); assertEquals(Collections.emptyList(), writer.entries(TP)); } @@ -4203,13 +4193,13 @@ public class CoordinatorRuntimeTest { // got flushed with all the records but the new one from #3. assertEquals(3L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Arrays.asList( + assertEquals(List.of( new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)), new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)), new MockCoordinatorShard.RecordAndMetadata(3, records.get(3)) ), ctx.coordinator.coordinator().fullRecords()); - assertEquals(Collections.singletonList( + assertEquals(List.of( records(firstBatchTimestamp, records.subList(0, 3)) ), writer.entries(TP)); verify(runtimeMetrics, times(1)).recordFlushTime(10); @@ -4220,13 +4210,13 @@ public class CoordinatorRuntimeTest { // Verify the state. The pending batch is flushed. 
assertEquals(4L, ctx.coordinator.lastWrittenOffset()); assertEquals(0L, ctx.coordinator.lastCommittedOffset()); - assertEquals(Arrays.asList( + assertEquals(List.of( new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)), new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)), new MockCoordinatorShard.RecordAndMetadata(3, records.get(3)) ), ctx.coordinator.coordinator().fullRecords()); - assertEquals(Arrays.asList( + assertEquals(List.of( records(secondBatchTimestamp, records.subList(0, 3)), records(secondBatchTimestamp, records.subList(3, 4)) ), writer.entries(TP)); @@ -4273,11 +4263,11 @@ public class CoordinatorRuntimeTest { // write#1 will be committed and update the high watermark. Record time spent in purgatory. CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, writeTimeout, - state -> new CoordinatorResult<>(Collections.singletonList("record1"), "response1") + state -> new CoordinatorResult<>(List.of("record1"), "response1") ); // write#2 will time out sitting in the purgatory. Record time spent in purgatory. CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, writeTimeout, - state -> new CoordinatorResult<>(Collections.singletonList("record2"), "response2") + state -> new CoordinatorResult<>(List.of("record2"), "response2") ); // write#3 will error while appending. Does not spend time in purgatory. CompletableFuture write3 = runtime.scheduleWriteOperation("write#3", TP, writeTimeout, @@ -4295,7 +4285,7 @@ public class CoordinatorRuntimeTest { // Records have been written to the log. long writeTimestamp = timer.time().milliseconds(); - assertEquals(Arrays.asList( + assertEquals(List.of( records(writeTimestamp, "record1"), records(writeTimestamp, "record2") ), writer.entries(TP));
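
Note on the pattern applied throughout this patch: the test code replaces `Arrays.asList`/`Collections.singletonList` with the Java 9+ `List.of` factory, replaces the `Collections.unmodifiableSet(... .collect(Collectors.toSet()))` idiom with the Java 10+ `Collectors.toUnmodifiableSet()`, and drops the unused `throws Exception` clause from `close()` overrides. The following self-contained sketch is not part of the patch and uses illustrative class and variable names only; it simply demonstrates the behavioral differences these replacements rely on.

import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CollectionFactoryDemo implements AutoCloseable {

    public static void main(String[] args) {
        // List.of returns a truly immutable list; Arrays.asList returns a
        // fixed-size but element-mutable view, so assertions against List.of
        // compare with a genuinely unmodifiable expected value.
        List<String> immutable = List.of("record1", "record2");
        List<String> fixedSize = Arrays.asList("record1", "record2");
        fixedSize.set(0, "changed");        // allowed on Arrays.asList
        try {
            immutable.set(0, "changed");    // throws UnsupportedOperationException
        } catch (UnsupportedOperationException expected) {
            System.out.println("List.of is immutable");
        }

        // List.of also rejects null elements, unlike Arrays.asList.
        try {
            List.of("record1", null);
        } catch (NullPointerException expected) {
            System.out.println("List.of rejects nulls");
        }

        // Collectors.toUnmodifiableSet() collapses the older two-step
        // Collections.unmodifiableSet(stream.collect(Collectors.toSet())) idiom
        // into a single terminal operation.
        Set<String> records = Stream.of("record1", "record2")
            .collect(Collectors.toUnmodifiableSet());
        System.out.println(records.size());
    }

    // An override may narrow the throws clause of AutoCloseable.close(), so an
    // implementation that never throws can omit "throws Exception" entirely and
    // spare callers from handling a checked exception.
    @Override
    public void close() {
    }
}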