diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index f6756f58e1d..b9f3580234a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -21,20 +21,15 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
-import org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore;
-import org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.RawAndDeserializedValue;
-import org.apache.kafka.streams.state.internals.WrappedStateStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Objects;
 
 import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
-import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.skippedIdempotentUpdatesSensor;
 
 public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
     private static final Logger LOG = LoggerFactory.getLogger(KTableSource.class);
@@ -79,11 +74,10 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
 
     private class KTableSourceProcessor extends AbstractProcessor<K, V> {
 
-        private MeteredTimestampedKeyValueStore<K, V> store;
+        private TimestampedKeyValueStore<K, V> store;
         private TimestampedTupleForwarder<K, V> tupleForwarder;
         private StreamsMetricsImpl metrics;
         private Sensor droppedRecordsSensor;
-        private Sensor skippedIdempotentUpdatesSensor = null;
 
         @SuppressWarnings("unchecked")
         @Override
@@ -92,24 +86,12 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
             metrics = (StreamsMetricsImpl) context.metrics();
             droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
             if (queryableName != null) {
-                final StateStore stateStore = context.getStateStore(queryableName);
-                try {
-                    store = ((WrappedStateStore<MeteredTimestampedKeyValueStore<K, V>, K, V>) stateStore).wrapped();
-                } catch (final ClassCastException e) {
-                    throw new IllegalStateException("Unexpected store type: " + stateStore.getClass() + " for store: " + queryableName, e);
-                }
+                store = (TimestampedKeyValueStore<K, V>) context.getStateStore(queryableName);
                 tupleForwarder = new TimestampedTupleForwarder<>(
                     store,
                     context,
                     new TimestampedCacheFlushListener<>(context),
                     sendOldValues);
-                skippedIdempotentUpdatesSensor = skippedIdempotentUpdatesSensor(
-                    Thread.currentThread().getName(),
-                    context.taskId().toString(),
-                    ((InternalProcessorContext) context).currentNode().name(),
-                    metrics
-                );
-            }
         }
@@ -126,8 +108,7 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
             }
 
             if (queryableName != null) {
-                final RawAndDeserializedValue<V> tuple = store.getWithBinary(key);
-                final ValueAndTimestamp<V> oldValueAndTimestamp = tuple.value;
+                final ValueAndTimestamp<V> oldValueAndTimestamp = store.get(key);
                 final V oldValue;
                 if (oldValueAndTimestamp != null) {
                     oldValue = oldValueAndTimestamp.value();
@@ -138,14 +119,8 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
                 } else {
                     oldValue = null;
                 }
-                final ValueAndTimestamp<V> newValueAndTimestamp = ValueAndTimestamp.make(value, context().timestamp());
-                final boolean isDifferentValue =
-                    store.putIfDifferentValues(key, newValueAndTimestamp, tuple.serializedValue);
-                if (isDifferentValue) {
-                    tupleForwarder.maybeForward(key, value, oldValue);
-                } else {
-                    skippedIdempotentUpdatesSensor.record();
-                }
+                store.put(key, ValueAndTimestamp.make(value, context().timestamp()));
+                tupleForwarder.maybeForward(key, value, oldValue);
             } else {
                 context().forward(key, new Change<>(value, null));
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index 22a34b83655..60104c4755b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -49,8 +49,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.function.Function;
-import static java.util.Collections.singletonMap;
-
 import static java.util.Collections.emptyMap;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -384,16 +382,12 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
 
             // Deleting a non-joining record produces an unnecessary tombstone for inner joins, because
             // it's not possible to know whether a result was previously emitted.
-            // HOWEVER, when the final join result is materialized (either explicitly or
-            // implicitly by a subsequent join), we _can_ detect that the tombstone is unnecessary and drop it.
             // For the left join, the tombstone is necessary.
             left.pipeInput("lhs1", (String) null);
             {
                 assertThat(
                     outputTopic.readKeyValuesToMap(),
-                    is(leftJoin || !(materialized || rejoin)
-                        ? mkMap(mkEntry("lhs1", null))
-                        : emptyMap())
+                    is(mkMap(mkEntry("lhs1", null)))
                 );
                 if (materialized) {
                     assertThat(
@@ -470,15 +464,11 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             // "moving" our subscription to another non-existent FK results in an unnecessary tombstone for inner join,
             // since it impossible to know whether the prior FK existed or not (and thus whether any results have
             // previously been emitted)
-            // previously been emitted). HOWEVER, when the final join result is materialized (either explicitly or
-            // implicitly by a subsequent join), we _can_ detect that the tombstone is unnecessary and drop it.
             // The left join emits a _necessary_ update (since the lhs record has actually changed)
             left.pipeInput("lhs1", "lhsValue1|rhs2");
             assertThat(
                 outputTopic.readKeyValuesToMap(),
-                is(leftJoin
-                    ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs2,null)"))
-                    : (materialized || rejoin) ? emptyMap() : singletonMap("lhs1", null))
+                is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs2,null)" : null)))
             );
             if (materialized) {
                 assertThat(
@@ -490,9 +480,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             left.pipeInput("lhs1", "lhsValue1|rhs3");
             assertThat(
                 outputTopic.readKeyValuesToMap(),
-                is(leftJoin
-                    ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs3,null)"))
-                    : (materialized || rejoin) ? emptyMap() : singletonMap("lhs1", null))
+                is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs3,null)" : null)))
             );
             if (materialized) {
                 assertThat(
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index e503403e7ba..02590f6063c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -41,6 +41,7 @@ import org.apache.kafka.test.MockApiProcessor;
 import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.time.Duration;
@@ -91,6 +92,7 @@ public class KTableSourceTest {
                 supplier.theCapturedProcessor().processed());
     }
 
+    @Ignore // we have disabled KIP-557 until KAFKA-12508 can be properly addressed
    @Test
     public void testKTableSourceEmitOnChange() {
         final StreamsBuilder builder = new StreamsBuilder();