support csv to arrow

This commit is contained in:
shaojin.wensj 2023-04-14 19:04:27 +08:00
parent 218481e3c2
commit dad6894ad9
12 changed files with 509 additions and 123 deletions

View File

@ -3,6 +3,9 @@ package com.alibaba.fastjson2.reader;
import java.nio.charset.Charset;
public interface ByteArrayValueConsumer {
default void start() {
}
default void beforeRow(int row) {
}
@ -10,4 +13,7 @@ public interface ByteArrayValueConsumer {
default void afterRow(int row) {
}
default void end() {
}
}

View File

@ -1,6 +1,9 @@
package com.alibaba.fastjson2.reader;
public interface CharArrayValueConsumer<T> {
default void start() {
}
default void beforeRow(int row) {
}
@ -8,4 +11,7 @@ public interface CharArrayValueConsumer<T> {
default void afterRow(int row) {
}
default void end() {
}
}

View File

@ -23,7 +23,7 @@ import java.util.function.Function;
import static com.alibaba.fastjson2.util.DateUtils.DEFAULT_ZONE_ID;
class CSVReaderUTF16<T>
final class CSVReaderUTF16<T>
extends CSVReader<T> {
static final Map<Long, Function<Consumer, CharArrayValueConsumer>> valueConsumerCreators
= new ConcurrentHashMap<>();
@ -567,6 +567,8 @@ class CSVReaderUTF16<T>
}
public void readAll(CharArrayValueConsumer<T> consumer) {
consumer.start();
while (true) {
try {
if (inputEnd) {
@ -685,6 +687,7 @@ class CSVReaderUTF16<T>
}
consumer.afterRow(rowCount);
}
consumer.end();
}
class CharArrayConsumerImpl<T>

View File

@ -618,6 +618,8 @@ final class CSVReaderUTF8<T>
}
protected void readAll(ByteArrayValueConsumer consumer) {
consumer.start();
while (true) {
try {
if (inputEnd) {
@ -736,5 +738,6 @@ final class CSVReaderUTF8<T>
}
consumer.afterRow(rowCount);
}
consumer.end();
}
}

View File

@ -140,7 +140,7 @@ public class TypeUtils {
if (ch >= 0 && ch < X2.chars.length) {
return X2.chars[ch];
}
return new String(new byte[] {ch}, StandardCharsets.ISO_8859_1);
return new String(new byte[]{ch}, StandardCharsets.ISO_8859_1);
}
public static String toString(char c0, char c1) {
@ -148,7 +148,7 @@ public class TypeUtils {
int value = (c0 - X2.START) * X2.SIZE2 + (c1 - X2.START);
return X2.chars2[value];
}
return new String(new char[] {c0, c1});
return new String(new char[]{c0, c1});
}
public static String toString(byte c0, byte c1) {
@ -156,7 +156,7 @@ public class TypeUtils {
int value = (c0 - X2.START) * X2.SIZE2 + (c1 - X2.START);
return X2.chars2[value];
}
return new String(new byte[] {c0, c1}, StandardCharsets.ISO_8859_1);
return new String(new byte[]{c0, c1}, StandardCharsets.ISO_8859_1);
}
public static Type intern(Type type) {
@ -1768,100 +1768,21 @@ public class TypeUtils {
}
char[] chars = JDKUtils.getCharArray(str);
return toBigDecimal(chars);
return parseBigDecimal(chars, 0, chars.length);
}
public static BigDecimal toBigDecimal(char[] chars) {
if (chars == null || chars.length == 0) {
if (chars == null) {
return null;
}
boolean negative = false;
int j = 0;
if (chars[0] == '-') {
negative = true;
j = 1;
}
if (chars.length <= 20 || (negative && chars.length == 21)) {
int dot = 0;
int dotIndex = -1;
long unscaleValue = 0;
for (; j < chars.length; j++) {
char b = chars[j];
if (b == '.') {
dot++;
if (dot > 1) {
break;
}
dotIndex = j;
} else if (b >= '0' && b <= '9') {
unscaleValue = unscaleValue * 10 + (b - '0');
} else {
unscaleValue = -1;
break;
}
}
int scale = 0;
if (unscaleValue >= 0 && dot <= 1) {
if (negative) {
unscaleValue = -unscaleValue;
}
if (dotIndex != -1) {
scale = chars.length - dotIndex - 1;
}
return BigDecimal.valueOf(unscaleValue, scale);
}
}
return new BigDecimal(chars, 0, chars.length);
return parseBigDecimal(chars, 0, chars.length);
}
public static BigDecimal toBigDecimal(byte[] strBytes) {
if (strBytes == null || strBytes.length == 0) {
if (strBytes == null) {
return null;
}
boolean negative = false;
int j = 0;
if (strBytes[0] == '-') {
negative = true;
j = 1;
}
if (strBytes.length <= 20 || (negative && strBytes.length == 21)) {
int dot = 0;
int dotIndex = -1;
long unscaleValue = 0;
for (; j < strBytes.length; j++) {
byte b = strBytes[j];
if (b == '.') {
dot++;
if (dot > 1) {
break;
}
dotIndex = j;
} else if (b >= '0' && b <= '9') {
unscaleValue = unscaleValue * 10 + (b - '0');
} else {
unscaleValue = -1;
break;
}
}
int scale = 0;
if (unscaleValue >= 0 && dot <= 1) {
if (negative) {
unscaleValue = -unscaleValue;
}
if (dotIndex != -1) {
scale = strBytes.length - dotIndex - 1;
}
return BigDecimal.valueOf(unscaleValue, scale);
}
}
char[] chars = X1.TO_CHARS.apply(strBytes);
return new BigDecimal(chars, 0, chars.length);
return parseBigDecimal(strBytes, 0, strBytes.length);
}
public static BigInteger toBigInteger(Object value) {
@ -2619,7 +2540,7 @@ public class TypeUtils {
}
public static BigDecimal parseBigDecimal(char[] bytes, int off, int len) {
if (len == 0) {
if (bytes == null || len == 0) {
return null;
}
@ -2666,7 +2587,7 @@ public class TypeUtils {
}
public static BigDecimal parseBigDecimal(byte[] bytes, int off, int len) {
if (len == 0) {
if (bytes == null || len == 0) {
return null;
}
@ -2709,10 +2630,16 @@ public class TypeUtils {
}
}
char[] chars = new char[len];
for (int i = 0; i < len; i++) {
chars[i] = (char) bytes[off + i];
char[] chars;
if (off == 0 && len == bytes.length) {
chars = X1.TO_CHARS.apply(bytes);
} else {
chars = new char[len];
for (int i = 0; i < len; i++) {
chars[i] = (char) bytes[off + i];
}
}
return new BigDecimal(chars, 0, chars.length);
}

View File

@ -18,10 +18,10 @@ import java.util.function.ObjIntConsumer;
public class ArrowByteArrayConsumer
implements ByteArrayValueConsumer {
static final int CHUNK_SIZE = 10000;
static final int CHUNK_SIZE = 1_000_000;
final Schema schema;
final int rowCount;
final int varcharValueSize = 2048;
final int varcharValueSize = 128;
final ObjIntConsumer<VectorSchemaRoot> rootConsumer;
final Consumer<Long[]> committer;
BufferAllocator allocator;
@ -30,6 +30,7 @@ public class ArrowByteArrayConsumer
int blockSize;
int blockRowIndex;
int blockIndex = -1;
int[] valueCapacities;
public ArrowByteArrayConsumer(
Schema schema,
@ -77,19 +78,21 @@ public class ArrowByteArrayConsumer
public void allocateNew(int blockSize) {
root = VectorSchemaRoot.create(schema, allocator);
this.blockSize = blockSize;
this.blockRowIndex = 0;
root.setRowCount(blockSize);
List<Field> fields = root.getSchema().getFields();
for (int i = 0; i < fields.size(); i++) {
final int fieldsSize = fields.size();
valueCapacities = new int[fieldsSize];
for (int i = 0; i < fieldsSize; i++) {
FieldVector vector = root.getVector(i);
if (vector instanceof FixedWidthVector) {
((FixedWidthVector) vector).allocateNew(blockSize);
} else if (vector instanceof VariableWidthVector) {
VariableWidthVector variableWidthVector = (VariableWidthVector) vector;
variableWidthVector.allocateNew(varcharValueSize * blockSize, blockSize);
valueCapacities[i] = variableWidthVector.getValueCapacity();
} else {
throw new JSONException("TODO");
}
@ -125,6 +128,13 @@ public class ArrowByteArrayConsumer
if (vector instanceof VarCharVector) {
VarCharVector charVector = (VarCharVector) vector;
int valueCapacity = valueCapacities[column];
int startOffset = charVector.getStartOffset(row);
if (startOffset + len >= valueCapacity) {
int newValueCapacity = valueCapacity + Math.max(len, varcharValueSize * rowCount);
charVector.reallocDataBuffer(newValueCapacity);
valueCapacities[column] = newValueCapacity;
}
charVector.set(row, bytes, off, len);
return;
}

View File

@ -1,6 +1,8 @@
package com.alibaba.fastjson2.support.csv;
import com.alibaba.fastjson2.JSONException;
import com.alibaba.fastjson2.util.DateUtils;
import com.alibaba.fastjson2.util.JDKUtils;
import com.alibaba.fastjson2.util.TypeUtils;
import com.alibaba.fastjson2.util.UnsafeUtils;
import org.apache.arrow.memory.ArrowBuf;
@ -9,9 +11,10 @@ import org.apache.arrow.vector.*;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import static com.alibaba.fastjson2.util.JDKUtils.FIELD_DECIMAL_INT_COMPACT_OFFSET;
import static com.alibaba.fastjson2.util.JDKUtils.*;
public class ArrowUtils {
static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
@ -100,6 +103,213 @@ public class ArrowUtils {
}
}
public static void setValue(FieldVector vector, int row, String value) {
if (value == null || value.isEmpty()) {
return;
}
if (vector instanceof IntVector) {
((IntVector) vector).set(row, Integer.parseInt(value));
return;
}
if (vector instanceof BigIntVector) {
((BigIntVector) vector).set(row, Long.parseLong(value));
return;
}
if (vector instanceof DecimalVector) {
DecimalVector decimalVector = (DecimalVector) vector;
ArrowUtils.setDecimal(decimalVector, row, value);
return;
}
if (vector instanceof DateMilliVector) {
long millis = DateUtils.parseMillis(value);
((DateMilliVector) vector).set(row, millis);
return;
}
if (vector instanceof VarCharVector) {
VarCharVector varCharVector = (VarCharVector) vector;
ArrowUtils.setString(varCharVector, row, value);
return;
}
if (vector instanceof Float8Vector) {
double doubleValue = Double.parseDouble(value);
((Float8Vector) vector).set(row, doubleValue);
return;
}
if (vector instanceof Float4Vector) {
float floatValue = Float.parseFloat(value);
((Float4Vector) vector).set(row, floatValue);
return;
}
if (vector instanceof TinyIntVector) {
int intValue = Integer.parseInt(value);
((TinyIntVector) vector).set(row, (byte) intValue);
return;
}
if (vector instanceof SmallIntVector) {
int intValue = Integer.parseInt(value);
((SmallIntVector) vector).set(row, (short) intValue);
return;
}
if (vector instanceof TimeStampMilliVector) {
long millis = DateUtils.parseMillis(value);
((TimeStampMilliVector) vector).set(row, millis);
return;
}
if (vector instanceof BitVector) {
Boolean booleanValue = Boolean.parseBoolean(value);
if (value != null) {
int intValue = booleanValue.booleanValue() ? 1 : 0;
((BitVector) vector).set(row, intValue);
}
return;
}
if (vector instanceof Decimal256Vector) {
BigDecimal decimal = TypeUtils.toBigDecimal(value);
Decimal256Vector decimalVector = (Decimal256Vector) vector;
int scale = decimalVector.getScale();
if (decimal.scale() != scale) {
decimal = decimal.setScale(scale);
}
decimalVector.set(row, decimal);
return;
}
throw new JSONException("TODO " + vector.getClass());
}
public static void setDecimal(DecimalVector vector, int row, String str) {
if (str == null || str.length() == 0) {
vector.setNull(row);
return;
}
if (STRING_CODER != null && STRING_VALUE != null && STRING_CODER.applyAsInt(str) == 0) {
byte[] bytes = JDKUtils.STRING_VALUE.apply(str);
setDecimal(vector, row, bytes, 0, bytes.length);
}
char[] chars = JDKUtils.getCharArray(str);
setDecimal(vector, row, chars, 0, chars.length);
}
public static void setString(VarCharVector vector, int row, String str) {
if (str == null || str.length() == 0) {
vector.setNull(row);
return;
}
byte[] bytes;
if (STRING_CODER != null && STRING_VALUE != null && STRING_CODER.applyAsInt(str) == 0) {
bytes = JDKUtils.STRING_VALUE.apply(str);
} else {
bytes = str.getBytes(StandardCharsets.UTF_8);
}
vector.set(row, bytes);
}
public static void setDecimal(DecimalVector vector, int row, char[] bytes, int off, int len) {
boolean negative = false;
int j = off;
if (bytes[off] == '-') {
negative = true;
j++;
}
if (len <= 20 || (negative && len == 21)) {
int end = off + len;
int dot = 0;
int dotIndex = -1;
long unscaleValue = 0;
for (; j < end; j++) {
char b = bytes[j];
if (b == '.') {
dot++;
if (dot > 1) {
break;
}
dotIndex = j;
} else if (b >= '0' && b <= '9') {
unscaleValue = unscaleValue * 10 + (b - '0');
} else {
unscaleValue = -1;
break;
}
}
int scale = 0;
if (unscaleValue >= 0 && dot <= 1) {
if (dotIndex != -1) {
scale = len - (dotIndex - off) - 1;
}
boolean overflow = false;
long unscaleValueV = unscaleValue;
int scaleV = vector.getScale();
if (scaleV > scale) {
for (int i = scale; i < scaleV; i++) {
unscaleValueV *= 10;
if (unscaleValueV < 0) {
overflow = true;
break;
}
}
} else if (scaleV < scale) {
overflow = true;
}
if (!overflow) {
if (negative) {
unscaleValueV = -unscaleValueV;
}
BitVectorHelper.setBit(vector.getValidityBuffer(), row);
ArrowBuf dataBuffer = vector.getDataBuffer();
final long startIndex = (long) row * DECIMAL_TYPE_WIDTH;
if (LITTLE_ENDIAN) {
// Decimal stored as native-endian, need to swap data bytes before writing to ArrowBuf if LE
// Write LE data
dataBuffer.setLong(startIndex, unscaleValueV);
} else {
// Write BE data
dataBuffer.setLong(startIndex, 0);
long littleEndianValue = Long.reverseBytes(unscaleValueV);
dataBuffer.setLong(startIndex + 8, littleEndianValue);
}
return;
}
if (negative) {
unscaleValue = -unscaleValue;
}
BigDecimal decimal = BigDecimal.valueOf(unscaleValue, scale);
if (vector.getScale() != decimal.scale()) {
decimal = decimal.setScale(vector.getScale(), BigDecimal.ROUND_CEILING);
}
vector.set(row, decimal);
return;
}
}
BigDecimal decimal = TypeUtils.parseBigDecimal(bytes, off, len);
if (vector.getScale() != decimal.scale()) {
decimal = decimal.setScale(vector.getScale(), BigDecimal.ROUND_CEILING);
}
vector.set(row, decimal);
}
public static void setDecimal(DecimalVector vector, int row, byte[] bytes, int off, int len) {
boolean negative = false;
int j = off;

View File

@ -2,8 +2,6 @@ package com.alibaba.fastjson2.support.csv;
import com.alibaba.fastjson2.JSONException;
import com.aliyun.odps.Odps;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.ArrowRecordReader;
import com.aliyun.odps.tunnel.TableTunnel;
import org.apache.arrow.vector.*;
@ -14,18 +12,13 @@ import java.io.File;
import java.util.List;
public class OdpsArrowReadDemo {
private static String accessID = "";
private static String accessKey = "";
private static String project = "sonar_test";
public static void main(String[] args) throws Exception {
Account account = new AliyunAccount(accessID, accessKey);
Odps odps = new Odps(account);
odps.setDefaultProject(project);
Odps odps = OdpsTestUtils.odps();
String tableName = "x7";
TableTunnel tunnel = new TableTunnel(odps);
TableTunnel.DownloadSession downloadSession = tunnel.createDownloadSession(project, tableName);
TableTunnel.DownloadSession downloadSession = tunnel.createDownloadSession(odps.getDefaultProject(), tableName);
long recordCount = downloadSession.getRecordCount();
System.out.println("recordCount : " + recordCount);

View File

@ -3,8 +3,6 @@ package com.alibaba.fastjson2.support.csv;
import com.alibaba.fastjson2.JSONException;
import com.aliyun.odps.Instance;
import com.aliyun.odps.Odps;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.ArrowRecordWriter;
import com.aliyun.odps.task.SQLTask;
import com.aliyun.odps.tunnel.TableTunnel;
@ -19,20 +17,12 @@ import java.util.Arrays;
public class OdpsArrowWriteDemo {
// EPA_SmartLocationDatabase_V3_Jan_2021_Final.csv X4
static final File file = new File("/Users/wenshao/Downloads/Demographics_by_Zip_Code.csv");
private static String accessID = "";
private static String accessKey = "";
private static String project = "sonar_test";
static final File file = new File("/Users/wenshao/Downloads/Public_School_Characteristics_2020-21.csv");
public static void main(String[] args) throws Exception {
Account account = new AliyunAccount(accessID, accessKey);
Odps odps = new Odps(account);
odps.setDefaultProject(project);
Odps odps = OdpsTestUtils.odps();
String tableName = "x7";
TableTunnel tunnel = new TableTunnel(odps);
{
String dropTable = "drop table if exists " + tableName + ";";
System.out.println(dropTable);
@ -46,13 +36,14 @@ public class OdpsArrowWriteDemo {
createTableTask.waitForSuccess();
}
TableTunnel.UploadSession uploadSession = tunnel.createUploadSession(project, tableName);
long start = System.currentTimeMillis();
int rowCount = CSVReader.rowCount(file) - 1;
System.out.println("rowCount : " + rowCount);
TableTunnel tunnel = new TableTunnel(odps);
TableTunnel.UploadSession uploadSession = tunnel.createUploadSession(odps.getDefaultProject(), tableName);
final Schema schema = uploadSession.getArrowSchema();
long start = System.currentTimeMillis();
CompressOption compressOption = new CompressOption(CompressOption.CompressAlgorithm.ODPS_ARROW_LZ4_FRAME, 0, 0);
ArrowByteArrayConsumer consumer = new ArrowByteArrayConsumer(

View File

@ -0,0 +1,118 @@
package com.alibaba.fastjson2.support.csv;
import com.alibaba.fastjson2.JSONException;
import com.aliyun.odps.Instance;
import com.aliyun.odps.Odps;
import com.aliyun.odps.data.ArrowRecordWriter;
import com.aliyun.odps.task.SQLTask;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.io.CompressOption;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.*;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.jetbrains.annotations.NotNull;
import java.io.File;
import java.util.List;
public class OdpsArrowWriteDemo1 {
static final File file = new File("/Users/wenshao/Downloads/Public_School_Characteristics_2020-21.csv");
public static void main(String[] args) throws Exception {
Odps odps = OdpsTestUtils.odps();
String tableName = "x7";
{
String dropTable = "drop table if exists " + tableName + ";";
System.out.println(dropTable);
Instance dropTableTask = SQLTask.run(odps, dropTable);
dropTableTask.waitForSuccess();
String ddl = CSVUtils.genMaxComputeCreateTable(file, tableName);
System.out.println(ddl);
Instance createTableTask = SQLTask.run(odps, ddl);
createTableTask.waitForSuccess();
}
long start = System.currentTimeMillis();
TableTunnel tunnel = new TableTunnel(odps);
writeData(odps.getDefaultProject(), tableName, tunnel);
long millis = System.currentTimeMillis() - start;
System.out.println("millis " + millis);
}
private static void writeData(
String project,
String tableName,
TableTunnel tunnel
) throws Exception {
TableTunnel.UploadSession uploadSession = tunnel.createUploadSession(project, tableName);
CompressOption compressOption = new CompressOption(CompressOption.CompressAlgorithm.ODPS_ARROW_LZ4_FRAME, 0, 0);
ArrowRecordWriter arrowRecordWriter = uploadSession.openArrowRecordWriter(0, compressOption);
Schema arrowSchema = uploadSession.getArrowSchema();
List<Field> fields = arrowSchema.getFields();
BufferAllocator allocator = new RootAllocator();
VectorSchemaRoot root = createRoot(arrowSchema, fields, allocator);
CSVReader csvReader = CSVReader.of(file);
csvReader.readHeader();
int row = 0;
for (; ; row++) {
String[] line = csvReader.readLine();
if (line == null) {
break;
}
for (int i = 0; i < line.length; i++) {
String value = line[i];
if (value == null || value.isEmpty()) {
continue;
}
if (i >= fields.size()) {
break;
}
FieldVector vector = root.getVector(i);
ArrowUtils.setValue(vector, row, value);
}
}
root.setRowCount(row);
arrowRecordWriter.write(root);
arrowRecordWriter.close();
uploadSession.commit(new Long[]{0L});
}
@NotNull
private static VectorSchemaRoot createRoot(
Schema arrowSchema,
List<Field> fields,
BufferAllocator allocator
) throws Exception {
VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator);
int rowCount = CSVReader.rowCount(file);
System.out.println("rowCount : " + rowCount);
int blockSize = rowCount;
final int varcharValueSize = 128;
for (int i = 0; i < fields.size(); i++) {
FieldVector vector = root.getVector(i);
if (vector instanceof FixedWidthVector) {
((FixedWidthVector) vector).allocateNew(blockSize);
} else if (vector instanceof VariableWidthVector) {
VariableWidthVector variableWidthVector = (VariableWidthVector) vector;
variableWidthVector.allocateNew(varcharValueSize * blockSize, blockSize);
} else {
throw new JSONException("TODO");
}
}
return root;
}
}

View File

@ -0,0 +1,22 @@
package com.alibaba.fastjson2.support.csv;
import com.aliyun.odps.Odps;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
public class OdpsTestUtils {
private static String accessID = "";
private static String accessKey = "";
private static String project = "sonar_test";
private static String endpoint;
public static Odps odps() {
Account account = new AliyunAccount(accessID, accessKey);
Odps odps = new Odps(account);
odps.setDefaultProject(project);
if (endpoint != null && !endpoint.isEmpty()) {
odps.setEndpoint(endpoint);
}
return odps;
}
}

View File

@ -0,0 +1,97 @@
package com.alibaba.fastjson2.support.csv;
import com.alibaba.fastjson2.JSONException;
import com.alibaba.fastjson2.util.DateUtils;
import com.alibaba.fastjson2.util.TypeUtils;
import com.aliyun.odps.Column;
import com.aliyun.odps.Instance;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.data.ArrayRecord;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.task.SQLTask;
import com.aliyun.odps.tunnel.TableTunnel;
import java.io.File;
import java.util.List;
public class OdpsWriteDemo {
static final File file = new File("/Users/wenshao/Downloads/Public_School_Characteristics_2020-21.csv");
public static void main(String[] args) throws Exception {
Odps odps = OdpsTestUtils.odps();
String tableName = "x7";
TableTunnel tunnel = new TableTunnel(odps);
{
String dropTable = "drop table if exists " + tableName + ";";
System.out.println(dropTable);
Instance dropTableTask = SQLTask.run(odps, dropTable);
dropTableTask.waitForSuccess();
String ddl = CSVUtils.genMaxComputeCreateTable(file, tableName);
System.out.println(ddl);
Instance createTableTask = SQLTask.run(odps, ddl);
createTableTask.waitForSuccess();
}
long start = System.currentTimeMillis();
TableTunnel.UploadSession uploadSession = tunnel.createUploadSession(odps.getDefaultProject(), tableName);
List<Column> columnList = uploadSession.getSchema().getColumns();
Column[] columns = columnList.toArray(new Column[columnList.size()]);
RecordWriter recordWriter = uploadSession.openRecordWriter(0);
CSVReader csvReader = CSVReader.of(file);
csvReader.readHeader();
while (true) {
String[] line = csvReader.readLine();
if (line == null) {
break;
}
ArrayRecord record = new ArrayRecord(columns);
for (int i = 0; i < line.length; i++) {
String value = line[i];
if (value == null || value.isEmpty()) {
continue;
}
if (i >= columns.length) {
break;
}
Column column = columns[i];
OdpsType odpsType = column.getTypeInfo().getOdpsType();
switch (odpsType) {
case INT:
record.setInt(i, Integer.parseInt(value));
break;
case BIGINT:
record.setBigint(i, Long.parseLong(value));
break;
case STRING:
record.set(i, value);
break;
case DATETIME:
record.set(i, DateUtils.parseDate(value));
break;
case DECIMAL:
record.set(i, TypeUtils.toBigDecimal(value));
break;
case DOUBLE:
record.set(i, Double.parseDouble(value));
break;
default:
throw new JSONException("TODO " + odpsType);
}
}
recordWriter.write(record);
}
recordWriter.close();
uploadSession.commit(new Long[] {0L});
long millis = System.currentTimeMillis() - start;
System.out.println("millis " + millis);
}
}