mirror of https://github.com/apache/kafka.git
MINOR: Remove `SubscribedTopicRegex` field from `ConsumerGroupHeartbeatRequest` (#14956)
The support for regular expressions has not been implemented yet in the new consumer group protocol. This patch removes the `SubscribedTopicRegex` from the `ConsumerGroupHeartbeatRequest` in preparation for 3.7. It seems better to bump the version and add it back when we implement the feature, as part of https://issues.apache.org/jira/browse/KAFKA-14517, instead of having an unused field in the request. Reviewers: Sagar Rao <sagarmeansocean@gmail.com>, Justine Olshan <jolshan@confluent.io>
This commit is contained in:
parent
32fdb8d173
commit
131581a2b4
|
@ -35,8 +35,6 @@
|
||||||
"about": "-1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." },
|
"about": "-1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." },
|
||||||
{ "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "topicName",
|
{ "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "topicName",
|
||||||
"about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." },
|
"about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." },
|
||||||
{ "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
|
|
||||||
"about": "null if it didn't change since the last heartbeat; the subscribed topic regex otherwise" },
|
|
||||||
{ "name": "ServerAssignor", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
|
{ "name": "ServerAssignor", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
|
||||||
"about": "null if not used or if it didn't change since the last heartbeat; the server side assignor to use otherwise." },
|
"about": "null if not used or if it didn't change since the last heartbeat; the server side assignor to use otherwise." },
|
||||||
{ "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+", "default": "null",
|
{ "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+", "default": "null",
|
||||||
|
|
|
@ -295,8 +295,6 @@ public class HeartbeatRequestManagerTest {
|
||||||
assertEquals(subscribedTopics, heartbeatRequest.data().subscribedTopicNames());
|
assertEquals(subscribedTopics, heartbeatRequest.data().subscribedTopicNames());
|
||||||
assertEquals(DEFAULT_GROUP_INSTANCE_ID, heartbeatRequest.data().instanceId());
|
assertEquals(DEFAULT_GROUP_INSTANCE_ID, heartbeatRequest.data().instanceId());
|
||||||
assertEquals(DEFAULT_REMOTE_ASSIGNOR, heartbeatRequest.data().serverAssignor());
|
assertEquals(DEFAULT_REMOTE_ASSIGNOR, heartbeatRequest.data().serverAssignor());
|
||||||
// TODO: Test pattern subscription.
|
|
||||||
assertNull(heartbeatRequest.data().subscribedTopicRegex());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -453,7 +451,6 @@ public class HeartbeatRequestManagerTest {
|
||||||
assertNull(data.instanceId());
|
assertNull(data.instanceId());
|
||||||
assertEquals(ConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs());
|
assertEquals(ConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs());
|
||||||
assertEquals(Collections.emptyList(), data.subscribedTopicNames());
|
assertEquals(Collections.emptyList(), data.subscribedTopicNames());
|
||||||
assertNull(data.subscribedTopicRegex());
|
|
||||||
assertEquals(ConsumerTestBuilder.DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor());
|
assertEquals(ConsumerTestBuilder.DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor());
|
||||||
assertEquals(Collections.emptyList(), data.topicPartitions());
|
assertEquals(Collections.emptyList(), data.topicPartitions());
|
||||||
membershipManager.onHeartbeatRequestSent();
|
membershipManager.onHeartbeatRequestSent();
|
||||||
|
@ -468,7 +465,6 @@ public class HeartbeatRequestManagerTest {
|
||||||
assertNull(data.instanceId());
|
assertNull(data.instanceId());
|
||||||
assertEquals(-1, data.rebalanceTimeoutMs());
|
assertEquals(-1, data.rebalanceTimeoutMs());
|
||||||
assertNull(data.subscribedTopicNames());
|
assertNull(data.subscribedTopicNames());
|
||||||
assertNull(data.subscribedTopicRegex());
|
|
||||||
assertNull(data.serverAssignor());
|
assertNull(data.serverAssignor());
|
||||||
assertNull(data.topicPartitions());
|
assertNull(data.topicPartitions());
|
||||||
membershipManager.onHeartbeatRequestSent();
|
membershipManager.onHeartbeatRequestSent();
|
||||||
|
@ -486,7 +482,6 @@ public class HeartbeatRequestManagerTest {
|
||||||
assertNull(data.instanceId());
|
assertNull(data.instanceId());
|
||||||
assertEquals(-1, data.rebalanceTimeoutMs());
|
assertEquals(-1, data.rebalanceTimeoutMs());
|
||||||
assertEquals(Collections.singletonList(topic), data.subscribedTopicNames());
|
assertEquals(Collections.singletonList(topic), data.subscribedTopicNames());
|
||||||
assertNull(data.subscribedTopicRegex());
|
|
||||||
assertNull(data.serverAssignor());
|
assertNull(data.serverAssignor());
|
||||||
assertNull(data.topicPartitions());
|
assertNull(data.topicPartitions());
|
||||||
membershipManager.onHeartbeatRequestSent();
|
membershipManager.onHeartbeatRequestSent();
|
||||||
|
|
|
@ -461,7 +461,6 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
|
||||||
rebalanceTimeoutMs: Int = -1,
|
rebalanceTimeoutMs: Int = -1,
|
||||||
serverAssignor: String = null,
|
serverAssignor: String = null,
|
||||||
subscribedTopicNames: List[String] = null,
|
subscribedTopicNames: List[String] = null,
|
||||||
subscribedTopicRegex: String = null,
|
|
||||||
topicPartitions: List[ConsumerGroupHeartbeatRequestData.TopicPartitions] = null
|
topicPartitions: List[ConsumerGroupHeartbeatRequestData.TopicPartitions] = null
|
||||||
): ConsumerGroupHeartbeatResponseData = {
|
): ConsumerGroupHeartbeatResponseData = {
|
||||||
val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
|
val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
|
||||||
|
@ -473,7 +472,6 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
|
||||||
.setRackId(rackId)
|
.setRackId(rackId)
|
||||||
.setRebalanceTimeoutMs(rebalanceTimeoutMs)
|
.setRebalanceTimeoutMs(rebalanceTimeoutMs)
|
||||||
.setSubscribedTopicNames(subscribedTopicNames.asJava)
|
.setSubscribedTopicNames(subscribedTopicNames.asJava)
|
||||||
.setSubscribedTopicRegex(subscribedTopicRegex)
|
|
||||||
.setServerAssignor(serverAssignor)
|
.setServerAssignor(serverAssignor)
|
||||||
.setTopicPartitions(topicPartitions.asJava),
|
.setTopicPartitions(topicPartitions.asJava),
|
||||||
true
|
true
|
||||||
|
|
|
@ -749,7 +749,6 @@ public class GroupMetadataManager {
|
||||||
throwIfEmptyString(request.groupId(), "GroupId can't be empty.");
|
throwIfEmptyString(request.groupId(), "GroupId can't be empty.");
|
||||||
throwIfEmptyString(request.instanceId(), "InstanceId can't be empty.");
|
throwIfEmptyString(request.instanceId(), "InstanceId can't be empty.");
|
||||||
throwIfEmptyString(request.rackId(), "RackId can't be empty.");
|
throwIfEmptyString(request.rackId(), "RackId can't be empty.");
|
||||||
throwIfNotNull(request.subscribedTopicRegex(), "SubscribedTopicRegex is not supported yet.");
|
|
||||||
|
|
||||||
if (request.memberEpoch() > 0 || request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH) {
|
if (request.memberEpoch() > 0 || request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH) {
|
||||||
throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
|
throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
|
||||||
|
@ -945,8 +944,6 @@ public class GroupMetadataManager {
|
||||||
* @param clientHost The client host.
|
* @param clientHost The client host.
|
||||||
* @param subscribedTopicNames The list of subscribed topic names from the request
|
* @param subscribedTopicNames The list of subscribed topic names from the request
|
||||||
* of null.
|
* of null.
|
||||||
* @param subscribedTopicRegex The regular expression based subscription from the
|
|
||||||
* request or null.
|
|
||||||
* @param assignorName The assignor name from the request or null.
|
* @param assignorName The assignor name from the request or null.
|
||||||
* @param ownedTopicPartitions The list of owned partitions from the request or null.
|
* @param ownedTopicPartitions The list of owned partitions from the request or null.
|
||||||
*
|
*
|
||||||
|
@ -963,7 +960,6 @@ public class GroupMetadataManager {
|
||||||
String clientId,
|
String clientId,
|
||||||
String clientHost,
|
String clientHost,
|
||||||
List<String> subscribedTopicNames,
|
List<String> subscribedTopicNames,
|
||||||
String subscribedTopicRegex,
|
|
||||||
String assignorName,
|
String assignorName,
|
||||||
List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions
|
List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions
|
||||||
) throws ApiException {
|
) throws ApiException {
|
||||||
|
@ -1030,7 +1026,6 @@ public class GroupMetadataManager {
|
||||||
.maybeUpdateRebalanceTimeoutMs(ofSentinel(rebalanceTimeoutMs))
|
.maybeUpdateRebalanceTimeoutMs(ofSentinel(rebalanceTimeoutMs))
|
||||||
.maybeUpdateServerAssignorName(Optional.ofNullable(assignorName))
|
.maybeUpdateServerAssignorName(Optional.ofNullable(assignorName))
|
||||||
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames))
|
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames))
|
||||||
.maybeUpdateSubscribedTopicRegex(Optional.ofNullable(subscribedTopicRegex))
|
|
||||||
.setClientId(clientId)
|
.setClientId(clientId)
|
||||||
.setClientHost(clientHost)
|
.setClientHost(clientHost)
|
||||||
.build();
|
.build();
|
||||||
|
@ -1448,7 +1443,6 @@ public class GroupMetadataManager {
|
||||||
context.clientId(),
|
context.clientId(),
|
||||||
context.clientAddress.toString(),
|
context.clientAddress.toString(),
|
||||||
request.subscribedTopicNames(),
|
request.subscribedTopicNames(),
|
||||||
request.subscribedTopicRegex(),
|
|
||||||
request.serverAssignor(),
|
request.serverAssignor(),
|
||||||
request.topicPartitions()
|
request.topicPartitions()
|
||||||
);
|
);
|
||||||
|
|
Loading…
Reference in New Issue