From 131581a2b41b7a8a2a8e261aa0d7b2f84334ca02 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Mon, 11 Dec 2023 08:53:08 +0100 Subject: [PATCH] MINOR: Remove `SubscribedTopicRegex` field from `ConsumerGroupHeartbeatRequest` (#14956) The support for regular expressions has not been implemented yet in the new consumer group protocol. This patch removes the `SubscribedTopicRegex` from the `ConsumerGroupHeartbeatRequest` in preparation for 3.7. It seems better to bump the version and add it back when we implement the feature, as part of https://issues.apache.org/jira/browse/KAFKA-14517, instead of having an unused field in the request. Reviewers: Sagar Rao , Justine Olshan --- .../common/message/ConsumerGroupHeartbeatRequest.json | 2 -- .../consumer/internals/HeartbeatRequestManagerTest.java | 5 ----- .../unit/kafka/server/GroupCoordinatorBaseRequestTest.scala | 2 -- .../kafka/coordinator/group/GroupMetadataManager.java | 6 ------ 4 files changed, 15 deletions(-) diff --git a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json index 95c26421ce2..71c6e2e2502 100644 --- a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json +++ b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json @@ -35,8 +35,6 @@ "about": "-1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." }, { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "topicName", "about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." }, - { "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", - "about": "null if it didn't change since the last heartbeat; the subscribed topic regex otherwise" }, { "name": "ServerAssignor", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "null if not used or if it didn't change since the last heartbeat; the server side assignor to use otherwise." }, { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+", "default": "null", diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java index c74388bc671..53ee074e370 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java @@ -295,8 +295,6 @@ public class HeartbeatRequestManagerTest { assertEquals(subscribedTopics, heartbeatRequest.data().subscribedTopicNames()); assertEquals(DEFAULT_GROUP_INSTANCE_ID, heartbeatRequest.data().instanceId()); assertEquals(DEFAULT_REMOTE_ASSIGNOR, heartbeatRequest.data().serverAssignor()); - // TODO: Test pattern subscription. - assertNull(heartbeatRequest.data().subscribedTopicRegex()); } @Test @@ -453,7 +451,6 @@ public class HeartbeatRequestManagerTest { assertNull(data.instanceId()); assertEquals(ConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs()); assertEquals(Collections.emptyList(), data.subscribedTopicNames()); - assertNull(data.subscribedTopicRegex()); assertEquals(ConsumerTestBuilder.DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor()); assertEquals(Collections.emptyList(), data.topicPartitions()); membershipManager.onHeartbeatRequestSent(); @@ -468,7 +465,6 @@ public class HeartbeatRequestManagerTest { assertNull(data.instanceId()); assertEquals(-1, data.rebalanceTimeoutMs()); assertNull(data.subscribedTopicNames()); - assertNull(data.subscribedTopicRegex()); assertNull(data.serverAssignor()); assertNull(data.topicPartitions()); membershipManager.onHeartbeatRequestSent(); @@ -486,7 +482,6 @@ public class HeartbeatRequestManagerTest { assertNull(data.instanceId()); assertEquals(-1, data.rebalanceTimeoutMs()); assertEquals(Collections.singletonList(topic), data.subscribedTopicNames()); - assertNull(data.subscribedTopicRegex()); assertNull(data.serverAssignor()); assertNull(data.topicPartitions()); membershipManager.onHeartbeatRequestSent(); diff --git a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala index 96cd3fd31fa..3b6d712f1ba 100644 --- a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala @@ -461,7 +461,6 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { rebalanceTimeoutMs: Int = -1, serverAssignor: String = null, subscribedTopicNames: List[String] = null, - subscribedTopicRegex: String = null, topicPartitions: List[ConsumerGroupHeartbeatRequestData.TopicPartitions] = null ): ConsumerGroupHeartbeatResponseData = { val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( @@ -473,7 +472,6 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { .setRackId(rackId) .setRebalanceTimeoutMs(rebalanceTimeoutMs) .setSubscribedTopicNames(subscribedTopicNames.asJava) - .setSubscribedTopicRegex(subscribedTopicRegex) .setServerAssignor(serverAssignor) .setTopicPartitions(topicPartitions.asJava), true diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 3f47b49240f..c394680ac80 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -749,7 +749,6 @@ public class GroupMetadataManager { throwIfEmptyString(request.groupId(), "GroupId can't be empty."); throwIfEmptyString(request.instanceId(), "InstanceId can't be empty."); throwIfEmptyString(request.rackId(), "RackId can't be empty."); - throwIfNotNull(request.subscribedTopicRegex(), "SubscribedTopicRegex is not supported yet."); if (request.memberEpoch() > 0 || request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH) { throwIfEmptyString(request.memberId(), "MemberId can't be empty."); @@ -945,8 +944,6 @@ public class GroupMetadataManager { * @param clientHost The client host. * @param subscribedTopicNames The list of subscribed topic names from the request * of null. - * @param subscribedTopicRegex The regular expression based subscription from the - * request or null. * @param assignorName The assignor name from the request or null. * @param ownedTopicPartitions The list of owned partitions from the request or null. * @@ -963,7 +960,6 @@ public class GroupMetadataManager { String clientId, String clientHost, List subscribedTopicNames, - String subscribedTopicRegex, String assignorName, List ownedTopicPartitions ) throws ApiException { @@ -1030,7 +1026,6 @@ public class GroupMetadataManager { .maybeUpdateRebalanceTimeoutMs(ofSentinel(rebalanceTimeoutMs)) .maybeUpdateServerAssignorName(Optional.ofNullable(assignorName)) .maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames)) - .maybeUpdateSubscribedTopicRegex(Optional.ofNullable(subscribedTopicRegex)) .setClientId(clientId) .setClientHost(clientHost) .build(); @@ -1448,7 +1443,6 @@ public class GroupMetadataManager { context.clientId(), context.clientAddress.toString(), request.subscribedTopicNames(), - request.subscribedTopicRegex(), request.serverAssignor(), request.topicPartitions() );