Hive Programming (1)

JDBC client calls and custom UDF implementation

Posted by HC on March 30, 2018

Getting-started notes (1)

1. Calling Hive from the JDBC Client

Here is a sample of querying Hive from Java. The jar dependencies it needs are listed in the pom file at the end of this post. The code below plays the role of beeline, so to connect to Hive you must make sure hiveserver2 is running; by default it listens on port 10000. The query I run is select name, buydate, cost, sum(cost) over(partition by name order by cost) from buy, which partitions the rows by name, orders each partition by cost, and uses a window function to compute the running total of spending.

import java.sql.*;

public class ExecTest {
    public static void main(String[] args) throws SQLException {
        String host = "localhost";
        String port = "10000";
        String database = "test";
        String userName = "";
        String pwd = "";

        Statement statement = doConnection(host, port, database, userName, pwd);
        String sql = "select name, buydate, cost, sum(cost) over(partition by name order by cost) from buy";
        ResultSet resultSet = doQuery(statement, sql);

        while(resultSet.next()){
            System.out.println(resultSet.getString(1) + "\t" + resultSet.getString(2) +
                    "\t" + resultSet.getString(3) + "\t" + resultSet.getString(4));
        }
    }

    private static ResultSet doQuery(Statement statement, String sql) throws SQLException {
        return statement.executeQuery(sql);
    }

    private static Statement doConnection(String host, String port, String database, String userName, String pwd) {
        Statement statement = null;
        String driverName = "org.apache.hive.jdbc.HiveDriver";
        try {
            Class.forName(driverName);
            String url = "jdbc:hive2://" + host+ ":" + port + "/" + database;
            Connection connection = DriverManager.getConnection(url, userName, pwd);
            statement = connection.createStatement();

        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return statement;
    }
}

We also need to add the following properties to Hadoop's core-site.xml file to enable Hadoop's proxy-user (impersonation) mechanism. Otherwise the query fails with org.apache.hadoop.ipc.RemoteException: User: hc is not allowed to impersonate root. Here hc is my username.

<property>
	<name>hadoop.proxyuser.hc.hosts</name>
	<value>*</value>
</property>
<property>
	<name>hadoop.proxyuser.hc.groups</name>
	<value>*</value>
</property>

The output:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/hc/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.6.2/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/hc/.m2/repository/org/slf4j/slf4j-log4j12/1.7.25/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
jack	2015-01-01	10	10.0
jack	2015-02-03	23	33.0
jack	2015-04-06	42	75.0
jack	2015-01-05	46	121.0
jack	2015-01-08	55	176.0
mart	2015-04-08	62	62.0
mart	2015-04-09	68	130.0
mart	2015-04-11	75	205.0
mart	2015-04-13	94	299.0
neil	2015-05-10	12	12.0
neil	2015-06-12	80	92.0
tony	2015-01-02	15	15.0
tony	2015-01-04	29	44.0
tony	2015-01-07	50	94.0

Process finished with exit code 0
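
Note that the sample above never closes the Connection, Statement, or ResultSet. As a minimal sketch (reusing the localhost:10000/test endpoint and empty credentials from the example above; the class name ExecTestClosing is mine), the same query can be written with try-with-resources so everything is released even when the query throws:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class ExecTestClosing {
    public static void main(String[] args) throws ClassNotFoundException, SQLException {
        // explicit driver loading, as in the original example
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        String url = "jdbc:hive2://localhost:10000/test";
        String sql = "select name, buydate, cost, sum(cost) over(partition by name order by cost) from buy";
        // try-with-resources closes the ResultSet, Statement and Connection in reverse order
        try (Connection connection = DriverManager.getConnection(url, "", "");
             Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery(sql)) {
            while (resultSet.next()) {
                System.out.println(resultSet.getString(1) + "\t" + resultSet.getString(2) +
                        "\t" + resultSet.getString(3) + "\t" + resultSet.getString(4));
            }
        }
    }
}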

2. Custom UDF Functions

UDF stands for User Defined Function. UDFs let you plug your own functions into a Hive query. Hive has three kinds: plain UDFs, aggregate functions (UDAF), and table-generating functions (UDTF). A UDF maps one input to one output; a UDAF takes multiple inputs and produces a single output, achieving aggregation; a UDTF is the opposite, one input to many outputs. This post covers plain UDFs; UDAFs and UDTFs are left for a later post. Note that the input/output counts here refer to rows of data.

Depending on how complex the input types are, a plain UDF can be implemented in two ways. One extends the UDF class and accepts Java primitives and Hadoop Writable types as input. The other extends the GenericUDF class and can handle complex input types such as map, struct, and so on.

2.1 Extending the UDF Class

The first kind of UDF is very easy to implement: just extend the UDF class and write an evaluate() method that does whatever you need; evaluate() can also be overloaded. Below is an extremely simple example that just replaces any string with "123" and any integer with 1234. The @UDFType(deterministic = true) annotation states that the same input always yields the same output; if that is not the case, set it to false.

package MyUDF;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.udf.UDFType;

/**
 * one simple value in, one value out
 * works on Java primitives and Hadoop Writables
 * */

@UDFType(deterministic = true)
public class UDFTest extends UDF{
    public String evaluate(String s){
        return "123";
    }

    public int evaluate(int s){
        return 1234;
    }
}
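
As mentioned above, evaluate() can also take and return Hadoop Writable types. A hedged sketch of what that variant could look like (the class UDFWritableTest is mine, for illustration only, and is not part of the jar built in the next step):

package MyUDF;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

/**
 * hypothetical Writable-based variant of UDFTest
 * */
public class UDFWritableTest extends UDF {
    private final Text textResult = new Text();
    private final IntWritable intResult = new IntWritable();

    // a Writable argument is null when the column value is NULL
    public Text evaluate(Text s) {
        if (s == null) {
            return null;
        }
        textResult.set("123");
        return textResult;
    }

    public IntWritable evaluate(IntWritable s) {
        if (s == null) {
            return null;
        }
        intResult.set(1234);
        return intResult;
    }
}

Reusing the Text and IntWritable instances avoids allocating a new object per row, which is a common pattern in Hive UDFs.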

Next, package the code into a jar, add it to Hive, and try it out.

  1. Open the Hive CLI and run add jar /home/hc/桌面/Hive-1.0-SNAPSHOT.jar; to add the jar to the current Hive session.

  2. Define the function: create temporary function hudf as 'MyUDF.UDFTest';. The custom UDF can now be used; hudf is the alias I gave it. (This only registers the UDF temporarily. To make it permanent, consider putting the jar under Hive's lib directory and adding the create temporary statement to the ~/.hiverc file.) To remove the function, use drop temporary function if exists hudf

  3. Usage and results

    hive> select s_num1, hudf(s_num1), s_string1, hudf(s_string1) from alltype;
    OK
    10	1234	hello	123
    20	1234	world	123
    Time taken: 0.404 seconds, Fetched: 2 row(s)
    

2.2 Extending the GenericUDF Class

Extending the GenericUDF class lets a UDF handle complex data types. Compared with extending UDF, this is more involved: the following three methods must be implemented:

  1. public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException: initialization before the actual work (for example, checking argument validity) and declaring the return type.
  2. public Object evaluate(DeferredObject[] deferredObjects) throws HiveException: the actual custom logic.
  3. public String getDisplayString(String[] strings): a descriptive string for the function.

Other methods can be overridden as well, for example:

  1. public void configure(MapredContext context): configuration for the MapReduce task, e.g., setting some parameters.
  2. public void close() throws IOException: actions to perform after the map work has finished.

Below is an example that checks whether an Array column contains a specific value. The @Description annotation gives a description of the class.

package MyUDF;

import org.apache.hadoop.hive.ql.exec.*;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.lazy.LazyString;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import parquet.org.slf4j.Logger;
import parquet.org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

@UDFType(deterministic = true)
@Description(name = "GenericUDFTest",
        value = "_FUNC_(List<String>, String)",
        extended = "check whether an array contains a certain specific element")
public class GenericUDFTest extends GenericUDF{
    private static final Logger logger = LoggerFactory.getLogger(GenericUDFTest.class);
    private ListObjectInspector listObjectInspector = null;
    private StringObjectInspector stringObjectInspector = null;
    /**
     * Must implement
     * */
    // check parameters and determine the output type
    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
        if(objectInspectors.length != 2){
            throw new UDFArgumentLengthException("it only takes two params");
        }
        ObjectInspector objectInspector = objectInspectors[0];
        ObjectInspector objectInspector1 = objectInspectors[1];

        if(!(objectInspector instanceof ListObjectInspector)){
            throw new UDFArgumentTypeException(0, "List<String>/Array<String> is required");
        }
        this.listObjectInspector = (ListObjectInspector) objectInspector;
        if(!(this.listObjectInspector.getListElementObjectInspector() instanceof StringObjectInspector)){
            throw new UDFArgumentTypeException(0, "List<String>/Array<String> is required");
        }

        if(!(objectInspector1 instanceof StringObjectInspector)){
            throw new UDFArgumentTypeException(1, "String is required");
        }
        this.stringObjectInspector = (StringObjectInspector) objectInspector1;

        return PrimitiveObjectInspectorFactory.javaBooleanObjectInspector;
    }

    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        String query = this.stringObjectInspector.getPrimitiveJavaObject(deferredObjects[1].get());
        List<LazyString> lists = (List<LazyString>) this.listObjectInspector.getList(deferredObjects[0].get());
        if(query == null || lists == null || lists.size() == 0){
            return false;
        }
        for (LazyString ls : lists){
            String s = ls.toString();
            if(s.equals(query)){
                return true;
            }
        }
        return false;
    }

    public String getDisplayString(String[] strings) {
        return "check whether an array contains certain value";
    }

    /**
     * Optional
     * */
    @Override
    public void configure(MapredContext context) {
        super.configure(context);
    }

    @Override
    public void close() throws IOException {
        super.close();
    }
}

Again, package the code, add the jar to Hive, and try it out.

  1. Define the function: create temporary function hcontains as 'MyUDF.GenericUDFTest';

  2. Verify it. The function's description can be viewed with describe function hcontains or describe function extended hcontains, which print the information from @Description. Usage is as follows:

    1. Passing a single argument:

      hive> select s_list, hcontains(s_list) from alltype;
      FAILED: SemanticException [Error 10015]: Line 1:15 Arguments length mismatch 's_list': it only takes two params
      
    2. Passing an array and a number:

      hive> select s_list, hcontains(s_list,123) from alltype;
      FAILED: SemanticException [Error 10016]: Line 1:32 Argument type mismatch '123': String is required
      
    3. Passing the correct argument types:

      hive> select s_list, hcontains(s_list, "c200") from alltype;
      OK
      ["100","c200"]	true
      ["c200","300c"]	true
      Time taken: 0.194 seconds, Fetched: 2 row(s)
      hive> select s_list, hcontains(s_list, "100") from alltype;
      OK
      ["100","c200"]	true
      ["c200","300c"]	false
      Time taken: 0.182 seconds, Fetched: 2 row(s)
      

One thing worth grumbling about: inside evaluate, lists is declared as List<LazyString>. If you cast the result to List<String> instead and call lists.contains(query), the check fails and the comparison always comes out false. I did not know why at first... so let's debug it!

3. Remote Debugging Hive with IDEA

  1. Run Hive in debug mode: hive --debug. It then listens for a debugger connection on port 8000.

  2. Create a remote debug configuration in IDEA.

    1. Edit Configurations... → + → Remote. Make sure the port is set to 8000.

    2. Start it in Debug mode.

      Once it is running, you can see Hive enter the CLI.

  3. In the CLI, add the jar, create the temporary function, and run the query. IDEA will then stop at the breakpoint.

To debug the error described above, I added a List<String> object lists_ and used lists_.contains for the check.

List<LazyString> lists = (List<LazyString>) this.listObjectInspector.getList(deferredObjects[0].get());
List<String> lists_ = (List<String>) this.listObjectInspector.getList(deferredObjects[0].get());

Judging from the debugger, lists_ was never really converted into a List<String>; its elements are still LazyString objects! That is why tmp = false.
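
This is just how Java generics work: the unchecked cast does not convert the elements, so the list still holds LazyString objects, and contains(query) compares a String against a LazyString, which is never equal. A minimal standalone sketch of the same effect (the class ErasureDemo is mine, and it uses an arbitrary non-String object in place of LazyString, which is awkward to construct outside Hive):

import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {
    public static void main(String[] args) {
        List<Object> raw = new ArrayList<>();
        raw.add(new StringBuilder("100"));   // stands in for a LazyString wrapping "100"

        @SuppressWarnings("unchecked")
        List<String> strings = (List<String>) (List<?>) raw;   // unchecked cast: elements are untouched

        // contains() calls "100".equals(element); a String never equals a non-String object
        System.out.println(strings.contains("100"));              // false
        // converting each element explicitly, as ls.toString() does in evaluate(), works
        System.out.println(raw.get(0).toString().equals("100"));  // true
    }
}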

Finally, here is a version that works for elements of any type, i.e., it generalizes List<String> to List<T>. You can think of an ObjectInspector as a descriptor of a value's type, and of a DeferredObject as the value itself.

package MyUDF;

import org.apache.hadoop.hive.ql.exec.*;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.lazy.LazyString;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import parquet.org.slf4j.Logger;
import parquet.org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

@UDFType(deterministic = true)
@Description(name = "GenericUDFTestAny",
        value = "_FUNC_(List<T>, T)",
        extended = "check whether contains")
public class GenericUDFTestAny extends GenericUDF{
    private static final Logger logger = LoggerFactory.getLogger(GenericUDFTestAny.class);
    private ListObjectInspector listObjectInspector = null;
    private ObjectInspector objectInspector = null;
    /**
     * Must implement
     * */
    // check parameters and determine the output type
    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
        if(objectInspectors.length != 2){
            throw new UDFArgumentLengthException("it only takes two params");
        }
        ObjectInspector objectInspector = objectInspectors[0];
        ObjectInspector objectInspector1 = objectInspectors[1];

        if(!(objectInspector instanceof ListObjectInspector)){
            throw new UDFArgumentTypeException(0, "List<T>/Array<T> is required");
        }
        this.listObjectInspector = (ListObjectInspector) objectInspector;


        if(!ObjectInspectorUtils.compareTypes(this.listObjectInspector.getListElementObjectInspector(),
                objectInspector1)){
            throw new UDFArgumentException("two parameters should have the same element type.");
        }

        if(! ObjectInspectorUtils.compareSupported(objectInspector1)){
            throw new UDFArgumentTypeException(1, objectInspector1.getTypeName() +" is not comparable");
        }

        this.objectInspector = objectInspector1;

        return PrimitiveObjectInspectorFactory.javaBooleanObjectInspector;
    }

    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        Object list = deferredObjects[0].get();
        Object query = deferredObjects[1].get();

        if(list == null || this.listObjectInspector.getListLength(list) <= 0){
            return false;
        }

        for (int i = 0; i < this.listObjectInspector.getListLength(list); i++) {
            Object listElement = this.listObjectInspector.getListElement(list, i);
            if(ObjectInspectorUtils.compare(query, this.objectInspector, listElement,
                    this.listObjectInspector.getListElementObjectInspector()) == 0){
                return true;
            }
        }
        return false;
    }

    public String getDisplayString(String[] strings) {
        return "check whether list contains string";
    }

    /**
     * Optional
     * */
    @Override
    public void configure(MapredContext context) {
        super.configure(context);
    }

    @Override
    public void close() throws IOException {
        super.close();
    }
}
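
Since initialize and evaluate are plain Java methods, the class can also be exercised locally before loading it into Hive. A hedged sketch of such a driver (the class name GenericUDFTestAnyDriver and the test data are mine; it assumes hive-exec is on the classpath):

package MyUDF;

import java.util.Arrays;

import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredJavaObject;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class GenericUDFTestAnyDriver {
    public static void main(String[] args) throws HiveException {
        GenericUDFTestAny udf = new GenericUDFTestAny();

        // declare the argument types: array<string> and string
        ObjectInspector elementOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
        ObjectInspector listOI = ObjectInspectorFactory.getStandardListObjectInspector(elementOI);
        udf.initialize(new ObjectInspector[]{listOI, elementOI});

        // wrap plain Java values the way Hive would hand them to evaluate()
        DeferredObject[] hit = {
                new DeferredJavaObject(Arrays.asList("100", "c200")),
                new DeferredJavaObject("c200")};
        DeferredObject[] miss = {
                new DeferredJavaObject(Arrays.asList("100", "c200")),
                new DeferredJavaObject("300c")};

        System.out.println(udf.evaluate(hit));   // expected: true
        System.out.println(udf.evaluate(miss));  // expected: false
    }
}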

The execution results are as follows.

4. Appendix: pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>DataMining</groupId>
    <artifactId>Hive</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <hive_version>2.3.2</hive_version>
        <hadoop_version>3.0.0</hadoop_version>
        <spark_version>2.3.0</spark_version>
        <scala_version>2.12.5</scala_version>
    </properties>
    <dependencies>
        <!-- Hive Jar -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive_version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-service</artifactId>
            <version>${hive_version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>${hive_version}</version>
        </dependency>

        <!-- Hadoop Jar -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop_version}</version>
        </dependency>
    </dependencies>
</project>