工作流操作
更新时间:2019-08-07
Workflow即媒资处理工作流,由stage以及stage的依赖关系构成;Workflow名称在单个用户下具有唯一性,即用户创建的工作流名称不能重复。
新建工作流
创建工作流需要构建工作流内的stage和stage的依赖关系。使用如下代码可以新建工作流,代码中给出了创建一个简单工作流和复杂工作流的方法。
private void createWorkflow(BvwClient client) {
client.createWorkflow(createSimpleWorkflowRequest("simple_workflow"));
client.createWorkflow(createComplexWorkflowRequest("complex_workflow"));
}
// 创建一个简单的工作流
private WorkflowCreateRequest createSimpleWorkflowRequest(String workflowName) {
// 创建开始节点(start stage)
StageParamModel startStageParam = StageParamModel.of("{}");
StageModel startStage = StageModel.of("start", startStageParam, StageType.START);
// 创建转码节点(transcoding stage)
StageParamModel transcodingStageParam = StageParamModel.of(
"{\"job\":{\"pipelineName\":\"test_transcoding\",\"source\":{},\"target\":{\"presetName\":"
+ "\"bvwtest\",\"targetBucket\":\"videoworks-source\"}},\"needDetectBlackBoard\":true,"
+ "\"adjustOrientation\":\"ALL\"}");
StageModel transcodingStage = StageModel.of("transcoding", transcodingStageParam,
StageType.TRANSCODING);
// 创建发布节点(publish stage)
StageParamModel publishStageParam = StageParamModel.of("{}");
StageModel publishStage = StageModel.of("publish", publishStageParam, StageType.PUBLISH);
// 设置节点
Map<String, StageModel> stages = Maps.newHashMap();
stages.put(startStage.getName(), startStage);
stages.put(transcodingStage.getName(), transcodingStage);
stages.put(publishStage.getName(), publishStage);
// 设置节点依赖关系
Map<String, List<String>> dependencies = Maps.newHashMap();
dependencies.put(startStage.getName(), Lists.newArrayList(transcodingStage.getName());
dependencies.put(transcodingStage.getName(), Lists.newArrayList(publishStage.getName()));
dependencies.put(publishStage.getName(), Lists.newArrayList());
// 构建WorkflowCreateRequest
DagModel dag = DagModel.of(stages, dependencies);
return WorkflowCreateRequest.of(workflowName, dag);
}
// 创建一个复杂的工作流
private WorkflowCreateRequest createComplexWorkflowRequest(String workflowName) {
// 创建开始节点(start stage)
StageParamModel startStageParam = StageParamModel.of("{}");
StageModel startStage = StageModel.of("start", startStageParam, StageType.START);
// 创建元信息提取节点(mediainfo stage)
StageParamModel mediaInfoStageParam = StageParamModel.of("{}");
StageModel mediaInfoStage = StageModel.of("mediaInfo", mediaInfoStageParam, StageType.MEDIAINFO);
// 创建黑边检测节点(blackborder stage)
StageParamModel blackBorderDetectStageParam = StageParamModel.of("{}");
StageModel blackBorderDetectStage = StageModel.of("blackBorderDetect", blackBorderDetectStageParam,
StageType.BLACK_BORDER_DETECT);
// 创建转码节点(transcoding stage)
StageParamModel transcodingStageParam = StageParamModel.of(
"{\"job\":{\"pipelineName\":\"test_transcoding\",\"source\":{},\"target\":{\"presetName\":"
+ "\"bvwtest\",\"targetBucket\":\"videoworks-source\"}},\"needDetectBlackBoard\":true,"
+ "\"adjustOrientation\":\"ALL\"}");
StageModel transcodingStage = StageModel.of("transcoding", transcodingStageParam,
StageType.TRANSCODING);
// 创建缩略图节点(thumbnail stage)
StageParamModel thumbnailStageParam = StageParamModel.of(
"{\"notificationName\":\"test\",\"job\":{\"pipelineName\":\"test_thumbnail\",\"target\":"
+ "{\"targetBucket\":\"videoworks-source\",\"format\":\"jpg\",\"sizingPolicy\":\"keep\"},"
+ "\"capture\":{\"mode\":\"auto\"}}}");
StageModel thumbnailStage = StageModel.of("thumbnail", thumbnailStageParam, StageType.THUMBNAIL);
// 创建发布节点(publish stage)
StageParamModel publishStageParam = StageParamModel.of("{}");
StageModel publishStage = StageModel.of("publish", publishStageParam, StageType.PUBLISH);
// 设置节点
Map<String, StageModel> stages = Maps.newHashMap();
stages.put(startStage.getName(), startStage);
stages.put(mediaInfoStage.getName(), mediaInfoStage);
stages.put(blackBorderDetectStage.getName(), blackBorderDetectStage);
stages.put(transcodingStage.getName(), transcodingStage);
stages.put(thumbnailStage.getName(), thumbnailStage);
stages.put(publishStage.getName(), publishStage);
// 设置节点依赖关系
Map<String, List<String>> dependencies = Maps.newHashMap();
dependencies.put(startStage.getName(), Lists.newArrayList(blackBorderDetectStage.getName(),
mediaInfoStage.getName(),
thumbnailStage.getName()));
dependencies.put(blackBorderDetectStage.getName(), Lists.newArrayList(transcodingStage.getName()));
dependencies.put(transcodingStage.getName(), Lists.newArrayList(publishStage.getName()));
dependencies.put(mediaInfoStage.getName(), Lists.newArrayList(publishStage.getName()));
dependencies.put(thumbnailStage.getName(), Lists.newArrayList(publishStage.getName()));
dependencies.put(publishStage.getName(), Lists.newArrayList());
// 构建WorkflowCreateRequest
DagModel dag = DagModel.of(stages, dependencies);
return WorkflowCreateRequest.of(workflowName, dag);
}
- 创建工作流的stage时,每个stage的参数信息都是Json序列化的字符串,字符串的内容可以参考VideoWorks服务的控制台界面的媒资列表页面——媒资执行实例——任务结果信息的输入信息。如下图是一个转码节点的输入。
- 工作流的stage信息和stage依赖信息中的每个stage名称需要一一对应,否则创建工作流将失败。
删除工作流
使用如下代码可以删除指定名称的工作流。
private void deleteWorkflow(BvwClient client, String workflowName) {
client.deleteWorkflow(workflowName);
}
只有没有被媒资处理使用的工作流可以删除,否则将报错400,"workflow is using, can not modify"。
更新工作流
使用如下代码可以更新工作流的stage信息。
private void updateWorkflow(BvwClient client) {
client.updateWorkflow(updateWorkflowRequest("update_workflow"));
}
// 创建更新的工作流请求
private WorkflowUpdateRequest updateWorkflowRequest(String workflowName) {
// 创建开始节点(start stage)
StageParamModel startStageParam = StageParamModel.of("{}");
StageModel startStage = StageModel.of("start", startStageParam, StageType.START);
// 创建转码节点(transcoding stage)
StageParamModel transcodingStageParam = StageParamModel.of(
"{\"job\":{\"pipelineName\":\"test_transcoding\",\"source\":{},\"target\":{\"presetName\":"
+ "\"bvwtest\",\"targetBucket\":\"videoworks-source\"}},\"needDetectBlackBoard\":true,"
+ "\"adjustOrientation\":\"ALL\"}");
StageModel transcodingStage = StageModel.of("transcoding", transcodingStageParam,
StageType.TRANSCODING);
// 创建发布节点(publish stage)
StageParamModel publishStageParam = StageParamModel.of("{}");
StageModel publishStage = StageModel.of("publish", publishStageParam, StageType.PUBLISH);
// 设置节点
Map<String, StageModel> stages = Maps.newHashMap();
stages.put(startStage.getName(), startStage);
stages.put(transcodingStage.getName(), transcodingStage);
stages.put(publishStage.getName(), publishStage);
// 设置节点依赖关系
Map<String, List<String>> dependencies = Maps.newHashMap();
dependencies.put(startStage.getName(), Lists.newArrayList(transcodingStage.getName()));
dependencies.put(transcodingStage.getName(), Lists.newArrayList(publishStage.getName()));
dependencies.put(publishStage.getName(), Lists.<String>newArrayList());
// 构建WorkflowCreateRequest
DagModel dag = DagModel.of(stages, dependencies);
return WorkflowUpdateRequest.of(workflowName, dag);
}
只有没有被媒资处理使用的工作流才允许更新,否则将报错400,"workflow is using, can not modify"。
查询工作流
使用如下代码可以查询一个工作流。
private void getWorkflow(BvwClient client, String workflowName) {
client.getWorkflow(workflowName);
}
查询工作流列表
使用如下代码可以查询工作流列表。
private void listWorkflow(BvwClient client) {
int pageNo = 1;
int pageSize = 10;
String begin = "2019-06-30T16:00:00Z";
String end = "2019-07-30T16:00:00Z";
String workflowNameFuzzy = "test";
WorkflowListRequest request = WorkflowListRequest.of(pageNo, pageSize, WorkflowStatus.NORMAL,
workflowNameFuzzy, begin, end);
ListByPageResponse<WorkflowListResponse> response = client.listWorkflow(request);
}
查询工作流列表的参数只有pageNo和pageSize是必选项,其他参数为可选项。
启用工作流
使用如下代码可以启用一个工作流。
private void enableWorkflow(BvwClient client, String workflowName) {
client.enableWorkflow(workflowName);
}
禁用工作流
使用如下代码可以禁用一个工作流。
private void disableWorkflow(BvwClient client, String workflowName) {
client.disableWorkflow(workflowName);
}