mirror of https://github.com/apache/kafka.git
MINOR: cleanup KStream JavaDocs (5/N) - stream-globalTable-inner-join (#18747)
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
parent
8d294cf709
commit
326ee368ba
|
@ -2097,134 +2097,108 @@ public interface KStream<K, V> {
|
||||||
final Joined<K, V, VT> joined);
|
final Joined<K, V, VT> joined);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Join records of this stream with {@link GlobalKTable}'s records using non-windowed inner equi join.
|
* Join records of this stream with {@link GlobalKTable}'s records using non-windowed inner equi-join.
|
||||||
* The join is a primary key table lookup join with join attribute
|
* The join is a primary key table lookup join with join attribute
|
||||||
* {@code keyValueMapper.map(stream.keyValue) == table.key}.
|
* {@code keyValueMapper.map(streamRecord) == tableRecord.key}.
|
||||||
* "Table lookup join" means, that results are only computed if {@code KStream} records are processed.
|
* "Table lookup join" means, that results are only computed if {@code KStream} records are processed.
|
||||||
* This is done by performing a lookup for matching records in the <em>current</em> internal {@link GlobalKTable}
|
* This is done by performing a lookup for matching records in the <em>current</em> (i.e., processing time)
|
||||||
* state.
|
* internal {@link GlobalKTable} state.
|
||||||
* In contrast, processing {@link GlobalKTable} input records will only update the internal {@link GlobalKTable}
|
* In contrast, processing {@link GlobalKTable} input records will only update the internal {@link GlobalKTable}
|
||||||
* state and will not produce any result records.
|
* state and will not produce any result records.
|
||||||
* <p>
|
|
||||||
* For each {@code KStream} record that finds a corresponding record in {@link GlobalKTable} the provided
|
|
||||||
* {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
|
|
||||||
* The key of the result record is the same as the key of this {@code KStream}.
|
|
||||||
* If a {@code KStream} input value is {@code null} the record will not be included in the join operation
|
|
||||||
* and thus no output record will be added to the resulting {@code KStream}.
|
|
||||||
*
|
*
|
||||||
* @param globalTable the {@link GlobalKTable} to be joined with this stream
|
* <p>For each {@code KStream} record that finds a joining record in the {@link GlobalKTable} the provided
|
||||||
* @param keySelector instance of {@link KeyValueMapper} used to map from the (key, value) of this stream
|
* {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
|
||||||
* to the key of the {@link GlobalKTable}
|
* The key of the result record is the same as the stream record's key.
|
||||||
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
|
* If you need read access to the {@code KStream} key, use {@link #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)}.
|
||||||
* @param <GK> the key type of {@link GlobalKTable}
|
* If a {@code KStream} input record's value is {@code null} or if the provided {@link KeyValueMapper keySelector}
|
||||||
* @param <GV> the value type of the {@link GlobalKTable}
|
* returns {@code null}, the input record will be dropped, and no join computation is triggered.
|
||||||
* @param <RV> the value type of the resulting {@code KStream}
|
* If a {@link GlobalKTable} input record's key is {@code null} the input record will be dropped, and the table
|
||||||
* @return a {@code KStream} that contains join-records for each key and values computed by the given
|
* state won't be updated.
|
||||||
* {@link ValueJoiner}, one output for each input {@code KStream} record
|
* {@link GlobalKTable} input records with {@code null} values are considered deletes (so-called tombstone) for
|
||||||
|
* the table.
|
||||||
|
*
|
||||||
|
* <p>Example, using the first value attribute as join key:
|
||||||
|
* <table border='1'>
|
||||||
|
* <tr>
|
||||||
|
* <th>KStream</th>
|
||||||
|
* <th>GlobalKTable</th>
|
||||||
|
* <th>state</th>
|
||||||
|
* <th>result</th>
|
||||||
|
* </tr>
|
||||||
|
* <tr>
|
||||||
|
* <td><K1:(GK1,A)></td>
|
||||||
|
* <td></td>
|
||||||
|
* <td></td>
|
||||||
|
* <td></td>
|
||||||
|
* </tr>
|
||||||
|
* <tr>
|
||||||
|
* <td></td>
|
||||||
|
* <td><GK1:b></td>
|
||||||
|
* <td><GK1:b></td>
|
||||||
|
* <td></td>
|
||||||
|
* </tr>
|
||||||
|
* <tr>
|
||||||
|
* <td><K1:(GK1,C)></td>
|
||||||
|
* <td></td>
|
||||||
|
* <td><GK1:b></td>
|
||||||
|
* <td><K1:ValueJoiner((GK1,C),b)></td>
|
||||||
|
* </tr>
|
||||||
|
* </table>
|
||||||
|
*
|
||||||
|
* In contrast to {@link #join(KTable, ValueJoiner)}, there is no co-partitioning requirement between this
|
||||||
|
* {@code KStream} and the {@link GlobalKTable}.
|
||||||
|
* Also note that there are no ordering guarantees between the updates on the left and the right side of this join,
|
||||||
|
* since updates to the {@link GlobalKTable} are in no way synchronized.
|
||||||
|
* Therefore, the result of the join is inherently non-deterministic.
|
||||||
|
*
|
||||||
|
* @param globalTable
|
||||||
|
* the {@link GlobalKTable} to be joined with this stream
|
||||||
|
* @param keySelector
|
||||||
|
* a {@link KeyValueMapper} that computes the join key for stream input records
|
||||||
|
* @param joiner
|
||||||
|
* a {@link ValueJoiner} that computes the join result for a pair of matching records
|
||||||
|
*
|
||||||
|
* @param <GlobalKey> the key type of the global table
|
||||||
|
* @param <GlobalValue> the value type of the global table
|
||||||
|
* @param <VOut> the value type of the result stream
|
||||||
|
*
|
||||||
|
* @return A {@code KStream} that contains join-records, one for each matched stream record, with the corresponding
|
||||||
|
* key and a value computed by the given {@link ValueJoiner}.
|
||||||
|
*
|
||||||
* @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
|
* @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
|
||||||
*/
|
*/
|
||||||
<GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalTable,
|
<GlobalKey, GlobalValue, VOut> KStream<K, VOut> join(final GlobalKTable<GlobalKey, GlobalValue> globalTable,
|
||||||
final KeyValueMapper<? super K, ? super V, ? extends GK> keySelector,
|
final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
|
||||||
final ValueJoiner<? super V, ? super GV, ? extends RV> joiner);
|
final ValueJoiner<? super V, ? super GlobalValue, ? extends VOut> joiner);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Join records of this stream with {@link GlobalKTable}'s records using non-windowed inner equi join.
|
* See {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
|
||||||
* The join is a primary key table lookup join with join attribute
|
|
||||||
* {@code keyValueMapper.map(stream.keyValue) == table.key}.
|
|
||||||
* "Table lookup join" means, that results are only computed if {@code KStream} records are processed.
|
|
||||||
* This is done by performing a lookup for matching records in the <em>current</em> internal {@link GlobalKTable}
|
|
||||||
* state.
|
|
||||||
* In contrast, processing {@link GlobalKTable} input records will only update the internal {@link GlobalKTable}
|
|
||||||
* state and will not produce any result records.
|
|
||||||
* <p>
|
|
||||||
* For each {@code KStream} record that finds a corresponding record in {@link GlobalKTable} the provided
|
|
||||||
* {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record.
|
|
||||||
* The key of the result record is the same as the key of this {@code KStream}.
|
|
||||||
* Note that the key is read-only and should not be modified, as this can lead to undefined behaviour.
|
|
||||||
* If a {@code KStream} input value is {@code null} the record will not be included in the join operation
|
|
||||||
* and thus no output record will be added to the resulting {@code KStream}.
|
|
||||||
*
|
*
|
||||||
* @param globalTable the {@link GlobalKTable} to be joined with this stream
|
* <p>Note that the {@link KStream} key is read-only and must not be modified, as this can lead to corrupt partitioning.
|
||||||
* @param keySelector instance of {@link KeyValueMapper} used to map from the (key, value) of this stream
|
|
||||||
* to the key of the {@link GlobalKTable}
|
|
||||||
* @param joiner a {@link ValueJoinerWithKey} that computes the join result for a pair of matching records
|
|
||||||
* @param <GK> the key type of {@link GlobalKTable}
|
|
||||||
* @param <GV> the value type of the {@link GlobalKTable}
|
|
||||||
* @param <RV> the value type of the resulting {@code KStream}
|
|
||||||
* @return a {@code KStream} that contains join-records for each key and values computed by the given
|
|
||||||
* {@link ValueJoinerWithKey}, one output for each input {@code KStream} record
|
|
||||||
* @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)
|
|
||||||
*/
|
*/
|
||||||
<GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalTable,
|
<GlobalKey, GlobalValue, VOut> KStream<K, VOut> join(final GlobalKTable<GlobalKey, GlobalValue> globalTable,
|
||||||
final KeyValueMapper<? super K, ? super V, ? extends GK> keySelector,
|
final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
|
||||||
final ValueJoinerWithKey<? super K, ? super V, ? super GV, ? extends RV> joiner);
|
final ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Join records of this stream with {@link GlobalKTable}'s records using non-windowed inner equi join.
|
* See {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
|
||||||
* The join is a primary key table lookup join with join attribute
|
|
||||||
* {@code keyValueMapper.map(stream.keyValue) == table.key}.
|
|
||||||
* "Table lookup join" means, that results are only computed if {@code KStream} records are processed.
|
|
||||||
* This is done by performing a lookup for matching records in the <em>current</em> internal {@link GlobalKTable}
|
|
||||||
* state.
|
|
||||||
* In contrast, processing {@link GlobalKTable} input records will only update the internal {@link GlobalKTable}
|
|
||||||
* state and will not produce any result records.
|
|
||||||
* <p>
|
|
||||||
* For each {@code KStream} record that finds a corresponding record in {@link GlobalKTable} the provided
|
|
||||||
* {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
|
|
||||||
* The key of the result record is the same as the key of this {@code KStream}.
|
|
||||||
* If a {@code KStream} input value is {@code null} the record will not be included in the join operation
|
|
||||||
* and thus no output record will be added to the resulting {@code KStream}.
|
|
||||||
*
|
*
|
||||||
* @param globalTable the {@link GlobalKTable} to be joined with this stream
|
* <p>Takes an additional {@link Named} parameter that is used to name the processor in the topology.
|
||||||
* @param keySelector instance of {@link KeyValueMapper} used to map from the (key, value) of this stream
|
|
||||||
* to the key of the {@link GlobalKTable}
|
|
||||||
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
|
|
||||||
* @param named a {@link Named} config used to name the processor in the topology
|
|
||||||
* @param <GK> the key type of {@link GlobalKTable}
|
|
||||||
* @param <GV> the value type of the {@link GlobalKTable}
|
|
||||||
* @param <RV> the value type of the resulting {@code KStream}
|
|
||||||
* @return a {@code KStream} that contains join-records for each key and values computed by the given
|
|
||||||
* {@link ValueJoiner}, one output for each input {@code KStream} record
|
|
||||||
* @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
|
|
||||||
*/
|
*/
|
||||||
<GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalTable,
|
<GlobalKey, GlobalValue, VOut> KStream<K, VOut> join(final GlobalKTable<GlobalKey, GlobalValue> globalTable,
|
||||||
final KeyValueMapper<? super K, ? super V, ? extends GK> keySelector,
|
final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
|
||||||
final ValueJoiner<? super V, ? super GV, ? extends RV> joiner,
|
final ValueJoiner<? super V, ? super GlobalValue, ? extends VOut> joiner,
|
||||||
final Named named);
|
final Named named);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Join records of this stream with {@link GlobalKTable}'s records using non-windowed inner equi join.
|
* See {@link #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)}.
|
||||||
* The join is a primary key table lookup join with join attribute
|
|
||||||
* {@code keyValueMapper.map(stream.keyValue) == table.key}.
|
|
||||||
* "Table lookup join" means, that results are only computed if {@code KStream} records are processed.
|
|
||||||
* This is done by performing a lookup for matching records in the <em>current</em> internal {@link GlobalKTable}
|
|
||||||
* state.
|
|
||||||
* In contrast, processing {@link GlobalKTable} input records will only update the internal {@link GlobalKTable}
|
|
||||||
* state and will not produce any result records.
|
|
||||||
* <p>
|
|
||||||
* For each {@code KStream} record that finds a corresponding record in {@link GlobalKTable} the provided
|
|
||||||
* {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record.
|
|
||||||
* The key of the result record is the same as the key of this {@code KStream}.
|
|
||||||
* Note that the key is read-only and should not be modified, as this can lead to undefined behaviour.
|
|
||||||
* If a {@code KStream} input value is {@code null} the record will not be included in the join operation
|
|
||||||
* and thus no output record will be added to the resulting {@code KStream}.
|
|
||||||
*
|
*
|
||||||
* @param globalTable the {@link GlobalKTable} to be joined with this stream
|
* <p>Takes an additional {@link Named} parameter that is used to name the processor in the topology.
|
||||||
* @param keySelector instance of {@link KeyValueMapper} used to map from the (key, value) of this stream
|
|
||||||
* to the key of the {@link GlobalKTable}
|
|
||||||
* @param joiner a {@link ValueJoinerWithKey} that computes the join result for a pair of matching records
|
|
||||||
* @param named a {@link Named} config used to name the processor in the topology
|
|
||||||
* @param <GK> the key type of {@link GlobalKTable}
|
|
||||||
* @param <GV> the value type of the {@link GlobalKTable}
|
|
||||||
* @param <RV> the value type of the resulting {@code KStream}
|
|
||||||
* @return a {@code KStream} that contains join-records for each key and values computed by the given
|
|
||||||
* {@link ValueJoinerWithKey}, one output for each input {@code KStream} record
|
|
||||||
* @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)
|
|
||||||
*/
|
*/
|
||||||
<GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalTable,
|
<GlobalKey, GlobalValue, VOut> KStream<K, VOut> join(final GlobalKTable<GlobalKey, GlobalValue> globalTable,
|
||||||
final KeyValueMapper<? super K, ? super V, ? extends GK> keySelector,
|
final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
|
||||||
final ValueJoinerWithKey<? super K, ? super V, ? super GV, ? extends RV> joiner,
|
final ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner,
|
||||||
final Named named);
|
final Named named);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Join records of this stream with {@link GlobalKTable}'s records using non-windowed left equi join.
|
* Join records of this stream with {@link GlobalKTable}'s records using non-windowed left equi join.
|
||||||
|
|
|
@ -1065,33 +1065,96 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings({"unchecked", "resource"})
|
||||||
|
private <VTable, VOut> KStream<K, VOut> doStreamTableJoin(final KTable<K, VTable> table,
|
||||||
|
final ValueJoinerWithKey<? super K, ? super V, ? super VTable, ? extends VOut> joiner,
|
||||||
|
final JoinedInternal<K, V, VTable> joinedInternal,
|
||||||
|
final boolean leftJoin) {
|
||||||
|
Objects.requireNonNull(table, "table can't be null");
|
||||||
|
Objects.requireNonNull(joiner, "joiner can't be null");
|
||||||
|
|
||||||
|
final Set<String> allSourceNodes = ensureCopartitionWith(Collections.singleton((AbstractStream<K, VTable>) table));
|
||||||
|
|
||||||
|
final NamedInternal renamed = new NamedInternal(joinedInternal.name());
|
||||||
|
|
||||||
|
final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
|
||||||
|
|
||||||
|
Optional<StoreBuilder<?>> bufferStoreBuilder = Optional.empty();
|
||||||
|
|
||||||
|
if (joinedInternal.gracePeriod() != null) {
|
||||||
|
if (!((KTableImpl<K, ?, VTable>) table).graphNode.isOutputVersioned().orElse(true)) {
|
||||||
|
throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
|
||||||
|
}
|
||||||
|
final String bufferName = name + "-Buffer";
|
||||||
|
bufferStoreBuilder = Optional.of(new RocksDBTimeOrderedKeyValueBuffer.Builder<>(
|
||||||
|
bufferName,
|
||||||
|
joinedInternal.keySerde() != null ? joinedInternal.keySerde() : keySerde,
|
||||||
|
joinedInternal.leftValueSerde() != null ? joinedInternal.leftValueSerde() : valueSerde,
|
||||||
|
joinedInternal.gracePeriod(),
|
||||||
|
name)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
final ProcessorSupplier<K, V, K, VOut> processorSupplier = new KStreamKTableJoin<>(
|
||||||
|
((KTableImpl<K, ?, VTable>) table).valueGetterSupplier(),
|
||||||
|
joiner,
|
||||||
|
leftJoin,
|
||||||
|
Optional.ofNullable(joinedInternal.gracePeriod()),
|
||||||
|
bufferStoreBuilder
|
||||||
|
);
|
||||||
|
|
||||||
|
final ProcessorParameters<K, V, K, VOut> processorParameters = new ProcessorParameters<>(processorSupplier, name);
|
||||||
|
final StreamTableJoinNode<K, V, VOut> streamTableJoinNode = new StreamTableJoinNode<>(
|
||||||
|
name,
|
||||||
|
processorParameters,
|
||||||
|
((KTableImpl<K, ?, VTable>) table).valueGetterSupplier().storeNames(),
|
||||||
|
this.name,
|
||||||
|
joinedInternal.gracePeriod()
|
||||||
|
);
|
||||||
|
|
||||||
|
builder.addGraphNode(graphNode, streamTableJoinNode);
|
||||||
|
if (leftJoin) {
|
||||||
|
streamTableJoinNode.labels().add(GraphNode.Label.NULL_KEY_RELAXED_JOIN);
|
||||||
|
}
|
||||||
|
|
||||||
|
// do not have serde for joined result
|
||||||
|
return new KStreamImpl<>(
|
||||||
|
name,
|
||||||
|
joinedInternal.keySerde() != null ? joinedInternal.keySerde() : keySerde,
|
||||||
|
null,
|
||||||
|
allSourceNodes,
|
||||||
|
false,
|
||||||
|
streamTableJoinNode,
|
||||||
|
builder);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <KG, VG, VR> KStream<K, VR> join(final GlobalKTable<KG, VG> globalTable,
|
public <GlobalKey, GlobalValue, VOut> KStream<K, VOut> join(final GlobalKTable<GlobalKey, GlobalValue> globalTable,
|
||||||
final KeyValueMapper<? super K, ? super V, ? extends KG> keySelector,
|
final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
|
||||||
final ValueJoiner<? super V, ? super VG, ? extends VR> joiner) {
|
final ValueJoiner<? super V, ? super GlobalValue, ? extends VOut> joiner) {
|
||||||
return join(globalTable, keySelector, toValueJoinerWithKey(joiner));
|
return join(globalTable, keySelector, toValueJoinerWithKey(joiner));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <KG, VG, VR> KStream<K, VR> join(final GlobalKTable<KG, VG> globalTable,
|
public <GlobalKey, GlobalValue, VOut> KStream<K, VOut> join(final GlobalKTable<GlobalKey, GlobalValue> globalTable,
|
||||||
final KeyValueMapper<? super K, ? super V, ? extends KG> keySelector,
|
final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
|
||||||
final ValueJoinerWithKey<? super K, ? super V, ? super VG, ? extends VR> joiner) {
|
final ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner) {
|
||||||
return globalTableJoin(globalTable, keySelector, joiner, false, NamedInternal.empty());
|
return globalTableJoin(globalTable, keySelector, joiner, false, NamedInternal.empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <KG, VG, VR> KStream<K, VR> join(final GlobalKTable<KG, VG> globalTable,
|
public <GlobalKey, GlobalValue, VOut> KStream<K, VOut> join(final GlobalKTable<GlobalKey, GlobalValue> globalTable,
|
||||||
final KeyValueMapper<? super K, ? super V, ? extends KG> keySelector,
|
final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
|
||||||
final ValueJoiner<? super V, ? super VG, ? extends VR> joiner,
|
final ValueJoiner<? super V, ? super GlobalValue, ? extends VOut> joiner,
|
||||||
final Named named) {
|
final Named named) {
|
||||||
return join(globalTable, keySelector, toValueJoinerWithKey(joiner), named);
|
return join(globalTable, keySelector, toValueJoinerWithKey(joiner), named);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <KG, VG, VR> KStream<K, VR> join(final GlobalKTable<KG, VG> globalTable,
|
public <GlobalKey, GlobalValue, VOut> KStream<K, VOut> join(final GlobalKTable<GlobalKey, GlobalValue> globalTable,
|
||||||
final KeyValueMapper<? super K, ? super V, ? extends KG> keySelector,
|
final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
|
||||||
final ValueJoinerWithKey<? super K, ? super V, ? super VG, ? extends VR> joiner,
|
final ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner,
|
||||||
final Named named) {
|
final Named named) {
|
||||||
return globalTableJoin(globalTable, keySelector, joiner, false, named);
|
return globalTableJoin(globalTable, keySelector, joiner, false, named);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1165,69 +1228,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
|
||||||
builder);
|
builder);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings({"unchecked", "resource"})
|
|
||||||
private <VTable, VOut> KStream<K, VOut> doStreamTableJoin(final KTable<K, VTable> table,
|
|
||||||
final ValueJoinerWithKey<? super K, ? super V, ? super VTable, ? extends VOut> joiner,
|
|
||||||
final JoinedInternal<K, V, VTable> joinedInternal,
|
|
||||||
final boolean leftJoin) {
|
|
||||||
Objects.requireNonNull(table, "table can't be null");
|
|
||||||
Objects.requireNonNull(joiner, "joiner can't be null");
|
|
||||||
|
|
||||||
final Set<String> allSourceNodes = ensureCopartitionWith(Collections.singleton((AbstractStream<K, VTable>) table));
|
|
||||||
|
|
||||||
final NamedInternal renamed = new NamedInternal(joinedInternal.name());
|
|
||||||
|
|
||||||
final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
|
|
||||||
|
|
||||||
Optional<StoreBuilder<?>> bufferStoreBuilder = Optional.empty();
|
|
||||||
|
|
||||||
if (joinedInternal.gracePeriod() != null) {
|
|
||||||
if (!((KTableImpl<K, ?, VTable>) table).graphNode.isOutputVersioned().orElse(true)) {
|
|
||||||
throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
|
|
||||||
}
|
|
||||||
final String bufferName = name + "-Buffer";
|
|
||||||
bufferStoreBuilder = Optional.of(new RocksDBTimeOrderedKeyValueBuffer.Builder<>(
|
|
||||||
bufferName,
|
|
||||||
joinedInternal.keySerde() != null ? joinedInternal.keySerde() : keySerde,
|
|
||||||
joinedInternal.leftValueSerde() != null ? joinedInternal.leftValueSerde() : valueSerde,
|
|
||||||
joinedInternal.gracePeriod(),
|
|
||||||
name)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
final ProcessorSupplier<K, V, K, VOut> processorSupplier = new KStreamKTableJoin<>(
|
|
||||||
((KTableImpl<K, ?, VTable>) table).valueGetterSupplier(),
|
|
||||||
joiner,
|
|
||||||
leftJoin,
|
|
||||||
Optional.ofNullable(joinedInternal.gracePeriod()),
|
|
||||||
bufferStoreBuilder
|
|
||||||
);
|
|
||||||
|
|
||||||
final ProcessorParameters<K, V, K, VOut> processorParameters = new ProcessorParameters<>(processorSupplier, name);
|
|
||||||
final StreamTableJoinNode<K, V, VOut> streamTableJoinNode = new StreamTableJoinNode<>(
|
|
||||||
name,
|
|
||||||
processorParameters,
|
|
||||||
((KTableImpl<K, ?, VTable>) table).valueGetterSupplier().storeNames(),
|
|
||||||
this.name,
|
|
||||||
joinedInternal.gracePeriod()
|
|
||||||
);
|
|
||||||
|
|
||||||
builder.addGraphNode(graphNode, streamTableJoinNode);
|
|
||||||
if (leftJoin) {
|
|
||||||
streamTableJoinNode.labels().add(GraphNode.Label.NULL_KEY_RELAXED_JOIN);
|
|
||||||
}
|
|
||||||
|
|
||||||
// do not have serde for joined result
|
|
||||||
return new KStreamImpl<>(
|
|
||||||
name,
|
|
||||||
joinedInternal.keySerde() != null ? joinedInternal.keySerde() : keySerde,
|
|
||||||
null,
|
|
||||||
allSourceNodes,
|
|
||||||
false,
|
|
||||||
streamTableJoinNode,
|
|
||||||
builder);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <KOut, VOut> KStream<KOut, VOut> process(
|
public <KOut, VOut> KStream<KOut, VOut> process(
|
||||||
final ProcessorSupplier<? super K, ? super V, ? extends KOut, ? extends VOut> processorSupplier,
|
final ProcessorSupplier<? super K, ? super V, ? extends KOut, ? extends VOut> processorSupplier,
|
||||||
|
|
Loading…
Reference in New Issue