diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 039d1d448b3..f06059f7c0a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -1007,14 +1007,14 @@ public class KTableImpl extends AbstractStream implements KTable< return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, TableJoined.with(null, null), materialized, true); } - private final Function>, Integer> getPartition = maybeMulticastPartitions -> { + private final Function>, Optional>> getPartition = maybeMulticastPartitions -> { if (!maybeMulticastPartitions.isPresent()) { - return null; + return Optional.empty(); } if (maybeMulticastPartitions.get().size() != 1) { throw new IllegalArgumentException("The partitions returned by StreamPartitioner#partitions method when used for FK join should be a singleton set"); } - return maybeMulticastPartitions.get().iterator().next(); + return maybeMulticastPartitions; }; @@ -1163,9 +1163,15 @@ public class KTableImpl extends AbstractStream implements KTable< final String finalRepartitionTopicName = renamed.suffixWithOrElseGet("-subscription-response", builder, SUBSCRIPTION_RESPONSE) + TOPIC_SUFFIX; builder.internalTopologyBuilder.addInternalTopic(finalRepartitionTopicName, InternalTopicProperties.empty()); + final StreamPartitioner> defaultForeignResponseSinkPartitioner = + (topic, key, subscriptionResponseWrapper, numPartitions) -> { + final Integer partition = subscriptionResponseWrapper.getPrimaryPartition(); + return partition == null ? Optional.empty() : Optional.of(Collections.singleton(partition)); + }; + final StreamPartitioner> foreignResponseSinkPartitioner = tableJoinedInternal.partitioner() == null - ? (topic, key, subscriptionResponseWrapper, numPartitions) -> subscriptionResponseWrapper.getPrimaryPartition() + ? defaultForeignResponseSinkPartitioner : (topic, key, val, numPartitions) -> getPartition.apply(tableJoinedInternal.partitioner().partitions(topic, key, null, numPartitions)); final StreamSinkNode> foreignResponseSink = diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java index f1ea71981bf..9a8fb9ddd7c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java @@ -20,6 +20,10 @@ import org.apache.kafka.clients.producer.internals.BuiltInPartitioner; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.StreamPartitioner; +import java.util.Collections; +import java.util.Optional; +import java.util.Set; + public class WindowedStreamPartitioner implements StreamPartitioner, V> { private final WindowedSerializer serializer; @@ -40,13 +44,12 @@ public class WindowedStreamPartitioner implements StreamPartitioner windowedKey, final V value, final int numPartitions) { + public Optional> partitions(final String topic, final Windowed windowedKey, final V value, final int numPartitions) { // for windowed key, the key bytes should never be null final byte[] keyBytes = serializer.serializeBaseKey(topic, windowedKey); // stick with the same built-in partitioner util functions that producer used // to make sure its behavior is consistent with the producer - return BuiltInPartitioner.partitionForKey(keyBytes, numPartitions); + return Optional.of(Collections.singleton(BuiltInPartitioner.partitionForKey(keyBytes, numPartitions))); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java index cc2fe7a5874..ad0d934fce7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java @@ -18,7 +18,6 @@ package org.apache.kafka.streams.processor; import org.apache.kafka.streams.Topology; -import java.util.Collections; import java.util.Optional; import java.util.Set; @@ -53,18 +52,6 @@ import java.util.Set; */ public interface StreamPartitioner { - /** - * Determine the partition number for a record with the given key and value and the current number of partitions. - * - * @param topic the topic name this record is sent to - * @param key the key of the record - * @param value the value of the record - * @param numPartitions the total number of partitions - * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used - */ - @Deprecated - Integer partition(String topic, K key, V value, int numPartitions); - /** * Determine the number(s) of the partition(s) to which a record with the given key and value should be sent, * for the given topic and current partition count @@ -77,9 +64,5 @@ public interface StreamPartitioner { * Optional of an empty set means the record won't be sent to any partitions i.e drop it. * Optional of Set of integers means the partitions to which the record should be sent to. * */ - default Optional> partitions(String topic, K key, V value, int numPartitions) { - final Integer partition = partition(topic, key, value, numPartitions); - return partition == null ? Optional.empty() : Optional.of(Collections.singleton(partition)); - } - + Optional> partitions(String topic, K key, V value, int numPartitions); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java index d51b9791291..439eb379e00 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java @@ -20,6 +20,10 @@ import org.apache.kafka.clients.producer.internals.BuiltInPartitioner; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.processor.StreamPartitioner; +import java.util.Collections; +import java.util.Optional; +import java.util.Set; + public class DefaultStreamPartitioner implements StreamPartitioner { private final Serializer keySerializer; @@ -29,17 +33,16 @@ public class DefaultStreamPartitioner implements StreamPartitioner { } @Override - @Deprecated - public Integer partition(final String topic, final K key, final V value, final int numPartitions) { + public Optional> partitions(final String topic, final K key, final V value, final int numPartitions) { final byte[] keyBytes = keySerializer.serialize(topic, key); - // if the key bytes are not available, we just return null to let the producer to decide + // if the key bytes are not available, we just return empty optional to let the producer decide // which partition to send internally; otherwise stick with the same built-in partitioner // util functions that producer used to make sure its behavior is consistent with the producer if (keyBytes == null) { - return null; + return Optional.empty(); } else { - return BuiltInPartitioner.partitionForKey(keyBytes, numPartitions); + return Optional.of(Collections.singleton(BuiltInPartitioner.partitionForKey(keyBytes, numPartitions))); } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index a076b9f5b1a..a37c56443c2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -1052,12 +1052,12 @@ public class KafkaStreamsTest { prepareThreadState(streamThreadOne, state1); prepareThreadState(streamThreadTwo, state2); try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { - assertThrows(StreamsNotStartedException.class, () -> streams.queryMetadataForKey("store", "key", (topic, key, value, numPartitions) -> 0)); + assertThrows(StreamsNotStartedException.class, () -> streams.queryMetadataForKey("store", "key", (topic, key, value, numPartitions) -> Optional.of(Collections.singleton(0)))); streams.start(); waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION); streams.close(); waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION); - assertThrows(IllegalStateException.class, () -> streams.queryMetadataForKey("store", "key", (topic, key, value, numPartitions) -> 0)); + assertThrows(IllegalStateException.class, () -> streams.queryMetadataForKey("store", "key", (topic, key, value, numPartitions) -> Optional.of(Collections.singleton(0)))); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java index 6bfc4bdaf15..a4a6d0d4424 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java @@ -311,12 +311,6 @@ public class KStreamRepartitionIntegrationTest { final List> expectedRecords = expectedRecordsOnRepartition.subList(3, 5); class BroadcastingPartitioner implements StreamPartitioner { - @Override - @Deprecated - public Integer partition(final String topic, final Integer key, final String value, final int numPartitions) { - return null; - } - @Override public Optional> partitions(final String topic, final Integer key, final String value, final int numPartitions) { partitionerInvocation.incrementAndGet(); @@ -382,7 +376,7 @@ public class KStreamRepartitionIntegrationTest { .as(repartitionName) .withStreamPartitioner((topic, key, value, numPartitions) -> { partitionerInvocation.incrementAndGet(); - return partition; + return Optional.of(Collections.singleton(partition)); }); builder.stream(inputTopic, Consumed.with(Serdes.Integer(), Serdes.String())) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java index e2389d6a516..8afff79cc89 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java @@ -55,6 +55,7 @@ import org.junit.jupiter.api.Timeout; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Optional; @@ -92,12 +93,6 @@ public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest { static class MultiPartitioner implements StreamPartitioner { - @Override - @Deprecated - public Integer partition(final String topic, final String key, final Void value, final int numPartitions) { - return null; - } - @Override public Optional> partitions(final String topic, final String key, final Void value, final int numPartitions) { return Optional.of(new HashSet<>(Arrays.asList(0, 1, 2))); @@ -273,8 +268,8 @@ public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest { final ValueJoiner joiner = (value1, value2) -> "value1=" + value1 + ",value2=" + value2; final TableJoined tableJoined = TableJoined.with( - (topic, key, value, numPartitions) -> Math.abs(getKeyB(key).hashCode()) % numPartitions, - (topic, key, value, numPartitions) -> Math.abs(key.hashCode()) % numPartitions + (topic, key, value, numPartitions) -> Optional.of(Collections.singleton(Math.abs(getKeyB(key).hashCode()) % numPartitions)), + (topic, key, value, numPartitions) -> Optional.of(Collections.singleton(Math.abs(key.hashCode()) % numPartitions)) ); table1.join(table2, KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest::getKeyB, joiner, tableJoined, materialized) @@ -316,7 +311,7 @@ public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest { final TableJoined tableJoined = TableJoined.with( new MultiPartitioner(), - (topic, key, value, numPartitions) -> Math.abs(key.hashCode()) % numPartitions + (topic, key, value, numPartitions) -> Optional.of(Collections.singleton(Math.abs(key.hashCode()) % numPartitions)) ); table1.join(table2, KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest::getKeyB, joiner, tableJoined, materialized) @@ -331,14 +326,14 @@ public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest { private static Repartitioned repartitionA() { final Repartitioned repartitioned = Repartitioned.as("a"); return repartitioned.withKeySerde(Serdes.String()).withValueSerde(Serdes.String()) - .withStreamPartitioner((topic, key, value, numPartitions) -> Math.abs(getKeyB(key).hashCode()) % numPartitions) + .withStreamPartitioner((topic, key, value, numPartitions) -> Optional.of(Collections.singleton(Math.abs(getKeyB(key).hashCode()) % numPartitions))) .withNumberOfPartitions(4); } private static Repartitioned repartitionB() { final Repartitioned repartitioned = Repartitioned.as("b"); return repartitioned.withKeySerde(Serdes.String()).withValueSerde(Serdes.String()) - .withStreamPartitioner((topic, key, value, numPartitions) -> Math.abs(key.hashCode()) % numPartitions) + .withStreamPartitioner((topic, key, value, numPartitions) -> Optional.of(Collections.singleton(Math.abs(key.hashCode()) % numPartitions))) .withNumberOfPartitions(4); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java index 5c2d07fb20f..ebf9674ac35 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java @@ -53,7 +53,9 @@ import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -136,7 +138,8 @@ public class OptimizedKTableIntegrationTest { final ReadOnlyKeyValueStore store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, QueryableStoreTypes.keyValueStore()); final ReadOnlyKeyValueStore store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, QueryableStoreTypes.keyValueStore()); - final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0); + final KeyQueryMetadata keyQueryMetadata = kafkaStreams1 + .queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> Optional.of(Collections.singleton(0))); try { // Assert that the current value in store reflects all messages being processed diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java index bc49aa722a2..a5ba13bd0b0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java @@ -151,7 +151,8 @@ public class StoreQueryIntegrationTest { assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true))); until(() -> { - final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0); + final KeyQueryMetadata keyQueryMetadata = kafkaStreams1 + .queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> Optional.of(Collections.singleton(0))); final QueryableStoreType> queryableStoreType = keyValueStore(); final ReadOnlyKeyValueStore store1 = getStore(TABLE_NAME, kafkaStreams1, queryableStoreType); @@ -197,7 +198,8 @@ public class StoreQueryIntegrationTest { // Assert that all messages in the first batch were processed in a timely manner assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true))); until(() -> { - final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0); + final KeyQueryMetadata keyQueryMetadata = kafkaStreams1 + .queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> Optional.of(Collections.singleton(0))); //key belongs to this partition final int keyPartition = keyQueryMetadata.partition(); @@ -313,7 +315,8 @@ public class StoreQueryIntegrationTest { // Assert that all messages in the first batch were processed in a timely manner assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true))); - final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0); + final KeyQueryMetadata keyQueryMetadata = kafkaStreams1 + .queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> Optional.of(Collections.singleton(0))); //key belongs to this partition final int keyPartition = keyQueryMetadata.partition(); @@ -555,12 +558,6 @@ public class StoreQueryIntegrationTest { public void shouldFailWithIllegalArgumentExceptionWhenIQPartitionerReturnsMultiplePartitions() throws Exception { class BroadcastingPartitioner implements StreamPartitioner { - @Override - @Deprecated - public Integer partition(final String topic, final Integer key, final String value, final int numPartitions) { - return null; - } - @Override public Optional> partitions(final String topic, final Integer key, final String value, final int numPartitions) { return Optional.of(IntStream.range(0, numPartitions).boxed().collect(Collectors.toSet())); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java index 03ab9d45969..5c83283b540 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java @@ -29,9 +29,12 @@ import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.Random; +import java.util.Set; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; public class WindowedStreamPartitionerTest { @@ -67,16 +70,16 @@ public class WindowedStreamPartitionerTest { final String value = key.toString(); final byte[] valueBytes = stringSerializer.serialize(topicName, value); - final Integer expected = defaultPartitioner.partition("topic", key, keyBytes, value, valueBytes, cluster); + final Set expected = Collections.singleton(defaultPartitioner.partition(topicName, key, keyBytes, value, valueBytes, cluster)); for (int w = 1; w < 10; w++) { final TimeWindow window = new TimeWindow(10 * w, 20 * w); final Windowed windowedKey = new Windowed<>(key, window); - @SuppressWarnings("deprecation") - final Integer actual = streamPartitioner.partition(topicName, windowedKey, value, infos.size()); + final Optional> actual = streamPartitioner.partitions(topicName, windowedKey, value, infos.size()); - assertEquals(expected, actual); + assertTrue(actual.isPresent()); + assertEquals(expected, actual.get()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index 84506131c7a..17d125c2c6b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -1569,8 +1569,8 @@ public class ProcessorTopologyTest { assertEquals(headers, record.headers()); } - private StreamPartitioner constantPartitioner(final Integer partition) { - return (topic, key, value, numPartitions) -> partition; + private StreamPartitioner constantPartitioner(final Integer partition) { + return (topic, key, value, numPartitions) -> Optional.of(Collections.singleton(partition)); } private Topology createSimpleTopology(final int partition) { @@ -1598,13 +1598,6 @@ public class ProcessorTopologyTest { } static class DroppingPartitioner implements StreamPartitioner { - - @Override - @Deprecated - public Integer partition(final String topic, final String key, final String value, final int numPartitions) { - return null; - } - @Override public Optional> partitions(final String topic, final String key, final String value, final int numPartitions) { final Set partitions = new HashSet<>(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java index ec8f6e5a9f9..a5bfa93ead8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java @@ -143,7 +143,7 @@ public class RecordCollectorTest { private final UUID processId = UUID.randomUUID(); private final StreamPartitioner streamPartitioner = - (topic, key, value, numPartitions) -> Integer.parseInt(key) % numPartitions; + (topic, key, value, numPartitions) -> Optional.of(Collections.singleton(Integer.parseInt(key) % numPartitions)); private MockProducer mockProducer; private StreamsProducer streamsProducer; @@ -313,13 +313,6 @@ public class RecordCollectorTest { @Test public void shouldSendOnlyToEvenPartitions() { class EvenPartitioner implements StreamPartitioner { - - @Override - @Deprecated - public Integer partition(final String topic, final String key, final Object value, final int numPartitions) { - return null; - } - @Override public Optional> partitions(final String topic, final String key, final Object value, final int numPartitions) { final Set partitions = new HashSet<>(); @@ -385,13 +378,6 @@ public class RecordCollectorTest { public void shouldBroadcastToAllPartitions() { class BroadcastingPartitioner implements StreamPartitioner { - - @Override - @Deprecated - public Integer partition(final String topic, final String key, final Object value, final int numPartitions) { - return null; - } - @Override public Optional> partitions(final String topic, final String key, final Object value, final int numPartitions) { return Optional.of(IntStream.range(0, numPartitions).boxed().collect(Collectors.toSet())); @@ -453,13 +439,6 @@ public class RecordCollectorTest { public void shouldDropAllRecords() { class DroppingPartitioner implements StreamPartitioner { - - @Override - @Deprecated - public Integer partition(final String topic, final String key, final Object value, final int numPartitions) { - return null; - } - @Override public Optional> partitions(final String topic, final String key, final Object value, final int numPartitions) { return Optional.of(Collections.emptySet()); @@ -533,13 +512,6 @@ public class RecordCollectorTest { public void shouldUseDefaultPartitionerViaPartitions() { class DefaultPartitioner implements StreamPartitioner { - - @Override - @Deprecated - public Integer partition(final String topic, final String key, final Object value, final int numPartitions) { - return null; - } - @Override public Optional> partitions(final String topic, final String key, final Object value, final int numPartitions) { return Optional.empty(); @@ -597,10 +569,10 @@ public class RecordCollectorTest { } @Test - public void shouldUseDefaultPartitionerAsPartitionReturnsNull() { + public void shouldUseDefaultPartitionerAsPartitionReturnsEmptyOptional() { final StreamPartitioner streamPartitioner = - (topic, key, value, numPartitions) -> null; + (topic, key, value, numPartitions) -> Optional.empty(); final SinkNode sinkNode = new SinkNode<>( sinkNodeName, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java index 55826f19eb5..437d4683eef 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java @@ -130,18 +130,11 @@ public class StreamsMetadataStateTest { topologyMetadata.buildAndRewriteTopology(); metadataState = new StreamsMetadataState(topologyMetadata, hostOne, logContext); metadataState.onChange(hostToActivePartitions, hostToStandbyPartitions, partitionInfos); - partitioner = (topic, key, value, numPartitions) -> 1; + partitioner = (topic, key, value, numPartitions) -> Optional.of(Collections.singleton(1)); storeNames = mkSet("table-one", "table-two", "merged-table", globalTable); } static class MultiValuedPartitioner implements StreamPartitioner { - - @Override - @Deprecated - public Integer partition(final String topic, final String key, final Object value, final int numPartitions) { - return null; - } - @Override public Optional> partitions(final String topic, final String key, final Object value, final int numPartitions) { final Set partitions = new HashSet<>(); @@ -299,8 +292,9 @@ public class StreamsMetadataStateTest { final KeyQueryMetadata expected = new KeyQueryMetadata(hostTwo, mkSet(hostOne), 2); + final KeyQueryMetadata actual = metadataState.keyQueryMetadataForKey("merged-table", "the-key", - (topic, key, value, numPartitions) -> 2); + (topic, key, value, numPartitions) -> Optional.of(Collections.singleton(2))); assertEquals(expected, actual); } diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/ProducedTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/ProducedTest.scala index 69c4b170609..6fca794bfb5 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/ProducedTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/ProducedTest.scala @@ -23,6 +23,9 @@ import org.apache.kafka.streams.scala.serialization.Serdes._ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test +import java.util +import java.util.Optional + class ProducedTest { @Test @@ -37,7 +40,16 @@ class ProducedTest { @Test def testCreateProducedWithSerdesAndStreamPartitioner(): Unit = { val partitioner = new StreamPartitioner[String, Long] { - override def partition(topic: String, key: String, value: Long, numPartitions: Int): Integer = 0 + override def partitions( + topic: String, + key: String, + value: Long, + numPartitions: Int + ): Optional[util.Set[Integer]] = { + val partitions = new util.HashSet[Integer]() + partitions.add(Int.box(0)) + Optional.of(partitions) + } } val produced: Produced[String, Long] = Produced.`with`(partitioner) diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/RepartitionedTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/RepartitionedTest.scala index 4c8d8951b0a..ee3515ac612 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/RepartitionedTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/RepartitionedTest.scala @@ -23,6 +23,9 @@ import org.apache.kafka.streams.scala.serialization.Serdes._ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test +import java.util +import java.util.Optional + class RepartitionedTest { @Test @@ -58,7 +61,16 @@ class RepartitionedTest { @Test def testCreateRepartitionedWithSerdesAndTopicNameAndNumPartitionsAndStreamPartitioner(): Unit = { val partitioner = new StreamPartitioner[String, Long] { - override def partition(topic: String, key: String, value: Long, numPartitions: Int): Integer = 0 + override def partitions( + topic: String, + key: String, + value: Long, + numPartitions: Int + ): Optional[util.Set[Integer]] = { + val partitions = new util.HashSet[Integer]() + partitions.add(Int.box(0)) + Optional.of(partitions) + } } val repartitioned: Repartitioned[String, Long] = Repartitioned.`with`[String, Long](partitioner) @@ -71,7 +83,16 @@ class RepartitionedTest { @Test def testCreateRepartitionedWithTopicNameAndNumPartitionsAndStreamPartitioner(): Unit = { val partitioner = new StreamPartitioner[String, Long] { - override def partition(topic: String, key: String, value: Long, numPartitions: Int): Integer = 0 + override def partitions( + topic: String, + key: String, + value: Long, + numPartitions: Int + ): Optional[util.Set[Integer]] = { + val partitions = new util.HashSet[Integer]() + partitions.add(Int.box(0)) + Optional.of(partitions) + } } val repartitioned: Repartitioned[String, Long] = Repartitioned