入坑指南(3)
1.Hive Transform
用Java写UDF能保证运行效率,但是开发相对不易。Hive提供另一个快速开发UDF函数的途径:Hive Transform。Hive Transform就像Unix中的管道操作,可以通过有序读取+控制输出的方式实现UDF,UDAF和UDTF的功能。具体来说:
- 任意读取+ 逐条输出 = UDF
- 无序读取+ 多条输出= UDTF
- 有序读取+ 聚合输出= UDAF
一般来说,Hive Transform的写法可以遵循以下:
select transform(col1, col2,...)
using 'SOME_COMMAND'
as (new_col1, new_col2,...)
from table_name
其中,transform是hive内置的语法,是待处理的数据的入口。数据从hive读出,转换成字符串,流入到 SOME_COMMAND的部分,这里我用的SOME_COMMAND表示用于处理字符串的shell命令,比如可以用python来处理数据,那么SOME_COMMAND则是python your_script.py
,最后,as后面表示数据处理完毕之后,输出字段的名字。
一种简单实现UDF的方式是:
- 我们在
from table_name
的地方,通过sub-query来控制数据读取的顺序 - 在SOME_COMMAND的地方,处理数据,控制输出。
2. Examples
在用脚本解析数据的时候,输入数据是一个以\t
分割的字符串,表示hive表中的列。其中hive表中的null会被替换成\N
字符串,arr对象会被替换成[1,2,3,..,n]
格式的字符串。
2.0 测试数据
key | age | sex | click_list | dt |
---|---|---|---|---|
pin2 | 122 | 1 | 1,3,4 | 2012/12/12 |
pin3 | 112 | 0 | 1,2,4 | 2012/12/12 |
pin1 | 13 | 1 | 1,2,3 | 2013/12/12 |
pin5 | 152 | 0 | 1,2,5,4 | 2018/12/12 |
pin6 | 126 | 1 | 1,3,3,4 | 2017/12/12 |
pin1 | 14 | 0 | 1,4,3,4 | 2014/12/12 |
pin8 | 128 | 1 | 1,3,3,4 | 2012/12/12 |
pin9 | 129 | 1 | 1,21,3,4 | 2012/12/12 |
pin10 | 120 | 0 | 1,2,32,4 | 2012/11/12 |
pin8 | 127 | 1 | 1,2,3,42 | 2011/12/12 |
pin2 | 123 | 1 | 1,2,3,24 | 2013/12/12 |
2.1 UDF
查询语句如下。实现字符串转大写
hive -e "use cf_tmp;
add file /exportfs/home/lixian5/hc/myfile/fileTransfer/UDFTest/src/main/resources/python/to_upper.py;
select transform(pin) using 'python to_upper.py' as upper_pin
from cf_tmp.tmp_data_hc
;"
to_upper.py
import sys
for line in sys.stdin:
values = line.strip().split("\t")
print("\t".join(list(map(lambda x: x.upper(), values))))
2.2 UDAF
查询语句如下,实现求和。这里我们用子查询,让同一个用户的数据在一个reduce中,达到聚合的效果
hive -e "use cf_tmp;
add file 你的代码路径/sum.py;
select transform(a.pin, a.age) using 'python sum.py' as (pin, meaningless_age)
from ( select pin, age
from tmp.tmp_data_hc
cluster by pin
) a
;"
sum.py
import sys
def is_number(s):
try:
float(s)
return True
except ValueError:
return False
last_key = None
sum_ = 0
for line in sys.stdin:
key, value = line.strip().split("\t")
if last_key and last_key != key:
print("\t".join([last_key, str(sum_)]))
sum_ = 0
sum_ += float(value) if is_number(value) else 0
last_key = key
if last_key:
print("\t".join([last_key, str(sum_)]))
2.3 UDTF
查询语句如下,实现explode。这里click_list是一个arr对象
hive -e "use cf_tmp;
add file 你的代码路径/explode.py;
select transform(click_list) using 'python explode.py' as single_click
from tmp.tmp_data_hc
;"
explode.py
import sys
for line in sys.stdin:
values = line.strip().split("\t")
if len(values) == 1:
list_ = eval(values[0])
for value in list_:
print("\t".join([str(value)]))