KAFKA-9525: add enforceRebalance method to Consumer API (#8087)

As described in KIP-568.

Waiting on acceptance of the KIP to write the tests, on the off chance something changes. But rest assured, unit tests are coming.

Will also kick off the existing Streams system tests which leverage this new API (e.g. version probing, sometimes broker bounce).

Reviewers: Boyang Chen <boyang@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Author: A. Sophie Blee-Goldman (committed by GitHub)
Date: 2020-02-29 21:44:22 -05:00
Commit: a1f2ece323 (parent 72a5aa8b07)
9 changed files with 138 additions and 40 deletions

Consumer.java

@@ -248,6 +248,11 @@ public interface Consumer<K, V> extends Closeable {
      */
    ConsumerGroupMetadata groupMetadata();

+    /**
+     * @see KafkaConsumer#enforceRebalance()
+     */
+    void enforceRebalance();
+
    /**
     * @see KafkaConsumer#close()
     */

KafkaConsumer.java

@@ -2247,6 +2247,39 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
        return coordinator.groupMetadata();
    }

+    /**
+     * Alert the consumer to trigger a new rebalance by rejoining the group. This is a nonblocking call that forces
+     * the consumer to trigger a new rebalance on the next {@link #poll(Duration)} call. Note that this API does not
+     * itself initiate the rebalance, so you must still call {@link #poll(Duration)}. If a rebalance is already in
+     * progress this call will be a no-op. If you wish to force an additional rebalance you must complete the current
+     * one by calling poll before retrying this API.
+     * <p>
+     * You do not need to call this during normal processing, as the consumer group will manage itself
+     * automatically and rebalance when necessary. However there may be situations where the application wishes to
+     * trigger a rebalance that would otherwise not occur. For example, if some condition external and invisible to
+     * the Consumer and its group changes in a way that would affect the userdata encoded in the
+     * {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription Subscription}, the Consumer
+     * will not be notified and no rebalance will occur. This API can be used to force the group to rebalance so that
+     * the assignor can perform a partition reassignment based on the latest userdata. If your assignor does not use
+     * this userdata, or you do not use a custom
+     * {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor ConsumerPartitionAssignor}, you should not
+     * use this API.
+     *
+     * @throws java.lang.IllegalStateException if the consumer does not use group subscription
+     */
+    @Override
+    public void enforceRebalance() {
+        acquireAndEnsureOpen();
+        try {
+            if (coordinator == null) {
+                throw new IllegalStateException("Tried to force a rebalance but consumer does not have a group.");
+            }
+            coordinator.requestRejoin();
+        } finally {
+            release();
+        }
+    }
+
    /**
     * Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup.
     * If auto-commit is enabled, this will commit the current offsets if possible within the default
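
For reference, a minimal usage sketch of the API documented above (illustrative only, not part of this commit; the broker address, group id, topic name, and the externalConditionChanged() check are made-up placeholders):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class EnforceRebalanceExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumed broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");             // hypothetical group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // A custom assignor whose Subscription userdata depends on state outside the group
        // would be configured via ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG here.

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));     // hypothetical topic

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                records.forEach(record -> System.out.println(record.value()));

                if (externalConditionChanged()) {
                    // Non-blocking: only marks the group for rejoin; the rebalance
                    // itself happens on the next poll() above.
                    consumer.enforceRebalance();
                }
            }
        }
    }

    // Placeholder for whatever external signal the assignor's userdata depends on.
    private static boolean externalConditionChanged() {
        return false;
    }
}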

MockConsumer.java

@@ -566,6 +566,10 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
        return null;
    }

+    @Override
+    public void enforceRebalance() {
+    }
+
    @Override
    public void close(Duration timeout) {
        close();

AbstractCoordinator.java

@@ -426,7 +426,7 @@ public abstract class AbstractCoordinator implements Closeable {
            // Generation data maybe concurrently cleared by Heartbeat thread.
            // Can't use synchronized for {@code onJoinComplete}, because it can be long enough
-            // and shouldn't block hearbeat thread.
+            // and shouldn't block heartbeat thread.
            // See {@link PlaintextConsumerTest#testMaxPollIntervalMsDelayInAssignment
            synchronized (AbstractCoordinator.this) {
                generationSnapshot = this.generation;

@@ -904,7 +904,7 @@
        resetGeneration();
    }

-    protected synchronized void requestRejoin() {
+    public synchronized void requestRejoin() {
        this.rejoinNeeded = true;
    }

KafkaConsumerTest.java

@@ -28,6 +28,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerMetrics;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.consumer.internals.Fetcher;
+import org.apache.kafka.clients.consumer.internals.MockRebalanceListener;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
@@ -1722,7 +1723,7 @@ public class KafkaConsumerTest {
    }

    @Test(expected = AuthenticationException.class)
-    public void testCommittedAuthenticationFaiure() {
+    public void testCommittedAuthenticationFailure() {
        final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError();
        consumer.committed(Collections.singleton(tp0)).get(tp0);
    }
@@ -2418,4 +2419,44 @@
        assertFalse(consumerMetricPresent(consumer, "time-between-poll-avg"));
        assertFalse(consumerMetricPresent(consumer, "time-between-poll-max"));
    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testEnforceRebalanceWithManualAssignment() {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null)) {
+            consumer.assign(singleton(new TopicPartition("topic", 0)));
+            consumer.enforceRebalance();
+        }
+    }
+
+    @Test
+    public void testEnforceRebalanceTriggersRebalanceOnNextPoll() {
+        Time time = new MockTime(1L);
+        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
+        ConsumerMetadata metadata = createMetadata(subscription);
+        MockClient client = new MockClient(time, metadata);
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+        MockRebalanceListener countingRebalanceListener = new MockRebalanceListener();
+        initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1), Utils.mkEntry(topic2, 1), Utils.mkEntry(topic3, 1)));
+
+        consumer.subscribe(Arrays.asList(topic, topic2), countingRebalanceListener);
+        Node node = metadata.fetch().nodes().get(0);
+        prepareRebalance(client, node, assignor, Arrays.asList(tp0, t2p0), null);
+
+        // a first rebalance to get the assignment, we need two poll calls since we need two round trips to finish join / sync-group
+        consumer.poll(Duration.ZERO);
+        consumer.poll(Duration.ZERO);
+
+        // onPartitionsRevoked is not invoked when first joining the group
+        assertEquals(countingRebalanceListener.revokedCount, 0);
+        assertEquals(countingRebalanceListener.assignedCount, 1);
+
+        consumer.enforceRebalance();
+
+        // the next poll should trigger a rebalance
+        consumer.poll(Duration.ZERO);
+
+        assertEquals(countingRebalanceListener.revokedCount, 1);
+    }
}

ConsumerCoordinatorTest.java

@@ -22,7 +22,6 @@ import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -2902,31 +2901,4 @@ public class ConsumerCoordinatorTest {
            this.exception = exception;
        }
    }
-
-    private static class MockRebalanceListener implements ConsumerRebalanceListener {
-        public Collection<TopicPartition> lost;
-        public Collection<TopicPartition> revoked;
-        public Collection<TopicPartition> assigned;
-        public int lostCount = 0;
-        public int revokedCount = 0;
-        public int assignedCount = 0;
-
-        @Override
-        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-            this.assigned = partitions;
-            assignedCount++;
-        }
-
-        @Override
-        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-            this.revoked = partitions;
-            revokedCount++;
-        }
-
-        @Override
-        public void onPartitionsLost(Collection<TopicPartition> partitions) {
-            this.lost = partitions;
-            lostCount++;
-        }
-    }
}

MockRebalanceListener.java (new file)

@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.Collection;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.common.TopicPartition;
+
+public class MockRebalanceListener implements ConsumerRebalanceListener {
+    public Collection<TopicPartition> lost;
+    public Collection<TopicPartition> revoked;
+    public Collection<TopicPartition> assigned;
+    public int lostCount = 0;
+    public int revokedCount = 0;
+    public int assignedCount = 0;
+
+    @Override
+    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+        this.assigned = partitions;
+        assignedCount++;
+    }
+
+    @Override
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+        this.revoked = partitions;
+        revokedCount++;
+    }
+
+    @Override
+    public void onPartitionsLost(Collection<TopicPartition> partitions) {
+        this.lost = partitions;
+        lostCount++;
+    }
+}

StreamThread.java

@@ -753,7 +753,7 @@ public class StreamThread extends Thread {
                log.info("Version probing detected. Rejoining the consumer group to trigger a new rebalance.");
                assignmentErrorCode.set(AssignorError.NONE.code());
-                enforceRebalance();
+                mainConsumer.enforceRebalance();
            }
        } catch (final TaskCorruptedException e) {
            log.warn("Detected the states of tasks {} are corrupted. " +

@@ -766,16 +766,11 @@
                     "Will close out all assigned tasks and rejoin the consumer group.");

                taskManager.handleLostAll();
-                enforceRebalance();
+                mainConsumer.enforceRebalance();
            }
        }
    }

-    private void enforceRebalance() {
-        mainConsumer.unsubscribe();
-        subscribeConsumer();
-    }
-
    private void subscribeConsumer() {
        if (builder.usesPatternSubscription()) {
            mainConsumer.subscribe(builder.sourceTopicPattern(), rebalanceListener);
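
For comparison, a rough standalone sketch of the removed workaround versus the new consumer call (illustrative only; the real StreamThread tracks its own subscription, listener, and topic pattern):

import java.time.Duration;
import java.util.Collection;

import org.apache.kafka.clients.consumer.Consumer;

public class ForceRebalanceSketch {

    // Old workaround (removed above): drop and re-create the subscription, which
    // revokes every currently assigned partition before the group rejoins.
    static <K, V> void forceRebalanceByResubscribing(Consumer<K, V> consumer,
                                                     Collection<String> topics) {
        consumer.unsubscribe();
        consumer.subscribe(topics);
    }

    // New approach: keep the subscription and just request a rejoin; the rebalance
    // itself only happens once the consumer is polled again.
    static <K, V> void forceRebalance(Consumer<K, V> consumer) {
        consumer.enforceRebalance();
        consumer.poll(Duration.ZERO);
    }
}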

streams_upgrade_test.py

@@ -539,7 +539,7 @@ class StreamsUpgradeTest(Test):
                               timeout_sec=60,
                               err_msg="Never saw output 'Upgrade metadata to version 7' on" + str(second_other_node.account))
-        log_monitor.wait_until("Version probing detected. Triggering new rebalance.",
+        log_monitor.wait_until("Version probing detected. Rejoining the consumer group to trigger a new rebalance.",
                               timeout_sec=60,
                               err_msg="Could not detect 'Triggering new rebalance' at upgrading node " + str(node.account))