diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java index 418d58376cc..b82829e1d62 100644 --- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java @@ -2815,9 +2815,8 @@ public class CoordinatorRuntimeTest { assertTrue(write1.isDone()); assertTrue(write2.isDone()); - // All timer tasks have been cancelled. TimerTask entries are not removed in MockTimer. - assertEquals(2, timer.size()); - timer.taskQueue().forEach(taskEntry -> assertTrue(taskEntry.cancelled())); + // All timer tasks have been cancelled. Hence,they have been removed in MockTimer. + assertEquals(0, timer.size()); } @Test @@ -2885,9 +2884,8 @@ public class CoordinatorRuntimeTest { assertEquals(1, runtime.contextOrThrow(TP).coordinator.lastCommittedOffset()); assertTrue(write1.isDone()); - // All timer tasks have been cancelled. TimerTask entries are not removed in MockTimer. - assertEquals(1, timer.size()); - timer.taskQueue().forEach(taskEntry -> assertTrue(taskEntry.cancelled())); + // All timer tasks have been cancelled. Hence, they have been removed in MockTimer. + assertEquals(0, timer.size()); } @Test diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index 465bce6de6a..da35cb0f428 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -67,8 +67,7 @@ import org.apache.kafka.server.share.persister.WriteShareGroupStateResult; import org.apache.kafka.server.storage.log.FetchIsolation; import org.apache.kafka.server.storage.log.FetchPartitionData; import org.apache.kafka.server.util.FutureUtils; -import org.apache.kafka.server.util.timer.SystemTimer; -import org.apache.kafka.server.util.timer.SystemTimerReaper; +import org.apache.kafka.server.util.timer.MockTimer; import org.apache.kafka.server.util.timer.Timer; import org.apache.kafka.storage.internals.log.OffsetResultHolder; import org.apache.kafka.test.TestUtils; @@ -117,7 +116,7 @@ public class SharePartitionTest { private static final Time MOCK_TIME = new MockTime(); private static final short MAX_IN_FLIGHT_MESSAGES = 200; private static final int ACQUISITION_LOCK_TIMEOUT_MS = 100; - private static final int DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS = 300; + private static final int DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS = 120; private static final int BATCH_SIZE = 500; private static final int DEFAULT_FETCH_OFFSET = 0; private static final int MAX_FETCH_RECORDS = Integer.MAX_VALUE; @@ -129,8 +128,7 @@ public class SharePartitionTest { @BeforeEach public void setUp() { kafka.utils.TestUtils.clearYammerMetrics(); - mockTimer = new SystemTimerReaper("share-group-lock-timeout-test-reaper", - new SystemTimer("share-group-lock-test-timeout")); + mockTimer = new MockTimer(); sharePartitionMetrics = new SharePartitionMetrics(GROUP_ID, TOPIC_ID_PARTITION.topic(), TOPIC_ID_PARTITION.partition()); } @@ -2925,6 +2923,7 @@ public class SharePartitionTest { assertEquals(1, sharePartition.timer().size()); // Allowing acquisition lock to expire. + mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); TestUtils.waitForCondition( () -> sharePartition.nextFetchOffset() == 0 && sharePartition.cachedState().get(0L).batchState() == RecordState.AVAILABLE && @@ -2951,6 +2950,7 @@ public class SharePartitionTest { assertNotNull(sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask()); // Allowing acquisition lock to expire. + mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); TestUtils.waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 10 @@ -2985,6 +2985,7 @@ public class SharePartitionTest { assertEquals(2, sharePartition.timer().size()); // Allowing acquisition lock to expire. The acquisition lock timeout will cause release of records for all the acquired records. + mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); TestUtils.waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 0 && @@ -3012,6 +3013,7 @@ public class SharePartitionTest { assertEquals(1, sharePartition.timer().size()); // Allowing acquisition lock to expire. + mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); TestUtils.waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 10 && @@ -3128,6 +3130,7 @@ public class SharePartitionTest { // Allowing acquisition lock to expire. The acquisition lock timeout will cause release of records for batch with starting offset 1. // Since, other records have been acknowledged. + mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); TestUtils.waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 1 && @@ -3155,6 +3158,7 @@ public class SharePartitionTest { assertEquals(1, sharePartition.timer().size()); // Allowing acquisition lock to expire. + mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); TestUtils.waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 10 && @@ -3179,6 +3183,7 @@ public class SharePartitionTest { assertEquals(3, sharePartition.timer().size()); // Allowing acquisition lock to expire for the acquired subset batch. + mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); TestUtils.waitForCondition( () -> { Map expectedOffsetStateMap = new HashMap<>(); @@ -3259,6 +3264,7 @@ public class SharePartitionTest { assertEquals(3, sharePartition.timer().size()); // Allowing acquisition lock to expire for the offsets that have not been acknowledged yet. + mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); TestUtils.waitForCondition( () -> { Map expectedOffsetStateMap1 = new HashMap<>(); @@ -3321,6 +3327,7 @@ public class SharePartitionTest { assertEquals(2, sharePartition.timer().size()); // Allowing acquisition lock to expire. + mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); TestUtils.waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 0 && @@ -3338,6 +3345,7 @@ public class SharePartitionTest { assertEquals(1, sharePartition.timer().size()); // Allowing acquisition lock to expire to archive the records that reach max delivery count. + mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); TestUtils.waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 0 && @@ -3363,6 +3371,7 @@ public class SharePartitionTest { assertEquals(1, sharePartition.timer().size()); // Allowing acquisition lock to expire. + mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); TestUtils.waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 0 && @@ -3386,6 +3395,7 @@ public class SharePartitionTest { assertNull(sharePartition.cachedState().get(0L).offsetState().get(9L).acquisitionLockTimeoutTask()); // Allowing acquisition lock to expire to archive the records that reach max delivery count. + mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); TestUtils.waitForCondition( () -> { Map expectedOffsetStateMap = new HashMap<>(); @@ -3436,6 +3446,7 @@ public class SharePartitionTest { assertEquals(1, sharePartition.timer().size()); // Allowing acquisition lock to expire. + mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); TestUtils.waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 0 && @@ -3450,6 +3461,7 @@ public class SharePartitionTest { assertEquals(1, sharePartition.timer().size()); // Allowing acquisition lock to expire to archive the records that reach max delivery count. + mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); TestUtils.waitForCondition( () -> sharePartition.timer().size() == 0 && // After the second failed attempt to acknowledge the record batch successfully, the record batch is archived. @@ -3473,6 +3485,7 @@ public class SharePartitionTest { assertEquals(1, sharePartition.timer().size()); // Allowing acquisition lock to expire. + mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); TestUtils.waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 5 && @@ -3531,6 +3544,7 @@ public class SharePartitionTest { assertEquals(1, sharePartition.timer().size()); // Allowing acquisition lock to expire will only affect the offsets that have not been acknowledged yet. + mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); TestUtils.waitForCondition( () -> { // Check cached state. @@ -3576,6 +3590,7 @@ public class SharePartitionTest { assertNotNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask()); // Allowing acquisition lock to expire. Even if write share group state RPC fails, state transition still happens. + mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); TestUtils.waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 5 && @@ -3616,6 +3631,7 @@ public class SharePartitionTest { Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); // Allowing acquisition lock to expire. Even if write share group state RPC fails, state transition still happens. + mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); TestUtils.waitForCondition( () -> { Map expectedOffsetStateMap = new HashMap<>(); @@ -4184,7 +4200,6 @@ public class SharePartitionTest { @Test public void testAcquisitionLockOnReleasingAcknowledgedMultipleSubsetRecordBatchWithGapOffsets() { SharePartition sharePartition = SharePartitionBuilder.builder() - .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) .withState(SharePartitionState.ACTIVE) .build(); MemoryRecords records1 = memoryRecords(2, 5); @@ -4977,6 +4992,7 @@ public class SharePartitionTest { assertEquals(7, sharePartition.cachedState().size()); // Allowing acquisition lock to expire. + mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); TestUtils.waitForCondition( () -> { Map expectedOffsetStateMap1 = new HashMap<>(); @@ -5035,6 +5051,7 @@ public class SharePartitionTest { assertEquals(2, sharePartition.cachedState().size()); // Allowing acquisition lock to expire. + mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); TestUtils.waitForCondition( () -> sharePartition.cachedState().get(5L).batchMemberId().equals(EMPTY_MEMBER_ID) && sharePartition.cachedState().get(5L).batchState() == RecordState.ARCHIVED && @@ -5063,6 +5080,7 @@ public class SharePartitionTest { assertEquals(2, sharePartition.cachedState().size()); // Allowing acquisition lock to expire. + mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); TestUtils.waitForCondition( () -> { Map expectedOffsetStateMap = new HashMap<>(); @@ -5088,7 +5106,6 @@ public class SharePartitionTest { Mockito.when(groupConfig.shareRecordLockDurationMs()).thenReturn(expectedDurationMs); SharePartition sharePartition = SharePartitionBuilder.builder() - .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) .withGroupConfigManager(groupConfigManager).build(); SharePartition.AcquisitionLockTimerTask timerTask = sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L); @@ -5111,7 +5128,6 @@ public class SharePartitionTest { .thenReturn(expectedDurationMs2); SharePartition sharePartition = SharePartitionBuilder.builder() - .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) .withGroupConfigManager(groupConfigManager).build(); SharePartition.AcquisitionLockTimerTask timerTask1 = sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L); @@ -5257,6 +5273,7 @@ public class SharePartitionTest { assertNotNull(sharePartition.cachedState().get(2L).batchAcquisitionLockTimeoutTask()); // Allowing acquisition lock to expire. + mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); TestUtils.waitForCondition( () -> sharePartition.nextFetchOffset() == 7 && sharePartition.cachedState().isEmpty() && sharePartition.startOffset() == 7 && sharePartition.endOffset() == 7, @@ -5306,6 +5323,7 @@ public class SharePartitionTest { assertNotNull(sharePartition.cachedState().get(1L).batchAcquisitionLockTimeoutTask()); // Allowing acquisition lock to expire. + mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); TestUtils.waitForCondition( () -> sharePartition.nextFetchOffset() == 3 && sharePartition.cachedState().isEmpty() && sharePartition.startOffset() == 3 && sharePartition.endOffset() == 3, diff --git a/server-common/src/test/java/org/apache/kafka/server/util/timer/MockTimer.java b/server-common/src/test/java/org/apache/kafka/server/util/timer/MockTimer.java index 8de1b91c32d..2bdbc9cb080 100644 --- a/server-common/src/test/java/org/apache/kafka/server/util/timer/MockTimer.java +++ b/server-common/src/test/java/org/apache/kafka/server/util/timer/MockTimer.java @@ -81,6 +81,9 @@ public class MockTimer implements Timer { } public int size() { + synchronized (taskQueue) { + taskQueue.removeIf(TimerTaskEntry::cancelled); + } return taskQueue.size(); }