KAFKA-13204: assignor name conflict check (#11217)

Add the partition assignor name conflicting check to avoid the wrong assignor being used. 

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This commit is contained in:
Luke Chen 2021-08-19 08:23:29 +08:00 committed by GitHub
parent fb2b131310
commit 8bc4c334e3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 65 additions and 0 deletions

View File

@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.Optional; import java.util.Optional;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -264,6 +265,8 @@ public interface ConsumerPartitionAssignor {
*/ */
static List<ConsumerPartitionAssignor> getAssignorInstances(List<String> assignorClasses, Map<String, Object> configs) { static List<ConsumerPartitionAssignor> getAssignorInstances(List<String> assignorClasses, Map<String, Object> configs) {
List<ConsumerPartitionAssignor> assignors = new ArrayList<>(); List<ConsumerPartitionAssignor> assignors = new ArrayList<>();
// a map to store assignor name -> assignor class name
Map<String, String> assignorNameMap = new HashMap<>();
if (assignorClasses == null) if (assignorClasses == null)
return assignors; return assignors;
@ -284,6 +287,12 @@ public interface ConsumerPartitionAssignor {
((Configurable) assignor).configure(configs); ((Configurable) assignor).configure(configs);
if (assignor instanceof ConsumerPartitionAssignor) { if (assignor instanceof ConsumerPartitionAssignor) {
String assignorName = ((ConsumerPartitionAssignor) assignor).name();
if (assignorNameMap.containsKey(assignorName)) {
throw new KafkaException("The assignor name: '" + assignorName + "' is used in more than one assignor: " +
assignorNameMap.get(assignorName) + ", " + assignor.getClass().getName());
}
assignorNameMap.put(assignorName, assignor.getClass().getName());
assignors.add((ConsumerPartitionAssignor) assignor); assignors.add((ConsumerPartitionAssignor) assignor);
} else { } else {
throw new KafkaException(klass + " is not an instance of " + ConsumerPartitionAssignor.class.getName()); throw new KafkaException(klass + " is not an instance of " + ConsumerPartitionAssignor.class.getName());

View File

@ -17,14 +17,17 @@
package org.apache.kafka.clients.consumer; package org.apache.kafka.clients.consumer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.nio.ByteBuffer;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Properties; import java.util.Properties;
import java.util.Set;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
@ -99,6 +102,48 @@ public class ConsumerPartitionAssignorTest {
assertThrows(KafkaException.class, () -> getAssignorInstances(classTypes, Collections.emptyMap())); assertThrows(KafkaException.class, () -> getAssignorInstances(classTypes, Collections.emptyMap()));
} }
@Test
public void shouldThrowKafkaExceptionOnAssignorsWithSameName() {
assertThrows(KafkaException.class, () -> getAssignorInstances(
Arrays.asList(RangeAssignor.class.getName(), TestConsumerPartitionAssignor.class.getName()),
Collections.emptyMap()
));
}
public static class TestConsumerPartitionAssignor implements ConsumerPartitionAssignor {
@Override
public ByteBuffer subscriptionUserData(Set<String> topics) {
return ConsumerPartitionAssignor.super.subscriptionUserData(topics);
}
@Override
public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription) {
return null;
}
@Override
public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
ConsumerPartitionAssignor.super.onAssignment(assignment, metadata);
}
@Override
public List<RebalanceProtocol> supportedProtocols() {
return ConsumerPartitionAssignor.super.supportedProtocols();
}
@Override
public short version() {
return ConsumerPartitionAssignor.super.version();
}
@Override
public String name() {
// use the RangeAssignor's name to cause naming conflict
return new RangeAssignor().name();
}
}
private ConsumerConfig initConsumerConfigWithClassTypes(List<Object> classTypes) { private ConsumerConfig initConsumerConfigWithClassTypes(List<Object> classTypes) {
Properties props = new Properties(); Properties props = new Properties();
props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

View File

@ -2837,6 +2837,17 @@ public class KafkaConsumerTest {
} }
} }
@Test
public void testAssignorNameConflict() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
configs.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
Arrays.asList(RangeAssignor.class.getName(), ConsumerPartitionAssignorTest.TestConsumerPartitionAssignor.class.getName()));
assertThrows(KafkaException.class,
() -> new KafkaConsumer<>(configs, new StringDeserializer(), new StringDeserializer()));
}
private static final List<String> CLIENT_IDS = new ArrayList<>(); private static final List<String> CLIENT_IDS = new ArrayList<>();
public static class DeserializerForClientId implements Deserializer<byte[]> { public static class DeserializerForClientId implements Deserializer<byte[]> {
@Override @Override