KAFKA-16017: Checkpoint restored offsets instead of written offsets (#15044)

Kafka Streams checkpoints the wrong offsets when a task is closed during
restoration. If a TaskCorruptedException occurs under exactly-once
processing guarantees, the affected task is closed dirty, its state is
wiped out, and the task is re-initialized. If the task is then closed
cleanly during the following restoration, it writes the offsets held in
its record collector to the checkpoint file. Those are the offsets the
task wrote to its changelog topics, i.e. the end offsets of the
changelog topics. Consequently, when the task is initialized again on
the same Streams client, it reads the checkpoint file and assumes it is
fully restored, although the records between the last offsets it
restored before the clean close and the end offsets of the changelog
topics are missing locally.

The fix is to clear the offsets in the record collector on close.
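
In effect, both close paths of RecordCollectorImpl now funnel through a
private close() that drops the buffered offsets before checking for a
send exception. A condensed sketch of the resulting methods, pieced
together from the hunks below (the EOS guard in closeDirty() and the
surrounding comments and logging are assumed/abridged):

    @Override
    public void closeClean() {
        // no transaction is in flight at this point (see the comment in the hunk below)
        close();
    }

    @Override
    public void closeDirty() {
        if (eosEnabled) {                        // assumed: abort is guarded by an EOS check
            streamsProducer.abortTransaction();
        }
        close();
    }

    private void close() {
        offsets.clear();        // forget the offsets written to the changelog topics,
                                // so a clean close during restoration cannot checkpoint them
        checkForException();
    }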

Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
Author: Bruno Cadonna, 2023-12-21 10:15:04 +01:00 (committed by GitHub)
parent d59d613258
commit 19727f8d51
6 changed files with 287 additions and 6 deletions


@@ -408,6 +408,7 @@
<allow pkg="com.fasterxml.jackson" />
<allow pkg="kafka.utils" />
<allow pkg="org.apache.zookeeper" />
<allow pkg="org.apache.log4j" />
</subpackage>
</subpackage>
</subpackage>


@@ -230,7 +230,7 @@
<!-- Streams tests -->
<suppress checks="ClassFanOutComplexity"
files="(RecordCollectorTest|StreamsPartitionAssignorTest|StreamThreadTest|StreamTaskTest|TaskManagerTest|TopologyTestDriverTest|KafkaStreamsTest).java"/>
files="(RecordCollectorTest|StreamsPartitionAssignorTest|StreamThreadTest|StreamTaskTest|TaskManagerTest|TopologyTestDriverTest|KafkaStreamsTest|EosIntegrationTest).java"/>
<suppress checks="MethodLength"
files="(EosIntegrationTest|EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|RocksDBWindowStoreTest|StreamStreamJoinIntegrationTest).java"/>


@@ -361,7 +361,7 @@ public class RecordCollectorImpl implements RecordCollector {
// transaction during handleRevocation and thus there is no transaction in flight, or else none of the revoked
// tasks had any data in the current transaction and therefore there is no need to commit or abort it.
checkForException();
close();
}
/**
@@ -377,6 +377,11 @@ public class RecordCollectorImpl implements RecordCollector {
streamsProducer.abortTransaction();
}
close();
}
private void close() {
offsets.clear();
checkForException();
}


@@ -26,6 +26,9 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
@@ -35,12 +38,18 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.RangeQuery;
@@ -70,6 +79,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@@ -82,14 +92,22 @@ import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.DEFAULT_TIMEOUT;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinRecordsReceived;
import static org.apache.kafka.streams.query.StateQueryRequest.inStore;
import static org.apache.kafka.test.TestUtils.consumerConfig;
import static org.apache.kafka.test.TestUtils.waitForCondition;
@@ -789,6 +807,152 @@ public class EosIntegrationTest {
verifyOffsetsAreInCheckpoint(1);
}
@Test
public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterEnabled() throws Exception {
shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(true);
}
@Test
public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterDisabled() throws Exception {
if (!processingThreadsEnabled) {
shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(false);
}
}
@SuppressWarnings("deprecation")
private void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(final boolean stateUpdaterEnabled) throws Exception {
if (!eosConfig.equals(StreamsConfig.EXACTLY_ONCE) && !eosConfig.equals(StreamsConfig.EXACTLY_ONCE_V2)) {
return;
}
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
streamsConfiguration.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled);
streamsConfiguration.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
streamsConfiguration.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 100);
final String stateStoreName = "stateStore";
purgeLocalStreamsState(streamsConfiguration);
final int startKey = 1;
final int endKey = 30001;
final List<KeyValue<Integer, Integer>> recordBatch1 = IntStream.range(startKey, endKey - 1000).mapToObj(i -> KeyValue.pair(i, 0)).collect(Collectors.toList());
IntegrationTestUtils.produceKeyValuesSynchronously(MULTI_PARTITION_INPUT_TOPIC,
recordBatch1,
TestUtils.producerConfig(CLUSTER.bootstrapServers(),
IntegerSerializer.class,
IntegerSerializer.class),
CLUSTER.time);
final StoreBuilder<KeyValueStore<Integer, String>> stateStore = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(stateStoreName),
Serdes.Integer(),
Serdes.String()).withCachingEnabled();
final int partitionToVerify = 0;
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean throwException = new AtomicBoolean(false);
final TaskId task00 = new TaskId(0, partitionToVerify);
final AtomicLong restoredOffsetsForPartition0 = new AtomicLong(0);
final Topology topology = new Topology();
topology
.addSource("source", MULTI_PARTITION_INPUT_TOPIC)
.addProcessor("processor", () -> new Processor<Integer, String, Integer, String>() {
KeyValueStore<Integer, String> stateStore;
org.apache.kafka.streams.processor.api.ProcessorContext<Integer, String> context;
@Override
public void init(final org.apache.kafka.streams.processor.api.ProcessorContext<Integer, String> context) {
Processor.super.init(context);
this.context = context;
stateStore = context.getStateStore(stateStoreName);
}
@Override
public void process(final Record<Integer, String> record) {
context.recordMetadata().ifPresent(recordMetadata -> {
if (recordMetadata.partition() == partitionToVerify) {
if (throwException.compareAndSet(true, false)) {
throw new TaskCorruptedException(Collections.singleton(task00));
}
stateStore.put(record.key(), record.value());
} else {
stateStore.put(record.key(), record.value());
}
});
}
@Override
public void close() {
Processor.super.close();
}
}, "source")
.addStateStore(stateStore, "processor");
final KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfiguration);
kafkaStreams.setGlobalStateRestoreListener(new StateRestoreListener() {
@Override
public void onRestoreStart(final TopicPartition topicPartition,
final String storeName,
final long startingOffset,
final long endingOffset) {}
@Override
public void onBatchRestored(final TopicPartition topicPartition,
final String storeName,
final long batchEndOffset,
final long numRestored) {
if (topicPartition.partition() == 0) {
restoredOffsetsForPartition0.set(batchEndOffset);
if (batchEndOffset > 100) {
latch.countDown();
}
}
}
@Override
public void onRestoreEnd(final TopicPartition topicPartition,
final String storeName,
final long totalRestored) {}
});
startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), Duration.ofSeconds(60));
ensureCommittedRecordsInTopicPartition(
applicationId + "-" + stateStoreName + "-changelog",
partitionToVerify,
2000,
IntegerDeserializer.class,
IntegerDeserializer.class
);
throwException.set(true);
final List<KeyValue<Integer, Integer>> recordBatch2 = IntStream.range(endKey - 1000, endKey).mapToObj(i -> KeyValue.pair(i, 0)).collect(Collectors.toList());
IntegrationTestUtils.produceKeyValuesSynchronously(MULTI_PARTITION_INPUT_TOPIC,
recordBatch2,
TestUtils.producerConfig(CLUSTER.bootstrapServers(),
IntegerSerializer.class,
IntegerSerializer.class),
CLUSTER.time);
latch.await();
kafkaStreams.close();
waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, Duration.ofSeconds(60));
final File checkpointFile = Paths.get(
streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG),
streamsConfiguration.getProperty(StreamsConfig.APPLICATION_ID_CONFIG),
task00.toString(),
".checkpoint"
).toFile();
assertTrue(checkpointFile.exists());
final Map<TopicPartition, Long> checkpoints = new OffsetCheckpoint(checkpointFile).read();
assertEquals(
Long.valueOf(restoredOffsetsForPartition0.get()),
new ArrayList<>(checkpoints.values()).get(0)
);
}
private void verifyOffsetsAreInCheckpoint(final int partition) throws IOException {
final String stateStoreDir = stateTmpDir + File.separator + "appDir" + File.separator + applicationId + File.separator + "0_" + partition + File.separator;
@@ -989,13 +1153,21 @@ public class EosIntegrationTest {
private List<KeyValue<Long, Long>> readResult(final String topic,
final int numberOfRecords,
final String groupId) throws Exception {
return readResult(topic, numberOfRecords, LongDeserializer.class, LongDeserializer.class, groupId);
}
private <K, V> List<KeyValue<K, V>> readResult(final String topic,
final int numberOfRecords,
final Class<? extends Deserializer<K>> keyDeserializer,
final Class<? extends Deserializer<V>> valueDeserializer,
final String groupId) throws Exception {
if (groupId != null) {
return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
TestUtils.consumerConfig(
CLUSTER.bootstrapServers(),
groupId,
LongDeserializer.class,
LongDeserializer.class,
keyDeserializer,
valueDeserializer,
Utils.mkProperties(Collections.singletonMap(
ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)))),
@@ -1006,12 +1178,53 @@ public class EosIntegrationTest {
// read uncommitted
return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
TestUtils.consumerConfig(CLUSTER.bootstrapServers(), LongDeserializer.class, LongDeserializer.class),
TestUtils.consumerConfig(CLUSTER.bootstrapServers(), keyDeserializer, valueDeserializer),
topic,
numberOfRecords
);
}
private <K, V> void ensureCommittedRecordsInTopicPartition(final String topic,
final int partition,
final int numberOfRecords,
final Class<? extends Deserializer<K>> keyDeserializer,
final Class<? extends Deserializer<V>> valueDeserializer) throws Exception {
final long timeoutMs = 2 * DEFAULT_TIMEOUT;
final int maxTries = 10;
final long deadline = System.currentTimeMillis() + timeoutMs;
int tries = 0;
while (true) {
final List<ConsumerRecord<K, V>> consumerRecords = waitUntilMinRecordsReceived(
TestUtils.consumerConfig(
CLUSTER.bootstrapServers(),
CONSUMER_GROUP_ID,
keyDeserializer,
valueDeserializer,
Utils.mkProperties(Collections.singletonMap(
ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT))
)
),
topic,
numberOfRecords,
timeoutMs
);
++tries;
if (consumerRecords.stream().anyMatch(record -> record.partition() == partition)) {
return;
}
if (tries >= maxTries) {
throw new AssertionError("No committed records in topic " + topic
+ ", partition " + partition + " after " + maxTries + " retries.");
}
final long now = System.currentTimeMillis();
if (now > deadline) {
throw new AssertionError("No committed records in topic " + topic
+ ", partition " + partition + " after " + timeoutMs + " ms.");
}
}
}
private List<KeyValue<Long, Long>> computeExpectedResult(final List<KeyValue<Long, Long>> input) {
final List<KeyValue<Long, Long>> expectedResult = new ArrayList<>(input.size());


@@ -1313,7 +1313,6 @@ public class IntegrationTestUtils {
final int maxMessages) {
final List<ConsumerRecord<K, V>> consumerRecords;
consumer.subscribe(singletonList(topic));
System.out.println("Got assignment:" + consumer.assignment());
final int pollIntervalMs = 100;
consumerRecords = new ArrayList<>();
int totalPollTimeMs = 0;


@@ -56,6 +56,7 @@ import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -90,6 +91,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -779,6 +781,66 @@ public class RecordCollectorTest {
collector.flush();
}
@Test
public void shouldClearOffsetsOnCloseClean() {
shouldClearOffsetsOnClose(true);
}
@Test
public void shouldClearOffsetsOnCloseDirty() {
shouldClearOffsetsOnClose(false);
}
private void shouldClearOffsetsOnClose(final boolean clean) {
final StreamsProducer streamsProducer = mock(StreamsProducer.class);
when(streamsProducer.eosEnabled()).thenReturn(true);
final long offset = 1234L;
final RecordMetadata metadata = new RecordMetadata(
new TopicPartition(topic, 0),
offset,
0,
0,
1,
1
);
when(streamsProducer.send(any(), any())).thenAnswer(invocation -> {
((Callback) invocation.getArgument(1)).onCompletion(metadata, null);
return null;
});
final ProcessorTopology topology = mock(ProcessorTopology.class);
final RecordCollector collector = new RecordCollectorImpl(
logContext,
taskId,
streamsProducer,
productionExceptionHandler,
streamsMetrics,
topology
);
collector.send(
topic + "-changelog",
"key",
"value",
new RecordHeaders(),
0,
0L,
new StringSerializer(),
new StringSerializer(),
null,
null
);
assertFalse(collector.offsets().isEmpty());
if (clean) {
collector.closeClean();
} else {
collector.closeDirty();
}
assertTrue(collector.offsets().isEmpty());
}
@Test
public void shouldNotAbortTxOnCloseCleanIfEosEnabled() {
final StreamsProducer streamsProducer = mock(StreamsProducer.class);
@@ -1228,6 +1290,7 @@ public class RecordCollectorTest {
try (final LogCaptureAppender logCaptureAppender =
LogCaptureAppender.createAndRegister(RecordCollectorImpl.class)) {
logCaptureAppender.setThreshold(Level.INFO);
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
collector.flush();