对象存储BOS

    选取Object

    基本介绍

    SelectObject接口支持用户对BOS中指定格式(CSV/JSON)的object内容执行SQL语句,通过SQL这种结构化查询语言对object内容进行筛选、分析、过滤之后再返回用户需要的文件内容。

    目前用户想对存储在BOS的object内容进行筛选过滤,只能先通过GetObject接口下载单个object,然后再在本地对数据分析过滤;而SelectObject接口将把筛选过滤工作集成到BOS服务层,从而减少客户从BOS下载数据的网络带宽和延迟,同时也能节省客户筛选数据消耗的CPU和内存等资源,从而降低客户访问BOS中数据所需的应用程序成本。

    适用场景

    SelectObject典型的应用场景是和大数据产品结合使用,替换原来处理BOS数据的GetObject接口,用于提取日志文件指定内容,数据分析筛选等工作。

    使用条件

    如果想使用SelectObject接口筛选BOS中的object,需要满足以下限制条件和细节要求:

    1. 支持的文件类型

      • 仅支持select UTF-8编码的RFC 4180标准CSV(包括TSV等类CSV文件)和Json文件;
      • 支持的CSV文件的最大行及最大列长度都是512K;
      • 支持select的Json文件包括DOCUMENT和LINES两种,DOCUMENT是指整个文件是单一的JSON对象,LINES表示整个文件由多行的JSON对象组成,但整个文件本身并不是一个合法的JSON对象,行与行之间以换行分隔符隔开,支持用户指定常见的\n,\r\n等行列分隔符;
      • 支持select标准、低频、冷存储三种存储类别的文件;
      • 支持select SSE-BOS、SSE-KMS、SSE-C三种服务端加密方式加密之后的文件;
      • 支持select通过GZIP方式压缩后的文件,流式解压选取内容返回,GZIP文件不支持deflate格式,支持标准为RFC1952:Gzip压缩标准参考
    2. 支持的SQL语法

      • 目前只支持SELECT语法,SQL语句满足Select field_list From source Where condition Limit number形式;
      • 支持string、int(64bit)、float(64bit), timestamp,boolean数据类型;
      • 支持逻辑条件(AND/OR/NOT), 算术表达式(+-*/%), 比较运算符(>,=,<,>=,<=,!=),匹配运算符(LIKE,BETWEEN+AND,IN),空判断(IS NULL/IS NOT NULL);
      • 支持聚合函数(AVG、COUNT、MAX、MIN、SUM),转换函数CAST,别名关键词AS;
      • 仅支持单文件查询,不支持join、order by、group by、having、offset等关键词。
    3. SQL语句限制

      • 单个SQL语句最大长度16K,最大的列数是1000,列名称最长为1024,聚合操作(count/avg等)最多100个;
      • LIMIT函数优先级高于聚合函数,例如Select avg(cast(_1 as int)) from BosObject limit 100表示求前100个元素的平均值,与MySQL语义不同;
      • COUNT函数后只能用*,即count(*),不允许count(_1)形式;
      • SQL语句FROM之后的json path指定的JSON节点数据最长为512K,最大深度为10层;
      • [*]数组通配符只能出现在SELECT Json文件时,而且select或者where后边的表达式中不能有[*]数组通配符,数组通配符只能出现在from后的json path中;
      • SELECT Csv文件时,from关键词之后只能是BosObject;
      • WHERE语句里不能包含聚合条件,只允许使用逻辑操作符;
      • LIKE语句中,支持最多5个%通配符,表示0或多个任意字符, _表示单个字符;IN语句中最多支持1024个常量项;
      • Select后的fields可以是列名,CSV列索引(_1, _2等),或者是聚合函数,例如AVG(CAST _1 as int),但是不能是单独的CAST _1 as int;field不支持二元表达式;
      • select后边如果有一个field是*,那就不再允许有其他field,例如select *, _1 from s这种是不合法的;select的field中聚合函数和单独列名不可单独出现;select的field中所有alias别名必须都不一样;
      • 如果json SQL中存在key[*]/key[1]这种形式的field或者source,我们会认为这个field是表示select一个数组元素,键是key;但是如果SQL field/source包括key[a]这种形式,会被解析成键为key[a],去获取json中对应的value;
      • Json文件和SQL中Key的匹配是大小写敏感的,比如select s.Age 和select s.age是不同的。

    Select数据容错机制

    (一)处理缺失数据

    • csv文件的某列数据缺失时,如果该列用于WHERE之后做条件判断,可以直接认为条件不满足,跳过该行数据;但是如果该缺失列被用于SELECT之后做聚合操作时,例如avg(cast _1 as int),我们认为聚合一个不存在的列是非法的,应该直接结束并返回相应错误信息;
    • json文件的某个key缺失时,同上;
    • csv文件的列数据缺失或者json文件的某个key缺失,默认都当做NULL处理,也就是可以通过IS NULL 判断为true。
    • 其它情况:

      1. 当json key或者csv列在WHERE后边表达式中使用的话,例如…… where _1 = '', …… where a.b = 'value';如果缺失的话,我们都默认当做NULL值处理
      2. 当json key或者csv列直接作为field出现在select之后时,例如select _1 from……, select a.b from……;如果缺失的话,csv列应该默认返回空字符串,json key也是返回空字符串

    (二)处理类型不匹配数据

    • csv文件的列数据类型非法,例如CAST _1 as INT,然而_1值为非数值字符串导致cast失败,如果CAST _1 as INT用于WHERE后边做条件判断,可以直接认为条件不满足,跳过该行数据;但是如果该列被用于SELECT之后做聚合操作时,例如avg(cast _1 as int),我们认为聚合一个不合法列是非法的,应该直接结束并返回相应错误信息;
    • json文件的某个key对应数据类型非法时,同上

    csv Object

    select一个csv object一般是通过列序号或者列名来选取指定列数据,或者对某些列做聚合操作,例如一个test.csv文件内容如下,包含了各种数据类型的列,csv默认每一列数据都是字符串,需要做对应的CAST转换操作,列分隔符之间不要留空格:

    header1,header2,header3
    1,2,3.4
    a,b,c
    "d","e","f"
    true,false,true
    2006-01-02 15:04:06,"2006-01-02 16:04:06",2006-01-02 17:04:06

    常用SQL语句

    SQL语句 描述 备注
    select * from BosObject limit 100 返回object前100行数据 -
    select header1,header2 from BosObject 返回object中名称为header1,header2的列 fileHeaderInfo参数需要是"USE"
    select _1,_3 from BosObject where cast(_1 as int) <= cast(_3 as int) 返回object第1列和第3列的整数,满足第1列小于或等于第3列 需要_1,_3表示的列是整型才能被CAST转换,否则会因为不满足条件而跳过
    select count(*) from BosObject 返回object总行数 -
    select AVG(cast(_1 AS int)), MAX(cast(_1 AS int)), MIN(cast(_1 AS int)) from BosObject 返回object中第一列的平均值,最大值,最小值 每一行的第一列都不能包含非整型字符串,否则会直接失败
    select SUM(cast(header1 AS float)) from BosObject WHERE cast(header1 AS float) != 1 返回object中所有列名为header1且值不等于1的和 每一行的header1列都不能包含非数值型字符串
    select * from BosObject where 1 LIKE '%果' 返回object中1列形式满足"%果"的行,例如"苹果树"满足条件,"苹果"不满足条件 LIKE 操作符后字符串使用单引号
    select * from BosObject where cast(_1 AS int) % 3 = 0 返回object中_1列能被3整除的所有行 _1需要是整形字符串,才能使用%操作符
    select * from BosObject where cast(_1 AS int) between 1 and 2 返回object中_1列处于[1,2]区间的所有行 _1需要是整形字符串
    select * from BosObject where cast(_1 AS timestamp) NOT IN (cast('2006-01-02 15:04:06' as timestamp), cast('2006-01-03 15:04:06' as timestamp)) 返回object中_1列不在IN区间的所有行 _1需要是日期字符串形式
    select * from BosObject where cast(_1 AS int) * cast(_2 AS int) > cast(_3 AS float) + 1 返回object中_1列形式满足条件表达式计算结果的所有行 _1,_2,_3需要是满足CAST条件的合法字符串形式

    Json Object

    select一个json object一般是通过key来选取对应的数据,json文件包括LINES和DOCUMENT两种

    JSON DOCUMENT Object

    {"name": "Smith",
    "age": 16,
    "weight": 65.5,
    "org": null,
    "projects":
        [
         {"project_name":"project1", "completed":false},
         {"project_name":"project2", "completed":true}
        ]
    }

    JSON LINES Object

    {"name": "Smith",
    "age": 16,
    "org": null,
    "projects":
        [
         {"project_name":"project1", "completed":false},
         {"project_name":"project2", "completed":true}
        ]
    }
    {"name": "charles",
    "age": 17,
    "org": "baidu",
    "weight": 65.5,
    "projects":
        [
         {"project_name":"project3", "completed":false},
         {"project_name":"project4", "completed":true}
        ]
    }

    常用SQL语句

    • Json path基础形式field0.field1[n].property1.attributes[*]表示查找JSON文件根节点下的field0节点下field1节点中数组的第n个元素,再查找该元素的property1中attributes数组的全部内容
    • JSON object SQL同样可以使用聚合函数,逻辑运算,数学运算等;JSON中value自带数据类型,不需要CAST转换
    SQL语句 描述 备注
    select projects from BosObject where name='Smith' 返回json文件中满足name='Smith'条件的projects元素
    select * from BosObject.projects[*].project_name 返回json文件中根节点下的projects节点数组的project_name字段
    select s.completed from BosObject.projects[1] s where s.project_name='project2' 返回object的projects数组第一个元素的completed字段值,满足project_name = 'project2'
    select * from BosObject s where s.org IS NULL AND weight is null 返回json文件中满足name和weight都为空的记录 weight节点不存在也视为null

    错误返回码

    1. server端返回的错误码可能以http status code的形式返回,也可能在End Message中的error-code返回,以何种形式返回ErrorCode取决于发生具体的错误类型
    ErrorCode 描述 HTTP Status Code
    InvalidFileType select只支持选取csv和json object内容 400
    RecordTooLarge csv文件单行记录长度超出512KB限制 400
    SqlSyntaxError sql语句不合法,存在语法错误 400
    InvalidSqlFields sql语句中SELECT的field不合法,可能存在二元操作符或者其他非法操作 400
    InvalidSqlBinaryExpr 二元操作符使用非法,左右操作数类型不匹配 400
    SqlFieldsNumExceedLimit sql语句SELECT的field数目超出限制 400
    AggregateInvalidField sql语句聚合函数使用不合法,只能聚合数值列 400
    InvalidSqlJsonPathDepth 选取的json object节点深度不合法,超出1024限制或者小于1 400
    SqlSourceNumExceedLimit sql语句中FROM的source数目只能有一个 400
    FieldNotExist sql语句中SELECT的field在文件中不存在 400
    InappropriateJson json object内容格式不正确 400
    HeaderNotExist csv object 中不存在header信息 400
    DecompressError object解压失败 400
    DataOverflowsType 聚合列的结果溢出类型限制 400
    InvalidSqlSource sql语句中FROM的source不合法,检查source格式是否符合要求 400
    InvalidSqlLimitValue sql语句中Limit字段值不合法,需要为正整数 400
    InvalidSqlNotOperator sql语句中NOT操作符使用有误,只能用于BETWEEN/IN/LIKE之前,表示否定 400
    InvalidSqlBetweenOperator sql语句中BETWEEN操作符使用有误,BETWEEN和AND需要同时使用,AND两侧类型要一致 400
    InvalidSqlInOperator sql语句中IN操作符使用有误,IN内部值类型要一致 400
    InvalidSqlIsOperator sql语句中IS操作符使用有误,只能和NULL/NOT NULL连用 400
    InvalidSqlLikeOperator sql语句中LIKE操作符使用有误 400
    InvalidSqlFunction sql语句函数使用有误,检查函数参数类型和数目 400
    InvalidExpressionParameter SelectObject请求中expression参数不合法 400
    InvalidExpressionTypeParameter SelectObject请求中expressionType参数不合法 400
    InvalidCompressionTypeParameter SelectObject请求中compressionType参数不合法 400
    InvalidJsonTypeParameter SelectObject请求中json type参数不合法 400
    InvalidQuoteFieldsParameter SelectObject请求中quote fields参数不合法 400
    InvalidSelectRequestJsonBody SelectObject请求中json body不合法 400

    SDK使用示例

    目前BOS GO SDK支持SelectObject接口,具体使用示例如下:

    package main
    import (
    	"bufio"
    	"encoding/binary"
    	"fmt"
    	"io"
    	"strings"
    )
    import (
    	"github.com/baidubce/bce-sdk-go/services/bos"
    	"github.com/baidubce/bce-sdk-go/services/bos/api"
    )
    func main() {
    	selectBosObject()
    }
    func selectBosObject() {
    	// 初始化BosClient
    	AK, SK := "ak", "sk"
    	ENDPOINT := "bj.bcebos.com"
    	bosClient, _ := bos.NewClient(AK, SK, ENDPOINT)
    	// 先确保bucket,object已经存在,object满足csv/json文件格式要求
    	bucket := "select-bucket"
    	csvObject := "test.csv"
    	fmt.Println("------ select csv object -------")
    	csvArgs := &api.SelectObjectArgs{
    		SelectType: "csv",
    		SelectRequest: &api.SelectObjectRequest{
    			Expression:     "c2VsZWN0ICogZnJvbSBCb3NPYmplY3Qgd2hlcmUgY2FzdChfMSBBUyBpbnQpICogY2FzdChfMiBBUyBpbnQpID4gY2FzdChfMyBBUyBmbG9hdCkgKyAx",
    			ExpressionType: "SQL",
    			InputSerialization: &api.SelectObjectInput{
    				CompressionType: "NONE",
    				CsvParams: map[string]string{
    					"fileHeaderInfo":   "IGNORE",
    					"recordDelimiter":  "Cg==",
    					"fieldDelimiter":   "LA==",
    					"quoteCharacter":   "Ig==",
    					"commentCharacter": "Iw==",
    				},
    			},
    			OutputSerialization: &api.SelectObjectOutput{
    				OutputHeader: false,
    				CsvParams: map[string]string{
    					"quoteFields":     "ALWAYS",
    					"recordDelimiter": "Cg==",
    					"fieldDelimiter":  "LA==",
    					"quoteCharacter":  "Ig==",
    				},
    			},
    			RequestProgress: &api.SelectObjectProgress{
    				Enabled: true,
    			},
    		},
    	}
    	csvRes, err := bosClient.SelectObject(bucket, csvObject, csvArgs)
    	if err != nil {
    		fmt.Println(err)
    		return
    	}
    	parseMessages(csvRes)
    	fmt.Println("------ select json object -------")
    	jsonObject := "test.json"
    	jsonArgs := &api.SelectObjectArgs{
    		SelectType: "json",
    		SelectRequest: &api.SelectObjectRequest{
    			Expression:     "c2VsZWN0ICogZnJvbSBCb3NPYmplY3QucHJvamVjdHNbKl0ucHJvamVjdF9uYW1l",
    			ExpressionType: "SQL",
    			InputSerialization: &api.SelectObjectInput{
    				CompressionType: "NONE",
    				JsonParams: map[string]string{
    					"type": "LINES",
    				},
    			},
    			OutputSerialization: &api.SelectObjectOutput{
    				JsonParams: map[string]string{
    					"recordDelimiter": "Cg==",
    				},
    			},
    			RequestProgress: &api.SelectObjectProgress{
    				Enabled: true,
    			},
    		},
    	}
    	jsonRes, err := bosClient.SelectObject(bucket, jsonObject, jsonArgs)
    	if err != nil {
    		fmt.Println(err)
    		return
    	}
    	parseMessages(jsonRes)
    }
    // 解析所有headers保存到map中
    func parseHeaders(headers []byte) map[string]string {
    	hm := make(map[string]string)
    	index := 0
    	for index < len(headers) {
    		// headers key length
    		keyLen := int(headers[index])
    		index += 1
    		// headers key
    		key := headers[index : index+keyLen]
    		index += keyLen
    
    		// headers value length
    		valLenByte := headers[index : index+2]
    		valLen := int(binary.BigEndian.Uint16(valLenByte))
    		index += 2
    		// headers value
    		val := headers[index : index+valLen]
    		index += valLen
    		hm[string(key)] = string(val)
    	}
    	return hm
    }
    
    func parseMessages(res *api.SelectObjectResult) {
    	defer res.Body.Close()
    	reader := bufio.NewReader(res.Body)
    	for {
    		// total length in prelude, 4 bytes
    		p := make([]byte, 4)
    		l, err := io.ReadFull(reader, p)
    		if err != nil || l < 4 {
    			fmt.Printf("read total length err: %+v, len: %d\n", err, l)
    			break
    		}
    		totalLen := binary.BigEndian.Uint32(p)
    		// headers length in prelude, 4 bytes
    		l, err = io.ReadFull(reader, p)
    		if err != nil || l < 4 {
    			fmt.Printf("read headers length err: %+v, len: %d\n", err, l)
    			break
    		}
    		headersLen := binary.BigEndian.Uint32(p)
    		// headers part
    		headers := make([]byte, headersLen)
    		l, err = io.ReadFull(reader, headers)
    		if err != nil || uint32(l) < headersLen {
    			fmt.Printf("read headers data err: %+v, len: %d\n", err, l)
    			break
    		}
    		// 获取header长度,并解析headers内容,判断具体的msg类型;end msg则结束读取,
    		// cont msg则调用回调函数输出进度信息,record msg则输出记录信息
    		headersMap := parseHeaders(headers)
    		if headersMap["message-type"] == "Records" {
    			// payload part
    			payloadLen := totalLen - headersLen - 12
    			payload := make([]byte, payloadLen)
    			if _, err := io.ReadFull(reader, payload); err != nil {
    				fmt.Printf("read payload data err: %+v\n", err)
    			}
    			// 设置你使用的OutputSerialization字段中的换行符做分行处理
    			rs := strings.Split(string(payload), "\n")
    			_, err = io.ReadFull(reader, p)
    			crc := binary.BigEndian.Uint32(p)
    			recordsMsg := &api.RecordsMessage{
    				CommonMessage: api.CommonMessage{
    					Prelude: api.Prelude{
    						TotalLen:   totalLen,
    						HeadersLen: headersLen,
    					},
    					Headers: headersMap,
    					Crc32:   crc,
    				},
    				Records: rs,
    			}
    			fmt.Printf("RecordsMessage: %+v\n", recordsMsg)
    			continue
    		}
    		if headersMap["message-type"] == "Cont" {
    			// payload part, progress
    			bs := make([]byte, 8)
    			_, err = io.ReadFull(reader, bs)
    			bytesScanned := binary.BigEndian.Uint64(bs)
    
    			br := make([]byte, 8)
    			_, err = io.ReadFull(reader, br)
    			bytesReturned := binary.BigEndian.Uint64(br)
    
    			_, err = io.ReadFull(reader, p)
    			crc := binary.BigEndian.Uint32(p)
    
    			contMsg := &api.ContinuationMessage{
    				CommonMessage: api.CommonMessage{
    					Prelude: api.Prelude{
    						TotalLen:   totalLen,
    						HeadersLen: headersLen,
    					},
    					Headers: headersMap,
    					Crc32:   crc,
    				},
    				BytesScanned:  bytesScanned,
    				BytesReturned: bytesReturned,
    			}
    			fmt.Printf("ContinuationMessage: %+v\n", contMsg)
    			continue
    		}
    		if headersMap["message-type"] == "End" {
    			_, err = io.ReadFull(reader, p)
    			crc := binary.BigEndian.Uint32(p)
    
    			endMsg := &api.EndMessage{
    				CommonMessage: api.CommonMessage{
    					Prelude: api.Prelude{
    						TotalLen:   totalLen,
    						HeadersLen: headersLen,
    					},
    					Headers: headersMap,
    					Crc32:   crc,
    				},
    			}
    			fmt.Printf("EndMessage: %+v\n", endMsg)
    			break
    		}
    	}
    }
    上一篇
    查看文件列表
    下一篇
    数据发布