throw exception

This commit is contained in:
hantmac 2023-08-03 10:04:08 +08:00
parent 75f2ebc095
commit 8bfe3a590c
2 changed files with 19 additions and 23 deletions

View File

@ -17,30 +17,30 @@ import java.sql.*;
import java.util.List;
import java.util.regex.Pattern;
public class DatabendWriter extends Writer
{
public class DatabendWriter extends Writer {
private static final DataBaseType DATABASE_TYPE = DataBaseType.Databend;
public static class Job
extends Writer.Job
{
extends Writer.Job {
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
private Configuration originalConfig;
private CommonRdbmsWriter.Job commonRdbmsWriterMaster;
@Override
public void init()
{
public void init() {
this.originalConfig = super.getPluginJobConf();
this.commonRdbmsWriterMaster = new CommonRdbmsWriter.Job(DATABASE_TYPE);
this.commonRdbmsWriterMaster.init(this.originalConfig);
// placeholder currently not supported by databend driver, needs special treatment
DatabendWriterUtil.dealWriteMode(this.originalConfig);
try {
DatabendWriterUtil.dealWriteMode(this.originalConfig);
} catch (Exception e) {
LOG.error(e.toString());
}
}
@Override
public void preCheck()
{
public void preCheck() {
this.init();
this.commonRdbmsWriterMaster.writerPreCheck(this.originalConfig, DATABASE_TYPE);
}
@ -67,8 +67,7 @@ public class DatabendWriter extends Writer
}
public static class Task extends Writer.Task
{
public static class Task extends Writer.Task {
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
private Configuration writerSliceConfig;
@ -76,11 +75,10 @@ public class DatabendWriter extends Writer
private CommonRdbmsWriter.Task commonRdbmsWriterSlave;
@Override
public void init()
{
public void init() {
this.writerSliceConfig = super.getPluginJobConf();
this.commonRdbmsWriterSlave = new CommonRdbmsWriter.Task(DataBaseType.Databend){
this.commonRdbmsWriterSlave = new CommonRdbmsWriter.Task(DataBaseType.Databend) {
@Override
protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex, int columnSqltype, String typeName, Column column) throws SQLException {
try {
@ -177,8 +175,8 @@ public class DatabendWriter extends Writer
case Types.BOOLEAN:
// warn: bit(1) -> Types.BIT 可使用setBoolean
// warn: bit(>1) -> Types.VARBINARY 可使用setBytes
// warn: bit(1) -> Types.BIT 可使用setBoolean
// warn: bit(>1) -> Types.VARBINARY 可使用setBytes
case Types.BIT:
if (this.dataBaseType == DataBaseType.MySql) {
Boolean asBoolean = column.asBoolean();
@ -224,8 +222,7 @@ public class DatabendWriter extends Writer
}
@Override
public void destroy()
{
public void destroy() {
this.commonRdbmsWriterSlave.destroy(this.writerSliceConfig);
}
@ -238,9 +235,9 @@ public class DatabendWriter extends Writer
public void post() {
this.commonRdbmsWriterSlave.post(this.writerSliceConfig);
}
@Override
public void startWrite(RecordReceiver lineReceiver)
{
public void startWrite(RecordReceiver lineReceiver) {
this.commonRdbmsWriterSlave.startWrite(lineReceiver, this.writerSliceConfig, this.getTaskPluginCollector());
}

View File

@ -17,7 +17,7 @@ public final class DatabendWriterUtil {
private DatabendWriterUtil() {
}
public static void dealWriteMode(Configuration originalConfig) {
public static void dealWriteMode(Configuration originalConfig) throws Exception {
List<String> columns = originalConfig.getList(Key.COLUMN, String.class);
List<String> onConflictColumns = originalConfig.getList(Key.ONCONFLICT_COLUMN, String.class);
StringBuilder writeDataSqlTemplate = new StringBuilder();
@ -29,8 +29,7 @@ public final class DatabendWriterUtil {
LOG.info("write mode is {}", writeMode);
if (writeMode.toLowerCase().contains("replace")) {
if (onConflictColumns == null || onConflictColumns.size() == 0) {
LOG.error("Replace mode must has onConflictColumn conf");
return;
throw new Exception("Replace mode must has onConflictColumn config");
}
// for databend if you want to use replace mode, the writeMode should be: "writeMode": "replace"
writeDataSqlTemplate.append("REPLACE INTO %s (")