KAFKA-12508: Disable KIP-557 (#10397)

A major issue has been raised that this implementation of
emit-on-change is vulnerable to a number of data-loss bugs
in the presence of recovery with dirty state under at-least-once
semantics. This should be fixed in the future when we implement
a way to avoid or clean up the dirty state under at-least-once,
at which point it will be safe to re-introduce KIP-557 and
complete it.

Reviewers: A. Sophie Blee-Goldman <ableegoldman@apache.org>
Author: John Roesler, 2021-03-25 14:42:26 -05:00 (committed by GitHub)
parent e840b03a02
commit 9ef52dd2db
3 changed files with 11 additions and 46 deletions
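
To make the risk concrete, here is a minimal, self-contained sketch of the failure mode described in the commit message. It is a hypothetical illustration only: a HashMap stands in for the local state store and another for the downstream topic, and none of the names come from the Kafka Streams code.

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch of the KIP-557 hazard under at-least-once processing.
public class EmitOnChangeDirtyStateSketch {
    static final Map<String, String> store = new HashMap<>();       // stand-in for the local state store
    static final Map<String, String> downstream = new HashMap<>();  // stand-in for the downstream topic

    // Emit-on-change: forward only when the stored value actually changes.
    static void process(String key, String value) {
        if (!Objects.equals(store.get(key), value)) {
            store.put(key, value);
            downstream.put(key, value);
        }
        // else: treated as an idempotent update and dropped
    }

    public static void main(String[] args) {
        // First attempt: the store write is flushed, but the application crashes before
        // the downstream send and the input offset commit happen.
        store.put("k", "v1"); // dirty state survives the crash

        // Restart under at-least-once: the input record is re-delivered and reprocessed,
        // but the emit-on-change check sees an "unchanged" value and suppresses it.
        process("k", "v1");

        System.out.println(downstream.get("k")); // prints null: the update never reaches downstream
    }
}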

KTableSource.java

@@ -21,20 +21,15 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore;
import org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.RawAndDeserializedValue;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.skippedIdempotentUpdatesSensor;
public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
private static final Logger LOG = LoggerFactory.getLogger(KTableSource.class);
@@ -79,11 +74,10 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
private class KTableSourceProcessor extends AbstractProcessor<K, V> {
private MeteredTimestampedKeyValueStore<K, V> store;
private TimestampedKeyValueStore<K, V> store;
private TimestampedTupleForwarder<K, V> tupleForwarder;
private StreamsMetricsImpl metrics;
private Sensor droppedRecordsSensor;
private Sensor skippedIdempotentUpdatesSensor = null;
@SuppressWarnings("unchecked")
@Override
@@ -92,24 +86,12 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
metrics = (StreamsMetricsImpl) context.metrics();
droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
if (queryableName != null) {
final StateStore stateStore = context.getStateStore(queryableName);
try {
store = ((WrappedStateStore<MeteredTimestampedKeyValueStore<K, V>, K, V>) stateStore).wrapped();
} catch (final ClassCastException e) {
throw new IllegalStateException("Unexpected store type: " + stateStore.getClass() + " for store: " + queryableName, e);
}
store = (TimestampedKeyValueStore<K, V>) context.getStateStore(queryableName);
tupleForwarder = new TimestampedTupleForwarder<>(
store,
context,
new TimestampedCacheFlushListener<>(context),
sendOldValues);
skippedIdempotentUpdatesSensor = skippedIdempotentUpdatesSensor(
Thread.currentThread().getName(),
context.taskId().toString(),
((InternalProcessorContext) context).currentNode().name(),
metrics
);
}
}
@@ -126,8 +108,7 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
}
if (queryableName != null) {
final RawAndDeserializedValue<V> tuple = store.getWithBinary(key);
final ValueAndTimestamp<V> oldValueAndTimestamp = tuple.value;
final ValueAndTimestamp<V> oldValueAndTimestamp = store.get(key);
final V oldValue;
if (oldValueAndTimestamp != null) {
oldValue = oldValueAndTimestamp.value();
@@ -138,14 +119,8 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
} else {
oldValue = null;
}
final ValueAndTimestamp<V> newValueAndTimestamp = ValueAndTimestamp.make(value, context().timestamp());
final boolean isDifferentValue =
store.putIfDifferentValues(key, newValueAndTimestamp, tuple.serializedValue);
if (isDifferentValue) {
tupleForwarder.maybeForward(key, value, oldValue);
} else {
skippedIdempotentUpdatesSensor.record();
}
store.put(key, ValueAndTimestamp.make(value, context().timestamp()));
tupleForwarder.maybeForward(key, value, oldValue);
} else {
context().forward(key, new Change<>(value, null));
}
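
The net effect of the hunks above is that the source processor now always writes the new value and always forwards it, instead of comparing against the previously stored bytes and skipping (and metering) idempotent updates. A condensed, hypothetical sketch of the removed and restored paths, using a plain Map in place of the timestamped store and a List in place of the tuple forwarder (none of these names come from the Kafka code):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class KTableSourcePathsSketch {
    final Map<String, String> store = new HashMap<>();  // stand-in for the materialized store
    final List<String> forwarded = new ArrayList<>();   // stand-in for the tuple forwarder

    // Removed path (KIP-557, emit-on-change): write and forward only on a real change.
    void emitOnChange(String key, String value) {
        final String oldValue = store.get(key);
        if (!Objects.equals(oldValue, value)) {
            store.put(key, value);
            forwarded.add(key + "=" + value + " (old=" + oldValue + ")");
        }
        // otherwise the update is counted as a skipped idempotent update and dropped
    }

    // Restored path (emit-on-update): always write and always forward, even if unchanged.
    void emitOnUpdate(String key, String value) {
        final String oldValue = store.get(key);
        store.put(key, value);
        forwarded.add(key + "=" + value + " (old=" + oldValue + ")");
    }
}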

KTableKTableForeignKeyJoinIntegrationTest.java

@@ -49,8 +49,6 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;
import static java.util.Collections.singletonMap;
import static java.util.Collections.emptyMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -384,16 +382,12 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
// Deleting a non-joining record produces an unnecessary tombstone for inner joins, because
// it's not possible to know whether a result was previously emitted.
// HOWEVER, when the final join result is materialized (either explicitly or
// implicitly by a subsequent join), we _can_ detect that the tombstone is unnecessary and drop it.
// For the left join, the tombstone is necessary.
left.pipeInput("lhs1", (String) null);
{
assertThat(
outputTopic.readKeyValuesToMap(),
is(leftJoin || !(materialized || rejoin)
? mkMap(mkEntry("lhs1", null))
: emptyMap())
is(mkMap(mkEntry("lhs1", null)))
);
if (materialized) {
assertThat(
@@ -470,15 +464,11 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
// "moving" our subscription to another non-existent FK results in an unnecessary tombstone for inner join,
// since it is impossible to know whether the prior FK existed or not (and thus whether any results have
// previously been emitted)
// previously been emitted). HOWEVER, when the final join result is materialized (either explicitly or
// implicitly by a subsequent join), we _can_ detect that the tombstone is unnecessary and drop it.
// The left join emits a _necessary_ update (since the lhs record has actually changed)
left.pipeInput("lhs1", "lhsValue1|rhs2");
assertThat(
outputTopic.readKeyValuesToMap(),
is(leftJoin
? mkMap(mkEntry("lhs1", "(lhsValue1|rhs2,null)"))
: (materialized || rejoin) ? emptyMap() : singletonMap("lhs1", null))
is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs2,null)" : null)))
);
if (materialized) {
assertThat(
@@ -490,9 +480,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
left.pipeInput("lhs1", "lhsValue1|rhs3");
assertThat(
outputTopic.readKeyValuesToMap(),
is(leftJoin
? mkMap(mkEntry("lhs1", "(lhsValue1|rhs3,null)"))
: (materialized || rejoin) ? emptyMap() : singletonMap("lhs1", null))
is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs3,null)" : null)))
);
if (materialized) {
assertThat(

KTableSourceTest.java

@@ -41,6 +41,7 @@ import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Ignore;
import org.junit.Test;
import java.time.Duration;
@@ -91,6 +92,7 @@ public class KTableSourceTest {
supplier.theCapturedProcessor().processed());
}
@Ignore // we have disabled KIP-557 until KAFKA-12508 can be properly addressed
@Test
public void testKTableSourceEmitOnChange() {
final StreamsBuilder builder = new StreamsBuilder();
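
With testKTableSourceEmitOnChange ignored, the restored emit-on-update behavior can still be exercised with a quick TopologyTestDriver sketch like the one below. The topic names, store name, and application id are illustrative only and are not taken from the test above.

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class EmitOnUpdateCheck {
    public static void main(String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        // Materialize a table from "input" and stream every update to "output".
        builder.table("input", Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"))
               .toStream()
               .to("output");

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "emit-on-update-check");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Disable record caching so every table update is visible immediately.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> input =
                driver.createInputTopic("input", new StringSerializer(), new StringSerializer());
            final TestOutputTopic<String, String> output =
                driver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());

            input.pipeInput("k", "v");
            input.pipeInput("k", "v"); // identical value

            // With KIP-557 disabled, both updates reach the output topic.
            System.out.println(output.readKeyValuesToList()); // expect two k=v records
        }
    }
}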