From 92c928752ab6abcf47b3ce7fa59a95061876b7ef Mon Sep 17 00:00:00 2001 From: cainbit Date: Wed, 30 Oct 2024 13:42:50 +0800 Subject: [PATCH 1/2] Add private key authentication support to SFTP Add privateKey and keyPassword configurations. --- ftpreader/doc/ftpreader.md | 22 +++++++++++++-- .../plugin/reader/ftpreader/FtpHelper.java | 3 ++- .../plugin/reader/ftpreader/FtpReader.java | 26 +++++++++++++++--- .../datax/plugin/reader/ftpreader/Key.java | 2 ++ .../plugin/reader/ftpreader/SftpHelper.java | 21 ++++++++++++--- .../reader/ftpreader/StandardFtpHelper.java | 3 ++- ftpwriter/doc/ftpwriter.md | 22 +++++++++++++-- .../plugin/writer/ftpwriter/FtpWriter.java | 27 ++++++++++++++++--- .../datax/plugin/writer/ftpwriter/Key.java | 4 +++ .../writer/ftpwriter/util/IFtpHelper.java | 3 ++- .../writer/ftpwriter/util/SftpHelperImpl.java | 17 ++++++++++-- .../ftpwriter/util/StandardFtpHelperImpl.java | 3 ++- 12 files changed, 133 insertions(+), 20 deletions(-) diff --git a/ftpreader/doc/ftpreader.md b/ftpreader/doc/ftpreader.md index 770c6a9c..5c49407d 100644 --- a/ftpreader/doc/ftpreader.md +++ b/ftpreader/doc/ftpreader.md @@ -57,6 +57,8 @@ FtpReader实现了从远程FTP文件读取数据并转为DataX协议的功能, "port": 22, "username": "xx", "password": "xxx", + "privateKey": "/path_to_private_key", + "keyPassword": "xxx", "path": [ "/home/hanfa.shf/ftpReaderTest/data" ], @@ -153,10 +155,26 @@ FtpReader实现了从远程FTP文件读取数据并转为DataX协议的功能, * **password** - * 描述:ftp服务器访问密码。
+ * 描述:ftp服务器访问密码。密码和私钥必须配置一项。
+ + * 必选:否
+ + * 默认值:无
- * 必选:是
+* **privateKey** + * 描述:ftp服务器访问私钥。密码和私钥必须配置一项。
+ + * 必选:否
+ + * 默认值:无
+ +* **keyPassword** + + * 描述:私钥密码密码。
+ + * 必选:否
+ * 默认值:无
* **path** diff --git a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpHelper.java b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpHelper.java index f8b3f56f..a75a3189 100644 --- a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpHelper.java +++ b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpHelper.java @@ -3,6 +3,7 @@ package com.alibaba.datax.plugin.reader.ftpreader; import java.io.InputStream; import java.util.HashSet; import java.util.List; +import java.util.Map; public abstract class FtpHelper { /** @@ -18,7 +19,7 @@ public abstract class FtpHelper { * @return void * @throws */ - public abstract void loginFtpServer(String host, String username, String password, int port, int timeout,String connectMode) ; + public abstract void loginFtpServer(String host, String username, String password, int port, int timeout, String connectMode, Map extendParams) ; /** * * @Title: LogoutFtpServer diff --git a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpReader.java b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpReader.java index c1f20dfd..94f4d99d 100644 --- a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpReader.java +++ b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpReader.java @@ -2,9 +2,12 @@ package com.alibaba.datax.plugin.reader.ftpreader; import java.io.InputStream; import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +33,8 @@ public class FtpReader extends Reader { private int port; private String username; private String password; + private String privateKey; + private String keyPassword; private int timeout; private String connectPattern; private int maxTraversalLevel; @@ -44,6 +49,9 @@ public class FtpReader extends Reader { this.validateParameter(); UnstructuredStorageReaderUtil.validateParameter(this.originConfig); + Map extendParams = new HashMap(); + extendParams.put(Key.PRIVATEKEY, privateKey); + extendParams.put(Key.KEYPASSWORD, keyPassword); if ("sftp".equals(protocol)) { //sftp协议 this.port = originConfig.getInt(Key.PORT, Constant.DEFAULT_SFTP_PORT); @@ -53,7 +61,7 @@ public class FtpReader extends Reader { this.port = originConfig.getInt(Key.PORT, Constant.DEFAULT_FTP_PORT); this.ftpHelper = new StandardFtpHelper(); } - ftpHelper.loginFtpServer(host, username, password, port, timeout, connectPattern); + ftpHelper.loginFtpServer(host, username, password, port, timeout, connectPattern, extendParams); } @@ -67,7 +75,12 @@ public class FtpReader extends Reader { } this.host = this.originConfig.getNecessaryValue(Key.HOST, FtpReaderErrorCode.REQUIRED_VALUE); this.username = this.originConfig.getNecessaryValue(Key.USERNAME, FtpReaderErrorCode.REQUIRED_VALUE); - this.password = this.originConfig.getNecessaryValue(Key.PASSWORD, FtpReaderErrorCode.REQUIRED_VALUE); + this.password = this.originConfig.getString(Key.PASSWORD); + this.privateKey = this.originConfig.getString(Key.PRIVATEKEY); + this.keyPassword = this.originConfig.getString(Key.KEYPASSWORD); + if (StringUtils.isBlank(this.password) && StringUtils.isBlank(this.privateKey)) { + throw DataXException.asDataXException(FtpReaderErrorCode.REQUIRED_VALUE, "密码和私钥路径至少需配置一项"); + } this.timeout = originConfig.getInt(Key.TIMEOUT, Constant.DEFAULT_TIMEOUT); this.maxTraversalLevel = originConfig.getInt(Key.MAXTRAVERSALLEVEL, Constant.DEFAULT_MAX_TRAVERSAL_LEVEL); @@ -175,6 +188,8 @@ public class FtpReader extends Reader { private int port; private String username; private String password; + private String privateKey; + private String keyPassword; private String protocol; private int timeout; private String connectPattern; @@ -192,10 +207,15 @@ public class FtpReader extends Reader { this.protocol = readerSliceConfig.getString(Key.PROTOCOL); this.username = readerSliceConfig.getString(Key.USERNAME); this.password = readerSliceConfig.getString(Key.PASSWORD); + this.privateKey = readerSliceConfig.getString(Key.PRIVATEKEY); + this.keyPassword = readerSliceConfig.getString(Key.KEYPASSWORD); this.timeout = readerSliceConfig.getInt(Key.TIMEOUT, Constant.DEFAULT_TIMEOUT); this.sourceFiles = this.readerSliceConfig.getList(Constant.SOURCE_FILES, String.class); + Map extendParams = new HashMap(); + extendParams.put(Key.PRIVATEKEY, privateKey); + extendParams.put(Key.KEYPASSWORD, keyPassword); if ("sftp".equals(protocol)) { //sftp协议 this.port = readerSliceConfig.getInt(Key.PORT, Constant.DEFAULT_SFTP_PORT); @@ -206,7 +226,7 @@ public class FtpReader extends Reader { this.connectPattern = readerSliceConfig.getString(Key.CONNECTPATTERN, Constant.DEFAULT_FTP_CONNECT_PATTERN);// 默认为被动模式 this.ftpHelper = new StandardFtpHelper(); } - ftpHelper.loginFtpServer(host, username, password, port, timeout, connectPattern); + ftpHelper.loginFtpServer(host, username, password, port, timeout, connectPattern, extendParams); } diff --git a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/Key.java b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/Key.java index cdbd043c..5b7a36aa 100755 --- a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/Key.java +++ b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/Key.java @@ -5,6 +5,8 @@ public class Key { public static final String HOST = "host"; public static final String USERNAME = "username"; public static final String PASSWORD = "password"; + public static final String PRIVATEKEY = "privateKey"; + public static final String KEYPASSWORD = "keyPassword"; public static final String PORT = "port"; public static final String TIMEOUT = "timeout"; public static final String CONNECTPATTERN = "connectPattern"; diff --git a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/SftpHelper.java b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/SftpHelper.java index 6e42e10c..be9f8350 100644 --- a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/SftpHelper.java +++ b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/SftpHelper.java @@ -2,10 +2,13 @@ package com.alibaba.datax.plugin.reader.ftpreader; import java.io.InputStream; import java.util.HashSet; +import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.Vector; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,11 +27,21 @@ public class SftpHelper extends FtpHelper { Session session = null; ChannelSftp channelSftp = null; - @Override + + @Override public void loginFtpServer(String host, String username, String password, int port, int timeout, - String connectMode) { + String connectMode, Map extendParams) { JSch jsch = new JSch(); // 创建JSch对象 try { + String privateKey = (String)extendParams.get(Key.PRIVATEKEY); + String keyPassword = (String)extendParams.get(Key.KEYPASSWORD); + if (!StringUtils.isBlank(privateKey)) { + if (!StringUtils.isBlank(keyPassword)) { + jsch.addIdentity(privateKey, keyPassword); + } else { + jsch.addIdentity(privateKey); + } + } session = jsch.getSession(username, host, port); // 根据用户名,主机ip,端口获取一个Session对象 // 如果服务器连接不上,则抛出异常 @@ -37,7 +50,9 @@ public class SftpHelper extends FtpHelper { "session is null,无法通过sftp与服务器建立链接,请检查主机名和用户名是否正确."); } - session.setPassword(password); // 设置密码 + if (StringUtils.isBlank(privateKey)) { + session.setPassword(password); // 设置密码 + } Properties config = new Properties(); config.put("StrictHostKeyChecking", "no"); session.setConfig(config); // 为Session对象设置properties diff --git a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/StandardFtpHelper.java b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/StandardFtpHelper.java index 79b23f8b..d2683801 100644 --- a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/StandardFtpHelper.java +++ b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/StandardFtpHelper.java @@ -4,6 +4,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.UnknownHostException; import java.util.HashSet; +import java.util.Map; import org.apache.commons.io.IOUtils; import org.apache.commons.net.ftp.FTP; @@ -23,7 +24,7 @@ public class StandardFtpHelper extends FtpHelper { @Override public void loginFtpServer(String host, String username, String password, int port, int timeout, - String connectMode) { + String connectMode, Map extendParams) { ftpClient = new FTPClient(); try { // 连接 diff --git a/ftpwriter/doc/ftpwriter.md b/ftpwriter/doc/ftpwriter.md index a38a1052..e4ecde23 100644 --- a/ftpwriter/doc/ftpwriter.md +++ b/ftpwriter/doc/ftpwriter.md @@ -53,6 +53,8 @@ FtpWriter实现了从DataX协议转为FTP文件功能,FTP文件本身是无结 "port": 22, "username": "xxx", "password": "xxx", + "privateKey": "/path_to_private_key", + "keyPassword": "xxx", "timeout": "60000", "connectPattern": "PASV", "path": "/tmp/data/", @@ -117,9 +119,25 @@ FtpWriter实现了从DataX协议转为FTP文件功能,FTP文件本身是无结 * **password** - * 描述:ftp服务器访问密码。
+ * 描述:ftp服务器访问密码。密码和私钥必须配置一项。
- * 必选:是
+ * 必选:否
+ + * 默认值:无
+ +* **privateKey** + + * 描述:ftp服务器访问私钥。密码和私钥必须配置一项。
+ + * 必选:否
+ + * 默认值:无
+ +* **keyPassword** + + * 描述:私钥密码密码。
+ + * 必选:否
* 默认值:无
diff --git a/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/FtpWriter.java b/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/FtpWriter.java index eda603fc..94cd3163 100755 --- a/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/FtpWriter.java +++ b/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/FtpWriter.java @@ -18,7 +18,9 @@ import org.slf4j.LoggerFactory; import java.io.OutputStream; import java.util.HashSet; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; @@ -34,6 +36,8 @@ public class FtpWriter extends Writer { private int port; private String username; private String password; + private String privateKey; + private String keyPassword; private int timeout; private IFtpHelper ftpHelper = null; @@ -48,8 +52,11 @@ public class FtpWriter extends Writer { RetryUtil.executeWithRetry(new Callable() { @Override public Void call() throws Exception { + Map extendParams = new HashMap(); + extendParams.put(Key.PRIVATEKEY, privateKey); + extendParams.put(Key.KEYPASSWORD, keyPassword); ftpHelper.loginFtpServer(host, username, password, - port, timeout); + port, timeout, extendParams); return null; } }, 3, 4000, true); @@ -81,8 +88,13 @@ public class FtpWriter extends Writer { FtpWriterErrorCode.REQUIRED_VALUE); this.username = this.writerSliceConfig.getNecessaryValue( Key.USERNAME, FtpWriterErrorCode.REQUIRED_VALUE); - this.password = this.writerSliceConfig.getNecessaryValue( - Key.PASSWORD, FtpWriterErrorCode.REQUIRED_VALUE); + this.password = this.writerSliceConfig.getString(Key.PASSWORD); + this.privateKey = this.writerSliceConfig.getString(Key.PRIVATEKEY); + this.keyPassword = this.writerSliceConfig.getString(Key.KEYPASSWORD); + if (StringUtils.isBlank(this.password) && StringUtils.isBlank(this.privateKey)) { + throw DataXException.asDataXException( + FtpWriterErrorCode.REQUIRED_VALUE, "密码和私钥路径至少需配置一项"); + } this.timeout = this.writerSliceConfig.getInt(Key.TIMEOUT, Constant.DEFAULT_TIMEOUT); @@ -208,6 +220,8 @@ public class FtpWriter extends Writer { private int port; private String username; private String password; + private String privateKey; + private String keyPassword; private int timeout; private IFtpHelper ftpHelper = null; @@ -225,6 +239,8 @@ public class FtpWriter extends Writer { this.port = this.writerSliceConfig.getInt(Key.PORT); this.username = this.writerSliceConfig.getString(Key.USERNAME); this.password = this.writerSliceConfig.getString(Key.PASSWORD); + this.privateKey = this.writerSliceConfig.getString(Key.PRIVATEKEY); + this.keyPassword = this.writerSliceConfig.getString(Key.KEYPASSWORD); this.timeout = this.writerSliceConfig.getInt(Key.TIMEOUT, Constant.DEFAULT_TIMEOUT); this.protocol = this.writerSliceConfig.getString(Key.PROTOCOL); @@ -238,8 +254,11 @@ public class FtpWriter extends Writer { RetryUtil.executeWithRetry(new Callable() { @Override public Void call() throws Exception { + Map extendParams = new HashMap(); + extendParams.put(Key.PRIVATEKEY, privateKey); + extendParams.put(Key.KEYPASSWORD, keyPassword); ftpHelper.loginFtpServer(host, username, password, - port, timeout); + port, timeout, extendParams); return null; } }, 3, 4000, true); diff --git a/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/Key.java b/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/Key.java index 1cf4812a..b0becd76 100755 --- a/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/Key.java +++ b/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/Key.java @@ -8,6 +8,10 @@ public class Key { public static final String USERNAME = "username"; public static final String PASSWORD = "password"; + + public static final String PRIVATEKEY = "privateKey"; + + public static final String KEYPASSWORD = "keyPassword"; public static final String PORT = "port"; diff --git a/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/IFtpHelper.java b/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/IFtpHelper.java index 2e503f7f..fa94871b 100644 --- a/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/IFtpHelper.java +++ b/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/IFtpHelper.java @@ -1,12 +1,13 @@ package com.alibaba.datax.plugin.writer.ftpwriter.util; import java.io.OutputStream; +import java.util.Map; import java.util.Set; public interface IFtpHelper { //使用被动方式 - public void loginFtpServer(String host, String username, String password, int port, int timeout); + public void loginFtpServer(String host, String username, String password, int port, int timeout, Map extendParams); public void logoutFtpServer(); diff --git a/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/SftpHelperImpl.java b/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/SftpHelperImpl.java index e748f12c..e806cb64 100644 --- a/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/SftpHelperImpl.java +++ b/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/SftpHelperImpl.java @@ -3,6 +3,7 @@ package com.alibaba.datax.plugin.writer.ftpwriter.util; import java.io.ByteArrayOutputStream; import java.io.OutputStream; import java.util.HashSet; +import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.Vector; @@ -13,6 +14,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.plugin.writer.ftpwriter.Key; import com.alibaba.datax.plugin.writer.ftpwriter.FtpWriterErrorCode; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONWriter; @@ -33,9 +35,18 @@ public class SftpHelperImpl implements IFtpHelper { @Override public void loginFtpServer(String host, String username, String password, - int port, int timeout) { + int port, int timeout, Map extendParams) { JSch jsch = new JSch(); try { + String privateKey = (String)extendParams.get(Key.PRIVATEKEY); + String keyPassword = (String)extendParams.get(Key.KEYPASSWORD); + if (!StringUtils.isBlank(privateKey)) { + if (!StringUtils.isBlank(keyPassword)) { + jsch.addIdentity(privateKey, keyPassword); + } else { + jsch.addIdentity(privateKey); + } + } this.session = jsch.getSession(username, host, port); if (this.session == null) { throw DataXException @@ -43,7 +54,9 @@ public class SftpHelperImpl implements IFtpHelper { "创建ftp连接this.session失败,无法通过sftp与服务器建立链接,请检查主机名和用户名是否正确."); } - this.session.setPassword(password); + if (StringUtils.isBlank(privateKey)) { + this.session.setPassword(password); + } Properties config = new Properties(); config.put("StrictHostKeyChecking", "no"); // config.put("PreferredAuthentications", "password"); diff --git a/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/StandardFtpHelperImpl.java b/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/StandardFtpHelperImpl.java index d5b9a746..eed5a21f 100644 --- a/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/StandardFtpHelperImpl.java +++ b/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/StandardFtpHelperImpl.java @@ -5,6 +5,7 @@ import java.io.IOException; import java.io.OutputStream; import java.net.UnknownHostException; import java.util.HashSet; +import java.util.Map; import java.util.Set; import org.apache.commons.io.IOUtils; @@ -28,7 +29,7 @@ public class StandardFtpHelperImpl implements IFtpHelper { @Override public void loginFtpServer(String host, String username, String password, - int port, int timeout) { + int port, int timeout, Map extendParams) { this.ftpClient = new FTPClient(); try { this.ftpClient.setControlEncoding("UTF-8"); From 2912186db6b27ea156fcfb11fd14e534dd68bfbd Mon Sep 17 00:00:00 2001 From: cainbit Date: Wed, 30 Oct 2024 15:28:27 +0800 Subject: [PATCH 2/2] Delete useless imports and modify typo. --- .../com/alibaba/datax/plugin/reader/ftpreader/SftpHelper.java | 1 - .../plugin/writer/ftpwriter/util/StandardFtpHelperImpl.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/SftpHelper.java b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/SftpHelper.java index be9f8350..4c9b6c16 100644 --- a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/SftpHelper.java +++ b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/SftpHelper.java @@ -3,7 +3,6 @@ package com.alibaba.datax.plugin.reader.ftpreader; import java.io.InputStream; import java.util.HashSet; import java.util.Map; -import java.util.Optional; import java.util.Properties; import java.util.Vector; diff --git a/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/StandardFtpHelperImpl.java b/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/StandardFtpHelperImpl.java index eed5a21f..7bd0ded8 100644 --- a/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/StandardFtpHelperImpl.java +++ b/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/StandardFtpHelperImpl.java @@ -29,7 +29,7 @@ public class StandardFtpHelperImpl implements IFtpHelper { @Override public void loginFtpServer(String host, String username, String password, - int port, int timeout, Map extendParams) { + int port, int timeout, Map extendParams) { this.ftpClient = new FTPClient(); try { this.ftpClient.setControlEncoding("UTF-8");