diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java index 9fd7673a025..0704e337403 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java @@ -17,13 +17,17 @@ package org.apache.kafka.clients.consumer; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Optional; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Configurable; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Utils; /** * This interface is used to define custom partition assignment for use in @@ -254,4 +258,41 @@ public interface ConsumerPartitionAssignor { } } + /** + * Get a list of configured instances of {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor} + * based on the class names/types specified by {@link org.apache.kafka.clients.consumer.ConsumerConfig#PARTITION_ASSIGNMENT_STRATEGY_CONFIG} + */ + static List getAssignorInstances(List assignorClasses, Map configs) { + List assignors = new ArrayList<>(); + + if (assignorClasses == null) + return assignors; + + for (Object klass : assignorClasses) { + // first try to get the class if passed in as a string + if (klass instanceof String) { + try { + klass = Class.forName((String) klass, true, Utils.getContextOrKafkaClassLoader()); + } catch (ClassNotFoundException classNotFound) { + throw new KafkaException(klass + " ClassNotFoundException exception occurred", classNotFound); + } + } + + if (klass instanceof Class) { + Object assignor = Utils.newInstance((Class) klass); + if (assignor instanceof Configurable) + ((Configurable) assignor).configure(configs); + + if (assignor instanceof ConsumerPartitionAssignor) { + assignors.add((ConsumerPartitionAssignor) assignor); + } else { + throw new KafkaException(klass + " is not an instance of " + ConsumerPartitionAssignor.class.getName()); + } + } else { + throw new KafkaException("List contains element of type " + klass.getClass().getName() + ", expected String or Class"); + } + } + return assignors; + } + } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 460383c563e..1f4bc7ce977 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -81,8 +81,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; -import static org.apache.kafka.clients.consumer.internals.PartitionAssignorAdapter.getAssignorInstances; - /** * A client that consumes records from a Kafka cluster. *

@@ -768,8 +766,10 @@ public class KafkaConsumer implements Consumer { config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), heartbeatIntervalMs); //Will avoid blocking an extended period of time to prevent heartbeat thread starvation - this.assignors = getAssignorInstances(config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG), - config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId))); + this.assignors = ConsumerPartitionAssignor.getAssignorInstances( + config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG), + config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)) + ); // no coordinator will be constructed for the default (null) group id this.coordinator = !groupId.isPresent() ? null : diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 82daa7bb1b3..ba26427eb96 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -396,7 +396,7 @@ public abstract class AbstractCoordinator implements Closeable { * If this function returns false, the state can be in one of the following: * * UNJOINED: got error response but times out before being able to re-join, heartbeat disabled * * PREPARING_REBALANCE: not yet received join-group response before timeout, heartbeat disabled - * * COMPLETING_REBALANCE: not yet received sync-group response before timeout, hearbeat enabled + * * COMPLETING_REBALANCE: not yet received sync-group response before timeout, heartbeat enabled * * Visible for testing. * diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java deleted file mode 100644 index 43bd7519169..00000000000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.clients.consumer.internals; - -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.TopicPartition; - -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Map; -import java.util.Set; - -/** - * This interface is used to define custom partition assignment for use in - * {@link org.apache.kafka.clients.consumer.KafkaConsumer}. Members of the consumer group subscribe - * to the topics they are interested in and forward their subscriptions to a Kafka broker serving - * as the group coordinator. The coordinator selects one member to perform the group assignment and - * propagates the subscriptions of all members to it. Then {@link #assign(Cluster, Map)} is called - * to perform the assignment and the results are forwarded back to each respective members - * - * In some cases, it is useful to forward additional metadata to the assignor in order to make - * assignment decisions. For this, you can override {@link #subscription(Set)} and provide custom - * userData in the returned Subscription. For example, to have a rack-aware assignor, an implementation - * can use this user data to forward the rackId belonging to each member. - * - * This interface has been deprecated in 2.4, custom assignors should now implement - * {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor}. Note that maintaining compatibility - * for an internal interface here is a special case, as {@code PartitionAssignor} was meant to be a public API - * although it was placed in the internals package. Users should not expect internal interfaces or classes to - * not be removed or maintain compatibility in any way. - */ -@Deprecated -public interface PartitionAssignor { - - /** - * Return a serializable object representing the local member's subscription. This can include - * additional information as well (e.g. local host/rack information) which can be leveraged in - * {@link #assign(Cluster, Map)}. - * @param topics Topics subscribed to through {@link org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(java.util.Collection)} - * and variants - * @return Non-null subscription with optional user data - */ - Subscription subscription(Set topics); - - /** - * Perform the group assignment given the member subscriptions and current cluster metadata. - * @param metadata Current topic/broker metadata known by consumer - * @param subscriptions Subscriptions from all members provided through {@link #subscription(Set)} - * @return A map from the members to their respective assignment. This should have one entry - * for all members who in the input subscription map. - */ - Map assign(Cluster metadata, Map subscriptions); - - /** - * Callback which is invoked when a group member receives its assignment from the leader. - * @param assignment The local member's assignment as provided by the leader in {@link #assign(Cluster, Map)} - */ - void onAssignment(Assignment assignment); - - /** - * Callback which is invoked when a group member receives its assignment from the leader. - * @param assignment The local member's assignment as provided by the leader in {@link #assign(Cluster, Map)} - * @param generation The consumer group generation associated with this partition assignment (optional) - */ - default void onAssignment(Assignment assignment, int generation) { - onAssignment(assignment); - } - - /** - * Unique name for this assignor (e.g. "range" or "roundrobin" or "sticky") - * @return non-null unique name - */ - String name(); - - class Subscription { - private final List topics; - private final ByteBuffer userData; - - public Subscription(List topics, ByteBuffer userData) { - this.topics = topics; - this.userData = userData; - } - - public Subscription(List topics) { - this(topics, ByteBuffer.wrap(new byte[0])); - } - - public List topics() { - return topics; - } - - public ByteBuffer userData() { - return userData; - } - - @Override - public String toString() { - return "Subscription(" + - "topics=" + topics + - ')'; - } - } - - class Assignment { - private final List partitions; - private final ByteBuffer userData; - - public Assignment(List partitions, ByteBuffer userData) { - this.partitions = partitions; - this.userData = userData; - } - - public Assignment(List partitions) { - this(partitions, ByteBuffer.wrap(new byte[0])); - } - - public List partitions() { - return partitions; - } - - public ByteBuffer userData() { - return userData; - } - - @Override - public String toString() { - return "Assignment(" + - "partitions=" + partitions + - ')'; - } - } - -} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignorAdapter.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignorAdapter.java deleted file mode 100644 index 8fb791ab4fa..00000000000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignorAdapter.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.clients.consumer.internals; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; -import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.Configurable; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.utils.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This adapter class is used to ensure backwards compatibility for those who have implemented the {@link PartitionAssignor} - * interface, which has been deprecated in favor of the new {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor}. - *

- * Note that maintaining compatibility for an internal interface here is a special case, as {@code PartitionAssignor} - * was meant to be a public API although it was placed in the internals package. Users should not expect internal - * interfaces or classes to not be removed or maintain compatibility in any way. - */ -@SuppressWarnings("deprecation") -public class PartitionAssignorAdapter implements ConsumerPartitionAssignor { - - private static final Logger LOG = LoggerFactory.getLogger(PartitionAssignorAdapter.class); - private final PartitionAssignor oldAssignor; - - PartitionAssignorAdapter(PartitionAssignor oldAssignor) { - this.oldAssignor = oldAssignor; - } - - @Override - public ByteBuffer subscriptionUserData(Set topics) { - return oldAssignor.subscription(topics).userData(); - } - - @Override - public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription) { - return toNewGroupAssignment(oldAssignor.assign(metadata, toOldGroupSubscription(groupSubscription))); - } - - @Override - public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) { - oldAssignor.onAssignment(toOldAssignment(assignment), metadata.generationId()); - } - - @Override - public String name() { - return oldAssignor.name(); - } - - private static PartitionAssignor.Assignment toOldAssignment(Assignment newAssignment) { - return new PartitionAssignor.Assignment(newAssignment.partitions(), newAssignment.userData()); - } - - private static Map toOldGroupSubscription(GroupSubscription newSubscriptions) { - Map oldSubscriptions = new HashMap<>(); - for (Map.Entry entry : newSubscriptions.groupSubscription().entrySet()) { - String member = entry.getKey(); - Subscription newSubscription = entry.getValue(); - oldSubscriptions.put(member, new PartitionAssignor.Subscription( - newSubscription.topics(), newSubscription.userData())); - } - return oldSubscriptions; - } - - private static GroupAssignment toNewGroupAssignment(Map oldAssignments) { - Map newAssignments = new HashMap<>(); - for (Map.Entry entry : oldAssignments.entrySet()) { - String member = entry.getKey(); - PartitionAssignor.Assignment oldAssignment = entry.getValue(); - newAssignments.put(member, new Assignment(oldAssignment.partitions(), oldAssignment.userData())); - } - return new GroupAssignment(newAssignments); - } - - /** - * Get a list of configured instances of {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor} - * based on the class names/types specified by {@link org.apache.kafka.clients.consumer.ConsumerConfig#PARTITION_ASSIGNMENT_STRATEGY_CONFIG} - * where any instances of the old {@link PartitionAssignor} interface are wrapped in an adapter to the new - * {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor} interface - */ - public static List getAssignorInstances(List assignorClasses, Map configs) { - List assignors = new ArrayList<>(); - - if (assignorClasses == null) - return assignors; - - for (Object klass : assignorClasses) { - // first try to get the class if passed in as a string - if (klass instanceof String) { - try { - klass = Class.forName((String) klass, true, Utils.getContextOrKafkaClassLoader()); - } catch (ClassNotFoundException classNotFound) { - throw new KafkaException(klass + " ClassNotFoundException exception occurred", classNotFound); - } - } - - if (klass instanceof Class) { - Object assignor = Utils.newInstance((Class) klass); - if (assignor instanceof Configurable) - ((Configurable) assignor).configure(configs); - - if (assignor instanceof ConsumerPartitionAssignor) { - assignors.add((ConsumerPartitionAssignor) assignor); - } else if (assignor instanceof PartitionAssignor) { - assignors.add(new PartitionAssignorAdapter((PartitionAssignor) assignor)); - LOG.warn("The PartitionAssignor interface has been deprecated, " - + "please implement the ConsumerPartitionAssignor interface instead."); - } else { - throw new KafkaException(klass + " is not an instance of " + PartitionAssignor.class.getName() - + " or an instance of " + ConsumerPartitionAssignor.class.getName()); - } - } else { - throw new KafkaException("List contains element of type " + klass.getClass().getName() + ", expected String or Class"); - } - } - return assignors; - } -} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java new file mode 100644 index 00000000000..03070008c6b --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Properties; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.getAssignorInstances; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ConsumerPartitionAssignorTest { + + @Test + public void shouldInstantiateAssignor() { + List assignors = getAssignorInstances( + Collections.singletonList(StickyAssignor.class.getName()), + Collections.emptyMap() + ); + assertTrue(assignors.get(0) instanceof StickyAssignor); + } + + @Test + public void shouldInstantiateListOfAssignors() { + List assignors = getAssignorInstances( + Arrays.asList(StickyAssignor.class.getName(), CooperativeStickyAssignor.class.getName()), + Collections.emptyMap() + ); + assertTrue(assignors.get(0) instanceof StickyAssignor); + assertTrue(assignors.get(1) instanceof CooperativeStickyAssignor); + } + + @Test + public void shouldThrowKafkaExceptionOnNonAssignor() { + assertThrows(KafkaException.class, () -> getAssignorInstances( + Collections.singletonList(String.class.getName()), + Collections.emptyMap()) + ); + } + + @Test + public void shouldThrowKafkaExceptionOnAssignorNotFound() { + assertThrows(KafkaException.class, () -> getAssignorInstances( + Collections.singletonList("Non-existent assignor"), + Collections.emptyMap()) + ); + } + + @Test + public void shouldInstantiateFromClassType() { + List classTypes = + initConsumerConfigWithClassTypes(Collections.singletonList(StickyAssignor.class)) + .getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG); + List assignors = getAssignorInstances(classTypes, Collections.emptyMap()); + assertTrue(assignors.get(0) instanceof StickyAssignor); + } + + @Test + public void shouldInstantiateFromListOfClassTypes() { + List classTypes = initConsumerConfigWithClassTypes( + Arrays.asList(StickyAssignor.class, CooperativeStickyAssignor.class) + ).getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG); + + List assignors = getAssignorInstances(classTypes, Collections.emptyMap()); + + assertTrue(assignors.get(0) instanceof StickyAssignor); + assertTrue(assignors.get(1) instanceof CooperativeStickyAssignor); + } + + @Test + public void shouldThrowKafkaExceptionOnListWithNonAssignorClassType() { + List classTypes = + initConsumerConfigWithClassTypes(Arrays.asList(StickyAssignor.class, String.class)) + .getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG); + + assertThrows(KafkaException.class, () -> getAssignorInstances(classTypes, Collections.emptyMap())); + } + + private ConsumerConfig initConsumerConfigWithClassTypes(List classTypes) { + Properties props = new Properties(); + props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classTypes); + return new ConsumerConfig(props); + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PartitionAssignorAdapterTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PartitionAssignorAdapterTest.java deleted file mode 100644 index 7ce0e02dc82..00000000000 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PartitionAssignorAdapterTest.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.clients.consumer.internals; - -import static org.apache.kafka.clients.consumer.internals.PartitionAssignorAdapter.getAssignorInstances; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; -import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; -import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment; -import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription; -import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.StickyAssignor; -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.junit.jupiter.api.Test; - -public class PartitionAssignorAdapterTest { - - private List classNames; - private List classTypes; - - @Test - public void shouldInstantiateNewAssignors() { - classNames = Arrays.asList(StickyAssignor.class.getName()); - List assignors = getAssignorInstances(classNames, Collections.emptyMap()); - assertTrue(StickyAssignor.class.isInstance(assignors.get(0))); - } - - @Test - public void shouldAdaptOldAssignors() { - classNames = Arrays.asList(OldPartitionAssignor.class.getName()); - List assignors = getAssignorInstances(classNames, Collections.emptyMap()); - assertTrue(PartitionAssignorAdapter.class.isInstance(assignors.get(0))); - } - - @Test - public void shouldThrowKafkaExceptionOnNonAssignor() { - classNames = Arrays.asList(String.class.getName()); - assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap())); - } - - @Test - public void shouldThrowKafkaExceptionOnAssignorNotFound() { - classNames = Arrays.asList("Non-existent assignor"); - assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap())); - } - - @Test - public void shouldInstantiateFromListOfOldAndNewClassTypes() { - Properties props = new Properties(); - props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); - - classTypes = Arrays.asList(StickyAssignor.class, OldPartitionAssignor.class); - - props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classTypes); - KafkaConsumer consumer = new KafkaConsumer<>( - props, new StringDeserializer(), new StringDeserializer()); - - consumer.close(); - } - - @Test - public void shouldThrowKafkaExceptionOnListWithNonAssignorClassType() { - Properties props = new Properties(); - props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); - - classTypes = Arrays.asList(StickyAssignor.class, OldPartitionAssignor.class, String.class); - - props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classTypes); - assertThrows(KafkaException.class, () -> new KafkaConsumer<>( - props, new StringDeserializer(), new StringDeserializer())); - } - - @Test - public void testOnAssignment() { - OldPartitionAssignor oldAssignor = new OldPartitionAssignor(); - ConsumerPartitionAssignor adaptedAssignor = new PartitionAssignorAdapter(oldAssignor); - - TopicPartition tp1 = new TopicPartition("tp1", 1); - TopicPartition tp2 = new TopicPartition("tp2", 2); - List partitions = Arrays.asList(tp1, tp2); - - adaptedAssignor.onAssignment(new Assignment(partitions), new ConsumerGroupMetadata("")); - - assertEquals(oldAssignor.partitions, partitions); - } - - @Test - public void testAssign() { - ConsumerPartitionAssignor adaptedAssignor = new PartitionAssignorAdapter(new OldPartitionAssignor()); - - Map subscriptions = new HashMap<>(); - subscriptions.put("C1", new Subscription(Arrays.asList("topic1"))); - subscriptions.put("C2", new Subscription(Arrays.asList("topic1", "topic2"))); - subscriptions.put("C3", new Subscription(Arrays.asList("topic2", "topic3"))); - GroupSubscription groupSubscription = new GroupSubscription(subscriptions); - - Map assignments = adaptedAssignor.assign(null, groupSubscription).groupAssignment(); - - assertEquals(assignments.get("C1").partitions(), Arrays.asList(new TopicPartition("topic1", 1))); - assertEquals(assignments.get("C2").partitions(), Arrays.asList(new TopicPartition("topic1", 1), new TopicPartition("topic2", 1))); - assertEquals(assignments.get("C3").partitions(), Arrays.asList(new TopicPartition("topic2", 1), new TopicPartition("topic3", 1))); - } - - /* - * Dummy assignor just gives each consumer partition 1 of each topic it's subscribed to - */ - @SuppressWarnings("deprecation") - public static class OldPartitionAssignor implements PartitionAssignor { - - List partitions = null; - - @Override - public Subscription subscription(Set topics) { - return new Subscription(new ArrayList<>(topics), null); - } - - @Override - public Map assign(Cluster metadata, Map subscriptions) { - Map assignments = new HashMap<>(); - for (Map.Entry entry : subscriptions.entrySet()) { - List partitions = new ArrayList<>(); - for (String topic : entry.getValue().topics()) { - partitions.add(new TopicPartition(topic, 1)); - } - assignments.put(entry.getKey(), new Assignment(partitions, null)); - } - return assignments; - } - - @Override - public void onAssignment(Assignment assignment) { - partitions = assignment.partitions(); - } - - @Override - public String name() { - return "old-assignor"; - } - } - -} diff --git a/docs/upgrade.html b/docs/upgrade.html index 04505b4bed3..0112bfc744d 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -49,8 +49,10 @@ were removed. These methods were not intended to be public API and there is no replacement.
  • The NoOffsetForPartitionException.partition() method was removed. Please use partitions() instead.
  • -
  • The Scala kafka.common.MessageFormatter was removed. Plese use the Java org.apache.kafka.common.MessageFormatter.
  • +
  • The Scala kafka.common.MessageFormatter was removed. Please use the Java org.apache.kafka.common.MessageFormatter.
  • The MessageFormatter.init(Properties) method was removed. Please use configure(Map) instead.
  • +
  • The deprecated org.apache.kafka.clients.consumer.internals.PartitionAssignor class has been removed. Please use + org.apache.kafka.clients.consumer.ConsumerPartitionAssignor instead.
  • Kafka Streams no longer has a compile time dependency on "connect:json" module (KAFKA-5146). Projects that were relying on this transitive dependency will have to explicitly declare it.
  • The deprecated quota.producer.default and quota.consumer.default configurations were removed (KAFKA-12591). diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index ea6a7834a4d..2e517f1bcbb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -190,7 +190,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf /** * We need to have the PartitionAssignor and its StreamThread to be mutually accessible since the former needs - * later's cached metadata while sending subscriptions, and the latter needs former's returned assignment when + * latter's cached metadata while sending subscriptions, and the latter needs former's returned assignment when * adding tasks. * * @throws KafkaException if the stream thread is not specified