窗口函数
更新时间:2023-12-07
窗口函数概述
BSC 窗口函数支持基于窗口的聚合。例如,需要统计每分钟内,广告被点击的次数,可以通过定义一个1分钟的窗口来收集对应窗口的数据,并对这个窗口内的数据进行聚合计算。BSC 支持两种窗口类型:滚动窗口(tumbling window),滑动窗口(sliding window),会话窗口(session window)。
滚动窗口
滚动窗口(tumbling)将每个元素分配到一个指定大小的窗口中。每个窗口的大小固定,并且互不重合。滚动窗口有一个参数 size,用来指定窗口的大小。例如对于10分钟大小的滚动窗口,数据流会被划分成 [00:00 - 00:10)
、[00:10 - 00:20)
、[00:20 - 00:30)
等窗口。
语法
window(<timeColumn>, <windowDuration>) // timeColumn 参数是数据流中的一个时间属性字段
示例
CREATE TABLE ad_clicks(
ad_name VARCHAR,
ad_url VARCHAR,
click_time TIMESTAMP
) with (
type='KAFKA',
...
);
CREATE TABLE window_output(
window_start TIMESTAMP,
window_end TIMESTAMP,
ad_name VARCHAR,
clicks BIGINT
) with (
type='RDS',
...
);
INSERT INTO
window_output
SELECT
window.start,
window.end,
ad_id,
COUNT(ad_url)
FROM
ad_clicks
GROUP BY
window(click_time, "1 minute"), ad_name
滑动窗口
滑动窗口(Sliding)类似于滚动窗口,不同之处在于,每个滑动窗口之间,是可以重叠的。滑动窗口有两个参数 size 和 slide,用来指定窗口的大小和每次滑动的步长。滑动窗口通常会用来统计计算移动平均数。例如,基于过去30分钟的路况信息,预测未来的拥堵情况,每 5 分钟更新一次。可以设置一个 size = 30 min, slide = 5 min 的滑动窗口。
语法
window(<timeColumn>, <windowDuration>, <slideDuration>) // timeColumn 参数是数据流中的一个时间属性字段
示例
CREATE TABLE traffic(
cross_id VARCHAR,
car_count BIGINT,
ts TIMESTAMP
) with (
type='KAFKA',
...
);
CREATE TABLE window_output(
window_start TIMESTAMP,
window_end TIMESTAMP,
cross_id VARCHAR,
car_avg BIGINT
) with (
type='RDS',
...
);
INSERT INTO
window_output
SELECT
window.start,
window.end,
cross_id,
AVG(ad_url)
FROM
traffic
GROUP BY
window(ts, "30 minutes", "5 minutes"),
cross_id
会话窗口
会话窗口(session window)的特点在于不需要指定固定的窗口大小,一个会话窗口关闭的规则是在一个固定时间内没有再收到新的元素。
会话窗口通过一个间隔时间来配置,这个间隔定义了窗口的最大非活跃时间。例如,一个表示用户网页行为的数据流可能会有长时间的空闲时间把数据分成多个组,每个组内有频繁的行为数据,当数据流的空闲时间超过的设置的间隔时间后,就会视为上一组数据结束,下一组数据开始,也就是开始新的会话窗口。
语法
session_window(<timeColumn>, <windowGap>) // timeColumn 参数是数据流中的一个时间属性字段
示例
CREATE TABLE traffic(
action_id VARCHAR,
action_type VARCHAR,
ts TIMESTAMP
) with (
type='KAFKA',
...
);
CREATE TABLE window_output(
window_start TIMESTAMP,
window_end TIMESTAMP,
action_type VARCHAR,
action_count BIGINT
) with (
type='RDS',
...
);
INSERT INTO
window_output
SELECT
window.start,
window.end,
action_type,
COUNT(*)
FROM
traffic
GROUP BY
session_window(ts, "5 minutes"),
action_type