数据接入SDK使用手册
更新时间:2024-09-12
SDK 介绍
当前提供了java的SDK,可以帮助开发者快速接入第三方知识库系统。
使用步骤
- 引入SDK的jar包和第三方包的依赖
- 按要求增加相关配置
-
根据实际场景选择对应的TaskService实现方式:
- 可选SDK内置的DefaultLocalTaskServiceImpl类,使用内存管理,重启后状态丢失(通过配置项控制)
- 可选SDK内置的DefaultDBTaskServiceImpl类,使用数据库管理任务状态(通过配置项控制)
- 可以扩展自定义ITaskService实现,并注册到ConnectorEngine类中
- 通过扩展AbstractConnector类实现自定义的连接器
SDK使用说明
任务状态
Plain Text
1public static final int STATUS_INIT = 0;
2public static final int STATUS_RUNNING = 1;
3public static final int STATUS_WAIT_NEXT_SCHEDULE = 2;
4public static final int STATUS_SUCCESS = 3;
5public static final int STATUS_FAILED = 4;
6public static final int STATUS_STOPPED = 5;
任务定义
Plain Text
1/**
2 * 知识库id,文档归属的知识库
3 */
4 private String spaceGuid;
5 /**
6 * 甄知租户id
7 */
8 private String accountId;
9 /**
10 * 用户id
11 */
12 private String userId;
13 /**
14 * 用户名
15 */
16 private String userName;
17 /**
18 * 任务名称
19 */
20 private String name;
21 /**
22 * 任务优先级
23 */
24 private int priority;
25 /**
26 * 开始调度时间
27 */
28 private long scheduleStartTime;
29 /**
30 * 结束调度时间
31 */
32 private long scheduleEndTime;
33 /**
34 * 调度间隔
35 */
36 private long scheduleInterval;
37 /**
38 * 任务id
39 */
40 private String taskId;
41 /**
42 * 连接器类型,比如和连接器实现类getType方法中的定义相同
43 */
44 private String connectorType;
45 /**
46 * 任务描述定义对象对应的json表示,Task实现类负责解析
47 */
48 private String sourceDetailJson;
49 /**
50 * 任务状态
51 */
52 private int status;
53 /**
54 * 任务上下文数据对应的json表示,Task实现类负责解析和存储
55 */
56 private String contextData;
57 /**
58 * 访问甄知的token
59 */
60 private String token;
任务描述定义类
定义一个具体的数据源连接器时,可以自定义一个用来指定具体参数的定义类,这个定义类和任务对象中的sourceDetailJson对应。
比如对于一个钉钉文档的数据源,可以定义如下:
Plain Text
1public class DingDingSourceDetail {
2 private String appKey;
3 private String appSecret;
4 private String unionId;
5 private String spacePattern;
6}
连接器实现类
每个数据源都需要实现一个对应的连接器实现类,一般按如下规则实现:
- 从SDK的AbstractConnector类继承;
- 实现getType方法,返回唯一的连接器类型标识,需要和任务表中的connector_type相对应;
-
实现processTask方法,负责处理任务。这个方法主要实现了完整的数据源对接逻辑:
- 连接器类可以注入sdk自带的ExecutorService实例
- 推荐processTask使用ExecutorService实例异步执行任务
-
实现一个任务处理类(由上面的processTask方法负责创建和运行),这个处理类需要继承SDK中的AbstractTask抽象类,并实现Runnable接口:
- 可以使用SDK中缺省的DefaultContext类来存放上下文数据;
- 实现getContext方法,一般直接返回DefaultContext.encode();
- 本次任务执行正常完成后,需要调用onAllDocumentsAdded方法,表示当前任务所有文档已经处理完成;
- 本次任务执行异常,需要调用taskFinished(task.getTaskId(), false);
- 处理完一个文档,需要调用addDocument方法添加到文档处理任务列表中;
文档定义
文档基类
Plain Text
1/**
2 * 文档id
3 */
4private String documentId;
5/**
6 * 文档标题
7 */
8private String title;
9/**
10 * 文档数据流
11 */
12private InputStream contentStream;
13/**
14 * 文档数据
15 */
16private byte[] contentData;
17/**
18 * 文档元数据
19 */
20private Map<String, Object> documentMeta = new HashMap<String, Object>();
21/**
22 * 文档对象
23 */
24private T document;
NormalDocument类
Plain Text
1/**
2 * 文件类型,一般填写文件后缀
3 */
4private String fileType;
5/**
6 * 文件名
7 */
8private String fileName;
9/**
10 * 下载地址
11 */
12private String downloadUrl;
13/**
14 * 预览地址
15 */
16private String previewUrl;
17/**
18 * 图片地址
19 */
20private String imgUrl;
21/**
22 * 更新时间戳
23 */
24private Long updateTime;
25/**
26 * 文件大小
27 */
28private long size;
结构化数据类
Plain Text
1// 对应的数据接入类目id,需要事先在甄知平台定义
2private String categoryId;
权限集成
权限模型介绍
- 通过用户-角色-文档空间-文档来组织RBAC模型
- 在租户内部,支持配置多个应用以实现多应用接入的开发模型(默认情况下,包含一个默认应用)。
- 用户映射到各个租户/应用下的多个角色
权限管理接口
通过ConnectorEngine.getAuthorityManager方法可以获取IAuthorityManager实例,接口定义如下:
Plain Text
1/**
2 * 权限管理
3 */
4public interface IAuthorityManager {
5 /**
6 * 添加角色
7 * @param role 角色
8 * @throws RemoteAccessException
9 */
10 void addRole(String role) throws RemoteAccessException;
11
12 /**
13 * 删除角色
14 * @param role 角色
15 * @throws RemoteAccessException
16 */
17 void removeRole(String role) throws RemoteAccessException;
18
19 /**
20 * 判断角色是否存在
21 * @param role 角色
22 * @return
23 * @throws RemoteAccessException
24 */
25 boolean hasRole(String role) throws RemoteAccessException;
26
27 /**
28 * 查询角色列表
29 * @return 角色列表
30 * @throws RemoteAccessException
31 */
32 List<String> queryRoles() throws RemoteAccessException;
33
34 /**
35 * 添加用户角色
36 * @param userId 用户
37 * @param role 角色
38 * @throws RemoteAccessException
39 */
40 void addUserRoles(String userId, String... role) throws RemoteAccessException;
41
42 /**
43 * 删除用户角色
44 * @param userId 用户
45 * @param role 角色
46 * @throws RemoteAccessException
47 */
48 void removeUserRoles(String userId, String... role) throws RemoteAccessException;
49
50 /**
51 * 查询用户角色
52 * @param userId 用户
53 * @return 角色列表
54 * @throws RemoteAccessException
55 */
56 List<String> queryUserRoles(String userId) throws RemoteAccessException;
57
58 /**
59 * 添加空间角色
60 * @param spaceId 空间
61 * @param roles 角色
62 * @throws RemoteAccessException
63 */
64 void addSpaceRoles(String spaceId, String... roles) throws RemoteAccessException;
65
66 /**
67 * 删除空间角色
68 * @param spaceId 空间
69 * @param roles 角色
70 * @throws RemoteAccessException
71 */
72 void removeSpaceRoles(String spaceId, String... roles) throws RemoteAccessException;
73
74 /**
75 * 查询空间角色
76 * @param spaceId 空间
77 * @return 角色列表
78 * @throws RemoteAccessException
79 */
80 List<String> querySpaceRoles(String spaceId) throws RemoteAccessException;
81
82 /**
83 * 添加空间块角色
84 * @param spaceId 空间
85 * @param roles 角色
86 * @throws RemoteAccessException
87 */
88 void addSpaceBlockRoles(String spaceId, String... roles) throws RemoteAccessException;
89
90 /**
91 * 删除空间块角色
92 * @param spaceId 空间
93 * @param roles 角色
94 * @throws RemoteAccessException
95 */
96 void removeSpaceBlockRoles(String spaceId, String... roles) throws RemoteAccessException;
97
98 /**
99 * 查询空间块角色
100 * @param spaceId 空间
101 * @return 角色列表
102 * @throws RemoteAccessException
103 */
104 List<String> querySpaceBlockRoles(String spaceId) throws RemoteAccessException;
105
106 /**
107 * 注册动态权限服务地址,动态权限服务地址是租户级别的,注册成功后,用户访问平台时,会实时调用该地址,获取用户访问平台时需要的权限
108 * 对应服务需要按照平台的规范要求进行接口实现
109 * 此方法在ConnectorEngine初始化时自动调用(根据connector.dynamicAuth.enabled和connector.dynamicAuth.urlPrefix配置项)
110 *
111 * @param url
112 */
113 void enableDynamicAuthService(String url) throws RemoteAccessException;
114
115 /**
116 * 取消注册动态权限服务地址
117 * 此方法在ConnectorEngine初始化时自动调用(根据connector.dynamicAuth.enabled和connector.dynamicAuth.urlPrefix配置项)
118 * @throws RemoteAccessException
119 */
120 void disableDynamicAuthService() throws RemoteAccessException;
121}
管理文档
通过ConnectorEngine.getDocumentManager方法可以获取IDocumentManager实例,接口定义如下:
Plain Text
1/**
2 * 文档管理接口
3 */
4public interface IDocumentManager {
5 /**
6 * 添加空间
7 *
8 * @param space 空间id
9 * @param parentSpace 上级空间id
10 * @param authorityInherited 是否继承上级权限
11 * @throws RemoteAccessException
12 */
13 void addSpace(String space, String parentSpace,
14 boolean authorityInherited) throws RemoteAccessException;
15
16 /**
17 * 添加空间
18 *
19 * @param space 空间id
20 * @param authorityInherited 是否继承上级权限
21 * @throws RemoteAccessException
22 */
23 void updateSpace(String space,
24 boolean authorityInherited) throws RemoteAccessException;
25
26 /**
27 * 删除空间
28 *
29 * @param space 空间id
30 * @param forceDeleteDocuments 是否强制删除文档,为false时,如果空间下有文档则不能删除
31 * @throws RemoteAccessException
32 */
33 void removeSpace(String space, boolean forceDeleteDocuments) throws RemoteAccessException;
34
35 /**
36 * 查询空间
37 *
38 * @param parentSpace
39 * @return 空间id列表
40 * @throws RemoteAccessException
41 */
42 List<String> querySpaces(String parentSpace) throws RemoteAccessException;
43
44 /**
45 * 查询空间文档
46 *
47 * @param space 空间id
48 * @param offset 分页偏移量
49 * @param limit 每页大小
50 * @return 文档id列表
51 * @throws RemoteAccessException
52 */
53 List<String> querySpaceDocuments(String space,
54 int offset, int limit) throws RemoteAccessException;
55
56 /**
57 * 添加文档到空间
58 *
59 * @param space 空间id
60 * @param documentIds 文档id列表
61 * @throws RemoteAccessException
62 */
63 void addSpaceDocuments(String space, String... documentIds) throws RemoteAccessException;
64
65 /**
66 * 删除文档
67 *
68 * @param space 空间id
69 * @param documentIds 文档id列表
70 * @throws RemoteAccessException
71 */
72 void removeSpaceDocuments(String space, String... documentIds) throws RemoteAccessException;
73
74 /**
75 * 更新文档角色
76 *
77 * @param documentId 文档id
78 * @param roles 角色列表
79 * @throws RemoteAccessException
80 */
81 void updateDocumentRoles(String documentId, String... roles) throws RemoteAccessException;
82
83 /**
84 * 删除文档角色
85 *
86 * @param documentId 文档id
87 * @param roles 角色列表
88 * @throws RemoteAccessException
89 */
90 void removeDocumentRoles(String documentId, String... roles) throws RemoteAccessException;
91
92 /**
93 * 删除文档
94 *
95 * @param documentId 文档id
96 * @throws RemoteAccessException
97 */
98 void deleteDocument(String documentId) throws RemoteAccessException;
99}
任务表
当使用sdk内置的任务表时,需要使用下面的sql建表
Plain Text
1CREATE TABLE `engine_connector_task` (
2 `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '唯一标识',
3 `name` varchar(32) NOT NULL DEFAULT '' COMMENT '任务名',
4 `priority` int(11) DEFAULT '0' COMMENT '优先级',
5 `schedule_start` bigint(20) DEFAULT '0' COMMENT '调度开始时间戳',
6 `schedule_end` bigint(20) DEFAULT '0' COMMENT '调度结束时间戳',
7 `schedule_interval` int(11) DEFAULT '0' COMMENT '调度间隔毫秒数',
8 `task_id` varchar(32) NOT NULL DEFAULT '' COMMENT '任务id',
9 `connector_type` varchar(32) NOT NULL DEFAULT '' COMMENT '任务李诶西',
10 `source_detail_json` varchar(2048) NOT NULL DEFAULT '' COMMENT '任务规则定义json',
11 `status` tinyint(2) DEFAULT NULL COMMENT '状态',
12 `context_data` varchar(5000) DEFAULT NULL COMMENT '上下文数据',
13 `space_guid` varchar(50) DEFAULT NULL COMMENT '对应知识库',
14 `user_id` varchar(50) DEFAULT NULL COMMENT '对应用户id',
15 `user_name` varchar(50) DEFAULT NULL COMMENT '对应用户名',
16 `account_id` varchar(50) DEFAULT NULL COMMENT '对应租户id',
17 `token` varchar(255) DEFAULT NULL COMMENT '访问开放接口token',
18 `task_group_id` varchar(30) DEFAULT NULL COMMENT '任务分组id',
19 `task_group_owner` varchar(255) DEFAULT '' COMMENT '任务分组归属的服务实例',
20 `task_group_lock_time` bigint(20) DEFAULT 0 COMMENT '任务分组分布式锁锁定时间',
21 PRIMARY KEY (`id`),
22 UNIQUE KEY `IDX_TASK_ID` (`task_id`) USING BTREE,
23 KEY `IDX_GROUP` (`task_group_id`) USING BTREE
24) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COMMENT='任务'
配置说明
Plain Text
1connector.defaultFileUploadToken=文件上传token
2connector.defaultStructDataPushToken=结构化数据上传token
3connector.bos.enabled=是否启用bos传输
4connector.bos.endpoint=bj.bcebos.com
5connector.bos.accessKey=bos的ak
6connector.bos.secretKey=bos的sdk
7connector.bos.bucketName=bos的bocket
8connector.dbTasks.enabled=是否启用默认数据库任务管理器
9connector.taskGroupId=负责处理的任务分组id
10
11spring.datasource.url=数据库连接
12spring.datasource.username=数据库用户名
13spring.datasource.password=数据库密码
14data-importer.server=甄知数据接入地址,SaaS环境为 https://zhenzhi.cloud.baidu.com/
15data-importer.contextPath=甄知数据接入上下文路径,SaaS环境为 /data
16
17connector.disableSSLCheck=是否关闭SSL校验,一般填为false
示例
依赖
Plain Text
1<dependencies>
2 <dependency>
3 <groupId>com.baidu.mpks</groupId>
4 <version>1.0.0-SNAPSHOT</version>
5 <artifactId>data-connnectors-sdk</artifactId>
6 </dependency>
7
8 <dependency>
9 <groupId>org.apache.httpcomponents</groupId>
10 <artifactId>httpclient</artifactId>
11 <version>4.5.13</version>
12 </dependency>
13 <dependency>
14 <groupId>mysql</groupId>
15 <artifactId>mysql-connector-java</artifactId>
16 <version>8.0.28</version>
17 </dependency>
18 <dependency>
19 <groupId>org.springframework.boot</groupId>
20 <artifactId>spring-boot-starter-log4j2</artifactId>
21 <version>2.3.12.RELEASE</version>
22 </dependency>
23 </dependencies>
ConnectorSdkTestApp
Plain Text
1/*
2 * Copyright (c) 2018 Baidu.com, Inc. All Rights Reserved
3 */
4package com.baidu.mpks.connectors.engine;
5
6import org.springframework.beans.factory.annotation.Autowired;
7import org.springframework.boot.CommandLineRunner;
8import org.springframework.boot.SpringApplication;
9import org.springframework.boot.autoconfigure.SpringBootApplication;
10import org.springframework.cloud.openfeign.EnableFeignClients;
11import org.springframework.context.annotation.ComponentScan;
12
13@SpringBootApplication
14@EnableFeignClients
15@ComponentScan(basePackages = {"com.baidu.mpks.connectors.engine"})
16public class ConnectorSdkTestApp implements CommandLineRunner {
17 @Autowired
18 private ConnectorEngine connectorEngine;
19
20 public static void main(String[] args) {
21 try {
22 SpringApplication.run(ConnectorSdkTestApp.class, args);
23 } catch (Throwable t) {
24 t.printStackTrace();
25 }
26 }
27
28 @Override
29 public void run(String... args) {
30 connectorEngine.start();
31 }
32}
DiskSourceDetail
Plain Text
1package com.baidu.mpks.connectors.engine;
2
3import lombok.Data;
4
5@Data
6public class DiskSourceDetail {
7 private String path;
8 private String filePattern;
9 private long fileSizeLimit = 1024L * 1024 * 100;
10}
DiskConnector
Plain Text
1package com.baidu.mpks.connectors.engine;
2
3import com.baidu.mpks.connectors.engine.impl.AbstractConnector;
4import com.baidu.mpks.connectors.engine.impl.NormalDocument;
5import lombok.extern.slf4j.Slf4j;
6import org.apache.commons.lang.StringUtils;
7import org.apache.tomcat.util.security.MD5Encoder;
8import org.springframework.beans.factory.annotation.Autowired;
9import org.springframework.stereotype.Service;
10
11import java.io.File;
12import java.io.FileInputStream;
13import java.util.concurrent.ExecutorService;
14
15@Slf4j
16@Service
17public class DiskConnector extends AbstractConnector {
18 @Autowired
19 private ExecutorService newConnectorExecutor;
20 @Autowired
21 private ConnectorConfiguration connectorConfiguration;
22
23 @Override
24 public String getType() {
25 return "disk";
26 }
27
28 @Override
29 protected boolean processTask(TaskDesc task) {
30 newConnectorExecutor.submit(new DiskTask(task));
31 return true;
32 }
33
34 private class DiskTask extends AbstractTask implements Runnable {
35 private DefaultContext context;
36 private DiskSourceDetail sourceDetail;
37
38 public DiskTask(TaskDesc task) {
39 super(DiskConnector.this, task);
40 }
41
42 protected String getContext() {
43 return context.encode();
44 }
45
46 @Override
47 public void run() {
48 try {
49 context = task.getContextData(DefaultContext.class);
50 if (null == context) {
51 context = new DefaultContext();
52 }
53 sourceDetail = task.getSourceDetail(DiskSourceDetail.class);
54 processDiskFiles();
55 onAllDocumentsAdded();
56 } catch (Exception e) {
57 log.error("dingding connector error", e);
58 taskFinished(task.getTaskId(), false);
59 }
60 }
61
62 private void processDiskFiles() throws Exception {
63 long latestTime = 0;
64 if (context.getData().containsKey("latestTime")) {
65 Object latestTimeObj = context.getData().get("latestTime");
66 if (latestTimeObj instanceof Long) {
67 latestTime = (Long) latestTimeObj;
68 } else if (latestTimeObj instanceof Integer) {
69 latestTime = (int) latestTimeObj;
70 } else if (latestTimeObj instanceof String) {
71 latestTime = Long.parseLong((String) latestTimeObj);
72 }
73 }
74
75 File path = new File(sourceDetail.getPath());
76 if (!path.exists() || !path.isDirectory()) {
77 throw new Exception("path not exists or is not a directory");
78 }
79
80 latestTime = processPath(latestTime, path);
81 context.getData().put("latestTime", latestTime);
82 }
83
84 private long processPath(long latestTime, File path) {
85 long newLatestTime = latestTime;
86 File[] files = path.listFiles();
87 for (File file : files) {
88 if (file.isDirectory()) {
89 long retLatestTime = processPath(latestTime, file);
90 if (retLatestTime > newLatestTime) {
91 newLatestTime = retLatestTime;
92 }
93 } else if (file.lastModified() > latestTime) {
94 if (!StringUtils.isEmpty(sourceDetail.getFilePattern())
95 && !file.getName().matches(sourceDetail.getFilePattern())) {
96 continue;
97 }
98 log.info("found new file: {}", file.getAbsolutePath());
99
100 if (file.lastModified() > newLatestTime) {
101 newLatestTime = file.lastModified();
102 }
103 if (file.length() > sourceDetail.getFileSizeLimit()) {
104 continue;
105 }
106
107 try {
108 int index = file.getName().lastIndexOf(".");
109 String ext = null;
110 if (index > 0) {
111 ext = file.getName().substring(index + 1);
112 } else {
113 continue;
114 }
115 NormalDocument normalDocument = new NormalDocument(ext);
116 normalDocument.setDocumentId(MD5Encoder.encode(file.getPath().getBytes()));
117 normalDocument.setTitle(file.getName());
118 normalDocument.setFileName(file.getName());
119 normalDocument.setUpdateTime(file.lastModified());
120 normalDocument.setSize(file.length());
121 normalDocument.setPreviewUrl("http://www.baidu.com");
122 normalDocument.setContentStream(new FileInputStream(file));
123
124 addDocument(normalDocument);
125 } catch (Exception e) {
126 log.error("process file error", e);
127 }
128 }
129 }
130
131 return newLatestTime;
132 }
133 }
134}