diff --git a/README.md b/README.md index 54eb2203..315c72ac 100644 --- a/README.md +++ b/README.md @@ -100,10 +100,7 @@ DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、N - 整库迁移:https://help.aliyun.com/document_detail/137809.html - 批量上云:https://help.aliyun.com/document_detail/146671.html - 更新更多能力请访问:https://help.aliyun.com/document_detail/137663.html - - -# 本地快速调试读写插件定位BUG -在example模块可以方便快速的在本地运行任务 -点击:[datax-example使用](https://github.com/alibaba/DataX/datax-example/doc/README.md) + - # 我要开发新的插件 diff --git a/datax-example/datax-example-core/pom.xml b/datax-example/datax-example-core/pom.xml new file mode 100644 index 00000000..6a2e9e8e --- /dev/null +++ b/datax-example/datax-example-core/pom.xml @@ -0,0 +1,20 @@ + + + 4.0.0 + + com.alibaba.datax + datax-example + 0.0.1-SNAPSHOT + + + datax-example-core + + + 8 + 8 + UTF-8 + + + \ No newline at end of file diff --git a/datax-example/src/main/java/com/alibaba/datax/example/ExampleContainer.java b/datax-example/datax-example-core/src/main/java/com/alibaba/datax/example/ExampleContainer.java similarity index 100% rename from datax-example/src/main/java/com/alibaba/datax/example/ExampleContainer.java rename to datax-example/datax-example-core/src/main/java/com/alibaba/datax/example/ExampleContainer.java diff --git a/datax-example/src/main/java/com/alibaba/datax/example/Main.java b/datax-example/datax-example-core/src/main/java/com/alibaba/datax/example/Main.java similarity index 100% rename from datax-example/src/main/java/com/alibaba/datax/example/Main.java rename to datax-example/datax-example-core/src/main/java/com/alibaba/datax/example/Main.java diff --git a/datax-example/src/main/java/com/alibaba/datax/example/util/ExampleConfigParser.java b/datax-example/datax-example-core/src/main/java/com/alibaba/datax/example/util/ExampleConfigParser.java similarity index 95% rename from datax-example/src/main/java/com/alibaba/datax/example/util/ExampleConfigParser.java rename to datax-example/datax-example-core/src/main/java/com/alibaba/datax/example/util/ExampleConfigParser.java index 7a24aaf4..6bbb4a23 100644 --- a/datax-example/src/main/java/com/alibaba/datax/example/util/ExampleConfigParser.java +++ b/datax-example/datax-example-core/src/main/java/com/alibaba/datax/example/util/ExampleConfigParser.java @@ -63,9 +63,8 @@ public class ExampleConfigParser { " **/*.*\n" + " \n" + " true\n" + - " \n [可参阅streamreader pom文件] \n" + - "3:If you are using 'datax-example' as the startup module, " + - "check whether the 'example' module has imported the dependencies of the plugin. Refer to the 'pom' file of the 'example' module"; + " \n [Refer to the streamreader pom file] \n" + + "3: Check that the datax-yourPlugin-example module imported your test plugin"; message = String.format(message, failedPlugin); throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_INIT_ERROR, message); } diff --git a/datax-example/src/main/java/com/alibaba/datax/example/util/PathUtil.java b/datax-example/datax-example-core/src/main/java/com/alibaba/datax/example/util/PathUtil.java similarity index 100% rename from datax-example/src/main/java/com/alibaba/datax/example/util/PathUtil.java rename to datax-example/datax-example-core/src/main/java/com/alibaba/datax/example/util/PathUtil.java diff --git a/datax-example/src/main/resources/example/conf/core.json b/datax-example/datax-example-core/src/main/resources/example/conf/core.json similarity index 100% rename from datax-example/src/main/resources/example/conf/core.json rename to datax-example/datax-example-core/src/main/resources/example/conf/core.json diff --git a/datax-example/datax-example-core/src/test/java/com/alibaba/datax/example/util/PathUtilTest.java b/datax-example/datax-example-core/src/test/java/com/alibaba/datax/example/util/PathUtilTest.java new file mode 100644 index 00000000..8985b54c --- /dev/null +++ b/datax-example/datax-example-core/src/test/java/com/alibaba/datax/example/util/PathUtilTest.java @@ -0,0 +1,19 @@ +package com.alibaba.datax.example.util; + +import org.junit.Assert; +import org.junit.Test; + +/** + * {@code Author} FuYouJ + * {@code Date} 2023/8/19 21:38 + */ + +public class PathUtilTest { + + @Test + public void testParseClassPathFile() { + String path = "/pathTest.json"; + String absolutePathFromClassPath = PathUtil.getAbsolutePathFromClassPath(path); + Assert.assertNotNull(absolutePathFromClassPath); + } +} diff --git a/datax-example/datax-example-core/src/test/resources/pathTest.json b/datax-example/datax-example-core/src/test/resources/pathTest.json new file mode 100644 index 00000000..9e26dfee --- /dev/null +++ b/datax-example/datax-example-core/src/test/resources/pathTest.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/datax-example/datax-example-neo4j/pom.xml b/datax-example/datax-example-neo4j/pom.xml new file mode 100644 index 00000000..303b14a8 --- /dev/null +++ b/datax-example/datax-example-neo4j/pom.xml @@ -0,0 +1,43 @@ + + + 4.0.0 + + com.alibaba.datax + datax-example + 0.0.1-SNAPSHOT + + + datax-example-neo4j + + + 8 + 8 + UTF-8 + 1.17.6 + 4.4.9 + + + + com.alibaba.datax + datax-example-core + 0.0.1-SNAPSHOT + + + org.testcontainers + testcontainers + ${test.container.version} + + + com.alibaba.datax + neo4jwriter + 0.0.1-SNAPSHOT + + + com.alibaba.datax + datax-example-streamreader + 0.0.1-SNAPSHOT + + + \ No newline at end of file diff --git a/datax-example/datax-example-neo4j/src/test/java/com/alibaba/datax/example/neo4j/StreamReader2Neo4jWriterTest.java b/datax-example/datax-example-neo4j/src/test/java/com/alibaba/datax/example/neo4j/StreamReader2Neo4jWriterTest.java new file mode 100644 index 00000000..9cf01253 --- /dev/null +++ b/datax-example/datax-example-neo4j/src/test/java/com/alibaba/datax/example/neo4j/StreamReader2Neo4jWriterTest.java @@ -0,0 +1,138 @@ +package com.alibaba.datax.example.neo4j; + +import com.alibaba.datax.example.ExampleContainer; +import com.alibaba.datax.example.util.PathUtil; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.neo4j.driver.*; +import org.neo4j.driver.types.Node; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.shaded.org.awaitility.Awaitility; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.DockerLoggerFactory; + +import java.net.URI; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +/** + * {@code Author} FuYouJ + * {@code Date} 2023/8/19 21:48 + */ + +public class StreamReader2Neo4jWriterTest { + private static final Logger LOGGER = LoggerFactory.getLogger(StreamReader2Neo4jWriterTest.class); + private static final String CONTAINER_IMAGE = "neo4j:5.9.0"; + + private static final String CONTAINER_HOST = "neo4j-host"; + private static final int HTTP_PORT = 7474; + private static final int BOLT_PORT = 7687; + private static final String CONTAINER_NEO4J_USERNAME = "neo4j"; + private static final String CONTAINER_NEO4J_PASSWORD = "Test@12343"; + private static final URI CONTAINER_URI = URI.create("neo4j://localhost:" + BOLT_PORT); + + protected static final Network NETWORK = Network.newNetwork(); + + private GenericContainer container; + protected Driver neo4jDriver; + protected Session neo4jSession; + private static final int CHANNEL = 5; + private static final int READER_NUM = 10; + + @Before + public void init() { + DockerImageName imageName = DockerImageName.parse(CONTAINER_IMAGE); + container = + new GenericContainer<>(imageName) + .withNetwork(NETWORK) + .withNetworkAliases(CONTAINER_HOST) + .withExposedPorts(HTTP_PORT, BOLT_PORT) + .withEnv( + "NEO4J_AUTH", + CONTAINER_NEO4J_USERNAME + "/" + CONTAINER_NEO4J_PASSWORD) + .withEnv("apoc.export.file.enabled", "true") + .withEnv("apoc.import.file.enabled", "true") + .withEnv("apoc.import.file.use_neo4j_config", "true") + .withEnv("NEO4J_PLUGINS", "[\"apoc\"]") + .withLogConsumer( + new Slf4jLogConsumer( + DockerLoggerFactory.getLogger(CONTAINER_IMAGE))); + container.setPortBindings( + Arrays.asList( + String.format("%s:%s", HTTP_PORT, HTTP_PORT), + String.format("%s:%s", BOLT_PORT, BOLT_PORT))); + Startables.deepStart(Stream.of(container)).join(); + LOGGER.info("container started"); + Awaitility.given() + .ignoreExceptions() + .await() + .atMost(30, TimeUnit.SECONDS) + .untilAsserted(this::initConnection); + } + + //在neo4jWriter模块使用Example测试整个job,方便发现整个流程的代码问题 + @Test + public void streamReader2Neo4j() { + + deleteHistoryIfExist(); + + String path = "/streamreader2neo4j.json"; + String jobPath = PathUtil.getAbsolutePathFromClassPath(path); + + ExampleContainer.start(jobPath); + + //根据channel和reader的mock数据,校验结果集是否符合预期 + verifyWriteResult(); + } + + private void deleteHistoryIfExist() { + String query = "match (n:StreamReader) return n limit 1"; + String delete = "match (n:StreamReader) delete n"; + if (neo4jSession.run(query).hasNext()) { + neo4jSession.run(delete); + } + } + + private void verifyWriteResult() { + int total = CHANNEL * READER_NUM; + String query = "match (n:StreamReader) return n"; + Result run = neo4jSession.run(query); + int count = 0; + while (run.hasNext()) { + Record record = run.next(); + Node node = record.get("n").asNode(); + if (node.hasLabel("StreamReader")) { + count++; + } + } + Assert.assertEquals(count, total); + } + @After + public void destroy() { + if (neo4jSession != null) { + neo4jSession.close(); + } + if (neo4jDriver != null) { + neo4jDriver.close(); + } + if (container != null) { + container.close(); + } + } + + private void initConnection() { + neo4jDriver = + GraphDatabase.driver( + CONTAINER_URI, + AuthTokens.basic(CONTAINER_NEO4J_USERNAME, CONTAINER_NEO4J_PASSWORD)); + neo4jSession = neo4jDriver.session(SessionConfig.forDatabase("neo4j")); + } +} diff --git a/datax-example/datax-example-neo4j/src/test/resources/streamreader2neo4j.json b/datax-example/datax-example-neo4j/src/test/resources/streamreader2neo4j.json new file mode 100644 index 00000000..3d543ce3 --- /dev/null +++ b/datax-example/datax-example-neo4j/src/test/resources/streamreader2neo4j.json @@ -0,0 +1,51 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "streamreader", + "parameter": { + "sliceRecordCount": 10, + "column": [ + { + "type": "string", + "value": "StreamReader" + }, + { + "type": "string", + "value": "1997" + } + ] + } + }, + "writer": { + "name": "neo4jWriter", + "parameter": { + "uri": "bolt://localhost:7687", + "username":"neo4j", + "password":"Test@12343", + "database":"neo4j", + "cypher": "unwind $batch as row CALL apoc.cypher.doIt( 'create (n:`' + row.Label + '`{id:$id})' ,{id: row.id} ) YIELD value RETURN 1 ", + "batchDataVariableName": "batch", + "batchSize": "3", + "properties": [ + { + "name": "Label", + "type": "string" + }, + { + "name": "id", + "type": "STRING" + } + ] + } + } + } + ], + "setting": { + "speed": { + "channel": 5 + } + } + } +} \ No newline at end of file diff --git a/datax-example/datax-example-streamreader/pom.xml b/datax-example/datax-example-streamreader/pom.xml new file mode 100644 index 00000000..ea70de10 --- /dev/null +++ b/datax-example/datax-example-streamreader/pom.xml @@ -0,0 +1,37 @@ + + + 4.0.0 + + com.alibaba.datax + datax-example + 0.0.1-SNAPSHOT + + + datax-example-streamreader + + + 8 + 8 + UTF-8 + + + + com.alibaba.datax + datax-example-core + 0.0.1-SNAPSHOT + + + com.alibaba.datax + streamreader + 0.0.1-SNAPSHOT + + + com.alibaba.datax + streamwriter + 0.0.1-SNAPSHOT + + + + \ No newline at end of file diff --git a/datax-example/src/test/java/com/alibaba/datax/example/streamreader/StreamReader2StreamWriterTest.java b/datax-example/datax-example-streamreader/src/test/java/com/alibaba/datax/example/streamreader/StreamReader2StreamWriterTest.java similarity index 89% rename from datax-example/src/test/java/com/alibaba/datax/example/streamreader/StreamReader2StreamWriterTest.java rename to datax-example/datax-example-streamreader/src/test/java/com/alibaba/datax/example/streamreader/StreamReader2StreamWriterTest.java index 8081677e..71d083d0 100644 --- a/datax-example/src/test/java/com/alibaba/datax/example/streamreader/StreamReader2StreamWriterTest.java +++ b/datax-example/datax-example-streamreader/src/test/java/com/alibaba/datax/example/streamreader/StreamReader2StreamWriterTest.java @@ -12,7 +12,7 @@ import org.junit.Test; public class StreamReader2StreamWriterTest { @Test public void testStreamReader2StreamWriter() { - String path = "/job/stream/stream2stream.json"; + String path = "/stream2stream.json"; String jobPath = PathUtil.getAbsolutePathFromClassPath(path); ExampleContainer.start(jobPath); } diff --git a/datax-example/src/main/resources/job/stream2stream.json b/datax-example/datax-example-streamreader/src/test/resources/stream2stream.json similarity index 100% rename from datax-example/src/main/resources/job/stream2stream.json rename to datax-example/datax-example-streamreader/src/test/resources/stream2stream.json diff --git a/datax-example/doc/README.md b/datax-example/doc/README.md index d44e10d2..15f77e87 100644 --- a/datax-example/doc/README.md +++ b/datax-example/doc/README.md @@ -19,6 +19,10 @@ img +### 目录结构 +该目录结构演示了如何使用datax-example-core编写测试用例,和校验代码流程。 +img + ### 实现原理 - 不修改原有的ConfigParer,使用新的ExampleConfigParser,仅用于example模块。他不依赖datax.home,而是依赖ide编译后的target目录 @@ -70,46 +74,23 @@ ``` -#### 在example模块使用 -1.在datax-example模块引入你需要的插件,默认只引入了streamreader、writer - -2.打开datax-example的Main class - +#### 在测试模块模块使用 +参考datax-example/datax-example-streamreader的StreamReader2StreamWriterTest.java ```java -public class Main { - - /** - * 注意! - * 1.在example模块pom文件添加你依赖的的调试插件, - * 你可以直接打开本模块的pom文件,参考是如何引入streamreader,streamwriter - * 2. 在此处指定你的job文件 - */ - public static void main(String[] args) { - - String classPathJobPath = "/job/stream2stream.json"; - String absJobPath = PathUtil.getAbsolutePathFromClassPath(classPathJobPath); - startExample(absJobPath); - } - - public static void startExample(String jobPath) { - - Configuration configuration = ExampleConfigParser.parse(jobPath); - - Engine engine = new Engine(); - engine.start(configuration); - } - -} -``` -#### 在reader/writer模块使用 -参考neo4jwriter的StreamReader2Neo4jWriterTest -```java -public class StreamReader2Neo4jWriterTest extends Neo4jWriterTest { - private static final int CHANNEL = 5; - private static final int READER_NUM = 10; - - //在neo4jWriter模块使用Example测试整个job,方便发现整个流程的代码问题 +public class StreamReader2StreamWriterTest { @Test + public void testStreamReader2StreamWriter() { + String path = "/stream2stream.json"; + String jobPath = PathUtil.getAbsolutePathFromClassPath(path); + ExampleContainer.start(jobPath); + } +} + +``` +参考datax-example/datax-example-neo4j的StreamReader2Neo4jWriterTest +```java +public class StreamReader2Neo4jWriterTest{ +@Test public void streamReader2Neo4j() { deleteHistoryIfExist(); @@ -122,29 +103,5 @@ public class StreamReader2Neo4jWriterTest extends Neo4jWriterTest { //根据channel和reader的mock数据,校验结果集是否符合预期 verifyWriteResult(); } - - private void deleteHistoryIfExist() { - String query = "match (n:StreamReader) return n limit 1"; - String delete = "match (n:StreamReader) delete n"; - if (super.neo4jSession.run(query).hasNext()) { - neo4jSession.run(delete); - } - } - - private void verifyWriteResult() { - int total = CHANNEL * READER_NUM; - String query = "match (n:StreamReader) return n"; - Result run = neo4jSession.run(query); - int count = 0; - while (run.hasNext()) { - Record record = run.next(); - Node node = record.get("n").asNode(); - if (node.hasLabel("StreamReader")) { - count++; - } - } - Assert.assertEquals(count, total); - } } - ``` \ No newline at end of file diff --git a/datax-example/doc/img/img03.png b/datax-example/doc/img/img03.png new file mode 100644 index 00000000..731f81bd Binary files /dev/null and b/datax-example/doc/img/img03.png differ diff --git a/datax-example/pom.xml b/datax-example/pom.xml index 17bb9e18..9c4c9200 100644 --- a/datax-example/pom.xml +++ b/datax-example/pom.xml @@ -10,6 +10,12 @@ datax-example + pom + + datax-example-core + datax-example-streamreader + datax-example-neo4j + 8 @@ -28,17 +34,6 @@ datax-core 0.0.1-SNAPSHOT - - - com.alibaba.datax - streamwriter - 0.0.1-SNAPSHOT - - - com.alibaba.datax - streamreader - 0.0.1-SNAPSHOT - junit junit diff --git a/datax-example/src/test/java/com/alibaba/datax/example/util/com/alibaba/datax/example/util/ExampleConfigParserTest.java b/datax-example/src/test/java/com/alibaba/datax/example/util/com/alibaba/datax/example/util/ExampleConfigParserTest.java deleted file mode 100644 index 5a99e7b1..00000000 --- a/datax-example/src/test/java/com/alibaba/datax/example/util/com/alibaba/datax/example/util/ExampleConfigParserTest.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.alibaba.datax.example.util.com.alibaba.datax.example.util; - -import com.alibaba.datax.common.util.Configuration; -import com.alibaba.datax.example.util.ExampleConfigParser; -import com.alibaba.datax.example.util.PathUtil; -import org.junit.Assert; -import org.junit.Test; - -import java.io.File; - - -public class ExampleConfigParserTest { - - - @Test - public void testExampleConfigParserShouldLoadDefaultConf() { - - String path = "/job/stream2stream.json"; - Configuration testConfiguration = ExampleConfigParser.parse( - PathUtil.getAbsolutePathFromClassPath(path) - ); - Configuration defaultConf = loadDefaultConf(); - Assert.assertEquals(testConfiguration.get("core"), defaultConf.get("core")); - Assert.assertEquals(testConfiguration.get("common"), defaultConf.get("common")); - } - - private Configuration loadDefaultConf() { - return Configuration.from( - new File(PathUtil.getAbsolutePathFromClassPath("/example/conf/core.json") - ) - ); - } -} diff --git a/datax-example/src/test/resources/example/conf/core.json b/datax-example/src/test/resources/example/conf/core.json deleted file mode 100755 index 33281ac0..00000000 --- a/datax-example/src/test/resources/example/conf/core.json +++ /dev/null @@ -1,60 +0,0 @@ -{ - "entry": { - "jvm": "-Xms1G -Xmx1G", - "environment": {} - }, - "common": { - "column": { - "datetimeFormat": "yyyy-MM-dd HH:mm:ss", - "timeFormat": "HH:mm:ss", - "dateFormat": "yyyy-MM-dd", - "extraFormats":["yyyyMMdd"], - "timeZone": "GMT+8", - "encoding": "utf-8" - } - }, - "core": { - "dataXServer": { - "address": "http://localhost:7001/api", - "timeout": 10000, - "reportDataxLog": false, - "reportPerfLog": false - }, - "transport": { - "channel": { - "class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel", - "speed": { - "byte": -1, - "record": -1 - }, - "flowControlInterval": 20, - "capacity": 512, - "byteCapacity": 67108864 - }, - "exchanger": { - "class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger", - "bufferSize": 32 - } - }, - "container": { - "job": { - "reportInterval": 10000 - }, - "taskGroup": { - "channel": 5 - }, - "trace": { - "enable": "false" - } - - }, - "statistics": { - "collector": { - "plugin": { - "taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector", - "maxDirtyNumber": 10 - } - } - } - } -} diff --git a/datax-example/src/test/resources/job/notExistPluginTest.json b/datax-example/src/test/resources/job/notExistPluginTest.json deleted file mode 100644 index afefaad3..00000000 --- a/datax-example/src/test/resources/job/notExistPluginTest.json +++ /dev/null @@ -1,36 +0,0 @@ -{ - "job": { - "content": [ - { - "reader": { - "name": "notExistReaderPlugin", - "parameter": { - "sliceRecordCount": 10, - "column": [ - { - "type": "long", - "value": "10" - }, - { - "type": "string", - "value": "hello,你好,世界-DataX" - } - ] - } - }, - "writer": { - "name": "streamwriter", - "parameter": { - "encoding": "UTF-8", - "print": true - } - } - } - ], - "setting": { - "speed": { - "channel": 5 - } - } - } -} \ No newline at end of file diff --git a/datax-example/src/test/resources/job/stream/stream2stream.json b/datax-example/src/test/resources/job/stream/stream2stream.json deleted file mode 100644 index b2a57395..00000000 --- a/datax-example/src/test/resources/job/stream/stream2stream.json +++ /dev/null @@ -1,36 +0,0 @@ -{ - "job": { - "content": [ - { - "reader": { - "name": "streamreader", - "parameter": { - "sliceRecordCount": 10, - "column": [ - { - "type": "long", - "value": "10" - }, - { - "type": "string", - "value": "hello,你好,世界-DataX" - } - ] - } - }, - "writer": { - "name": "streamwriter", - "parameter": { - "encoding": "UTF-8", - "print": true - } - } - } - ], - "setting": { - "speed": { - "channel": 5 - } - } - } -} \ No newline at end of file diff --git a/dataxPluginDev.md b/dataxPluginDev.md index 98aa41a3..098bf819 100644 --- a/dataxPluginDev.md +++ b/dataxPluginDev.md @@ -448,8 +448,7 @@ DataX的内部类型在实现上会选用不同的java类型: 4. 根据插件配置中定义的入口类,框架通过反射实例化对应的`Job`和`Task`对象。 ### 编写测试用例 -1. 你可以在你的插件模块引入datax-example模块,调用`ExampleContainer.start(jobPath)`方法来检测你的代码逻辑是否正确。[datax-example使用](https://github.com/alibaba/DataX/datax-example/doc/README.md) -2. 你可以在datax-example模块引入你的插件模块,编写测试方法调用`ExampleContainer.start(jobPath)`方法来检测你的代码逻辑是否正确。[datax-example使用](https://github.com/alibaba/DataX/datax-example/doc/README.md) +1. 在datax-example工程下新建新的插件测试模块,调用`ExampleContainer.start(jobPath)`方法来检测你的代码逻辑是否正确。[datax-example使用](https://github.com/alibaba/DataX/datax-example/doc/README.md) ## 三、Last but not Least diff --git a/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/mock/StreamReader2Neo4jWriterTest.java b/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/mock/StreamReader2Neo4jWriterTest.java deleted file mode 100644 index 3335f5e7..00000000 --- a/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/mock/StreamReader2Neo4jWriterTest.java +++ /dev/null @@ -1,59 +0,0 @@ -package com.alibaba.datax.plugin.writer.mock; - -import com.alibaba.datax.example.ExampleContainer; -import com.alibaba.datax.example.util.PathUtil; -import com.alibaba.datax.plugin.writer.Neo4jWriterTest; -import org.junit.Assert; -import org.junit.Test; -import org.neo4j.driver.Record; -import org.neo4j.driver.Result; -import org.neo4j.driver.types.Node; - -/** - * 展示如何使用ExampleContainer运行测试用例 - * {@code Author} FuYouJ - * {@code Date} 2023/8/6 11:36 - */ - -public class StreamReader2Neo4jWriterTest extends Neo4jWriterTest { - private static final int CHANNEL = 5; - private static final int READER_NUM = 10; - - //在neo4jWriter模块使用Example测试整个job,方便发现整个流程的代码问题 - @Test - public void streamReader2Neo4j() { - - deleteHistoryIfExist(); - - String path = "/streamreader2neo4j.json"; - String jobPath = PathUtil.getAbsolutePathFromClassPath(path); - - ExampleContainer.start(jobPath); - - //根据channel和reader的mock数据,校验结果集是否符合预期 - verifyWriteResult(); - } - - private void deleteHistoryIfExist() { - String query = "match (n:StreamReader) return n limit 1"; - String delete = "match (n:StreamReader) delete n"; - if (super.neo4jSession.run(query).hasNext()) { - neo4jSession.run(delete); - } - } - - private void verifyWriteResult() { - int total = CHANNEL * READER_NUM; - String query = "match (n:StreamReader) return n"; - Result run = neo4jSession.run(query); - int count = 0; - while (run.hasNext()) { - Record record = run.next(); - Node node = record.get("n").asNode(); - if (node.hasLabel("StreamReader")) { - count++; - } - } - Assert.assertEquals(count, total); - } -}