KAFKA-19300 AsyncConsumer#unsubscribe always timeout due to GroupAuthorizationException (#19779)

I verified the behavior by rewriting the
`GroupAuthorizerIntegrationTest` in Java in this PR:
https://github.com/apache/kafka/pull/19685  The state is now correct.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Nick Guo 2025-05-27 00:52:56 +08:00 committed by GitHub
parent 48a52701b9
commit 0600abdde3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 3 additions and 2 deletions

View File

@ -443,7 +443,7 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
log.error("Member {} with epoch {} transitioned to fatal state", memberId, memberEpoch);
notifyEpochChange(Optional.empty());
if (previousState == MemberState.UNSUBSCRIBED) {
if (previousState == MemberState.UNSUBSCRIBED && maybeCompleteLeaveInProgress()) {
log.debug("Member {} with epoch {} got fatal error from the broker but it already " +
"left the group, so onPartitionsLost callback won't be triggered.", memberId, memberEpoch);
return;

View File

@ -34,7 +34,7 @@ import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST
import org.apache.kafka.server.config.ServerConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{BeforeEach, TestInfo}
import org.junit.jupiter.api.{BeforeEach, TestInfo, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
@ -135,6 +135,7 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
@Timeout(60)
def testConsumeUnsubscribeWithoutGroupPermission(groupProtocol: String): Unit = {
val topic = "topic"