diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index f23805e665f..45ee4e60a7c 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -134,7 +134,7 @@ files="(TopologyBuilder|KafkaStreams|KStreamImpl|KTableImpl|StreamThread|StreamTask).java"/> + files="StreamsPartitionAssignor.java"/> @@ -142,22 +142,22 @@ files="RocksDBWindowStoreSupplier.java"/> + files="(TopologyBuilder|KStreamImpl|StreamsPartitionAssignor|KafkaStreams|KTableImpl).java"/> + files="StreamsPartitionAssignor.java"/> + files="StreamsPartitionAssignor.java"/> + files="StreamsPartitionAssignor.java"/> diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index ec0a1a8385b..6b3626101bd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -39,7 +39,7 @@ import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.DefaultPartitionGrouper; import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; import org.apache.kafka.streams.processor.TimestampExtractor; -import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor; +import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -782,7 +782,7 @@ public class StreamsConfig extends AbstractConfig { consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG)); consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG)); consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG)); - consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamPartitionAssignor.class.getName()); + consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamsPartitionAssignor.class.getName()); consumerProps.put(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG)); // add admin retries configs for creating topics diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java index 19e480908d2..c86171c3ab0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java @@ -20,7 +20,7 @@ import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.errors.StreamsException; -import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor; +import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,7 +83,7 @@ public class DefaultPartitionGrouper implements PartitionGrouper { if (partitions.isEmpty()) { log.info("Skipping assigning topic {} to tasks since its metadata is not available yet", topic); - return StreamPartitionAssignor.NOT_AVAILABLE; + return StreamsPartitionAssignor.NOT_AVAILABLE; } else { int numPartitions = partitions.size(); if (numPartitions > maxNumPartitions) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index 6f34e25a7b7..dab7bd793d1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -29,7 +29,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.streams.processor.internals.SinkNode; import org.apache.kafka.streams.processor.internals.SourceNode; -import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates; +import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.SubscriptionUpdates; import org.apache.kafka.streams.state.KeyValueStore; import java.util.Collection; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java similarity index 99% rename from streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java rename to streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 2a26272021a..9aa0e94c8c1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -54,7 +54,7 @@ import java.util.UUID; import static org.apache.kafka.common.utils.Utils.getHost; import static org.apache.kafka.common.utils.Utils.getPort; -public class StreamPartitionAssignor implements PartitionAssignor, Configurable { +public class StreamsPartitionAssignor implements PartitionAssignor, Configurable { private final static int UNKNOWN = -1; public final static int NOT_AVAILABLE = -2; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java b/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java index c1b102107d1..58cdba6b8fb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java @@ -19,7 +19,7 @@ package org.apache.kafka.streams.state; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.processor.StreamPartitioner; -import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor; +import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor; /** * Represents a user defined endpoint in a {@link org.apache.kafka.streams.KafkaStreams} application. @@ -30,7 +30,7 @@ import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor; * {@link KafkaStreams#metadataForKey(String, Object, Serializer)} * * The HostInfo is constructed during Partition Assignment - * see {@link StreamPartitionAssignor} + * see {@link StreamsPartitionAssignor} * It is extracted from the config {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_SERVER_CONFIG} * * If developers wish to expose an endpoint in their KafkaStreams applications they should provide the above diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index cc072d5508d..0309659042d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -30,7 +30,7 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; import org.apache.kafka.streams.processor.TimestampExtractor; -import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor; +import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor; import org.hamcrest.CoreMatchers; import org.junit.Before; import org.junit.Test; @@ -119,7 +119,7 @@ public class StreamsConfigTest { assertEquals(42, returnedProps.get(StreamsConfig.REPLICATION_FACTOR_CONFIG)); assertEquals(1, returnedProps.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG)); - assertEquals(StreamPartitionAssignor.class.getName(), returnedProps.get(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)); + assertEquals(StreamsPartitionAssignor.class.getName(), returnedProps.get(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)); assertEquals(7L, returnedProps.get(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG)); assertEquals("dummy:host", returnedProps.get(StreamsConfig.APPLICATION_SERVER_CONFIG)); assertEquals(null, returnedProps.get(StreamsConfig.RETRIES_CONFIG)); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java index ac3cd49923b..7a815944ecd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -27,7 +27,7 @@ import org.apache.kafka.streams.processor.internals.InternalTopicConfig; import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.ProcessorTopology; -import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor; +import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor; import org.apache.kafka.streams.processor.internals.UnwindowedChangelogTopicConfig; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier; @@ -661,7 +661,7 @@ public class TopologyBuilderTest { builder.addSource("source-2", Pattern.compile("topic-[A-C]")); builder.addSource("source-3", Pattern.compile("topic-\\d")); - StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates(); + StreamsPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamsPartitionAssignor.SubscriptionUpdates(); Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions"); updatedTopicsField.setAccessible(true); @@ -741,7 +741,7 @@ public class TopologyBuilderTest { .addProcessor("my-processor", new MockProcessorSupplier(), "ingest") .addStateStore(new MockStateStoreSupplier("testStateStore", false), "my-processor"); - final StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates(); + final StreamsPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamsPartitionAssignor.SubscriptionUpdates(); final Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions"); updatedTopicsField.setAccessible(true); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java index 19277e902ae..bbc59fa6fa9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java @@ -33,8 +33,8 @@ import static org.hamcrest.MatcherAssert.assertThat; public class CopartitionedTopicsValidatorTest { - private final StreamPartitionAssignor.CopartitionedTopicsValidator validator - = new StreamPartitionAssignor.CopartitionedTopicsValidator("thread"); + private final StreamsPartitionAssignor.CopartitionedTopicsValidator validator + = new StreamsPartitionAssignor.CopartitionedTopicsValidator("thread"); private final Map partitions = new HashMap<>(); private final Cluster cluster = Cluster.empty(); @@ -49,7 +49,7 @@ public class CopartitionedTopicsValidatorTest { @Test(expected = TopologyBuilderException.class) public void shouldThrowTopologyBuilderExceptionIfNoPartitionsFoundForCoPartitionedTopic() { validator.validate(Collections.singleton("topic"), - Collections.emptyMap(), + Collections.emptyMap(), cluster); } @@ -57,14 +57,14 @@ public class CopartitionedTopicsValidatorTest { public void shouldThrowTopologyBuilderExceptionIfPartitionCountsForCoPartitionedTopicsDontMatch() { partitions.remove(new TopicPartition("second", 0)); validator.validate(Utils.mkSet("first", "second"), - Collections.emptyMap(), + Collections.emptyMap(), cluster.withPartitions(partitions)); } @Test public void shouldEnforceCopartitioningOnRepartitionTopics() { - final StreamPartitionAssignor.InternalTopicMetadata metadata = createTopicMetadata("repartitioned", 10); + final StreamsPartitionAssignor.InternalTopicMetadata metadata = createTopicMetadata("repartitioned", 10); validator.validate(Utils.mkSet("first", "second", metadata.config.name()), Collections.singletonMap(metadata.config.name(), @@ -77,10 +77,10 @@ public class CopartitionedTopicsValidatorTest { @Test public void shouldSetNumPartitionsToMaximumPartitionsWhenAllTopicsAreRepartitionTopics() { - final StreamPartitionAssignor.InternalTopicMetadata one = createTopicMetadata("one", 1); - final StreamPartitionAssignor.InternalTopicMetadata two = createTopicMetadata("two", 15); - final StreamPartitionAssignor.InternalTopicMetadata three = createTopicMetadata("three", 5); - final Map repartitionTopicConfig = new HashMap<>(); + final StreamsPartitionAssignor.InternalTopicMetadata one = createTopicMetadata("one", 1); + final StreamsPartitionAssignor.InternalTopicMetadata two = createTopicMetadata("two", 15); + final StreamsPartitionAssignor.InternalTopicMetadata three = createTopicMetadata("three", 5); + final Map repartitionTopicConfig = new HashMap<>(); repartitionTopicConfig.put(one.config.name(), one); repartitionTopicConfig.put(two.config.name(), two); @@ -100,9 +100,9 @@ public class CopartitionedTopicsValidatorTest { @Test public void shouldSetRepartitionTopicsPartitionCountToNotAvailableIfAnyNotAvaliable() { - final StreamPartitionAssignor.InternalTopicMetadata one = createTopicMetadata("one", 1); - final StreamPartitionAssignor.InternalTopicMetadata two = createTopicMetadata("two", StreamPartitionAssignor.NOT_AVAILABLE); - final Map repartitionTopicConfig = new HashMap<>(); + final StreamsPartitionAssignor.InternalTopicMetadata one = createTopicMetadata("one", 1); + final StreamsPartitionAssignor.InternalTopicMetadata two = createTopicMetadata("two", StreamsPartitionAssignor.NOT_AVAILABLE); + final Map repartitionTopicConfig = new HashMap<>(); repartitionTopicConfig.put(one.config.name(), one); repartitionTopicConfig.put(two.config.name(), two); @@ -114,18 +114,18 @@ public class CopartitionedTopicsValidatorTest { repartitionTopicConfig, cluster.withPartitions(partitions)); - assertThat(one.numPartitions, equalTo(StreamPartitionAssignor.NOT_AVAILABLE)); - assertThat(two.numPartitions, equalTo(StreamPartitionAssignor.NOT_AVAILABLE)); + assertThat(one.numPartitions, equalTo(StreamsPartitionAssignor.NOT_AVAILABLE)); + assertThat(two.numPartitions, equalTo(StreamsPartitionAssignor.NOT_AVAILABLE)); } - private StreamPartitionAssignor.InternalTopicMetadata createTopicMetadata(final String repartitionTopic, - final int partitions) { + private StreamsPartitionAssignor.InternalTopicMetadata createTopicMetadata(final String repartitionTopic, + final int partitions) { final InternalTopicConfig repartitionTopicConfig = new RepartitionTopicConfig(repartitionTopic, Collections.emptyMap()); - final StreamPartitionAssignor.InternalTopicMetadata metadata - = new StreamPartitionAssignor.InternalTopicMetadata(repartitionTopicConfig); + final StreamsPartitionAssignor.InternalTopicMetadata metadata + = new StreamsPartitionAssignor.InternalTopicMetadata(repartitionTopicConfig); metadata.numPartitions = partitions; return metadata; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java similarity index 99% rename from streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java rename to streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 02ab803735a..bf3f1d1ac5e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -65,7 +65,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertThat; -public class StreamPartitionAssignorTest { +public class StreamsPartitionAssignorTest { private final TopicPartition t1p0 = new TopicPartition("topic1", 0); private final TopicPartition t1p1 = new TopicPartition("topic1", 1); @@ -105,7 +105,7 @@ public class StreamPartitionAssignorTest { private final TaskId task1 = new TaskId(0, 1); private final TaskId task2 = new TaskId(0, 2); private final TaskId task3 = new TaskId(0, 3); - private final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); + private final StreamsPartitionAssignor partitionAssignor = new StreamsPartitionAssignor(); private final MockClientSupplier mockClientSupplier = new MockClientSupplier(); private final InternalTopologyBuilder builder = new InternalTopologyBuilder(); private final StreamsConfig streamsConfig = new StreamsConfig(configProps()); @@ -762,7 +762,7 @@ public class StreamPartitionAssignorTest { .count(); // joining the stream and the table - // this triggers the enforceCopartitioning() routine in the StreamPartitionAssignor, + // this triggers the enforceCopartitioning() routine in the StreamsPartitionAssignor, // forcing the stream.map to get repartitioned to a topic with four partitions. stream1.join( table1,