This commit is contained in:
cainbit 2025-07-01 18:42:53 +08:00 committed by GitHub
commit 96edc4fc0f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 132 additions and 20 deletions

View File

@ -57,6 +57,8 @@ FtpReader实现了从远程FTP文件读取数据并转为DataX协议的功能
"port": 22,
"username": "xx",
"password": "xxx",
"privateKey": "/path_to_private_key",
"keyPassword": "xxx",
"path": [
"/home/hanfa.shf/ftpReaderTest/data"
],
@ -153,10 +155,26 @@ FtpReader实现了从远程FTP文件读取数据并转为DataX协议的功能
* **password**
* 描述ftp服务器访问密码。 <br />
* 描述ftp服务器访问密码。密码和私钥必须配置一项。 <br />
* 必选:否 <br />
* 默认值:无 <br />
* 必选:是 <br />
* **privateKey**
* 描述ftp服务器访问私钥。密码和私钥必须配置一项。 <br />
* 必选:否 <br />
* 默认值:无 <br />
* **keyPassword**
* 描述:私钥密码密码。 <br />
* 必选:否 <br />
* 默认值:无 <br />
* **path**

View File

@ -3,6 +3,7 @@ package com.alibaba.datax.plugin.reader.ftpreader;
import java.io.InputStream;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
public abstract class FtpHelper {
/**
@ -18,7 +19,7 @@ public abstract class FtpHelper {
* @return void
* @throws
*/
public abstract void loginFtpServer(String host, String username, String password, int port, int timeout,String connectMode) ;
public abstract void loginFtpServer(String host, String username, String password, int port, int timeout, String connectMode, Map<String, Object> extendParams) ;
/**
*
* @Title: LogoutFtpServer

View File

@ -2,9 +2,12 @@ package com.alibaba.datax.plugin.reader.ftpreader;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -30,6 +33,8 @@ public class FtpReader extends Reader {
private int port;
private String username;
private String password;
private String privateKey;
private String keyPassword;
private int timeout;
private String connectPattern;
private int maxTraversalLevel;
@ -44,6 +49,9 @@ public class FtpReader extends Reader {
this.validateParameter();
UnstructuredStorageReaderUtil.validateParameter(this.originConfig);
Map<String, Object> extendParams = new HashMap<String, Object>();
extendParams.put(Key.PRIVATEKEY, privateKey);
extendParams.put(Key.KEYPASSWORD, keyPassword);
if ("sftp".equals(protocol)) {
//sftp协议
this.port = originConfig.getInt(Key.PORT, Constant.DEFAULT_SFTP_PORT);
@ -53,7 +61,7 @@ public class FtpReader extends Reader {
this.port = originConfig.getInt(Key.PORT, Constant.DEFAULT_FTP_PORT);
this.ftpHelper = new StandardFtpHelper();
}
ftpHelper.loginFtpServer(host, username, password, port, timeout, connectPattern);
ftpHelper.loginFtpServer(host, username, password, port, timeout, connectPattern, extendParams);
}
@ -67,7 +75,12 @@ public class FtpReader extends Reader {
}
this.host = this.originConfig.getNecessaryValue(Key.HOST, FtpReaderErrorCode.REQUIRED_VALUE);
this.username = this.originConfig.getNecessaryValue(Key.USERNAME, FtpReaderErrorCode.REQUIRED_VALUE);
this.password = this.originConfig.getNecessaryValue(Key.PASSWORD, FtpReaderErrorCode.REQUIRED_VALUE);
this.password = this.originConfig.getString(Key.PASSWORD);
this.privateKey = this.originConfig.getString(Key.PRIVATEKEY);
this.keyPassword = this.originConfig.getString(Key.KEYPASSWORD);
if (StringUtils.isBlank(this.password) && StringUtils.isBlank(this.privateKey)) {
throw DataXException.asDataXException(FtpReaderErrorCode.REQUIRED_VALUE, "密码和私钥路径至少需配置一项");
}
this.timeout = originConfig.getInt(Key.TIMEOUT, Constant.DEFAULT_TIMEOUT);
this.maxTraversalLevel = originConfig.getInt(Key.MAXTRAVERSALLEVEL, Constant.DEFAULT_MAX_TRAVERSAL_LEVEL);
@ -175,6 +188,8 @@ public class FtpReader extends Reader {
private int port;
private String username;
private String password;
private String privateKey;
private String keyPassword;
private String protocol;
private int timeout;
private String connectPattern;
@ -192,10 +207,15 @@ public class FtpReader extends Reader {
this.protocol = readerSliceConfig.getString(Key.PROTOCOL);
this.username = readerSliceConfig.getString(Key.USERNAME);
this.password = readerSliceConfig.getString(Key.PASSWORD);
this.privateKey = readerSliceConfig.getString(Key.PRIVATEKEY);
this.keyPassword = readerSliceConfig.getString(Key.KEYPASSWORD);
this.timeout = readerSliceConfig.getInt(Key.TIMEOUT, Constant.DEFAULT_TIMEOUT);
this.sourceFiles = this.readerSliceConfig.getList(Constant.SOURCE_FILES, String.class);
Map<String, Object> extendParams = new HashMap<String, Object>();
extendParams.put(Key.PRIVATEKEY, privateKey);
extendParams.put(Key.KEYPASSWORD, keyPassword);
if ("sftp".equals(protocol)) {
//sftp协议
this.port = readerSliceConfig.getInt(Key.PORT, Constant.DEFAULT_SFTP_PORT);
@ -206,7 +226,7 @@ public class FtpReader extends Reader {
this.connectPattern = readerSliceConfig.getString(Key.CONNECTPATTERN, Constant.DEFAULT_FTP_CONNECT_PATTERN);// 默认为被动模式
this.ftpHelper = new StandardFtpHelper();
}
ftpHelper.loginFtpServer(host, username, password, port, timeout, connectPattern);
ftpHelper.loginFtpServer(host, username, password, port, timeout, connectPattern, extendParams);
}

View File

@ -5,6 +5,8 @@ public class Key {
public static final String HOST = "host";
public static final String USERNAME = "username";
public static final String PASSWORD = "password";
public static final String PRIVATEKEY = "privateKey";
public static final String KEYPASSWORD = "keyPassword";
public static final String PORT = "port";
public static final String TIMEOUT = "timeout";
public static final String CONNECTPATTERN = "connectPattern";

View File

@ -2,10 +2,12 @@ package com.alibaba.datax.plugin.reader.ftpreader;
import java.io.InputStream;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Vector;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -24,11 +26,21 @@ public class SftpHelper extends FtpHelper {
Session session = null;
ChannelSftp channelSftp = null;
@Override
@Override
public void loginFtpServer(String host, String username, String password, int port, int timeout,
String connectMode) {
String connectMode, Map<String, Object> extendParams) {
JSch jsch = new JSch(); // 创建JSch对象
try {
String privateKey = (String)extendParams.get(Key.PRIVATEKEY);
String keyPassword = (String)extendParams.get(Key.KEYPASSWORD);
if (!StringUtils.isBlank(privateKey)) {
if (!StringUtils.isBlank(keyPassword)) {
jsch.addIdentity(privateKey, keyPassword);
} else {
jsch.addIdentity(privateKey);
}
}
session = jsch.getSession(username, host, port);
// 根据用户名主机ip端口获取一个Session对象
// 如果服务器连接不上则抛出异常
@ -37,7 +49,9 @@ public class SftpHelper extends FtpHelper {
"session is null,无法通过sftp与服务器建立链接请检查主机名和用户名是否正确.");
}
session.setPassword(password); // 设置密码
if (StringUtils.isBlank(privateKey)) {
session.setPassword(password); // 设置密码
}
Properties config = new Properties();
config.put("StrictHostKeyChecking", "no");
session.setConfig(config); // 为Session对象设置properties

View File

@ -4,6 +4,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.commons.net.ftp.FTP;
@ -23,7 +24,7 @@ public class StandardFtpHelper extends FtpHelper {
@Override
public void loginFtpServer(String host, String username, String password, int port, int timeout,
String connectMode) {
String connectMode, Map<String, Object> extendParams) {
ftpClient = new FTPClient();
try {
// 连接

View File

@ -53,6 +53,8 @@ FtpWriter实现了从DataX协议转为FTP文件功能FTP文件本身是无结
"port": 22,
"username": "xxx",
"password": "xxx",
"privateKey": "/path_to_private_key",
"keyPassword": "xxx",
"timeout": "60000",
"connectPattern": "PASV",
"path": "/tmp/data/",
@ -117,9 +119,25 @@ FtpWriter实现了从DataX协议转为FTP文件功能FTP文件本身是无结
* **password**
* 描述ftp服务器访问密码。 <br />
* 描述ftp服务器访问密码。密码和私钥必须配置一项。 <br />
* 必选:是 <br />
* 必选:否 <br />
* 默认值:无 <br />
* **privateKey**
* 描述ftp服务器访问私钥。密码和私钥必须配置一项。 <br />
* 必选:否 <br />
* 默认值:无 <br />
* **keyPassword**
* 描述:私钥密码密码。 <br />
* 必选:否 <br />
* 默认值:无 <br />

View File

@ -18,7 +18,9 @@ import org.slf4j.LoggerFactory;
import java.io.OutputStream;
import java.util.HashSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
@ -34,6 +36,8 @@ public class FtpWriter extends Writer {
private int port;
private String username;
private String password;
private String privateKey;
private String keyPassword;
private int timeout;
private IFtpHelper ftpHelper = null;
@ -48,8 +52,11 @@ public class FtpWriter extends Writer {
RetryUtil.executeWithRetry(new Callable<Void>() {
@Override
public Void call() throws Exception {
Map<String, Object> extendParams = new HashMap<String, Object>();
extendParams.put(Key.PRIVATEKEY, privateKey);
extendParams.put(Key.KEYPASSWORD, keyPassword);
ftpHelper.loginFtpServer(host, username, password,
port, timeout);
port, timeout, extendParams);
return null;
}
}, 3, 4000, true);
@ -81,8 +88,13 @@ public class FtpWriter extends Writer {
FtpWriterErrorCode.REQUIRED_VALUE);
this.username = this.writerSliceConfig.getNecessaryValue(
Key.USERNAME, FtpWriterErrorCode.REQUIRED_VALUE);
this.password = this.writerSliceConfig.getNecessaryValue(
Key.PASSWORD, FtpWriterErrorCode.REQUIRED_VALUE);
this.password = this.writerSliceConfig.getString(Key.PASSWORD);
this.privateKey = this.writerSliceConfig.getString(Key.PRIVATEKEY);
this.keyPassword = this.writerSliceConfig.getString(Key.KEYPASSWORD);
if (StringUtils.isBlank(this.password) && StringUtils.isBlank(this.privateKey)) {
throw DataXException.asDataXException(
FtpWriterErrorCode.REQUIRED_VALUE, "密码和私钥路径至少需配置一项");
}
this.timeout = this.writerSliceConfig.getInt(Key.TIMEOUT,
Constant.DEFAULT_TIMEOUT);
@ -208,6 +220,8 @@ public class FtpWriter extends Writer {
private int port;
private String username;
private String password;
private String privateKey;
private String keyPassword;
private int timeout;
private IFtpHelper ftpHelper = null;
@ -225,6 +239,8 @@ public class FtpWriter extends Writer {
this.port = this.writerSliceConfig.getInt(Key.PORT);
this.username = this.writerSliceConfig.getString(Key.USERNAME);
this.password = this.writerSliceConfig.getString(Key.PASSWORD);
this.privateKey = this.writerSliceConfig.getString(Key.PRIVATEKEY);
this.keyPassword = this.writerSliceConfig.getString(Key.KEYPASSWORD);
this.timeout = this.writerSliceConfig.getInt(Key.TIMEOUT,
Constant.DEFAULT_TIMEOUT);
this.protocol = this.writerSliceConfig.getString(Key.PROTOCOL);
@ -238,8 +254,11 @@ public class FtpWriter extends Writer {
RetryUtil.executeWithRetry(new Callable<Void>() {
@Override
public Void call() throws Exception {
Map<String, Object> extendParams = new HashMap<String, Object>();
extendParams.put(Key.PRIVATEKEY, privateKey);
extendParams.put(Key.KEYPASSWORD, keyPassword);
ftpHelper.loginFtpServer(host, username, password,
port, timeout);
port, timeout, extendParams);
return null;
}
}, 3, 4000, true);

View File

@ -8,6 +8,10 @@ public class Key {
public static final String USERNAME = "username";
public static final String PASSWORD = "password";
public static final String PRIVATEKEY = "privateKey";
public static final String KEYPASSWORD = "keyPassword";
public static final String PORT = "port";

View File

@ -1,12 +1,13 @@
package com.alibaba.datax.plugin.writer.ftpwriter.util;
import java.io.OutputStream;
import java.util.Map;
import java.util.Set;
public interface IFtpHelper {
//使用被动方式
public void loginFtpServer(String host, String username, String password, int port, int timeout);
public void loginFtpServer(String host, String username, String password, int port, int timeout, Map<String, Object> extendParams);
public void logoutFtpServer();

View File

@ -3,6 +3,7 @@ package com.alibaba.datax.plugin.writer.ftpwriter.util;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;
@ -13,6 +14,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.plugin.writer.ftpwriter.Key;
import com.alibaba.datax.plugin.writer.ftpwriter.FtpWriterErrorCode;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONWriter;
@ -33,9 +35,18 @@ public class SftpHelperImpl implements IFtpHelper {
@Override
public void loginFtpServer(String host, String username, String password,
int port, int timeout) {
int port, int timeout, Map<String, Object> extendParams) {
JSch jsch = new JSch();
try {
String privateKey = (String)extendParams.get(Key.PRIVATEKEY);
String keyPassword = (String)extendParams.get(Key.KEYPASSWORD);
if (!StringUtils.isBlank(privateKey)) {
if (!StringUtils.isBlank(keyPassword)) {
jsch.addIdentity(privateKey, keyPassword);
} else {
jsch.addIdentity(privateKey);
}
}
this.session = jsch.getSession(username, host, port);
if (this.session == null) {
throw DataXException
@ -43,7 +54,9 @@ public class SftpHelperImpl implements IFtpHelper {
"创建ftp连接this.session失败,无法通过sftp与服务器建立链接请检查主机名和用户名是否正确.");
}
this.session.setPassword(password);
if (StringUtils.isBlank(privateKey)) {
this.session.setPassword(password);
}
Properties config = new Properties();
config.put("StrictHostKeyChecking", "no");
// config.put("PreferredAuthentications", "password");

View File

@ -5,6 +5,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.IOUtils;
@ -28,7 +29,7 @@ public class StandardFtpHelperImpl implements IFtpHelper {
@Override
public void loginFtpServer(String host, String username, String password,
int port, int timeout) {
int port, int timeout, Map<String, Object> extendParams) {
this.ftpClient = new FTPClient();
try {
this.ftpClient.setControlEncoding("UTF-8");