KAFKA-16335: Remove deprecated method of StreamPartitioner (#15482)

Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Caio Guedes 2024-08-26 22:16:43 +02:00 committed by GitHub
parent 1621f88f06
commit e8b27b6a33
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 95 additions and 116 deletions

View File

@ -1007,14 +1007,14 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, TableJoined.with(null, null), materialized, true); return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, TableJoined.with(null, null), materialized, true);
} }
private final Function<Optional<Set<Integer>>, Integer> getPartition = maybeMulticastPartitions -> { private final Function<Optional<Set<Integer>>, Optional<Set<Integer>>> getPartition = maybeMulticastPartitions -> {
if (!maybeMulticastPartitions.isPresent()) { if (!maybeMulticastPartitions.isPresent()) {
return null; return Optional.empty();
} }
if (maybeMulticastPartitions.get().size() != 1) { if (maybeMulticastPartitions.get().size() != 1) {
throw new IllegalArgumentException("The partitions returned by StreamPartitioner#partitions method when used for FK join should be a singleton set"); throw new IllegalArgumentException("The partitions returned by StreamPartitioner#partitions method when used for FK join should be a singleton set");
} }
return maybeMulticastPartitions.get().iterator().next(); return maybeMulticastPartitions;
}; };
@ -1163,9 +1163,15 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final String finalRepartitionTopicName = renamed.suffixWithOrElseGet("-subscription-response", builder, SUBSCRIPTION_RESPONSE) + TOPIC_SUFFIX; final String finalRepartitionTopicName = renamed.suffixWithOrElseGet("-subscription-response", builder, SUBSCRIPTION_RESPONSE) + TOPIC_SUFFIX;
builder.internalTopologyBuilder.addInternalTopic(finalRepartitionTopicName, InternalTopicProperties.empty()); builder.internalTopologyBuilder.addInternalTopic(finalRepartitionTopicName, InternalTopicProperties.empty());
final StreamPartitioner<K, SubscriptionResponseWrapper<VO>> defaultForeignResponseSinkPartitioner =
(topic, key, subscriptionResponseWrapper, numPartitions) -> {
final Integer partition = subscriptionResponseWrapper.getPrimaryPartition();
return partition == null ? Optional.empty() : Optional.of(Collections.singleton(partition));
};
final StreamPartitioner<K, SubscriptionResponseWrapper<VO>> foreignResponseSinkPartitioner = final StreamPartitioner<K, SubscriptionResponseWrapper<VO>> foreignResponseSinkPartitioner =
tableJoinedInternal.partitioner() == null tableJoinedInternal.partitioner() == null
? (topic, key, subscriptionResponseWrapper, numPartitions) -> subscriptionResponseWrapper.getPrimaryPartition() ? defaultForeignResponseSinkPartitioner
: (topic, key, val, numPartitions) -> getPartition.apply(tableJoinedInternal.partitioner().partitions(topic, key, null, numPartitions)); : (topic, key, val, numPartitions) -> getPartition.apply(tableJoinedInternal.partitioner().partitions(topic, key, null, numPartitions));
final StreamSinkNode<K, SubscriptionResponseWrapper<VO>> foreignResponseSink = final StreamSinkNode<K, SubscriptionResponseWrapper<VO>> foreignResponseSink =

View File

@ -20,6 +20,10 @@ import org.apache.kafka.clients.producer.internals.BuiltInPartitioner;
import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.StreamPartitioner;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Windowed<K>, V> { public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Windowed<K>, V> {
private final WindowedSerializer<K> serializer; private final WindowedSerializer<K> serializer;
@ -40,13 +44,12 @@ public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Window
* @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
*/ */
@Override @Override
@Deprecated public Optional<Set<Integer>> partitions(final String topic, final Windowed<K> windowedKey, final V value, final int numPartitions) {
public Integer partition(final String topic, final Windowed<K> windowedKey, final V value, final int numPartitions) {
// for windowed key, the key bytes should never be null // for windowed key, the key bytes should never be null
final byte[] keyBytes = serializer.serializeBaseKey(topic, windowedKey); final byte[] keyBytes = serializer.serializeBaseKey(topic, windowedKey);
// stick with the same built-in partitioner util functions that producer used // stick with the same built-in partitioner util functions that producer used
// to make sure its behavior is consistent with the producer // to make sure its behavior is consistent with the producer
return BuiltInPartitioner.partitionForKey(keyBytes, numPartitions); return Optional.of(Collections.singleton(BuiltInPartitioner.partitionForKey(keyBytes, numPartitions)));
} }
} }

View File

@ -18,7 +18,6 @@ package org.apache.kafka.streams.processor;
import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.Topology;
import java.util.Collections;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
@ -53,18 +52,6 @@ import java.util.Set;
*/ */
public interface StreamPartitioner<K, V> { public interface StreamPartitioner<K, V> {
/**
* Determine the partition number for a record with the given key and value and the current number of partitions.
*
* @param topic the topic name this record is sent to
* @param key the key of the record
* @param value the value of the record
* @param numPartitions the total number of partitions
* @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
*/
@Deprecated
Integer partition(String topic, K key, V value, int numPartitions);
/** /**
* Determine the number(s) of the partition(s) to which a record with the given key and value should be sent, * Determine the number(s) of the partition(s) to which a record with the given key and value should be sent,
* for the given topic and current partition count * for the given topic and current partition count
@ -77,9 +64,5 @@ public interface StreamPartitioner<K, V> {
* Optional of an empty set means the record won't be sent to any partitions i.e drop it. * Optional of an empty set means the record won't be sent to any partitions i.e drop it.
* Optional of Set of integers means the partitions to which the record should be sent to. * Optional of Set of integers means the partitions to which the record should be sent to.
* */ * */
default Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions) { Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions);
final Integer partition = partition(topic, key, value, numPartitions);
return partition == null ? Optional.empty() : Optional.of(Collections.singleton(partition));
}
} }

View File

@ -20,6 +20,10 @@ import org.apache.kafka.clients.producer.internals.BuiltInPartitioner;
import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.StreamPartitioner;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
public class DefaultStreamPartitioner<K, V> implements StreamPartitioner<K, V> { public class DefaultStreamPartitioner<K, V> implements StreamPartitioner<K, V> {
private final Serializer<K> keySerializer; private final Serializer<K> keySerializer;
@ -29,17 +33,16 @@ public class DefaultStreamPartitioner<K, V> implements StreamPartitioner<K, V> {
} }
@Override @Override
@Deprecated public Optional<Set<Integer>> partitions(final String topic, final K key, final V value, final int numPartitions) {
public Integer partition(final String topic, final K key, final V value, final int numPartitions) {
final byte[] keyBytes = keySerializer.serialize(topic, key); final byte[] keyBytes = keySerializer.serialize(topic, key);
// if the key bytes are not available, we just return null to let the producer to decide // if the key bytes are not available, we just return empty optional to let the producer decide
// which partition to send internally; otherwise stick with the same built-in partitioner // which partition to send internally; otherwise stick with the same built-in partitioner
// util functions that producer used to make sure its behavior is consistent with the producer // util functions that producer used to make sure its behavior is consistent with the producer
if (keyBytes == null) { if (keyBytes == null) {
return null; return Optional.empty();
} else { } else {
return BuiltInPartitioner.partitionForKey(keyBytes, numPartitions); return Optional.of(Collections.singleton(BuiltInPartitioner.partitionForKey(keyBytes, numPartitions)));
} }
} }
} }

View File

@ -1052,12 +1052,12 @@ public class KafkaStreamsTest {
prepareThreadState(streamThreadOne, state1); prepareThreadState(streamThreadOne, state1);
prepareThreadState(streamThreadTwo, state2); prepareThreadState(streamThreadTwo, state2);
try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
assertThrows(StreamsNotStartedException.class, () -> streams.queryMetadataForKey("store", "key", (topic, key, value, numPartitions) -> 0)); assertThrows(StreamsNotStartedException.class, () -> streams.queryMetadataForKey("store", "key", (topic, key, value, numPartitions) -> Optional.of(Collections.singleton(0))));
streams.start(); streams.start();
waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION); waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);
streams.close(); streams.close();
waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION); waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
assertThrows(IllegalStateException.class, () -> streams.queryMetadataForKey("store", "key", (topic, key, value, numPartitions) -> 0)); assertThrows(IllegalStateException.class, () -> streams.queryMetadataForKey("store", "key", (topic, key, value, numPartitions) -> Optional.of(Collections.singleton(0))));
} }
} }

View File

@ -311,12 +311,6 @@ public class KStreamRepartitionIntegrationTest {
final List<KeyValue<Integer, String>> expectedRecords = expectedRecordsOnRepartition.subList(3, 5); final List<KeyValue<Integer, String>> expectedRecords = expectedRecordsOnRepartition.subList(3, 5);
class BroadcastingPartitioner implements StreamPartitioner<Integer, String> { class BroadcastingPartitioner implements StreamPartitioner<Integer, String> {
@Override
@Deprecated
public Integer partition(final String topic, final Integer key, final String value, final int numPartitions) {
return null;
}
@Override @Override
public Optional<Set<Integer>> partitions(final String topic, final Integer key, final String value, final int numPartitions) { public Optional<Set<Integer>> partitions(final String topic, final Integer key, final String value, final int numPartitions) {
partitionerInvocation.incrementAndGet(); partitionerInvocation.incrementAndGet();
@ -382,7 +376,7 @@ public class KStreamRepartitionIntegrationTest {
.<Integer, String>as(repartitionName) .<Integer, String>as(repartitionName)
.withStreamPartitioner((topic, key, value, numPartitions) -> { .withStreamPartitioner((topic, key, value, numPartitions) -> {
partitionerInvocation.incrementAndGet(); partitionerInvocation.incrementAndGet();
return partition; return Optional.of(Collections.singleton(partition));
}); });
builder.stream(inputTopic, Consumed.with(Serdes.Integer(), Serdes.String())) builder.stream(inputTopic, Consumed.with(Serdes.Integer(), Serdes.String()))

View File

@ -55,6 +55,7 @@ import org.junit.jupiter.api.Timeout;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
@ -92,12 +93,6 @@ public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
static class MultiPartitioner implements StreamPartitioner<String, Void> { static class MultiPartitioner implements StreamPartitioner<String, Void> {
@Override
@Deprecated
public Integer partition(final String topic, final String key, final Void value, final int numPartitions) {
return null;
}
@Override @Override
public Optional<Set<Integer>> partitions(final String topic, final String key, final Void value, final int numPartitions) { public Optional<Set<Integer>> partitions(final String topic, final String key, final Void value, final int numPartitions) {
return Optional.of(new HashSet<>(Arrays.asList(0, 1, 2))); return Optional.of(new HashSet<>(Arrays.asList(0, 1, 2)));
@ -273,8 +268,8 @@ public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
final ValueJoiner<String, String, String> joiner = (value1, value2) -> "value1=" + value1 + ",value2=" + value2; final ValueJoiner<String, String, String> joiner = (value1, value2) -> "value1=" + value1 + ",value2=" + value2;
final TableJoined<String, String> tableJoined = TableJoined.with( final TableJoined<String, String> tableJoined = TableJoined.with(
(topic, key, value, numPartitions) -> Math.abs(getKeyB(key).hashCode()) % numPartitions, (topic, key, value, numPartitions) -> Optional.of(Collections.singleton(Math.abs(getKeyB(key).hashCode()) % numPartitions)),
(topic, key, value, numPartitions) -> Math.abs(key.hashCode()) % numPartitions (topic, key, value, numPartitions) -> Optional.of(Collections.singleton(Math.abs(key.hashCode()) % numPartitions))
); );
table1.join(table2, KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest::getKeyB, joiner, tableJoined, materialized) table1.join(table2, KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest::getKeyB, joiner, tableJoined, materialized)
@ -316,7 +311,7 @@ public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
final TableJoined<String, String> tableJoined = TableJoined.with( final TableJoined<String, String> tableJoined = TableJoined.with(
new MultiPartitioner(), new MultiPartitioner(),
(topic, key, value, numPartitions) -> Math.abs(key.hashCode()) % numPartitions (topic, key, value, numPartitions) -> Optional.of(Collections.singleton(Math.abs(key.hashCode()) % numPartitions))
); );
table1.join(table2, KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest::getKeyB, joiner, tableJoined, materialized) table1.join(table2, KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest::getKeyB, joiner, tableJoined, materialized)
@ -331,14 +326,14 @@ public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
private static Repartitioned<String, String> repartitionA() { private static Repartitioned<String, String> repartitionA() {
final Repartitioned<String, String> repartitioned = Repartitioned.as("a"); final Repartitioned<String, String> repartitioned = Repartitioned.as("a");
return repartitioned.withKeySerde(Serdes.String()).withValueSerde(Serdes.String()) return repartitioned.withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
.withStreamPartitioner((topic, key, value, numPartitions) -> Math.abs(getKeyB(key).hashCode()) % numPartitions) .withStreamPartitioner((topic, key, value, numPartitions) -> Optional.of(Collections.singleton(Math.abs(getKeyB(key).hashCode()) % numPartitions)))
.withNumberOfPartitions(4); .withNumberOfPartitions(4);
} }
private static Repartitioned<String, String> repartitionB() { private static Repartitioned<String, String> repartitionB() {
final Repartitioned<String, String> repartitioned = Repartitioned.as("b"); final Repartitioned<String, String> repartitioned = Repartitioned.as("b");
return repartitioned.withKeySerde(Serdes.String()).withValueSerde(Serdes.String()) return repartitioned.withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
.withStreamPartitioner((topic, key, value, numPartitions) -> Math.abs(key.hashCode()) % numPartitions) .withStreamPartitioner((topic, key, value, numPartitions) -> Optional.of(Collections.singleton(Math.abs(key.hashCode()) % numPartitions)))
.withNumberOfPartitions(4); .withNumberOfPartitions(4);
} }

View File

@ -53,7 +53,9 @@ import java.io.IOException;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -136,7 +138,8 @@ public class OptimizedKTableIntegrationTest {
final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, QueryableStoreTypes.keyValueStore()); final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, QueryableStoreTypes.keyValueStore());
final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, QueryableStoreTypes.keyValueStore()); final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, QueryableStoreTypes.keyValueStore());
final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0); final KeyQueryMetadata keyQueryMetadata = kafkaStreams1
.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> Optional.of(Collections.singleton(0)));
try { try {
// Assert that the current value in store reflects all messages being processed // Assert that the current value in store reflects all messages being processed

View File

@ -151,7 +151,8 @@ public class StoreQueryIntegrationTest {
assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true))); assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
until(() -> { until(() -> {
final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0); final KeyQueryMetadata keyQueryMetadata = kafkaStreams1
.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> Optional.of(Collections.singleton(0)));
final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = keyValueStore(); final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = keyValueStore();
final ReadOnlyKeyValueStore<Integer, Integer> store1 = getStore(TABLE_NAME, kafkaStreams1, queryableStoreType); final ReadOnlyKeyValueStore<Integer, Integer> store1 = getStore(TABLE_NAME, kafkaStreams1, queryableStoreType);
@ -197,7 +198,8 @@ public class StoreQueryIntegrationTest {
// Assert that all messages in the first batch were processed in a timely manner // Assert that all messages in the first batch were processed in a timely manner
assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true))); assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
until(() -> { until(() -> {
final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0); final KeyQueryMetadata keyQueryMetadata = kafkaStreams1
.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> Optional.of(Collections.singleton(0)));
//key belongs to this partition //key belongs to this partition
final int keyPartition = keyQueryMetadata.partition(); final int keyPartition = keyQueryMetadata.partition();
@ -313,7 +315,8 @@ public class StoreQueryIntegrationTest {
// Assert that all messages in the first batch were processed in a timely manner // Assert that all messages in the first batch were processed in a timely manner
assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true))); assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0); final KeyQueryMetadata keyQueryMetadata = kafkaStreams1
.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> Optional.of(Collections.singleton(0)));
//key belongs to this partition //key belongs to this partition
final int keyPartition = keyQueryMetadata.partition(); final int keyPartition = keyQueryMetadata.partition();
@ -555,12 +558,6 @@ public class StoreQueryIntegrationTest {
public void shouldFailWithIllegalArgumentExceptionWhenIQPartitionerReturnsMultiplePartitions() throws Exception { public void shouldFailWithIllegalArgumentExceptionWhenIQPartitionerReturnsMultiplePartitions() throws Exception {
class BroadcastingPartitioner implements StreamPartitioner<Integer, String> { class BroadcastingPartitioner implements StreamPartitioner<Integer, String> {
@Override
@Deprecated
public Integer partition(final String topic, final Integer key, final String value, final int numPartitions) {
return null;
}
@Override @Override
public Optional<Set<Integer>> partitions(final String topic, final Integer key, final String value, final int numPartitions) { public Optional<Set<Integer>> partitions(final String topic, final Integer key, final String value, final int numPartitions) {
return Optional.of(IntStream.range(0, numPartitions).boxed().collect(Collectors.toSet())); return Optional.of(IntStream.range(0, numPartitions).boxed().collect(Collectors.toSet()));

View File

@ -29,9 +29,12 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.Random; import java.util.Random;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class WindowedStreamPartitionerTest { public class WindowedStreamPartitionerTest {
@ -67,16 +70,16 @@ public class WindowedStreamPartitionerTest {
final String value = key.toString(); final String value = key.toString();
final byte[] valueBytes = stringSerializer.serialize(topicName, value); final byte[] valueBytes = stringSerializer.serialize(topicName, value);
final Integer expected = defaultPartitioner.partition("topic", key, keyBytes, value, valueBytes, cluster); final Set<Integer> expected = Collections.singleton(defaultPartitioner.partition(topicName, key, keyBytes, value, valueBytes, cluster));
for (int w = 1; w < 10; w++) { for (int w = 1; w < 10; w++) {
final TimeWindow window = new TimeWindow(10 * w, 20 * w); final TimeWindow window = new TimeWindow(10 * w, 20 * w);
final Windowed<Integer> windowedKey = new Windowed<>(key, window); final Windowed<Integer> windowedKey = new Windowed<>(key, window);
@SuppressWarnings("deprecation") final Optional<Set<Integer>> actual = streamPartitioner.partitions(topicName, windowedKey, value, infos.size());
final Integer actual = streamPartitioner.partition(topicName, windowedKey, value, infos.size());
assertEquals(expected, actual); assertTrue(actual.isPresent());
assertEquals(expected, actual.get());
} }
} }

View File

@ -1569,8 +1569,8 @@ public class ProcessorTopologyTest {
assertEquals(headers, record.headers()); assertEquals(headers, record.headers());
} }
private StreamPartitioner<Object, Object> constantPartitioner(final Integer partition) { private StreamPartitioner<String, String> constantPartitioner(final Integer partition) {
return (topic, key, value, numPartitions) -> partition; return (topic, key, value, numPartitions) -> Optional.of(Collections.singleton(partition));
} }
private Topology createSimpleTopology(final int partition) { private Topology createSimpleTopology(final int partition) {
@ -1598,13 +1598,6 @@ public class ProcessorTopologyTest {
} }
static class DroppingPartitioner implements StreamPartitioner<String, String> { static class DroppingPartitioner implements StreamPartitioner<String, String> {
@Override
@Deprecated
public Integer partition(final String topic, final String key, final String value, final int numPartitions) {
return null;
}
@Override @Override
public Optional<Set<Integer>> partitions(final String topic, final String key, final String value, final int numPartitions) { public Optional<Set<Integer>> partitions(final String topic, final String key, final String value, final int numPartitions) {
final Set<Integer> partitions = new HashSet<>(); final Set<Integer> partitions = new HashSet<>();

View File

@ -143,7 +143,7 @@ public class RecordCollectorTest {
private final UUID processId = UUID.randomUUID(); private final UUID processId = UUID.randomUUID();
private final StreamPartitioner<String, Object> streamPartitioner = private final StreamPartitioner<String, Object> streamPartitioner =
(topic, key, value, numPartitions) -> Integer.parseInt(key) % numPartitions; (topic, key, value, numPartitions) -> Optional.of(Collections.singleton(Integer.parseInt(key) % numPartitions));
private MockProducer<byte[], byte[]> mockProducer; private MockProducer<byte[], byte[]> mockProducer;
private StreamsProducer streamsProducer; private StreamsProducer streamsProducer;
@ -313,13 +313,6 @@ public class RecordCollectorTest {
@Test @Test
public void shouldSendOnlyToEvenPartitions() { public void shouldSendOnlyToEvenPartitions() {
class EvenPartitioner implements StreamPartitioner<String, Object> { class EvenPartitioner implements StreamPartitioner<String, Object> {
@Override
@Deprecated
public Integer partition(final String topic, final String key, final Object value, final int numPartitions) {
return null;
}
@Override @Override
public Optional<Set<Integer>> partitions(final String topic, final String key, final Object value, final int numPartitions) { public Optional<Set<Integer>> partitions(final String topic, final String key, final Object value, final int numPartitions) {
final Set<Integer> partitions = new HashSet<>(); final Set<Integer> partitions = new HashSet<>();
@ -385,13 +378,6 @@ public class RecordCollectorTest {
public void shouldBroadcastToAllPartitions() { public void shouldBroadcastToAllPartitions() {
class BroadcastingPartitioner implements StreamPartitioner<String, Object> { class BroadcastingPartitioner implements StreamPartitioner<String, Object> {
@Override
@Deprecated
public Integer partition(final String topic, final String key, final Object value, final int numPartitions) {
return null;
}
@Override @Override
public Optional<Set<Integer>> partitions(final String topic, final String key, final Object value, final int numPartitions) { public Optional<Set<Integer>> partitions(final String topic, final String key, final Object value, final int numPartitions) {
return Optional.of(IntStream.range(0, numPartitions).boxed().collect(Collectors.toSet())); return Optional.of(IntStream.range(0, numPartitions).boxed().collect(Collectors.toSet()));
@ -453,13 +439,6 @@ public class RecordCollectorTest {
public void shouldDropAllRecords() { public void shouldDropAllRecords() {
class DroppingPartitioner implements StreamPartitioner<String, Object> { class DroppingPartitioner implements StreamPartitioner<String, Object> {
@Override
@Deprecated
public Integer partition(final String topic, final String key, final Object value, final int numPartitions) {
return null;
}
@Override @Override
public Optional<Set<Integer>> partitions(final String topic, final String key, final Object value, final int numPartitions) { public Optional<Set<Integer>> partitions(final String topic, final String key, final Object value, final int numPartitions) {
return Optional.of(Collections.emptySet()); return Optional.of(Collections.emptySet());
@ -533,13 +512,6 @@ public class RecordCollectorTest {
public void shouldUseDefaultPartitionerViaPartitions() { public void shouldUseDefaultPartitionerViaPartitions() {
class DefaultPartitioner implements StreamPartitioner<String, Object> { class DefaultPartitioner implements StreamPartitioner<String, Object> {
@Override
@Deprecated
public Integer partition(final String topic, final String key, final Object value, final int numPartitions) {
return null;
}
@Override @Override
public Optional<Set<Integer>> partitions(final String topic, final String key, final Object value, final int numPartitions) { public Optional<Set<Integer>> partitions(final String topic, final String key, final Object value, final int numPartitions) {
return Optional.empty(); return Optional.empty();
@ -597,10 +569,10 @@ public class RecordCollectorTest {
} }
@Test @Test
public void shouldUseDefaultPartitionerAsPartitionReturnsNull() { public void shouldUseDefaultPartitionerAsPartitionReturnsEmptyOptional() {
final StreamPartitioner<String, Object> streamPartitioner = final StreamPartitioner<String, Object> streamPartitioner =
(topic, key, value, numPartitions) -> null; (topic, key, value, numPartitions) -> Optional.empty();
final SinkNode<?, ?> sinkNode = new SinkNode<>( final SinkNode<?, ?> sinkNode = new SinkNode<>(
sinkNodeName, sinkNodeName,

View File

@ -130,18 +130,11 @@ public class StreamsMetadataStateTest {
topologyMetadata.buildAndRewriteTopology(); topologyMetadata.buildAndRewriteTopology();
metadataState = new StreamsMetadataState(topologyMetadata, hostOne, logContext); metadataState = new StreamsMetadataState(topologyMetadata, hostOne, logContext);
metadataState.onChange(hostToActivePartitions, hostToStandbyPartitions, partitionInfos); metadataState.onChange(hostToActivePartitions, hostToStandbyPartitions, partitionInfos);
partitioner = (topic, key, value, numPartitions) -> 1; partitioner = (topic, key, value, numPartitions) -> Optional.of(Collections.singleton(1));
storeNames = mkSet("table-one", "table-two", "merged-table", globalTable); storeNames = mkSet("table-one", "table-two", "merged-table", globalTable);
} }
static class MultiValuedPartitioner implements StreamPartitioner<String, Object> { static class MultiValuedPartitioner implements StreamPartitioner<String, Object> {
@Override
@Deprecated
public Integer partition(final String topic, final String key, final Object value, final int numPartitions) {
return null;
}
@Override @Override
public Optional<Set<Integer>> partitions(final String topic, final String key, final Object value, final int numPartitions) { public Optional<Set<Integer>> partitions(final String topic, final String key, final Object value, final int numPartitions) {
final Set<Integer> partitions = new HashSet<>(); final Set<Integer> partitions = new HashSet<>();
@ -299,8 +292,9 @@ public class StreamsMetadataStateTest {
final KeyQueryMetadata expected = new KeyQueryMetadata(hostTwo, mkSet(hostOne), 2); final KeyQueryMetadata expected = new KeyQueryMetadata(hostTwo, mkSet(hostOne), 2);
final KeyQueryMetadata actual = metadataState.keyQueryMetadataForKey("merged-table", "the-key", final KeyQueryMetadata actual = metadataState.keyQueryMetadataForKey("merged-table", "the-key",
(topic, key, value, numPartitions) -> 2); (topic, key, value, numPartitions) -> Optional.of(Collections.singleton(2)));
assertEquals(expected, actual); assertEquals(expected, actual);
} }

View File

@ -23,6 +23,9 @@ import org.apache.kafka.streams.scala.serialization.Serdes._
import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import java.util
import java.util.Optional
class ProducedTest { class ProducedTest {
@Test @Test
@ -37,7 +40,16 @@ class ProducedTest {
@Test @Test
def testCreateProducedWithSerdesAndStreamPartitioner(): Unit = { def testCreateProducedWithSerdesAndStreamPartitioner(): Unit = {
val partitioner = new StreamPartitioner[String, Long] { val partitioner = new StreamPartitioner[String, Long] {
override def partition(topic: String, key: String, value: Long, numPartitions: Int): Integer = 0 override def partitions(
topic: String,
key: String,
value: Long,
numPartitions: Int
): Optional[util.Set[Integer]] = {
val partitions = new util.HashSet[Integer]()
partitions.add(Int.box(0))
Optional.of(partitions)
}
} }
val produced: Produced[String, Long] = Produced.`with`(partitioner) val produced: Produced[String, Long] = Produced.`with`(partitioner)

View File

@ -23,6 +23,9 @@ import org.apache.kafka.streams.scala.serialization.Serdes._
import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import java.util
import java.util.Optional
class RepartitionedTest { class RepartitionedTest {
@Test @Test
@ -58,7 +61,16 @@ class RepartitionedTest {
@Test @Test
def testCreateRepartitionedWithSerdesAndTopicNameAndNumPartitionsAndStreamPartitioner(): Unit = { def testCreateRepartitionedWithSerdesAndTopicNameAndNumPartitionsAndStreamPartitioner(): Unit = {
val partitioner = new StreamPartitioner[String, Long] { val partitioner = new StreamPartitioner[String, Long] {
override def partition(topic: String, key: String, value: Long, numPartitions: Int): Integer = 0 override def partitions(
topic: String,
key: String,
value: Long,
numPartitions: Int
): Optional[util.Set[Integer]] = {
val partitions = new util.HashSet[Integer]()
partitions.add(Int.box(0))
Optional.of(partitions)
}
} }
val repartitioned: Repartitioned[String, Long] = Repartitioned.`with`[String, Long](partitioner) val repartitioned: Repartitioned[String, Long] = Repartitioned.`with`[String, Long](partitioner)
@ -71,7 +83,16 @@ class RepartitionedTest {
@Test @Test
def testCreateRepartitionedWithTopicNameAndNumPartitionsAndStreamPartitioner(): Unit = { def testCreateRepartitionedWithTopicNameAndNumPartitionsAndStreamPartitioner(): Unit = {
val partitioner = new StreamPartitioner[String, Long] { val partitioner = new StreamPartitioner[String, Long] {
override def partition(topic: String, key: String, value: Long, numPartitions: Int): Integer = 0 override def partitions(
topic: String,
key: String,
value: Long,
numPartitions: Int
): Optional[util.Set[Integer]] = {
val partitions = new util.HashSet[Integer]()
partitions.add(Int.box(0))
Optional.of(partitions)
}
} }
val repartitioned: Repartitioned[String, Long] = val repartitioned: Repartitioned[String, Long] =
Repartitioned Repartitioned