mirror of https://github.com/apache/kafka.git
MINOR: Removing incorrect multi threaded state transition tests (#20436)
These tests were written while finalizing approach for making inflight state class thread safe but later approach changed and the lock is now always required by SharePartition to change inflight state. Hence these tests are incorrect and do not add any value. Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
parent
e6f3efc914
commit
7eeb5c8344
|
@ -56,7 +56,6 @@ import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
|
|||
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
|
||||
import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
|
||||
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
|
||||
import org.apache.kafka.server.share.fetch.DeliveryCountOps;
|
||||
import org.apache.kafka.server.share.fetch.InFlightState;
|
||||
import org.apache.kafka.server.share.fetch.RecordState;
|
||||
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
|
||||
|
@ -91,7 +90,6 @@ import java.util.Map;
|
|||
import java.util.Optional;
|
||||
import java.util.OptionalInt;
|
||||
import java.util.OptionalLong;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
@ -8535,68 +8533,6 @@ public class SharePartitionTest {
|
|||
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(7L).batchState());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void inFlightStateRollbackAndArchiveStateTransition() throws InterruptedException {
|
||||
InFlightState inFlightState = new InFlightState(RecordState.ACQUIRED, 1, MEMBER_ID);
|
||||
|
||||
inFlightState.startStateTransition(RecordState.ACKNOWLEDGED, DeliveryCountOps.INCREASE, MAX_DELIVERY_COUNT, MEMBER_ID);
|
||||
assertTrue(inFlightState.hasOngoingStateTransition());
|
||||
|
||||
// We have an ongoing state transition from ACQUIRED to ACKNOWLEDGED which is not committed yet. At the same
|
||||
// time when we have a call to completeStateTransition with false commit value, we get a call to ARCHIVE the record.
|
||||
// No matter the order of the 2 calls, we should always be getting the final state as ARCHIVED.
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(2);
|
||||
try {
|
||||
List<Callable<Void>> callables = List.of(
|
||||
() -> {
|
||||
inFlightState.archive();
|
||||
return null;
|
||||
},
|
||||
() -> {
|
||||
inFlightState.completeStateTransition(false);
|
||||
return null;
|
||||
}
|
||||
);
|
||||
executorService.invokeAll(callables);
|
||||
} finally {
|
||||
if (!executorService.awaitTermination(30, TimeUnit.MILLISECONDS))
|
||||
executorService.shutdown();
|
||||
}
|
||||
assertEquals(RecordState.ARCHIVED, inFlightState.state());
|
||||
assertEquals(EMPTY_MEMBER_ID, inFlightState.memberId());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void inFlightStateCommitSuccessAndArchiveStateTransition() throws InterruptedException {
|
||||
InFlightState inFlightState = new InFlightState(RecordState.ACQUIRED, 1, MEMBER_ID);
|
||||
|
||||
inFlightState.startStateTransition(RecordState.ACKNOWLEDGED, DeliveryCountOps.INCREASE, MAX_DELIVERY_COUNT, MEMBER_ID);
|
||||
assertTrue(inFlightState.hasOngoingStateTransition());
|
||||
|
||||
// We have an ongoing state transition from ACQUIRED to ACKNOWLEDGED which is not committed yet. At the same
|
||||
// time when we have a call to completeStateTransition with true commit value, we get a call to ARCHIVE the record.
|
||||
// No matter the order of the 2 calls, we should always be getting the final state as ARCHIVED.
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(2);
|
||||
try {
|
||||
List<Callable<Void>> callables = List.of(
|
||||
() -> {
|
||||
inFlightState.archive();
|
||||
return null;
|
||||
},
|
||||
() -> {
|
||||
inFlightState.completeStateTransition(true);
|
||||
return null;
|
||||
}
|
||||
);
|
||||
executorService.invokeAll(callables);
|
||||
} finally {
|
||||
if (!executorService.awaitTermination(30, TimeUnit.MILLISECONDS))
|
||||
executorService.shutdown();
|
||||
}
|
||||
assertEquals(RecordState.ARCHIVED, inFlightState.state());
|
||||
assertEquals(EMPTY_MEMBER_ID, inFlightState.memberId());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAcquisitionLockTimeoutWithWriteStateRPCFailure() throws InterruptedException {
|
||||
Persister persister = Mockito.mock(Persister.class);
|
||||
|
|
Loading…
Reference in New Issue