KAFKA-19444: Add back JoinGroup v0 & v1 (#20116)

This fixes librdkafka older than the recently released 2.11.0 with
Kerberos authentication and Apache Kafka 4.x.

Even though this is a bug in librdkafka, a key goal of KIP-896 is not to
break the popular client libraries listed in it. Adding back JoinGroup
v0 & v1 is a very small change and worth it from that perspective.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Ismael Juma 2025-07-07 08:44:24 -07:00 committed by GitHub
parent e8ee7fc210
commit 4b607616c7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 28 additions and 6 deletions

View File

@ -18,8 +18,6 @@
"type": "request", "type": "request",
"listeners": ["broker"], "listeners": ["broker"],
"name": "JoinGroupRequest", "name": "JoinGroupRequest",
// Versions 0-1 were removed in Apache Kafka 4.0, Version 2 is the new baseline.
//
// Version 1 adds RebalanceTimeoutMs. Version 2 and 3 are the same as version 1. // Version 1 adds RebalanceTimeoutMs. Version 2 and 3 are the same as version 1.
// //
// Starting from version 4, the client needs to issue a second request to join group // Starting from version 4, the client needs to issue a second request to join group
@ -34,7 +32,7 @@
// Version 8 adds the Reason field (KIP-800). // Version 8 adds the Reason field (KIP-800).
// //
// Version 9 is the same as version 8. // Version 9 is the same as version 8.
"validVersions": "2-9", "validVersions": "0-9",
"flexibleVersions": "6+", "flexibleVersions": "6+",
"fields": [ "fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",

View File

@ -17,8 +17,6 @@
"apiKey": 11, "apiKey": 11,
"type": "response", "type": "response",
"name": "JoinGroupResponse", "name": "JoinGroupResponse",
// Versions 0-1 were removed in Apache Kafka 4.0, Version 2 is the new baseline.
//
// Version 1 is the same as version 0. // Version 1 is the same as version 0.
// //
// Version 2 adds throttle time. // Version 2 adds throttle time.
@ -37,7 +35,7 @@
// Version 8 is the same as version 7. // Version 8 is the same as version 7.
// //
// Version 9 adds the SkipAssignment field. // Version 9 adds the SkipAssignment field.
"validVersions": "2-9", "validVersions": "0-9",
"flexibleVersions": "6+", "flexibleVersions": "6+",
"fields": [ "fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true, { "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true,

View File

@ -19,12 +19,14 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.errors.InvalidConfigurationException; import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.JoinGroupRequestData; import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.test.TestUtils; import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.Arrays; import java.util.Arrays;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
public class JoinGroupRequestTest { public class JoinGroupRequestTest {
@ -61,4 +63,20 @@ public class JoinGroupRequestTest {
.setProtocolType("consumer") .setProtocolType("consumer")
).build((short) 4)); ).build((short) 4));
} }
@Test
public void testRebalanceTimeoutDefaultsToSessionTimeoutV0() {
int sessionTimeoutMs = 30000;
short version = 0;
var buffer = MessageUtil.toByteBufferAccessor(new JoinGroupRequestData()
.setGroupId("groupId")
.setMemberId("consumerId")
.setProtocolType("consumer")
.setSessionTimeoutMs(sessionTimeoutMs), version);
JoinGroupRequest request = JoinGroupRequest.parse(buffer, version);
assertEquals(sessionTimeoutMs, request.data().sessionTimeoutMs());
assertEquals(sessionTimeoutMs, request.data().rebalanceTimeoutMs());
}
} }

View File

@ -728,6 +728,14 @@ public class RequestResponseTest {
assertEquals(request.isolationLevel(), deserialized.isolationLevel()); assertEquals(request.isolationLevel(), deserialized.isolationLevel());
} }
@Test
public void testJoinGroupRequestV0RebalanceTimeout() {
final short version = 0;
JoinGroupRequest jgr = createJoinGroupRequest(version);
JoinGroupRequest jgr2 = JoinGroupRequest.parse(jgr.serialize(), version);
assertEquals(jgr2.data().rebalanceTimeoutMs(), jgr.data().rebalanceTimeoutMs());
}
@Test @Test
public void testSerializeWithHeader() { public void testSerializeWithHeader() {
CreatableTopicCollection topicsToCreate = new CreatableTopicCollection(1); CreatableTopicCollection topicsToCreate = new CreatableTopicCollection(1);