mirror of https://github.com/apache/kafka.git
MINOR: Rename stream partition assignor to streams partition assignor (#4621)
This is a straight-forward change that make the name of the partition assignor to be aligned with Streams. Reviewers: Matthias J. Sax <mjsax@apache.org>
This commit is contained in:
parent
7c5d0c459f
commit
f26fbb9adc
|
@ -134,7 +134,7 @@
|
|||
files="(TopologyBuilder|KafkaStreams|KStreamImpl|KTableImpl|StreamThread|StreamTask).java"/>
|
||||
|
||||
<suppress checks="MethodLength"
|
||||
files="StreamPartitionAssignor.java"/>
|
||||
files="StreamsPartitionAssignor.java"/>
|
||||
|
||||
<suppress checks="ParameterNumber"
|
||||
files="StreamTask.java"/>
|
||||
|
@ -142,22 +142,22 @@
|
|||
files="RocksDBWindowStoreSupplier.java"/>
|
||||
|
||||
<suppress checks="ClassDataAbstractionCoupling"
|
||||
files="(TopologyBuilder|KStreamImpl|StreamPartitionAssignor|KafkaStreams|KTableImpl).java"/>
|
||||
files="(TopologyBuilder|KStreamImpl|StreamsPartitionAssignor|KafkaStreams|KTableImpl).java"/>
|
||||
|
||||
<suppress checks="CyclomaticComplexity"
|
||||
files="TopologyBuilder.java"/>
|
||||
<suppress checks="CyclomaticComplexity"
|
||||
files="StreamPartitionAssignor.java"/>
|
||||
files="StreamsPartitionAssignor.java"/>
|
||||
<suppress checks="CyclomaticComplexity"
|
||||
files="StreamThread.java"/>
|
||||
|
||||
<suppress checks="JavaNCSS"
|
||||
files="StreamPartitionAssignor.java"/>
|
||||
files="StreamsPartitionAssignor.java"/>
|
||||
|
||||
<suppress checks="NPathComplexity"
|
||||
files="ProcessorStateManager.java"/>
|
||||
<suppress checks="NPathComplexity"
|
||||
files="StreamPartitionAssignor.java"/>
|
||||
files="StreamsPartitionAssignor.java"/>
|
||||
<suppress checks="NPathComplexity"
|
||||
files="StreamThread.java"/>
|
||||
|
||||
|
|
|
@ -39,7 +39,7 @@ import org.apache.kafka.streams.errors.StreamsException;
|
|||
import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
|
||||
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
|
||||
import org.apache.kafka.streams.processor.TimestampExtractor;
|
||||
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
|
||||
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -782,7 +782,7 @@ public class StreamsConfig extends AbstractConfig {
|
|||
consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
|
||||
consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
|
||||
consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
|
||||
consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamPartitionAssignor.class.getName());
|
||||
consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamsPartitionAssignor.class.getName());
|
||||
consumerProps.put(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
|
||||
|
||||
// add admin retries configs for creating topics
|
||||
|
|
|
@ -20,7 +20,7 @@ import org.apache.kafka.common.Cluster;
|
|||
import org.apache.kafka.common.PartitionInfo;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.streams.errors.StreamsException;
|
||||
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
|
||||
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -83,7 +83,7 @@ public class DefaultPartitionGrouper implements PartitionGrouper {
|
|||
|
||||
if (partitions.isEmpty()) {
|
||||
log.info("Skipping assigning topic {} to tasks since its metadata is not available yet", topic);
|
||||
return StreamPartitionAssignor.NOT_AVAILABLE;
|
||||
return StreamsPartitionAssignor.NOT_AVAILABLE;
|
||||
} else {
|
||||
int numPartitions = partitions.size();
|
||||
if (numPartitions > maxNumPartitions)
|
||||
|
|
|
@ -29,7 +29,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorNode;
|
|||
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
|
||||
import org.apache.kafka.streams.processor.internals.SinkNode;
|
||||
import org.apache.kafka.streams.processor.internals.SourceNode;
|
||||
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
|
||||
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.SubscriptionUpdates;
|
||||
import org.apache.kafka.streams.state.KeyValueStore;
|
||||
|
||||
import java.util.Collection;
|
||||
|
|
|
@ -54,7 +54,7 @@ import java.util.UUID;
|
|||
import static org.apache.kafka.common.utils.Utils.getHost;
|
||||
import static org.apache.kafka.common.utils.Utils.getPort;
|
||||
|
||||
public class StreamPartitionAssignor implements PartitionAssignor, Configurable {
|
||||
public class StreamsPartitionAssignor implements PartitionAssignor, Configurable {
|
||||
|
||||
private final static int UNKNOWN = -1;
|
||||
public final static int NOT_AVAILABLE = -2;
|
|
@ -19,7 +19,7 @@ package org.apache.kafka.streams.state;
|
|||
import org.apache.kafka.common.serialization.Serializer;
|
||||
import org.apache.kafka.streams.KafkaStreams;
|
||||
import org.apache.kafka.streams.processor.StreamPartitioner;
|
||||
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
|
||||
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
|
||||
|
||||
/**
|
||||
* Represents a user defined endpoint in a {@link org.apache.kafka.streams.KafkaStreams} application.
|
||||
|
@ -30,7 +30,7 @@ import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
|
|||
* {@link KafkaStreams#metadataForKey(String, Object, Serializer)}
|
||||
*
|
||||
* The HostInfo is constructed during Partition Assignment
|
||||
* see {@link StreamPartitionAssignor}
|
||||
* see {@link StreamsPartitionAssignor}
|
||||
* It is extracted from the config {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_SERVER_CONFIG}
|
||||
*
|
||||
* If developers wish to expose an endpoint in their KafkaStreams applications they should provide the above
|
||||
|
|
|
@ -30,7 +30,7 @@ import org.apache.kafka.common.utils.Utils;
|
|||
import org.apache.kafka.streams.errors.StreamsException;
|
||||
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
|
||||
import org.apache.kafka.streams.processor.TimestampExtractor;
|
||||
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
|
||||
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
|
||||
import org.hamcrest.CoreMatchers;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -119,7 +119,7 @@ public class StreamsConfigTest {
|
|||
|
||||
assertEquals(42, returnedProps.get(StreamsConfig.REPLICATION_FACTOR_CONFIG));
|
||||
assertEquals(1, returnedProps.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG));
|
||||
assertEquals(StreamPartitionAssignor.class.getName(), returnedProps.get(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
|
||||
assertEquals(StreamsPartitionAssignor.class.getName(), returnedProps.get(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
|
||||
assertEquals(7L, returnedProps.get(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
|
||||
assertEquals("dummy:host", returnedProps.get(StreamsConfig.APPLICATION_SERVER_CONFIG));
|
||||
assertEquals(null, returnedProps.get(StreamsConfig.RETRIES_CONFIG));
|
||||
|
|
|
@ -27,7 +27,7 @@ import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
|
|||
import org.apache.kafka.streams.processor.internals.ProcessorNode;
|
||||
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
|
||||
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
|
||||
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
|
||||
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
|
||||
import org.apache.kafka.streams.processor.internals.UnwindowedChangelogTopicConfig;
|
||||
import org.apache.kafka.streams.state.Stores;
|
||||
import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
|
||||
|
@ -661,7 +661,7 @@ public class TopologyBuilderTest {
|
|||
builder.addSource("source-2", Pattern.compile("topic-[A-C]"));
|
||||
builder.addSource("source-3", Pattern.compile("topic-\\d"));
|
||||
|
||||
StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates();
|
||||
StreamsPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamsPartitionAssignor.SubscriptionUpdates();
|
||||
Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
|
||||
updatedTopicsField.setAccessible(true);
|
||||
|
||||
|
@ -741,7 +741,7 @@ public class TopologyBuilderTest {
|
|||
.addProcessor("my-processor", new MockProcessorSupplier(), "ingest")
|
||||
.addStateStore(new MockStateStoreSupplier("testStateStore", false), "my-processor");
|
||||
|
||||
final StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates();
|
||||
final StreamsPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamsPartitionAssignor.SubscriptionUpdates();
|
||||
final Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
|
||||
updatedTopicsField.setAccessible(true);
|
||||
|
||||
|
|
|
@ -33,8 +33,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
|
|||
|
||||
public class CopartitionedTopicsValidatorTest {
|
||||
|
||||
private final StreamPartitionAssignor.CopartitionedTopicsValidator validator
|
||||
= new StreamPartitionAssignor.CopartitionedTopicsValidator("thread");
|
||||
private final StreamsPartitionAssignor.CopartitionedTopicsValidator validator
|
||||
= new StreamsPartitionAssignor.CopartitionedTopicsValidator("thread");
|
||||
private final Map<TopicPartition, PartitionInfo> partitions = new HashMap<>();
|
||||
private final Cluster cluster = Cluster.empty();
|
||||
|
||||
|
@ -49,7 +49,7 @@ public class CopartitionedTopicsValidatorTest {
|
|||
@Test(expected = TopologyBuilderException.class)
|
||||
public void shouldThrowTopologyBuilderExceptionIfNoPartitionsFoundForCoPartitionedTopic() {
|
||||
validator.validate(Collections.singleton("topic"),
|
||||
Collections.<String, StreamPartitionAssignor.InternalTopicMetadata>emptyMap(),
|
||||
Collections.<String, StreamsPartitionAssignor.InternalTopicMetadata>emptyMap(),
|
||||
cluster);
|
||||
}
|
||||
|
||||
|
@ -57,14 +57,14 @@ public class CopartitionedTopicsValidatorTest {
|
|||
public void shouldThrowTopologyBuilderExceptionIfPartitionCountsForCoPartitionedTopicsDontMatch() {
|
||||
partitions.remove(new TopicPartition("second", 0));
|
||||
validator.validate(Utils.mkSet("first", "second"),
|
||||
Collections.<String, StreamPartitionAssignor.InternalTopicMetadata>emptyMap(),
|
||||
Collections.<String, StreamsPartitionAssignor.InternalTopicMetadata>emptyMap(),
|
||||
cluster.withPartitions(partitions));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void shouldEnforceCopartitioningOnRepartitionTopics() {
|
||||
final StreamPartitionAssignor.InternalTopicMetadata metadata = createTopicMetadata("repartitioned", 10);
|
||||
final StreamsPartitionAssignor.InternalTopicMetadata metadata = createTopicMetadata("repartitioned", 10);
|
||||
|
||||
validator.validate(Utils.mkSet("first", "second", metadata.config.name()),
|
||||
Collections.singletonMap(metadata.config.name(),
|
||||
|
@ -77,10 +77,10 @@ public class CopartitionedTopicsValidatorTest {
|
|||
|
||||
@Test
|
||||
public void shouldSetNumPartitionsToMaximumPartitionsWhenAllTopicsAreRepartitionTopics() {
|
||||
final StreamPartitionAssignor.InternalTopicMetadata one = createTopicMetadata("one", 1);
|
||||
final StreamPartitionAssignor.InternalTopicMetadata two = createTopicMetadata("two", 15);
|
||||
final StreamPartitionAssignor.InternalTopicMetadata three = createTopicMetadata("three", 5);
|
||||
final Map<String, StreamPartitionAssignor.InternalTopicMetadata> repartitionTopicConfig = new HashMap<>();
|
||||
final StreamsPartitionAssignor.InternalTopicMetadata one = createTopicMetadata("one", 1);
|
||||
final StreamsPartitionAssignor.InternalTopicMetadata two = createTopicMetadata("two", 15);
|
||||
final StreamsPartitionAssignor.InternalTopicMetadata three = createTopicMetadata("three", 5);
|
||||
final Map<String, StreamsPartitionAssignor.InternalTopicMetadata> repartitionTopicConfig = new HashMap<>();
|
||||
|
||||
repartitionTopicConfig.put(one.config.name(), one);
|
||||
repartitionTopicConfig.put(two.config.name(), two);
|
||||
|
@ -100,9 +100,9 @@ public class CopartitionedTopicsValidatorTest {
|
|||
|
||||
@Test
|
||||
public void shouldSetRepartitionTopicsPartitionCountToNotAvailableIfAnyNotAvaliable() {
|
||||
final StreamPartitionAssignor.InternalTopicMetadata one = createTopicMetadata("one", 1);
|
||||
final StreamPartitionAssignor.InternalTopicMetadata two = createTopicMetadata("two", StreamPartitionAssignor.NOT_AVAILABLE);
|
||||
final Map<String, StreamPartitionAssignor.InternalTopicMetadata> repartitionTopicConfig = new HashMap<>();
|
||||
final StreamsPartitionAssignor.InternalTopicMetadata one = createTopicMetadata("one", 1);
|
||||
final StreamsPartitionAssignor.InternalTopicMetadata two = createTopicMetadata("two", StreamsPartitionAssignor.NOT_AVAILABLE);
|
||||
final Map<String, StreamsPartitionAssignor.InternalTopicMetadata> repartitionTopicConfig = new HashMap<>();
|
||||
|
||||
repartitionTopicConfig.put(one.config.name(), one);
|
||||
repartitionTopicConfig.put(two.config.name(), two);
|
||||
|
@ -114,18 +114,18 @@ public class CopartitionedTopicsValidatorTest {
|
|||
repartitionTopicConfig,
|
||||
cluster.withPartitions(partitions));
|
||||
|
||||
assertThat(one.numPartitions, equalTo(StreamPartitionAssignor.NOT_AVAILABLE));
|
||||
assertThat(two.numPartitions, equalTo(StreamPartitionAssignor.NOT_AVAILABLE));
|
||||
assertThat(one.numPartitions, equalTo(StreamsPartitionAssignor.NOT_AVAILABLE));
|
||||
assertThat(two.numPartitions, equalTo(StreamsPartitionAssignor.NOT_AVAILABLE));
|
||||
|
||||
}
|
||||
|
||||
private StreamPartitionAssignor.InternalTopicMetadata createTopicMetadata(final String repartitionTopic,
|
||||
final int partitions) {
|
||||
private StreamsPartitionAssignor.InternalTopicMetadata createTopicMetadata(final String repartitionTopic,
|
||||
final int partitions) {
|
||||
final InternalTopicConfig repartitionTopicConfig
|
||||
= new RepartitionTopicConfig(repartitionTopic, Collections.<String, String>emptyMap());
|
||||
|
||||
final StreamPartitionAssignor.InternalTopicMetadata metadata
|
||||
= new StreamPartitionAssignor.InternalTopicMetadata(repartitionTopicConfig);
|
||||
final StreamsPartitionAssignor.InternalTopicMetadata metadata
|
||||
= new StreamsPartitionAssignor.InternalTopicMetadata(repartitionTopicConfig);
|
||||
metadata.numPartitions = partitions;
|
||||
return metadata;
|
||||
}
|
||||
|
|
|
@ -65,7 +65,7 @@ import static org.junit.Assert.assertEquals;
|
|||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
public class StreamPartitionAssignorTest {
|
||||
public class StreamsPartitionAssignorTest {
|
||||
|
||||
private final TopicPartition t1p0 = new TopicPartition("topic1", 0);
|
||||
private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
|
||||
|
@ -105,7 +105,7 @@ public class StreamPartitionAssignorTest {
|
|||
private final TaskId task1 = new TaskId(0, 1);
|
||||
private final TaskId task2 = new TaskId(0, 2);
|
||||
private final TaskId task3 = new TaskId(0, 3);
|
||||
private final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
|
||||
private final StreamsPartitionAssignor partitionAssignor = new StreamsPartitionAssignor();
|
||||
private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
|
||||
private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
|
||||
private final StreamsConfig streamsConfig = new StreamsConfig(configProps());
|
||||
|
@ -762,7 +762,7 @@ public class StreamPartitionAssignorTest {
|
|||
.count();
|
||||
|
||||
// joining the stream and the table
|
||||
// this triggers the enforceCopartitioning() routine in the StreamPartitionAssignor,
|
||||
// this triggers the enforceCopartitioning() routine in the StreamsPartitionAssignor,
|
||||
// forcing the stream.map to get repartitioned to a topic with four partitions.
|
||||
stream1.join(
|
||||
table1,
|
Loading…
Reference in New Issue