KAFKA-12637: Remove deprecated PartitionAssignor interface (#10512)

Remove PartitionAssignor and related classes, update docs and move unit test

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This commit is contained in:
dengziming 2021-04-13 09:37:01 +08:00 committed by GitHub
parent c608d8480e
commit 88eb24db40
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 159 additions and 465 deletions

View File

@ -17,13 +17,17 @@
package org.apache.kafka.clients.consumer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Optional;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
/**
* This interface is used to define custom partition assignment for use in
@ -254,4 +258,41 @@ public interface ConsumerPartitionAssignor {
}
}
/**
* Get a list of configured instances of {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor}
* based on the class names/types specified by {@link org.apache.kafka.clients.consumer.ConsumerConfig#PARTITION_ASSIGNMENT_STRATEGY_CONFIG}
*/
static List<ConsumerPartitionAssignor> getAssignorInstances(List<String> assignorClasses, Map<String, Object> configs) {
List<ConsumerPartitionAssignor> assignors = new ArrayList<>();
if (assignorClasses == null)
return assignors;
for (Object klass : assignorClasses) {
// first try to get the class if passed in as a string
if (klass instanceof String) {
try {
klass = Class.forName((String) klass, true, Utils.getContextOrKafkaClassLoader());
} catch (ClassNotFoundException classNotFound) {
throw new KafkaException(klass + " ClassNotFoundException exception occurred", classNotFound);
}
}
if (klass instanceof Class<?>) {
Object assignor = Utils.newInstance((Class<?>) klass);
if (assignor instanceof Configurable)
((Configurable) assignor).configure(configs);
if (assignor instanceof ConsumerPartitionAssignor) {
assignors.add((ConsumerPartitionAssignor) assignor);
} else {
throw new KafkaException(klass + " is not an instance of " + ConsumerPartitionAssignor.class.getName());
}
} else {
throw new KafkaException("List contains element of type " + klass.getClass().getName() + ", expected String or Class");
}
}
return assignors;
}
}

View File

@ -81,8 +81,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import static org.apache.kafka.clients.consumer.internals.PartitionAssignorAdapter.getAssignorInstances;
/**
* A client that consumes records from a Kafka cluster.
* <p>
@ -768,8 +766,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
heartbeatIntervalMs); //Will avoid blocking an extended period of time to prevent heartbeat thread starvation
this.assignors = getAssignorInstances(config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)));
this.assignors = ConsumerPartitionAssignor.getAssignorInstances(
config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId))
);
// no coordinator will be constructed for the default (null) group id
this.coordinator = !groupId.isPresent() ? null :

View File

@ -396,7 +396,7 @@ public abstract class AbstractCoordinator implements Closeable {
* If this function returns false, the state can be in one of the following:
* * UNJOINED: got error response but times out before being able to re-join, heartbeat disabled
* * PREPARING_REBALANCE: not yet received join-group response before timeout, heartbeat disabled
* * COMPLETING_REBALANCE: not yet received sync-group response before timeout, hearbeat enabled
* * COMPLETING_REBALANCE: not yet received sync-group response before timeout, heartbeat enabled
*
* Visible for testing.
*

View File

@ -1,147 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* This interface is used to define custom partition assignment for use in
* {@link org.apache.kafka.clients.consumer.KafkaConsumer}. Members of the consumer group subscribe
* to the topics they are interested in and forward their subscriptions to a Kafka broker serving
* as the group coordinator. The coordinator selects one member to perform the group assignment and
* propagates the subscriptions of all members to it. Then {@link #assign(Cluster, Map)} is called
* to perform the assignment and the results are forwarded back to each respective members
*
* In some cases, it is useful to forward additional metadata to the assignor in order to make
* assignment decisions. For this, you can override {@link #subscription(Set)} and provide custom
* userData in the returned Subscription. For example, to have a rack-aware assignor, an implementation
* can use this user data to forward the rackId belonging to each member.
*
* This interface has been deprecated in 2.4, custom assignors should now implement
* {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor}. Note that maintaining compatibility
* for an internal interface here is a special case, as {@code PartitionAssignor} was meant to be a public API
* although it was placed in the internals package. Users should not expect internal interfaces or classes to
* not be removed or maintain compatibility in any way.
*/
@Deprecated
public interface PartitionAssignor {
/**
* Return a serializable object representing the local member's subscription. This can include
* additional information as well (e.g. local host/rack information) which can be leveraged in
* {@link #assign(Cluster, Map)}.
* @param topics Topics subscribed to through {@link org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(java.util.Collection)}
* and variants
* @return Non-null subscription with optional user data
*/
Subscription subscription(Set<String> topics);
/**
* Perform the group assignment given the member subscriptions and current cluster metadata.
* @param metadata Current topic/broker metadata known by consumer
* @param subscriptions Subscriptions from all members provided through {@link #subscription(Set)}
* @return A map from the members to their respective assignment. This should have one entry
* for all members who in the input subscription map.
*/
Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions);
/**
* Callback which is invoked when a group member receives its assignment from the leader.
* @param assignment The local member's assignment as provided by the leader in {@link #assign(Cluster, Map)}
*/
void onAssignment(Assignment assignment);
/**
* Callback which is invoked when a group member receives its assignment from the leader.
* @param assignment The local member's assignment as provided by the leader in {@link #assign(Cluster, Map)}
* @param generation The consumer group generation associated with this partition assignment (optional)
*/
default void onAssignment(Assignment assignment, int generation) {
onAssignment(assignment);
}
/**
* Unique name for this assignor (e.g. "range" or "roundrobin" or "sticky")
* @return non-null unique name
*/
String name();
class Subscription {
private final List<String> topics;
private final ByteBuffer userData;
public Subscription(List<String> topics, ByteBuffer userData) {
this.topics = topics;
this.userData = userData;
}
public Subscription(List<String> topics) {
this(topics, ByteBuffer.wrap(new byte[0]));
}
public List<String> topics() {
return topics;
}
public ByteBuffer userData() {
return userData;
}
@Override
public String toString() {
return "Subscription(" +
"topics=" + topics +
')';
}
}
class Assignment {
private final List<TopicPartition> partitions;
private final ByteBuffer userData;
public Assignment(List<TopicPartition> partitions, ByteBuffer userData) {
this.partitions = partitions;
this.userData = userData;
}
public Assignment(List<TopicPartition> partitions) {
this(partitions, ByteBuffer.wrap(new byte[0]));
}
public List<TopicPartition> partitions() {
return partitions;
}
public ByteBuffer userData() {
return userData;
}
@Override
public String toString() {
return "Assignment(" +
"partitions=" + partitions +
')';
}
}
}

View File

@ -1,140 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This adapter class is used to ensure backwards compatibility for those who have implemented the {@link PartitionAssignor}
* interface, which has been deprecated in favor of the new {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor}.
* <p>
* Note that maintaining compatibility for an internal interface here is a special case, as {@code PartitionAssignor}
* was meant to be a public API although it was placed in the internals package. Users should not expect internal
* interfaces or classes to not be removed or maintain compatibility in any way.
*/
@SuppressWarnings("deprecation")
public class PartitionAssignorAdapter implements ConsumerPartitionAssignor {
private static final Logger LOG = LoggerFactory.getLogger(PartitionAssignorAdapter.class);
private final PartitionAssignor oldAssignor;
PartitionAssignorAdapter(PartitionAssignor oldAssignor) {
this.oldAssignor = oldAssignor;
}
@Override
public ByteBuffer subscriptionUserData(Set<String> topics) {
return oldAssignor.subscription(topics).userData();
}
@Override
public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription) {
return toNewGroupAssignment(oldAssignor.assign(metadata, toOldGroupSubscription(groupSubscription)));
}
@Override
public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
oldAssignor.onAssignment(toOldAssignment(assignment), metadata.generationId());
}
@Override
public String name() {
return oldAssignor.name();
}
private static PartitionAssignor.Assignment toOldAssignment(Assignment newAssignment) {
return new PartitionAssignor.Assignment(newAssignment.partitions(), newAssignment.userData());
}
private static Map<String, PartitionAssignor.Subscription> toOldGroupSubscription(GroupSubscription newSubscriptions) {
Map<String, PartitionAssignor.Subscription> oldSubscriptions = new HashMap<>();
for (Map.Entry<String, Subscription> entry : newSubscriptions.groupSubscription().entrySet()) {
String member = entry.getKey();
Subscription newSubscription = entry.getValue();
oldSubscriptions.put(member, new PartitionAssignor.Subscription(
newSubscription.topics(), newSubscription.userData()));
}
return oldSubscriptions;
}
private static GroupAssignment toNewGroupAssignment(Map<String, PartitionAssignor.Assignment> oldAssignments) {
Map<String, Assignment> newAssignments = new HashMap<>();
for (Map.Entry<String, PartitionAssignor.Assignment> entry : oldAssignments.entrySet()) {
String member = entry.getKey();
PartitionAssignor.Assignment oldAssignment = entry.getValue();
newAssignments.put(member, new Assignment(oldAssignment.partitions(), oldAssignment.userData()));
}
return new GroupAssignment(newAssignments);
}
/**
* Get a list of configured instances of {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor}
* based on the class names/types specified by {@link org.apache.kafka.clients.consumer.ConsumerConfig#PARTITION_ASSIGNMENT_STRATEGY_CONFIG}
* where any instances of the old {@link PartitionAssignor} interface are wrapped in an adapter to the new
* {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor} interface
*/
public static List<ConsumerPartitionAssignor> getAssignorInstances(List<String> assignorClasses, Map<String, Object> configs) {
List<ConsumerPartitionAssignor> assignors = new ArrayList<>();
if (assignorClasses == null)
return assignors;
for (Object klass : assignorClasses) {
// first try to get the class if passed in as a string
if (klass instanceof String) {
try {
klass = Class.forName((String) klass, true, Utils.getContextOrKafkaClassLoader());
} catch (ClassNotFoundException classNotFound) {
throw new KafkaException(klass + " ClassNotFoundException exception occurred", classNotFound);
}
}
if (klass instanceof Class<?>) {
Object assignor = Utils.newInstance((Class<?>) klass);
if (assignor instanceof Configurable)
((Configurable) assignor).configure(configs);
if (assignor instanceof ConsumerPartitionAssignor) {
assignors.add((ConsumerPartitionAssignor) assignor);
} else if (assignor instanceof PartitionAssignor) {
assignors.add(new PartitionAssignorAdapter((PartitionAssignor) assignor));
LOG.warn("The PartitionAssignor interface has been deprecated, "
+ "please implement the ConsumerPartitionAssignor interface instead.");
} else {
throw new KafkaException(klass + " is not an instance of " + PartitionAssignor.class.getName()
+ " or an instance of " + ConsumerPartitionAssignor.class.getName());
}
} else {
throw new KafkaException("List contains element of type " + klass.getClass().getName() + ", expected String or Class");
}
}
return assignors;
}
}

View File

@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.getAssignorInstances;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ConsumerPartitionAssignorTest {
@Test
public void shouldInstantiateAssignor() {
List<ConsumerPartitionAssignor> assignors = getAssignorInstances(
Collections.singletonList(StickyAssignor.class.getName()),
Collections.emptyMap()
);
assertTrue(assignors.get(0) instanceof StickyAssignor);
}
@Test
public void shouldInstantiateListOfAssignors() {
List<ConsumerPartitionAssignor> assignors = getAssignorInstances(
Arrays.asList(StickyAssignor.class.getName(), CooperativeStickyAssignor.class.getName()),
Collections.emptyMap()
);
assertTrue(assignors.get(0) instanceof StickyAssignor);
assertTrue(assignors.get(1) instanceof CooperativeStickyAssignor);
}
@Test
public void shouldThrowKafkaExceptionOnNonAssignor() {
assertThrows(KafkaException.class, () -> getAssignorInstances(
Collections.singletonList(String.class.getName()),
Collections.emptyMap())
);
}
@Test
public void shouldThrowKafkaExceptionOnAssignorNotFound() {
assertThrows(KafkaException.class, () -> getAssignorInstances(
Collections.singletonList("Non-existent assignor"),
Collections.emptyMap())
);
}
@Test
public void shouldInstantiateFromClassType() {
List<String> classTypes =
initConsumerConfigWithClassTypes(Collections.singletonList(StickyAssignor.class))
.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG);
List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classTypes, Collections.emptyMap());
assertTrue(assignors.get(0) instanceof StickyAssignor);
}
@Test
public void shouldInstantiateFromListOfClassTypes() {
List<String> classTypes = initConsumerConfigWithClassTypes(
Arrays.asList(StickyAssignor.class, CooperativeStickyAssignor.class)
).getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG);
List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classTypes, Collections.emptyMap());
assertTrue(assignors.get(0) instanceof StickyAssignor);
assertTrue(assignors.get(1) instanceof CooperativeStickyAssignor);
}
@Test
public void shouldThrowKafkaExceptionOnListWithNonAssignorClassType() {
List<String> classTypes =
initConsumerConfigWithClassTypes(Arrays.asList(StickyAssignor.class, String.class))
.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG);
assertThrows(KafkaException.class, () -> getAssignorInstances(classTypes, Collections.emptyMap()));
}
private ConsumerConfig initConsumerConfigWithClassTypes(List<Object> classTypes) {
Properties props = new Properties();
props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classTypes);
return new ConsumerConfig(props);
}
}

View File

@ -1,171 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import static org.apache.kafka.clients.consumer.internals.PartitionAssignorAdapter.getAssignorInstances;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.StickyAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.Test;
public class PartitionAssignorAdapterTest {
private List<String> classNames;
private List<Object> classTypes;
@Test
public void shouldInstantiateNewAssignors() {
classNames = Arrays.asList(StickyAssignor.class.getName());
List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
assertTrue(StickyAssignor.class.isInstance(assignors.get(0)));
}
@Test
public void shouldAdaptOldAssignors() {
classNames = Arrays.asList(OldPartitionAssignor.class.getName());
List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
assertTrue(PartitionAssignorAdapter.class.isInstance(assignors.get(0)));
}
@Test
public void shouldThrowKafkaExceptionOnNonAssignor() {
classNames = Arrays.asList(String.class.getName());
assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
}
@Test
public void shouldThrowKafkaExceptionOnAssignorNotFound() {
classNames = Arrays.asList("Non-existent assignor");
assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
}
@Test
public void shouldInstantiateFromListOfOldAndNewClassTypes() {
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
classTypes = Arrays.asList(StickyAssignor.class, OldPartitionAssignor.class);
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classTypes);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
props, new StringDeserializer(), new StringDeserializer());
consumer.close();
}
@Test
public void shouldThrowKafkaExceptionOnListWithNonAssignorClassType() {
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
classTypes = Arrays.asList(StickyAssignor.class, OldPartitionAssignor.class, String.class);
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classTypes);
assertThrows(KafkaException.class, () -> new KafkaConsumer<>(
props, new StringDeserializer(), new StringDeserializer()));
}
@Test
public void testOnAssignment() {
OldPartitionAssignor oldAssignor = new OldPartitionAssignor();
ConsumerPartitionAssignor adaptedAssignor = new PartitionAssignorAdapter(oldAssignor);
TopicPartition tp1 = new TopicPartition("tp1", 1);
TopicPartition tp2 = new TopicPartition("tp2", 2);
List<TopicPartition> partitions = Arrays.asList(tp1, tp2);
adaptedAssignor.onAssignment(new Assignment(partitions), new ConsumerGroupMetadata(""));
assertEquals(oldAssignor.partitions, partitions);
}
@Test
public void testAssign() {
ConsumerPartitionAssignor adaptedAssignor = new PartitionAssignorAdapter(new OldPartitionAssignor());
Map<String, Subscription> subscriptions = new HashMap<>();
subscriptions.put("C1", new Subscription(Arrays.asList("topic1")));
subscriptions.put("C2", new Subscription(Arrays.asList("topic1", "topic2")));
subscriptions.put("C3", new Subscription(Arrays.asList("topic2", "topic3")));
GroupSubscription groupSubscription = new GroupSubscription(subscriptions);
Map<String, Assignment> assignments = adaptedAssignor.assign(null, groupSubscription).groupAssignment();
assertEquals(assignments.get("C1").partitions(), Arrays.asList(new TopicPartition("topic1", 1)));
assertEquals(assignments.get("C2").partitions(), Arrays.asList(new TopicPartition("topic1", 1), new TopicPartition("topic2", 1)));
assertEquals(assignments.get("C3").partitions(), Arrays.asList(new TopicPartition("topic2", 1), new TopicPartition("topic3", 1)));
}
/*
* Dummy assignor just gives each consumer partition 1 of each topic it's subscribed to
*/
@SuppressWarnings("deprecation")
public static class OldPartitionAssignor implements PartitionAssignor {
List<TopicPartition> partitions = null;
@Override
public Subscription subscription(Set<String> topics) {
return new Subscription(new ArrayList<>(topics), null);
}
@Override
public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
Map<String, Assignment> assignments = new HashMap<>();
for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
List<TopicPartition> partitions = new ArrayList<>();
for (String topic : entry.getValue().topics()) {
partitions.add(new TopicPartition(topic, 1));
}
assignments.put(entry.getKey(), new Assignment(partitions, null));
}
return assignments;
}
@Override
public void onAssignment(Assignment assignment) {
partitions = assignment.partitions();
}
@Override
public String name() {
return "old-assignor";
}
}
}

View File

@ -49,8 +49,10 @@
were removed. These methods were not intended to be public API and there is no replacement.</li>
<li>The <code>NoOffsetForPartitionException.partition()</code> method was removed. Please use <code>partitions()</code>
instead.</li>
<li>The Scala <code>kafka.common.MessageFormatter</code> was removed. Plese use the Java <code>org.apache.kafka.common.MessageFormatter</code>.</li>
<li>The Scala <code>kafka.common.MessageFormatter</code> was removed. Please use the Java <code>org.apache.kafka.common.MessageFormatter</code>.</li>
<li>The <code>MessageFormatter.init(Properties)</code> method was removed. Please use <code>configure(Map)</code> instead.</li>
<li>The deprecated <code>org.apache.kafka.clients.consumer.internals.PartitionAssignor</code> class has been removed. Please use
<code>org.apache.kafka.clients.consumer.ConsumerPartitionAssignor</code> instead.</li>
<li>Kafka Streams no longer has a compile time dependency on "connect:json" module (<a href="https://issues.apache.org/jira/browse/KAFKA-5146">KAFKA-5146</a>).
Projects that were relying on this transitive dependency will have to explicitly declare it.</li>
<li>The deprecated <code>quota.producer.default</code> and <code>quota.consumer.default</code> configurations were removed (<a href="https://issues.apache.org/jira/browse/KAFKA-12591">KAFKA-12591</a>).

View File

@ -190,7 +190,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
/**
* We need to have the PartitionAssignor and its StreamThread to be mutually accessible since the former needs
* later's cached metadata while sending subscriptions, and the latter needs former's returned assignment when
* latter's cached metadata while sending subscriptions, and the latter needs former's returned assignment when
* adding tasks.
*
* @throws KafkaException if the stream thread is not specified