feat: streamjob api add in sreworks-job

This commit is contained in:
twwy 2023-04-11 15:19:14 +08:00
parent d9247c5fca
commit 753bb1824d
15 changed files with 584 additions and 182 deletions

52
contribute-to-sreworks.sh Executable file
View File

@ -0,0 +1,52 @@
#!/bin/bash
#
# sreworks_project_path/contribute-to-sreworks.sh "paas/appmanager"
#
SW_ROOT=$(cd `dirname $0`; pwd)
IS_GIT_ROOT=$(ls -l .git|wc -l|awk '{print $1}')
if [ "$IS_GIT_ROOT" = "0" ];then
echo ""
echo "Please run contribute-to-sreworks.sh in Git Project root path /"
exit 1
fi
# 判断目标路径是否存在
TARGET_PATH=$1
if [ ! -d ${SW_ROOT}/${TARGET_PATH} ];then
echo "Target app path not found"
exit 1
fi
# 只能拷贝到paas/saas这两个目录下
if [[ "$TARGET_PATH" =~ ^paas/.* ]] || [[ "$TARGET_PATH" =~ ^saas/.* ]] || [[ "$TARGET_PATH" =~ ^/paas/.* ]] || [[ "$TARGET_PATH" =~ ^/saas/.* ]]; then
echo "Path check ok"
else
echo "Please copy code to paas/* or saas/*"
echo ""
echo "List paas/"
ls -l ${SW_ROOT}/${TARGET_PATH}/paas/|grep -v "total "
echo ""
echo "List saas/"
ls -l ${SW_ROOT}/${TARGET_PATH}/saas/|grep -v "total "
exit 1
fi
# 将当前代码拷贝到一个临时目录,移除.git文件
rm -rf /tmp/tmp_sw_project
mkdir -p /tmp/tmp_sw_project
cp -r ./ /tmp/tmp_sw_project/
rm -rf /tmp/tmp_sw_project/.git
mv /tmp/tmp_sw_project ${SW_ROOT}/${TARGET_PATH}/../
mv ${SW_ROOT}/${TARGET_PATH}/../tmp_sw_project ${SW_ROOT}/${TARGET_PATH}.bak
rm -rf ${SW_ROOT}/${TARGET_PATH}
mv ${SW_ROOT}/${TARGET_PATH}.bak ${SW_ROOT}/${TARGET_PATH}
echo "Copy code ok"

View File

@ -2,17 +2,8 @@ package com.alibaba.sreworks.job.master.controllers;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.alibaba.sreworks.job.master.domain.DO.SreworksJob; import com.alibaba.sreworks.job.master.domain.DTO.*;
import com.alibaba.sreworks.job.master.domain.DTO.ElasticJobInstanceDTO;
import com.alibaba.sreworks.job.master.domain.DTO.FlinkConnectorDTO;
import com.alibaba.sreworks.job.master.domain.DTO.SreworksJobDTO;
import com.alibaba.sreworks.job.master.domain.DTO.SreworksStreamJobDTO;
import com.alibaba.sreworks.job.master.domain.repository.SreworksJobRepository;
import com.alibaba.sreworks.job.master.jobscene.JobSceneService;
import com.alibaba.sreworks.job.master.jobschedule.JobScheduleService;
import com.alibaba.sreworks.job.master.jobtrigger.JobTriggerService;
import com.alibaba.sreworks.job.master.params.*; import com.alibaba.sreworks.job.master.params.*;
import com.alibaba.sreworks.job.master.services.JobService;
import com.alibaba.sreworks.job.master.services.StreamJobService; import com.alibaba.sreworks.job.master.services.StreamJobService;
import com.alibaba.sreworks.job.master.services.VvpService; import com.alibaba.sreworks.job.master.services.VvpService;
import com.alibaba.sreworks.job.utils.JsonUtil; import com.alibaba.sreworks.job.utils.JsonUtil;
@ -22,15 +13,9 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page; import org.springframework.data.domain.Page;
import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.stream.Collectors;
import static com.alibaba.sreworks.job.utils.PageUtil.pageable; import static com.alibaba.sreworks.job.utils.PageUtil.pageable;
@ -52,9 +37,6 @@ public class StreamJobController extends BaseController {
public TeslaBaseResult create(@RequestBody StreamJobCreateParam param) throws Exception { public TeslaBaseResult create(@RequestBody StreamJobCreateParam param) throws Exception {
param.setCreator(getUserEmployeeId()); param.setCreator(getUserEmployeeId());
param.setOperator(getUserEmployeeId()); param.setOperator(getUserEmployeeId());
if (param.getAppId() == null) {
param.setAppId(getAppId());
}
if (param.getTags() == null) { if (param.getTags() == null) {
param.setTags(new JSONArray()); param.setTags(new JSONArray());
} }
@ -67,6 +49,72 @@ public class StreamJobController extends BaseController {
return buildSucceedResult(streamJobService.create(param)); return buildSucceedResult(streamJobService.create(param));
} }
@RequestMapping(value = "{id}/source", method = RequestMethod.POST)
public TeslaBaseResult addSource(@PathVariable("id") Long streamJobId, @RequestBody StreamJobSourceCreateParam param) throws Exception {
SreworksStreamJobDTO job = streamJobService.get(streamJobId);
if(job == null){
return buildClientErrorResult("streamJob not found");
}
param.setCreator(getUserEmployeeId());
param.setOperator(getUserEmployeeId());
return buildSucceedResult(streamJobService.addSource(streamJobId, job.getAppId(), param));
}
@RequestMapping(value = "{id}/block/{blockId}", method = RequestMethod.DELETE)
public TeslaBaseResult deleteBlock(@PathVariable("id") Long streamJobId, @PathVariable("blockId") Long streamJobBlockId) throws Exception {
SreworksStreamJobDTO job = streamJobService.get(streamJobId);
if(job == null){
return buildClientErrorResult("streamJob not found");
}
streamJobService.deleteBlock(streamJobBlockId);
return buildSucceedResult(JsonUtil.map(
"streamJobId", streamJobId,
"blockId", streamJobBlockId
));
}
@RequestMapping(value = "{id}/sink", method = RequestMethod.POST)
public TeslaBaseResult addSink(@PathVariable("id") Long streamJobId, @RequestBody StreamJobSinkCreateParam param) throws Exception {
SreworksStreamJobDTO job = streamJobService.get(streamJobId);
if(job == null){
return buildClientErrorResult("streamJob not found");
}
param.setCreator(getUserEmployeeId());
param.setOperator(getUserEmployeeId());
return buildSucceedResult(streamJobService.addSink(streamJobId, job.getAppId(), param));
}
@RequestMapping(value = "{id}/python", method = RequestMethod.POST)
public TeslaBaseResult addPython(@PathVariable("id") Long streamJobId, @RequestBody StreamJobPythonCreateParam param) throws Exception {
SreworksStreamJobDTO job = streamJobService.get(streamJobId);
if(job == null){
return buildClientErrorResult("streamJob not found");
}
param.setCreator(getUserEmployeeId());
param.setOperator(getUserEmployeeId());
return buildSucceedResult(streamJobService.addPython(streamJobId, job.getAppId(), param));
}
@RequestMapping(value = "{id}/blocks", method = RequestMethod.GET)
public TeslaBaseResult getBlocks(@PathVariable("id") Long streamJobId) throws Exception {
SreworksStreamJobDTO job = streamJobService.get(streamJobId);
if(job == null){
return buildClientErrorResult("streamJob not found");
}
return buildSucceedResult(streamJobService.listBlockByStreamJobId(streamJobId));
}
@RequestMapping(value = "{id}/preview", method = RequestMethod.GET)
public TeslaBaseResult getPreview(@PathVariable("id") Long streamJobId) throws Exception {
SreworksStreamJobDTO job = streamJobService.get(streamJobId);
if(job == null){
return buildClientErrorResult("streamJob not found");
}
return buildSucceedResult(JsonUtil.map(
"content", streamJobService.generateScript(streamJobId)
));
}
@RequestMapping(value = "gets", method = RequestMethod.GET) @RequestMapping(value = "gets", method = RequestMethod.GET)
public TeslaBaseResult gets(Integer page, Integer pageSize) throws Exception { public TeslaBaseResult gets(Integer page, Integer pageSize) throws Exception {
Page<SreworksStreamJobDTO> jobList = streamJobService.gets(pageable(page, pageSize)); Page<SreworksStreamJobDTO> jobList = streamJobService.gets(pageable(page, pageSize));
@ -74,7 +122,7 @@ public class StreamJobController extends BaseController {
"page", jobList.getNumber(), "page", jobList.getNumber(),
"pageSize", jobList.getSize(), "pageSize", jobList.getSize(),
"total", jobList.getTotalElements(), "total", jobList.getTotalElements(),
"items", jobList.getContent() "items", jobList.getContent()
)); ));
} }

View File

@ -9,9 +9,6 @@ import org.springframework.data.jpa.domain.support.AuditingEntityListener;
import javax.persistence.*; import javax.persistence.*;
/**
* @author jinghua.yjh
*/
@Slf4j @Slf4j
@Entity @Entity
@EntityListeners(AuditingEntityListener.class) @EntityListeners(AuditingEntityListener.class)
@ -19,7 +16,7 @@ import javax.persistence.*;
@Builder @Builder
@AllArgsConstructor @AllArgsConstructor
@NoArgsConstructor @NoArgsConstructor
public class SreworksStreamJobPython { public class SreworksStreamJobBlock {
@Id @Id
@GeneratedValue @GeneratedValue
@ -47,6 +44,10 @@ public class SreworksStreamJobPython {
private String name; private String name;
@Column(columnDefinition = "longtext") @Column(columnDefinition = "longtext")
private String script; private String data;
@Column
private String blockType;
} }

View File

@ -1,52 +0,0 @@
package com.alibaba.sreworks.job.master.domain.DO;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.jpa.domain.support.AuditingEntityListener;
import javax.persistence.*;
/**
* @author jinghua.yjh
*/
@Slf4j
@Entity
@EntityListeners(AuditingEntityListener.class)
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class SreworksStreamJobSink {
@Id
@GeneratedValue
private Long id;
@Column
private Long streamJobId;
@Column
private Long gmtCreate;
@Column
private Long gmtModified;
@Column
private String creator;
@Column
private String operator;
@Column
private String appId;
@Column
private String name;
@Column(columnDefinition = "longtext")
private String options;
}

View File

@ -1,52 +0,0 @@
package com.alibaba.sreworks.job.master.domain.DO;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.jpa.domain.support.AuditingEntityListener;
import javax.persistence.*;
/**
* @author jinghua.yjh
*/
@Slf4j
@Entity
@EntityListeners(AuditingEntityListener.class)
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class SreworksStreamJobSource {
@Id
@GeneratedValue
private Long id;
@Column
private Long streamJobId;
@Column
private Long gmtCreate;
@Column
private Long gmtModified;
@Column
private String creator;
@Column
private String operator;
@Column
private String appId;
@Column
private String name;
@Column(columnDefinition = "longtext")
private String options;
}

View File

@ -0,0 +1,61 @@
package com.alibaba.sreworks.job.master.domain.DTO;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.sreworks.job.master.domain.DO.SreworksStreamJobBlock;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@Data
@Slf4j
@NoArgsConstructor
@AllArgsConstructor
public class SreworksStreamJobBlockDTO {
private Long id;
private Long streamJobId;
private Long gmtCreate;
private Long gmtModified;
private String creator;
private String operator;
private String appId;
private String name;
private String blockType;
private String blockTypeDisplay;
private JSONObject data;
public SreworksStreamJobBlockDTO(SreworksStreamJobBlock jobBlock) {
id = jobBlock.getId();
gmtCreate = jobBlock.getGmtCreate();
gmtModified = jobBlock.getGmtModified();
streamJobId = jobBlock.getStreamJobId();
appId = jobBlock.getAppId();
name = jobBlock.getName();
blockType = jobBlock.getBlockType();
data = JSONObject.parseObject(jobBlock.getData());
if (StringUtils.equals(blockType, "source") && data.getString("sourceType") != null) {
blockTypeDisplay = "输入源:" + data.getString("sourceType");
} else if (StringUtils.equals(blockType, "sink") && data.getString("sinkType") != null) {
blockTypeDisplay = "输出:" + data.getString("sinkType");
} else if (StringUtils.equals(blockType, "python")){
blockTypeDisplay = "Python处理";
} else {
blockTypeDisplay = blockType;
}
}
}

View File

@ -0,0 +1,49 @@
package com.alibaba.sreworks.job.master.domain.DTO;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.sreworks.job.master.domain.DO.SreworksStreamJobBlock;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Data
@Slf4j
@NoArgsConstructor
@AllArgsConstructor
public class SreworksStreamJobSinkDTO {
private Long id;
private Long streamJobId;
private Long gmtCreate;
private Long gmtModified;
private String creator;
private String operator;
private String appId;
private String name;
private JSONArray options;
private JSONArray columns;
public SreworksStreamJobSinkDTO(SreworksStreamJobBlock jobBlock) {
id = jobBlock.getId();
gmtCreate = jobBlock.getGmtCreate();
gmtModified = jobBlock.getGmtModified();
streamJobId = jobBlock.getStreamJobId();
appId = jobBlock.getAppId();
name = jobBlock.getName();
JSONObject data = JSONObject.parseObject(jobBlock.getData());
options = data.getJSONArray("options");
columns = data.getJSONArray("columns");
}
}

View File

@ -0,0 +1,49 @@
package com.alibaba.sreworks.job.master.domain.DTO;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.sreworks.job.master.domain.DO.SreworksStreamJobBlock;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Data
@Slf4j
@NoArgsConstructor
@AllArgsConstructor
public class SreworksStreamJobSourceDTO {
private Long id;
private Long streamJobId;
private Long gmtCreate;
private Long gmtModified;
private String creator;
private String operator;
private String appId;
private String name;
private JSONArray options;
private JSONArray columns;
public SreworksStreamJobSourceDTO(SreworksStreamJobBlock jobBlock) {
id = jobBlock.getId();
gmtCreate = jobBlock.getGmtCreate();
gmtModified = jobBlock.getGmtModified();
streamJobId = jobBlock.getStreamJobId();
appId = jobBlock.getAppId();
name = jobBlock.getName();
JSONObject data = JSONObject.parseObject(jobBlock.getData());
options = data.getJSONArray("options");
columns = data.getJSONArray("columns");
}
}

View File

@ -0,0 +1,20 @@
package com.alibaba.sreworks.job.master.domain.repository;
import com.alibaba.sreworks.job.master.domain.DO.SreworksStreamJobBlock;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import java.util.List;
public interface SreworksStreamJobBlockRepository
extends JpaRepository<SreworksStreamJobBlock, Long>, JpaSpecificationExecutor<SreworksStreamJobBlock> {
SreworksStreamJobBlock findFirstById(Long id);
Page<SreworksStreamJobBlock> findAll(Pageable pageable);
List<SreworksStreamJobBlock> findAllByStreamJobId(Long streamJobId);
}

View File

@ -0,0 +1,47 @@
package com.alibaba.sreworks.job.master.params;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.sreworks.job.master.domain.DO.SreworksStreamJobBlock;
import com.alibaba.sreworks.job.utils.JsonUtil;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Data
@Slf4j
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class StreamJobPythonCreateParam {
private String creator;
private String operator;
private String scriptName;
private String scriptContent;
private JSONArray options;
public SreworksStreamJobBlock init(Long streamJobId, String appId) {
return SreworksStreamJobBlock.builder()
.gmtCreate(System.currentTimeMillis())
.gmtModified(System.currentTimeMillis())
.creator(getCreator())
.blockType("python")
.data(JsonUtil.map(
"options", getOptions(),
"content", getScriptName()
).toJSONString())
.operator(getOperator())
.appId(appId)
.name(getScriptName())
.streamJobId(streamJobId)
.build();
}
}

View File

@ -0,0 +1,50 @@
package com.alibaba.sreworks.job.master.params;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.sreworks.job.master.domain.DO.SreworksStreamJobBlock;
import com.alibaba.sreworks.job.utils.JsonUtil;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Data
@Slf4j
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class StreamJobSinkCreateParam {
private String creator;
private String operator;
private String sinkName;
private JSONArray options;
private JSONArray columns;
private String type;
public SreworksStreamJobBlock init(Long streamJobId, String appId) {
return SreworksStreamJobBlock.builder()
.gmtCreate(System.currentTimeMillis())
.gmtModified(System.currentTimeMillis())
.creator(getCreator())
.blockType("sink")
.data(JsonUtil.map(
"options", getOptions(),
"columns", getColumns(),
"sinkType", getType()
).toJSONString())
.operator(getOperator())
.appId(appId)
.name(getSinkName())
.streamJobId(streamJobId)
.build();
}
}

View File

@ -0,0 +1,50 @@
package com.alibaba.sreworks.job.master.params;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.sreworks.job.master.domain.DO.SreworksStreamJobBlock;
import com.alibaba.sreworks.job.utils.JsonUtil;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Data
@Slf4j
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class StreamJobSourceCreateParam {
private String creator;
private String operator;
private String sourceName;
private JSONArray options;
private JSONArray columns;
private String type;
public SreworksStreamJobBlock init(Long streamJobId, String appId) {
return SreworksStreamJobBlock.builder()
.gmtCreate(System.currentTimeMillis())
.gmtModified(System.currentTimeMillis())
.creator(getCreator())
.blockType("source")
.data(JsonUtil.map(
"options", getOptions(),
"columns", getColumns(),
"sourceType", getType()
).toJSONString())
.operator(getOperator())
.appId(appId)
.name(getSourceName())
.streamJobId(streamJobId)
.build();
}
}

View File

@ -1,24 +1,15 @@
package com.alibaba.sreworks.job.master.services; package com.alibaba.sreworks.job.master.services;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.alibaba.sreworks.job.master.common.JobTriggerType; import com.alibaba.sreworks.job.master.common.JobTriggerType;
import com.alibaba.sreworks.job.master.domain.DO.ElasticJobInstance; import com.alibaba.sreworks.job.master.domain.DO.*;
import com.alibaba.sreworks.job.master.domain.DO.SreworksJob; import com.alibaba.sreworks.job.master.domain.DTO.*;
import com.alibaba.sreworks.job.master.domain.DO.SreworksStreamJob; import com.alibaba.sreworks.job.master.domain.repository.*;
import com.alibaba.sreworks.job.master.domain.DTO.JobInstanceStatus;
import com.alibaba.sreworks.job.master.domain.DTO.SreworksJobDTO;
import com.alibaba.sreworks.job.master.domain.DTO.SreworksStreamJobDTO;
import com.alibaba.sreworks.job.master.domain.repository.ElasticJobInstanceRepository;
import com.alibaba.sreworks.job.master.domain.repository.SreworksJobRepository;
import com.alibaba.sreworks.job.master.domain.repository.SreworksJobTaskRepository;
import com.alibaba.sreworks.job.master.domain.repository.SreworksStreamJobRepository;
import com.alibaba.sreworks.job.master.jobscene.JobSceneService; import com.alibaba.sreworks.job.master.jobscene.JobSceneService;
import com.alibaba.sreworks.job.master.jobschedule.JobScheduleService; import com.alibaba.sreworks.job.master.jobschedule.JobScheduleService;
import com.alibaba.sreworks.job.master.jobtrigger.JobTriggerService; import com.alibaba.sreworks.job.master.jobtrigger.JobTriggerService;
import com.alibaba.sreworks.job.master.params.JobCreateParam; import com.alibaba.sreworks.job.master.params.*;
import com.alibaba.sreworks.job.master.params.JobEditScheduleParam;
import com.alibaba.sreworks.job.master.params.JobModifyParam;
import com.alibaba.sreworks.job.master.params.StreamJobCreateParam;
import com.alibaba.sreworks.job.utils.StringUtil; import com.alibaba.sreworks.job.utils.StringUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -30,6 +21,7 @@ import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import java.io.IOException;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -40,55 +32,142 @@ public class StreamJobService {
@Autowired @Autowired
SreworksStreamJobRepository streamJobRepository; SreworksStreamJobRepository streamJobRepository;
@Autowired
SreworksStreamJobBlockRepository sreworksStreamJobBlockRepository;
public Page<SreworksStreamJobDTO> gets(Pageable pageable) throws Exception { public Page<SreworksStreamJobDTO> gets(Pageable pageable) throws Exception {
Page<SreworksStreamJob> jobs = streamJobRepository.findAll(pageable); Page<SreworksStreamJob> jobs = streamJobRepository.findAll(pageable);
return jobs.map(streamJob -> { return jobs.map(streamJob -> {
SreworksStreamJobDTO jobDTO = new SreworksStreamJobDTO(streamJob); SreworksStreamJobDTO jobDTO = new SreworksStreamJobDTO(streamJob);
BeanUtils.copyProperties(streamJob, jobDTO);
return jobDTO; return jobDTO;
}); });
} }
// public SreworksJobDTO get(Long id) { public SreworksStreamJobDTO get(Long streamJobId) throws Exception {
//// SreworksJob job = jobRepository.findFirstById(id); SreworksStreamJob job = streamJobRepository.findFirstById(streamJobId);
//// if (job == null) { return new SreworksStreamJobDTO(job);
//// throw new Exception("id is not exists");
//// }
//// SreworksJobDTO jobDTO = new SreworksJobDTO(job);
//// if (!StringUtil.isEmpty(job.getSceneType())) {
//// jobDTO.setSceneConf(jobSceneService.getJobScene(job.getSceneType()).getConf(job.getId()));
//// }
//// if (!StringUtil.isEmpty(job.getScheduleType())) {
//// jobDTO.setScheduleConf(jobScheduleService.getJobSchedule(job.getScheduleType()).getConf(job.getId()));
//// }
//// if (!StringUtil.isEmpty(job.getTriggerType()) && job.getTriggerType().equals(JobTriggerType.CRON.getType())) {
//// JSONObject triggerConf = jobDTO.getTriggerConf();
//// triggerConf.put("enabled", jobTriggerService.getJobTrigger(job.getTriggerType()).getState(id));
//// jobDTO.setTriggerConf(triggerConf);
//// }
//// return jobDTO;
// }
public SreworksStreamJob create(StreamJobCreateParam param) throws Exception {
SreworksStreamJob job = param.job();
job = streamJobRepository.saveAndFlush(job);
return job;
} }
public SreworksStreamJobDTO create(StreamJobCreateParam param) throws Exception {
SreworksStreamJob job = param.job();
job = streamJobRepository.saveAndFlush(job);
return new SreworksStreamJobDTO(job);
}
private void deleteJob(SreworksJob job) throws Exception { public void deleteBlock(Long streamJobBlockId) {
// if (job.getSceneType() != null) { sreworksStreamJobBlockRepository.deleteById(streamJobBlockId);
// jobSceneService.getJobScene(job.getSceneType()).delete(job.getId()); }
// }
// if (job.getScheduleType() != null) { public String generateScript(Long streamJobId) throws Exception {
// jobScheduleService.getJobSchedule(job.getScheduleType()).delete(job.getId()); List<SreworksStreamJobBlockDTO> blocks = this.listBlockByStreamJobId(streamJobId);
// } StringBuilder scriptContent = new StringBuilder(
// if (job.getTriggerType() != null) { "from pyflink.common.typeinfo import Types\n"
// jobTriggerService.getJobTrigger(job.getTriggerType()).delete(job.getId()); + "from pyflink.datastream import StreamExecutionEnvironment\n"
// } + "from pyflink.table import StreamTableEnvironment\n"
// jobRepository.deleteById(job.getId()); + "from pyflink.common import Row\n"
+ "\n"
+ "env = StreamExecutionEnvironment.get_execution_environment()\n"
+ "t_env = StreamTableEnvironment.create(stream_execution_environment=env)\n"
+ "\n"
);
for (SreworksStreamJobBlockDTO block : blocks){
if(StringUtils.equals(block.getBlockType(), "source")){
/**
* t_env.execute_sql("""
* CREATE TABLE my_source (
* a INT,
* b STRING
* ) WITH (
* 'connector' = 'datagen',
* 'rows-per-second' = '4',
* 'fields.a.min' = '1',
* 'fields.a.max' = '1000'
* )
* """)
* ds = t_env.to_append_stream(
* t_env.from_path('my_source'),
* Types.ROW([Types.INT(), Types.STRING()])
* )
*
*/
scriptContent.append("# source " + block.getName() + "\n");
scriptContent.append("t_env.execute_sql(\"\"\"\n");
scriptContent.append(" CREATE TABLE " + block.getName() +" (\n");
JSONArray columns = block.getData().getJSONArray("columns");
if(columns != null){
for (int i = 0; i < columns.size(); i++) {
JSONObject column = columns.getJSONObject(i);
String columnName = column.getString("columnName");
String columnType = column.getString("columnType");
if (columnType != null && columnName != null){
scriptContent.append(" " + columnName + " " + columnType +"\n");
}
}
}
scriptContent.append(" ) WITH (\n");
JSONArray options = block.getData().getJSONArray("options");
if(options != null){
for (int i = 0; i < options.size(); i++) {
JSONObject option = options.getJSONObject(i);
String key = option.getString("key");
String value = option.getString("value");
if (key != null && value != null){
scriptContent.append(" '" + key + "' = '" + value +"'\n");
}
}
}
scriptContent.append(" )\n");
scriptContent.append("\"\"\")\n");
scriptContent.append("ds = t_env.to_append_stream(\n");
scriptContent.append(" t_env.from_path('" + block.getName() + "'),\n");
scriptContent.append(" Types.ROW([");
if(columns != null){
for (int i = 0; i < columns.size(); i++) {
JSONObject column = columns.getJSONObject(i);
String columnType = column.getString("columnType");
if (columnType != null){
scriptContent.append("Types."+columnType+"(), ");
}
}
}
scriptContent.append("])\n");
scriptContent.append(")\n");
}else if(StringUtils.equals(block.getBlockType(), "python")){
scriptContent.append(block.getData());
}else if(StringUtils.equals(block.getBlockType(), "sink")){
scriptContent.append("sink\n");
}
}
return scriptContent.toString();
}
public SreworksStreamJobSourceDTO addSource(Long streamJobId, String appId, StreamJobSourceCreateParam param) throws Exception {
SreworksStreamJobBlock jobSource = param.init(streamJobId, appId);
jobSource = sreworksStreamJobBlockRepository.saveAndFlush(jobSource);
return new SreworksStreamJobSourceDTO(jobSource);
}
public SreworksStreamJobSinkDTO addSink(Long streamJobId, String appId, StreamJobSinkCreateParam param) throws Exception {
SreworksStreamJobBlock jobSink = param.init(streamJobId, appId);
jobSink = sreworksStreamJobBlockRepository.saveAndFlush(jobSink);
return new SreworksStreamJobSinkDTO(jobSink);
}
public SreworksStreamJobSinkDTO addPython(Long streamJobId, String appId, StreamJobPythonCreateParam param) throws Exception {
SreworksStreamJobBlock jobPython = param.init(streamJobId, appId);
jobPython = sreworksStreamJobBlockRepository.saveAndFlush(jobPython);
return new SreworksStreamJobSinkDTO(jobPython);
}
public List<SreworksStreamJobBlockDTO> listBlockByStreamJobId(Long streamJobId) throws Exception {
List<SreworksStreamJobBlock> sourceList = sreworksStreamJobBlockRepository.findAllByStreamJobId(streamJobId);
List<String> blockTypeOrder = Arrays.asList("source", "python", "sink");
Comparator<SreworksStreamJobBlock> blockTypeComparator = Comparator.comparingInt(block -> blockTypeOrder.indexOf(block.getBlockType()));
sourceList.sort(blockTypeComparator);
return sourceList.stream().map(SreworksStreamJobBlockDTO::new)
.collect(Collectors.toList());
} }
public void modify(Long id, JobModifyParam param) throws Exception { public void modify(Long id, JobModifyParam param) throws Exception {

View File

@ -22,7 +22,7 @@ public class VvpService {
JobApplicationProperties applicationProperties; JobApplicationProperties applicationProperties;
public List<FlinkConnectorDTO> listConnector(Boolean isSource, Boolean isSink, String type) throws Exception { public List<FlinkConnectorDTO> listConnector(Boolean isSource, Boolean isSink, String type) throws Exception {
String connectorUrl = "http://prod-dataops-ververica-platform-ververica-platform.sreworks-dataops/sql/v1beta1/namespaces/default/connectors"; String connectorUrl = String.format("%s/sql/v1beta1/namespaces/default/connectors", applicationProperties.getFlinkVvpEndpoint());
HttpResponse<String> response = Requests.get(connectorUrl, null, null); HttpResponse<String> response = Requests.get(connectorUrl, null, null);
JSONObject connectors = JSONObject.parseObject(response.body()); JSONObject connectors = JSONObject.parseObject(response.body());

View File

@ -43,4 +43,4 @@ spring.elasticsearch.rest.username=${ES_USERNAME}
spring.elasticsearch.rest.password=${ES_PASSWORD} spring.elasticsearch.rest.password=${ES_PASSWORD}
spring.elasticsearch.rest.keep-alive-timeout=600 spring.elasticsearch.rest.keep-alive-timeout=600
# vvp # vvp
flink.vvp.endpoint="http://prod-dataops-ververica-platform-ververica-platform.sreworks-dataops" flink.vvp.endpoint=http://prod-dataops-ververica-platform-ververica-platform.sreworks-dataops