From a9891497314339057bbcfbd43c7715db8d16500e Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Mon, 29 Nov 2021 16:37:51 +0100 Subject: [PATCH] KAFKA-13200: Fix MirrorMaker2 connector version (#11212) Use the Kafka version instead of hardcoding it to 1. Reviewers: Tom Bentley , Luke Chen --- .../apache/kafka/connect/mirror/MirrorCheckpointConnector.java | 3 ++- .../org/apache/kafka/connect/mirror/MirrorCheckpointTask.java | 2 +- .../apache/kafka/connect/mirror/MirrorHeartbeatConnector.java | 3 ++- .../org/apache/kafka/connect/mirror/MirrorHeartbeatTask.java | 2 +- .../org/apache/kafka/connect/mirror/MirrorSourceConnector.java | 3 ++- .../java/org/apache/kafka/connect/mirror/MirrorSourceTask.java | 2 +- 6 files changed, 9 insertions(+), 6 deletions(-) diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java index 5118ee18186..a6bfee0f19b 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java @@ -19,6 +19,7 @@ package org.apache.kafka.connect.mirror; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.ConsumerGroupListing; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.source.SourceConnector; @@ -118,7 +119,7 @@ public class MirrorCheckpointConnector extends SourceConnector { @Override public String version() { - return "1"; + return AppInfoParser.getVersion(); } private void refreshConsumerGroups() diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java index 09eb0fd3408..47631998fbb 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java @@ -124,7 +124,7 @@ public class MirrorCheckpointTask extends SourceTask { @Override public String version() { - return "1"; + return new MirrorCheckpointConnector().version(); } @Override diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java index 8b2d064f5aa..a4323ba706b 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java @@ -19,6 +19,7 @@ package org.apache.kafka.connect.mirror; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.common.utils.Utils; import java.util.Map; @@ -75,7 +76,7 @@ public class MirrorHeartbeatConnector extends SourceConnector { @Override public String version() { - return "1"; + return AppInfoParser.getVersion(); } private void createInternalTopics() { diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTask.java index 9f38b5999ea..754dee97235 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTask.java @@ -58,7 +58,7 @@ public class MirrorHeartbeatTask extends SourceTask { @Override public String version() { - return "1"; + return new MirrorHeartbeatConnector().version(); } @Override diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java index ee6733033d2..ba2329e484b 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.resource.ResourcePatternFilter; import org.apache.kafka.common.resource.PatternType; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.InvalidPartitionsException; +import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.TopicDescription; @@ -185,7 +186,7 @@ public class MirrorSourceConnector extends SourceConnector { @Override public String version() { - return "1"; + return AppInfoParser.getVersion(); } // visible for testing diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java index fb5c844417e..ec1e15cda7e 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java @@ -126,7 +126,7 @@ public class MirrorSourceTask extends SourceTask { @Override public String version() { - return "1"; + return new MirrorSourceConnector().version(); } @Override