KAFKA-17116 New consumer may not send effective leave group if member ID received after close (#17549)

KIP-1082 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-1082%3A+Require+Client-Generated+IDs+over+the+ConsumerGroupHeartbeat+RPC)

Reviewers: Andrew Schofield <aschofield@confluent.io>, David Jacot <djacot@confluent.io>, Lianet Magrans <lianetmr@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
TengYao Chi 2024-11-01 03:03:17 +08:00 committed by GitHub
parent ea7da09e53
commit 6f040cabc7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
31 changed files with 488 additions and 383 deletions

View File

@ -24,7 +24,6 @@ import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
@ -80,10 +79,12 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
protected final String groupId;
/**
* Member ID assigned by the server to the member, received in a heartbeat response when
* joining the group specified in {@link #groupId}
* Member ID generated by the consumer at startup, which is unique within the group and remains consistent
* for the entire lifetime of the process. This ID acts as an incarnation identifier for the consumer process
* and does not reset or change, even if the consumer leaves and rejoins the group.
* The Member ID remains the same until the process is completely stopped or terminated.
*/
protected String memberId = "";
protected final String memberId = Uuid.randomUuid().toString();
/**
* Current epoch of the member. It will be set to 0 by the member, and provided to the server
@ -164,19 +165,12 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
private Optional<CompletableFuture<Void>> leaveGroupInProgress = Optional.empty();
/**
* Registered listeners that will be notified whenever the memberID/epoch gets updated (valid
* values received from the broker, or values cleared due to member leaving the group, getting
* fenced or failing).
* Registered listeners that will be notified whenever the member epoch gets updated
* (valid values received from the broker, or values cleared due to member leaving
* the group, getting fenced or failing).
*/
private final List<MemberStateListener> stateUpdatesListeners;
/**
* Optional client telemetry reporter which sends client telemetry data to the broker. This
* will be empty if the client telemetry feature is not enabled. This is provided to update
* the group member id label when the member joins the group.
*/
protected final Optional<ClientTelemetryReporter> clientTelemetryReporter;
/**
* Future that will complete when a stale member completes releasing its assignment after
* leaving the group due to poll timer expired. Used to make sure that the member rejoins
@ -209,7 +203,6 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
SubscriptionState subscriptions,
ConsumerMetadata metadata,
Logger log,
Optional<ClientTelemetryReporter> clientTelemetryReporter,
Time time,
RebalanceMetricsManager metricsManager) {
this.groupId = groupId;
@ -221,7 +214,6 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
this.currentAssignment = LocalAssignment.NONE;
this.log = log;
this.stateUpdatesListeners = new ArrayList<>();
this.clientTelemetryReporter = clientTelemetryReporter;
this.time = time;
this.metricsManager = metricsManager;
}
@ -245,7 +237,7 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
metricsManager.recordRebalanceStarted(time.milliseconds());
}
log.info("Member {} with epoch {} transitioned from {} to {}.", memberIdInfoForLog(), memberEpoch, state, nextState);
log.info("Member {} with epoch {} transitioned from {} to {}.", memberId, memberEpoch, state, nextState);
this.state = nextState;
}
@ -266,7 +258,7 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
}
/**
* @return Member ID assigned by the server to this member when it joins the consumer group.
* @return Member ID that is generated at startup and remains unchanged for the entire lifetime of the process.
*/
public String memberId() {
return memberId;
@ -299,7 +291,7 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
// operation once the request completes, regardless of the response.
if (state == MemberState.UNSUBSCRIBED && maybeCompleteLeaveInProgress()) {
log.warn("Member {} with epoch {} received a failed response to the heartbeat to " +
"leave the group and completed the leave operation. ", memberIdInfoForLog(), memberEpoch);
"leave the group and completed the leave operation. ", memberId, memberEpoch);
}
}
@ -378,7 +370,7 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
if (state == MemberState.PREPARE_LEAVING) {
log.info("Member {} with epoch {} got fenced but it is already preparing to leave " +
"the group, so it will stop sending heartbeat and won't attempt to send the " +
"leave request or rejoin.", memberIdInfoForLog(), memberEpoch);
"leave request or rejoin.", memberId, memberEpoch);
// Briefly transition to LEAVING to ensure all required actions are applied even
// though there is no need to send a leave group heartbeat (ex. clear epoch and
// notify epoch listeners). Then transition to UNSUBSCRIBED, ensuring that the member
@ -392,20 +384,20 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
if (state == MemberState.LEAVING) {
log.debug("Member {} with epoch {} got fenced before sending leave group heartbeat. " +
"It will not send the leave request and won't attempt to rejoin.", memberIdInfoForLog(), memberEpoch);
"It will not send the leave request and won't attempt to rejoin.", memberId, memberEpoch);
transitionTo(MemberState.UNSUBSCRIBED);
maybeCompleteLeaveInProgress();
return;
}
if (state == MemberState.UNSUBSCRIBED) {
log.debug("Member {} with epoch {} got fenced but it already left the group, so it " +
"won't attempt to rejoin.", memberIdInfoForLog(), memberEpoch);
"won't attempt to rejoin.", memberId, memberEpoch);
return;
}
transitionTo(MemberState.FENCED);
resetEpoch();
log.debug("Member {} with epoch {} transitioned to {} state. It will release its " +
"assignment and rejoin the group.", memberIdInfoForLog(), memberEpoch, MemberState.FENCED);
"assignment and rejoin the group.", memberId, memberEpoch, MemberState.FENCED);
// Release assignment
CompletableFuture<Void> callbackResult = signalPartitionsLost(subscriptions.assignedPartitions());
@ -432,19 +424,19 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
public void transitionToFatal() {
MemberState previousState = state;
transitionTo(MemberState.FATAL);
log.error("Member {} with epoch {} transitioned to fatal state", memberIdInfoForLog(), memberEpoch);
notifyEpochChange(Optional.empty(), Optional.empty());
log.error("Member {} with epoch {} transitioned to fatal state", memberId, memberEpoch);
notifyEpochChange(Optional.empty());
if (previousState == MemberState.UNSUBSCRIBED) {
log.debug("Member {} with epoch {} got fatal error from the broker but it already " +
"left the group, so onPartitionsLost callback won't be triggered.", memberIdInfoForLog(), memberEpoch);
"left the group, so onPartitionsLost callback won't be triggered.", memberId, memberEpoch);
return;
}
if (previousState == MemberState.LEAVING || previousState == MemberState.PREPARE_LEAVING) {
log.info("Member {} with epoch {} was leaving the group with state {} when it got a " +
"fatal error from the broker. It will discard the ongoing leave and remain in " +
"fatal state.", memberIdInfoForLog(), memberEpoch, previousState);
"fatal state.", memberId, memberEpoch, previousState);
maybeCompleteLeaveInProgress();
return;
}
@ -460,11 +452,6 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
});
}
// Visible for testing
String memberIdInfoForLog() {
return (memberId == null || memberId.isEmpty()) ? "<no ID>" : memberId;
}
/**
* Set {@link #subscriptionUpdated} to true to indicate that the subscription has been updated.
* The next {@link #onConsumerPoll()} will join the group with the updated subscription, if the member is not part of it yet.
@ -554,7 +541,7 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
if (state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING) {
// Member already leaving. No-op and return existing leave group future that will
// complete when the ongoing leave operation completes.
log.debug("Leave group operation already in progress for member {}", memberIdInfoForLog());
log.debug("Leave group operation already in progress for member {}", memberId);
return leaveGroupInProgress.get();
}
@ -566,10 +553,10 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
callbackResult.whenComplete((result, error) -> {
if (error != null) {
log.error("Member {} callback to release assignment failed. It will proceed " +
"to clear its assignment and send a leave group heartbeat", memberIdInfoForLog(), error);
"to clear its assignment and send a leave group heartbeat", memberId, error);
} else {
log.info("Member {} completed callback to release assignment. It will proceed " +
"to clear its assignment and send a leave group heartbeat", memberIdInfoForLog());
"to clear its assignment and send a leave group heartbeat", memberId);
}
// Clear the subscription, no matter if the callback execution failed or succeeded.
@ -599,12 +586,12 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
public void transitionToSendingLeaveGroup(boolean dueToExpiredPollTimer) {
if (state == MemberState.FATAL) {
log.warn("Member {} with epoch {} won't send leave group request because it is in " +
"FATAL state", memberIdInfoForLog(), memberEpoch);
"FATAL state", memberId, memberEpoch);
return;
}
if (state == MemberState.UNSUBSCRIBED) {
log.warn("Member {} won't send leave group request because it is already out of the group.",
memberIdInfoForLog());
memberId);
return;
}
@ -622,10 +609,10 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
/**
* Call all listeners that are registered to get notified when the member epoch is updated.
* This also includes the latest member ID in the notification. If the member fails or leaves
* the group, this will be invoked with empty epoch and member ID.
* This also includes the member ID in the notification. If the member fails or leaves
* the group, this will be invoked with empty epoch.
*/
void notifyEpochChange(Optional<Integer> epoch, Optional<String> memberId) {
void notifyEpochChange(Optional<Integer> epoch) {
stateUpdatesListeners.forEach(stateListener -> stateListener.onMemberEpochUpdated(epoch, memberId));
}
@ -651,17 +638,17 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
} else {
log.debug("Member {} with epoch {} transitioned to {} after a heartbeat was sent " +
"to ack a previous reconciliation. New assignments are ready to " +
"be reconciled.", memberIdInfoForLog(), memberEpoch, MemberState.RECONCILING);
"be reconciled.", memberId, memberEpoch, MemberState.RECONCILING);
transitionTo(MemberState.RECONCILING);
}
} else if (state == MemberState.LEAVING) {
if (isPollTimerExpired) {
log.debug("Member {} with epoch {} generated the heartbeat to leave due to expired poll timer. It will " +
"remain stale (no heartbeat) until it rejoins the group on the next consumer " +
"poll.", memberIdInfoForLog(), memberEpoch);
"poll.", memberId, memberEpoch);
transitionToStale();
} else {
log.debug("Member {} with epoch {} generated the heartbeat to leave the group.", memberIdInfoForLog(), memberEpoch);
log.debug("Member {} with epoch {} generated the heartbeat to leave the group.", memberId, memberEpoch);
transitionTo(MemberState.UNSUBSCRIBED);
}
}
@ -676,7 +663,7 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
if (state == MemberState.LEAVING) {
log.warn("Heartbeat to leave group cannot be sent (most probably due to coordinator " +
"not known/available). Member {} with epoch {} will transition to {}.",
memberIdInfoForLog(), memberEpoch, MemberState.UNSUBSCRIBED);
memberId, memberEpoch, MemberState.UNSUBSCRIBED);
transitionTo(MemberState.UNSUBSCRIBED);
maybeCompleteLeaveInProgress();
}
@ -719,7 +706,7 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
isPollTimerExpired = false;
if (state == MemberState.STALE) {
log.debug("Expired poll timer has been reset so stale member {} will rejoin the group " +
"when it completes releasing its previous assignment.", memberIdInfoForLog());
"when it completes releasing its previous assignment.", memberId);
staleMemberAssignmentRelease.whenComplete((__, error) -> transitionToJoining());
}
}
@ -743,7 +730,7 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
clearAssignment();
log.debug("Member {} sent leave group heartbeat and released its assignment. It will remain " +
"in {} state until the poll timer is reset, and it will then rejoin the group",
memberIdInfoForLog(), MemberState.STALE);
memberId, MemberState.STALE);
});
}
@ -811,7 +798,7 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
"\tAdded partitions (assigned - owned): {}\n" +
"\tRevoked partitions (owned - assigned): {}\n",
resolvedAssignment.localEpoch,
memberIdInfoForLog(),
memberId,
assignedTopicPartitions,
ownedPartitions,
addedPartitions,
@ -1094,7 +1081,7 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
if (state == MemberState.FATAL) {
String errorMsg = String.format("Member %s with epoch %s received a fatal error " +
"while waiting for a revocation commit to complete. Will abort revocation " +
"without triggering user callback.", memberIdInfoForLog(), memberEpoch);
"without triggering user callback.", memberId, memberEpoch);
log.debug(errorMsg);
revocationResult.completeExceptionally(new KafkaException(errorMsg));
return revocationResult;
@ -1234,13 +1221,13 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
protected void updateMemberEpoch(int newEpoch) {
boolean newEpochReceived = this.memberEpoch != newEpoch;
this.memberEpoch = newEpoch;
// Simply notify based on epoch change only, given that the member will never receive a
// new member ID without an epoch (member ID is only assigned when it joins the group).
// Simply notify based on epoch changes only, since the member will generate a member ID
// at startup, and it will remain unchanged for its entire lifetime.
if (newEpochReceived) {
if (memberEpoch > 0) {
notifyEpochChange(Optional.of(memberEpoch), Optional.ofNullable(memberId));
notifyEpochChange(Optional.of(memberEpoch));
} else {
notifyEpochChange(Optional.empty(), Optional.empty());
notifyEpochChange(Optional.empty());
}
}
}

View File

@ -631,14 +631,15 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
);
}
private void updateGroupMetadata(final Optional<Integer> memberEpoch, final Optional<String> memberId) {
groupMetadata.updateAndGet(
oldGroupMetadataOptional -> oldGroupMetadataOptional.map(
oldGroupMetadata -> new ConsumerGroupMetadata(
oldGroupMetadata.groupId(),
memberEpoch.orElse(oldGroupMetadata.generationId()),
memberId.orElse(oldGroupMetadata.memberId()),
oldGroupMetadata.groupInstanceId()
private void updateGroupMetadata(final Optional<Integer> memberEpoch, final String memberId) {
memberEpoch.ifPresent(epoch -> groupMetadata.updateAndGet(
oldGroupMetadataOptional -> oldGroupMetadataOptional.map(
oldGroupMetadata -> new ConsumerGroupMetadata(
oldGroupMetadata.groupId(),
memberEpoch.orElse(oldGroupMetadata.generationId()),
memberId,
oldGroupMetadata.groupInstanceId()
)
)
)
);

View File

@ -95,10 +95,9 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
private Optional<Integer> lastEpochSentOnCommit;
/**
* Latest member ID and epoch received via the {@link #onMemberEpochUpdated(Optional, Optional)},
* to be included in the OffsetFetch and OffsetCommit requests if present. This will have
* the latest values received from the broker, or empty of the member is not part of the
* group anymore.
* The member ID and latest member epoch received via the {@link MemberStateListener#onMemberEpochUpdated(Optional, String)},
* to be included in the OffsetFetch and OffsetCommit requests. This will have
* the latest memberEpoch received from the broker.
*/
private final MemberInfo memberInfo;
@ -297,7 +296,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
* expires. Note that:
* <ul>
* <li>Considers {@link Errors#STALE_MEMBER_EPOCH} as a retriable error, and will retry it
* including the latest member ID and epoch received from the broker.</li>
* including the member ID and latest member epoch received from the broker.</li>
* <li>Considers {@link Errors#UNKNOWN_TOPIC_OR_PARTITION} as a fatal error, and will not
* retry it although the error extends RetriableException. The reason is that if a topic
* or partition is deleted, revocation would not finish in time since the auto commit would keep retrying.</li>
@ -340,7 +339,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
} else {
// Make sure the auto-commit is retried with the latest offsets
log.debug("Member {} will retry auto-commit of latest offsets after receiving retriable error {}",
memberInfo.memberId.orElse("undefined"),
memberInfo.memberId,
error.getMessage());
requestAttempt.offsets = subscriptions.allConsumed();
requestAttempt.resetFuture();
@ -566,16 +565,16 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
}
/**
* Update latest member ID and epoch used by the member.
* Update latest member epoch used by the member.
*
* @param memberEpoch New member epoch received. To be included in the new request.
* @param memberId Current member ID. To be included in the new request.
*/
@Override
public void onMemberEpochUpdated(Optional<Integer> memberEpoch, Optional<String> memberId) {
if (!memberEpoch.isPresent() && memberInfo.memberEpoch.isPresent()) {
log.info("Member {} won't include member id and epoch in following offset " +
"commit/fetch requests because it has left the group.", memberInfo.memberId.orElse("unknown"));
public void onMemberEpochUpdated(Optional<Integer> memberEpoch, String memberId) {
if (memberEpoch.isEmpty() && memberInfo.memberEpoch.isPresent()) {
log.info("Member {} won't include epoch in following offset " +
"commit/fetch requests because it has left the group.", memberInfo.memberId);
}
memberInfo.memberId = memberId;
memberInfo.memberEpoch = memberEpoch;
@ -684,9 +683,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
.setGroupId(this.groupId)
.setGroupInstanceId(groupInstanceId.orElse(null))
.setTopics(new ArrayList<>(requestTopicDataMap.values()));
if (memberInfo.memberId.isPresent()) {
data = data.setMemberId(memberInfo.memberId.get());
}
data = data.setMemberId(memberInfo.memberId);
if (memberInfo.memberEpoch.isPresent()) {
data = data.setGenerationIdOrMemberEpoch(memberInfo.memberEpoch.get());
lastEpochSentOnCommit = memberInfo.memberEpoch;
@ -759,7 +756,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
return;
} else if (error == Errors.STALE_MEMBER_EPOCH) {
log.error("OffsetCommit failed for member {} with stale member epoch error. Last epoch sent: {}",
memberInfo.memberId.orElse("undefined"),
memberInfo.memberId,
lastEpochSentOnCommit.isPresent() ? lastEpochSentOnCommit.get() : "undefined");
future.completeExceptionally(error.exception());
return;
@ -943,24 +940,21 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
OffsetFetchRequest.Builder builder;
if (memberInfo.memberId.isPresent() && memberInfo.memberEpoch.isPresent()) {
builder = new OffsetFetchRequest.Builder(
groupId,
memberInfo.memberId.get(),
memberInfo.memberEpoch.get(),
true,
new ArrayList<>(this.requestedPartitions),
throwOnFetchStableOffsetUnsupported);
} else {
OffsetFetchRequest.Builder builder = memberInfo.memberEpoch.
map(epoch -> new OffsetFetchRequest.Builder(
groupId,
memberInfo.memberId,
epoch,
true,
new ArrayList<>(this.requestedPartitions),
throwOnFetchStableOffsetUnsupported))
// Building request without passing member ID/epoch to leave the logic to choose
// default values when not present on the request builder.
builder = new OffsetFetchRequest.Builder(
groupId,
true,
new ArrayList<>(this.requestedPartitions),
throwOnFetchStableOffsetUnsupported);
}
.orElseGet(() -> new OffsetFetchRequest.Builder(
groupId,
true,
new ArrayList<>(this.requestedPartitions),
throwOnFetchStableOffsetUnsupported));
return buildRequestWithResponseHandling(builder);
}
@ -1293,17 +1287,12 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
}
static class MemberInfo {
Optional<String> memberId;
Optional<Integer> memberEpoch;
MemberInfo() {
this.memberId = Optional.empty();
this.memberEpoch = Optional.empty();
}
String memberId = "";
Optional<Integer> memberEpoch = Optional.empty();
@Override
public String toString() {
return "memberId=" + memberId.orElse("undefined") +
return "memberId=" + memberId +
", memberEpoch=" + (memberEpoch.isPresent() ? memberEpoch.get() : "undefined");
}
}

View File

@ -207,7 +207,7 @@ public class ConsumerHeartbeatRequestManager extends AbstractHeartbeatRequestMan
// GroupId - always sent
data.setGroupId(membershipManager.groupId());
// MemberId - always sent, empty until it has been received from the coordinator
// MemberId - always sent, it will be generated at Consumer startup.
data.setMemberId(membershipManager.memberId());
// MemberEpoch - always sent

View File

@ -33,12 +33,9 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryProvider;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@ -146,7 +143,6 @@ public class ConsumerMembershipManager extends AbstractMembershipManager<Consume
CommitRequestManager commitRequestManager,
ConsumerMetadata metadata,
LogContext logContext,
Optional<ClientTelemetryReporter> clientTelemetryReporter,
BackgroundEventHandler backgroundEventHandler,
Time time,
Metrics metrics) {
@ -158,7 +154,6 @@ public class ConsumerMembershipManager extends AbstractMembershipManager<Consume
commitRequestManager,
metadata,
logContext,
clientTelemetryReporter,
backgroundEventHandler,
time,
new ConsumerRebalanceMetricsManager(metrics));
@ -173,7 +168,6 @@ public class ConsumerMembershipManager extends AbstractMembershipManager<Consume
CommitRequestManager commitRequestManager,
ConsumerMetadata metadata,
LogContext logContext,
Optional<ClientTelemetryReporter> clientTelemetryReporter,
BackgroundEventHandler backgroundEventHandler,
Time time,
RebalanceMetricsManager metricsManager) {
@ -181,7 +175,6 @@ public class ConsumerMembershipManager extends AbstractMembershipManager<Consume
subscriptions,
metadata,
logContext.logger(ConsumerMembershipManager.class),
clientTelemetryReporter,
time,
metricsManager);
this.groupInstanceId = groupInstanceId;
@ -229,16 +222,6 @@ public class ConsumerMembershipManager extends AbstractMembershipManager<Consume
return;
}
// Update the group member id label in the client telemetry reporter if the member id has
// changed. Initially the member id is empty, and it is updated when the member joins the
// group. This is done here to avoid updating the label on every heartbeat response. Also
// check if the member id is null, as the schema defines it as nullable.
if (responseData.memberId() != null && !responseData.memberId().equals(memberId)) {
clientTelemetryReporter.ifPresent(reporter -> reporter.updateMetricsLabels(
Collections.singletonMap(ClientTelemetryProvider.GROUP_MEMBER_ID, responseData.memberId())));
}
this.memberId = responseData.memberId();
updateMemberEpoch(responseData.memberEpoch());
ConsumerGroupHeartbeatResponseData.Assignment assignment = responseData.assignment();

View File

@ -20,18 +20,18 @@ package org.apache.kafka.clients.consumer.internals;
import java.util.Optional;
/**
* Listener for getting notified of member ID and epoch changes.
* Listener for getting notified of member epoch changes.
*/
public interface MemberStateListener {
/**
* Called whenever member ID or epoch change with new values received from the broker or
* Called whenever epoch changes with new values received from the broker or
* cleared if the member is not part of the group anymore (when it gets fenced, leaves the
* group or fails).
*
* @param memberEpoch New member epoch received from the broker. Empty if the member is
* not part of the group anymore.
* @param memberId Current member ID. Empty if the member is not part of the group.
* @param memberId Current member ID. It won't change until the process is terminated.
*/
void onMemberEpochUpdated(Optional<Integer> memberEpoch, Optional<String> memberId);
void onMemberEpochUpdated(Optional<Integer> memberEpoch, String memberId);
}

View File

@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.common.internals.IdempotentCloser;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryProvider;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
@ -32,6 +33,7 @@ import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
@ -213,10 +215,18 @@ public class RequestManagers implements Closeable {
commitRequestManager,
metadata,
logContext,
clientTelemetryReporter,
backgroundEventHandler,
time,
metrics);
// Update the group member ID label in the client telemetry reporter.
// According to KIP-1082, the consumer will generate the member ID as the incarnation ID of the process.
// Therefore, we can update the group member ID during initialization.
if (clientTelemetryReporter.isPresent()) {
clientTelemetryReporter.get()
.updateMetricsLabels(Map.of(ClientTelemetryProvider.GROUP_MEMBER_ID, membershipManager.memberId()));
}
membershipManager.registerStateListener(commitRequestManager);
membershipManager.registerStateListener(applicationThreadMemberStateListener);
heartbeatRequestManager = new ConsumerHeartbeatRequestManager(
@ -292,9 +302,15 @@ public class RequestManagers implements Closeable {
null,
subscriptions,
metadata,
clientTelemetryReporter,
time,
time,
metrics);
// Update the group member ID label in the client telemetry reporter.
// According to KIP-1082, the consumer will generate the member ID as the incarnation ID of the process.
// Therefore, we can update the group member ID during initialization.
clientTelemetryReporter.ifPresent(telemetryReporter -> telemetryReporter
.updateMetricsLabels(Map.of(ClientTelemetryProvider.GROUP_MEMBER_ID, shareMembershipManager.memberId())));
ShareHeartbeatRequestManager shareHeartbeatRequestManager = new ShareHeartbeatRequestManager(
logContext,
time,

View File

@ -847,8 +847,8 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
}
@Override
public void onMemberEpochUpdated(Optional<Integer> memberEpochOpt, Optional<String> memberIdOpt) {
memberIdOpt.ifPresent(s -> memberId = Uuid.fromString(s));
public void onMemberEpochUpdated(Optional<Integer> memberEpochOpt, String memberId) {
this.memberId = Uuid.fromString(memberId);
}
/**

View File

@ -168,7 +168,7 @@ public class ShareHeartbeatRequestManager extends AbstractHeartbeatRequestManage
// GroupId - always sent
data.setGroupId(shareMembershipManager.groupId());
// MemberId - always sent, empty until it has been received from the coordinator
// MemberId - always sent, it will be generated at Consumer startup.
data.setMemberId(shareMembershipManager.memberId());
// MemberEpoch - always sent

View File

@ -24,15 +24,11 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest;
import org.apache.kafka.common.requests.ShareGroupHeartbeatResponse;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryProvider;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeSet;
@ -86,7 +82,6 @@ public class ShareMembershipManager extends AbstractMembershipManager<ShareGroup
String rackId,
SubscriptionState subscriptions,
ConsumerMetadata metadata,
Optional<ClientTelemetryReporter> clientTelemetryReporter,
Time time,
Metrics metrics) {
this(logContext,
@ -94,7 +89,6 @@ public class ShareMembershipManager extends AbstractMembershipManager<ShareGroup
rackId,
subscriptions,
metadata,
clientTelemetryReporter,
time,
new ShareRebalanceMetricsManager(metrics));
}
@ -105,14 +99,12 @@ public class ShareMembershipManager extends AbstractMembershipManager<ShareGroup
String rackId,
SubscriptionState subscriptions,
ConsumerMetadata metadata,
Optional<ClientTelemetryReporter> clientTelemetryReporter,
Time time,
ShareRebalanceMetricsManager metricsManager) {
super(groupId,
subscriptions,
metadata,
logContext.logger(ShareMembershipManager.class),
clientTelemetryReporter,
time,
metricsManager);
this.rackId = rackId;
@ -155,16 +147,6 @@ public class ShareMembershipManager extends AbstractMembershipManager<ShareGroup
return;
}
// Update the group member id label in the client telemetry reporter if the member id has
// changed. Initially the member id is empty, and it is updated when the member joins the
// group. This is done here to avoid updating the label on every heartbeat response. Also
// check if the member id is null, as the schema defines it as nullable.
if (responseData.memberId() != null && !responseData.memberId().equals(memberId)) {
clientTelemetryReporter.ifPresent(reporter -> reporter.updateMetricsLabels(
Collections.singletonMap(ClientTelemetryProvider.GROUP_MEMBER_ID, responseData.memberId())));
}
this.memberId = responseData.memberId();
updateMemberEpoch(responseData.memberEpoch());
ShareGroupHeartbeatResponseData.Assignment assignment = responseData.assignment();

View File

@ -37,6 +37,14 @@ public class ConsumerGroupHeartbeatRequest extends AbstractRequest {
*/
public static final int JOIN_GROUP_MEMBER_EPOCH = 0;
/**
* The version from which consumers are required to generate their own member id.
*
* <p>Starting from this version, member id must be generated by the consumer instance
* instead of being provided by the server.</p>
*/
public static final int CONSUMER_GENERATED_MEMBER_ID_REQUIRED_VERSION = 1;
public static class Builder extends AbstractRequest.Builder<ConsumerGroupHeartbeatRequest> {
private final ConsumerGroupHeartbeatRequestData data;

View File

@ -18,7 +18,7 @@
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "ConsumerGroupHeartbeatRequest",
// Version 1 adds SubscribedTopicRegex (KIP-848).
// Version 1 adds SubscribedTopicRegex (KIP-848), and requires the consumer to generate their own Member ID (KIP-1082)
"validVersions": "0-1",
"flexibleVersions": "0+",
"latestVersionUnstable": true,
@ -26,7 +26,7 @@
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
"about": "The group identifier." },
{ "name": "MemberId", "type": "string", "versions": "0+",
"about": "The member id generated by the coordinator. The member id must be kept during the entire lifetime of the member." },
"about": "The member id generated by the consumer. The member id must be kept during the entire lifetime of the consumer process." },
{ "name": "MemberEpoch", "type": "int32", "versions": "0+",
"about": "The current member epoch; 0 to join the group; -1 to leave the group; -2 to indicate that the static member will rejoin." },
{ "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",

View File

@ -39,7 +39,7 @@
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The top-level error message, or null if there was no error." },
{ "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The member id generated by the coordinator. Only provided when the member joins with MemberEpoch == 0." },
"about": "The member id is generated by the consumer starting from version 1, while in version 0, it can be provided by users or generated by the group coordinator." },
{ "name": "MemberEpoch", "type": "int32", "versions": "0+",
"about": "The member epoch." },
{ "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",

View File

@ -48,7 +48,7 @@
{ "name": "Members", "type": "[]DescribedGroupMember", "versions": "0+",
"about": "The group members.", "fields": [
{ "name": "MemberId", "type": "string", "versions": "0+",
"about": "The member ID assigned by the group coordinator." },
"about": "The member id" },
{ "name": "GroupInstanceId", "type": "string", "versions": "4+", "ignorable": true,
"nullableVersions": "4+", "default": "null",
"about": "The unique identifier of the consumer instance provided by end user." },

View File

@ -54,7 +54,7 @@
{ "name": "GroupId", "type": "string", "versions": "8+", "entityType": "groupId",
"about": "The group ID."},
{ "name": "MemberId", "type": "string", "versions": "9+", "nullableVersions": "9+", "default": "null", "ignorable": true,
"about": "The member ID assigned by the group coordinator if using the new consumer protocol (KIP-848)." },
"about": "The member id" },
{ "name": "MemberEpoch", "type": "int32", "versions": "9+", "default": "-1", "ignorable": true,
"about": "The member epoch if using the new consumer protocol (KIP-848)." },
{ "name": "Topics", "type": "[]OffsetFetchRequestTopics", "versions": "8+", "nullableVersions": "8+",

View File

@ -28,7 +28,7 @@
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
"about": "The group identifier." },
{ "name": "MemberId", "type": "string", "versions": "0+",
"about": "The member ID generated by the coordinator. The member ID must be kept during the entire lifetime of the member." },
"about": "The member id" },
{ "name": "MemberEpoch", "type": "int32", "versions": "0+",
"about": "The current member epoch; 0 to join the group; -1 to leave the group." },
{ "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",

View File

@ -35,7 +35,7 @@
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The top-level error message, or null if there was no error." },
{ "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The member ID generated by the coordinator. Only provided when the member joins with MemberEpoch == 0." },
"about": "The member ID is generated by the consumer and provided by the consumer for all requests." },
{ "name": "MemberEpoch", "type": "int32", "versions": "0+",
"about": "The member epoch." },
{ "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",

View File

@ -1429,7 +1429,7 @@ public class AsyncKafkaConsumerTest {
final String expectedMemberId = "memberId";
groupMetadataUpdateListener.onMemberEpochUpdated(
Optional.of(expectedMemberEpoch),
Optional.of(expectedMemberId)
expectedMemberId
);
final ConsumerGroupMetadata newGroupMetadata = consumer.groupMetadata();
assertEquals(oldGroupMetadata.groupId(), newGroupMetadata.groupId());
@ -1449,7 +1449,7 @@ public class AsyncKafkaConsumerTest {
consumer.subscribe(singletonList("topic"));
final int memberEpoch = 42;
final String memberId = "memberId";
groupMetadataUpdateListener.onMemberEpochUpdated(Optional.of(memberEpoch), Optional.of(memberId));
groupMetadataUpdateListener.onMemberEpochUpdated(Optional.of(memberEpoch), memberId);
final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
assertNotEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, groupMetadata.generationId());
assertNotEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, groupMetadata.memberId());

View File

@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.message.OffsetCommitRequestData;
@ -143,7 +144,7 @@ public class CommitRequestManagerTest {
OptionalDouble.of(0),
metrics);
commitRequestManager.onMemberEpochUpdated(Optional.of(1), Optional.empty());
commitRequestManager.onMemberEpochUpdated(Optional.of(1), Uuid.randomUuid().toString());
Set<TopicPartition> requestedPartitions = Collections.singleton(new TopicPartition("topic-1", 1));
CommitRequestManager.OffsetFetchRequestState offsetFetchRequestState = commitRequestManager.createOffsetFetchRequest(requestedPartitions, 0);
@ -1028,7 +1029,7 @@ public class CommitRequestManagerTest {
// Mock member has new a valid epoch.
int newEpoch = 8;
String memberId = "member1";
commitRequestManager.onMemberEpochUpdated(Optional.of(newEpoch), Optional.of(memberId));
commitRequestManager.onMemberEpochUpdated(Optional.of(newEpoch), memberId);
// Receive error when member already has a newer member epoch. Request should be retried.
completeOffsetFetchRequestWithError(commitRequestManager, partitions, Errors.STALE_MEMBER_EPOCH);
@ -1066,7 +1067,7 @@ public class CommitRequestManagerTest {
commitRequestManager.fetchOffsets(partitions, deadlineMs);
// Mock member not having a valid epoch anymore (left/failed/fenced).
commitRequestManager.onMemberEpochUpdated(Optional.empty(), Optional.empty());
commitRequestManager.onMemberEpochUpdated(Optional.empty(), Uuid.randomUuid().toString());
// Receive error when member is not in the group anymore. Request should fail.
completeOffsetFetchRequestWithError(commitRequestManager, partitions, Errors.STALE_MEMBER_EPOCH);
@ -1103,7 +1104,7 @@ public class CommitRequestManagerTest {
String memberId = "member1";
if (error == Errors.STALE_MEMBER_EPOCH) {
// Mock member has new a valid epoch
commitRequestManager.onMemberEpochUpdated(Optional.of(newEpoch), Optional.of(memberId));
commitRequestManager.onMemberEpochUpdated(Optional.of(newEpoch), memberId);
}
completeOffsetCommitRequestWithError(commitRequestManager, error);
@ -1155,21 +1156,21 @@ public class CommitRequestManagerTest {
int initialEpoch = 1;
String memberId = "member1";
commitRequestManager.onMemberEpochUpdated(Optional.of(initialEpoch), Optional.of(memberId));
commitRequestManager.onMemberEpochUpdated(Optional.of(initialEpoch), memberId);
// Send request with epoch 1
completeOffsetCommitRequestWithError(commitRequestManager, Errors.STALE_MEMBER_EPOCH);
assertEquals(initialEpoch, commitRequestManager.lastEpochSentOnCommit().orElse(null));
// Receive new epoch. Last epoch sent should change only when sending out the next request
commitRequestManager.onMemberEpochUpdated(Optional.of(initialEpoch + 1), Optional.of(memberId));
commitRequestManager.onMemberEpochUpdated(Optional.of(initialEpoch + 1), memberId);
assertEquals(initialEpoch, commitRequestManager.lastEpochSentOnCommit().get());
time.sleep(retryBackoffMs);
completeOffsetCommitRequestWithError(commitRequestManager, Errors.STALE_MEMBER_EPOCH);
assertEquals(initialEpoch + 1, commitRequestManager.lastEpochSentOnCommit().orElse(null));
// Receive empty epochs
commitRequestManager.onMemberEpochUpdated(Optional.empty(), Optional.empty());
commitRequestManager.onMemberEpochUpdated(Optional.empty(), memberId);
time.sleep(retryBackoffMs * 2);
completeOffsetCommitRequestWithError(commitRequestManager, Errors.STALE_MEMBER_EPOCH);
assertFalse(commitRequestManager.lastEpochSentOnCommit().isPresent());

View File

@ -67,6 +67,7 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@ -268,8 +269,10 @@ public class ConsumerHeartbeatRequestManagerTest {
ConsumerGroupHeartbeatRequest heartbeatRequest =
(ConsumerGroupHeartbeatRequest) request.requestBuilder().build(version);
// Should include epoch 0 to join and no member ID.
assertTrue(heartbeatRequest.data().memberId().isEmpty());
// Should include epoch 0 and member id to join
String memberId = heartbeatRequest.data().memberId();
assertNotNull(memberId);
assertFalse(memberId.isEmpty());
assertEquals(0, heartbeatRequest.data().memberEpoch());
// Should include subscription and group basic info to start getting assignments, as well as rebalanceTimeoutMs
@ -590,7 +593,7 @@ public class ConsumerHeartbeatRequestManagerTest {
// The initial ConsumerGroupHeartbeatRequest sets most fields to their initial empty values
ConsumerGroupHeartbeatRequestData data = heartbeatState.buildRequestData();
assertEquals(DEFAULT_GROUP_ID, data.groupId());
assertEquals("", data.memberId());
assertEquals(DEFAULT_MEMBER_ID, data.memberId());
assertEquals(0, data.memberEpoch());
assertNull(data.instanceId());
assertEquals(DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs());
@ -938,7 +941,7 @@ public class ConsumerHeartbeatRequestManagerTest {
private void mockJoiningMemberData(String instanceId) {
when(membershipManager.state()).thenReturn(MemberState.JOINING);
when(membershipManager.groupInstanceId()).thenReturn(Optional.ofNullable(instanceId));
when(membershipManager.memberId()).thenReturn("");
when(membershipManager.memberId()).thenReturn(DEFAULT_MEMBER_ID);
when(membershipManager.memberEpoch()).thenReturn(0);
when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID);
when(membershipManager.currentAssignment()).thenReturn(LocalAssignment.NONE);

View File

@ -96,7 +96,6 @@ import static org.mockito.Mockito.when;
public class ConsumerMembershipManagerTest {
private static final String GROUP_ID = "test-group";
private static final String MEMBER_ID = "test-member-1";
private static final int REBALANCE_TIMEOUT = 100;
private static final int MEMBER_EPOCH = 1;
private static final LogContext LOG_CONTEXT = new LogContext();
@ -135,10 +134,12 @@ public class ConsumerMembershipManagerTest {
}
private ConsumerMembershipManager createMembershipManager(String groupInstanceId) {
return spy(new ConsumerMembershipManager(
GROUP_ID, Optional.ofNullable(groupInstanceId), REBALANCE_TIMEOUT, Optional.empty(),
subscriptionState, commitRequestManager, metadata, LOG_CONTEXT, Optional.empty(),
backgroundEventHandler, time, rebalanceMetricsManager));
ConsumerMembershipManager manager = spy(new ConsumerMembershipManager(
GROUP_ID, Optional.ofNullable(groupInstanceId), REBALANCE_TIMEOUT, Optional.empty(),
subscriptionState, commitRequestManager, metadata, LOG_CONTEXT,
backgroundEventHandler, time, rebalanceMetricsManager));
assertMemberIdIsGenerated(manager.memberId());
return manager;
}
private ConsumerMembershipManager createMembershipManagerJoiningGroup(String groupInstanceId,
@ -146,8 +147,8 @@ public class ConsumerMembershipManagerTest {
ConsumerMembershipManager manager = spy(new ConsumerMembershipManager(
GROUP_ID, Optional.ofNullable(groupInstanceId), REBALANCE_TIMEOUT,
Optional.ofNullable(serverAssignor), subscriptionState, commitRequestManager,
metadata, LOG_CONTEXT, Optional.empty(), backgroundEventHandler, time,
rebalanceMetricsManager));
metadata, LOG_CONTEXT, backgroundEventHandler, time, rebalanceMetricsManager));
assertMemberIdIsGenerated(manager.memberId());
manager.transitionToJoining();
return manager;
}
@ -182,12 +183,12 @@ public class ConsumerMembershipManagerTest {
assertEquals(MemberState.JOINING, membershipManager.state());
ConsumerGroupHeartbeatResponse responseWithoutAssignment =
createConsumerGroupHeartbeatResponse(new Assignment());
createConsumerGroupHeartbeatResponse(new Assignment(), membershipManager.memberId());
membershipManager.onHeartbeatSuccess(responseWithoutAssignment);
assertEquals(MemberState.RECONCILING, membershipManager.state());
ConsumerGroupHeartbeatResponse responseWithAssignment =
createConsumerGroupHeartbeatResponse(createAssignment(true));
createConsumerGroupHeartbeatResponse(createAssignment(true), membershipManager.memberId());
membershipManager.onHeartbeatSuccess(responseWithAssignment);
assertEquals(MemberState.RECONCILING, membershipManager.state());
}
@ -195,20 +196,20 @@ public class ConsumerMembershipManagerTest {
@Test
public void testMemberIdAndEpochResetOnFencedMembers() {
ConsumerMembershipManager membershipManager = createMemberInStableState();
assertEquals(MEMBER_ID, membershipManager.memberId());
String originalMemberId = membershipManager.memberId();
assertNotNull(originalMemberId);
assertFalse(originalMemberId.isEmpty());
assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch());
mockMemberHasAutoAssignedPartition();
membershipManager.transitionToFenced();
assertEquals(MEMBER_ID, membershipManager.memberId());
assertEquals(0, membershipManager.memberEpoch());
}
@Test
public void testTransitionToFatal() {
ConsumerMembershipManager membershipManager = createMemberInStableState(null);
assertEquals(MEMBER_ID, membershipManager.memberId());
assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch());
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
@ -224,8 +225,8 @@ public class ConsumerMembershipManagerTest {
public void testTransitionToFailedWhenTryingToJoin() {
ConsumerMembershipManager membershipManager = new ConsumerMembershipManager(
GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(),
subscriptionState, commitRequestManager, metadata, LOG_CONTEXT, Optional.empty(),
backgroundEventHandler, time, rebalanceMetricsManager);
subscriptionState, commitRequestManager, metadata, LOG_CONTEXT,
backgroundEventHandler, time, rebalanceMetricsManager);
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
membershipManager.transitionToJoining();
@ -248,13 +249,13 @@ public class ConsumerMembershipManagerTest {
MemberStateListener listener = mock(MemberStateListener.class);
membershipManager.registerStateListener(listener);
mockStableMember(membershipManager);
verify(listener).onMemberEpochUpdated(Optional.of(MEMBER_EPOCH), Optional.of(MEMBER_ID));
verify(listener).onMemberEpochUpdated(Optional.of(MEMBER_EPOCH), membershipManager.memberId);
clearInvocations(listener);
// Transition to FAILED before getting member ID/epoch
membershipManager.transitionToFatal();
assertEquals(MemberState.FATAL, membershipManager.state());
verify(listener).onMemberEpochUpdated(Optional.empty(), Optional.empty());
verify(listener).onMemberEpochUpdated(Optional.empty(), membershipManager.memberId);
}
@Test
@ -264,46 +265,47 @@ public class ConsumerMembershipManagerTest {
MemberStateListener listener = mock(MemberStateListener.class);
membershipManager.registerStateListener(listener);
mockStableMember(membershipManager);
verify(listener).onMemberEpochUpdated(Optional.of(MEMBER_EPOCH), Optional.of(MEMBER_ID));
verify(listener).onMemberEpochUpdated(Optional.of(MEMBER_EPOCH), membershipManager.memberId);
clearInvocations(listener);
mockLeaveGroup();
membershipManager.leaveGroup();
verify(subscriptionState).unsubscribe();
assertEquals(MemberState.LEAVING, membershipManager.state());
verify(listener).onMemberEpochUpdated(Optional.empty(), Optional.empty());
verify(listener).onMemberEpochUpdated(Optional.empty(), membershipManager.memberId);
}
@Test
public void testListenersGetNotifiedOfMemberEpochUpdatesOnlyIfItChanges() {
ConsumerMembershipManager membershipManager = createMembershipManagerJoiningGroup();
String memberId = membershipManager.memberId();
MemberStateListener listener = mock(MemberStateListener.class);
membershipManager.registerStateListener(listener);
int epoch = 5;
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
.setErrorCode(Errors.NONE.code())
.setMemberId(MEMBER_ID)
.setMemberId(memberId)
.setMemberEpoch(epoch)));
verify(listener).onMemberEpochUpdated(Optional.of(epoch), Optional.of(MEMBER_ID));
verify(listener).onMemberEpochUpdated(Optional.of(epoch), memberId);
clearInvocations(listener);
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
.setErrorCode(Errors.NONE.code())
.setMemberId(MEMBER_ID)
.setMemberId(memberId)
.setMemberEpoch(epoch)));
verify(listener, never()).onMemberEpochUpdated(any(), any());
}
private void mockStableMember(ConsumerMembershipManager membershipManager) {
ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(new Assignment());
ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(new Assignment(),
membershipManager.memberId());
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
membershipManager.onHeartbeatSuccess(heartbeatResponse);
membershipManager.poll(time.milliseconds());
membershipManager.onHeartbeatRequestGenerated();
assertEquals(MemberState.STABLE, membershipManager.state());
assertEquals(MEMBER_ID, membershipManager.memberId());
assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch());
}
@ -344,7 +346,7 @@ public class ConsumerMembershipManagerTest {
completeCallback(callbackEvent, membershipManager);
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH, membershipManager.memberEpoch());
verify(membershipManager).notifyEpochChange(Optional.empty(), Optional.empty());
verify(membershipManager).notifyEpochChange(Optional.empty());
assertTrue(membershipManager.shouldSkipHeartbeat());
}
@ -414,7 +416,7 @@ public class ConsumerMembershipManagerTest {
assertFalse(sendLeave.isDone(), "Send leave operation should not complete until a response is received");
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(new Assignment()));
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(new Assignment(), membershipManager.memberId()));
assertSendLeaveCompleted(membershipManager, sendLeave);
}
@ -597,7 +599,7 @@ public class ConsumerMembershipManagerTest {
assertEquals(toTopicIdPartitionMap(assignment1), membershipManager.currentAssignment().partitions);
// Receive assignment, wait on commit
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment2));
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment2, membershipManager.memberId()));
assertEquals(MemberState.RECONCILING, membershipManager.state());
CompletableFuture<Void> commitResult = new CompletableFuture<>();
when(commitRequestManager.maybeAutoCommitSyncBeforeRevocation(anyLong())).thenReturn(commitResult);
@ -617,7 +619,7 @@ public class ConsumerMembershipManagerTest {
assertTrue(subscriptionState.assignedPartitions().isEmpty());
// We have to reconcile & ack the assignment again
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment1));
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment1, membershipManager.memberId()));
assertEquals(MemberState.RECONCILING, membershipManager.state());
membershipManager.poll(time.milliseconds());
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
@ -653,7 +655,7 @@ public class ConsumerMembershipManagerTest {
// Receive assignment - full reconciliation triggered
// stay in RECONCILING state, since an unresolved topic is assigned
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment1));
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment1, membershipManager.memberId()));
assertEquals(MemberState.RECONCILING, membershipManager.state());
membershipManager.poll(time.milliseconds());
verifyReconciliationTriggeredAndCompleted(membershipManager,
@ -664,12 +666,12 @@ public class ConsumerMembershipManagerTest {
clearInvocations(membershipManager);
// Receive extended assignment - assignment received but no reconciliation triggered
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment2));
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment2, membershipManager.memberId()));
assertEquals(MemberState.RECONCILING, membershipManager.state());
verifyReconciliationNotTriggered(membershipManager);
// Receive original assignment again - full reconciliation not triggered but assignment is acked again
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment1));
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment1, membershipManager.memberId()));
assertEquals(MemberState.RECONCILING, membershipManager.state());
membershipManager.poll(time.milliseconds());
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
@ -899,7 +901,7 @@ public class ConsumerMembershipManagerTest {
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
assertFalse(leaveResult.isDone());
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(createAssignment(true)));
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(createAssignment(true), membershipManager.memberId()));
assertSendLeaveCompleted(membershipManager, leaveResult);
}
@ -926,7 +928,6 @@ public class ConsumerMembershipManagerTest {
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state(), "Member should " +
"remain UNSUBSCRIBED after receiving the response to the HB to leave");
assertEquals(-1, membershipManager.memberEpoch());
assertEquals(MEMBER_ID, membershipManager.memberId());
assertTrue(membershipManager.currentAssignment().isNone());
assertTrue(sendLeave.isDone(), "Leave group result should complete when the response to" +
" the heartbeat request to leave is received.");
@ -1166,7 +1167,7 @@ public class ConsumerMembershipManagerTest {
// Updating state with a heartbeat response containing errors cannot be performed and
// should fail.
ConsumerGroupHeartbeatResponse unknownMemberResponse =
createConsumerGroupHeartbeatResponseWithError(Errors.UNKNOWN_MEMBER_ID);
createConsumerGroupHeartbeatResponseWithError(Errors.UNKNOWN_MEMBER_ID, membershipManager.memberId());
assertThrows(IllegalArgumentException.class,
() -> membershipManager.onHeartbeatSuccess(unknownMemberResponse));
}
@ -1340,7 +1341,7 @@ public class ConsumerMembershipManagerTest {
// Target assignment received again with the same unresolved topic. Client should keep it
// as unresolved.
clearInvocations(subscriptionState);
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment));
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment, membershipManager.memberId()));
assertEquals(MemberState.RECONCILING, membershipManager.state());
assertEquals(Collections.singleton(topic2), membershipManager.topicsAwaitingReconciliation());
verify(subscriptionState, never()).assignFromSubscribed(anyCollection());
@ -2247,7 +2248,7 @@ public class ConsumerMembershipManagerTest {
@Test
public void testRebalanceMetricsOnSuccessfulRebalance() {
ConsumerMembershipManager membershipManager = createMembershipManagerJoiningGroup();
ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(new Assignment());
ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(new Assignment(), membershipManager.memberId());
membershipManager.onHeartbeatSuccess(heartbeatResponse);
mockOwnedPartition(membershipManager, Uuid.randomUuid(), "topic1");
@ -2348,7 +2349,7 @@ public class ConsumerMembershipManagerTest {
@Test
public void testRebalanceMetricsOnFailedRebalance() {
ConsumerMembershipManager membershipManager = createMembershipManagerJoiningGroup();
ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(new Assignment());
ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(new Assignment(), membershipManager.memberId());
membershipManager.onHeartbeatSuccess(heartbeatResponse);
Uuid topicId = Uuid.randomUuid();
@ -2368,17 +2369,6 @@ public class ConsumerMembershipManagerTest {
assertEquals(-1d, getMetricValue(metrics, rebalanceMetricsManager.lastRebalanceSecondsAgo));
}
@Test
public void testMemberIdInfoForLogs() {
ConsumerMembershipManager membershipManager = createMembershipManagerJoiningGroup(null, null);
assertTrue(membershipManager.memberId().isEmpty());
assertFalse(membershipManager.memberIdInfoForLog().isEmpty());
membershipManager = createMemberInStableState(null);
assertFalse(membershipManager.memberId().isEmpty());
assertEquals(membershipManager.memberId(), membershipManager.memberIdInfoForLog());
}
private Object getMetricValue(Metrics metrics, MetricName name) {
return metrics.metrics().get(name).metricValue();
}
@ -2611,7 +2601,7 @@ public class ConsumerMembershipManagerTest {
private ConsumerMembershipManager mockJoinAndReceiveAssignment(boolean triggerReconciliation,
ConsumerGroupHeartbeatResponseData.Assignment assignment) {
ConsumerMembershipManager membershipManager = createMembershipManagerJoiningGroup();
ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(assignment);
ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(assignment, membershipManager.memberId());
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
when(subscriptionState.rebalanceListener()).thenReturn(Optional.empty()).thenReturn(Optional.empty());
@ -2634,7 +2624,7 @@ public class ConsumerMembershipManagerTest {
private ConsumerMembershipManager createMemberInStableState(String groupInstanceId) {
ConsumerMembershipManager membershipManager = createMembershipManagerJoiningGroup(groupInstanceId, null);
ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(new Assignment());
ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(new Assignment(), membershipManager.memberId());
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
when(subscriptionState.rebalanceListener()).thenReturn(Optional.empty());
membershipManager.onHeartbeatSuccess(heartbeatResponse);
@ -2654,7 +2644,7 @@ public class ConsumerMembershipManagerTest {
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(tp.getKey())
.setPartitions(new ArrayList<>(tp.getValue()))).collect(Collectors.toList()));
ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(targetAssignment);
ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(targetAssignment, membershipManager.memberId());
membershipManager.onHeartbeatSuccess(heartbeatResponse);
}
@ -2664,7 +2654,7 @@ public class ConsumerMembershipManagerTest {
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(topicId)
.setPartitions(partitions)));
ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(targetAssignment);
ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(targetAssignment, membershipManager.memberId());
membershipManager.onHeartbeatSuccess(heartbeatResponse);
}
@ -2683,7 +2673,7 @@ public class ConsumerMembershipManagerTest {
.setTopicId(topicId)
.setPartitions(partitions)));
ConsumerGroupHeartbeatResponse heartbeatResponse =
createConsumerGroupHeartbeatResponseWithBumpedEpoch(targetAssignment);
createConsumerGroupHeartbeatResponseWithBumpedEpoch(targetAssignment, membershipManager.memberId());
membershipManager.onHeartbeatSuccess(heartbeatResponse);
verifyReconciliationNotTriggered(membershipManager);
@ -2714,7 +2704,7 @@ public class ConsumerMembershipManagerTest {
// New empty assignment received, revoking owned partition.
ConsumerGroupHeartbeatResponseData.Assignment targetAssignment = new ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(Collections.emptyList());
ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(targetAssignment);
ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(targetAssignment, membershipManager.memberId());
membershipManager.onHeartbeatSuccess(heartbeatResponse);
}
@ -2726,8 +2716,6 @@ public class ConsumerMembershipManagerTest {
mockMemberHasAutoAssignedPartition();
membershipManager.transitionToFenced();
assertEquals(MEMBER_ID, membershipManager.memberId());
assertEquals(0, membershipManager.memberEpoch());
assertEquals(MemberState.JOINING, membershipManager.state());
}
@ -2749,7 +2737,6 @@ public class ConsumerMembershipManagerTest {
"heartbeat request to leave is sent out.");
assertTransitionToUnsubscribeOnHBSentAndWaitForResponseToCompleteLeave(membershipManager, leaveResult);
assertEquals(MEMBER_ID, membershipManager.memberId());
assertEquals(-1, membershipManager.memberEpoch());
assertTrue(membershipManager.currentAssignment().isNone());
verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
@ -2826,10 +2813,10 @@ public class ConsumerMembershipManagerTest {
}
private ConsumerGroupHeartbeatResponse createConsumerGroupHeartbeatResponse(
ConsumerGroupHeartbeatResponseData.Assignment assignment) {
ConsumerGroupHeartbeatResponseData.Assignment assignment, String memberId) {
return new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
.setErrorCode(Errors.NONE.code())
.setMemberId(MEMBER_ID)
.setMemberId(memberId)
.setMemberEpoch(MEMBER_EPOCH)
.setAssignment(assignment));
}
@ -2840,18 +2827,18 @@ public class ConsumerMembershipManagerTest {
* receives a heartbeat response to the join request, and the response includes an assignment.
*/
private ConsumerGroupHeartbeatResponse createConsumerGroupHeartbeatResponseWithBumpedEpoch(
ConsumerGroupHeartbeatResponseData.Assignment assignment) {
ConsumerGroupHeartbeatResponseData.Assignment assignment, String memberId) {
return new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
.setErrorCode(Errors.NONE.code())
.setMemberId(MEMBER_ID)
.setMemberId(memberId)
.setMemberEpoch(MEMBER_EPOCH + 1)
.setAssignment(assignment));
}
private ConsumerGroupHeartbeatResponse createConsumerGroupHeartbeatResponseWithError(Errors error) {
private ConsumerGroupHeartbeatResponse createConsumerGroupHeartbeatResponseWithError(Errors error, String memberId) {
return new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
.setErrorCode(error.code())
.setMemberId(MEMBER_ID)
.setMemberId(memberId)
.setMemberEpoch(5));
}
@ -2886,6 +2873,11 @@ public class ConsumerMembershipManagerTest {
return membershipManager;
}
private void assertMemberIdIsGenerated(String memberId) {
assertNotNull(memberId, "Member Id should be generated at startup");
assertFalse(memberId.isEmpty(), "Member Id should be generated at startup");
}
/**
* @return States where the member is not part of the group.
*/

View File

@ -1450,7 +1450,7 @@ public class ShareConsumeRequestManagerTest {
super(time, logContext, groupId, metadata, subscriptions, fetchConfig, shareFetchBuffer,
backgroundEventHandler, metricsManager, retryBackoffMs, 1000);
this.shareFetchCollector = fetchCollector;
onMemberEpochUpdated(Optional.empty(), Optional.of(Uuid.randomUuid().toString()));
onMemberEpochUpdated(Optional.empty(), Uuid.randomUuid().toString());
}
private ShareFetch<K, V> collectFetch() {

View File

@ -66,6 +66,7 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@ -85,7 +86,6 @@ import static org.mockito.Mockito.when;
public class ShareMembershipManagerTest {
private static final String GROUP_ID = "test-group";
private static final String MEMBER_ID = "test-member-1";
private static final String RACK_ID = null;
private static final int MEMBER_EPOCH = 1;
@ -111,15 +111,16 @@ public class ShareMembershipManagerTest {
}
private ShareMembershipManager createMembershipManager() {
return spy(new ShareMembershipManager(
logContext, GROUP_ID, RACK_ID, subscriptionState, metadata,
Optional.empty(), time, rebalanceMetricsManager));
ShareMembershipManager manager = spy(new ShareMembershipManager(
logContext, GROUP_ID, RACK_ID, subscriptionState, metadata, time, rebalanceMetricsManager));
assertMemberIdIsGenerated(manager.memberId());
return manager;
}
private ShareMembershipManager createMembershipManagerJoiningGroup() {
ShareMembershipManager manager = spy(new ShareMembershipManager(
logContext, GROUP_ID, RACK_ID, subscriptionState, metadata,
Optional.empty(), time, rebalanceMetricsManager));
logContext, GROUP_ID, RACK_ID, subscriptionState, metadata, time, rebalanceMetricsManager));
assertMemberIdIsGenerated(manager.memberId());
manager.transitionToJoining();
return manager;
}
@ -128,8 +129,7 @@ public class ShareMembershipManagerTest {
public void testMembershipManagerRegistersForClusterMetadataUpdatesOnFirstJoin() {
// First join should register to get metadata updates
ShareMembershipManager manager = new ShareMembershipManager(
logContext, GROUP_ID, RACK_ID, subscriptionState, metadata,
Optional.empty(), time, rebalanceMetricsManager);
logContext, GROUP_ID, RACK_ID, subscriptionState, metadata, time, rebalanceMetricsManager);
manager.transitionToJoining();
clearInvocations(metadata);
@ -158,12 +158,13 @@ public class ShareMembershipManagerTest {
ShareMembershipManager membershipManager = createMembershipManagerJoiningGroup();
assertEquals(MemberState.JOINING, membershipManager.state());
ShareGroupHeartbeatResponse responseWithoutAssignment = createShareGroupHeartbeatResponse(new Assignment());
ShareGroupHeartbeatResponse responseWithoutAssignment = createShareGroupHeartbeatResponse(new Assignment(),
membershipManager.memberId());
membershipManager.onHeartbeatSuccess(responseWithoutAssignment);
assertEquals(MemberState.RECONCILING, membershipManager.state());
ShareGroupHeartbeatResponse responseWithAssignment =
createShareGroupHeartbeatResponse(createAssignment(true));
createShareGroupHeartbeatResponse(createAssignment(true), membershipManager.memberId());
membershipManager.onHeartbeatSuccess(responseWithAssignment);
assertEquals(MemberState.RECONCILING, membershipManager.state());
}
@ -171,20 +172,17 @@ public class ShareMembershipManagerTest {
@Test
public void testMemberIdAndEpochResetOnFencedMembers() {
ShareMembershipManager membershipManager = createMemberInStableState();
assertEquals(MEMBER_ID, membershipManager.memberId());
assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch());
mockMemberHasAutoAssignedPartition();
membershipManager.transitionToFenced();
assertEquals(MEMBER_ID, membershipManager.memberId());
assertEquals(0, membershipManager.memberEpoch());
}
@Test
public void testTransitionToFatal() {
ShareMembershipManager membershipManager = createMemberInStableState();
assertEquals(MEMBER_ID, membershipManager.memberId());
assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch());
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
@ -199,8 +197,7 @@ public class ShareMembershipManagerTest {
@Test
public void testTransitionToFailedWhenTryingToJoin() {
ShareMembershipManager membershipManager = new ShareMembershipManager(
logContext, GROUP_ID, RACK_ID, subscriptionState, metadata,
Optional.empty(), time, rebalanceMetricsManager);
logContext, GROUP_ID, RACK_ID, subscriptionState, metadata, time, rebalanceMetricsManager);
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
membershipManager.transitionToJoining();
@ -222,13 +219,13 @@ public class ShareMembershipManagerTest {
MemberStateListener listener = mock(MemberStateListener.class);
membershipManager.registerStateListener(listener);
mockStableMember(membershipManager);
verify(listener).onMemberEpochUpdated(Optional.of(MEMBER_EPOCH), Optional.of(MEMBER_ID));
verify(listener).onMemberEpochUpdated(Optional.of(MEMBER_EPOCH), membershipManager.memberId);
clearInvocations(listener);
// Transition to FAILED before getting member ID/epoch
membershipManager.transitionToFatal();
assertEquals(MemberState.FATAL, membershipManager.state());
verify(listener).onMemberEpochUpdated(Optional.empty(), Optional.empty());
verify(listener).onMemberEpochUpdated(Optional.empty(), membershipManager.memberId);
}
@Test
@ -237,14 +234,14 @@ public class ShareMembershipManagerTest {
MemberStateListener listener = mock(MemberStateListener.class);
membershipManager.registerStateListener(listener);
mockStableMember(membershipManager);
verify(listener).onMemberEpochUpdated(Optional.of(MEMBER_EPOCH), Optional.of(MEMBER_ID));
verify(listener).onMemberEpochUpdated(Optional.of(MEMBER_EPOCH), membershipManager.memberId);
clearInvocations(listener);
mockLeaveGroup();
membershipManager.leaveGroup();
verify(subscriptionState).unsubscribe();
assertEquals(MemberState.LEAVING, membershipManager.state());
verify(listener).onMemberEpochUpdated(Optional.empty(), Optional.empty());
verify(listener).onMemberEpochUpdated(Optional.empty(), membershipManager.memberId);
}
@Test
@ -256,26 +253,26 @@ public class ShareMembershipManagerTest {
membershipManager.onHeartbeatSuccess(createShareGroupHeartbeatResponse(new ShareGroupHeartbeatResponseData()
.setErrorCode(Errors.NONE.code())
.setMemberId(MEMBER_ID)
.setMemberId(membershipManager.memberId())
.setMemberEpoch(epoch)));
verify(listener).onMemberEpochUpdated(Optional.of(epoch), Optional.of(MEMBER_ID));
verify(listener).onMemberEpochUpdated(Optional.of(epoch), membershipManager.memberId);
clearInvocations(listener);
membershipManager.onHeartbeatSuccess(createShareGroupHeartbeatResponse(new ShareGroupHeartbeatResponseData()
.setErrorCode(Errors.NONE.code())
.setMemberId(MEMBER_ID)
.setMemberId(membershipManager.memberId())
.setMemberEpoch(epoch)));
verify(listener, never()).onMemberEpochUpdated(any(), any());
}
private void mockStableMember(ShareMembershipManager membershipManager) {
ShareGroupHeartbeatResponse heartbeatResponse = createShareGroupHeartbeatResponse(new Assignment());
ShareGroupHeartbeatResponse heartbeatResponse = createShareGroupHeartbeatResponse(new Assignment(),
membershipManager.memberId());
membershipManager.onHeartbeatSuccess(heartbeatResponse);
membershipManager.poll(time.milliseconds());
membershipManager.onHeartbeatRequestGenerated();
assertEquals(MemberState.STABLE, membershipManager.state());
assertEquals(MEMBER_ID, membershipManager.memberId());
assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch());
}
@ -315,7 +312,7 @@ public class ShareMembershipManagerTest {
assertFalse(sendLeave.isDone(), "Send leave operation should not complete until a response is received");
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
membershipManager.onHeartbeatSuccess(createShareGroupHeartbeatResponse(new ShareGroupHeartbeatResponseData.Assignment()));
membershipManager.onHeartbeatSuccess(createShareGroupHeartbeatResponse(new ShareGroupHeartbeatResponseData.Assignment(), membershipManager.memberId()));
assertSendLeaveCompleted(membershipManager, sendLeave);
}
@ -350,7 +347,8 @@ public class ShareMembershipManagerTest {
assertEquals(toTopicIdPartitionMap(assignment1), membershipManager.currentAssignment().partitions);
// Receive assignment, wait on commit
membershipManager.onHeartbeatSuccess(createShareGroupHeartbeatResponse(assignment2));
String memberId = membershipManager.memberId();
membershipManager.onHeartbeatSuccess(createShareGroupHeartbeatResponse(assignment2, memberId));
assertEquals(MemberState.RECONCILING, membershipManager.state());
CompletableFuture<Void> commitResult = new CompletableFuture<>();
membershipManager.poll(time.milliseconds());
@ -369,7 +367,7 @@ public class ShareMembershipManagerTest {
assertTrue(subscriptionState.assignedPartitions().isEmpty());
// We have to reconcile & ack the assignment again
membershipManager.onHeartbeatSuccess(createShareGroupHeartbeatResponse(assignment1));
membershipManager.onHeartbeatSuccess(createShareGroupHeartbeatResponse(assignment1, memberId));
assertEquals(MemberState.RECONCILING, membershipManager.state());
membershipManager.poll(time.milliseconds());
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
@ -405,7 +403,8 @@ public class ShareMembershipManagerTest {
// Receive assignment - full reconciliation triggered
// stay in RECONCILING state, since an unresolved topic is assigned
membershipManager.onHeartbeatSuccess(createShareGroupHeartbeatResponse(assignment1));
String memberId = membershipManager.memberId();
membershipManager.onHeartbeatSuccess(createShareGroupHeartbeatResponse(assignment1, memberId));
assertEquals(MemberState.RECONCILING, membershipManager.state());
membershipManager.poll(time.milliseconds());
verifyReconciliationTriggeredAndCompleted(membershipManager,
@ -416,12 +415,12 @@ public class ShareMembershipManagerTest {
clearInvocations(membershipManager);
// Receive extended assignment - assignment received but no reconciliation triggered
membershipManager.onHeartbeatSuccess(createShareGroupHeartbeatResponse(assignment2));
membershipManager.onHeartbeatSuccess(createShareGroupHeartbeatResponse(assignment2, memberId));
assertEquals(MemberState.RECONCILING, membershipManager.state());
verifyReconciliationNotTriggered(membershipManager);
// Receive original assignment again - full reconciliation not triggered but assignment is acked again
membershipManager.onHeartbeatSuccess(createShareGroupHeartbeatResponse(assignment1));
membershipManager.onHeartbeatSuccess(createShareGroupHeartbeatResponse(assignment1, memberId));
assertEquals(MemberState.RECONCILING, membershipManager.state());
membershipManager.poll(time.milliseconds());
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
@ -520,7 +519,7 @@ public class ShareMembershipManagerTest {
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
assertFalse(leaveResult.isDone());
membershipManager.onHeartbeatSuccess(createShareGroupHeartbeatResponse(createAssignment(true)));
membershipManager.onHeartbeatSuccess(createShareGroupHeartbeatResponse(createAssignment(true), membershipManager.memberId()));
assertSendLeaveCompleted(membershipManager, leaveResult);
}
@ -547,7 +546,6 @@ public class ShareMembershipManagerTest {
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state(), "Member should " +
"remain UNSUBSCRIBED after receiving the response to the HB to leave");
assertEquals(-1, membershipManager.memberEpoch());
assertEquals(MEMBER_ID, membershipManager.memberId());
assertTrue(membershipManager.currentAssignment().isNone());
assertTrue(sendLeave.isDone(), "Leave group result should complete when the response to" +
" the heartbeat request to leave is received.");
@ -584,11 +582,10 @@ public class ShareMembershipManagerTest {
CompletableFuture<Void> leaveResult = membershipManager.leaveGroup();
membershipManager.onHeartbeatSuccess(createShareGroupHeartbeatResponse(createAssignment(true)));
membershipManager.onHeartbeatSuccess(createShareGroupHeartbeatResponse(createAssignment(true), membershipManager.memberId()));
assertEquals(MemberState.LEAVING, membershipManager.state());
assertEquals(-1, membershipManager.memberEpoch());
assertEquals(MEMBER_ID, membershipManager.memberId());
assertTrue(membershipManager.currentAssignment().partitions.isEmpty());
assertFalse(leaveResult.isDone(), "Leave group result should not complete until the " +
"heartbeat request to leave is sent out.");
@ -751,7 +748,7 @@ public class ShareMembershipManagerTest {
ShareMembershipManager membershipManager = createMembershipManagerJoiningGroup();
// Updating state with a heartbeat response containing errors cannot be performed and
// should fail.
ShareGroupHeartbeatResponse unknownMemberResponse = createShareGroupHeartbeatResponseWithError();
ShareGroupHeartbeatResponse unknownMemberResponse = createShareGroupHeartbeatResponseWithError(membershipManager.memberId());
assertThrows(IllegalArgumentException.class,
() -> membershipManager.onHeartbeatSuccess(unknownMemberResponse));
}
@ -924,7 +921,7 @@ public class ShareMembershipManagerTest {
// Target assignment received again with the same unresolved topic. Client should keep it
// as unresolved.
clearInvocations(subscriptionState);
membershipManager.onHeartbeatSuccess(createShareGroupHeartbeatResponse(assignment));
membershipManager.onHeartbeatSuccess(createShareGroupHeartbeatResponse(assignment, membershipManager.memberId()));
assertEquals(MemberState.RECONCILING, membershipManager.state());
assertEquals(Collections.singleton(topic2), membershipManager.topicsAwaitingReconciliation());
verify(subscriptionState, never()).assignFromSubscribed(anyCollection());
@ -1310,7 +1307,8 @@ public class ShareMembershipManagerTest {
@Test
public void testRebalanceMetricsOnSuccessfulRebalance() {
ShareMembershipManager membershipManager = createMembershipManagerJoiningGroup();
ShareGroupHeartbeatResponse heartbeatResponse = createShareGroupHeartbeatResponse(new ShareGroupHeartbeatResponseData.Assignment());
ShareGroupHeartbeatResponse heartbeatResponse = createShareGroupHeartbeatResponse(
new ShareGroupHeartbeatResponseData.Assignment(), membershipManager.memberId());
membershipManager.onHeartbeatSuccess(heartbeatResponse);
mockOwnedPartition(membershipManager, Uuid.randomUuid(), "topic1");
@ -1331,7 +1329,8 @@ public class ShareMembershipManagerTest {
@Test
public void testRebalanceMetricsOnFailedRebalance() {
ShareMembershipManager membershipManager = createMembershipManagerJoiningGroup();
ShareGroupHeartbeatResponse heartbeatResponse = createShareGroupHeartbeatResponse(new ShareGroupHeartbeatResponseData.Assignment());
ShareGroupHeartbeatResponse heartbeatResponse = createShareGroupHeartbeatResponse(
new ShareGroupHeartbeatResponseData.Assignment(), membershipManager.memberId());
membershipManager.onHeartbeatSuccess(heartbeatResponse);
Uuid topicId = Uuid.randomUuid();
@ -1473,7 +1472,7 @@ public class ShareMembershipManagerTest {
private ShareMembershipManager mockJoinAndReceiveAssignment(boolean triggerReconciliation,
ShareGroupHeartbeatResponseData.Assignment assignment) {
ShareMembershipManager membershipManager = createMembershipManagerJoiningGroup();
ShareGroupHeartbeatResponse heartbeatResponse = createShareGroupHeartbeatResponse(assignment);
ShareGroupHeartbeatResponse heartbeatResponse = createShareGroupHeartbeatResponse(assignment, membershipManager.memberId());
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
membershipManager.onHeartbeatSuccess(heartbeatResponse);
@ -1491,7 +1490,7 @@ public class ShareMembershipManagerTest {
private ShareMembershipManager createMemberInStableState() {
ShareMembershipManager membershipManager = createMembershipManagerJoiningGroup();
ShareGroupHeartbeatResponse heartbeatResponse = createShareGroupHeartbeatResponse(new Assignment());
ShareGroupHeartbeatResponse heartbeatResponse = createShareGroupHeartbeatResponse(new Assignment(), membershipManager.memberId());
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
when(subscriptionState.rebalanceListener()).thenReturn(Optional.empty());
membershipManager.onHeartbeatSuccess(heartbeatResponse);
@ -1511,7 +1510,7 @@ public class ShareMembershipManagerTest {
new ShareGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(tp.getKey())
.setPartitions(new ArrayList<>(tp.getValue()))).collect(Collectors.toList()));
ShareGroupHeartbeatResponse heartbeatResponse = createShareGroupHeartbeatResponse(targetAssignment);
ShareGroupHeartbeatResponse heartbeatResponse = createShareGroupHeartbeatResponse(targetAssignment, membershipManager.memberId());
membershipManager.onHeartbeatSuccess(heartbeatResponse);
}
@ -1521,7 +1520,7 @@ public class ShareMembershipManagerTest {
new ShareGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(topicId)
.setPartitions(partitions)));
ShareGroupHeartbeatResponse heartbeatResponse = createShareGroupHeartbeatResponse(targetAssignment);
ShareGroupHeartbeatResponse heartbeatResponse = createShareGroupHeartbeatResponse(targetAssignment, membershipManager.memberId());
membershipManager.onHeartbeatSuccess(heartbeatResponse);
}
@ -1529,7 +1528,7 @@ public class ShareMembershipManagerTest {
// New empty assignment received, revoking owned partition.
ShareGroupHeartbeatResponseData.Assignment targetAssignment = new ShareGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(Collections.emptyList());
ShareGroupHeartbeatResponse heartbeatResponse = createShareGroupHeartbeatResponse(targetAssignment);
ShareGroupHeartbeatResponse heartbeatResponse = createShareGroupHeartbeatResponse(targetAssignment, membershipManager.memberId());
membershipManager.onHeartbeatSuccess(heartbeatResponse);
}
@ -1542,7 +1541,6 @@ public class ShareMembershipManagerTest {
membershipManager.transitionToFenced();
assertEquals(MEMBER_ID, membershipManager.memberId());
assertEquals(0, membershipManager.memberEpoch());
assertEquals(MemberState.JOINING, membershipManager.state());
}
@ -1564,7 +1562,6 @@ public class ShareMembershipManagerTest {
"heartbeat request to leave is sent out.");
assertTransitionToUnsubscribeOnHBSentAndWaitForResponseToCompleteLeave(membershipManager, leaveResult);
assertEquals(MEMBER_ID, membershipManager.memberId());
assertEquals(-1, membershipManager.memberEpoch());
assertTrue(membershipManager.currentAssignment().isNone());
verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
@ -1603,21 +1600,26 @@ public class ShareMembershipManagerTest {
}
private ShareGroupHeartbeatResponse createShareGroupHeartbeatResponse(
ShareGroupHeartbeatResponseData.Assignment assignment) {
ShareGroupHeartbeatResponseData.Assignment assignment, String memberId) {
return new ShareGroupHeartbeatResponse(new ShareGroupHeartbeatResponseData()
.setErrorCode(Errors.NONE.code())
.setMemberId(MEMBER_ID)
.setMemberId(memberId)
.setMemberEpoch(MEMBER_EPOCH)
.setAssignment(assignment));
}
private ShareGroupHeartbeatResponse createShareGroupHeartbeatResponseWithError() {
private ShareGroupHeartbeatResponse createShareGroupHeartbeatResponseWithError(String memberId) {
return new ShareGroupHeartbeatResponse(new ShareGroupHeartbeatResponseData()
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
.setMemberId(MEMBER_ID)
.setMemberId(memberId)
.setMemberEpoch(5));
}
private void assertMemberIdIsGenerated(String originalMemberId) {
assertNotNull(originalMemberId, "Member Id should be generated at startup");
assertFalse(originalMemberId.isEmpty(), "Member Id should be generated at startup");
}
private ShareGroupHeartbeatResponseData.Assignment createAssignment(boolean mockMetadata) {
Uuid topic1 = Uuid.randomUuid();
Uuid topic2 = Uuid.randomUuid();

View File

@ -20,7 +20,7 @@ import org.apache.kafka.common.test.api.ClusterInstance
import org.apache.kafka.common.test.api._
import org.apache.kafka.common.test.api.ClusterTestExtensions
import kafka.utils.TestUtils
import org.apache.kafka.common.ConsumerGroupState
import org.apache.kafka.common.{ConsumerGroupState, Uuid}
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.{Assignment, DescribedGroup, TopicPartitions}
import org.apache.kafka.common.message.{ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatResponseData}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@ -102,6 +102,7 @@ class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCo
TestUtils.waitUntilTrue(() => {
grp1Member1Response = consumerGroupHeartbeat(
groupId = "grp-1",
memberId = Uuid.randomUuid().toString,
rebalanceTimeoutMs = timeoutMs,
subscribedTopicNames = List("bar"),
topicPartitions = List.empty

View File

@ -16,39 +16,43 @@
*/
package kafka.server
import org.apache.kafka.common.test.api.ClusterInstance
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterFeature, ClusterTest, Type}
import org.apache.kafka.common.test.api.ClusterTestExtensions
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterFeature, ClusterInstance, ClusterTest, ClusterTestDefaults, ClusterTestExtensions, Type}
import org.apache.kafka.common.test.api.RaftClusterInvocationContext.RaftClusterInstance
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse}
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
import org.apache.kafka.server.common.Features
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertNotNull}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertNotNull}
import org.junit.jupiter.api.extension.ExtendWith
import scala.collection.Map
import scala.jdk.CollectionConverters._
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic")
)
)
def testConsumerGroupHeartbeatIsInaccessibleWhenDisabledByStaticConfig(): Unit = {
val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
new ConsumerGroupHeartbeatRequestData(),
true
).build()
val consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
@ -57,18 +61,14 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
}
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
),
features = Array(
new ClusterFeature(feature = Features.GROUP_VERSION, version = 0)
)
)
def testConsumerGroupHeartbeatIsInaccessibleWhenFeatureFlagNotEnabled(): Unit = {
val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
new ConsumerGroupHeartbeatRequestData(),
true
).build()
val consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
@ -76,13 +76,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data)
}
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
@ClusterTest
def testConsumerGroupHeartbeatIsAccessibleWhenNewGroupCoordinatorIsEnabled(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
val admin = cluster.createAdminClient()
@ -100,10 +94,12 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(Uuid.randomUuid.toString)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5 * 60 * 1000)
.setSubscribedTopicNames(List("foo").asJava)
.setTopicPartitions(List.empty.asJava)
.setTopicPartitions(List.empty.asJava),
true
).build()
// Send the request until receiving a successful response. There is a delay
@ -131,7 +127,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(consumerGroupHeartbeatResponse.data.memberId)
.setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
.setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch),
true
).build()
// This is the expected assignment.
@ -157,7 +154,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(consumerGroupHeartbeatResponse.data.memberId)
.setMemberEpoch(-1)
.setMemberEpoch(-1),
true
).build()
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
@ -166,13 +164,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
assertEquals(-1, consumerGroupHeartbeatResponse.data.memberEpoch)
}
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
@ClusterTest
def testRejoiningStaticMemberGetsAssignmentsBackWhenNewGroupCoordinatorIsEnabled(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
val admin = cluster.createAdminClient()
@ -190,11 +182,13 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(Uuid.randomUuid.toString)
.setInstanceId(instanceId)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5 * 60 * 1000)
.setSubscribedTopicNames(List("foo").asJava)
.setTopicPartitions(List.empty.asJava)
.setTopicPartitions(List.empty.asJava),
true
).build()
// Send the request until receiving a successful response. There is a delay
@ -223,7 +217,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
.setGroupId("grp")
.setInstanceId(instanceId)
.setMemberId(consumerGroupHeartbeatResponse.data.memberId)
.setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
.setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch),
true
).build()
// This is the expected assignment.
@ -253,7 +248,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
.setGroupId("grp")
.setInstanceId(instanceId)
.setMemberId(consumerGroupHeartbeatResponse.data.memberId)
.setMemberEpoch(-2)
.setMemberEpoch(-2),
true
).build()
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
@ -265,11 +261,13 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(Uuid.randomUuid.toString)
.setInstanceId(instanceId)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5 * 60 * 1000)
.setSubscribedTopicNames(List("foo").asJava)
.setTopicPartitions(List.empty.asJava)
.setTopicPartitions(List.empty.asJava),
true
).build()
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
@ -283,10 +281,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
}
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, value = "5001"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, value = "5001")
)
@ -309,11 +304,13 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(Uuid.randomUuid.toString)
.setInstanceId(instanceId)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5 * 60 * 1000)
.setSubscribedTopicNames(List("foo").asJava)
.setTopicPartitions(List.empty.asJava)
.setTopicPartitions(List.empty.asJava),
true
).build()
// Send the request until receiving a successful response. There is a delay
@ -342,7 +339,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
.setGroupId("grp")
.setInstanceId(instanceId)
.setMemberId(consumerGroupHeartbeatResponse.data.memberId)
.setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
.setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch),
true
).build()
// This is the expected assignment.
@ -367,11 +365,13 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(Uuid.randomUuid.toString)
.setInstanceId(instanceId)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5 * 60 * 1000)
.setSubscribedTopicNames(List("foo").asJava)
.setTopicPartitions(List.empty.asJava)
.setTopicPartitions(List.empty.asJava),
true
).build()
// Validating that trying to join with an in-use instanceId would throw an UnreleasedInstanceIdException.
@ -393,10 +393,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
}
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, value = "5000")
)
)
@ -420,11 +417,13 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(consumerGroupId)
.setMemberId(Uuid.randomUuid.toString)
.setInstanceId(instanceId)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5 * 60 * 1000)
.setSubscribedTopicNames(List("foo").asJava)
.setTopicPartitions(List.empty.asJava)
.setTopicPartitions(List.empty.asJava),
true
).build()
// Send the request until receiving a successful response. There is a delay
@ -454,7 +453,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
.setGroupId(consumerGroupId)
.setInstanceId(instanceId)
.setMemberId(consumerGroupHeartbeatResponse.data.memberId)
.setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
.setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch),
true
).build()
// Verify the response. The heartbeat interval was updated.
@ -465,6 +465,70 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
}, msg = s"Dynamic update consumer group config failed. Last response $consumerGroupHeartbeatResponse.")
}
@ClusterTest
def testConsumerGroupHeartbeatFailureIfMemberIdMissingForVersionsAbove0(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
val admin = cluster.createAdminClient()
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = raftCluster.brokers.values().asScala.toSeq,
controllers = raftCluster.controllers().values().asScala.toSeq
)
val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5 * 60 * 1000)
.setSubscribedTopicNames(List("foo").asJava)
.setTopicPartitions(List.empty.asJava),
true
).build()
var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.INVALID_REQUEST.code
}, msg = "Should fail due to invalid member id.")
}
@ClusterTest
def testMemberIdGeneratedOnServerWhenApiVersionIs0(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
val admin = cluster.createAdminClient()
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = raftCluster.brokers.values().asScala.toSeq,
controllers = raftCluster.controllers().values().asScala.toSeq
)
val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5 * 60 * 1000)
.setSubscribedTopicNames(List("foo").asJava)
.setTopicPartitions(List.empty.asJava),
true
).build(0)
var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
}, msg = s"Could not join the group successfully. Last response $consumerGroupHeartbeatResponse.")
val memberId = consumerGroupHeartbeatResponse.data().memberId()
assertNotNull(memberId)
assertFalse(memberId.isEmpty)
}
private def connectAndReceive(request: ConsumerGroupHeartbeatRequest): ConsumerGroupHeartbeatResponse = {
IntegrationTestUtils.connectAndReceive[ConsumerGroupHeartbeatResponse](
request,

View File

@ -18,7 +18,7 @@ package kafka.server
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.message.{JoinGroupResponseData, ListGroupsResponseData, OffsetFetchResponseData, SyncGroupResponseData}
import org.apache.kafka.common.test.api.ClusterInstance
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
@ -245,6 +245,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
// The joining request with a consumer group member 2 is accepted.
val memberId2 = consumerGroupHeartbeat(
groupId = groupId,
memberId = Uuid.randomUuid.toString,
rebalanceTimeoutMs = 5 * 60 * 1000,
subscribedTopicNames = List("foo"),
topicPartitions = List.empty,
@ -309,7 +310,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
val groupId = "grp"
// Consumer member 1 joins the group.
val (memberId1, _) = joinConsumerGroupWithNewProtocol(groupId)
val (memberId1, _) = joinConsumerGroupWithNewProtocol(groupId, Uuid.randomUuid.toString)
// Classic member 2 joins the group.
val joinGroupResponseData = sendJoinRequest(
@ -384,6 +385,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
// The consumerGroupHeartbeat request is rejected.
consumerGroupHeartbeat(
groupId = groupId,
memberId = memberId1,
rebalanceTimeoutMs = 5 * 60 * 1000,
subscribedTopicNames = List("foo"),
topicPartitions = List.empty,
@ -421,6 +423,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
// The consumerGroupHeartbeat request is rejected.
consumerGroupHeartbeat(
groupId = groupId,
memberId = Uuid.randomUuid.toString,
rebalanceTimeoutMs = 5 * 60 * 1000,
subscribedTopicNames = List("foo"),
topicPartitions = List.empty,
@ -449,7 +452,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
val groupId = "grp"
// Consumer member 1 joins the group.
val (memberId1, _) = joinConsumerGroupWithNewProtocol(groupId)
val (memberId1, _) = joinConsumerGroupWithNewProtocol(groupId, Uuid.randomUuid.toString)
// Classic member 2 joins the group.
val joinGroupResponseData = sendJoinRequest(
@ -560,7 +563,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
// Create a consumer group by joining a member.
val groupId = "grp"
val (memberId, _) = joinConsumerGroupWithNewProtocol(groupId)
val (memberId, _) = joinConsumerGroupWithNewProtocol(groupId, Uuid.randomUuid.toString)
// The member leaves the group.
leaveGroup(
@ -646,6 +649,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
// The joining request with a consumer group member is accepted.
consumerGroupHeartbeat(
groupId = groupId,
memberId = Uuid.randomUuid.toString,
rebalanceTimeoutMs = 5 * 60 * 1000,
subscribedTopicNames = List(topicName),
topicPartitions = List.empty,
@ -714,6 +718,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
// The joining request with a consumer group member 2 is accepted.
val memberId2 = consumerGroupHeartbeat(
groupId = groupId,
memberId = Uuid.randomUuid.toString,
instanceId = if (useStaticMembers) instanceId2 else null,
rebalanceTimeoutMs = 5 * 60 * 1000,
subscribedTopicNames = List("foo"),
@ -1044,6 +1049,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
// The joining request with a consumer group member 2 is accepted.
val memberId2 = consumerGroupHeartbeat(
groupId = groupId,
memberId = Uuid.randomUuid.toString,
instanceId = if (useStaticMembers) instanceId2 else null,
rebalanceTimeoutMs = 5 * 60 * 1000,
subscribedTopicNames = List("foo"),

View File

@ -463,9 +463,10 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
(joinGroupResponseData.memberId, joinGroupResponseData.generationId)
}
protected def joinConsumerGroupWithNewProtocol(groupId: String): (String, Int) = {
protected def joinConsumerGroupWithNewProtocol(groupId: String, memberId: String = ""): (String, Int) = {
val consumerGroupHeartbeatResponseData = consumerGroupHeartbeat(
groupId = groupId,
memberId = memberId,
rebalanceTimeoutMs = 5 * 60 * 1000,
subscribedTopicNames = List("foo"),
topicPartitions = List.empty
@ -477,7 +478,7 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
if (useNewProtocol) {
// Note that we heartbeat only once to join the group and assume
// that the test will complete within the session timeout.
joinConsumerGroupWithNewProtocol(groupId)
joinConsumerGroupWithNewProtocol(groupId, Uuid.randomUuid().toString)
} else {
// Note that we don't heartbeat and assume that the test will
// complete within the session timeout.
@ -577,7 +578,8 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
serverAssignor: String = null,
subscribedTopicNames: List[String] = null,
topicPartitions: List[ConsumerGroupHeartbeatRequestData.TopicPartitions] = null,
expectedError: Errors = Errors.NONE
expectedError: Errors = Errors.NONE,
version: Short = ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion(isUnstableApiEnabled)
): ConsumerGroupHeartbeatResponseData = {
val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
@ -591,7 +593,7 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
.setServerAssignor(serverAssignor)
.setTopicPartitions(topicPartitions.asJava),
true
).build()
).build(version)
// Send the request until receiving a successful response. There is a delay
// here because the group coordinator is loaded in the background.
@ -606,7 +608,7 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
protected def shareGroupHeartbeat(
groupId: String,
memberId: String = "",
memberId: String = Uuid.randomUuid.toString,
memberEpoch: Int = 0,
rackId: String = null,
subscribedTopicNames: List[String] = null,

View File

@ -21,7 +21,7 @@ import org.apache.kafka.common.test.api.RaftClusterInvocationContext.RaftCluster
import kafka.utils.TestUtils
import kafka.utils.TestUtils.waitForAllPartitionsMetadata
import org.apache.kafka.clients.admin.{Admin, NewPartitions}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.message.{ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ShareGroupHeartbeatRequest, ShareGroupHeartbeatResponse}
@ -77,6 +77,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
var shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
new ShareGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(Uuid.randomUuid.toString)
.setMemberEpoch(0)
.setSubscribedTopicNames(List("foo").asJava),
true
@ -169,6 +170,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
var shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
new ShareGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(Uuid.randomUuid.toString)
.setMemberEpoch(0)
.setSubscribedTopicNames(List("foo").asJava),
true
@ -188,6 +190,16 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
assertEquals(1, shareGroupHeartbeatResponse.data.memberEpoch)
assertEquals(new ShareGroupHeartbeatResponseData.Assignment(), shareGroupHeartbeatResponse.data.assignment)
// The second member request to join the group.
shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
new ShareGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(Uuid.randomUuid.toString)
.setMemberEpoch(0)
.setSubscribedTopicNames(List("foo").asJava),
true
).build()
// Send the second member request until receiving a successful response.
TestUtils.waitUntilTrue(() => {
shareGroupHeartbeatResponse = connectAndReceive(shareGroupHeartbeatRequest)
@ -302,6 +314,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
var shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
new ShareGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(Uuid.randomUuid.toString)
.setMemberEpoch(0)
.setSubscribedTopicNames(List("foo").asJava),
true
@ -412,6 +425,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
var shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
new ShareGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(Uuid.randomUuid.toString)
.setMemberEpoch(0)
.setSubscribedTopicNames(List("foo", "bar", "baz").asJava),
true
@ -603,6 +617,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
var shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
new ShareGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(Uuid.randomUuid.toString)
.setMemberEpoch(0)
.setSubscribedTopicNames(List("foo").asJava),
true
@ -770,6 +785,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
var shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
new ShareGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(Uuid.randomUuid.toString)
.setMemberEpoch(0)
.setSubscribedTopicNames(List("foo").asJava),
true

View File

@ -141,6 +141,7 @@ import static org.apache.kafka.common.protocol.Errors.COORDINATOR_NOT_AVAILABLE;
import static org.apache.kafka.common.protocol.Errors.ILLEGAL_GENERATION;
import static org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_SERVER_ERROR;
import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.CONSUMER_GENERATED_MEMBER_ID_REQUIRED_VERSION;
import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH;
import static org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_ID;
@ -1293,21 +1294,27 @@ public class GroupMetadataManager {
* Validates the request.
*
* @param request The request to validate.
*
* @param apiVersion The version of ConsumerGroupHeartbeat RPC
* @throws InvalidRequestException if the request is not valid.
* @throws UnsupportedAssignorException if the assignor is not supported.
*/
private void throwIfConsumerGroupHeartbeatRequestIsInvalid(
ConsumerGroupHeartbeatRequestData request
ConsumerGroupHeartbeatRequestData request,
short apiVersion
) throws InvalidRequestException, UnsupportedAssignorException {
if (apiVersion >= CONSUMER_GENERATED_MEMBER_ID_REQUIRED_VERSION ||
request.memberEpoch() > 0 ||
request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH
) {
throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
}
throwIfEmptyString(request.groupId(), "GroupId can't be empty.");
throwIfEmptyString(request.instanceId(), "InstanceId can't be empty.");
throwIfEmptyString(request.rackId(), "RackId can't be empty.");
throwIfNotNull(request.subscribedTopicRegex(), "SubscribedTopicRegex is not supported yet.");
if (request.memberEpoch() > 0 || request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH) {
throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
} else if (request.memberEpoch() == 0) {
if (request.memberEpoch() == 0) {
if (request.rebalanceTimeoutMs() == -1) {
throw new InvalidRequestException("RebalanceTimeoutMs must be provided in first request.");
}
@ -1318,9 +1325,8 @@ public class GroupMetadataManager {
throw new InvalidRequestException("SubscribedTopicNames must be set in first request.");
}
} else if (request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
throwIfNull(request.instanceId(), "InstanceId can't be null.");
} else {
} else if (request.memberEpoch() < LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
throw new InvalidRequestException("MemberEpoch is invalid.");
}
@ -1335,23 +1341,21 @@ public class GroupMetadataManager {
* Validates the ShareGroupHeartbeat request.
*
* @param request The request to validate.
*
* @throws InvalidRequestException if the request is not valid.
* @throws UnsupportedAssignorException if the assignor is not supported.
*/
private void throwIfShareGroupHeartbeatRequestIsInvalid(
ShareGroupHeartbeatRequestData request
) throws InvalidRequestException, UnsupportedAssignorException {
throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
throwIfEmptyString(request.groupId(), "GroupId can't be empty.");
throwIfEmptyString(request.rackId(), "RackId can't be empty.");
if (request.memberEpoch() > 0 || request.memberEpoch() == ShareGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH) {
throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
} else if (request.memberEpoch() == 0) {
if (request.memberEpoch() == 0) {
if (request.subscribedTopicNames() == null || request.subscribedTopicNames().isEmpty()) {
throw new InvalidRequestException("SubscribedTopicNames must be set in first request.");
}
} else {
} else if (request.memberEpoch() < ShareGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH) {
throw new InvalidRequestException("MemberEpoch is invalid.");
}
}
@ -3199,7 +3203,7 @@ public class GroupMetadataManager {
RequestContext context,
ConsumerGroupHeartbeatRequestData request
) throws ApiException {
throwIfConsumerGroupHeartbeatRequestIsInvalid(request);
throwIfConsumerGroupHeartbeatRequestIsInvalid(request, context.apiVersion());
if (request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH || request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
// -1 means that the member wants to leave the group.

View File

@ -59,6 +59,7 @@ import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
@ -164,25 +165,34 @@ public class GroupMetadataManagerTest {
@Test
public void testConsumerHeartbeatRequestValidation() {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
String memberId = Uuid.randomUuid().toString();
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withConsumerGroupAssignors(Collections.singletonList(assignor))
.build();
Exception ex;
// GroupId must be present in all requests.
// MemberId must be present in all requests.
ex = assertThrows(InvalidRequestException.class, () -> context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()));
assertEquals("MemberId can't be empty.", ex.getMessage());
// GroupId must be present in all requests.
ex = assertThrows(InvalidRequestException.class, () -> context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setMemberId(memberId)));
assertEquals("GroupId can't be empty.", ex.getMessage());
// GroupId can't be all whitespaces.
ex = assertThrows(InvalidRequestException.class, () -> context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setMemberId(memberId)
.setGroupId(" ")));
assertEquals("GroupId can't be empty.", ex.getMessage());
// RebalanceTimeoutMs must be present in the first request (epoch == 0).
ex = assertThrows(InvalidRequestException.class, () -> context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setMemberId(memberId)
.setGroupId("foo")
.setMemberEpoch(0)));
assertEquals("RebalanceTimeoutMs must be provided in first request.", ex.getMessage());
@ -190,6 +200,7 @@ public class GroupMetadataManagerTest {
// TopicPartitions must be present and empty in the first request (epoch == 0).
ex = assertThrows(InvalidRequestException.class, () -> context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setMemberId(memberId)
.setGroupId("foo")
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5000)));
@ -198,25 +209,18 @@ public class GroupMetadataManagerTest {
// SubscribedTopicNames must be present and empty in the first request (epoch == 0).
ex = assertThrows(InvalidRequestException.class, () -> context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setMemberId(memberId)
.setGroupId("foo")
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5000)
.setTopicPartitions(Collections.emptyList())));
assertEquals("SubscribedTopicNames must be set in first request.", ex.getMessage());
// MemberId must be non-empty in all requests except for the first one where it
// could be empty (epoch != 0).
ex = assertThrows(InvalidRequestException.class, () -> context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("foo")
.setMemberEpoch(1)));
assertEquals("MemberId can't be empty.", ex.getMessage());
// InstanceId must be non-empty if provided in all requests.
ex = assertThrows(InvalidRequestException.class, () -> context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("foo")
.setMemberId(Uuid.randomUuid().toString())
.setMemberId(memberId)
.setMemberEpoch(1)
.setInstanceId("")));
assertEquals("InstanceId can't be empty.", ex.getMessage());
@ -225,7 +229,7 @@ public class GroupMetadataManagerTest {
ex = assertThrows(InvalidRequestException.class, () -> context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("foo")
.setMemberId(Uuid.randomUuid().toString())
.setMemberId(memberId)
.setMemberEpoch(1)
.setRackId("")));
assertEquals("RackId can't be empty.", ex.getMessage());
@ -234,7 +238,7 @@ public class GroupMetadataManagerTest {
ex = assertThrows(UnsupportedAssignorException.class, () -> context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("foo")
.setMemberId(Uuid.randomUuid().toString())
.setMemberId(memberId)
.setMemberEpoch(1)
.setServerAssignor("bar")));
assertEquals("ServerAssignor bar is not supported. Supported assignors: range.", ex.getMessage());
@ -242,7 +246,7 @@ public class GroupMetadataManagerTest {
ex = assertThrows(InvalidRequestException.class, () -> context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("foo")
.setMemberId(Uuid.randomUuid().toString())
.setMemberId(memberId)
.setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
@ -261,6 +265,7 @@ public class GroupMetadataManagerTest {
// Regex not supported for now. This test will evolve to actually validate the regex when it's supported
ex = assertThrows(InvalidRequestException.class, () -> context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setMemberId(Uuid.randomUuid().toString())
.setGroupId("foo")
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5000)
@ -281,13 +286,17 @@ public class GroupMetadataManagerTest {
));
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat(
// The consumer generates its own Member ID starting from version 1 of the ConsumerGroupHeartbeat RPC.
// Therefore, this test case is specific to earlier versions of the RPC.
new ConsumerGroupHeartbeatRequestData()
.setGroupId("group-foo")
.setMemberEpoch(0)
.setServerAssignor("range")
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
.setTopicPartitions(Collections.emptyList()));
.setTopicPartitions(Collections.emptyList()),
(short) 0
);
// Verify that a member id was generated for the new member.
String memberId = result.response().memberId();
@ -10177,15 +10186,19 @@ public class GroupMetadataManagerTest {
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, assignments, metadataImage.features().metadataVersion()));
context.commit();
// The static member rejoins with new protocol, triggering the upgrade.
// The static member rejoins with new protocol after a restart, triggering the upgrade.
String newMemberId = Uuid.randomUuid().toString();
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(newMemberId)
.setInstanceId(instanceId)
.setRebalanceTimeoutMs(5000)
.setServerAssignor(NoOpPartitionAssignor.NAME)
.setSubscribedTopicNames(Collections.singletonList(fooTopicName))
.setTopicPartitions(Collections.emptyList()));
.setTopicPartitions(Collections.emptyList()),
ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion()
);
ConsumerGroupMember expectedClassicMember = new ConsumerGroupMember.Builder(memberId)
.setInstanceId(instanceId)
@ -10204,7 +10217,14 @@ public class GroupMetadataManagerTest {
mkTopicAssignment(fooTopicId, 0)))
.build();
String newMemberId = result.response().memberId();
// The memberId is generated by the consumer and should be retained
// for the entire lifetime of the process until termination.
assertEquals(
newMemberId,
result.response().memberId(),
"Server should not generate a new memberId since the consumer has already generated its own."
);
ConsumerGroupMember expectedReplacingConsumerMember = new ConsumerGroupMember.Builder(newMemberId)
.setInstanceId(instanceId)
.setMemberEpoch(0)
@ -10250,11 +10270,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedFinalConsumerMember),
// The subscription metadata hasn't been updated during the conversion, so a new one is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1));
}
}),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1))),
// Newly joining static member bumps the group epoch. A new target assignment is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1),
@ -10367,14 +10383,25 @@ public class GroupMetadataManagerTest {
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId1)
.setInstanceId(instanceId1)
.setRebalanceTimeoutMs(5000)
.setServerAssignor(NoOpPartitionAssignor.NAME)
.setSubscribedTopicNames(new ArrayList<>(member1.subscribedTopicNames()))
.setTopicPartitions(Collections.emptyList()));
.setTopicPartitions(Collections.emptyList()),
ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion()
);
String newMemberId1 = result.response().memberId();
ConsumerGroupMember expectedReplacingConsumerMember = new ConsumerGroupMember.Builder(newMemberId1)
// The memberId is generated by the consumer itself, the consumer should retain this memberId
// for its entire lifetime until the process terminates.
assertEquals(
memberId1,
result.response().memberId(),
"Server should not generate a new memberId since the consumer has already generated its own."
);
ConsumerGroupMember expectedReplacingConsumerMember = new ConsumerGroupMember.Builder(memberId1)
.setInstanceId(instanceId1)
.setMemberEpoch(0)
.setPreviousMemberEpoch(0)
@ -10402,7 +10429,7 @@ public class GroupMetadataManagerTest {
// Create the new static member 1.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedReplacingConsumerMember),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, newMemberId1, member1.assignedPartitions()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, member1.assignedPartitions()),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedReplacingConsumerMember),
// The static member rejoins the new consumer group.
@ -10414,7 +10441,7 @@ public class GroupMetadataManagerTest {
);
assertRecordsEquals(expectedRecords, result.records());
context.assertSessionTimeout(groupId, newMemberId1, 45000);
context.assertSessionTimeout(groupId, memberId1, 45000);
}
@Test
@ -13969,20 +13996,29 @@ public class GroupMetadataManagerTest {
.build();
Exception ex;
// GroupId must be present in all requests.
// MemberId must be present in all requests.
ex = assertThrows(InvalidRequestException.class, () -> context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()));
new ShareGroupHeartbeatRequestData()));
assertEquals("MemberId can't be empty.", ex.getMessage());
// GroupId must be present in all requests.
String memberId = Uuid.randomUuid().toString();
ex = assertThrows(InvalidRequestException.class, () -> context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setMemberId(memberId)));
assertEquals("GroupId can't be empty.", ex.getMessage());
// GroupId can't be all whitespaces.
ex = assertThrows(InvalidRequestException.class, () -> context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setMemberId(memberId)
.setGroupId(" ")));
assertEquals("GroupId can't be empty.", ex.getMessage());
// SubscribedTopicNames must be present and empty in the first request (epoch == 0).
ex = assertThrows(InvalidRequestException.class, () -> context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setMemberId(memberId)
.setGroupId("foo")
.setMemberEpoch(0)));
assertEquals("SubscribedTopicNames must be set in first request.", ex.getMessage());
@ -13998,8 +14034,8 @@ public class GroupMetadataManagerTest {
// RackId must be non-empty if provided in all requests.
ex = assertThrows(InvalidRequestException.class, () -> context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setMemberId(memberId)
.setGroupId("foo")
.setMemberId(Uuid.randomUuid().toString())
.setMemberEpoch(1)
.setRackId("")));
assertEquals("RackId can't be empty.", ex.getMessage());
@ -14037,6 +14073,7 @@ public class GroupMetadataManagerTest {
CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> result = context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupIds.get(1))
.setMemberId(Uuid.randomUuid().toString())
.setMemberEpoch(0)
.setSubscribedTopicNames(Collections.singletonList("foo")));
@ -14086,16 +14123,19 @@ public class GroupMetadataManagerTest {
Collections.emptyMap()
));
String memberId = Uuid.randomUuid().toString();
CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> result = context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId("group-foo")
.setMemberId(memberId)
.setMemberEpoch(0)
.setSubscribedTopicNames(Arrays.asList("foo", "bar")));
// Verify that a member id was generated for the new member.
String memberId = result.response().memberId();
assertNotNull(memberId);
assertNotEquals("", memberId);
assertEquals(
memberId,
result.response().memberId(),
"MemberId should remain unchanged, as the server does not generate a new one since the consumer generates its own."
);
// The response should get a bumped epoch and should not
// contain any assignment because we did not provide
@ -14796,6 +14836,7 @@ public class GroupMetadataManagerTest {
assertThrows(GroupIdNotFoundException.class, () -> context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setMemberId(memberId)
.setGroupId(groupId)
.setMemberEpoch(0)
.setServerAssignor("range")

View File

@ -601,11 +601,18 @@ public class GroupMetadataManagerTestContext {
public CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> consumerGroupHeartbeat(
ConsumerGroupHeartbeatRequestData request
) {
return this.consumerGroupHeartbeat(request, ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion());
}
public CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> consumerGroupHeartbeat(
ConsumerGroupHeartbeatRequestData request,
short apiVersion
) {
RequestContext context = new RequestContext(
new RequestHeader(
ApiKeys.CONSUMER_GROUP_HEARTBEAT,
ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion(),
apiVersion,
DEFAULT_CLIENT_ID,
0
),