This commit is contained in:
hantmac 2023-08-03 10:59:26 +08:00
parent 8bfe3a590c
commit ae6e6630ef
3 changed files with 47 additions and 8 deletions

View File

@ -27,16 +27,12 @@ public class DatabendWriter extends Writer {
private CommonRdbmsWriter.Job commonRdbmsWriterMaster; private CommonRdbmsWriter.Job commonRdbmsWriterMaster;
@Override @Override
public void init() { public void init() throws DataXException {
this.originalConfig = super.getPluginJobConf(); this.originalConfig = super.getPluginJobConf();
this.commonRdbmsWriterMaster = new CommonRdbmsWriter.Job(DATABASE_TYPE); this.commonRdbmsWriterMaster = new CommonRdbmsWriter.Job(DATABASE_TYPE);
this.commonRdbmsWriterMaster.init(this.originalConfig); this.commonRdbmsWriterMaster.init(this.originalConfig);
// placeholder currently not supported by databend driver, needs special treatment // placeholder currently not supported by databend driver, needs special treatment
try {
DatabendWriterUtil.dealWriteMode(this.originalConfig); DatabendWriterUtil.dealWriteMode(this.originalConfig);
} catch (Exception e) {
LOG.error(e.toString());
}
} }
@Override @Override

View File

@ -0,0 +1,33 @@
package com.alibaba.datax.plugin.writer.databendwriter;
import com.alibaba.datax.common.spi.ErrorCode;
public enum DatabendWriterErrorCode implements ErrorCode {
CONF_ERROR("DatabendWriter-00", "配置错误."),
WRITE_DATA_ERROR("DatabendWriter-01", "写入数据时失败."),
;
private final String code;
private final String description;
private DatabendWriterErrorCode(String code, String description) {
this.code = code;
this.description = description;
}
@Override
public String getCode() {
return this.code;
}
@Override
public String getDescription() {
return this.description;
}
@Override
public String toString() {
return String.format("Code:[%s], Description:[%s].", this.code, this.description);
}
}

View File

@ -1,13 +1,16 @@
package com.alibaba.datax.plugin.writer.databendwriter.util; package com.alibaba.datax.plugin.writer.databendwriter.util;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.writer.Constant; import com.alibaba.datax.plugin.rdbms.writer.Constant;
import com.alibaba.datax.plugin.rdbms.writer.Key; import com.alibaba.datax.plugin.rdbms.writer.Key;
import com.alibaba.datax.plugin.writer.databendwriter.DatabendWriterErrorCode;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.xml.crypto.Data;
import java.util.List; import java.util.List;
import java.util.StringJoiner; import java.util.StringJoiner;
@ -17,7 +20,7 @@ public final class DatabendWriterUtil {
private DatabendWriterUtil() { private DatabendWriterUtil() {
} }
public static void dealWriteMode(Configuration originalConfig) throws Exception { public static void dealWriteMode(Configuration originalConfig) throws DataXException {
List<String> columns = originalConfig.getList(Key.COLUMN, String.class); List<String> columns = originalConfig.getList(Key.COLUMN, String.class);
List<String> onConflictColumns = originalConfig.getList(Key.ONCONFLICT_COLUMN, String.class); List<String> onConflictColumns = originalConfig.getList(Key.ONCONFLICT_COLUMN, String.class);
StringBuilder writeDataSqlTemplate = new StringBuilder(); StringBuilder writeDataSqlTemplate = new StringBuilder();
@ -29,8 +32,14 @@ public final class DatabendWriterUtil {
LOG.info("write mode is {}", writeMode); LOG.info("write mode is {}", writeMode);
if (writeMode.toLowerCase().contains("replace")) { if (writeMode.toLowerCase().contains("replace")) {
if (onConflictColumns == null || onConflictColumns.size() == 0) { if (onConflictColumns == null || onConflictColumns.size() == 0) {
throw new Exception("Replace mode must has onConflictColumn config"); throw DataXException
.asDataXException(
DatabendWriterErrorCode.CONF_ERROR,
String.format(
"Replace mode must has onConflictColumn config."
));
} }
// for databend if you want to use replace mode, the writeMode should be: "writeMode": "replace" // for databend if you want to use replace mode, the writeMode should be: "writeMode": "replace"
writeDataSqlTemplate.append("REPLACE INTO %s (") writeDataSqlTemplate.append("REPLACE INTO %s (")
.append(StringUtils.join(columns, ",")).append(") ").append(onConFlictDoString(onConflictColumns)) .append(StringUtils.join(columns, ",")).append(") ").append(onConFlictDoString(onConflictColumns))
@ -52,6 +61,7 @@ public final class DatabendWriterUtil {
originalConfig.set(Constant.INSERT_OR_REPLACE_TEMPLATE_MARK, writeDataSqlTemplate); originalConfig.set(Constant.INSERT_OR_REPLACE_TEMPLATE_MARK, writeDataSqlTemplate);
} }
} }
public static String onConFlictDoString(List<String> conflictColumns) { public static String onConFlictDoString(List<String> conflictColumns) {