mirror of https://github.com/apache/kafka.git
KAFKA-16017: Checkpoint restored offsets instead of written offsets (#15044)
Kafka Streams checkpoints the wrong offsets when a task is closed during restoration. Under exactly-once processing guarantees, a TaskCorruptedException causes the affected task to be closed dirty, its state content to be wiped out, and the task to be re-initialized. If the task is then closed cleanly during the following restoration, it writes the offsets stored in its record collector to the checkpoint file. Those are the offsets the task wrote to its changelog topics, i.e. the end offsets of the changelog topics. Consequently, when the task is initialized again on the same Streams client, the checkpoint file is read and the task assumes it is fully restored, although the records between the last offset the task restored before closing cleanly and the end offsets of the changelog topics are missing locally.

The fix is to clear the offsets in the record collector on close.

Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
parent d59d613258 · commit 19727f8d51
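Before reading the diff, a minimal, self-contained sketch of the idea behind the fix may help. The class, field, and method names below are illustrative stand-ins, not the actual RecordCollectorImpl API; it only shows why clearing the buffered send offsets on close prevents a restoring task from checkpointing changelog end offsets it never restored.

```java
// Illustrative sketch only -- simplified from the record collector idea in this
// commit; names are hypothetical and do not match the real Kafka Streams classes.
import java.util.HashMap;
import java.util.Map;

class SketchRecordCollector {
    // Offsets of records successfully written to the changelog topics.
    // A task that checkpoints while these are populated would record the
    // changelog end offsets, even if it is still mid-restoration.
    private final Map<String, Long> offsets = new HashMap<>();

    void onSendCompleted(final String topicPartition, final long offset) {
        offsets.put(topicPartition, offset);
    }

    void closeClean() {
        close();
    }

    void closeDirty() {
        close();
    }

    private void close() {
        // The essence of the fix: drop the written offsets on close, so a task
        // closed during restoration cannot checkpoint them as "restored".
        offsets.clear();
    }

    Map<String, Long> offsets() {
        return new HashMap<>(offsets);
    }
}
```

With the offsets cleared on close, a task that is closed cleanly while restoring falls back to checkpointing only what it actually restored, which is what the new EosIntegrationTest case further down asserts against the on-disk checkpoint file.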
@@ -408,6 +408,7 @@
<allow pkg="com.fasterxml.jackson" />
<allow pkg="kafka.utils" />
<allow pkg="org.apache.zookeeper" />
<allow pkg="org.apache.log4j" />
</subpackage>
</subpackage>
</subpackage>
@@ -230,7 +230,7 @@
<!-- Streams tests -->
<suppress checks="ClassFanOutComplexity"
files="(RecordCollectorTest|StreamsPartitionAssignorTest|StreamThreadTest|StreamTaskTest|TaskManagerTest|TopologyTestDriverTest|KafkaStreamsTest).java"/>
files="(RecordCollectorTest|StreamsPartitionAssignorTest|StreamThreadTest|StreamTaskTest|TaskManagerTest|TopologyTestDriverTest|KafkaStreamsTest|EosIntegrationTest).java"/>

<suppress checks="MethodLength"
files="(EosIntegrationTest|EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|RocksDBWindowStoreTest|StreamStreamJoinIntegrationTest).java"/>
@@ -361,7 +361,7 @@ public class RecordCollectorImpl implements RecordCollector {
// transaction during handleRevocation and thus there is no transaction in flight, or else none of the revoked
// tasks had any data in the current transaction and therefore there is no need to commit or abort it.

checkForException();
close();
}

/**
@@ -377,6 +377,11 @@ public class RecordCollectorImpl implements RecordCollector {
streamsProducer.abortTransaction();
}

close();
}

private void close() {
offsets.clear();
checkForException();
}
@@ -26,6 +26,9 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
@@ -35,12 +38,18 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.RangeQuery;
@@ -70,6 +79,7 @@ import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@@ -82,14 +92,22 @@ import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.DEFAULT_TIMEOUT;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinRecordsReceived;
import static org.apache.kafka.streams.query.StateQueryRequest.inStore;
import static org.apache.kafka.test.TestUtils.consumerConfig;
import static org.apache.kafka.test.TestUtils.waitForCondition;
@@ -789,6 +807,152 @@ public class EosIntegrationTest {
verifyOffsetsAreInCheckpoint(1);
}

@Test
public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterEnabled() throws Exception {
shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(true);
}

@Test
public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterDisabled() throws Exception {
if (!processingThreadsEnabled) {
shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(false);
}
}

@SuppressWarnings("deprecation")
private void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(final boolean stateUpdaterEnabled) throws Exception {
if (!eosConfig.equals(StreamsConfig.EXACTLY_ONCE) && !eosConfig.equals(StreamsConfig.EXACTLY_ONCE_V2)) {
return;
}
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
streamsConfiguration.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled);
streamsConfiguration.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
streamsConfiguration.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 100);
final String stateStoreName = "stateStore";

purgeLocalStreamsState(streamsConfiguration);

final int startKey = 1;
final int endKey = 30001;
final List<KeyValue<Integer, Integer>> recordBatch1 = IntStream.range(startKey, endKey - 1000).mapToObj(i -> KeyValue.pair(i, 0)).collect(Collectors.toList());
IntegrationTestUtils.produceKeyValuesSynchronously(MULTI_PARTITION_INPUT_TOPIC,
recordBatch1,
TestUtils.producerConfig(CLUSTER.bootstrapServers(),
IntegerSerializer.class,
IntegerSerializer.class),
CLUSTER.time);

final StoreBuilder<KeyValueStore<Integer, String>> stateStore = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(stateStoreName),
Serdes.Integer(),
Serdes.String()).withCachingEnabled();

final int partitionToVerify = 0;
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean throwException = new AtomicBoolean(false);
final TaskId task00 = new TaskId(0, partitionToVerify);
final AtomicLong restoredOffsetsForPartition0 = new AtomicLong(0);
final Topology topology = new Topology();
topology
.addSource("source", MULTI_PARTITION_INPUT_TOPIC)
.addProcessor("processor", () -> new Processor<Integer, String, Integer, String>() {
KeyValueStore<Integer, String> stateStore;
org.apache.kafka.streams.processor.api.ProcessorContext<Integer, String> context;

@Override
public void init(final org.apache.kafka.streams.processor.api.ProcessorContext<Integer, String> context) {
Processor.super.init(context);
this.context = context;
stateStore = context.getStateStore(stateStoreName);
}

@Override
public void process(final Record<Integer, String> record) {
context.recordMetadata().ifPresent(recordMetadata -> {
if (recordMetadata.partition() == partitionToVerify) {
if (throwException.compareAndSet(true, false)) {
throw new TaskCorruptedException(Collections.singleton(task00));
}
stateStore.put(record.key(), record.value());
} else {
stateStore.put(record.key(), record.value());
}
});
}

@Override
public void close() {
Processor.super.close();
}
}, "source")
.addStateStore(stateStore, "processor");

final KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfiguration);
kafkaStreams.setGlobalStateRestoreListener(new StateRestoreListener() {
@Override
public void onRestoreStart(final TopicPartition topicPartition,
final String storeName,
final long startingOffset,
final long endingOffset) {}
@Override
public void onBatchRestored(final TopicPartition topicPartition,
final String storeName,
final long batchEndOffset,
final long numRestored) {
if (topicPartition.partition() == 0) {
restoredOffsetsForPartition0.set(batchEndOffset);
if (batchEndOffset > 100) {
latch.countDown();
}
}
}
@Override
public void onRestoreEnd(final TopicPartition topicPartition,
final String storeName,
final long totalRestored) {}
});
startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), Duration.ofSeconds(60));
ensureCommittedRecordsInTopicPartition(
applicationId + "-" + stateStoreName + "-changelog",
partitionToVerify,
2000,
IntegerDeserializer.class,
IntegerDeserializer.class
);
throwException.set(true);
final List<KeyValue<Integer, Integer>> recordBatch2 = IntStream.range(endKey - 1000, endKey).mapToObj(i -> KeyValue.pair(i, 0)).collect(Collectors.toList());
IntegrationTestUtils.produceKeyValuesSynchronously(MULTI_PARTITION_INPUT_TOPIC,
recordBatch2,
TestUtils.producerConfig(CLUSTER.bootstrapServers(),
IntegerSerializer.class,
IntegerSerializer.class),
CLUSTER.time);
latch.await();
kafkaStreams.close();
waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, Duration.ofSeconds(60));

final File checkpointFile = Paths.get(
streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG),
streamsConfiguration.getProperty(StreamsConfig.APPLICATION_ID_CONFIG),
task00.toString(),
".checkpoint"
).toFile();
assertTrue(checkpointFile.exists());
final Map<TopicPartition, Long> checkpoints = new OffsetCheckpoint(checkpointFile).read();
assertEquals(
Long.valueOf(restoredOffsetsForPartition0.get()),
new ArrayList<>(checkpoints.values()).get(0)
);
}

private void verifyOffsetsAreInCheckpoint(final int partition) throws IOException {
final String stateStoreDir = stateTmpDir + File.separator + "appDir" + File.separator + applicationId + File.separator + "0_" + partition + File.separator;
@@ -989,13 +1153,21 @@ public class EosIntegrationTest {
private List<KeyValue<Long, Long>> readResult(final String topic,
final int numberOfRecords,
final String groupId) throws Exception {
return readResult(topic, numberOfRecords, LongDeserializer.class, LongDeserializer.class, groupId);
}

private <K, V> List<KeyValue<K, V>> readResult(final String topic,
final int numberOfRecords,
final Class<? extends Deserializer<K>> keyDeserializer,
final Class<? extends Deserializer<V>> valueDeserializer,
final String groupId) throws Exception {
if (groupId != null) {
return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
TestUtils.consumerConfig(
CLUSTER.bootstrapServers(),
groupId,
LongDeserializer.class,
LongDeserializer.class,
keyDeserializer,
valueDeserializer,
Utils.mkProperties(Collections.singletonMap(
ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)))),
@@ -1006,12 +1178,53 @@ public class EosIntegrationTest {

// read uncommitted
return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
TestUtils.consumerConfig(CLUSTER.bootstrapServers(), LongDeserializer.class, LongDeserializer.class),
TestUtils.consumerConfig(CLUSTER.bootstrapServers(), keyDeserializer, valueDeserializer),
topic,
numberOfRecords
);
}

private <K, V> void ensureCommittedRecordsInTopicPartition(final String topic,
final int partition,
final int numberOfRecords,
final Class<? extends Deserializer<K>> keyDeserializer,
final Class<? extends Deserializer<V>> valueDeserializer) throws Exception {
final long timeoutMs = 2 * DEFAULT_TIMEOUT;
final int maxTries = 10;
final long deadline = System.currentTimeMillis() + timeoutMs;
int tries = 0;
while (true) {
final List<ConsumerRecord<K, V>> consumerRecords = waitUntilMinRecordsReceived(
TestUtils.consumerConfig(
CLUSTER.bootstrapServers(),
CONSUMER_GROUP_ID,
keyDeserializer,
valueDeserializer,
Utils.mkProperties(Collections.singletonMap(
ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT))
)
),
topic,
numberOfRecords,
timeoutMs
);
++tries;
if (consumerRecords.stream().anyMatch(record -> record.partition() == partition)) {
return;
}
if (tries >= maxTries) {
throw new AssertionError("No committed records in topic " + topic
+ ", partition " + partition + " after " + maxTries + " retries.");
}
final long now = System.currentTimeMillis();
if (now > deadline) {
throw new AssertionError("No committed records in topic " + topic
+ ", partition " + partition + " after " + timeoutMs + " ms.");
}
}
}

private List<KeyValue<Long, Long>> computeExpectedResult(final List<KeyValue<Long, Long>> input) {
final List<KeyValue<Long, Long>> expectedResult = new ArrayList<>(input.size());
@@ -1313,7 +1313,6 @@ public class IntegrationTestUtils {
final int maxMessages) {
final List<ConsumerRecord<K, V>> consumerRecords;
consumer.subscribe(singletonList(topic));
System.out.println("Got assignment:" + consumer.assignment());
final int pollIntervalMs = 100;
consumerRecords = new ArrayList<>();
int totalPollTimeMs = 0;
@@ -56,6 +56,7 @@ import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -90,6 +91,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -779,6 +781,66 @@ public class RecordCollectorTest {
collector.flush();
}

@Test
public void shouldClearOffsetsOnCloseClean() {
shouldClearOffsetsOnClose(true);
}

@Test
public void shouldClearOffsetsOnCloseDirty() {
shouldClearOffsetsOnClose(false);
}

private void shouldClearOffsetsOnClose(final boolean clean) {
final StreamsProducer streamsProducer = mock(StreamsProducer.class);
when(streamsProducer.eosEnabled()).thenReturn(true);
final long offset = 1234L;
final RecordMetadata metadata = new RecordMetadata(
new TopicPartition(topic, 0),
offset,
0,
0,
1,
1
);
when(streamsProducer.send(any(), any())).thenAnswer(invocation -> {
((Callback) invocation.getArgument(1)).onCompletion(metadata, null);
return null;
});
final ProcessorTopology topology = mock(ProcessorTopology.class);

final RecordCollector collector = new RecordCollectorImpl(
logContext,
taskId,
streamsProducer,
productionExceptionHandler,
streamsMetrics,
topology
);
collector.send(
topic + "-changelog",
"key",
"value",
new RecordHeaders(),
0,
0L,
new StringSerializer(),
new StringSerializer(),
null,
null
);

assertFalse(collector.offsets().isEmpty());

if (clean) {
collector.closeClean();
} else {
collector.closeDirty();
}

assertTrue(collector.offsets().isEmpty());
}

@Test
public void shouldNotAbortTxOnCloseCleanIfEosEnabled() {
final StreamsProducer streamsProducer = mock(StreamsProducer.class);
@@ -1228,6 +1290,7 @@ public class RecordCollectorTest {

try (final LogCaptureAppender logCaptureAppender =
LogCaptureAppender.createAndRegister(RecordCollectorImpl.class)) {
logCaptureAppender.setThreshold(Level.INFO);

collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
collector.flush();