mirror of https://github.com/apache/kafka.git
KAFKA-14201; Consumer should not send group instance ID if committing with empty member ID (#12599)
The consumer group instance ID is used to support a notion of "static" consumer groups. The idea is to be able to identify the same group instance across restarts so that a rebalance is not needed. However, if the user sets `group.instance.id` in the consumer configuration, but uses "simple" assignment with `assign()`, then the instance ID nevertheless is sent in the OffsetCommit request to the coordinator. This may result in a surprising UNKNOWN_MEMBER_ID error. This PR fixes the issue on the client side by not setting the group instance id if the member id is empty (no generation). Reviewers: Jason Gustafson <jason@confluent.io>
This commit is contained in:
parent
706adc39f6
commit
b7f20be809
|
@ -1272,8 +1272,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
|
||||||
}
|
}
|
||||||
|
|
||||||
final Generation generation;
|
final Generation generation;
|
||||||
|
final String groupInstanceId;
|
||||||
if (subscriptions.hasAutoAssignedPartitions()) {
|
if (subscriptions.hasAutoAssignedPartitions()) {
|
||||||
generation = generationIfStable();
|
generation = generationIfStable();
|
||||||
|
groupInstanceId = rebalanceConfig.groupInstanceId.orElse(null);
|
||||||
// if the generation is null, we are not part of an active group (and we expect to be).
|
// if the generation is null, we are not part of an active group (and we expect to be).
|
||||||
// the only thing we can do is fail the commit and let the user rejoin the group in poll().
|
// the only thing we can do is fail the commit and let the user rejoin the group in poll().
|
||||||
if (generation == null) {
|
if (generation == null) {
|
||||||
|
@ -1293,6 +1295,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
generation = Generation.NO_GENERATION;
|
generation = Generation.NO_GENERATION;
|
||||||
|
groupInstanceId = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
|
OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
|
||||||
|
@ -1300,7 +1303,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
|
||||||
.setGroupId(this.rebalanceConfig.groupId)
|
.setGroupId(this.rebalanceConfig.groupId)
|
||||||
.setGenerationId(generation.generationId)
|
.setGenerationId(generation.generationId)
|
||||||
.setMemberId(generation.memberId)
|
.setMemberId(generation.memberId)
|
||||||
.setGroupInstanceId(rebalanceConfig.groupInstanceId.orElse(null))
|
.setGroupInstanceId(groupInstanceId)
|
||||||
.setTopics(new ArrayList<>(requestTopicDataMap.values()))
|
.setTopics(new ArrayList<>(requestTopicDataMap.values()))
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
|
@ -2820,6 +2820,32 @@ public abstract class ConsumerCoordinatorTest {
|
||||||
assertEquals(newGen, coordinator.generation());
|
assertEquals(newGen, coordinator.generation());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCommitOffsetShouldNotSetInstanceIdIfMemberIdIsUnknown() {
|
||||||
|
rebalanceConfig = buildRebalanceConfig(groupInstanceId);
|
||||||
|
ConsumerCoordinator coordinator = buildCoordinator(
|
||||||
|
rebalanceConfig,
|
||||||
|
new Metrics(),
|
||||||
|
assignors,
|
||||||
|
false,
|
||||||
|
subscriptions
|
||||||
|
);
|
||||||
|
|
||||||
|
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
|
||||||
|
coordinator.ensureCoordinatorReady(time.timer(5000));
|
||||||
|
|
||||||
|
client.prepareResponse(body -> {
|
||||||
|
OffsetCommitRequestData data = ((OffsetCommitRequest) body).data();
|
||||||
|
return data.groupInstanceId() == null && data.memberId().isEmpty();
|
||||||
|
}, offsetCommitResponse(Collections.emptyMap()));
|
||||||
|
|
||||||
|
RequestFuture<Void> future = coordinator.sendOffsetCommitRequest(singletonMap(t1p,
|
||||||
|
new OffsetAndMetadata(100L, "metadata")));
|
||||||
|
|
||||||
|
assertTrue(consumerClient.poll(future, time.timer(5000)));
|
||||||
|
assertFalse(future.failed());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCommitOffsetRebalanceInProgress() {
|
public void testCommitOffsetRebalanceInProgress() {
|
||||||
// we cannot retry if a rebalance occurs before the commit completed
|
// we cannot retry if a rebalance occurs before the commit completed
|
||||||
|
|
Loading…
Reference in New Issue