mirror of https://github.com/apache/kafka.git
KAFKA-15755: LeaveGroupResponse v0 - v2 loses its member under certain error conditions (#14635)
This patch fixes a bug in the LeaveGroupResponse construction. Basically, when a top level error is set, no members are expected but the current check always requires one for versions prior to version 3.
Reviewers: David Jacot <djacot@confluent.io>
(cherry picked from commit 3fd6293449)
This commit is contained in:
parent
bc435f07e3
commit
a618f2095f
|
|
@ -61,15 +61,15 @@ public class LeaveGroupResponse extends AbstractResponse {
|
|||
|
||||
if (version >= 3) {
|
||||
this.data = data;
|
||||
} else if (data.errorCode() != Errors.NONE.code()) {
|
||||
this.data = new LeaveGroupResponseData().setErrorCode(data.errorCode());
|
||||
} else {
|
||||
if (data.members().size() != 1) {
|
||||
throw new UnsupportedVersionException("LeaveGroup response version " + version +
|
||||
" can only contain one member, got " + data.members().size() + " members.");
|
||||
}
|
||||
|
||||
Errors topLevelError = Errors.forCode(data.errorCode());
|
||||
short errorCode = getError(topLevelError, data.members()).code();
|
||||
this.data = new LeaveGroupResponseData().setErrorCode(errorCode);
|
||||
this.data = new LeaveGroupResponseData().setErrorCode(data.members().get(0).errorCode());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -16,13 +16,16 @@
|
|||
*/
|
||||
package org.apache.kafka.common.requests;
|
||||
|
||||
import org.apache.kafka.common.errors.UnsupportedVersionException;
|
||||
import org.apache.kafka.common.message.LeaveGroupResponseData;
|
||||
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
|
||||
import org.apache.kafka.common.protocol.ApiKeys;
|
||||
import org.apache.kafka.common.protocol.Errors;
|
||||
import org.apache.kafka.common.protocol.MessageUtil;
|
||||
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
|
|
@ -34,6 +37,7 @@ import java.util.Map;
|
|||
import static org.apache.kafka.common.requests.AbstractResponse.DEFAULT_THROTTLE_TIME;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class LeaveGroupResponseTest {
|
||||
|
|
@ -165,4 +169,57 @@ public class LeaveGroupResponseTest {
|
|||
assertEquals(primaryResponse.hashCode(), reversedResponse.hashCode());
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ApiKeyVersionsSource(apiKey = ApiKeys.LEAVE_GROUP)
|
||||
public void testNoErrorNoMembersResponses(short version) {
|
||||
LeaveGroupResponseData data = new LeaveGroupResponseData()
|
||||
.setErrorCode(Errors.NONE.code())
|
||||
.setMembers(Collections.emptyList());
|
||||
|
||||
if (version < 3) {
|
||||
assertThrows(UnsupportedVersionException.class,
|
||||
() -> new LeaveGroupResponse(data, version));
|
||||
} else {
|
||||
LeaveGroupResponse response = new LeaveGroupResponse(data, version);
|
||||
assertEquals(Errors.NONE, response.topLevelError());
|
||||
assertEquals(Collections.emptyList(), response.memberResponses());
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ApiKeyVersionsSource(apiKey = ApiKeys.LEAVE_GROUP)
|
||||
public void testNoErrorMultipleMembersResponses(short version) {
|
||||
LeaveGroupResponseData data = new LeaveGroupResponseData()
|
||||
.setErrorCode(Errors.NONE.code())
|
||||
.setMembers(memberResponses);
|
||||
|
||||
if (version < 3) {
|
||||
assertThrows(UnsupportedVersionException.class,
|
||||
() -> new LeaveGroupResponse(data, version));
|
||||
} else {
|
||||
LeaveGroupResponse response = new LeaveGroupResponse(data, version);
|
||||
assertEquals(Errors.NONE, response.topLevelError());
|
||||
assertEquals(memberResponses, response.memberResponses());
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ApiKeyVersionsSource(apiKey = ApiKeys.LEAVE_GROUP)
|
||||
public void testErrorResponses(short version) {
|
||||
LeaveGroupResponseData dataNoMembers = new LeaveGroupResponseData()
|
||||
.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
|
||||
.setMembers(Collections.emptyList());
|
||||
|
||||
LeaveGroupResponse responseNoMembers = new LeaveGroupResponse(dataNoMembers, version);
|
||||
assertEquals(Errors.GROUP_ID_NOT_FOUND, responseNoMembers.topLevelError());
|
||||
|
||||
LeaveGroupResponseData dataMembers = new LeaveGroupResponseData()
|
||||
.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
|
||||
.setMembers(memberResponses);
|
||||
|
||||
LeaveGroupResponse responseMembers = new LeaveGroupResponse(dataMembers, version);
|
||||
assertEquals(Errors.GROUP_ID_NOT_FOUND, responseMembers.topLevelError());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue