KAFKA-15561 [3/N]: Client support for SubscriptionPattern in HB (#17951)

Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
Lianet Magrans 2024-11-27 12:01:12 -05:00 committed by GitHub
parent c32a49549d
commit 37b4d9b01d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 110 additions and 6 deletions

View File

@ -406,6 +406,12 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
heartbeatRequestState.reset();
break;
case INVALID_REGULAR_EXPRESSION:
logger.error("{} failed due to {}: {}", heartbeatRequestName(), error, errorMessage);
handleFatalFailure(error.exception("Invalid RE2J SubscriptionPattern provided in the call to " +
"subscribe. " + errorMessage));
break;
default:
if (!handleSpecificError(response, currentTimeMs)) {
// If the manager receives an unknown error - there could be a bug in the code or a new error code

View File

@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
import org.apache.kafka.common.Uuid;
@ -32,6 +33,7 @@ import org.apache.kafka.common.utils.Timer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
@ -231,6 +233,15 @@ public class ConsumerHeartbeatRequestManager extends AbstractHeartbeatRequestMan
sentFields.subscribedTopicNames = subscribedTopicNames;
}
// SubscribedTopicRegex - only sent if it has changed since the last heartbeat.
// Send empty string to indicate that a subscribed pattern needs to be removed.
SubscriptionPattern pattern = subscriptions.subscriptionPattern();
boolean patternUpdated = !Objects.equals(pattern, sentFields.pattern);
if ((sendAllFields && pattern != null) || patternUpdated) {
data.setSubscribedTopicRegex((pattern != null) ? pattern.pattern() : "");
sentFields.pattern = pattern;
}
// ServerAssignor - sent when joining or if it has changed since the last heartbeat
this.membershipManager.serverAssignor().ifPresent(serverAssignor -> {
if (sendAllFields || !serverAssignor.equals(sentFields.serverAssignor)) {
@ -239,8 +250,6 @@ public class ConsumerHeartbeatRequestManager extends AbstractHeartbeatRequestMan
}
});
// ClientAssignors - not supported yet
// TopicPartitions - sent when joining or with the first heartbeat after a new assignment from
// the server was reconciled. This is ensured by resending the topic partitions whenever the
// local assignment, including its local epoch is changed (although the local epoch is not sent
@ -268,6 +277,7 @@ public class ConsumerHeartbeatRequestManager extends AbstractHeartbeatRequestMan
static class SentFields {
private int rebalanceTimeoutMs = -1;
private TreeSet<String> subscribedTopicNames = null;
private SubscriptionPattern pattern = null;
private String serverAssignor = null;
private AbstractMembershipManager.LocalAssignment localAssignment = null;
@ -278,6 +288,7 @@ public class ConsumerHeartbeatRequestManager extends AbstractHeartbeatRequestMan
rebalanceTimeoutMs = -1;
serverAssignor = null;
localAssignment = null;
pattern = null;
}
}
}

View File

@ -361,6 +361,24 @@ public class SubscriptionState {
return Collections.emptySet();
}
/**
* @return The RE2J compatible pattern in use, provided via a call to
* {@link #subscribe(SubscriptionPattern, Optional)}.
* Null if there is no SubscriptionPattern in use.
*/
public synchronized SubscriptionPattern subscriptionPattern() {
if (hasRe2JPatternSubscription())
return this.subscribedRe2JPattern;
return null;
}
/**
* @return True if subscribed using RE2J pattern. False otherwise.
*/
public synchronized boolean hasRe2JPatternSubscription() {
return this.subscriptionType == SubscriptionType.AUTO_PATTERN_RE2J;
}
public synchronized Set<TopicPartition> pausedPartitions() {
return collectPartitions(TopicPartitionState::isPaused);
}
@ -469,7 +487,7 @@ public class SubscriptionState {
public synchronized boolean hasAutoAssignedPartitions() {
return this.subscriptionType == SubscriptionType.AUTO_TOPICS || this.subscriptionType == SubscriptionType.AUTO_PATTERN
|| this.subscriptionType == SubscriptionType.AUTO_TOPICS_SHARE;
|| this.subscriptionType == SubscriptionType.AUTO_TOPICS_SHARE || this.subscriptionType == SubscriptionType.AUTO_PATTERN_RE2J;
}
public synchronized void position(TopicPartition tp, FetchPosition position) {

View File

@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.clients.consumer.internals.AbstractHeartbeatRequestManager.HeartbeatRequestState;
import org.apache.kafka.clients.consumer.internals.AbstractMembershipManager.LocalAssignment;
import org.apache.kafka.clients.consumer.internals.ConsumerHeartbeatRequestManager.HeartbeatState;
@ -864,6 +865,61 @@ public class ConsumerHeartbeatRequestManagerTest {
"No requests should be generated on close if the member is not leaving when closing the manager");
}
@Test
public void testRegexInHeartbeatLifecycle() {
heartbeatState = new HeartbeatState(subscriptions, membershipManager, DEFAULT_MAX_POLL_INTERVAL_MS);
createHeartbeatRequestStateWithZeroHeartbeatInterval();
// Initial heartbeat with regex
mockJoiningMemberData(null);
when(subscriptions.subscriptionPattern()).thenReturn(new SubscriptionPattern("t1.*"));
ConsumerGroupHeartbeatRequestData data = heartbeatState.buildRequestData();
assertEquals("t1.*", data.subscribedTopicRegex());
// Regex not included in HB if not updated
when(membershipManager.state()).thenReturn(MemberState.STABLE);
data = heartbeatState.buildRequestData();
assertNull(data.subscribedTopicRegex());
// Regex included in HB if updated
when(subscriptions.subscriptionPattern()).thenReturn(new SubscriptionPattern("t2.*"));
data = heartbeatState.buildRequestData();
assertEquals("t2.*", data.subscribedTopicRegex());
// Empty regex included in HB to remove pattern subscription
when(subscriptions.subscriptionPattern()).thenReturn(null);
data = heartbeatState.buildRequestData();
assertEquals("", data.subscribedTopicRegex());
// Regex not included in HB after pattern subscription removed
when(subscriptions.subscriptionPattern()).thenReturn(null);
data = heartbeatState.buildRequestData();
assertNull(data.subscribedTopicRegex());
}
@Test
public void testRegexInJoiningHeartbeat() {
heartbeatState = new HeartbeatState(subscriptions, membershipManager, DEFAULT_MAX_POLL_INTERVAL_MS);
createHeartbeatRequestStateWithZeroHeartbeatInterval();
// Initial heartbeat with regex
mockJoiningMemberData(null);
when(subscriptions.subscriptionPattern()).thenReturn(new SubscriptionPattern("t1.*"));
ConsumerGroupHeartbeatRequestData data = heartbeatState.buildRequestData();
assertEquals("t1.*", data.subscribedTopicRegex());
// Members unsubscribes from regex (empty regex included in HB)
when(subscriptions.subscriptionPattern()).thenReturn(null);
data = heartbeatState.buildRequestData();
assertEquals("", data.subscribedTopicRegex());
// Member rejoins (ie. fenced) should not include regex field in HB
when(membershipManager.state()).thenReturn(MemberState.JOINING);
when(subscriptions.subscriptionPattern()).thenReturn(null);
data = heartbeatState.buildRequestData();
assertNull(data.subscribedTopicRegex());
}
private void assertHeartbeat(ConsumerHeartbeatRequestManager hrm, int nextPollMs) {
NetworkClientDelegate.PollResult pollResult = hrm.poll(time.milliseconds());
assertEquals(1, pollResult.unsentRequests.size());

View File

@ -400,7 +400,7 @@ public class SubscriptionStateTest {
@Test
public void testSubscribeToRe2JPattern() {
String pattern = "t*";
String pattern = "t.*";
state.subscribe(new SubscriptionPattern(pattern), Optional.of(rebalanceListener));
assertTrue(state.toString().contains("type=AUTO_PATTERN_RE2J"));
assertTrue(state.toString().contains("subscribedPattern=" + pattern));
@ -409,15 +409,28 @@ public class SubscriptionStateTest {
@Test
public void testMixedPatternSubscriptionNotAllowed() {
state.subscribe(Pattern.compile(".*"), Optional.of(rebalanceListener));
assertThrows(IllegalStateException.class, () -> state.subscribe(new SubscriptionPattern("t*"),
assertThrows(IllegalStateException.class, () -> state.subscribe(new SubscriptionPattern("t.*"),
Optional.of(rebalanceListener)));
state.unsubscribe();
state.subscribe(new SubscriptionPattern("t*"), Optional.of(rebalanceListener));
state.subscribe(new SubscriptionPattern("t.*"), Optional.of(rebalanceListener));
assertThrows(IllegalStateException.class, () -> state.subscribe(Pattern.compile(".*"), Optional.of(rebalanceListener)));
}
@Test
public void testSubscriptionPattern() {
SubscriptionPattern pattern = new SubscriptionPattern("t.*");
state.subscribe(pattern, Optional.of(rebalanceListener));
assertTrue(state.hasRe2JPatternSubscription());
assertEquals(pattern, state.subscriptionPattern());
assertTrue(state.hasAutoAssignedPartitions());
state.unsubscribe();
assertFalse(state.hasRe2JPatternSubscription());
assertNull(state.subscriptionPattern());
}
@Test
public void unsubscribeUserAssignment() {