KAFKA-13406: skip assignment validation for built-in cooperativeStickyAssignor (#11439)

This fix is trying to skip the assignment validation for built-in cooperative sticky assignor, since (a) we know the assignment is valid since we do essentially this same check already in the cooperative sticky assignor, and (b) the check is broken anyways due to potential for claimed `ownedPartitions` to be incorrect

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This commit is contained in:
Luke Chen 2021-11-16 10:57:03 +08:00 committed by Jason Gustafson
parent 33c313d853
commit dc209f102a
3 changed files with 121 additions and 5 deletions

View File

@ -47,6 +47,7 @@ import org.apache.kafka.common.protocol.types.Type;
* cooperative rebalancing. See the <a href="https://kafka.apache.org/documentation/#upgrade_240_notable">upgrade guide</a> for details.
*/
public class CooperativeStickyAssignor extends AbstractStickyAssignor {
public static final String COOPERATIVE_STICKY_ASSIGNOR_NAME = "cooperative-sticky";
// these schemas are used for preserving useful metadata for the assignment, such as the last stable generation
private static final String GENERATION_KEY_NAME = "generation";
@ -59,7 +60,7 @@ public class CooperativeStickyAssignor extends AbstractStickyAssignor {
@Override
public String name() {
return "cooperative-sticky";
return COOPERATIVE_STICKY_ASSIGNOR_NAME;
}
@Override

View File

@ -36,11 +36,11 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.UnstableOffsetCommitException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnstableOffsetCommitException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
@ -81,6 +81,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.apache.kafka.clients.consumer.CooperativeStickyAssignor.COOPERATIVE_STICKY_ASSIGNOR_NAME;
/**
* This class manages the coordination process with the consumer coordinator.
*/
@ -548,6 +550,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
ConsumerPartitionAssignor assignor = lookupAssignor(assignmentStrategy);
if (assignor == null)
throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
String assignorName = assignor.name();
Set<String> allSubscribedTopics = new HashSet<>();
Map<String, Subscription> subscriptions = new HashMap<>();
@ -569,11 +572,13 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
isLeader = true;
log.debug("Performing assignment using strategy {} with subscriptions {}", assignor.name(), subscriptions);
log.debug("Performing assignment using strategy {} with subscriptions {}", assignorName, subscriptions);
Map<String, Assignment> assignments = assignor.assign(metadata.fetch(), new GroupSubscription(subscriptions)).groupAssignment();
if (protocol == RebalanceProtocol.COOPERATIVE) {
// skip the validation for built-in cooperative sticky assignor since we've considered
// the "generation" of ownedPartition inside the assignor
if (protocol == RebalanceProtocol.COOPERATIVE && !assignorName.equals(COOPERATIVE_STICKY_ASSIGNOR_NAME)) {
validateCooperativeAssignment(ownedPartitions, assignments);
}

View File

@ -55,6 +55,10 @@ import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.protocol.types.Type;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
@ -79,6 +83,7 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@ -102,11 +107,13 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import static java.util.Collections.emptyList;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE;
import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol.EAGER;
import static org.apache.kafka.clients.consumer.CooperativeStickyAssignor.COOPERATIVE_STICKY_ASSIGNOR_NAME;
import static org.apache.kafka.test.TestUtils.toSet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -256,6 +263,101 @@ public class ConsumerCoordinatorTest {
return metrics.sensor(name);
}
public ByteBuffer subscriptionUserData(int generation) {
final String generationKeyName = "generation";
final Schema cooperativeStickyAssignorUserDataV0 = new Schema(
new Field(generationKeyName, Type.INT32));
Struct struct = new Struct(cooperativeStickyAssignorUserDataV0);
struct.set(generationKeyName, generation);
ByteBuffer buffer = ByteBuffer.allocate(cooperativeStickyAssignorUserDataV0.sizeOf(struct));
cooperativeStickyAssignorUserDataV0.write(buffer, struct);
buffer.flip();
return buffer;
}
private List<JoinGroupResponseData.JoinGroupResponseMember> validateCooperativeAssignmentTestSetup() {
// consumer1 and consumer2 subscribed to "topic1" with 2 partitions: t1p, t2p
Map<String, List<String>> memberSubscriptions = new HashMap<>();
List<String> subscribedTopics = singletonList(topic1);
memberSubscriptions.put("consumer-1", subscribedTopics);
memberSubscriptions.put("consumer-2", subscribedTopics);
// the ownedPartition for consumer1 is t1p, t2p
ConsumerPartitionAssignor.Subscription subscriptionConsumer1 = new ConsumerPartitionAssignor.Subscription(
subscribedTopics, subscriptionUserData(1), Arrays.asList(t1p, t2p));
// the ownedPartition for consumer2 is empty
ConsumerPartitionAssignor.Subscription subscriptionConsumer2 = new ConsumerPartitionAssignor.Subscription(
subscribedTopics, subscriptionUserData(1), emptyList());
List<JoinGroupResponseData.JoinGroupResponseMember> metadata = new ArrayList<>();
for (Map.Entry<String, List<String>> subscriptionEntry : memberSubscriptions.entrySet()) {
ByteBuffer buf = null;
if (subscriptionEntry.getKey().equals("consumer-1")) {
buf = ConsumerProtocol.serializeSubscription(subscriptionConsumer1);
} else {
buf = ConsumerProtocol.serializeSubscription(subscriptionConsumer2);
}
metadata.add(new JoinGroupResponseData.JoinGroupResponseMember()
.setMemberId(subscriptionEntry.getKey())
.setMetadata(buf.array()));
}
return metadata;
}
@Test
public void testPerformAssignmentShouldValidateCooperativeAssignment() {
SubscriptionState mockSubscriptionState = Mockito.mock(SubscriptionState.class);
List<JoinGroupResponseData.JoinGroupResponseMember> metadata = validateCooperativeAssignmentTestSetup();
// simulate the custom cooperative assignor didn't revoke the partition first before assign to other consumer
Map<String, List<TopicPartition>> assignment = new HashMap<>();
assignment.put("consumer-1", Arrays.asList(t1p));
assignment.put("consumer-2", Arrays.asList(t2p));
partitionAssignor.prepare(assignment);
try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, false, mockSubscriptionState)) {
if (protocol == COOPERATIVE) {
// in cooperative protocol, we should throw exception when validating cooperative assignment
Exception e = assertThrows(IllegalStateException.class,
() -> coordinator.performAssignment("1", partitionAssignor.name(), metadata));
assertTrue(e.getMessage().contains("Assignor supporting the COOPERATIVE protocol violates its requirements"));
} else {
// in eager protocol, we should not validate assignment
coordinator.performAssignment("1", partitionAssignor.name(), metadata);
}
}
}
@Test
public void testPerformAssignmentShouldSkipValidateCooperativeAssignmentForBuiltInCooperativeStickyAssignor() {
SubscriptionState mockSubscriptionState = Mockito.mock(SubscriptionState.class);
List<JoinGroupResponseData.JoinGroupResponseMember> metadata = validateCooperativeAssignmentTestSetup();
List<ConsumerPartitionAssignor> assignorsWithCooperativeStickyAssignor = new ArrayList<>(assignors);
// create a mockPartitionAssignor with the same name as cooperative sticky assignor
MockPartitionAssignor mockCooperativeStickyAssignor = new MockPartitionAssignor(Collections.singletonList(protocol)) {
@Override
public String name() {
return COOPERATIVE_STICKY_ASSIGNOR_NAME;
}
};
assignorsWithCooperativeStickyAssignor.add(mockCooperativeStickyAssignor);
// simulate the cooperative sticky assignor do the assignment with out-of-date ownedPartition
Map<String, List<TopicPartition>> assignment = new HashMap<>();
assignment.put("consumer-1", Arrays.asList(t1p));
assignment.put("consumer-2", Arrays.asList(t2p));
mockCooperativeStickyAssignor.prepare(assignment);
try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignorsWithCooperativeStickyAssignor, false, mockSubscriptionState)) {
// should not validate assignment for built-in cooperative sticky assignor
coordinator.performAssignment("1", mockCooperativeStickyAssignor.name(), metadata);
}
}
@Test
public void testSelectRebalanceProtcol() {
List<ConsumerPartitionAssignor> assignors = new ArrayList<>();
@ -2676,7 +2778,8 @@ public class ConsumerCoordinatorTest {
private ConsumerCoordinator buildCoordinator(final GroupRebalanceConfig rebalanceConfig,
final Metrics metrics,
final List<ConsumerPartitionAssignor> assignors,
final boolean autoCommitEnabled) {
final boolean autoCommitEnabled,
final SubscriptionState subscriptions) {
return new ConsumerCoordinator(
rebalanceConfig,
new LogContext(),
@ -2692,6 +2795,13 @@ public class ConsumerCoordinatorTest {
null);
}
private ConsumerCoordinator buildCoordinator(final GroupRebalanceConfig rebalanceConfig,
final Metrics metrics,
final List<ConsumerPartitionAssignor> assignors,
final boolean autoCommitEnabled) {
return buildCoordinator(rebalanceConfig, metrics, assignors, autoCommitEnabled, subscriptions);
}
private Collection<TopicPartition> getRevoked(final List<TopicPartition> owned,
final List<TopicPartition> assigned) {
switch (protocol) {