KAFKA-8187: Add wait time for other thread in the same jvm to free the locks (#6818)

Fix KAFKA-8187: State store record loss across multiple reassignments when using standby tasks.
Do not let the thread to transit to RUNNING until all tasks (including standby tasks) are ready.

Reviewers: Guozhang Wang <wangguoz@gmail.com>,  Bill Bejeck <bbejeck@gmail.com>
This commit is contained in:
Lifei Chen 2019-05-30 23:33:37 +08:00 committed by Bill Bejeck
parent 0779740f9b
commit ed659fe73d
2 changed files with 64 additions and 46 deletions

View File

@ -334,7 +334,7 @@ public class TaskManager {
log.trace("Resuming partitions {}", assignment);
consumer.resume(assignment);
assignStandbyPartitions();
return true;
return standby.allTasksRunning();
}
return false;
}

View File

@ -263,7 +263,7 @@ public class TaskManagerTest {
@Test
public void shouldAddNonResumedActiveTasks() {
mockSingleActiveTask();
EasyMock.expect(active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false);
expect(active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false);
active.addNewTask(EasyMock.same(streamTask));
replay();
@ -276,7 +276,7 @@ public class TaskManagerTest {
@Test
public void shouldNotAddResumedActiveTasks() {
checkOrder(active, true);
EasyMock.expect(active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(true);
expect(active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(true);
replay();
taskManager.setAssignmentMetadata(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap());
@ -289,7 +289,7 @@ public class TaskManagerTest {
@Test
public void shouldAddNonResumedStandbyTasks() {
mockStandbyTaskExpectations();
EasyMock.expect(standby.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false);
expect(standby.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false);
standby.addNewTask(EasyMock.same(standbyTask));
replay();
@ -302,7 +302,7 @@ public class TaskManagerTest {
@Test
public void shouldNotAddResumedStandbyTasks() {
checkOrder(active, true);
EasyMock.expect(standby.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(true);
expect(standby.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(true);
replay();
taskManager.setAssignmentMetadata(Collections.<TaskId, Set<TopicPartition>>emptyMap(), taskId0Assignment);
@ -316,7 +316,7 @@ public class TaskManagerTest {
public void shouldPauseActivePartitions() {
mockSingleActiveTask();
consumer.pause(taskId0Partitions);
EasyMock.expectLastCall();
expectLastCall();
replay();
taskManager.setAssignmentMetadata(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap());
@ -326,7 +326,7 @@ public class TaskManagerTest {
@Test
public void shouldSuspendActiveTasks() {
EasyMock.expect(active.suspend()).andReturn(null);
expect(active.suspend()).andReturn(null);
replay();
taskManager.suspendTasksAndState();
@ -335,7 +335,7 @@ public class TaskManagerTest {
@Test
public void shouldSuspendStandbyTasks() {
EasyMock.expect(standby.suspend()).andReturn(null);
expect(standby.suspend()).andReturn(null);
replay();
taskManager.suspendTasksAndState();
@ -345,7 +345,7 @@ public class TaskManagerTest {
@Test
public void shouldUnassignChangelogPartitionsOnSuspend() {
restoreConsumer.unsubscribe();
EasyMock.expectLastCall();
expectLastCall();
replay();
taskManager.suspendTasksAndState();
@ -354,9 +354,9 @@ public class TaskManagerTest {
@Test
public void shouldThrowStreamsExceptionAtEndIfExceptionDuringSuspend() {
EasyMock.expect(active.suspend()).andReturn(new RuntimeException(""));
EasyMock.expect(standby.suspend()).andReturn(new RuntimeException(""));
EasyMock.expectLastCall();
expect(active.suspend()).andReturn(new RuntimeException(""));
expect(standby.suspend()).andReturn(new RuntimeException(""));
expectLastCall();
restoreConsumer.unsubscribe();
replay();
@ -372,7 +372,7 @@ public class TaskManagerTest {
@Test
public void shouldCloseActiveTasksOnShutdown() {
active.close(true);
EasyMock.expectLastCall();
expectLastCall();
replay();
taskManager.shutdown(true);
@ -382,7 +382,7 @@ public class TaskManagerTest {
@Test
public void shouldCloseStandbyTasksOnShutdown() {
standby.close(false);
EasyMock.expectLastCall();
expectLastCall();
replay();
taskManager.shutdown(false);
@ -392,7 +392,7 @@ public class TaskManagerTest {
@Test
public void shouldUnassignChangelogPartitionsOnShutdown() {
restoreConsumer.unsubscribe();
EasyMock.expectLastCall();
expectLastCall();
replay();
taskManager.shutdown(true);
@ -402,7 +402,7 @@ public class TaskManagerTest {
@Test
public void shouldInitializeNewActiveTasks() {
active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject());
EasyMock.expectLastCall();
expectLastCall();
replay();
taskManager.updateNewAndRestoringTasks();
@ -412,7 +412,7 @@ public class TaskManagerTest {
@Test
public void shouldInitializeNewStandbyTasks() {
active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject());
EasyMock.expectLastCall();
expectLastCall();
replay();
taskManager.updateNewAndRestoringTasks();
@ -421,9 +421,9 @@ public class TaskManagerTest {
@Test
public void shouldRestoreStateFromChangeLogReader() {
EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
active.updateRestored(taskId0Partitions);
EasyMock.expectLastCall();
expectLastCall();
replay();
taskManager.updateNewAndRestoringTasks();
@ -432,13 +432,13 @@ public class TaskManagerTest {
@Test
public void shouldResumeRestoredPartitions() {
EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
EasyMock.expect(active.allTasksRunning()).andReturn(true);
EasyMock.expect(consumer.assignment()).andReturn(taskId0Partitions);
EasyMock.expect(standby.running()).andReturn(Collections.<StandbyTask>emptySet());
expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
expect(active.allTasksRunning()).andReturn(true);
expect(consumer.assignment()).andReturn(taskId0Partitions);
expect(standby.running()).andReturn(Collections.<StandbyTask>emptySet());
consumer.resume(taskId0Partitions);
EasyMock.expectLastCall();
expectLastCall();
replay();
taskManager.updateNewAndRestoringTasks();
@ -450,13 +450,31 @@ public class TaskManagerTest {
mockAssignStandbyPartitions(1L);
replay();
assertTrue(taskManager.updateNewAndRestoringTasks());
taskManager.updateNewAndRestoringTasks();
verify(restoreConsumer);
}
@Test
public void shouldReturnTrueWhenActiveAndStandbyTasksAreRunning() {
mockAssignStandbyPartitions(1L);
expect(standby.allTasksRunning()).andReturn(true);
replay();
assertTrue(taskManager.updateNewAndRestoringTasks());
}
@Test
public void shouldReturnFalseWhenOnlyActiveTasksAreRunning() {
mockAssignStandbyPartitions(1L);
expect(standby.allTasksRunning()).andReturn(false);
replay();
assertFalse(taskManager.updateNewAndRestoringTasks());
}
@Test
public void shouldReturnFalseWhenThereAreStillNonRunningTasks() {
EasyMock.expect(active.allTasksRunning()).andReturn(false);
expect(active.allTasksRunning()).andReturn(false);
replay();
assertFalse(taskManager.updateNewAndRestoringTasks());
@ -466,7 +484,7 @@ public class TaskManagerTest {
public void shouldSeekToCheckpointedOffsetOnStandbyPartitionsWhenOffsetGreaterThanEqualTo0() {
mockAssignStandbyPartitions(1L);
restoreConsumer.seek(t1p0, 1L);
EasyMock.expectLastCall();
expectLastCall();
replay();
taskManager.updateNewAndRestoringTasks();
@ -477,7 +495,7 @@ public class TaskManagerTest {
public void shouldSeekToBeginningIfOffsetIsLessThan0() {
mockAssignStandbyPartitions(-1L);
restoreConsumer.seekToBeginning(taskId0Partitions);
EasyMock.expectLastCall();
expectLastCall();
replay();
taskManager.updateNewAndRestoringTasks();
@ -486,8 +504,8 @@ public class TaskManagerTest {
@Test
public void shouldCommitActiveAndStandbyTasks() {
EasyMock.expect(active.commit()).andReturn(1);
EasyMock.expect(standby.commit()).andReturn(2);
expect(active.commit()).andReturn(1);
expect(standby.commit()).andReturn(2);
replay();
@ -500,7 +518,7 @@ public class TaskManagerTest {
// upgrade to strict mock to ensure no calls
checkOrder(standby, true);
active.commit();
EasyMock.expectLastCall().andThrow(new RuntimeException(""));
expectLastCall().andThrow(new RuntimeException(""));
replay();
try {
@ -514,7 +532,7 @@ public class TaskManagerTest {
@Test
public void shouldPropagateExceptionFromStandbyCommit() {
EasyMock.expect(standby.commit()).andThrow(new RuntimeException(""));
expect(standby.commit()).andThrow(new RuntimeException(""));
replay();
try {
@ -534,8 +552,8 @@ public class TaskManagerTest {
futureDeletedRecords.complete(null);
EasyMock.expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).times(2);
EasyMock.expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).times(2);
expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).times(2);
expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).times(2);
replay();
taskManager.maybePurgeCommitedRecords();
@ -549,8 +567,8 @@ public class TaskManagerTest {
final Map<TopicPartition, RecordsToDelete> recordsToDelete = Collections.singletonMap(t1p1, RecordsToDelete.beforeOffset(5L));
final DeleteRecordsResult deleteRecordsResult = new DeleteRecordsResult(Collections.singletonMap(t1p1, futureDeletedRecords));
EasyMock.expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).once();
EasyMock.expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).once();
expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).once();
expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).once();
replay();
taskManager.maybePurgeCommitedRecords();
@ -567,8 +585,8 @@ public class TaskManagerTest {
futureDeletedRecords.completeExceptionally(new Exception("KABOOM!"));
EasyMock.expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).times(2);
EasyMock.expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).times(2);
expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).times(2);
expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).times(2);
replay();
taskManager.maybePurgeCommitedRecords();
@ -578,7 +596,7 @@ public class TaskManagerTest {
@Test
public void shouldMaybeCommitActiveTasks() {
EasyMock.expect(active.maybeCommitPerUserRequested()).andReturn(5);
expect(active.maybeCommitPerUserRequested()).andReturn(5);
replay();
assertThat(taskManager.maybeCommitActiveTasksPerUserRequested(), equalTo(5));
@ -587,7 +605,7 @@ public class TaskManagerTest {
@Test
public void shouldProcessActiveTasks() {
EasyMock.expect(active.process(0L)).andReturn(10);
expect(active.process(0L)).andReturn(10);
replay();
assertThat(taskManager.process(0L), equalTo(10));
@ -596,7 +614,7 @@ public class TaskManagerTest {
@Test
public void shouldPunctuateActiveTasks() {
EasyMock.expect(active.punctuate()).andReturn(20);
expect(active.punctuate()).andReturn(20);
replay();
assertThat(taskManager.punctuate(), equalTo(20));
@ -605,7 +623,7 @@ public class TaskManagerTest {
@Test
public void shouldNotResumeConsumptionUntilAllStoresRestored() {
EasyMock.expect(active.allTasksRunning()).andReturn(false);
expect(active.allTasksRunning()).andReturn(false);
final Consumer<byte[], byte[]> consumer = EasyMock.createStrictMock(Consumer.class);
taskManager.setConsumer(consumer);
EasyMock.replay(active, consumer);
@ -637,12 +655,12 @@ public class TaskManagerTest {
private void mockAssignStandbyPartitions(final long offset) {
final StandbyTask task = EasyMock.createNiceMock(StandbyTask.class);
EasyMock.expect(active.allTasksRunning()).andReturn(true);
EasyMock.expect(standby.running()).andReturn(Collections.singletonList(task));
EasyMock.expect(task.checkpointedOffsets()).andReturn(Collections.singletonMap(t1p0, offset));
expect(active.allTasksRunning()).andReturn(true);
expect(standby.running()).andReturn(Collections.singletonList(task));
expect(task.checkpointedOffsets()).andReturn(Collections.singletonMap(t1p0, offset));
restoreConsumer.assign(taskId0Partitions);
EasyMock.expectLastCall();
expectLastCall();
EasyMock.replay(task);
}