对example工程结构进行了重新规划,给出两个编写测试模块的案例。

对example工程结构进行了重新规划,给出两个编写测试模块的案例。
This commit is contained in:
FuYouJ 2023-08-20 19:10:46 +08:00
parent c0511ba227
commit 3207b63c52
24 changed files with 339 additions and 307 deletions

View File

@ -100,10 +100,7 @@ DataX目前已经有了比较全面的插件体系主流的RDBMS数据库、N
- 整库迁移https://help.aliyun.com/document_detail/137809.html
- 批量上云https://help.aliyun.com/document_detail/146671.html
- 更新更多能力请访问https://help.aliyun.com/document_detail/137663.html
-
# 本地快速调试读写插件定位BUG
在example模块可以方便快速的在本地运行任务
点击:[datax-example使用](https://github.com/alibaba/DataX/blob/master/datax-example/doc/README.md)
-
# 我要开发新的插件

View File

@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-example</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>datax-example-core</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>

View File

@ -63,9 +63,8 @@ public class ExampleConfigParser {
" <include>**/*.*</include>\n" +
" </includes>\n" +
" <filtering>true</filtering>\n" +
" </resource>\n [可参阅streamreader pom文件] \n" +
"3If you are using 'datax-example' as the startup module, " +
"check whether the 'example' module has imported the dependencies of the plugin. Refer to the 'pom' file of the 'example' module";
" </resource>\n [Refer to the streamreader pom file] \n" +
"3: Check that the datax-yourPlugin-example module imported your test plugin";
message = String.format(message, failedPlugin);
throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_INIT_ERROR, message);
}

View File

@ -0,0 +1,19 @@
package com.alibaba.datax.example.util;
import org.junit.Assert;
import org.junit.Test;
/**
 * Checks that {@code PathUtil.getAbsolutePathFromClassPath} returns a non-null value
 * for the {@code /pathTest.json} resource name.
 *
 * {@code Author} FuYouJ
 * {@code Date} 2023/8/19 21:38
 */
public class PathUtilTest {

    /** Resource name handed to {@code PathUtil}; written with a leading slash. */
    private static final String FIXTURE_NAME = "/pathTest.json";

    /** The lookup result for the fixture name must not be {@code null}. */
    @Test
    public void testParseClassPathFile() {
        Assert.assertNotNull(PathUtil.getAbsolutePathFromClassPath(FIXTURE_NAME));
    }
}

View File

@ -0,0 +1,43 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-example</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>datax-example-neo4j</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<test.container.version>1.17.6</test.container.version>
<neo4j-java-driver.version>4.4.9</neo4j-java-driver.version>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-example-core</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${test.container.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>neo4jwriter</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-example-streamreader</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,138 @@
package com.alibaba.datax.example.neo4j;
import com.alibaba.datax.example.ExampleContainer;
import com.alibaba.datax.example.util.PathUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.neo4j.driver.*;
import org.neo4j.driver.types.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.DockerLoggerFactory;
import java.net.URI;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
/**
 * End-to-end example test: starts a Neo4j container, runs the
 * {@code /streamreader2neo4j.json} job through {@link ExampleContainer}, and then counts
 * the {@code StreamReader} nodes found in the database.
 *
 * <p>The container's HTTP (7474) and Bolt (7687) ports are bound to the same port numbers
 * on the host, so both must be free when the test runs.
 * NOTE(review): the fixed binding is presumably required because the job file addresses
 * {@code localhost:7687} directly - confirm before switching to random ports.
 *
 * {@code Author} FuYouJ
 * {@code Date} 2023/8/19 21:48
 */
public class StreamReader2Neo4jWriterTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamReader2Neo4jWriterTest.class);
    private static final String CONTAINER_IMAGE = "neo4j:5.9.0";
    private static final String CONTAINER_HOST = "neo4j-host";
    private static final int HTTP_PORT = 7474;
    private static final int BOLT_PORT = 7687;
    private static final String CONTAINER_NEO4J_USERNAME = "neo4j";
    private static final String CONTAINER_NEO4J_PASSWORD = "Test@12343";
    private static final URI CONTAINER_URI = URI.create("neo4j://localhost:" + BOLT_PORT);
    // NOTE(review): this network is created once per class load and is never closed here.
    protected static final Network NETWORK = Network.newNetwork();
    private GenericContainer<?> container;
    protected Driver neo4jDriver;
    protected Session neo4jSession;
    // The expected node count is CHANNEL * READER_NUM.
    // NOTE(review): presumably these mirror "channel" and "sliceRecordCount" in the job
    // file - keep them in sync with it.
    private static final int CHANNEL = 5;
    private static final int READER_NUM = 10;

    /**
     * Starts the Neo4j container and blocks (up to 30 seconds) until a driver connection
     * has been verified.
     */
    @Before
    public void init() {
        DockerImageName imageName = DockerImageName.parse(CONTAINER_IMAGE);
        container =
                new GenericContainer<>(imageName)
                        .withNetwork(NETWORK)
                        .withNetworkAliases(CONTAINER_HOST)
                        .withExposedPorts(HTTP_PORT, BOLT_PORT)
                        .withEnv(
                                "NEO4J_AUTH",
                                CONTAINER_NEO4J_USERNAME + "/" + CONTAINER_NEO4J_PASSWORD)
                        .withEnv("apoc.export.file.enabled", "true")
                        .withEnv("apoc.import.file.enabled", "true")
                        .withEnv("apoc.import.file.use_neo4j_config", "true")
                        .withEnv("NEO4J_PLUGINS", "[\"apoc\"]")
                        .withLogConsumer(
                                new Slf4jLogConsumer(
                                        DockerLoggerFactory.getLogger(CONTAINER_IMAGE)));
        // Bind container ports to identical host ports (host:container).
        container.setPortBindings(
                Arrays.asList(
                        String.format("%s:%s", HTTP_PORT, HTTP_PORT),
                        String.format("%s:%s", BOLT_PORT, BOLT_PORT)));
        Startables.deepStart(Stream.of(container)).join();
        LOGGER.info("container started");
        // Retry initConnection until it stops throwing or the timeout elapses.
        Awaitility.given()
                .ignoreExceptions()
                .await()
                .atMost(30, TimeUnit.SECONDS)
                .untilAsserted(this::initConnection);
    }

    /**
     * Runs the whole job through the example container so that problems anywhere in the
     * reader-to-writer flow surface here, then verifies what was written.
     */
    @Test
    public void streamReader2Neo4j() {
        deleteHistoryIfExist();
        String path = "/streamreader2neo4j.json";
        String jobPath = PathUtil.getAbsolutePathFromClassPath(path);
        ExampleContainer.start(jobPath);
        // Check the result set against what the channel count and the reader's mock data imply.
        verifyWriteResult();
    }

    /** Removes {@code StreamReader} nodes left in the database, if there are any. */
    private void deleteHistoryIfExist() {
        String query = "match (n:StreamReader) return n limit 1";
        String delete = "match (n:StreamReader) delete n";
        if (neo4jSession.run(query).hasNext()) {
            // consume() waits for the statement to finish, so the deletion is complete
            // before the job starts writing through its own connection.
            neo4jSession.run(delete).consume();
        }
    }

    /** Asserts that exactly {@code CHANNEL * READER_NUM} {@code StreamReader} nodes exist. */
    private void verifyWriteResult() {
        int total = CHANNEL * READER_NUM;
        String query = "match (n:StreamReader) return n";
        Result run = neo4jSession.run(query);
        int count = 0;
        while (run.hasNext()) {
            Record record = run.next();
            Node node = record.get("n").asNode();
            if (node.hasLabel("StreamReader")) {
                count++;
            }
        }
        // JUnit's signature is assertEquals(expected, actual); the expectation goes first
        // so a failure message reports the two numbers the right way round.
        Assert.assertEquals(total, count);
    }

    /**
     * Closes the session, the driver and the container. Each step runs even when an
     * earlier one throws, so a failing close cannot leave the container running.
     */
    @After
    public void destroy() {
        try {
            if (neo4jSession != null) {
                neo4jSession.close();
            }
        } finally {
            try {
                if (neo4jDriver != null) {
                    neo4jDriver.close();
                }
            } finally {
                if (container != null) {
                    container.close();
                }
            }
        }
    }

    /**
     * Opens a driver, verifies that the server is reachable, and opens a session on the
     * {@code neo4j} database. Throws if the server cannot be reached yet, which lets the
     * retry loop in {@link #init()} try again.
     */
    private void initConnection() {
        Driver driver =
                GraphDatabase.driver(
                        CONTAINER_URI,
                        AuthTokens.basic(CONTAINER_NEO4J_USERNAME, CONTAINER_NEO4J_PASSWORD));
        try {
            // Creating a driver does not contact the server; without this call the
            // surrounding retry loop would succeed on its first attempt regardless of
            // whether Neo4j is accepting connections.
            driver.verifyConnectivity();
        } catch (RuntimeException e) {
            // Do not leak one driver per failed attempt.
            driver.close();
            throw e;
        }
        neo4jDriver = driver;
        neo4jSession = driver.session(SessionConfig.forDatabase("neo4j"));
    }
}

View File

@ -0,0 +1,51 @@
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 10,
"column": [
{
"type": "string",
"value": "StreamReader"
},
{
"type": "string",
"value": "1997"
}
]
}
},
"writer": {
"name": "neo4jWriter",
"parameter": {
"uri": "bolt://localhost:7687",
"username":"neo4j",
"password":"Test@12343",
"database":"neo4j",
"cypher": "unwind $batch as row CALL apoc.cypher.doIt( 'create (n:`' + row.Label + '`{id:$id})' ,{id: row.id} ) YIELD value RETURN 1 ",
"batchDataVariableName": "batch",
"batchSize": "3",
"properties": [
{
"name": "Label",
"type": "string"
},
{
"name": "id",
"type": "STRING"
}
]
}
}
}
],
"setting": {
"speed": {
"channel": 5
}
}
}
}

View File

@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-example</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>datax-example-streamreader</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-example-core</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>streamreader</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>streamwriter</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -12,7 +12,7 @@ import org.junit.Test;
public class StreamReader2StreamWriterTest {
@Test
public void testStreamReader2StreamWriter() {
String path = "/job/stream/stream2stream.json";
String path = "/stream2stream.json";
String jobPath = PathUtil.getAbsolutePathFromClassPath(path);
ExampleContainer.start(jobPath);
}

View File

@ -19,6 +19,10 @@
<img src="img/img01.png" alt="img" style="zoom:40%;" />
### 目录结构
该目录结构演示了如何使用datax-example-core编写测试用例和校验代码流程。
<img src="img/img03.png" alt="img" style="zoom:100%;" />
### 实现原理
- 不修改原有的ConfigParser,使用新的ExampleConfigParser,仅用于example模块。它不依赖datax.home,而是依赖ide编译后的target目录
@ -70,46 +74,23 @@
</plugins>
</build>
```
#### 在example模块使用
1.在datax-example模块引入你需要的插件默认只引入了streamreader、writer
2.打开datax-example的Main class
#### 在测试模块使用
参考datax-example/datax-example-streamreader的StreamReader2StreamWriterTest.java
```java
public class Main {
/**
* 注意!
* 1.在example模块pom文件添加你依赖的的调试插件
* 你可以直接打开本模块的pom文件,参考是如何引入streamreaderstreamwriter
* 2. 在此处指定你的job文件
*/
public static void main(String[] args) {
String classPathJobPath = "/job/stream2stream.json";
String absJobPath = PathUtil.getAbsolutePathFromClassPath(classPathJobPath);
startExample(absJobPath);
}
public static void startExample(String jobPath) {
Configuration configuration = ExampleConfigParser.parse(jobPath);
Engine engine = new Engine();
engine.start(configuration);
}
}
```
#### 在reader/writer模块使用
参考neo4jwriter的StreamReader2Neo4jWriterTest
```java
public class StreamReader2Neo4jWriterTest extends Neo4jWriterTest {
private static final int CHANNEL = 5;
private static final int READER_NUM = 10;
//在neo4jWriter模块使用Example测试整个job,方便发现整个流程的代码问题
public class StreamReader2StreamWriterTest {
@Test
public void testStreamReader2StreamWriter() {
String path = "/stream2stream.json";
String jobPath = PathUtil.getAbsolutePathFromClassPath(path);
ExampleContainer.start(jobPath);
}
}
```
参考datax-example/datax-example-neo4j的StreamReader2Neo4jWriterTest
```java
public class StreamReader2Neo4jWriterTest{
@Test
public void streamReader2Neo4j() {
deleteHistoryIfExist();
@ -122,29 +103,5 @@ public class StreamReader2Neo4jWriterTest extends Neo4jWriterTest {
//根据channel和reader的mock数据校验结果集是否符合预期
verifyWriteResult();
}
private void deleteHistoryIfExist() {
String query = "match (n:StreamReader) return n limit 1";
String delete = "match (n:StreamReader) delete n";
if (super.neo4jSession.run(query).hasNext()) {
neo4jSession.run(delete);
}
}
private void verifyWriteResult() {
int total = CHANNEL * READER_NUM;
String query = "match (n:StreamReader) return n";
Result run = neo4jSession.run(query);
int count = 0;
while (run.hasNext()) {
Record record = run.next();
Node node = record.get("n").asNode();
if (node.hasLabel("StreamReader")) {
count++;
}
}
Assert.assertEquals(count, total);
}
}
```

Binary file not shown.

After

Width:  |  Height:  |  Size: 43 KiB

View File

@ -10,6 +10,12 @@
</parent>
<artifactId>datax-example</artifactId>
<packaging>pom</packaging>
<modules>
<module>datax-example-core</module>
<module>datax-example-streamreader</module>
<module>datax-example-neo4j</module>
</modules>
<properties>
<maven.compiler.source>8</maven.compiler.source>
@ -28,17 +34,6 @@
<artifactId>datax-core</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<!-- 默认引入streamwriter,streamreader-->
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>streamwriter</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>streamreader</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>

View File

@ -1,33 +0,0 @@
package com.alibaba.datax.example.util.com.alibaba.datax.example.util;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.example.util.ExampleConfigParser;
import com.alibaba.datax.example.util.PathUtil;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
/**
 * Checks that the configuration produced by {@code ExampleConfigParser.parse} carries the
 * same {@code core} and {@code common} sections as {@code /example/conf/core.json}.
 */
public class ExampleConfigParserTest {

    /**
     * Parses {@code /job/stream2stream.json} and compares its {@code core} and
     * {@code common} entries with those of the default configuration file.
     */
    @Test
    public void testExampleConfigParserShouldLoadDefaultConf() {
        String path = "/job/stream2stream.json";
        Configuration testConfiguration = ExampleConfigParser.parse(
                PathUtil.getAbsolutePathFromClassPath(path)
        );
        Configuration defaultConf = loadDefaultConf();
        // assertEquals takes (expected, actual): the default conf is the expectation,
        // the parsed job configuration is the value under test.
        Assert.assertEquals(defaultConf.get("core"), testConfiguration.get("core"));
        Assert.assertEquals(defaultConf.get("common"), testConfiguration.get("common"));
    }

    /** Loads {@code /example/conf/core.json} from the classpath as a {@code Configuration}. */
    private Configuration loadDefaultConf() {
        return Configuration.from(
                new File(PathUtil.getAbsolutePathFromClassPath("/example/conf/core.json")
                )
        );
    }
}

View File

@ -1,60 +0,0 @@
{
"entry": {
"jvm": "-Xms1G -Xmx1G",
"environment": {}
},
"common": {
"column": {
"datetimeFormat": "yyyy-MM-dd HH:mm:ss",
"timeFormat": "HH:mm:ss",
"dateFormat": "yyyy-MM-dd",
"extraFormats":["yyyyMMdd"],
"timeZone": "GMT+8",
"encoding": "utf-8"
}
},
"core": {
"dataXServer": {
"address": "http://localhost:7001/api",
"timeout": 10000,
"reportDataxLog": false,
"reportPerfLog": false
},
"transport": {
"channel": {
"class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel",
"speed": {
"byte": -1,
"record": -1
},
"flowControlInterval": 20,
"capacity": 512,
"byteCapacity": 67108864
},
"exchanger": {
"class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger",
"bufferSize": 32
}
},
"container": {
"job": {
"reportInterval": 10000
},
"taskGroup": {
"channel": 5
},
"trace": {
"enable": "false"
}
},
"statistics": {
"collector": {
"plugin": {
"taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector",
"maxDirtyNumber": 10
}
}
}
}
}

View File

@ -1,36 +0,0 @@
{
"job": {
"content": [
{
"reader": {
"name": "notExistReaderPlugin",
"parameter": {
"sliceRecordCount": 10,
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "hello你好世界-DataX"
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 5
}
}
}
}

View File

@ -1,36 +0,0 @@
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 10,
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "hello你好世界-DataX"
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 5
}
}
}
}

View File

@ -448,8 +448,7 @@ DataX的内部类型在实现上会选用不同的java类型
4. 根据插件配置中定义的入口类,框架通过反射实例化对应的`Job`和`Task`对象。
### 编写测试用例
1. 你可以在你的插件模块引入datax-example模块,调用`ExampleContainer.start(jobPath)`方法来检测你的代码逻辑是否正确。[datax-example使用](https://github.com/alibaba/DataX/datax-example/doc/README.md)
2. 你可以在datax-example模块引入你的插件模块编写测试方法调用`ExampleContainer.start(jobPath)`方法来检测你的代码逻辑是否正确。[datax-example使用](https://github.com/alibaba/DataX/datax-example/doc/README.md)
1. 在datax-example工程下新建插件测试模块,调用`ExampleContainer.start(jobPath)`方法来检测你的代码逻辑是否正确。[datax-example使用](https://github.com/alibaba/DataX/blob/master/datax-example/doc/README.md)
## 三、Last but not Least

View File

@ -1,59 +0,0 @@
package com.alibaba.datax.plugin.writer.mock;
import com.alibaba.datax.example.ExampleContainer;
import com.alibaba.datax.example.util.PathUtil;
import com.alibaba.datax.plugin.writer.Neo4jWriterTest;
import org.junit.Assert;
import org.junit.Test;
import org.neo4j.driver.Record;
import org.neo4j.driver.Result;
import org.neo4j.driver.types.Node;
/**
 * Shows how to run a test case with ExampleContainer.
 * {@code Author} FuYouJ
 * {@code Date} 2023/8/6 11:36
 */
public class StreamReader2Neo4jWriterTest extends Neo4jWriterTest {
    // The expected node count is CHANNEL * READER_NUM.
    private static final int CHANNEL = 5;
    private static final int READER_NUM = 10;

    /**
     * Runs the whole job through the Example container from inside the neo4jWriter module,
     * which makes it easy to spot problems anywhere in the flow.
     */
    @Test
    public void streamReader2Neo4j() {
        deleteHistoryIfExist();
        String path = "/streamreader2neo4j.json";
        String jobPath = PathUtil.getAbsolutePathFromClassPath(path);
        ExampleContainer.start(jobPath);
        // Check the result set against what the channel count and the reader's mock data imply.
        verifyWriteResult();
    }

    /** Removes {@code StreamReader} nodes left in the database, if there are any. */
    private void deleteHistoryIfExist() {
        String query = "match (n:StreamReader) return n limit 1";
        String delete = "match (n:StreamReader) delete n";
        if (super.neo4jSession.run(query).hasNext()) {
            neo4jSession.run(delete);
        }
    }

    /** Asserts that exactly {@code CHANNEL * READER_NUM} {@code StreamReader} nodes exist. */
    private void verifyWriteResult() {
        int total = CHANNEL * READER_NUM;
        String query = "match (n:StreamReader) return n";
        Result run = neo4jSession.run(query);
        int count = 0;
        while (run.hasNext()) {
            Record record = run.next();
            Node node = record.get("n").asNode();
            if (node.hasLabel("StreamReader")) {
                count++;
            }
        }
        // JUnit's signature is assertEquals(expected, actual); the expectation goes first
        // so a failure message reports the two numbers the right way round.
        Assert.assertEquals(total, count);
    }
}