KAFKA-18026: KIP-1112, ProcessorWrapper API with PAPI and partial DSL implementation (#17892)

This PR includes the API for KIP-1112 and a partial implementation, which wraps any processors added through the PAPI and the DSL processors that are written to the topology through the ProcessorParameters#addProcessorTo method.

Further PRs will complete the implementation by converting the remaining DSL operators to using the #addProcessorTo method, and future-proof the processor writing mechanism to prevent new DSL operators from being implemented incorrectly/without the wrapper

Reviewers: Almog Gavra <almog@responsive.dev>, Guozhang Wang <guozhang.wang.us@gmail.com>
This commit is contained in:
A. Sophie Blee-Goldman 2024-11-23 21:19:19 -08:00 committed by GitHub
parent 8a90ca0528
commit 87b902d35d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 629 additions and 18 deletions

View File

@ -49,6 +49,7 @@ import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.assignment.TaskAssignor;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.NoOpProcessorWrapper;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor;
import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
@ -675,6 +676,11 @@ public class StreamsConfig extends AbstractConfig {
"recommended setting for production; for development you can change this, by adjusting broker setting " +
"<code>transaction.state.log.replication.factor</code> and <code>transaction.state.log.min.isr</code>.";
/** {@code processor.wrapper.class} */
public static final String PROCESSOR_WRAPPER_CLASS_CONFIG = "processor.wrapper.class";
public static final String PROCESSOR_WRAPPER_CLASS_DOC = "A processor wrapper class or class name that implements the <code>org.apache.kafka.streams.state.ProcessorWrapper</code> interface. "
+ "Must be passed in to the StreamsBuilder or Topology constructor in order to take effect";
/** {@code repartition.purge.interval.ms} */
@SuppressWarnings("WeakerAccess")
public static final String REPARTITION_PURGE_INTERVAL_MS_CONFIG = "repartition.purge.interval.ms";
@ -1124,6 +1130,11 @@ public class StreamsConfig extends AbstractConfig {
atLeast(60 * 1000L),
Importance.LOW,
PROBING_REBALANCE_INTERVAL_MS_DOC)
.define(PROCESSOR_WRAPPER_CLASS_CONFIG,
Type.CLASS,
NoOpProcessorWrapper.class,
Importance.LOW,
PROCESSOR_WRAPPER_CLASS_DOC)
.define(RECEIVE_BUFFER_CONFIG,
Type.INT,
32 * 1024,

View File

@ -714,8 +714,10 @@ public class Topology {
public synchronized <KIn, VIn, KOut, VOut> Topology addProcessor(final String name,
final ProcessorSupplier<KIn, VIn, KOut, VOut> supplier,
final String... parentNames) {
internalTopologyBuilder.addProcessor(name, supplier, parentNames);
final Set<StoreBuilder<?>> stores = supplier.stores();
final ProcessorSupplier<KIn, VIn, KOut, VOut> wrapped = internalTopologyBuilder.wrapProcessorSupplier(name, supplier);
internalTopologyBuilder.addProcessor(name, wrapped, parentNames);
final Set<StoreBuilder<?>> stores = wrapped.stores();
if (stores != null) {
for (final StoreBuilder<?> storeBuilder : stores) {
internalTopologyBuilder.addStateStore(storeBuilder, name);

View File

@ -27,6 +27,7 @@ import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.NoOpProcessorWrapper;
import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
import org.apache.kafka.streams.state.DslStoreSuppliers;
@ -38,6 +39,7 @@ import java.util.Properties;
import java.util.function.Supplier;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_DOC;
import static org.apache.kafka.streams.StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG;
@ -57,6 +59,8 @@ import static org.apache.kafka.streams.StreamsConfig.IN_MEMORY;
import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_DOC;
import static org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_DOC;
import static org.apache.kafka.streams.StreamsConfig.ROCKS_DB;
import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_DOC;
@ -68,13 +72,26 @@ import static org.apache.kafka.streams.internals.StreamsConfigUtils.totalCacheSi
* Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the
* {@link org.apache.kafka.streams.KafkaStreams} constructor or the {@link KafkaStreamsNamedTopologyWrapper} constructor (deprecated)
* will determine the defaults, which can then be overridden for specific topologies by passing them in when creating the
* topology builders via the {@link org.apache.kafka.streams.StreamsBuilder#StreamsBuilder(TopologyConfig) StreamsBuilder(TopologyConfig)} method.
* topology builders via the {@link StreamsBuilder#StreamsBuilder(TopologyConfig)} constructor for DSL applications,
* or the {@link Topology#Topology(TopologyConfig)} for PAPI applications.
* <p>
* Note that some configs, such as the {@code processor.wrapper.class} config, can only take effect while the
* topology is being built, which means they have to be passed in as a TopologyConfig to the
* {@link Topology#Topology(TopologyConfig)} constructor (PAPI) or the
* {@link StreamsBuilder#StreamsBuilder(TopologyConfig)} constructor (DSL).
* If they are only set in the configs passed in to the KafkaStreams constructor, it will be too late for them
* to be applied and the config will be ignored.
*/
@SuppressWarnings("deprecation")
public final class TopologyConfig extends AbstractConfig {
private static final ConfigDef CONFIG;
static {
CONFIG = new ConfigDef()
.define(PROCESSOR_WRAPPER_CLASS_CONFIG,
Type.CLASS,
NoOpProcessorWrapper.class.getName(),
Importance.LOW,
PROCESSOR_WRAPPER_CLASS_DOC)
.define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
Type.INT,
null,
@ -147,8 +164,8 @@ public final class TopologyConfig extends AbstractConfig {
public final Supplier<DeserializationExceptionHandler> deserializationExceptionHandlerSupplier;
public final Supplier<ProcessingExceptionHandler> processingExceptionHandlerSupplier;
public TopologyConfig(final StreamsConfig globalAppConfigs) {
this(null, globalAppConfigs, new Properties());
public TopologyConfig(final StreamsConfig configs) {
this(null, configs, mkObjectProperties(configs.originals()));
}
public TopologyConfig(final String topologyName, final StreamsConfig globalAppConfigs, final Properties topologyOverrides) {

View File

@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.DslStoreSuppliers;
@ -35,9 +34,11 @@ public abstract class AbstractConfigurableStoreFactory implements StoreFactory {
@Override
public void configure(final StreamsConfig config) {
if (dslStoreSuppliers == null) {
dslStoreSuppliers = Utils.newInstance(
config.getClass(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG),
DslStoreSuppliers.class);
dslStoreSuppliers = config.getConfiguredInstance(
StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG,
DslStoreSuppliers.class,
config.originals()
);
}
}

View File

@ -17,12 +17,15 @@
package org.apache.kafka.streams.kstream.internals.graph;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorAdapter;
import org.apache.kafka.streams.state.StoreBuilder;
import java.util.Set;
/**
* Class used to represent a {@link ProcessorSupplier} or {@link FixedKeyProcessorSupplier} and the name
* used to register it with the {@link org.apache.kafka.streams.processor.internals.InternalTopologyBuilder}
@ -78,18 +81,30 @@ public class ProcessorParameters<KIn, VIn, KOut, VOut> {
public void addProcessorTo(final InternalTopologyBuilder topologyBuilder, final String[] parentNodeNames) {
if (processorSupplier != null) {
topologyBuilder.addProcessor(processorName, processorSupplier, parentNodeNames);
if (processorSupplier.stores() != null) {
for (final StoreBuilder<?> storeBuilder : processorSupplier.stores()) {
ApiUtils.checkSupplier(processorSupplier);
final ProcessorSupplier<KIn, VIn, KOut, VOut> wrapped =
topologyBuilder.wrapProcessorSupplier(processorName, processorSupplier);
topologyBuilder.addProcessor(processorName, wrapped, parentNodeNames);
final Set<StoreBuilder<?>> stores = wrapped.stores();
if (stores != null) {
for (final StoreBuilder<?> storeBuilder : stores) {
topologyBuilder.addStateStore(storeBuilder, processorName);
}
}
}
if (fixedKeyProcessorSupplier != null) {
topologyBuilder.addProcessor(processorName, fixedKeyProcessorSupplier, parentNodeNames);
if (fixedKeyProcessorSupplier.stores() != null) {
for (final StoreBuilder<?> storeBuilder : fixedKeyProcessorSupplier.stores()) {
ApiUtils.checkSupplier(fixedKeyProcessorSupplier);
final FixedKeyProcessorSupplier<KIn, VIn, VOut> wrapped =
topologyBuilder.wrapFixedKeyProcessorSupplier(processorName, fixedKeyProcessorSupplier);
topologyBuilder.addProcessor(processorName, wrapped, parentNodeNames);
final Set<StoreBuilder<?>> stores = wrapped.stores();
if (stores != null) {
for (final StoreBuilder<?> storeBuilder : stores) {
topologyBuilder.addStateStore(storeBuilder, processorName);
}
}

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.api;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.processor.internals.NoOpProcessorWrapper.WrappedFixedKeyProcessorSupplierImpl;
import org.apache.kafka.streams.processor.internals.NoOpProcessorWrapper.WrappedProcessorSupplierImpl;
import java.util.Map;
/**
* Wrapper class that can be used to inject custom wrappers around the processors of their application topology.
* The returned instance should wrap the supplied {@code ProcessorSupplier} and the {@code Processor} it supplies
* to avoid disrupting the regular processing of the application, although this is not required and any processor
* implementation can be substituted in to replace the original processor entirely (which may be useful for example
* while testing or debugging an application topology).
* <p>
* NOTE: in order to use this feature, you must set the {@link StreamsConfig#PROCESSOR_WRAPPER_CLASS_CONFIG} config and pass it
* in as a {@link TopologyConfig} when creating the {@link StreamsBuilder} or {@link Topology} by using the
* appropriate constructor (ie {@link StreamsBuilder#StreamsBuilder(TopologyConfig)} or {@link Topology#Topology(TopologyConfig)})
* <p>
* Can be configured, if desired, by implementing the {@link #configure(Map)} method. This will be invoked when
* the {@code ProcessorWrapper} is instantiated, and will provide it with the TopologyConfigs that were passed in
* to the {@link StreamsBuilder} or {@link Topology} constructor.
*/
public interface ProcessorWrapper extends Configurable {
@Override
default void configure(final Map<String, ?> configs) {
// do nothing
}
/**
* Wrap or replace the provided {@link ProcessorSupplier} and return a {@link WrappedProcessorSupplier}
* To convert a {@link ProcessorSupplier} instance into a {@link WrappedProcessorSupplier},
* use the {@link ProcessorWrapper#asWrapped(ProcessorSupplier)} method
*/
<KIn, VIn, KOut, VOut> WrappedProcessorSupplier<KIn, VIn, KOut, VOut> wrapProcessorSupplier(final String processorName,
final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier);
/**
* Wrap or replace the provided {@link FixedKeyProcessorSupplier} and return a {@link WrappedFixedKeyProcessorSupplier}
* To convert a {@link FixedKeyProcessorSupplier} instance into a {@link WrappedFixedKeyProcessorSupplier},
* use the {@link ProcessorWrapper#asWrappedFixedKey(FixedKeyProcessorSupplier)} method
*/
<KIn, VIn, VOut> WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> wrapFixedKeyProcessorSupplier(final String processorName,
final FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier);
/**
* Use to convert a {@link ProcessorSupplier} instance into a {@link WrappedProcessorSupplier}
*/
static <KIn, VIn, KOut, VOut> WrappedProcessorSupplier<KIn, VIn, KOut, VOut> asWrapped(
final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier
) {
return new WrappedProcessorSupplierImpl<>(processorSupplier);
}
/**
* Use to convert a {@link FixedKeyProcessorSupplier} instance into a {@link WrappedFixedKeyProcessorSupplier}
*/
static <KIn, VIn, VOut> WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> asWrappedFixedKey(
final FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier
) {
return new WrappedFixedKeyProcessorSupplierImpl<>(processorSupplier);
}
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.api;
/**
* Marker interface for classes implementing {@link FixedKeyProcessorSupplier}
* that have been wrapped via a {@link ProcessorWrapper}.
* <p>
* To convert a {@link FixedKeyProcessorSupplier} instance into a {@link WrappedFixedKeyProcessorSupplier},
* use the {@link ProcessorWrapper#asWrappedFixedKey(FixedKeyProcessorSupplier)} method
*/
public interface WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> extends FixedKeyProcessorSupplier<KIn, VIn, VOut> {
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.api;
/**
* Marker interface for classes implementing {@link ProcessorSupplier}
* that have been wrapped via a {@link ProcessorWrapper}.
* <p>
* To convert a {@link ProcessorSupplier} instance into a {@link WrappedProcessorSupplier},
* use the {@link ProcessorWrapper#asWrapped(ProcessorSupplier)} method
*/
public interface WrappedProcessorSupplier<KIn, VIn, KOut, VOut> extends ProcessorSupplier<KIn, VIn, KOut, VOut> {
}

View File

@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamsConfig;
@ -31,6 +32,9 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorWrapper;
import org.apache.kafka.streams.processor.api.WrappedFixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.WrappedProcessorSupplier;
import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
import org.apache.kafka.streams.state.StoreBuilder;
@ -52,6 +56,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
@ -61,16 +66,32 @@ import java.util.stream.Collectors;
import static org.apache.kafka.clients.consumer.OffsetResetStrategy.EARLIEST;
import static org.apache.kafka.clients.consumer.OffsetResetStrategy.LATEST;
import static org.apache.kafka.clients.consumer.OffsetResetStrategy.NONE;
import static org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG;
public class InternalTopologyBuilder {
public InternalTopologyBuilder() {
this.topologyName = null;
this.processorWrapper = new NoOpProcessorWrapper();
}
public InternalTopologyBuilder(final TopologyConfig topologyConfigs) {
this.topologyConfigs = topologyConfigs;
this.topologyName = topologyConfigs.topologyName;
try {
processorWrapper = topologyConfigs.getConfiguredInstance(
PROCESSOR_WRAPPER_CLASS_CONFIG,
ProcessorWrapper.class,
topologyConfigs.originals()
);
} catch (final Exception e) {
final String errorMessage = String.format(
"Unable to instantiate ProcessorWrapper from value of config %s. Please provide a valid class "
+ "that implements the ProcessorWrapper interface.", PROCESSOR_WRAPPER_CLASS_CONFIG);
log.error(errorMessage, e);
throw new ConfigException(errorMessage, e);
}
}
private static final Logger log = LoggerFactory.getLogger(InternalTopologyBuilder.class);
@ -143,6 +164,8 @@ public class InternalTopologyBuilder {
// Used to capture subscribed topics via Patterns discovered during the partition assignment process.
private final Set<String> subscriptionUpdates = new HashSet<>();
private final ProcessorWrapper processorWrapper;
private String applicationId = null;
// keyed by subtopology id
@ -367,7 +390,11 @@ public class InternalTopologyBuilder {
public final synchronized void setStreamsConfig(final StreamsConfig applicationConfig) {
Objects.requireNonNull(applicationConfig, "config can't be null");
topologyConfigs = new TopologyConfig(applicationConfig);
final Properties topologyOverrides = topologyConfigs == null
? new Properties()
: topologyConfigs.topologyOverrides;
topologyConfigs = new TopologyConfig(topologyName, applicationConfig, topologyOverrides);
}
@SuppressWarnings("deprecation")
@ -2245,4 +2272,22 @@ public class InternalTopologyBuilder {
public synchronized Map<String, StoreFactory> stateStores() {
return stateFactories;
}
public <KIn, VIn, VOut> WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> wrapFixedKeyProcessorSupplier(
final String name,
final FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier
) {
return ProcessorWrapper.asWrappedFixedKey(
processorWrapper.wrapFixedKeyProcessorSupplier(name, processorSupplier)
);
}
public <KIn, VIn, KOut, VOut> WrappedProcessorSupplier<KIn, VIn, KOut, VOut> wrapProcessorSupplier(
final String name,
final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier
) {
return ProcessorWrapper.asWrapped(
processorWrapper.wrapProcessorSupplier(name, processorSupplier)
);
}
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorWrapper;
import org.apache.kafka.streams.processor.api.WrappedFixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.WrappedProcessorSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
import java.util.Set;
public class NoOpProcessorWrapper implements ProcessorWrapper {
@Override
public <KIn, VIn, KOut, VOut> WrappedProcessorSupplier<KIn, VIn, KOut, VOut> wrapProcessorSupplier(final String processorName,
final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier) {
return ProcessorWrapper.asWrapped(processorSupplier);
}
@Override
public <KIn, VIn, VOut> WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> wrapFixedKeyProcessorSupplier(final String processorName,
final FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier) {
return ProcessorWrapper.asWrappedFixedKey(processorSupplier);
}
public static class WrappedProcessorSupplierImpl<KIn, VIn, KOut, VOut> implements WrappedProcessorSupplier<KIn, VIn, KOut, VOut> {
private final ProcessorSupplier<KIn, VIn, KOut, VOut> delegate;
public WrappedProcessorSupplierImpl(final ProcessorSupplier<KIn, VIn, KOut, VOut> delegate) {
this.delegate = delegate;
}
@Override
public Set<StoreBuilder<?>> stores() {
return delegate.stores();
}
@Override
public Processor<KIn, VIn, KOut, VOut> get() {
return delegate.get();
}
}
public static class WrappedFixedKeyProcessorSupplierImpl<KIn, VIn, VOut> implements WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> {
private final FixedKeyProcessorSupplier<KIn, VIn, VOut> delegate;
public WrappedFixedKeyProcessorSupplierImpl(final FixedKeyProcessorSupplier<KIn, VIn, VOut> delegate) {
this.delegate = delegate;
}
@Override
public Set<StoreBuilder<?>> stores() {
return delegate.stores();
}
@Override
public FixedKeyProcessor<KIn, VIn, VOut> get() {
return delegate.get();
}
}
}

View File

@ -39,6 +39,7 @@ import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
@ -54,6 +55,7 @@ import org.apache.kafka.streams.state.internals.InMemoryWindowStore;
import org.apache.kafka.streams.state.internals.RocksDBStore;
import org.apache.kafka.streams.state.internals.RocksDBWindowStore;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.apache.kafka.streams.utils.TestUtils.CountingProcessorWrapper;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockPredicate;
@ -62,24 +64,31 @@ import org.apache.kafka.test.NoopValueTransformer;
import org.apache.kafka.test.NoopValueTransformerWithKey;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import static java.util.Arrays.asList;
import static org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_1;
import static org.apache.kafka.streams.utils.TestUtils.PROCESSOR_WRAPPER_COUNTER_CONFIG;
import static org.apache.kafka.streams.utils.TestUtils.dummyStreamsConfigMap;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
@ -1405,6 +1414,74 @@ public class StreamsBuilderTest {
assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", "KTABLE-SOURCE-0000000001");
}
@Test
public void shouldWrapProcessorsForProcess() {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, CountingProcessorWrapper.class);
final AtomicInteger wrappedProcessorCount = new AtomicInteger();
props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessorCount);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
// Add a bit of randomness to the lambda-created processors to avoid them being
// optimized into a shared instance that will cause the ApiUtils#checkSupplier
// call to fail
final Random random = new Random();
builder.stream("input")
.process((ProcessorSupplier<Object, Object, Object, Object>) () -> record -> System.out.println("Processing: " + random.nextInt()))
.processValues(() -> record -> System.out.println("Processing: " + random.nextInt()))
.to("output");
builder.build();
assertThat(wrappedProcessorCount.get(), CoreMatchers.is(2));
}
@Test
public void shouldWrapProcessorsForAggregationOperators() {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, CountingProcessorWrapper.class);
final AtomicInteger wrappedProcessorCount = new AtomicInteger();
props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessorCount);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
builder.stream("input")
.groupByKey()
.count() // wrapped 1
.toStream()// wrapped 2
.to("output");
builder.build();
assertThat(wrappedProcessorCount.get(), CoreMatchers.is(2));
}
@Test
public void shouldWrapProcessorsForStatelessOperators() {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, CountingProcessorWrapper.class);
final AtomicInteger wrappedProcessorCount = new AtomicInteger();
props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessorCount);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
builder.stream("input")
.filter((k, v) -> true) // wrapped 1
.map(KeyValue::new) // wrapped 2
.selectKey((k, v) -> k) // wrapped 3
.peek((k, v) -> { }) // wrapped 4
.flatMapValues(e -> new ArrayList<>()) // wrapped 5
.toTable() // wrapped 6
.toStream() // wrapped 7
.to("output");
builder.build();
assertThat(wrappedProcessorCount.get(), CoreMatchers.is(7));
}
@Test
public void shouldAllowStreamsFromSameTopic() {
builder.stream("topic");

View File

@ -38,9 +38,11 @@ import org.apache.kafka.streams.internals.UpgradeFromValues;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.NoOpProcessorWrapper;
import org.apache.kafka.streams.processor.internals.RecordCollectorTest;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
import org.apache.kafka.streams.utils.TestUtils.CountingProcessorWrapper;
import org.apache.log4j.Level;
import org.junit.jupiter.api.BeforeEach;
@ -69,6 +71,7 @@ import static org.apache.kafka.streams.StreamsConfig.ENABLE_METRICS_PUSH_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_V2;
import static org.apache.kafka.streams.StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH;
import static org.apache.kafka.streams.StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH;
import static org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.STATE_DIR_CONFIG;
@ -1219,6 +1222,24 @@ public class StreamsConfigTest {
assertThrows(ConfigException.class, () -> new StreamsConfig(props));
}
@Test
public void shouldReturnDefaultProcessorWrapperClass() {
final String defaultWrapperClassName = streamsConfig.getClass(PROCESSOR_WRAPPER_CLASS_CONFIG).getName();
assertThat(defaultWrapperClassName, equalTo(NoOpProcessorWrapper.class.getName()));
}
@Test
public void shouldAllowConfiguringProcessorWrapperWithClass() {
props.put(StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG, CountingProcessorWrapper.class);
new StreamsConfig(props);
}
@Test
public void shouldAllowConfiguringProcessorWrapperWithClassName() {
props.put(StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG, CountingProcessorWrapper.class.getName());
new StreamsConfig(props);
}
@Test
public void shouldSupportAllUpgradeFromValues() {
for (final UpgradeFromValues upgradeFrom : UpgradeFromValues.values()) {

View File

@ -45,6 +45,7 @@ import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.streams.utils.TestUtils.CountingProcessorWrapper;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.MockProcessorSupplier;
@ -66,11 +67,17 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import static java.time.Duration.ofMillis;
import static org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG;
import static org.apache.kafka.streams.utils.TestUtils.PROCESSOR_WRAPPER_COUNTER_CONFIG;
import static org.apache.kafka.streams.utils.TestUtils.dummyStreamsConfigMap;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@ -2415,6 +2422,40 @@ public class TopologyTest {
assertThat(stateStoreFactory.loggingEnabled(), equalTo(false));
}
@Test
public void shouldWrapProcessors() {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, CountingProcessorWrapper.class);
final AtomicInteger wrappedProcessorCount = new AtomicInteger();
props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessorCount);
final Topology topology = new Topology(new TopologyConfig(new StreamsConfig(props)));
// Add a bit of randomness to the lambda-created processors to avoid them being
// optimized into a shared instance that will cause the ApiUtils#checkSupplier
// call to fail
final Random random = new Random();
topology.addSource("source", "topic");
topology.addProcessor(
"p1",
() -> (Processor<Object, Object, Object, Object>) record -> System.out.println("Processing: " + random.nextInt()),
"source"
);
topology.addProcessor(
"p2",
() -> (Processor<Object, Object, Object, Object>) record -> System.out.println("Processing: " + random.nextInt()),
"p1"
);
topology.addProcessor(
"p3",
() -> (Processor<Object, Object, Object, Object>) record -> System.out.println("Processing: " + random.nextInt()),
"p2"
);
assertThat(wrappedProcessorCount.get(), is(3));
}
@SuppressWarnings("deprecation")
private TopologyConfig overrideDefaultStore(final String defaultStore) {
final Properties topologyOverrides = new Properties();

View File

@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
@ -30,8 +31,15 @@ import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorWrapper;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.WrappedFixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.WrappedProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.SubtopologyDescription;
import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
import org.apache.kafka.streams.state.KeyValueStore;
@ -63,9 +71,11 @@ import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_1;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_2;
import static org.apache.kafka.streams.utils.TestUtils.dummyStreamsConfigMap;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
@ -1362,4 +1372,86 @@ public class InternalTopologyBuilderTest {
assertThat(builder.buildGlobalStateTopology().storeToChangelogTopic().get(globalStoreName), is(globalTopic));
}
@Test
public void shouldWrapProcessorSupplier() {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, ProcessorSkippingWrapper.class);
final InternalTopologyBuilder builder =
new InternalTopologyBuilder(new TopologyConfig(new StreamsConfig(props)));
final ProcessorSupplier<String, String, String, String> throwingProcessorSupplier =
() -> (Processor<String, String, String, String>) record -> {
throw new RuntimeException("oops, don't call process on me!");
};
final ProcessorSupplier<String, String, String, String> wrappedProcessorSupplier =
builder.wrapProcessorSupplier("name", throwingProcessorSupplier);
final Processor<String, String, String, String> throwingProcessor = throwingProcessorSupplier.get();
final Processor<String, String, String, String> wrappedProcessor = wrappedProcessorSupplier.get();
final Record<String, String> input = new Record<>("key", "value", 0L);
assertThrows(RuntimeException.class, () -> throwingProcessor.process(input));
// wrapped processor should not throw
wrappedProcessor.process(input);
}
@Test
public void shouldWrapFixedKeyProcessorSupplier() {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, ProcessorSkippingWrapper.class.getName());
final InternalTopologyBuilder builder =
new InternalTopologyBuilder(new TopologyConfig(new StreamsConfig(props)));
final FixedKeyProcessorSupplier<String, String, String> throwingProcessorSupplier =
() -> (FixedKeyProcessor<String, String, String>) record -> {
throw new RuntimeException("oops, don't call process on me!");
};
final FixedKeyProcessorSupplier<String, String, String> wrappedProcessorSupplier =
builder.wrapFixedKeyProcessorSupplier("name", throwingProcessorSupplier);
final FixedKeyProcessor<String, String, String> throwingProcessor = throwingProcessorSupplier.get();
final FixedKeyProcessor<String, String, String> wrappedProcessor = wrappedProcessorSupplier.get();
// TODO: when we expose a public constructor for FixedKeyRecord we should pass in a real one here
final FixedKeyRecord<String, String> input = null;
assertThrows(RuntimeException.class, () -> throwingProcessor.process(input));
// wrapped processor should not throw
wrappedProcessor.process(input);
}
@Test
public void shouldThrowOnInvalidProcessorWrapperClassName() {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, "invalid.class");
assertThrows(
ConfigException.class,
() -> new InternalTopologyBuilder(new TopologyConfig(new StreamsConfig(props)))
);
}
public static class ProcessorSkippingWrapper implements ProcessorWrapper {
@Override
public <KIn, VIn, KOut, VOut> WrappedProcessorSupplier<KIn, VIn, KOut, VOut> wrapProcessorSupplier(final String processorName,
final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier) {
return () -> (Processor<KIn, VIn, KOut, VOut>) record -> {
// do nothing
};
}
@Override
public <KIn, VIn, VOut> WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> wrapFixedKeyProcessorSupplier(final String processorName,
final FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier) {
return () -> (FixedKeyProcessor<KIn, VIn, VOut>) record -> {
// do nothing
};
}
}
}

View File

@ -25,6 +25,7 @@ import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
import org.apache.kafka.streams.state.DslStoreSuppliers;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
@ -48,6 +49,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import static java.util.Collections.emptyMap;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
@ -77,8 +79,14 @@ public class KeyValueStoreMaterializerTest {
@BeforeEach
public void setUp() {
doReturn(BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class)
.when(streamsConfig).getClass(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG);
doReturn(emptyMap())
.when(streamsConfig).originals();
doReturn(new BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers())
.when(streamsConfig).getConfiguredInstance(
StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG,
DslStoreSuppliers.class,
emptyMap()
);
}
private void mockInnerVersionedStore() {

View File

@ -18,19 +18,31 @@ package org.apache.kafka.streams.utils;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorWrapper;
import org.apache.kafka.streams.processor.api.WrappedFixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.WrappedProcessorSupplier;
import org.junit.jupiter.api.TestInfo;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout;
import static org.hamcrest.MatcherAssert.assertThat;
public class TestUtils {
public static final String PROCESSOR_WRAPPER_COUNTER_CONFIG = "wrapped.processor.count";
/**
* Waits for the given {@link KafkaStreams} instances to all be in a specific {@link KafkaStreams.State}.
* This method uses polling, which can be more error prone and slightly slower.
@ -85,4 +97,49 @@ public class TestUtils {
.replace(' ', '_')
.replace('=', '_');
}
/**
* Quick method of generating a config map prepopulated with the required
* StreamsConfig properties
*/
public static Map<Object, Object> dummyStreamsConfigMap() {
final Map<Object, Object> baseConfigs = new HashMap<>();
baseConfigs.put(APPLICATION_ID_CONFIG, "dummy-app-id");
baseConfigs.put(BOOTSTRAP_SERVERS_CONFIG, "local");
return baseConfigs;
}
/**
* Simple pass-through processor wrapper that counts the number of processors
* it wraps.
* To retrieve the current count, pass an instance of AtomicInteger into the configs
* alongside the wrapper itself. Use the config key defined with {@link #PROCESSOR_WRAPPER_COUNTER_CONFIG}
*/
public static class CountingProcessorWrapper implements ProcessorWrapper {
private AtomicInteger wrappedProcessorCount;
@Override
public void configure(final Map<String, ?> configs) {
if (configs.containsKey(PROCESSOR_WRAPPER_COUNTER_CONFIG)) {
wrappedProcessorCount = (AtomicInteger) configs.get(PROCESSOR_WRAPPER_COUNTER_CONFIG);
} else {
wrappedProcessorCount = new AtomicInteger();
}
}
@Override
public <KIn, VIn, KOut, VOut> WrappedProcessorSupplier<KIn, VIn, KOut, VOut> wrapProcessorSupplier(final String processorName,
final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier) {
wrappedProcessorCount.incrementAndGet();
return ProcessorWrapper.asWrapped(processorSupplier);
}
@Override
public <KIn, VIn, VOut> WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> wrapFixedKeyProcessorSupplier(final String processorName,
final FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier) {
wrappedProcessorCount.incrementAndGet();
return ProcessorWrapper.asWrappedFixedKey(processorSupplier);
}
}
}