MINOR: Standby task commit needed when offsets updated (#8146)

This is a minor fix of a regression introduced in the refactoring PR: in current trunk standbyTask#commitNeeded always return false, which would cause standby tasks to never be committed until closed. To go back to the old behavior we would return true when new data has been applied and offsets being updated.

Reviewers: Boyang Chen <boyang@confluent.io>, John Roesler <john@confluent.io>
This commit is contained in:
Guozhang Wang 2020-02-21 12:08:00 -08:00 committed by GitHub
parent 84c4025fdd
commit 003dce5d51
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 40 additions and 1 deletions

View File

@ -30,6 +30,7 @@ import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.slf4j.Logger;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@ -42,6 +43,8 @@ public class StandbyTask extends AbstractTask implements Task {
private final Sensor closeTaskSensor;
private final InternalProcessorContext processorContext;
private Map<TopicPartition, Long> offsetSnapshotSinceLastCommit;
/**
* @param id the ID of this task
* @param partitions input topic partitions, used for thread metadata only
@ -125,6 +128,8 @@ public class StandbyTask extends AbstractTask implements Task {
// and the state current offset would be used to checkpoint
stateMgr.checkpoint(Collections.emptyMap());
offsetSnapshotSinceLastCommit = new HashMap<>(stateMgr.changelogOffsets());
log.info("Committed");
break;
@ -188,7 +193,8 @@ public class StandbyTask extends AbstractTask implements Task {
@Override
public boolean commitNeeded() {
return false;
// we can commit if the store's offset has changed since last commit
return offsetSnapshotSinceLastCommit == null || !offsetSnapshotSinceLastCommit.equals(stateMgr.changelogOffsets());
}
@Override

View File

@ -57,7 +57,9 @@ import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@RunWith(EasyMockRunner.class)
@ -185,6 +187,7 @@ public class StandbyTaskTest {
stateManager.flush();
EasyMock.expectLastCall();
stateManager.checkpoint(EasyMock.eq(Collections.emptyMap()));
EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.singletonMap(partition, 50L));
EasyMock.replay(stateManager);
task = createStandbyTask();
@ -259,6 +262,7 @@ public class StandbyTaskTest {
EasyMock.expectLastCall();
stateManager.checkpoint(EasyMock.eq(Collections.emptyMap()));
EasyMock.expectLastCall();
EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.singletonMap(partition, 50L));
EasyMock.replay(stateManager);
final MetricName metricName = setupCloseTaskMetric();
@ -274,10 +278,39 @@ public class StandbyTaskTest {
EasyMock.verify(stateManager);
}
@Test
public void shouldOnlyNeedCommitWhenChangelogOffsetChanged() {
EasyMock.expect(stateManager.changelogOffsets())
.andReturn(Collections.singletonMap(partition, 50L))
.andReturn(Collections.singletonMap(partition, 50L))
.andReturn(Collections.singletonMap(partition, 60L));
stateManager.flush();
EasyMock.expectLastCall();
stateManager.checkpoint(EasyMock.eq(Collections.emptyMap()));
EasyMock.expectLastCall();
EasyMock.replay(stateManager);
task = createStandbyTask();
task.initializeIfNeeded();
assertTrue(task.commitNeeded());
task.commit();
// do not need to commit if there's no update
assertFalse(task.commitNeeded());
// could commit if the offset advanced
assertTrue(task.commitNeeded());
EasyMock.verify(stateManager);
}
@Test
public void shouldThrowOnCloseCleanError() {
stateManager.close();
EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes();
EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.singletonMap(partition, 50L));
EasyMock.replay(stateManager);
final MetricName metricName = setupCloseTaskMetric();