Getting Started Guide (1)
1. Calling Hive from a JDBC Client
Below is a sample of running a Hive query from Java. For the jar dependencies, see the pom file at the end of this page. The code plays the role of beeline, so to make sure it can connect to Hive, confirm that hiveserver2 has been started; hiveserver2 listens on port 10000 by default. The query I use here is select name, buydate, cost, sum(cost) over(partition by name order by cost) from buy, which groups rows by name, sorts within each group by cost, and uses a window function to compute the running total of spending.
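For reference, here is a plausible shape for the buy table. The DDL below is my own reconstruction, with column types inferred from the query and the output further down; it is not the original definition.

-- Hypothetical DDL for the demo table (types inferred, not from the original post).
create table buy (
    name    string,
    buydate string,
    cost    int
);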
import java.sql.*;

public class ExecTest {
    public static void main(String[] args) throws SQLException {
        String host = "localhost";
        String port = "10000";
        String database = "test";
        String userName = "";
        String pwd = "";
        Statement statement = doConnection(host, port, database, userName, pwd);
        String sql = "select name, buydate, cost, sum(cost) over(partition by name order by cost) from buy";
        ResultSet resultSet = doQuery(statement, sql);
        while (resultSet.next()) {
            System.out.println(resultSet.getString(1) + "\t" + resultSet.getString(2) +
                    "\t" + resultSet.getString(3) + "\t" + resultSet.getString(4));
        }
    }

    private static ResultSet doQuery(Statement statement, String sql) throws SQLException {
        return statement.executeQuery(sql);
    }

    // Load the Hive JDBC driver and open a connection to hiveserver2.
    private static Statement doConnection(String host, String port, String database, String userName, String pwd) {
        Statement statement = null;
        String driverName = "org.apache.hive.jdbc.HiveDriver";
        try {
            Class.forName(driverName);
            String url = "jdbc:hive2://" + host + ":" + port + "/" + database;
            Connection connection = DriverManager.getConnection(url, userName, pwd);
            statement = connection.createStatement();
        } catch (ClassNotFoundException | SQLException e) {
            e.printStackTrace();
        }
        return statement;
    }
}
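One thing the sample skips is resource cleanup: the Connection and Statement are never closed. A minimal sketch of the same flow with try-with-resources (reusing the same url, credentials, and sql as above) would be:

// Sketch: identical connection flow, but resources close automatically.
String url = "jdbc:hive2://localhost:10000/test";
try (Connection connection = DriverManager.getConnection(url, "", "");
     Statement statement = connection.createStatement();
     ResultSet rs = statement.executeQuery(sql)) {
    while (rs.next()) {
        System.out.println(rs.getString(1));
    }
}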
We also need to add the following to Hadoop's core-site.xml file to enable Hadoop user impersonation (the proxy-user mechanism). Otherwise the connection fails with: org.apache.hadoop.ipc.RemoteException: User: hc is not allowed to impersonate root. Here hc is my username.
<property>
    <name>hadoop.proxyuser.hc.hosts</name>
    <value>*</value>
</property>
<property>
    <name>hadoop.proxyuser.hc.groups</name>
    <value>*</value>
</property>
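After editing core-site.xml, the new proxy-user settings need to be picked up. Restarting Hadoop works; alternatively, assuming HDFS and YARN are already running, the standard refresh commands below should apply them without a restart:

hdfs dfsadmin -refreshSuperUserGroupsConfiguration
yarn rmadmin -refreshSuperUserGroupsConfiguration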
Output:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/hc/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.6.2/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/hc/.m2/repository/org/slf4j/slf4j-log4j12/1.7.25/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
jack 2015-01-01 10 10.0
jack 2015-02-03 23 33.0
jack 2015-04-06 42 75.0
jack 2015-01-05 46 121.0
jack 2015-01-08 55 176.0
mart 2015-04-08 62 62.0
mart 2015-04-09 68 130.0
mart 2015-04-11 75 205.0
mart 2015-04-13 94 299.0
neil 2015-05-10 12 12.0
neil 2015-06-12 80 92.0
tony 2015-01-02 15 15.0
tony 2015-01-04 29 44.0
tony 2015-01-07 50 94.0
Process finished with exit code 0
2. Custom UDF Functions
UDF is short for User Defined Function. It lets users plug their own functions into a Hive query. Hive has three kinds of UDFs: plain UDFs, aggregate functions (UDAF), and table-generating functions (UDTF). A UDF produces one specific output for one input; a UDAF takes many inputs and produces one output, achieving an aggregation effect; a UDTF is the reverse, one input to many outputs. Note that the input and output counts here refer to rows of data. This post covers plain UDFs; UDAF and UDTF are left for a later post.
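For intuition, the three kinds map neatly onto familiar built-ins: upper() is a plain UDF, sum() is a UDAF, and explode() is a UDTF. For example (using the buy table from above):

hive> select upper(name) from buy;                    -- UDF: one row in, one value out
hive> select name, sum(cost) from buy group by name;  -- UDAF: many rows in, one out
hive> select explode(array(1, 2, 3));                 -- UDTF: one row in, many out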
A plain UDF can be implemented in two ways, depending on the complexity of its input parameters. One is to extend the UDF class, taking Java primitive types and Hadoop Writable types as input. The other is to extend the GenericUDF class to handle complex input types such as map, struct, and so on.
2.1 Extending the UDF class
The first kind of UDF is very simple to implement. Just extend the UDF class and implement an evaluate() method for whatever operation you need; evaluate() can also be overloaded. Below is an exceedingly simple example: it merely replaces any string with 123 and any integer with 1234. The @UDFType(deterministic = true) annotation in the code declares that the same input always produces the same output; if that is not the case, set it to false.
package MyUDF;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.udf.UDFType;

/**
 * One simple input, one output.
 * Handles Java primitives and Hadoop Writable types.
 */
@UDFType(deterministic = true)
public class UDFTest extends UDF {
    public String evaluate(String s) {
        return "123";
    }

    public int evaluate(int s) {
        return 1234;
    }
}
Next, package the code, add it to Hive, and try it out.
- Open the Hive CLI and run add jar /home/hc/桌面/Hive-1.0-SNAPSHOT.jar; to add the packaged jar to the current Hive session.
- Define the function: create temporary function hudf as 'MyUDF.UDFTest';. The custom UDF can now be used; hudf is the alias I gave it. (This only registers the UDF for the current session. To make it permanent, put the jar under Hive's lib directory and write the create temporary statement into the ~/.hiverc file; see the sketch after this list.) To remove the function, use: drop temporary function if exists hudf;
- Usage and result:

hive> select s_num1, hudf(s_num1), s_string1, hudf(s_string1) from alltype;
OK
10      1234    hello   123
20      1234    world   123
Time taken: 0.404 seconds, Fetched: 2 row(s)
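A minimal sketch of what that ~/.hiverc could contain (the jar path is hypothetical; use wherever you actually put the jar):

-- contents of ~/.hiverc (hypothetical path)
add jar /path/to/Hive-1.0-SNAPSHOT.jar;
create temporary function hudf as 'MyUDF.UDFTest';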
2.2 Extending the GenericUDF class
Extending the GenericUDF class lets you handle data of complex types. Compared with extending UDF, this is more involved. It requires implementing the following three methods:
- public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException: initialization before the actual work (for example, checking argument validity) and declaring the return type.
- public Object evaluate(DeferredObject[] deferredObjects) throws HiveException: the concrete implementation of the custom operation.
- public String getDisplayString(String[] strings): some descriptive information about the class.
There are also other methods that can be overridden, for example:
- public void configure(MapredContext context): configuration for the MapReduce job, defining some parameters.
- public void close() throws IOException: actions to perform after the map phase finishes.
- …
Below is an example that checks whether an Array column contains a particular value. The @Description annotation in the code gives a description of the class.
package MyUDF;

import org.apache.hadoop.hive.ql.exec.*;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.lazy.LazyString;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import parquet.org.slf4j.Logger;
import parquet.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;

@UDFType(deterministic = true)
@Description(name = "GenericUDFTest",
        value = "_FUNC_(List<String>, String)",
        extended = "check whether an array contains a certain specific element")
public class GenericUDFTest extends GenericUDF {
    private static final Logger logger = LoggerFactory.getLogger(GenericUDFTest.class);
    private ListObjectInspector listObjectInspector = null;
    private StringObjectInspector stringObjectInspector = null;

    /**
     * Must implement.
     */
    // Check parameters and determine the output type.
    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
        if (objectInspectors.length != 2) {
            throw new UDFArgumentLengthException("it only takes two params");
        }
        ObjectInspector objectInspector = objectInspectors[0];
        ObjectInspector objectInspector1 = objectInspectors[1];
        if (!(objectInspector instanceof ListObjectInspector)) {
            throw new UDFArgumentTypeException(0, "List<String>/Array<String> is required");
        }
        this.listObjectInspector = (ListObjectInspector) objectInspector;
        if (!(this.listObjectInspector.getListElementObjectInspector() instanceof StringObjectInspector)) {
            throw new UDFArgumentTypeException(0, "List<String>/Array<String> is required");
        }
        if (!(objectInspector1 instanceof StringObjectInspector)) {
            throw new UDFArgumentTypeException(1, "String is required");
        }
        this.stringObjectInspector = (StringObjectInspector) objectInspector1;
        return PrimitiveObjectInspectorFactory.javaBooleanObjectInspector;
    }

    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        String query = this.stringObjectInspector.getPrimitiveJavaObject(deferredObjects[1].get());
        List<LazyString> lists = (List<LazyString>) this.listObjectInspector.getList(deferredObjects[0].get());
        if (query == null || lists == null || lists.size() == 0) {
            return false;
        }
        // Convert each LazyString to a plain String before comparing.
        for (LazyString ls : lists) {
            String s = ls.toString();
            if (s.equals(query)) {
                return true;
            }
        }
        return false;
    }

    public String getDisplayString(String[] strings) {
        return "check whether an array contains a certain value";
    }

    /**
     * Optional.
     */
    @Override
    public void configure(MapredContext context) {
        super.configure(context);
    }

    @Override
    public void close() throws IOException {
        super.close();
    }
}
Likewise, package the code, add it to Hive, and try it out.
- Define the function: create temporary function hcontains as 'MyUDF.GenericUDFTest';
- Verify it. The description of the function can be viewed with describe function hcontains or describe function extended hcontains, which print the information from @Description. Usage looks like this:
- With a single argument:

hive> select s_list, hcontains(s_list) from alltype;
FAILED: SemanticException [Error 10015]: Line 1:15 Arguments length mismatch 's_list': it only takes two params

- With an array and a number:

hive> select s_list, hcontains(s_list,123) from alltype;
FAILED: SemanticException [Error 10016]: Line 1:32 Argument type mismatch '123': String is required

- With the correct argument types:

hive> select s_list, hcontains(s_list, "c200") from alltype;
OK
["100","c200"]  true
["c200","300c"] true
Time taken: 0.194 seconds, Fetched: 2 row(s)
hive> select s_list, hcontains(s_list, "100") from alltype;
OK
["100","c200"]  true
["c200","300c"] false
Time taken: 0.182 seconds, Fetched: 2 row(s)
One thing worth a gripe: lists in the evaluate method is declared as List<LazyString>. If you instead cast it directly to List<String> and then call lists.contains(query) for the check, it fails: the comparison always comes out false. At first I had no idea why... so let's debug it!
3. Remote-Debugging Hive with IDEA
- Run Hive in debug mode: hive --debug. It will listen for a debugger connection on port 8000 (see the note after this list).
- Create a remote-debug run configuration in IDEA: Edit Configurations... → + → Remote. Remember to change the port to 8000.
- Start it in Debug mode. Once it is running, you can see Hive enter the CLI.
- In the CLI, add the jar, create the temporary function, and then run the query. IDEA will stop at the breakpoint.
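For what it's worth, hive --debug is just launching the JVM with a suspended JDWP agent listening on port 8000; the effective JVM option is something along these lines (the exact flag string may differ across Hive versions):

-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8000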
To debug the error described earlier, I added a new List<String> object lists_ and used lists_.contains for the check.

List<LazyString> lists = (List<LazyString>) this.listObjectInspector.getList(deferredObjects[0].get());
List<String> lists_ = (List<String>) this.listObjectInspector.getList(deferredObjects[0].get());

Judging from what the debugger shows, lists_ never actually became a List<String>: its elements are still LazyString objects! A generic cast like this is unchecked, so it converts nothing at runtime. That is why tmp (the result of lists_.contains(query)) is false.
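So the dependable pattern is the one evaluate already uses: compare each element's string form instead of relying on List.contains. A small sketch using the same lists_ and query as above:

// The unchecked cast compiles, but the elements are still LazyString objects,
// so contains() ends up comparing a String against a LazyString and always returns false.
boolean tmp = lists_.contains(query);  // always false

// Works: compare string representations element by element.
boolean found = false;
for (Object o : lists_) {
    if (o.toString().equals(query)) {
        found = true;
        break;
    }
}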
Finally, here is an example that works for a query value of any type, i.e., it extends List<String> to List<T>. You can think of an ObjectInspector as a holder for a variable's type, while the DeferredObject is the variable's value itself.
package MyUDF;

import org.apache.hadoop.hive.ql.exec.*;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import parquet.org.slf4j.Logger;
import parquet.org.slf4j.LoggerFactory;
import java.io.IOException;

@UDFType(deterministic = true)
@Description(name = "GenericUDFTestAny",
        value = "_FUNC_(List<T>, T)",
        extended = "check whether contains")
public class GenericUDFTestAny extends GenericUDF {
    private static final Logger logger = LoggerFactory.getLogger(GenericUDFTestAny.class);
    private ListObjectInspector listObjectInspector = null;
    private ObjectInspector objectInspector = null;

    /**
     * Must implement.
     */
    // Check parameters and determine the output type.
    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
        if (objectInspectors.length != 2) {
            throw new UDFArgumentLengthException("it only takes two params");
        }
        ObjectInspector objectInspector = objectInspectors[0];
        ObjectInspector objectInspector1 = objectInspectors[1];
        if (!(objectInspector instanceof ListObjectInspector)) {
            throw new UDFArgumentTypeException(0, "List<T>/Array<T> is required");
        }
        this.listObjectInspector = (ListObjectInspector) objectInspector;
        if (!ObjectInspectorUtils.compareTypes(this.listObjectInspector.getListElementObjectInspector(),
                objectInspector1)) {
            throw new UDFArgumentException("two parameters should have the same element type.");
        }
        if (!ObjectInspectorUtils.compareSupported(objectInspector1)) {
            throw new UDFArgumentTypeException(1, objectInspector1.getTypeName() + " is not comparable");
        }
        this.objectInspector = objectInspector1;
        return PrimitiveObjectInspectorFactory.javaBooleanObjectInspector;
    }

    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        Object list = deferredObjects[0].get();
        Object query = deferredObjects[1].get();
        if (list == null || this.listObjectInspector.getListLength(list) <= 0) {
            return false;
        }
        for (int i = 0; i < this.listObjectInspector.getListLength(list); i++) {
            Object listElement = this.listObjectInspector.getListElement(list, i);
            // Compare values through their ObjectInspectors, so any comparable type works.
            if (ObjectInspectorUtils.compare(query, this.objectInspector, listElement,
                    this.listObjectInspector.getListElementObjectInspector()) == 0) {
                return true;
            }
        }
        return false;
    }

    public String getDisplayString(String[] strings) {
        return "check whether the list contains the given value";
    }

    /**
     * Optional.
     */
    @Override
    public void configure(MapredContext context) {
        super.configure(context);
    }

    @Override
    public void close() throws IOException {
        super.close();
    }
}
Running it produces the expected results.
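For illustration only, a hypothetical session (the alias hcontains_any and the test values are mine, and the expected outputs follow from the code above, not from a captured run):

hive> create temporary function hcontains_any as 'MyUDF.GenericUDFTestAny';
hive> select hcontains_any(array(1, 2, 3), 2);   -- expected: true
hive> select hcontains_any(array(1, 2, 3), 5);   -- expected: false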
4. Appendix: pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>DataMining</groupId>
    <artifactId>Hive</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <hive_version>2.3.2</hive_version>
        <hadoop_version>3.0.0</hadoop_version>
        <spark_version>2.3.0</spark_version>
        <scala_version>2.12.5</scala_version>
    </properties>

    <dependencies>
        <!-- Hive jars -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive_version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-service</artifactId>
            <version>${hive_version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>${hive_version}</version>
        </dependency>
        <!-- Hadoop jars -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop_version}</version>
        </dependency>
    </dependencies>
</project>