KAFKA-13200: Fix MirrorMaker2 connector version (#11212)

Use the Kafka version instead of hardcoding it to 1.

Reviewers: Tom Bentley <tbentley@redhat.com>, Luke Chen <showuon@gmail.com>
This commit is contained in:
Mickael Maison 2021-11-29 16:37:51 +01:00 committed by GitHub
parent b6e7f6a4df
commit a989149731
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 9 additions and 6 deletions

View File

@ -19,6 +19,7 @@ package org.apache.kafka.connect.mirror;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
@ -118,7 +119,7 @@ public class MirrorCheckpointConnector extends SourceConnector {
@Override
public String version() {
return "1";
return AppInfoParser.getVersion();
}
private void refreshConsumerGroups()

View File

@ -124,7 +124,7 @@ public class MirrorCheckpointTask extends SourceTask {
@Override
public String version() {
return "1";
return new MirrorCheckpointConnector().version();
}
@Override

View File

@ -19,6 +19,7 @@ package org.apache.kafka.connect.mirror;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
import java.util.Map;
@ -75,7 +76,7 @@ public class MirrorHeartbeatConnector extends SourceConnector {
@Override
public String version() {
return "1";
return AppInfoParser.getVersion();
}
private void createInternalTopics() {

View File

@ -58,7 +58,7 @@ public class MirrorHeartbeatTask extends SourceTask {
@Override
public String version() {
return "1";
return new MirrorHeartbeatConnector().version();
}
@Override

View File

@ -33,6 +33,7 @@ import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidPartitionsException;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
@ -185,7 +186,7 @@ public class MirrorSourceConnector extends SourceConnector {
@Override
public String version() {
return "1";
return AppInfoParser.getVersion();
}
// visible for testing

View File

@ -126,7 +126,7 @@ public class MirrorSourceTask extends SourceTask {
@Override
public String version() {
return "1";
return new MirrorSourceConnector().version();
}
@Override