Dynamically Computing Data Distributions in Hive

Background

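Computing the distribution of a data set is a very common statistical requirement, especially in BI applications. The first step is always to decide the bin boundaries. Usually the requester supplies them based on experience, but in some cases the data being measured keeps growing, so the boundaries cannot be fixed in advance and have to be derived from the data itself. Since the number of bins is known, quantiles are a natural way to compute the boundaries.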

Determining Bin Boundaries Statistically

First, compute the quantiles of the data. Assuming 5 bins, 4 quantiles are needed.

SELECT dimension,
       -- 4 cut points split each dimension's values into 5 groups
       percentile_approx(metric, array(0.2, 0.4, 0.6, 0.8)) AS boundaries
FROM sample_data
GROUP BY dimension
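
To get a feel for what the four cut points look like outside Hive, here is a minimal Python sketch. It uses the standard library's `statistics.quantiles`, which computes exact interpolated quantiles, so its values will generally differ somewhat from what Hive's approximate `percentile_approx` returns. The sample list is made up for illustration.

```python
import statistics

# Hypothetical metric values for a single dimension.
metric = [3, 7, 8, 12, 13, 14, 18, 21, 22, 40]

# n=5 splits the data into 5 groups, which yields 4 cut points
# (roughly the 20th, 40th, 60th and 80th percentiles).
cut_points = statistics.quantiles(metric, n=5, method="inclusive")

print(cut_points)
```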

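The computed quantiles cannot be used as bin boundaries directly: they will be shown in the chart legend, so they need to be rounded. Assuming the boundaries must be multiples of 5, round each quantile to the nearest multiple of 5 and assign each boundary a sequence number.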

SELECT dimension,
       round(boundary / 5) * 5 AS boundary,  -- nearest multiple of 5
       row_number() OVER (PARTITION BY dimension ORDER BY boundary) AS rn
FROM
  (SELECT dimension,
          percentile_approx(metric, array(0.2, 0.4, 0.6, 0.8)) AS boundaries
   FROM sample_data
   GROUP BY dimension) t
LATERAL VIEW explode(boundaries) boundary_tbl AS boundary
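
The rounding and numbering step can be sketched in Python as follows. The helper name `round_to_multiple` and the input quantiles are hypothetical, and the helper rounds halves upward for non-negative values, which is an assumption about the rounding behaviour wanted here rather than a statement about Hive's `round`.

```python
import math

def round_to_multiple(value, step=5):
    """Round a non-negative value to the nearest multiple of `step` (halves go up)."""
    return math.floor(value / step + 0.5) * step

# Hypothetical quantiles for one dimension, already in ascending order.
quantiles = [7.8, 12.6, 16.4, 21.2]

# Pair each rounded boundary with its sequence number, like row_number().
boundaries = [(rn, round_to_multiple(q)) for rn, q in enumerate(quantiles, start=1)]

print(boundaries)  # [(1, 10), (2, 15), (3, 15), (4, 20)]
```

Note that two of the rounded boundaries collide at 15 in this example, which is exactly the situation the next step has to handle.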

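The rounded quantiles have another problem: adjacent values may be equal (when the data is unevenly distributed), but the bin width must be greater than 0. Assuming a minimum bin width of 5, adjacent boundaries must differ by at least 5. A window function can determine, for each boundary, whether it needs this amount added.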

SELECT dimension,
       boundary,
       -- 5 when this boundary equals the previous one, otherwise 0
       -- (lag() is NULL on the first row, so the condition is not met)
       if(boundary - lag(boundary) OVER (PARTITION BY dimension ORDER BY rn) = 0, 5, 0) AS padding
FROM
  (SELECT dimension,
          round(boundary / 5) * 5 AS boundary,
          row_number() OVER (PARTITION BY dimension ORDER BY boundary) AS rn
   FROM
     (SELECT dimension,
             percentile_approx(metric, array(0.2, 0.4, 0.6, 0.8)) AS boundaries
      FROM sample_data
      GROUP BY dimension) t
   LATERAL VIEW explode(boundaries) boundary_tbl AS boundary) t
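
As a small illustration of the `lag`-based comparison, the following Python sketch computes the same padding flags for a list of rounded boundaries. The function name `padding_flags` and the sample values are hypothetical.

```python
def padding_flags(boundaries, min_width=5):
    """Return min_width where a boundary equals its predecessor, else 0."""
    # Shift the list by one position to emulate lag(); the first row has no predecessor.
    previous = [None] + boundaries[:-1]
    return [
        min_width if prev is not None and cur == prev else 0
        for prev, cur in zip(previous, boundaries)
    ]

print(padding_flags([10, 15, 15, 20]))  # [0, 0, 5, 0]
print(padding_flags([10, 10, 10, 10]))  # [0, 5, 5, 5]
```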

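If the padding for a boundary is greater than 0, every boundary after it must be shifted by the same amount to keep all bin widths greater than 0. This is a running sum, and a window frame controls which rows it covers.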

SELECT dimension,
       -- running total of padding from the first boundary up to the current one
       boundary + sum(padding) OVER (PARTITION BY dimension ORDER BY rn
                                     ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS boundary,
       rn
FROM
  (SELECT dimension,
          boundary,
          if(boundary - lag(boundary) OVER (PARTITION BY dimension ORDER BY rn) = 0, 5, 0) AS padding,
          rn
   FROM
     (SELECT dimension,
             round(boundary / 5) * 5 AS boundary,
             row_number() OVER (PARTITION BY dimension ORDER BY boundary) AS rn
      FROM
        (SELECT dimension,
                percentile_approx(metric, array(0.2, 0.4, 0.6, 0.8)) AS boundaries
         FROM sample_data
         GROUP BY dimension) t
      LATERAL VIEW explode(boundaries) boundary_tbl AS boundary) t) tt
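
The running sum is easy to check by hand. Below is a minimal Python sketch that takes hypothetical rounded boundaries and their padding values as given and applies the cumulative shift with `itertools.accumulate`.

```python
from itertools import accumulate

# Hypothetical rounded boundaries and padding flags for one dimension.
boundaries = [10, 15, 15, 20]
padding = [0, 0, 5, 0]

# accumulate() yields the running total of padding up to and including each row,
# matching ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
shifted = [b + p for b, p in zip(boundaries, accumulate(padding))]

print(shifted)  # [10, 15, 20, 25]
```

Because the rounded boundaries are non-decreasing multiples of 5, adding the running total leaves every adjacent pair at least 5 apart.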

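This gives the bin boundaries. A self-join then converts them into the desired format, with one row per bin holding its left and right boundary.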

WITH boundaries AS
  (SELECT dimension,
          boundary + sum(padding) OVER (PARTITION BY dimension ORDER BY rn
                                        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS boundary,
          rn
   FROM
     (SELECT dimension,
             boundary,
             if(boundary - lag(boundary) OVER (PARTITION BY dimension ORDER BY rn) = 0, 5, 0) AS padding,
             rn
      FROM
        (SELECT dimension,
                round(boundary / 5) * 5 AS boundary,
                row_number() OVER (PARTITION BY dimension ORDER BY boundary) AS rn
         FROM
           (SELECT dimension,
                   percentile_approx(metric, array(0.2, 0.4, 0.6, 0.8)) AS boundaries
            FROM sample_data
            GROUP BY dimension) t
         LATERAL VIEW explode(boundaries) boundary_tbl AS boundary) t) tt)
-- t1 supplies the left boundary and t2 (shifted by one) the right boundary.
-- The FULL JOIN keeps the first bin (no left boundary) and the last bin (no right boundary).
SELECT coalesce(t1.dimension, t2.dimension) AS dimension,
       coalesce(t1.rn, t2.rn) AS bin_index,
       t1.boundary AS left_boundary,
       t2.boundary AS right_boundary
FROM boundaries t1
FULL JOIN
  (SELECT dimension,
          boundary,
          rn - 1 AS rn
   FROM boundaries) t2 ON t1.dimension = t2.dimension
AND t1.rn = t2.rn
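
The effect of the shifted self-join can be sketched in Python for a single dimension. The function name `to_bins` is hypothetical, and `None` stands in for the NULL boundary of the two open-ended bins.

```python
def to_bins(boundaries):
    """Turn k sorted boundaries into k + 1 (bin_index, left, right) tuples."""
    lefts = [None] + boundaries    # the first bin has no left boundary
    rights = boundaries + [None]   # the last bin has no right boundary
    return [(i, left, right) for i, (left, right) in enumerate(zip(lefts, rights))]

for row in to_bins([10, 15, 20, 25]):
    print(row)
# (0, None, 10)
# (1, 10, 15)
# (2, 15, 20)
# (3, 20, 25)
# (4, 25, None)
```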

Computing the Distribution

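Hive has supported non-equi joins since version 2.2.0, and this feature can be used to compute the distribution. Note that the join condition has to handle the edge cases of the first and last bins, each of which is missing one boundary.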

WITH boundaries AS
  (SELECT dimension,
          boundary + sum(padding) OVER (PARTITION BY dimension ORDER BY rn
                                        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS boundary,
          rn
   FROM
     (SELECT dimension,
             boundary,
             if(boundary - lag(boundary) OVER (PARTITION BY dimension ORDER BY rn) = 0, 5, 0) AS padding,
             rn
      FROM
        (SELECT dimension,
                round(boundary / 5) * 5 AS boundary,
                row_number() OVER (PARTITION BY dimension ORDER BY boundary) AS rn
         FROM
           (SELECT dimension,
                   percentile_approx(metric, array(0.2, 0.4, 0.6, 0.8)) AS boundaries
            FROM sample_data
            GROUP BY dimension) t
         LATERAL VIEW explode(boundaries) boundary_tbl AS boundary) t) tt),
bins AS
  (SELECT coalesce(t1.dimension, t2.dimension) AS dimension,
          coalesce(t1.rn, t2.rn) AS bin_index,
          t1.boundary AS left_boundary,
          t2.boundary AS right_boundary
   FROM boundaries t1
   FULL JOIN
     (SELECT dimension,
             boundary,
             rn - 1 AS rn
      FROM boundaries) t2 ON t1.dimension = t2.dimension
   AND t1.rn = t2.rn)
SELECT t1.dimension,
       t2.left_boundary,
       t2.right_boundary,
       count(*) AS frequency
FROM sample_data t1
-- Bin 0 has no left boundary, so only the upper bound is checked.
-- The last bin has no right boundary; nvl() makes its upper-bound check always true.
JOIN bins t2
  ON if(t2.bin_index = 0,
        t1.metric <= t2.right_boundary,
        t1.metric > t2.left_boundary
        AND t1.metric <= nvl(t2.right_boundary, t1.metric))
  AND t1.dimension = t2.dimension
GROUP BY t1.dimension,
         t2.left_boundary,
         t2.right_boundary
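
To sanity-check the interval logic of the join condition, here is a minimal Python sketch for a single dimension. It assumes the same left-open, right-closed bins as the query and uses `bisect.bisect_left` in place of the join. The function name `bin_counts` and the sample values are hypothetical.

```python
import bisect
from collections import Counter

def bin_counts(values, boundaries):
    """Count values per bin, where bin i covers (boundaries[i-1], boundaries[i]]."""
    # bisect_left returns how many boundaries are strictly less than the value,
    # which is the index of the left-open, right-closed bin it falls into.
    return Counter(bisect.bisect_left(boundaries, v) for v in values)

values = [3, 10, 11, 15, 20, 24, 25, 26, 40]
counts = bin_counts(values, [10, 15, 20, 25])

print(sorted(counts.items()))  # [(0, 2), (1, 2), (2, 1), (3, 2), (4, 2)]
```

A value equal to a boundary, such as 10 or 25, lands in the bin to the left of that boundary, mirroring the `<=` comparison on the right boundary in the query.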

This completes a distribution calculation whose bin boundaries are derived dynamically from the data.