KAFKA-10543: Convert KTable joins to new PAPI (#11412)

* Migrate KTable joins to new Processor API.
* Migrate missing KTableProcessorSupplier implementations.
* Replace KTableProcessorSupplier with new Processor API implementation.

Reviewers: John Roesler <vvcephei@apache.org>
This commit is contained in:
Jorge Esteban Quilcate Otoya 2021-11-08 15:48:54 -05:00 committed by GitHub
parent c1bdfa125d
commit 807c5b4d28
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
46 changed files with 578 additions and 441 deletions

View File

@ -38,6 +38,7 @@ public interface DeserializationExceptionHandler extends Configurable {
* @param record record that failed deserialization
* @param exception the actual exception
*/
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
DeserializationHandlerResponse handle(final ProcessorContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception);

View File

@ -236,7 +236,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
"-cogroup-merge",
builder,
CogroupedKStreamImpl.MERGE_NAME);
final KTableNewProcessorSupplier<K, VOut, K, VOut> passThrough = new KTablePassThrough<>(parentProcessors, storeName);
final KTableProcessorSupplier<K, VOut, K, VOut> passThrough = new KTablePassThrough<>(parentProcessors, storeName);
final ProcessorParameters<K, VOut, ?, ?> processorParameters = new ProcessorParameters(passThrough, mergeProcessorName);
final ProcessorGraphNode<K, VOut> mergeNode =
new ProcessorGraphNode<>(mergeProcessorName, processorParameters);

View File

@ -142,7 +142,7 @@ public class KStreamAggregate<KIn, VIn, VAgg> implements KStreamAggProcessorSupp
private TimestampedKeyValueStore<KIn, VAgg> store;
@Override
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
public void init(final ProcessorContext<?, ?> context) {
store = context.getStateStore(storeName);
}

View File

@ -18,17 +18,18 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
class KStreamGlobalKTableJoin<K1, K2, R, V1, V2> implements org.apache.kafka.streams.processor.ProcessorSupplier<K1, V1> {
class KStreamGlobalKTableJoin<K1, V1, K2, V2, VOut> implements ProcessorSupplier<K1, V1, K1, VOut> {
private final KTableValueGetterSupplier<K2, V2> valueGetterSupplier;
private final ValueJoinerWithKey<? super K1, ? super V1, ? super V2, ? extends R> joiner;
private final ValueJoinerWithKey<? super K1, ? super V1, ? super V2, ? extends VOut> joiner;
private final KeyValueMapper<? super K1, ? super V1, ? extends K2> mapper;
private final boolean leftJoin;
KStreamGlobalKTableJoin(final KTableValueGetterSupplier<K2, V2> valueGetterSupplier,
final ValueJoinerWithKey<? super K1, ? super V1, ? super V2, ? extends R> joiner,
final ValueJoinerWithKey<? super K1, ? super V1, ? super V2, ? extends VOut> joiner,
final KeyValueMapper<? super K1, ? super V1, ? extends K2> mapper,
final boolean leftJoin) {
this.valueGetterSupplier = valueGetterSupplier;
@ -38,7 +39,7 @@ class KStreamGlobalKTableJoin<K1, K2, R, V1, V2> implements org.apache.kafka.str
}
@Override
public org.apache.kafka.streams.processor.Processor<K1, V1> get() {
public Processor<K1, V1, K1, VOut> get() {
return new KStreamKTableJoinProcessor<>(valueGetterSupplier.get(), mapper, joiner, leftJoin);
}
}

View File

@ -1215,9 +1215,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final KTableValueGetterSupplier<KG, VG> valueGetterSupplier =
((GlobalKTableImpl<KG, VG>) globalTable).valueGetterSupplier();
final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, LEFTJOIN_NAME);
// Old PAPI. Needs to be migrated.
@SuppressWarnings("deprecation")
final org.apache.kafka.streams.processor.ProcessorSupplier<K, V> processorSupplier = new KStreamGlobalKTableJoin<>(
final ProcessorSupplier<K, V, K, VR> processorSupplier = new KStreamGlobalKTableJoin<>(
valueGetterSupplier,
joiner,
keySelector,
@ -1253,9 +1251,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final NamedInternal renamed = new NamedInternal(joinedInternal.name());
final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
// Old PAPI. Needs to be migrated.
@SuppressWarnings("deprecation")
final org.apache.kafka.streams.processor.ProcessorSupplier<K, V> processorSupplier = new KStreamKTableJoin<>(
final ProcessorSupplier<K, V, K, ? extends VR> processorSupplier = new KStreamKTableJoin<>(
((KTableImpl<K, ?, VO>) table).valueGetterSupplier(),
joiner,
leftJoin);

View File

@ -18,17 +18,18 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
class KStreamKTableJoin<K, R, V1, V2> implements org.apache.kafka.streams.processor.ProcessorSupplier<K, V1> {
class KStreamKTableJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> {
private final KeyValueMapper<K, V1, K> keyValueMapper = (key, value) -> key;
private final KTableValueGetterSupplier<K, V2> valueGetterSupplier;
private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, R> joiner;
private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, VOut> joiner;
private final boolean leftJoin;
KStreamKTableJoin(final KTableValueGetterSupplier<K, V2> valueGetterSupplier,
final ValueJoinerWithKey<? super K, ? super V1, ? super V2, R> joiner,
final ValueJoinerWithKey<? super K, ? super V1, ? super V2, VOut> joiner,
final boolean leftJoin) {
this.valueGetterSupplier = valueGetterSupplier;
this.joiner = joiner;
@ -36,7 +37,7 @@ class KStreamKTableJoin<K, R, V1, V2> implements org.apache.kafka.streams.proces
}
@Override
public org.apache.kafka.streams.processor.Processor<K, V1> get() {
public Processor<K, V1, K, VOut> get() {
return new KStreamKTableJoinProcessor<>(valueGetterSupplier.get(), keyValueMapper, joiner, leftJoin);
}

View File

@ -19,6 +19,10 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -26,19 +30,18 @@ import org.slf4j.LoggerFactory;
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends org.apache.kafka.streams.processor.AbstractProcessor<K1, V1> {
class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> extends ContextualProcessor<K1, V1, K1, VOut> {
private static final Logger LOG = LoggerFactory.getLogger(KStreamKTableJoin.class);
private final KTableValueGetter<K2, V2> valueGetter;
private final KeyValueMapper<? super K1, ? super V1, ? extends K2> keyMapper;
private final ValueJoinerWithKey<? super K1, ? super V1, ? super V2, ? extends R> joiner;
private final ValueJoinerWithKey<? super K1, ? super V1, ? super V2, ? extends VOut> joiner;
private final boolean leftJoin;
private Sensor droppedRecordsSensor;
KStreamKTableJoinProcessor(final KTableValueGetter<K2, V2> valueGetter,
final KeyValueMapper<? super K1, ? super V1, ? extends K2> keyMapper,
final ValueJoinerWithKey<? super K1, ? super V1, ? super V2, ? extends R> joiner,
final ValueJoinerWithKey<? super K1, ? super V1, ? super V2, ? extends VOut> joiner,
final boolean leftJoin) {
this.valueGetter = valueGetter;
this.keyMapper = keyMapper;
@ -47,7 +50,7 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends org.apache.kafka.str
}
@Override
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
public void init(final ProcessorContext<K1, VOut> context) {
super.init(context);
final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
@ -55,7 +58,7 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends org.apache.kafka.str
}
@Override
public void process(final K1 key, final V1 value) {
public void process(final Record<K1, V1> record) {
// we do join iff the join keys are equal, thus, if {@code keyMapper} returns {@code null} we
// cannot join and just ignore the record. Note for KTables, this is the same as having a null key
// since keyMapper just returns the key, but for GlobalKTables we can have other keyMappers
@ -64,17 +67,25 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends org.apache.kafka.str
// an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
// furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
// thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
final K2 mappedKey = keyMapper.apply(key, value);
if (mappedKey == null || value == null) {
LOG.warn(
"Skipping record due to null join key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
key, value, context().topic(), context().partition(), context().offset()
);
final K2 mappedKey = keyMapper.apply(record.key(), record.value());
if (mappedKey == null || record.value() == null) {
if (context().recordMetadata().isPresent()) {
final RecordMetadata recordMetadata = context().recordMetadata().get();
LOG.warn(
"Skipping record due to null join key or value. "
+ "topic=[{}] partition=[{}] offset=[{}]",
recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
);
} else {
LOG.warn(
"Skipping record due to null join key or value. Topic, partition, and offset not known."
);
}
droppedRecordsSensor.record();
} else {
final V2 value2 = getValueOrNull(valueGetter.get(mappedKey));
if (leftJoin || value2 != null) {
context().forward(key, joiner.apply(key, value, value2));
context().forward(record.withValue(joiner.apply(record.key(), record.value(), value2)));
}
}
}

View File

@ -137,7 +137,7 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, V, K,
private TimestampedKeyValueStore<K, V> store;
@Override
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
public void init(final ProcessorContext<?, ?> context) {
store = context.getStateStore(storeName);
}

View File

@ -223,7 +223,7 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg> implements KStreamAgg
private SessionStore<KIn, VAgg> store;
@Override
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
public void init(final ProcessorContext<?, ?> context) {
store = context.getStateStore(storeName);
}

View File

@ -543,7 +543,7 @@ public class KStreamSlidingWindowAggregate<KIn, VIn, VAgg> implements KStreamAgg
private TimestampedWindowStore<KIn, VAgg> windowStore;
@Override
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
public void init(final ProcessorContext<?, ?> context) {
windowStore = context.getStateStore(storeName);
}

View File

@ -207,7 +207,7 @@ public class KStreamWindowAggregate<KIn, VIn, VAgg, W extends Window> implements
private TimestampedWindowStore<KIn, VAgg> windowStore;
@Override
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
public void init(final ProcessorContext<?, ?> context) {
windowStore = context.getStateStore(storeName);
}

View File

@ -27,7 +27,8 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
public class KTableAggregate<KIn, VIn, VAgg> implements KTableNewProcessorSupplier<KIn, VIn, KIn, VAgg> {
public class KTableAggregate<KIn, VIn, VAgg> implements
KTableProcessorSupplier<KIn, VIn, KIn, VAgg> {
private final String storeName;
private final Initializer<VAgg> initializer;

View File

@ -25,7 +25,7 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
class KTableFilter<KIn, VIn> implements KTableNewProcessorSupplier<KIn, VIn, KIn, VIn> {
class KTableFilter<KIn, VIn> implements KTableProcessorSupplier<KIn, VIn, KIn, VIn> {
private final KTableImpl<KIn, ?, VIn> parent;
private final Predicate<? super KIn, ? super VIn> predicate;
private final boolean filterNot;
@ -166,7 +166,7 @@ class KTableFilter<KIn, VIn> implements KTableNewProcessorSupplier<KIn, VIn, KIn
}
@Override
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
public void init(final ProcessorContext<?, ?> context) {
// This is the old processor context for compatibility with the other KTable processors.
// Once we migrte them all, we can swap this out.
parentGetter.init(context);

View File

@ -198,7 +198,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
}
final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FILTER_NAME);
final KTableNewProcessorSupplier<K, V, K, V> processorSupplier =
final KTableProcessorSupplier<K, V, K, V> processorSupplier =
new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
final ProcessorParameters<K, V, ?, ?> processorParameters = unsafeCastProcessorParametersToCompletelyDifferentType(
@ -311,7 +311,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, MAPVALUES_NAME);
final KTableNewProcessorSupplier<K, V, K, VR> processorSupplier = new KTableMapValues<>(this, mapper, queryableStoreName);
final KTableProcessorSupplier<K, V, K, VR> processorSupplier = new KTableMapValues<>(this, mapper, queryableStoreName);
// leaving in calls to ITB until building topology with graph
@ -464,7 +464,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final String name = namedInternal.orElseGenerateWithPrefix(builder, TRANSFORMVALUES_NAME);
final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableTransformValues<>(
final KTableProcessorSupplier<K, V, K, VR> processorSupplier = new KTableTransformValues<>(
this,
transformerSupplier,
queryableStoreName);
@ -726,8 +726,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
((KTableImpl<?, ?, ?>) other).enableSendingOldValues(true);
}
final KTableKTableAbstractJoin<K, VR, V, VO> joinThis;
final KTableKTableAbstractJoin<K, VR, VO, V> joinOther;
final KTableKTableAbstractJoin<K, V, VO, VR> joinThis;
final KTableKTableAbstractJoin<K, VO, V, VR> joinOther;
if (!leftOuter) { // inner
joinThis = new KTableKTableInnerJoin<>(this, (KTableImpl<K, ?, VO>) other, joiner);
@ -808,7 +808,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final GroupedInternal<K1, V1> groupedInternal = new GroupedInternal<>(grouped);
final String selectName = new NamedInternal(groupedInternal.name()).orElseGenerateWithPrefix(builder, SELECT_NAME);
final KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
final KTableRepartitionMapSupplier<K, V, KeyValue<K1, V1>, K1, V1> selectSupplier = new KTableRepartitionMap<>(this, selector);
final ProcessorParameters<K, Change<V>, ?, ?> processorParameters = new ProcessorParameters<>(selectSupplier, selectName);
// select the aggregate key and values (old and new), it would require parent to send old values
@ -835,10 +835,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
return new KTableSourceValueGetterSupplier<>(source.queryableName());
} else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
return ((KStreamAggProcessorSupplier<?, S, K, V>) processorSupplier).view();
} else if (processorSupplier instanceof KTableNewProcessorSupplier) {
return ((KTableNewProcessorSupplier<?, ?, K, V>) processorSupplier).view();
} else {
return ((KTableProcessorSupplier<K, S, V>) processorSupplier).view();
return ((KTableProcessorSupplier<?, ?, K, V>) processorSupplier).view();
}
}
@ -853,14 +851,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
source.enableSendingOldValues();
} else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).enableSendingOldValues();
} else if (processorSupplier instanceof KTableNewProcessorSupplier) {
final KTableNewProcessorSupplier<?, ?, ?, ?> tableProcessorSupplier =
(KTableNewProcessorSupplier<?, ?, ?, ?>) processorSupplier;
if (!tableProcessorSupplier.enableSendingOldValues(forceMaterialization)) {
return false;
}
} else {
final KTableProcessorSupplier<K, S, V> tableProcessorSupplier = (KTableProcessorSupplier<K, S, V>) processorSupplier;
} else if (processorSupplier instanceof KTableProcessorSupplier) {
final KTableProcessorSupplier<?, ?, ?, ?> tableProcessorSupplier =
(KTableProcessorSupplier<?, ?, ?, ?>) processorSupplier;
if (!tableProcessorSupplier.enableSendingOldValues(forceMaterialization)) {
return false;
}

View File

@ -18,19 +18,20 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.ValueJoiner;
abstract class KTableKTableAbstractJoin<K, R, V1, V2> implements KTableProcessorSupplier<K, V1, R> {
abstract class KTableKTableAbstractJoin<K, V1, V2, VOut> implements
KTableProcessorSupplier<K, V1, K, VOut> {
private final KTableImpl<K, ?, V1> table1;
private final KTableImpl<K, ?, V2> table2;
final KTableValueGetterSupplier<K, V1> valueGetterSupplier1;
final KTableValueGetterSupplier<K, V2> valueGetterSupplier2;
final ValueJoiner<? super V1, ? super V2, ? extends R> joiner;
final ValueJoiner<? super V1, ? super V2, ? extends VOut> joiner;
boolean sendOldValues = false;
KTableKTableAbstractJoin(final KTableImpl<K, ?, V1> table1,
final KTableImpl<K, ?, V2> table2,
final ValueJoiner<? super V1, ? super V2, ? extends R> joiner) {
final ValueJoiner<? super V1, ? super V2, ? extends VOut> joiner) {
this.table1 = table1;
this.table2 = table2;
this.valueGetterSupplier1 = table1.valueGetterSupplier();

View File

@ -19,7 +19,11 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
@ -28,41 +32,40 @@ import org.slf4j.LoggerFactory;
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
class KTableKTableInnerJoin<K, V1, V2, VOut> extends KTableKTableAbstractJoin<K, V1, V2, VOut> {
private static final Logger LOG = LoggerFactory.getLogger(KTableKTableInnerJoin.class);
private final KeyValueMapper<K, V1, K> keyValueMapper = (key, value) -> key;
KTableKTableInnerJoin(final KTableImpl<K, ?, V1> table1,
final KTableImpl<K, ?, V2> table2,
final ValueJoiner<? super V1, ? super V2, ? extends R> joiner) {
final ValueJoiner<? super V1, ? super V2, ? extends VOut> joiner) {
super(table1, table2, joiner);
}
@Override
public org.apache.kafka.streams.processor.Processor<K, Change<V1>> get() {
public Processor<K, Change<V1>, K, Change<VOut>> get() {
return new KTableKTableJoinProcessor(valueGetterSupplier2.get());
}
@Override
public KTableValueGetterSupplier<K, R> view() {
public KTableValueGetterSupplier<K, VOut> view() {
return new KTableKTableInnerJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2);
}
private class KTableKTableInnerJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> {
private class KTableKTableInnerJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier<K, VOut, V1, V2> {
KTableKTableInnerJoinValueGetterSupplier(final KTableValueGetterSupplier<K, V1> valueGetterSupplier1,
final KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
super(valueGetterSupplier1, valueGetterSupplier2);
}
public KTableValueGetter<K, R> get() {
public KTableValueGetter<K, VOut> get() {
return new KTableKTableInnerJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
}
}
private class KTableKTableJoinProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, Change<V1>> {
private class KTableKTableJoinProcessor extends ContextualProcessor<K, Change<V1>, K, Change<VOut>> {
private final KTableValueGetter<K, V2> valueGetter;
private Sensor droppedRecordsSensor;
@ -72,7 +75,7 @@ class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
public void init(final ProcessorContext<K, Change<VOut>> context) {
super.init(context);
droppedRecordsSensor = droppedRecordsSensor(
Thread.currentThread().getName(),
@ -83,38 +86,46 @@ class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
public void process(final K key, final Change<V1> change) {
public void process(final Record<K, Change<V1>> record) {
// we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
if (key == null) {
LOG.warn(
"Skipping record due to null key. change=[{}] topic=[{}] partition=[{}] offset=[{}]",
change, context().topic(), context().partition(), context().offset()
);
if (record.key() == null) {
if (context().recordMetadata().isPresent()) {
final RecordMetadata recordMetadata = context().recordMetadata().get();
LOG.warn(
"Skipping record due to null key. "
+ "topic=[{}] partition=[{}] offset=[{}]",
recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
);
} else {
LOG.warn(
"Skipping record due to null key. Topic, partition, and offset not known."
);
}
droppedRecordsSensor.record();
return;
}
R newValue = null;
VOut newValue = null;
final long resultTimestamp;
R oldValue = null;
VOut oldValue = null;
final ValueAndTimestamp<V2> valueAndTimestampRight = valueGetter.get(key);
final ValueAndTimestamp<V2> valueAndTimestampRight = valueGetter.get(record.key());
final V2 valueRight = getValueOrNull(valueAndTimestampRight);
if (valueRight == null) {
return;
}
resultTimestamp = Math.max(context().timestamp(), valueAndTimestampRight.timestamp());
resultTimestamp = Math.max(record.timestamp(), valueAndTimestampRight.timestamp());
if (change.newValue != null) {
newValue = joiner.apply(change.newValue, valueRight);
if (record.value().newValue != null) {
newValue = joiner.apply(record.value().newValue, valueRight);
}
if (sendOldValues && change.oldValue != null) {
oldValue = joiner.apply(change.oldValue, valueRight);
if (sendOldValues && record.value().oldValue != null) {
oldValue = joiner.apply(record.value().oldValue, valueRight);
}
context().forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(resultTimestamp));
context().forward(record.withValue(new Change<>(newValue, oldValue)).withTimestamp(resultTimestamp));
}
@Override
@ -123,7 +134,7 @@ class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
}
private class KTableKTableInnerJoinValueGetter implements KTableValueGetter<K, R> {
private class KTableKTableInnerJoinValueGetter implements KTableValueGetter<K, VOut> {
private final KTableValueGetter<K, V1> valueGetter1;
private final KTableValueGetter<K, V2> valueGetter2;
@ -135,13 +146,13 @@ class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
public void init(final ProcessorContext<?, ?> context) {
valueGetter1.init(context);
valueGetter2.init(context);
}
@Override
public ValueAndTimestamp<R> get(final K key) {
public ValueAndTimestamp<VOut> get(final K key) {
final ValueAndTimestamp<V1> valueAndTimestamp1 = valueGetter1.get(key);
final V1 value1 = getValueOrNull(valueAndTimestamp1);

View File

@ -16,6 +16,10 @@
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@ -23,16 +27,15 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> {
public class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, K, V> {
private final KTableProcessorSupplier<K, ?, V> parent1;
private final KTableProcessorSupplier<K, ?, V> parent2;
private final KTableProcessorSupplier<K, ?, K, V> parent1;
private final KTableProcessorSupplier<K, ?, K, V> parent2;
private final String queryableName;
private boolean sendOldValues = false;
KTableKTableJoinMerger(final KTableProcessorSupplier<K, ?, V> parent1,
final KTableProcessorSupplier<K, ?, V> parent2,
KTableKTableJoinMerger(final KTableProcessorSupplier<K, ?, K, V> parent1,
final KTableProcessorSupplier<K, ?, K, V> parent2,
final String queryableName) {
this.parent1 = parent1;
this.parent2 = parent2;
@ -44,7 +47,7 @@ public class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K,
}
@Override
public org.apache.kafka.streams.processor.Processor<K, Change<V>> get() {
public Processor<K, Change<V>, K, Change<V>> get() {
return new KTableKTableJoinMergeProcessor();
}
@ -83,24 +86,24 @@ public class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K,
return true;
}
public static <K, V> KTableKTableJoinMerger<K, V> of(final KTableProcessorSupplier<K, ?, V> parent1,
final KTableProcessorSupplier<K, ?, V> parent2) {
public static <K, V> KTableKTableJoinMerger<K, V> of(final KTableProcessorSupplier<K, ?, K, V> parent1,
final KTableProcessorSupplier<K, ?, K, V> parent2) {
return of(parent1, parent2, null);
}
public static <K, V> KTableKTableJoinMerger<K, V> of(final KTableProcessorSupplier<K, ?, V> parent1,
final KTableProcessorSupplier<K, ?, V> parent2,
public static <K, V> KTableKTableJoinMerger<K, V> of(final KTableProcessorSupplier<K, ?, K, V> parent1,
final KTableProcessorSupplier<K, ?, K, V> parent2,
final String queryableName) {
return new KTableKTableJoinMerger<>(parent1, parent2, queryableName);
}
private class KTableKTableJoinMergeProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, Change<V>> {
private class KTableKTableJoinMergeProcessor extends ContextualProcessor<K, Change<V>, K, Change<V>> {
private TimestampedKeyValueStore<K, V> store;
private TimestampedTupleForwarder<K, V> tupleForwarder;
@SuppressWarnings("unchecked")
@Override
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
public void init(final ProcessorContext<K, Change<V>> context) {
super.init(context);
if (queryableName != null) {
store = (TimestampedKeyValueStore<K, V>) context.getStateStore(queryableName);
@ -113,15 +116,15 @@ public class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K,
}
@Override
public void process(final K key, final Change<V> value) {
public void process(final Record<K, Change<V>> record) {
if (queryableName != null) {
store.put(key, ValueAndTimestamp.make(value.newValue, context().timestamp()));
tupleForwarder.maybeForward(key, value.newValue, sendOldValues ? value.oldValue : null);
store.put(record.key(), ValueAndTimestamp.make(record.value().newValue, record.timestamp()));
tupleForwarder.maybeForward(record.key(), record.value().newValue, sendOldValues ? record.value().oldValue : null);
} else {
if (sendOldValues) {
context().forward(key, value);
context().forward(record);
} else {
context().forward(key, new Change<>(value.newValue, null));
context().forward(record.withValue(new Change<>(record.value().newValue, null)));
}
}
}

View File

@ -18,7 +18,11 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
@ -28,40 +32,39 @@ import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.d
import static org.apache.kafka.streams.processor.internals.RecordQueue.UNKNOWN;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
class KTableKTableLeftJoin<K, V1, V2, VOut> extends KTableKTableAbstractJoin<K, V1, V2, VOut> {
private static final Logger LOG = LoggerFactory.getLogger(KTableKTableLeftJoin.class);
KTableKTableLeftJoin(final KTableImpl<K, ?, V1> table1,
final KTableImpl<K, ?, V2> table2,
final ValueJoiner<? super V1, ? super V2, ? extends R> joiner) {
final ValueJoiner<? super V1, ? super V2, ? extends VOut> joiner) {
super(table1, table2, joiner);
}
@Override
public org.apache.kafka.streams.processor.Processor<K, Change<V1>> get() {
public Processor<K, Change<V1>, K, Change<VOut>> get() {
return new KTableKTableLeftJoinProcessor(valueGetterSupplier2.get());
}
@Override
public KTableValueGetterSupplier<K, R> view() {
public KTableValueGetterSupplier<K, VOut> view() {
return new KTableKTableLeftJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2);
}
private class KTableKTableLeftJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> {
private class KTableKTableLeftJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier<K, VOut, V1, V2> {
KTableKTableLeftJoinValueGetterSupplier(final KTableValueGetterSupplier<K, V1> valueGetterSupplier1,
final KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
super(valueGetterSupplier1, valueGetterSupplier2);
}
public KTableValueGetter<K, R> get() {
public KTableValueGetter<K, VOut> get() {
return new KTableKTableLeftJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
}
}
private class KTableKTableLeftJoinProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, Change<V1>> {
private class KTableKTableLeftJoinProcessor extends ContextualProcessor<K, Change<V1>, K, Change<VOut>> {
private final KTableValueGetter<K, V2> valueGetter;
private Sensor droppedRecordsSensor;
@ -71,7 +74,7 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
public void init(final ProcessorContext<K, Change<VOut>> context) {
super.init(context);
droppedRecordsSensor = droppedRecordsSensor(
Thread.currentThread().getName(),
@ -82,27 +85,35 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
public void process(final K key, final Change<V1> change) {
public void process(final Record<K, Change<V1>> record) {
// we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
if (key == null) {
LOG.warn(
"Skipping record due to null key. change=[{}] topic=[{}] partition=[{}] offset=[{}]",
change, context().topic(), context().partition(), context().offset()
);
if (record.key() == null) {
if (context().recordMetadata().isPresent()) {
final RecordMetadata recordMetadata = context().recordMetadata().get();
LOG.warn(
"Skipping record due to null key. "
+ "topic=[{}] partition=[{}] offset=[{}]",
recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
);
} else {
LOG.warn(
"Skipping record due to null key. Topic, partition, and offset not known."
);
}
droppedRecordsSensor.record();
return;
}
R newValue = null;
VOut newValue = null;
final long resultTimestamp;
R oldValue = null;
VOut oldValue = null;
final ValueAndTimestamp<V2> valueAndTimestampRight = valueGetter.get(key);
final ValueAndTimestamp<V2> valueAndTimestampRight = valueGetter.get(record.key());
final V2 value2 = getValueOrNull(valueAndTimestampRight);
final long timestampRight;
if (value2 == null) {
if (change.newValue == null && change.oldValue == null) {
if (record.value().newValue == null && record.value().oldValue == null) {
return;
}
timestampRight = UNKNOWN;
@ -110,17 +121,17 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
timestampRight = valueAndTimestampRight.timestamp();
}
resultTimestamp = Math.max(context().timestamp(), timestampRight);
resultTimestamp = Math.max(record.timestamp(), timestampRight);
if (change.newValue != null) {
newValue = joiner.apply(change.newValue, value2);
if (record.value().newValue != null) {
newValue = joiner.apply(record.value().newValue, value2);
}
if (sendOldValues && change.oldValue != null) {
oldValue = joiner.apply(change.oldValue, value2);
if (sendOldValues && record.value().oldValue != null) {
oldValue = joiner.apply(record.value().oldValue, value2);
}
context().forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(resultTimestamp));
context().forward(record.withValue(new Change<>(newValue, oldValue)).withTimestamp(resultTimestamp));
}
@Override
@ -129,7 +140,7 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
}
private class KTableKTableLeftJoinValueGetter implements KTableValueGetter<K, R> {
private class KTableKTableLeftJoinValueGetter implements KTableValueGetter<K, VOut> {
private final KTableValueGetter<K, V1> valueGetter1;
private final KTableValueGetter<K, V2> valueGetter2;
@ -141,13 +152,13 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
public void init(final ProcessorContext<?, ?> context) {
valueGetter1.init(context);
valueGetter2.init(context);
}
@Override
public ValueAndTimestamp<R> get(final K key) {
public ValueAndTimestamp<VOut> get(final K key) {
final ValueAndTimestamp<V1> valueAndTimestamp1 = valueGetter1.get(key);
final V1 value1 = getValueOrNull(valueAndTimestamp1);

View File

@ -18,7 +18,11 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
@ -28,39 +32,38 @@ import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.d
import static org.apache.kafka.streams.processor.internals.RecordQueue.UNKNOWN;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
class KTableKTableOuterJoin<K, V1, V2, VOut> extends KTableKTableAbstractJoin<K, V1, V2, VOut> {
private static final Logger LOG = LoggerFactory.getLogger(KTableKTableOuterJoin.class);
KTableKTableOuterJoin(final KTableImpl<K, ?, V1> table1,
final KTableImpl<K, ?, V2> table2,
final ValueJoiner<? super V1, ? super V2, ? extends R> joiner) {
final ValueJoiner<? super V1, ? super V2, ? extends VOut> joiner) {
super(table1, table2, joiner);
}
@Override
public org.apache.kafka.streams.processor.Processor<K, Change<V1>> get() {
public Processor<K, Change<V1>, K, Change<VOut>> get() {
return new KTableKTableOuterJoinProcessor(valueGetterSupplier2.get());
}
@Override
public KTableValueGetterSupplier<K, R> view() {
public KTableValueGetterSupplier<K, VOut> view() {
return new KTableKTableOuterJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2);
}
private class KTableKTableOuterJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> {
private class KTableKTableOuterJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier<K, VOut, V1, V2> {
KTableKTableOuterJoinValueGetterSupplier(final KTableValueGetterSupplier<K, V1> valueGetterSupplier1,
final KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
super(valueGetterSupplier1, valueGetterSupplier2);
}
public KTableValueGetter<K, R> get() {
public KTableValueGetter<K, VOut> get() {
return new KTableKTableOuterJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
}
}
private class KTableKTableOuterJoinProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, Change<V1>> {
private class KTableKTableOuterJoinProcessor extends ContextualProcessor<K, Change<V1>, K, Change<VOut>> {
private final KTableValueGetter<K, V2> valueGetter;
private Sensor droppedRecordsSensor;
@ -70,7 +73,7 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
public void init(final ProcessorContext<K, Change<VOut>> context) {
super.init(context);
droppedRecordsSensor = droppedRecordsSensor(
Thread.currentThread().getName(),
@ -81,41 +84,49 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
public void process(final K key, final Change<V1> change) {
public void process(final Record<K, Change<V1>> record) {
// we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
if (key == null) {
LOG.warn(
"Skipping record due to null key. change=[{}] topic=[{}] partition=[{}] offset=[{}]",
change, context().topic(), context().partition(), context().offset()
);
if (record.key() == null) {
if (context().recordMetadata().isPresent()) {
final RecordMetadata recordMetadata = context().recordMetadata().get();
LOG.warn(
"Skipping record due to null key. "
+ "topic=[{}] partition=[{}] offset=[{}]",
recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
);
} else {
LOG.warn(
"Skipping record due to null key. Topic, partition, and offset not known."
);
}
droppedRecordsSensor.record();
return;
}
R newValue = null;
VOut newValue = null;
final long resultTimestamp;
R oldValue = null;
VOut oldValue = null;
final ValueAndTimestamp<V2> valueAndTimestamp2 = valueGetter.get(key);
final ValueAndTimestamp<V2> valueAndTimestamp2 = valueGetter.get(record.key());
final V2 value2 = getValueOrNull(valueAndTimestamp2);
if (value2 == null) {
if (change.newValue == null && change.oldValue == null) {
if (record.value().newValue == null && record.value().oldValue == null) {
return;
}
resultTimestamp = context().timestamp();
resultTimestamp = record.timestamp();
} else {
resultTimestamp = Math.max(context().timestamp(), valueAndTimestamp2.timestamp());
resultTimestamp = Math.max(record.timestamp(), valueAndTimestamp2.timestamp());
}
if (value2 != null || change.newValue != null) {
newValue = joiner.apply(change.newValue, value2);
if (value2 != null || record.value().newValue != null) {
newValue = joiner.apply(record.value().newValue, value2);
}
if (sendOldValues && (value2 != null || change.oldValue != null)) {
oldValue = joiner.apply(change.oldValue, value2);
if (sendOldValues && (value2 != null || record.value().oldValue != null)) {
oldValue = joiner.apply(record.value().oldValue, value2);
}
context().forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(resultTimestamp));
context().forward(record.withValue(new Change<>(newValue, oldValue)).withTimestamp(resultTimestamp));
}
@Override
@ -124,7 +135,7 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
}
private class KTableKTableOuterJoinValueGetter implements KTableValueGetter<K, R> {
private class KTableKTableOuterJoinValueGetter implements KTableValueGetter<K, VOut> {
private final KTableValueGetter<K, V1> valueGetter1;
private final KTableValueGetter<K, V2> valueGetter2;
@ -136,14 +147,14 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
public void init(final ProcessorContext<?, ?> context) {
valueGetter1.init(context);
valueGetter2.init(context);
}
@Override
public ValueAndTimestamp<R> get(final K key) {
R newValue = null;
public ValueAndTimestamp<VOut> get(final K key) {
VOut newValue = null;
final ValueAndTimestamp<V1> valueAndTimestamp1 = valueGetter1.get(key);
final V1 value1;

View File

@ -18,7 +18,11 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
@ -27,39 +31,38 @@ import org.slf4j.LoggerFactory;
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
class KTableKTableRightJoin<K, V1, V2, VOut> extends KTableKTableAbstractJoin<K, V1, V2, VOut> {
private static final Logger LOG = LoggerFactory.getLogger(KTableKTableRightJoin.class);
KTableKTableRightJoin(final KTableImpl<K, ?, V1> table1,
final KTableImpl<K, ?, V2> table2,
final ValueJoiner<? super V1, ? super V2, ? extends R> joiner) {
final ValueJoiner<? super V1, ? super V2, ? extends VOut> joiner) {
super(table1, table2, joiner);
}
@Override
public org.apache.kafka.streams.processor.Processor<K, Change<V1>> get() {
public Processor<K, Change<V1>, K, Change<VOut>> get() {
return new KTableKTableRightJoinProcessor(valueGetterSupplier2.get());
}
@Override
public KTableValueGetterSupplier<K, R> view() {
public KTableValueGetterSupplier<K, VOut> view() {
return new KTableKTableRightJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2);
}
private class KTableKTableRightJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> {
private class KTableKTableRightJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier<K, VOut, V1, V2> {
KTableKTableRightJoinValueGetterSupplier(final KTableValueGetterSupplier<K, V1> valueGetterSupplier1,
final KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
super(valueGetterSupplier1, valueGetterSupplier2);
}
public KTableValueGetter<K, R> get() {
public KTableValueGetter<K, VOut> get() {
return new KTableKTableRightJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
}
}
private class KTableKTableRightJoinProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, Change<V1>> {
private class KTableKTableRightJoinProcessor extends ContextualProcessor<K, Change<V1>, K, Change<VOut>> {
private final KTableValueGetter<K, V2> valueGetter;
private Sensor droppedRecordsSensor;
@ -69,7 +72,7 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
public void init(final ProcessorContext<K, Change<VOut>> context) {
super.init(context);
droppedRecordsSensor = droppedRecordsSensor(
Thread.currentThread().getName(),
@ -80,38 +83,46 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
public void process(final K key, final Change<V1> change) {
public void process(final Record<K, Change<V1>> record) {
// we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
if (key == null) {
LOG.warn(
"Skipping record due to null key. change=[{}] topic=[{}] partition=[{}] offset=[{}]",
change, context().topic(), context().partition(), context().offset()
);
if (record.key() == null) {
if (context().recordMetadata().isPresent()) {
final RecordMetadata recordMetadata = context().recordMetadata().get();
LOG.warn(
"Skipping record due to null key. "
+ "topic=[{}] partition=[{}] offset=[{}]",
recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
);
} else {
LOG.warn(
"Skipping record due to null key. Topic, partition, and offset not known."
);
}
droppedRecordsSensor.record();
return;
}
final R newValue;
final VOut newValue;
final long resultTimestamp;
R oldValue = null;
VOut oldValue = null;
final ValueAndTimestamp<V2> valueAndTimestampLeft = valueGetter.get(key);
final ValueAndTimestamp<V2> valueAndTimestampLeft = valueGetter.get(record.key());
final V2 valueLeft = getValueOrNull(valueAndTimestampLeft);
if (valueLeft == null) {
return;
}
resultTimestamp = Math.max(context().timestamp(), valueAndTimestampLeft.timestamp());
resultTimestamp = Math.max(record.timestamp(), valueAndTimestampLeft.timestamp());
// joiner == "reverse joiner"
newValue = joiner.apply(change.newValue, valueLeft);
newValue = joiner.apply(record.value().newValue, valueLeft);
if (sendOldValues) {
// joiner == "reverse joiner"
oldValue = joiner.apply(change.oldValue, valueLeft);
oldValue = joiner.apply(record.value().oldValue, valueLeft);
}
context().forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(resultTimestamp));
context().forward(record.withValue(new Change<>(newValue, oldValue)).withTimestamp(resultTimestamp));
}
@Override
@ -120,7 +131,7 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
}
private class KTableKTableRightJoinValueGetter implements KTableValueGetter<K, R> {
private class KTableKTableRightJoinValueGetter implements KTableValueGetter<K, VOut> {
private final KTableValueGetter<K, V1> valueGetter1;
private final KTableValueGetter<K, V2> valueGetter2;
@ -132,13 +143,13 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
public void init(final ProcessorContext<?, ?> context) {
valueGetter1.init(context);
valueGetter2.init(context);
}
@Override
public ValueAndTimestamp<R> get(final K key) {
public ValueAndTimestamp<VOut> get(final K key) {
final ValueAndTimestamp<V2> valueAndTimestamp2 = valueGetter2.get(key);
final V2 value2 = getValueOrNull(valueAndTimestamp2);

View File

@ -26,7 +26,7 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
class KTableMapValues<KIn, VIn, VOut> implements KTableNewProcessorSupplier<KIn, VIn, KIn, VOut> {
class KTableMapValues<KIn, VIn, VOut> implements KTableProcessorSupplier<KIn, VIn, KIn, VOut> {
private final KTableImpl<KIn, ?, VIn> parent;
private final ValueMapperWithKey<? super KIn, ? super VIn, ? extends VOut> mapper;
private final String queryableName;
@ -155,7 +155,7 @@ class KTableMapValues<KIn, VIn, VOut> implements KTableNewProcessorSupplier<KIn,
}
@Override
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
public void init(final ProcessorContext<?, ?> context) {
parentGetter.init(context);
}

View File

@ -16,7 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@ -40,7 +40,7 @@ public class KTableMaterializedValueGetterSupplier<K, V> implements KTableValueG
private TimestampedKeyValueStore<K, V> store;
@Override
public void init(final ProcessorContext context) {
public void init(final ProcessorContext<?, ?> context) {
store = context.getStateStore(storeName);
}

View File

@ -24,7 +24,7 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
import java.util.Collection;
public class KTablePassThrough<KIn, VIn> implements KTableNewProcessorSupplier<KIn, VIn, KIn, VIn> {
public class KTablePassThrough<KIn, VIn> implements KTableProcessorSupplier<KIn, VIn, KIn, VIn> {
private final Collection<KStreamAggProcessorSupplier> parents;
private final String storeName;
@ -82,7 +82,7 @@ public class KTablePassThrough<KIn, VIn> implements KTableNewProcessorSupplier<K
private TimestampedKeyValueStore<KIn, VIn> store;
@Override
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
public void init(final ProcessorContext<?, ?> context) {
store = context.getStateStore(storeName);
}

View File

@ -16,10 +16,11 @@
*/
package org.apache.kafka.streams.kstream.internals;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public interface KTableProcessorSupplier<K, V, T> extends org.apache.kafka.streams.processor.ProcessorSupplier<K, Change<V>> {
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
KTableValueGetterSupplier<K, T> view();
public interface KTableProcessorSupplier<KIn, VIn, KOut, VOut> extends ProcessorSupplier<KIn, Change<VIn>, KOut, Change<VOut>> {
KTableValueGetterSupplier<KOut, VOut> view();
/**
* Potentially enables sending old values.
@ -32,7 +33,7 @@ public interface KTableProcessorSupplier<K, V, T> extends org.apache.kafka.strea
*
* @param forceMaterialization indicates if an upstream node should be forced to materialize to enable sending old
* values.
* @return {@code true} is sending old values is enabled, i.e. either because {@code forceMaterialization} was
* @return {@code true} if sending old values is enabled, i.e. either because {@code forceMaterialization} was
* {@code true} or some upstream node is materialized.
*/
boolean enableSendingOldValues(boolean forceMaterialization);

View File

@ -26,7 +26,7 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
public class KTableReduce<K, V> implements KTableNewProcessorSupplier<K, V, K, V> {
public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, K, V> {
private final String storeName;
private final Reducer<V> addReducer;

View File

@ -19,6 +19,11 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
@ -28,8 +33,7 @@ import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
* <p>
* Given the input, it can output at most two records (one mapped from old value and one mapped from new value).
*/
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSupplier<K, V, KeyValue<K1, V1>> {
public class KTableRepartitionMap<K, V, K1, V1> implements KTableRepartitionMapSupplier<K, V, KeyValue<K1, V1>, K1, V1> {
private final KTableImpl<K, ?, V> parent;
private final KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> mapper;
@ -40,7 +44,7 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
}
@Override
public org.apache.kafka.streams.processor.Processor<K, Change<V>> get() {
public Processor<K, Change<V>, K1, Change<V1>> get() {
return new KTableMapProcessor();
}
@ -70,30 +74,32 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
throw new IllegalStateException("KTableRepartitionMap should always require sending old values.");
}
private class KTableMapProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, Change<V>> {
private class KTableMapProcessor extends ContextualProcessor<K, Change<V>, K1, Change<V1>> {
/**
* @throws StreamsException if key is null
*/
@Override
public void process(final K key, final Change<V> change) {
public void process(final Record<K, Change<V>> record) {
// the original key should never be null
if (key == null) {
if (record.key() == null) {
throw new StreamsException("Record key for the grouping KTable should not be null.");
}
// if the value is null, we do not need to forward its selected key-value further
final KeyValue<? extends K1, ? extends V1> newPair = change.newValue == null ? null : mapper.apply(key, change.newValue);
final KeyValue<? extends K1, ? extends V1> oldPair = change.oldValue == null ? null : mapper.apply(key, change.oldValue);
final KeyValue<? extends K1, ? extends V1> newPair = record.value().newValue == null ? null :
mapper.apply(record.key(), record.value().newValue);
final KeyValue<? extends K1, ? extends V1> oldPair = record.value().oldValue == null ? null :
mapper.apply(record.key(), record.value().oldValue);
// if the selected repartition key or value is null, skip
// forward oldPair first, to be consistent with reduce and aggregate
if (oldPair != null && oldPair.key != null && oldPair.value != null) {
context().forward(oldPair.key, new Change<>(null, oldPair.value));
context().forward(record.withKey(oldPair.key).withValue(new Change<>(null, oldPair.value)));
}
if (newPair != null && newPair.key != null && newPair.value != null) {
context().forward(newPair.key, new Change<>(newPair.value, null));
context().forward(record.withKey(newPair.key).withValue(new Change<>(newPair.value, null)));
}
}
@ -101,15 +107,15 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
private class KTableMapValueGetter implements KTableValueGetter<K, KeyValue<K1, V1>> {
private final KTableValueGetter<K, V> parentGetter;
private org.apache.kafka.streams.processor.ProcessorContext context;
private InternalProcessorContext<?, ?> context;
KTableMapValueGetter(final KTableValueGetter<K, V> parentGetter) {
this.parentGetter = parentGetter;
}
@Override
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
this.context = context;
public void init(final ProcessorContext<?, ?> context) {
this.context = (InternalProcessorContext<?, ?>) context;
parentGetter.init(context);
}
@ -118,7 +124,8 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
final ValueAndTimestamp<V> valueAndTimestamp = parentGetter.get(key);
return ValueAndTimestamp.make(
mapper.apply(key, getValueOrNull(valueAndTimestamp)),
valueAndTimestamp == null ? context.timestamp() : valueAndTimestamp.timestamp());
valueAndTimestamp == null ? context.timestamp() : valueAndTimestamp.timestamp()
);
}
@Override

View File

@ -18,9 +18,9 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
public interface KTableNewProcessorSupplier<KIn, VIn, KOut, VOut> extends ProcessorSupplier<KIn, Change<VIn>, KOut, Change<VOut>> {
public interface KTableRepartitionMapSupplier<KIn, VIn, VView, KOut, VOut> extends ProcessorSupplier<KIn, Change<VIn>, KOut, Change<VOut>> {
KTableValueGetterSupplier<KOut, VOut> view();
KTableValueGetterSupplier<KIn, VView> view();
/**
* Potentially enables sending old values.

View File

@ -17,7 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@ -40,7 +40,7 @@ public class KTableSourceValueGetterSupplier<K, V> implements KTableValueGetterS
private class KTableSourceValueGetter implements KTableValueGetter<K, V> {
private TimestampedKeyValueStore<K, V> store = null;
public void init(final ProcessorContext context) {
public void init(final ProcessorContext<?, ?> context) {
store = context.getStateStore(storeName);
}

View File

@ -19,6 +19,10 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
@ -30,15 +34,14 @@ import java.util.Objects;
import static org.apache.kafka.streams.processor.internals.RecordQueue.UNKNOWN;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
class KTableTransformValues<K, V, VOut> implements KTableProcessorSupplier<K, V, K, VOut> {
private final KTableImpl<K, ?, V> parent;
private final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends V1> transformerSupplier;
private final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VOut> transformerSupplier;
private final String queryableName;
private boolean sendOldValues = false;
KTableTransformValues(final KTableImpl<K, ?, V> parent,
final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends V1> transformerSupplier,
final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VOut> transformerSupplier,
final String queryableName) {
this.parent = Objects.requireNonNull(parent, "parent");
this.transformerSupplier = Objects.requireNonNull(transformerSupplier, "transformerSupplier");
@ -46,20 +49,20 @@ class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V
}
@Override
public org.apache.kafka.streams.processor.Processor<K, Change<V>> get() {
public Processor<K, Change<V>, K, Change<VOut>> get() {
return new KTableTransformValuesProcessor(transformerSupplier.get());
}
@Override
public KTableValueGetterSupplier<K, V1> view() {
public KTableValueGetterSupplier<K, VOut> view() {
if (queryableName != null) {
return new KTableMaterializedValueGetterSupplier<>(queryableName);
}
return new KTableValueGetterSupplier<K, V1>() {
return new KTableValueGetterSupplier<K, VOut>() {
final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = parent.valueGetterSupplier();
public KTableValueGetter<K, V1> get() {
public KTableValueGetter<K, VOut> get() {
return new KTableTransformValuesGetter(
parentValueGetterSupplier.get(),
transformerSupplier.get());
@ -85,19 +88,20 @@ class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V
return sendOldValues;
}
private class KTableTransformValuesProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, Change<V>> {
private final ValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer;
private TimestampedKeyValueStore<K, V1> store;
private TimestampedTupleForwarder<K, V1> tupleForwarder;
private class KTableTransformValuesProcessor extends ContextualProcessor<K, Change<V>, K, Change<VOut>> {
private final ValueTransformerWithKey<? super K, ? super V, ? extends VOut> valueTransformer;
private TimestampedKeyValueStore<K, VOut> store;
private TimestampedTupleForwarder<K, VOut> tupleForwarder;
private KTableTransformValuesProcessor(final ValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer) {
private KTableTransformValuesProcessor(final ValueTransformerWithKey<? super K, ? super V, ? extends VOut> valueTransformer) {
this.valueTransformer = Objects.requireNonNull(valueTransformer, "valueTransformer");
}
@Override
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
public void init(final ProcessorContext<K, Change<VOut>> context) {
super.init(context);
valueTransformer.init(new ForwardingDisabledProcessorContext(context));
final InternalProcessorContext<K, Change<VOut>> internalProcessorContext = (InternalProcessorContext<K, Change<VOut>>) context;
valueTransformer.init(new ForwardingDisabledProcessorContext(internalProcessorContext));
if (queryableName != null) {
store = context.getStateStore(queryableName);
tupleForwarder = new TimestampedTupleForwarder<>(
@ -109,16 +113,16 @@ class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V
}
@Override
public void process(final K key, final Change<V> change) {
final V1 newValue = valueTransformer.transform(key, change.newValue);
public void process(final Record<K, Change<V>> record) {
final VOut newValue = valueTransformer.transform(record.key(), record.value().newValue);
if (queryableName == null) {
final V1 oldValue = sendOldValues ? valueTransformer.transform(key, change.oldValue) : null;
context().forward(key, new Change<>(newValue, oldValue));
final VOut oldValue = sendOldValues ? valueTransformer.transform(record.key(), record.value().oldValue) : null;
context().forward(record.withValue(new Change<>(newValue, oldValue)));
} else {
final V1 oldValue = sendOldValues ? getValueOrNull(store.get(key)) : null;
store.put(key, ValueAndTimestamp.make(newValue, context().timestamp()));
tupleForwarder.maybeForward(key, newValue, oldValue);
final VOut oldValue = sendOldValues ? getValueOrNull(store.get(record.key())) : null;
store.put(record.key(), ValueAndTimestamp.make(newValue, record.timestamp()));
tupleForwarder.maybeForward(record.key(), newValue, oldValue);
}
}
@ -129,26 +133,26 @@ class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V
}
private class KTableTransformValuesGetter implements KTableValueGetter<K, V1> {
private class KTableTransformValuesGetter implements KTableValueGetter<K, VOut> {
private final KTableValueGetter<K, V> parentGetter;
private InternalProcessorContext internalProcessorContext;
private final ValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer;
private final ValueTransformerWithKey<? super K, ? super V, ? extends VOut> valueTransformer;
KTableTransformValuesGetter(final KTableValueGetter<K, V> parentGetter,
final ValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer) {
final ValueTransformerWithKey<? super K, ? super V, ? extends VOut> valueTransformer) {
this.parentGetter = Objects.requireNonNull(parentGetter, "parentGetter");
this.valueTransformer = Objects.requireNonNull(valueTransformer, "valueTransformer");
}
@Override
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
public void init(final ProcessorContext<?, ?> context) {
internalProcessorContext = (InternalProcessorContext) context;
parentGetter.init(context);
valueTransformer.init(new ForwardingDisabledProcessorContext(context));
valueTransformer.init(new ForwardingDisabledProcessorContext(internalProcessorContext));
}
@Override
public ValueAndTimestamp<V1> get(final K key) {
public ValueAndTimestamp<VOut> get(final K key) {
final ValueAndTimestamp<V> valueAndTimestamp = parentGetter.get(key);
final ProcessorRecordContext currentContext = internalProcessorContext.recordContext();
@ -165,7 +169,7 @@ class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V
new RecordHeaders()
));
final ValueAndTimestamp<V1> result = ValueAndTimestamp.make(
final ValueAndTimestamp<VOut> result = ValueAndTimestamp.make(
valueTransformer.transform(key, getValueOrNull(valueAndTimestamp)),
valueAndTimestamp == null ? UNKNOWN : valueAndTimestamp.timestamp());

View File

@ -16,12 +16,12 @@
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.state.ValueAndTimestamp;
public interface KTableValueGetter<K, V> {
void init(ProcessorContext context);
void init(ProcessorContext<?, ?> context);
ValueAndTimestamp<V> get(K key);

View File

@ -20,7 +20,7 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import java.nio.ByteBuffer;
import java.util.function.Supplier;
@ -51,7 +51,7 @@ public class CombinedKeySchema<KO, K> {
}
@SuppressWarnings("unchecked")
public void init(final ProcessorContext context) {
public void init(final ProcessorContext<?, ?> context) {
primaryKeySerdeTopic = undecoratedPrimaryKeySerdeTopicSupplier.get();
foreignKeySerdeTopic = undecoratedForeignKeySerdeTopicSupplier.get();
primaryKeySerializer = primaryKeySerializer == null ? (Serializer<K>) context.keySerde().serializer() : primaryKeySerializer;

View File

@ -21,6 +21,12 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
@ -32,8 +38,8 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO> implements org.apache.kafka.streams.processor.ProcessorSupplier<KO, Change<VO>> {
public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO> implements
ProcessorSupplier<KO, Change<VO>, K, SubscriptionResponseWrapper<VO>> {
private static final Logger LOG = LoggerFactory.getLogger(ForeignJoinSubscriptionProcessorSupplier.class);
private final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder;
private final CombinedKeySchema<KO, K> keySchema;
@ -47,17 +53,17 @@ public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO> implements org.
}
@Override
public org.apache.kafka.streams.processor.Processor<KO, Change<VO>> get() {
public Processor<KO, Change<VO>, K, SubscriptionResponseWrapper<VO>> get() {
return new KTableKTableJoinProcessor();
}
private final class KTableKTableJoinProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<KO, Change<VO>> {
private final class KTableKTableJoinProcessor extends ContextualProcessor<KO, Change<VO>, K, SubscriptionResponseWrapper<VO>> {
private Sensor droppedRecordsSensor;
private TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>> store;
@Override
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
public void init(final ProcessorContext<K, SubscriptionResponseWrapper<VO>> context) {
super.init(context);
final InternalProcessorContext<?, ?> internalProcessorContext = (InternalProcessorContext<?, ?>) context;
droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(
@ -69,19 +75,27 @@ public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO> implements org.
}
@Override
public void process(final KO key, final Change<VO> value) {
// if the key is null, we do not need proceed aggregating
public void process(final Record<KO, Change<VO>> record) {
// if the key is null, we do not need to proceed aggregating
// the record with the table
if (key == null) {
LOG.warn(
"Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
value, context().topic(), context().partition(), context().offset()
);
if (record.key() == null) {
if (context().recordMetadata().isPresent()) {
final RecordMetadata recordMetadata = context().recordMetadata().get();
LOG.warn(
"Skipping record due to null key. "
+ "topic=[{}] partition=[{}] offset=[{}]",
recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
);
} else {
LOG.warn(
"Skipping record due to null key. Topic, partition, and offset not known."
);
}
droppedRecordsSensor.record();
return;
}
final Bytes prefixBytes = keySchema.prefixBytes(key);
final Bytes prefixBytes = keySchema.prefixBytes(record.key());
//Perform the prefixScan and propagate the results
try (final KeyValueIterator<Bytes, ValueAndTimestamp<SubscriptionWrapper<K>>> prefixScanResults =
@ -93,8 +107,8 @@ public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO> implements org.
if (prefixEquals(next.key.get(), prefixBytes.get())) {
final CombinedKey<KO, K> combinedKey = keySchema.fromBytes(next.key);
context().forward(
combinedKey.getPrimaryKey(),
new SubscriptionResponseWrapper<>(next.value.value().getHash(), value.newValue)
record.withKey(combinedKey.getPrimaryKey())
.withValue(new SubscriptionResponseWrapper<>(next.value.value().getHash(), record.value().newValue))
);
}
}

View File

@ -21,6 +21,12 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.internals.Murmur3;
@ -36,8 +42,7 @@ import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.Subscrip
import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE;
import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements org.apache.kafka.streams.processor.ProcessorSupplier<K, Change<V>> {
public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements ProcessorSupplier<K, Change<V>, KO, SubscriptionWrapper<K>> {
private static final Logger LOG = LoggerFactory.getLogger(ForeignJoinSubscriptionSendProcessorSupplier.class);
private final Function<V, KO> foreignKeyExtractor;
@ -62,11 +67,11 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements o
}
@Override
public org.apache.kafka.streams.processor.Processor<K, Change<V>> get() {
public Processor<K, Change<V>, KO, SubscriptionWrapper<K>> get() {
return new UnbindChangeProcessor();
}
private class UnbindChangeProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, Change<V>> {
private class UnbindChangeProcessor extends ContextualProcessor<K, Change<V>, KO, SubscriptionWrapper<K>> {
private Sensor droppedRecordsSensor;
private String foreignKeySerdeTopic;
@ -74,7 +79,7 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements o
@SuppressWarnings("unchecked")
@Override
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
public void init(final ProcessorContext<KO, SubscriptionWrapper<K>> context) {
super.init(context);
foreignKeySerdeTopic = foreignKeySerdeTopicSupplier.get();
valueSerdeTopic = valueSerdeTopicSupplier.get();
@ -93,28 +98,44 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements o
}
@Override
public void process(final K key, final Change<V> change) {
final long[] currentHash = change.newValue == null ?
public void process(final Record<K, Change<V>> record) {
final long[] currentHash = record.value().newValue == null ?
null :
Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, change.newValue));
Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, record.value().newValue));
if (change.oldValue != null) {
final KO oldForeignKey = foreignKeyExtractor.apply(change.oldValue);
if (record.value().oldValue != null) {
final KO oldForeignKey = foreignKeyExtractor.apply(record.value().oldValue);
if (oldForeignKey == null) {
LOG.warn(
"Skipping record due to null foreign key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
change.oldValue, context().topic(), context().partition(), context().offset()
);
if (context().recordMetadata().isPresent()) {
final RecordMetadata recordMetadata = context().recordMetadata().get();
LOG.warn(
"Skipping record due to null foreign key. "
+ "topic=[{}] partition=[{}] offset=[{}]",
recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
);
} else {
LOG.warn(
"Skipping record due to null foreign key. Topic, partition, and offset not known."
);
}
droppedRecordsSensor.record();
return;
}
if (change.newValue != null) {
final KO newForeignKey = foreignKeyExtractor.apply(change.newValue);
if (record.value().newValue != null) {
final KO newForeignKey = foreignKeyExtractor.apply(record.value().newValue);
if (newForeignKey == null) {
LOG.warn(
"Skipping record due to null foreign key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
change.newValue, context().topic(), context().partition(), context().offset()
);
if (context().recordMetadata().isPresent()) {
final RecordMetadata recordMetadata = context().recordMetadata().get();
LOG.warn(
"Skipping record due to null foreign key. "
+ "topic=[{}] partition=[{}] offset=[{}]",
recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
);
} else {
LOG.warn(
"Skipping record due to null foreign key. Topic, partition, and offset not known."
);
}
droppedRecordsSensor.record();
return;
}
@ -126,17 +147,23 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements o
if (!Arrays.equals(serialNewForeignKey, serialOldForeignKey)) {
//Different Foreign Key - delete the old key value and propagate the new one.
//Delete it from the oldKey's state store
context().forward(oldForeignKey, new SubscriptionWrapper<>(currentHash, DELETE_KEY_NO_PROPAGATE, key));
context().forward(
record.withKey(oldForeignKey)
.withValue(new SubscriptionWrapper<>(currentHash, DELETE_KEY_NO_PROPAGATE, record.key())));
//Add to the newKey's state store. Additionally, propagate null if no FK is found there,
//since we must "unset" any output set by the previous FK-join. This is true for both INNER
//and LEFT join.
}
context().forward(newForeignKey, new SubscriptionWrapper<>(currentHash, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, key));
context().forward(
record.withKey(newForeignKey)
.withValue(new SubscriptionWrapper<>(currentHash, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, record.key())));
} else {
//A simple propagatable delete. Delete from the state store and propagate the delete onwards.
context().forward(oldForeignKey, new SubscriptionWrapper<>(currentHash, DELETE_KEY_AND_PROPAGATE, key));
context().forward(
record.withKey(oldForeignKey)
.withValue(new SubscriptionWrapper<>(currentHash, DELETE_KEY_AND_PROPAGATE, record.key())));
}
} else if (change.newValue != null) {
} else if (record.value().newValue != null) {
//change.oldValue is null, which means it was deleted at least once before, or it is brand new.
//In either case, we only need to propagate if the FK_VAL is available, as the null from the delete would
//have been propagated otherwise.
@ -148,15 +175,25 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements o
} else {
instruction = PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE;
}
final KO newForeignKey = foreignKeyExtractor.apply(change.newValue);
final KO newForeignKey = foreignKeyExtractor.apply(record.value().newValue);
if (newForeignKey == null) {
LOG.warn(
"Skipping record due to null foreign key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
change.newValue, context().topic(), context().partition(), context().offset()
);
if (context().recordMetadata().isPresent()) {
final RecordMetadata recordMetadata = context().recordMetadata().get();
LOG.warn(
"Skipping record due to null foreign key. "
+ "topic=[{}] partition=[{}] offset=[{}]",
recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
);
} else {
LOG.warn(
"Skipping record due to null foreign key. Topic, partition, and offset not known."
);
}
droppedRecordsSensor.record();
} else {
context().forward(newForeignKey, new SubscriptionWrapper<>(currentHash, instruction, key));
context().forward(
record.withKey(newForeignKey)
.withValue(new SubscriptionWrapper<>(currentHash, instruction, record.key())));
}
}
}

View File

@ -21,7 +21,11 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import java.util.Objects;
@ -35,9 +39,8 @@ import java.util.Objects;
* @param <KO> Type of foreign key
* @param <VO> Type of foreign value
*/
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class SubscriptionJoinForeignProcessorSupplier<K, KO, VO>
implements org.apache.kafka.streams.processor.ProcessorSupplier<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> {
implements ProcessorSupplier<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>, K, SubscriptionResponseWrapper<VO>> {
private final KTableValueGetterSupplier<KO, VO> foreignValueGetterSupplier;
@ -46,24 +49,24 @@ public class SubscriptionJoinForeignProcessorSupplier<K, KO, VO>
}
@Override
public org.apache.kafka.streams.processor.Processor<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> get() {
public Processor<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>, K, SubscriptionResponseWrapper<VO>> get() {
return new org.apache.kafka.streams.processor.AbstractProcessor<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>>() {
return new ContextualProcessor<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>, K, SubscriptionResponseWrapper<VO>>() {
private KTableValueGetter<KO, VO> foreignValues;
@Override
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
public void init(final ProcessorContext<K, SubscriptionResponseWrapper<VO>> context) {
super.init(context);
foreignValues = foreignValueGetterSupplier.get();
foreignValues.init(context);
}
@Override
public void process(final CombinedKey<KO, K> combinedKey, final Change<ValueAndTimestamp<SubscriptionWrapper<K>>> change) {
Objects.requireNonNull(combinedKey, "This processor should never see a null key.");
Objects.requireNonNull(change, "This processor should never see a null value.");
final ValueAndTimestamp<SubscriptionWrapper<K>> valueAndTimestamp = change.newValue;
public void process(final Record<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> record) {
Objects.requireNonNull(record.key(), "This processor should never see a null key.");
Objects.requireNonNull(record.value(), "This processor should never see a null value.");
final ValueAndTimestamp<SubscriptionWrapper<K>> valueAndTimestamp = record.value().newValue;
Objects.requireNonNull(valueAndTimestamp, "This processor should never see a null newValue.");
final SubscriptionWrapper<K> value = valueAndTimestamp.value();
@ -74,7 +77,7 @@ public class SubscriptionJoinForeignProcessorSupplier<K, KO, VO>
throw new UnsupportedVersionException("SubscriptionWrapper is of an incompatible version.");
}
final ValueAndTimestamp<VO> foreignValueAndTime = foreignValues.get(combinedKey.getForeignKey());
final ValueAndTimestamp<VO> foreignValueAndTime = foreignValues.get(record.key().getForeignKey());
final long resultTimestamp =
foreignValueAndTime == null ?
@ -84,9 +87,9 @@ public class SubscriptionJoinForeignProcessorSupplier<K, KO, VO>
switch (value.getInstruction()) {
case DELETE_KEY_AND_PROPAGATE:
context().forward(
combinedKey.getPrimaryKey(),
new SubscriptionResponseWrapper<VO>(value.getHash(), null),
To.all().withTimestamp(resultTimestamp)
record.withKey(record.key().getPrimaryKey())
.withValue(new SubscriptionResponseWrapper<VO>(value.getHash(), null))
.withTimestamp(resultTimestamp)
);
break;
case PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE:
@ -96,17 +99,17 @@ public class SubscriptionJoinForeignProcessorSupplier<K, KO, VO>
final VO valueToSend = foreignValueAndTime == null ? null : foreignValueAndTime.value();
context().forward(
combinedKey.getPrimaryKey(),
new SubscriptionResponseWrapper<>(value.getHash(), valueToSend),
To.all().withTimestamp(resultTimestamp)
record.withKey(record.key().getPrimaryKey())
.withValue(new SubscriptionResponseWrapper<>(value.getHash(), valueToSend))
.withTimestamp(resultTimestamp)
);
break;
case PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE:
if (foreignValueAndTime != null) {
context().forward(
combinedKey.getPrimaryKey(),
new SubscriptionResponseWrapper<>(value.getHash(), foreignValueAndTime.value()),
To.all().withTimestamp(resultTimestamp)
record.withKey(record.key().getPrimaryKey())
.withValue(new SubscriptionResponseWrapper<>(value.getHash(), foreignValueAndTime.value()))
.withTimestamp(resultTimestamp)
);
}
break;

View File

@ -22,6 +22,11 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.Murmur3;
@ -37,8 +42,7 @@ import java.util.function.Supplier;
* @param <VO> Type of foreign values
* @param <VR> Type of joined result of primary and foreign values
*/
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements org.apache.kafka.streams.processor.ProcessorSupplier<K, SubscriptionResponseWrapper<VO>> {
public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements ProcessorSupplier<K, SubscriptionResponseWrapper<VO>, K, VR> {
private final KTableValueGetterSupplier<K, V> valueGetterSupplier;
private final Serializer<V> constructionTimeValueSerializer;
private final Supplier<String> valueHashSerdePseudoTopicSupplier;
@ -58,8 +62,8 @@ public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements
}
@Override
public org.apache.kafka.streams.processor.Processor<K, SubscriptionResponseWrapper<VO>> get() {
return new org.apache.kafka.streams.processor.AbstractProcessor<K, SubscriptionResponseWrapper<VO>>() {
public Processor<K, SubscriptionResponseWrapper<VO>, K, VR> get() {
return new ContextualProcessor<K, SubscriptionResponseWrapper<VO>, K, VR>() {
private String valueHashSerdePseudoTopic;
private Serializer<V> runtimeValueSerializer = constructionTimeValueSerializer;
@ -67,7 +71,7 @@ public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements
@SuppressWarnings("unchecked")
@Override
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
public void init(final ProcessorContext<K, VR> context) {
super.init(context);
valueHashSerdePseudoTopic = valueHashSerdePseudoTopicSupplier.get();
valueGetter = valueGetterSupplier.get();
@ -78,31 +82,31 @@ public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements
}
@Override
public void process(final K key, final SubscriptionResponseWrapper<VO> value) {
if (value.getVersion() != SubscriptionResponseWrapper.CURRENT_VERSION) {
public void process(final Record<K, SubscriptionResponseWrapper<VO>> record) {
if (record.value().getVersion() != SubscriptionResponseWrapper.CURRENT_VERSION) {
//Guard against modifications to SubscriptionResponseWrapper. Need to ensure that there is
//compatibility with previous versions to enable rolling upgrades. Must develop a strategy for
//upgrading from older SubscriptionWrapper versions to newer versions.
throw new UnsupportedVersionException("SubscriptionResponseWrapper is of an incompatible version.");
}
final ValueAndTimestamp<V> currentValueWithTimestamp = valueGetter.get(key);
final ValueAndTimestamp<V> currentValueWithTimestamp = valueGetter.get(record.key());
final long[] currentHash = currentValueWithTimestamp == null ?
null :
Murmur3.hash128(runtimeValueSerializer.serialize(valueHashSerdePseudoTopic, currentValueWithTimestamp.value()));
final long[] messageHash = value.getOriginalValueHash();
final long[] messageHash = record.value().getOriginalValueHash();
//If this value doesn't match the current value from the original table, it is stale and should be discarded.
if (java.util.Arrays.equals(messageHash, currentHash)) {
final VR result;
if (value.getForeignValue() == null && (!leftJoin || currentValueWithTimestamp == null)) {
if (record.value().getForeignValue() == null && (!leftJoin || currentValueWithTimestamp == null)) {
result = null; //Emit tombstone
} else {
result = joiner.apply(currentValueWithTimestamp == null ? null : currentValueWithTimestamp.value(), value.getForeignValue());
result = joiner.apply(currentValueWithTimestamp == null ? null : currentValueWithTimestamp.value(), record.value().getForeignValue());
}
context().forward(key, result);
context().forward(record.withValue(result));
}
}
};

View File

@ -21,7 +21,12 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.StoreBuilder;
@ -30,9 +35,8 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class SubscriptionStoreReceiveProcessorSupplier<K, KO>
implements org.apache.kafka.streams.processor.ProcessorSupplier<KO, SubscriptionWrapper<K>> {
implements ProcessorSupplier<KO, SubscriptionWrapper<K>, CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> {
private static final Logger LOG = LoggerFactory.getLogger(SubscriptionStoreReceiveProcessorSupplier.class);
private final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder;
@ -47,15 +51,15 @@ public class SubscriptionStoreReceiveProcessorSupplier<K, KO>
}
@Override
public org.apache.kafka.streams.processor.Processor<KO, SubscriptionWrapper<K>> get() {
public Processor<KO, SubscriptionWrapper<K>, CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> get() {
return new org.apache.kafka.streams.processor.AbstractProcessor<KO, SubscriptionWrapper<K>>() {
return new ContextualProcessor<KO, SubscriptionWrapper<K>, CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>>() {
private TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>> store;
private Sensor droppedRecordsSensor;
@Override
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
public void init(final ProcessorContext<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> context) {
super.init(context);
final InternalProcessorContext<?, ?> internalProcessorContext = (InternalProcessorContext<?, ?>) context;
@ -70,30 +74,38 @@ public class SubscriptionStoreReceiveProcessorSupplier<K, KO>
}
@Override
public void process(final KO key, final SubscriptionWrapper<K> value) {
if (key == null) {
LOG.warn(
"Skipping record due to null foreign key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
value, context().topic(), context().partition(), context().offset()
);
public void process(final Record<KO, SubscriptionWrapper<K>> record) {
if (record.key() == null) {
if (context().recordMetadata().isPresent()) {
final RecordMetadata recordMetadata = context().recordMetadata().get();
LOG.warn(
"Skipping record due to null foreign key. "
+ "topic=[{}] partition=[{}] offset=[{}]",
recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
);
} else {
LOG.warn(
"Skipping record due to null foreign key. Topic, partition, and offset not known."
);
}
droppedRecordsSensor.record();
return;
}
if (value.getVersion() != SubscriptionWrapper.CURRENT_VERSION) {
if (record.value().getVersion() != SubscriptionWrapper.CURRENT_VERSION) {
//Guard against modifications to SubscriptionWrapper. Need to ensure that there is compatibility
//with previous versions to enable rolling upgrades. Must develop a strategy for upgrading
//from older SubscriptionWrapper versions to newer versions.
throw new UnsupportedVersionException("SubscriptionWrapper is of an incompatible version.");
}
final Bytes subscriptionKey = keySchema.toBytes(key, value.getPrimaryKey());
final Bytes subscriptionKey = keySchema.toBytes(record.key(), record.value().getPrimaryKey());
final ValueAndTimestamp<SubscriptionWrapper<K>> newValue = ValueAndTimestamp.make(value, context().timestamp());
final ValueAndTimestamp<SubscriptionWrapper<K>> newValue = ValueAndTimestamp.make(record.value(), record.timestamp());
final ValueAndTimestamp<SubscriptionWrapper<K>> oldValue = store.get(subscriptionKey);
//This store is used by the prefix scanner in ForeignJoinSubscriptionProcessorSupplier
if (value.getInstruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE) ||
value.getInstruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE)) {
if (record.value().getInstruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE) ||
record.value().getInstruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE)) {
store.delete(subscriptionKey);
} else {
store.put(subscriptionKey, newValue);
@ -102,9 +114,9 @@ public class SubscriptionStoreReceiveProcessorSupplier<K, KO>
// note: key is non-nullable
// note: newValue is non-nullable
context().forward(
new CombinedKey<>(key, value.getPrimaryKey()),
change,
To.all().withTimestamp(newValue.timestamp())
record.withKey(new CombinedKey<>(record.key(), record.value().getPrimaryKey()))
.withValue(change)
.withTimestamp(newValue.timestamp())
);
}
};

View File

@ -69,14 +69,14 @@ public class ProcessorParameters<KIn, VIn, KOut, VOut> {
}
@SuppressWarnings("unchecked")
<VR> KTableProcessorSupplier<KIn, VIn, VR> kTableProcessorSupplier() {
<KR, VR> KTableProcessorSupplier<KIn, VIn, KR, VR> kTableProcessorSupplier() {
// This cast always works because KTableProcessorSupplier hasn't been converted yet.
return (KTableProcessorSupplier<KIn, VIn, VR>) oldProcessorSupplier;
return (KTableProcessorSupplier<KIn, VIn, KR, VR>) processorSupplier;
}
@SuppressWarnings("unchecked")
KTableKTableJoinMerger<KIn, VIn> kTableKTableJoinMergerProcessorSupplier() {
return (KTableKTableJoinMerger<KIn, VIn>) oldProcessorSupplier;
return (KTableKTableJoinMerger<KIn, VIn>) processorSupplier;
}
public String processorName() {

View File

@ -21,7 +21,7 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTableNewProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.kstream.internals.suppress.TimeDefinitions.TimeDefinition;
@ -39,7 +39,8 @@ import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
import static java.util.Objects.requireNonNull;
public class KTableSuppressProcessorSupplier<K, V> implements KTableNewProcessorSupplier<K, V, K, V> {
public class KTableSuppressProcessorSupplier<K, V> implements
KTableProcessorSupplier<K, V, K, V> {
private final SuppressedInternal<K> suppress;
private final String storeName;
private final KTableImpl<K, ?, V> parentKTable;
@ -71,7 +72,7 @@ public class KTableSuppressProcessorSupplier<K, V> implements KTableNewProcessor
private TimeOrderedKeyValueBuffer<K, V> buffer;
@Override
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
public void init(final ProcessorContext<?, ?> context) {
parentGetter.init(context);
// the main processor is responsible for the buffer's lifecycle
buffer = requireNonNull(context.getStateStore(storeName));

View File

@ -17,7 +17,7 @@
package org.apache.kafka.streams;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
@ -42,8 +42,9 @@ public class TopologyTestDriverWrapper extends TopologyTestDriver {
* @param processorName processor name to set as current node
* @return the processor context
*/
public ProcessorContext setCurrentNodeForProcessorContext(final String processorName) {
final ProcessorContext context = task.processorContext();
@SuppressWarnings("unchecked")
public <K, V> ProcessorContext<K, V> setCurrentNodeForProcessorContext(final String processorName) {
final ProcessorContext<K, V> context = task.processorContext();
((ProcessorContextImpl) context).setCurrentNode(getProcessor(processorName));
return context;
}

View File

@ -236,7 +236,7 @@ public class KStreamKTableJoinTest {
assertThat(
appender.getMessages(),
hasItem("Skipping record due to null join key or value. key=[null] value=[A] topic=[streamTopic] partition=[0] "
hasItem("Skipping record due to null join key or value. topic=[streamTopic] partition=[0] "
+ "offset=[0]"));
}
}
@ -250,7 +250,7 @@ public class KStreamKTableJoinTest {
assertThat(
appender.getMessages(),
hasItem("Skipping record due to null join key or value. key=[1] value=[null] topic=[streamTopic] partition=[0] "
hasItem("Skipping record due to null join key or value. topic=[streamTopic] partition=[0] "
+ "offset=[0]")
);
}

View File

@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValueTimestamp;
@ -28,7 +27,9 @@ import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.api.MockProcessorContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.test.TestRecord;
@ -53,7 +54,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KTableKTableInnerJoinTest {
private final static KeyValueTimestamp<?, ?>[] EMPTY = new KeyValueTimestamp[0];
@ -251,22 +251,22 @@ public class KTableKTableInnerJoinTest {
final StreamsBuilder builder = new StreamsBuilder();
@SuppressWarnings("unchecked")
final org.apache.kafka.streams.processor.Processor<String, Change<String>> join = new KTableKTableInnerJoin<>(
final Processor<String, Change<String>, String, Change<Object>> join = new KTableKTableInnerJoin<>(
(KTableImpl<String, String, String>) builder.table("left", Consumed.with(Serdes.String(), Serdes.String())),
(KTableImpl<String, String, String>) builder.table("right", Consumed.with(Serdes.String(), Serdes.String())),
null
).get();
final MockProcessorContext context = new MockProcessorContext(props);
context.setRecordMetadata("left", -1, -2, new RecordHeaders(), -3);
final MockProcessorContext<String, Change<Object>> context = new MockProcessorContext<>(props);
context.setRecordMetadata("left", -1, -2);
join.init(context);
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KTableKTableInnerJoin.class)) {
join.process(null, new Change<>("new", "old"));
join.process(new Record<>(null, new Change<>("new", "old"), 0));
assertThat(
appender.getMessages(),
hasItem("Skipping record due to null key. change=[(new<-old)] topic=[left] partition=[-1] offset=[-2]")
hasItem("Skipping record due to null key. topic=[left] partition=[-1] offset=[-2]")
);
}
}

View File

@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
@ -32,7 +31,9 @@ import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.api.MockProcessorContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.test.TestRecord;
@ -61,7 +62,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KTableKTableLeftJoinTest {
private final String topic1 = "topic1";
private final String topic2 = "topic2";
@ -517,22 +517,22 @@ public class KTableKTableLeftJoinTest {
final StreamsBuilder builder = new StreamsBuilder();
@SuppressWarnings("unchecked")
final org.apache.kafka.streams.processor.Processor<String, Change<String>> join = new KTableKTableLeftJoin<>(
final Processor<String, Change<String>, String, Change<Object>> join = new KTableKTableLeftJoin<>(
(KTableImpl<String, String, String>) builder.table("left", Consumed.with(Serdes.String(), Serdes.String())),
(KTableImpl<String, String, String>) builder.table("right", Consumed.with(Serdes.String(), Serdes.String())),
null
).get();
final MockProcessorContext context = new MockProcessorContext(props);
context.setRecordMetadata("left", -1, -2, new RecordHeaders(), -3);
final MockProcessorContext<String, Change<Object>> context = new MockProcessorContext<>(props);
context.setRecordMetadata("left", -1, -2);
join.init(context);
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KTableKTableLeftJoin.class)) {
join.process(null, new Change<>("new", "old"));
join.process(new Record<>(null, new Change<>("new", "old"), 0));
assertThat(
appender.getMessages(),
hasItem("Skipping record due to null key. change=[(new<-old)] topic=[left] partition=[-1] offset=[-2]")
hasItem("Skipping record due to null key. topic=[left] partition=[-1] offset=[-2]")
);
}
}

View File

@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
@ -27,7 +26,9 @@ import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.api.MockProcessorContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.MockApiProcessor;
@ -51,7 +52,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KTableKTableOuterJoinTest {
private final String topic1 = "topic1";
private final String topic2 = "topic2";
@ -408,22 +408,22 @@ public class KTableKTableOuterJoinTest {
final StreamsBuilder builder = new StreamsBuilder();
@SuppressWarnings("unchecked")
final org.apache.kafka.streams.processor.Processor<String, Change<String>> join = new KTableKTableOuterJoin<>(
final Processor<String, Change<String>, String, Change<Object>> join = new KTableKTableOuterJoin<>(
(KTableImpl<String, String, String>) builder.table("left", Consumed.with(Serdes.String(), Serdes.String())),
(KTableImpl<String, String, String>) builder.table("right", Consumed.with(Serdes.String(), Serdes.String())),
null
).get();
final MockProcessorContext context = new MockProcessorContext(props);
context.setRecordMetadata("left", -1, -2, new RecordHeaders(), -3);
final MockProcessorContext<String, Change<Object>> context = new MockProcessorContext<>(props);
context.setRecordMetadata("left", -1, -2);
join.init(context);
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KTableKTableOuterJoin.class)) {
join.process(null, new Change<>("new", "old"));
join.process(new Record<>(null, new Change<>("new", "old"), 0));
assertThat(
appender.getMessages(),
hasItem("Skipping record due to null key. change=[(new<-old)] topic=[left] partition=[-1] offset=[-2]")
hasItem("Skipping record due to null key. topic=[left] partition=[-1] offset=[-2]")
);
}
}

View File

@ -16,12 +16,13 @@
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.api.MockProcessorContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender.Event;
import org.apache.kafka.test.StreamsTestUtils;
@ -33,7 +34,6 @@ import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KTableKTableRightJoinTest {
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
@ -43,26 +43,26 @@ public class KTableKTableRightJoinTest {
final StreamsBuilder builder = new StreamsBuilder();
@SuppressWarnings("unchecked")
final org.apache.kafka.streams.processor.Processor<String, Change<String>> join = new KTableKTableRightJoin<>(
final Processor<String, Change<String>, String, Change<Object>> join = new KTableKTableRightJoin<>(
(KTableImpl<String, String, String>) builder.table("left", Consumed.with(Serdes.String(), Serdes.String())),
(KTableImpl<String, String, String>) builder.table("right", Consumed.with(Serdes.String(), Serdes.String())),
null
).get();
props.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, StreamsConfig.METRICS_LATEST);
final MockProcessorContext context = new MockProcessorContext(props);
context.setRecordMetadata("left", -1, -2, new RecordHeaders(), -3);
final MockProcessorContext<String, Change<Object>> context = new MockProcessorContext<>(props);
context.setRecordMetadata("left", -1, -2);
join.init(context);
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KTableKTableRightJoin.class)) {
join.process(null, new Change<>("new", "old"));
join.process(new Record<>(null, new Change<>("new", "old"), 0));
assertThat(
appender.getEvents().stream()
.filter(e -> e.getLevel().equals("WARN"))
.map(Event::getMessage)
.collect(Collectors.toList()),
hasItem("Skipping record due to null key. change=[(new<-old)] topic=[left] partition=[-1] offset=[-2]")
hasItem("Skipping record due to null key. topic=[left] partition=[-1] offset=[-2]")
);
}
}

View File

@ -34,6 +34,8 @@ import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
@ -87,7 +89,7 @@ public class KTableTransformValuesTest {
@Mock(MockType.NICE)
private KTableImpl<String, String, String> parent;
@Mock(MockType.NICE)
private InternalProcessorContext context;
private InternalProcessorContext<String, Change<String>> context;
@Mock(MockType.NICE)
private KTableValueGetterSupplier<String, String> parentGetterSupplier;
@Mock(MockType.NICE)
@ -145,7 +147,7 @@ public class KTableTransformValuesTest {
final NoOpValueTransformerWithKeySupplier<String, String> transformer = new NoOpValueTransformerWithKeySupplier<>();
final KTableTransformValues<String, String, String> transformValues =
new KTableTransformValues<>(parent, transformer, null);
final org.apache.kafka.streams.processor.Processor<String, Change<String>> processor = transformValues.get();
final Processor<String, Change<String>, String, Change<String>> processor = transformValues.get();
processor.init(context);
@ -157,14 +159,14 @@ public class KTableTransformValuesTest {
final KTableTransformValues<String, String, String> transformValues =
new KTableTransformValues<>(parent, new ExclamationValueTransformerSupplier(), null);
final org.apache.kafka.streams.processor.Processor<String, Change<String>> processor = transformValues.get();
final Processor<String, Change<String>, String, Change<String>> processor = transformValues.get();
processor.init(context);
context.forward("Key", new Change<>("Key->newValue!", null));
context.forward(new Record<>("Key", new Change<>("Key->newValue!", null), 0));
expectLastCall();
replay(context);
processor.process("Key", new Change<>("newValue", "oldValue"));
processor.process(new Record<>("Key", new Change<>("newValue", "oldValue"), 0));
verify(context);
}
@ -178,14 +180,14 @@ public class KTableTransformValuesTest {
replay(parent);
transformValues.enableSendingOldValues(true);
final org.apache.kafka.streams.processor.Processor<String, Change<String>> processor = transformValues.get();
final Processor<String, Change<String>, String, Change<String>> processor = transformValues.get();
processor.init(context);
context.forward("Key", new Change<>("Key->newValue!", "Key->oldValue!"));
context.forward(new Record<>("Key", new Change<>("Key->newValue!", "Key->oldValue!"), 0));
expectLastCall();
replay(context);
processor.process("Key", new Change<>("newValue", "oldValue"));
processor.process(new Record<>("Key", new Change<>("newValue", "oldValue"), 0));
verify(context);
}
@ -301,7 +303,7 @@ public class KTableTransformValuesTest {
expectLastCall();
replay(mockSupplier, transformer);
final org.apache.kafka.streams.processor.Processor<String, Change<String>> processor = transformValues.get();
final Processor<String, Change<String>, String, Change<String>> processor = transformValues.get();
processor.close();
verify(transformer);

View File

@ -17,14 +17,14 @@
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.api.MockProcessorContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.Murmur3;
import org.junit.Test;
@ -37,7 +37,6 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.collection.IsEmptyCollection.empty;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class SubscriptionResolverJoinProcessorSupplierTest {
private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
private static final ValueJoiner<String, String, String> JOINER =
@ -50,7 +49,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
public KTableValueGetter<K, V> get() {
return new KTableValueGetter<K, V>() {
@Override
public void init(final ProcessorContext context) {
public void init(final ProcessorContext<?, ?> context) {
}
@Override
@ -83,15 +82,15 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
JOINER,
leftJoin
);
final org.apache.kafka.streams.processor.Processor<String, SubscriptionResponseWrapper<String>> processor = processorSupplier.get();
final MockProcessorContext context = new MockProcessorContext();
final Processor<String, SubscriptionResponseWrapper<String>, String, String> processor = processorSupplier.get();
final org.apache.kafka.streams.processor.api.MockProcessorContext<String, String> context = new org.apache.kafka.streams.processor.api.MockProcessorContext<>();
processor.init(context);
context.setRecordMetadata("topic", 0, 0, new RecordHeaders(), 0);
context.setRecordMetadata("topic", 0, 0);
valueGetterSupplier.put("lhs1", "lhsValue");
final long[] oldHash = Murmur3.hash128(STRING_SERIALIZER.serialize("topic-join-resolver", "oldLhsValue"));
processor.process("lhs1", new SubscriptionResponseWrapper<>(oldHash, "rhsValue"));
final List<MockProcessorContext.CapturedForward> forwarded = context.forwarded();
processor.process(new Record<>("lhs1", new SubscriptionResponseWrapper<>(oldHash, "rhsValue"), 0));
final List<MockProcessorContext.CapturedForward<? extends String, ? extends String>> forwarded = context.forwarded();
assertThat(forwarded, empty());
}
@ -108,15 +107,15 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
JOINER,
leftJoin
);
final org.apache.kafka.streams.processor.Processor<String, SubscriptionResponseWrapper<String>> processor = processorSupplier.get();
final MockProcessorContext context = new MockProcessorContext();
final Processor<String, SubscriptionResponseWrapper<String>, String, String> processor = processorSupplier.get();
final MockProcessorContext<String, String> context = new MockProcessorContext<>();
processor.init(context);
context.setRecordMetadata("topic", 0, 0, new RecordHeaders(), 0);
context.setRecordMetadata("topic", 0, 0);
valueGetterSupplier.put("lhs1", null);
final long[] hash = Murmur3.hash128(STRING_SERIALIZER.serialize("topic-join-resolver", "lhsValue"));
processor.process("lhs1", new SubscriptionResponseWrapper<>(hash, "rhsValue"));
final List<MockProcessorContext.CapturedForward> forwarded = context.forwarded();
processor.process(new Record<>("lhs1", new SubscriptionResponseWrapper<>(hash, "rhsValue"), 0));
final List<MockProcessorContext.CapturedForward<? extends String, ? extends String>> forwarded = context.forwarded();
assertThat(forwarded, empty());
}
@ -133,17 +132,17 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
JOINER,
leftJoin
);
final org.apache.kafka.streams.processor.Processor<String, SubscriptionResponseWrapper<String>> processor = processorSupplier.get();
final MockProcessorContext context = new MockProcessorContext();
final Processor<String, SubscriptionResponseWrapper<String>, String, String> processor = processorSupplier.get();
final MockProcessorContext<String, String> context = new MockProcessorContext<>();
processor.init(context);
context.setRecordMetadata("topic", 0, 0, new RecordHeaders(), 0);
context.setRecordMetadata("topic", 0, 0);
valueGetterSupplier.put("lhs1", "lhsValue");
final long[] hash = Murmur3.hash128(STRING_SERIALIZER.serialize("topic-join-resolver", "lhsValue"));
processor.process("lhs1", new SubscriptionResponseWrapper<>(hash, "rhsValue"));
final List<MockProcessorContext.CapturedForward> forwarded = context.forwarded();
processor.process(new Record<>("lhs1", new SubscriptionResponseWrapper<>(hash, "rhsValue"), 0));
final List<MockProcessorContext.CapturedForward<? extends String, ? extends String>> forwarded = context.forwarded();
assertThat(forwarded.size(), is(1));
assertThat(forwarded.get(0).keyValue(), is(new KeyValue<>("lhs1", "(lhsValue,rhsValue)")));
assertThat(forwarded.get(0).record(), is(new Record<>("lhs1", "(lhsValue,rhsValue)", 0)));
}
@Test
@ -159,17 +158,17 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
JOINER,
leftJoin
);
final org.apache.kafka.streams.processor.Processor<String, SubscriptionResponseWrapper<String>> processor = processorSupplier.get();
final MockProcessorContext context = new MockProcessorContext();
final Processor<String, SubscriptionResponseWrapper<String>, String, String> processor = processorSupplier.get();
final MockProcessorContext<String, String> context = new MockProcessorContext<>();
processor.init(context);
context.setRecordMetadata("topic", 0, 0, new RecordHeaders(), 0);
context.setRecordMetadata("topic", 0, 0);
valueGetterSupplier.put("lhs1", "lhsValue");
final long[] hash = Murmur3.hash128(STRING_SERIALIZER.serialize("topic-join-resolver", "lhsValue"));
processor.process("lhs1", new SubscriptionResponseWrapper<>(hash, null));
final List<MockProcessorContext.CapturedForward> forwarded = context.forwarded();
processor.process(new Record<>("lhs1", new SubscriptionResponseWrapper<>(hash, null), 0));
final List<MockProcessorContext.CapturedForward<? extends String, ? extends String>> forwarded = context.forwarded();
assertThat(forwarded.size(), is(1));
assertThat(forwarded.get(0).keyValue(), is(new KeyValue<>("lhs1", null)));
assertThat(forwarded.get(0).record(), is(new Record<>("lhs1", null, 0)));
}
@Test
@ -185,17 +184,17 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
JOINER,
leftJoin
);
final org.apache.kafka.streams.processor.Processor<String, SubscriptionResponseWrapper<String>> processor = processorSupplier.get();
final MockProcessorContext context = new MockProcessorContext();
final Processor<String, SubscriptionResponseWrapper<String>, String, String> processor = processorSupplier.get();
final MockProcessorContext<String, String> context = new MockProcessorContext<>();
processor.init(context);
context.setRecordMetadata("topic", 0, 0, new RecordHeaders(), 0);
context.setRecordMetadata("topic", 0, 0);
valueGetterSupplier.put("lhs1", "lhsValue");
final long[] hash = Murmur3.hash128(STRING_SERIALIZER.serialize("topic-join-resolver", "lhsValue"));
processor.process("lhs1", new SubscriptionResponseWrapper<>(hash, null));
final List<MockProcessorContext.CapturedForward> forwarded = context.forwarded();
processor.process(new Record<>("lhs1", new SubscriptionResponseWrapper<>(hash, null), 0));
final List<MockProcessorContext.CapturedForward<? extends String, ? extends String>> forwarded = context.forwarded();
assertThat(forwarded.size(), is(1));
assertThat(forwarded.get(0).keyValue(), is(new KeyValue<>("lhs1", "(lhsValue,null)")));
assertThat(forwarded.get(0).record(), is(new Record<>("lhs1", "(lhsValue,null)", 0)));
}
@Test
@ -211,16 +210,16 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
JOINER,
leftJoin
);
final org.apache.kafka.streams.processor.Processor<String, SubscriptionResponseWrapper<String>> processor = processorSupplier.get();
final MockProcessorContext context = new MockProcessorContext();
final Processor<String, SubscriptionResponseWrapper<String>, String, String> processor = processorSupplier.get();
final MockProcessorContext<String, String> context = new MockProcessorContext<>();
processor.init(context);
context.setRecordMetadata("topic", 0, 0, new RecordHeaders(), 0);
context.setRecordMetadata("topic", 0, 0);
valueGetterSupplier.put("lhs1", null);
final long[] hash = null;
processor.process("lhs1", new SubscriptionResponseWrapper<>(hash, null));
final List<MockProcessorContext.CapturedForward> forwarded = context.forwarded();
processor.process(new Record<>("lhs1", new SubscriptionResponseWrapper<>(hash, null), 0));
final List<MockProcessorContext.CapturedForward<? extends String, ? extends String>> forwarded = context.forwarded();
assertThat(forwarded.size(), is(1));
assertThat(forwarded.get(0).keyValue(), is(new KeyValue<>("lhs1", null)));
assertThat(forwarded.get(0).record(), is(new Record<>("lhs1", null, 0)));
}
}