KAFKA-14095: Improve handling of sync offset failures in MirrorMaker (#12432)

We should not treat UNKNOWN_MEMBER_ID as an unexpected error in the Admin client. In MirrorMaker, check the result of committing offsets and log an useful error message in case that failed with UNKNOWN_MEMBER_ID.

Reviewers: Chris Egerton <fearthecellos@gmail.com>
This commit is contained in:
Mickael Maison 2022-08-01 12:59:41 +02:00 committed by GitHub
parent f7ac5d3d00
commit 1cc1e776f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 16 additions and 3 deletions

View File

@ -179,6 +179,8 @@ public class AlterConsumerGroupOffsetsHandler extends AdminApiHandler.Batched<Co
case INVALID_GROUP_ID:
case INVALID_COMMIT_OFFSET_SIZE:
case GROUP_AUTHORIZATION_FAILED:
// Member level errors.
case UNKNOWN_MEMBER_ID:
log.debug("OffsetCommit request for group id {} failed due to error {}.",
groupId.idValue, error);
partitionResults.put(topicPartition, error);

View File

@ -17,9 +17,11 @@
package org.apache.kafka.connect.mirror;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.data.Schema;
@ -306,9 +308,18 @@ public class MirrorCheckpointTask extends SourceTask {
void syncGroupOffset(String consumerGroupId, Map<TopicPartition, OffsetAndMetadata> offsetToSync) {
if (targetAdminClient != null) {
targetAdminClient.alterConsumerGroupOffsets(consumerGroupId, offsetToSync);
log.trace("sync-ed the offset for consumer group: {} with {} number of offset entries",
consumerGroupId, offsetToSync.size());
AlterConsumerGroupOffsetsResult result = targetAdminClient.alterConsumerGroupOffsets(consumerGroupId, offsetToSync);
result.all().whenComplete((v, throwable) -> {
if (throwable != null) {
if (throwable.getCause() instanceof UnknownMemberIdException) {
log.warn("Unable to sync offsets for consumer group {}. This is likely caused by consumers currently using this group in the target cluster.", consumerGroupId);
} else {
log.error("Unable to sync offsets for consumer group {}.", consumerGroupId, throwable);
}
} else {
log.trace("Sync-ed {} offsets for consumer group {}.", offsetToSync.size(), consumerGroupId);
}
});
}
}