mirror of https://github.com/apache/kafka.git
MINOR: Fixing logs and adding exception in response (#19859)
PR streamlines the logs when delete share group or offset is triggered. Also fixes the response when group is not found while deleting share group. Reviewers: Andrew Schofield <aschofield@confluent.io>, Sushant Mahajan <smahajan@confluent.io>
This commit is contained in:
parent
223684bad1
commit
a70a667e95
|
@ -1329,6 +1329,10 @@ public class GroupCoordinatorService implements GroupCoordinator {
|
|||
});
|
||||
|
||||
groupsByTopicPartition.forEach((topicPartition, groupList) -> {
|
||||
// Since the specific group types are not known, group deletion operations are chained.
|
||||
// The sequence of these deletions is important: the initial share group deletion should
|
||||
// not error if the group ID isn't found, whereas the subsequent consumer group deletion
|
||||
// (the final operation) must return an error if its corresponding group ID is not found.
|
||||
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future = deleteShareGroups(topicPartition, groupList).thenCompose(groupErrMap -> {
|
||||
DeleteGroupsResponseData.DeletableGroupResultCollection deletableGroupResults = new DeleteGroupsResponseData.DeletableGroupResultCollection();
|
||||
List<String> retainedGroupIds = updateResponseAndGetNonErrorGroupList(groupErrMap, groupList, deletableGroupResults);
|
||||
|
@ -1390,7 +1394,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
|
|||
Set<String> groupSet = new HashSet<>(groupList);
|
||||
// Remove all share group ids which have errored out
|
||||
// when deleting with persister.
|
||||
groupSet.removeAll(errGroupIds);
|
||||
errGroupIds.forEach(groupSet::remove);
|
||||
|
||||
// Let us invoke the standard procedure of any non-share
|
||||
// groups or successfully deleted share groups remaining.
|
||||
|
|
|
@ -648,10 +648,11 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
|
|||
|
||||
/**
|
||||
* Method returns a Map keyed on groupId and value as pair of {@link DeleteShareGroupStateParameters}
|
||||
* and any ERRORS while building the request corresponding
|
||||
* to the valid share groups passed as the input.
|
||||
* and any ERRORS while building the request corresponding to the valid share groups passed as the input.
|
||||
* <p>
|
||||
* The groupIds are first filtered by type to restrict the list to share groups.
|
||||
* The groupIds are first filtered by type to restrict the list to share groups. If a group isn't
|
||||
* found or isn't a share group, it won't trigger an error in the response since group deletions
|
||||
* are chained. Instead, that group should be retried against other group types.
|
||||
* @param groupIds - A list of groupIds as string
|
||||
* @return A result object containing a map keyed on groupId and value pair (req, error) and related coordinator records.
|
||||
*/
|
||||
|
@ -667,9 +668,11 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
|
|||
groupMetadataManager.shareGroupBuildPartitionDeleteRequest(groupId, records)
|
||||
.ifPresent(req -> responseMap.put(groupId, Map.entry(req, Errors.NONE)));
|
||||
} catch (GroupIdNotFoundException exception) {
|
||||
log.debug("GroupId {} not found as a share group.", groupId);
|
||||
log.debug("Unable to delete share group. GroupId {} not found.", groupId);
|
||||
// Do not include the error in response map, as the deletion of groups is chained hence
|
||||
// the respective group should be re-tried for deletion against other group types.
|
||||
} catch (GroupNotEmptyException exception) {
|
||||
log.debug("Share group {} is not empty.", groupId);
|
||||
log.debug("Unable to delete share group. Provided group {} is not empty.", groupId);
|
||||
responseMap.put(groupId, Map.entry(DeleteShareGroupStateParameters.EMPTY_PARAMS, Errors.forException(exception)));
|
||||
}
|
||||
}
|
||||
|
@ -729,13 +732,13 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
|
|||
);
|
||||
|
||||
} catch (GroupIdNotFoundException exception) {
|
||||
log.error("groupId {} not found", groupId, exception);
|
||||
log.debug("Unable to delete share group offsets. GroupId {} not found.", groupId);
|
||||
return new CoordinatorResult<>(
|
||||
records,
|
||||
new DeleteShareGroupOffsetsResultHolder(Errors.GROUP_ID_NOT_FOUND.code(), exception.getMessage())
|
||||
);
|
||||
} catch (GroupNotEmptyException exception) {
|
||||
log.error("Provided group {} is not empty", groupId);
|
||||
log.debug("Unable to delete share group offsets. Provided group {} is not empty.", groupId);
|
||||
return new CoordinatorResult<>(
|
||||
records,
|
||||
new DeleteShareGroupOffsetsResultHolder(Errors.NON_EMPTY_GROUP.code(), exception.getMessage())
|
||||
|
|
|
@ -3770,8 +3770,13 @@ public class GroupMetadataManager {
|
|||
assignmentResultBuilder.build();
|
||||
long assignorTimeMs = time.milliseconds() - startTimeMs;
|
||||
|
||||
log.info("[GroupId {}] Computed a new target assignment for epoch {} with '{}' assignor in {}ms: {}.",
|
||||
group.groupId(), groupEpoch, shareGroupAssignor, assignorTimeMs, assignmentResult.targetAssignment());
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("[GroupId {}] Computed a new target assignment for epoch {} with '{}' assignor in {}ms: {}.",
|
||||
group.groupId(), groupEpoch, shareGroupAssignor, assignorTimeMs, assignmentResult.targetAssignment());
|
||||
} else {
|
||||
log.info("[GroupId {}] Computed a new target assignment for epoch {} with '{}' assignor in {}ms.",
|
||||
group.groupId(), groupEpoch, shareGroupAssignor, assignorTimeMs);
|
||||
}
|
||||
|
||||
records.addAll(assignmentResult.records());
|
||||
|
||||
|
|
Loading…
Reference in New Issue