Hive编程开发(3)

Hive Transform

Posted by HC on October 8, 2018

入坑指南(3)

1.Hive Transform

用Java写UDF能保证运行效率,但是开发相对不易。Hive提供另一个快速开发UDF函数的途径:Hive Transform。Hive Transform就像Unix中的管道操作,可以通过有序读取+控制输出的方式实现UDF,UDAF和UDTF的功能。具体来说:

  • 任意读取+ 逐条输出 = UDF
  • 无序读取+ 多条输出= UDTF
  • 有序读取+ 聚合输出= UDAF

一般来说,Hive Transform的写法可以遵循以下:

select transform(col1, col2,...) 
using 'SOME_COMMAND' 
as (new_col1, new_col2,...) 
from table_name

其中,transform是hive内置的语法,是待处理的数据的入口。数据从hive读出,转换成字符串,流入到 SOME_COMMAND的部分,这里我用的SOME_COMMAND表示用于处理字符串的shell命令,比如可以用python来处理数据,那么SOME_COMMAND则是python your_script.py,最后,as后面表示数据处理完毕之后,输出字段的名字。

一种简单实现UDF的方式是:

  1. 我们在from table_name的地方,通过sub-query来控制数据读取的顺序
  2. 在SOME_COMMAND的地方,处理数据,控制输出。

2. Examples

在用脚本解析数据的时候,输入数据是一个以\t分割的字符串,表示hive表中的列。其中hive表中的null会被替换成\N字符串,arr对象会被替换成[1,2,3,..,n]格式的字符串。

2.0 测试数据

key age sex click_list dt
pin2 122 1 1,3,4 2012/12/12
pin3 112 0 1,2,4 2012/12/12
pin1 13 1 1,2,3 2013/12/12
pin5 152 0 1,2,5,4 2018/12/12
pin6 126 1 1,3,3,4 2017/12/12
pin1 14 0 1,4,3,4 2014/12/12
pin8 128 1 1,3,3,4 2012/12/12
pin9 129 1 1,21,3,4 2012/12/12
pin10 120 0 1,2,32,4 2012/11/12
pin8 127 1 1,2,3,42 2011/12/12
pin2 123 1 1,2,3,24 2013/12/12

2.1 UDF

查询语句如下。实现字符串转大写

hive -e "use cf_tmp;
add file /exportfs/home/lixian5/hc/myfile/fileTransfer/UDFTest/src/main/resources/python/to_upper.py;
select transform(pin) using 'python to_upper.py' as upper_pin
from cf_tmp.tmp_data_hc
;"

to_upper.py

import sys

for line in sys.stdin:
    values = line.strip().split("\t")
    print("\t".join(list(map(lambda x: x.upper(), values))))

2.2 UDAF

查询语句如下,实现求和。这里我们用子查询,让同一个用户的数据在一个reduce中,达到聚合的效果

hive -e "use cf_tmp;
add file 你的代码路径/sum.py;
select transform(a.pin, a.age) using 'python sum.py' as (pin, meaningless_age)
from ( select pin, age
from tmp.tmp_data_hc
cluster by pin
) a
;"

sum.py

import sys

def is_number(s):
    try:
        float(s)
        return True
    except ValueError:
        return False

last_key = None
sum_ = 0
for line in sys.stdin:
    key, value = line.strip().split("\t")
    if last_key and last_key != key:
        print("\t".join([last_key, str(sum_)]))
        sum_ = 0
    sum_ += float(value) if is_number(value) else 0
    last_key = key

if last_key:
    print("\t".join([last_key, str(sum_)]))

2.3 UDTF

查询语句如下,实现explode。这里click_list是一个arr对象

hive -e "use cf_tmp;
add file 你的代码路径/explode.py;
select transform(click_list) using 'python explode.py' as single_click
from tmp.tmp_data_hc
;"

explode.py

import sys

for line in sys.stdin:
    values = line.strip().split("\t")
    if len(values) == 1:
        list_ = eval(values[0])
        for value in list_:
            print("\t".join([str(value)]))