CHAPTER 9
Hive's biggest drivers are the broad functionality of HiveQL and its easy adoption by anyone with SQL experience. The complexity of identifying, loading, and transforming data can be isolated within development or ops teams, leaving analysts free to query huge amounts of data using a familiar syntax.
HiveQL keeps expanding with new releases of Hive, and the language has even been integrated into Apache Spark, so that in-memory Big Data workloads can be based on HiveQL, too.
The language statements we've seen so far have been fundamentally similar to their SQL counterparts, and the same is true for the more advanced HiveQL features. We'll cover those in this chapter, along with some of the functions Hive provides and the mechanism for incorporating custom functionality in Hive.
HiveQL supports inner, outer, cross, and semi joins. However, joins are not as richly supported as in SQL databases, because Hive only supports equi-joins, in which the join condition tests for equality. You can't join tables on a non-equality condition (such as x <> y) in Hive.
The basic joins use standard SQL syntax—Code Listing 93 joins the servers and server_logs tables and returns the first log entry.
Code Listing 93: Inner Joins

> select s.name, s.ipaddresses[0], l.loggedat, l.loglevel
  from server_logs l
  join servers s on l.serverid = s.name
  limit 1;

+---------+--------------+-------------+-------------+--+
| s.name  |     _c1      | l.loggedat  | l.loglevel  |
+---------+--------------+-------------+-------------+--+
| SCSVR1  | 192.168.2.1  | 1439546226  | W           |
+---------+--------------+-------------+-------------+--+

> select s.name, s.ipaddresses[0], l.loggedat, l.loglevel
  from server_logs l, servers s
  where l.serverid = s.name
  limit 1;

+---------+--------------+-------------+-------------+--+
| s.name  |     _c1      | l.loggedat  | l.loglevel  |
+---------+--------------+-------------+-------------+--+
| SCSVR1  | 192.168.2.1  | 1439546226  | W           |
+---------+--------------+-------------+-------------+--+
Both queries return the same results, as the explicit syntax (join … on) is interchangeable with the implicit syntax (in which tables are named and the join specification is in the where clause).
Similarly, outer joins are specified in the same way as in SQL databases and have the same effect on the output—Code Listing 94 returns all the servers that have never recorded any logs.
Code Listing 94: Left Outer Joins
> select s.name, s.site["dc"]
  from servers s
  left outer join server_logs l on s.name = l.serverid
  where l.serverid is null;

+---------+---------+--+
| s.name  |   _c1   |
+---------+---------+--+
| SCSVR2  | london  |
| SCSVR3  | dublin  |
+---------+---------+--+
The cross join statement returns the Cartesian product of the tables, as in SQL; only left semi join is unusual. Most SQL databases support this kind of join, but they don't use an explicit clause for it. It is equivalent to fetching all the rows in one table for which a matching column value exists in another table.
In SQL databases, that is usually done with a where exists clause, but HiveQL has an explicit join type for it. Code Listing 95 shows how that looks: the query returns only those servers that have recorded logs.
Code Listing 95: Left Semi Joins
> select s.name, s.site["dc"]
  from servers s
  left semi join server_logs l on s.name = l.serverid;

+---------+---------+--+
| s.name  |   _c1   |
+---------+---------+--+
| SCSVR1  | london  |
+---------+---------+--+
Provided your join is valid, you can join any database objects no matter what storage engine they use. The examples so far have joined the external table servers with the internal table server_logs. Code Listing 96 joins an internal table (all_devices), an external HDFS table in JSON format (devices), and a view over an external HBase table (device_events_period).
Code Listing 96: Joining Hive and HBase Tables
> select de.period, de.eventname, d.device.deviceclass
  from device_events_period de
  join all_devices ad on ad.deviceid = de.deviceid
  join devices d on ad.deviceclass = d.device.deviceclass
  where de.period like '201601%';

+------------+---------------+--------------+--+
| de.period  | de.eventname  | deviceclass  |
+------------+---------------+--------------+--+
| 20160128   | power.off     | tablet       |
+------------+---------------+--------------+--+
As with any SQL database, joining large tables has performance implications. Hive optimizes joins wherever it can. With each release since version 0.11, the query engine has become increasingly sophisticated at join optimization, and although HiveQL supports query hints for explicit optimization, they are often not needed.
For example, Hive can join onto small tables much more efficiently if the entire small table is loaded into memory in a map task. You can request this explicitly in a query with the mapjoin hint, but Hive will do it automatically if the setting hive.auto.convert.join is true.
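As a minimal sketch, you can either turn on automatic map-join conversion for the session or add the hint yourself; the size threshold setting is optional and is shown here with its default value:

-- let Hive convert suitable joins to map joins automatically
> set hive.auto.convert.join=true;
> set hive.mapjoin.smalltable.filesize=25000000;

-- or request a map join explicitly with a query hint
> select /*+ MAPJOIN(s) */ s.name, l.loggedat, l.loglevel
  from server_logs l
  join servers s on l.serverid = s.name;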
The last way of combining results we'll cover is the union clause. This lets us combine two result sets that have the same column structure. The current version of Hive (1.2.1) supports union all, which includes duplicate rows, and union distinct, which omits duplicates.
Code Listing 97 shows the result of two union clauses and also demonstrates an outer query (the count) with a subquery (the union)—showing that subqueries must be named in HiveQL.
Code Listing 97: Union All and Union Distinct
> select count(1)
  from (select * from devices a
        union all
        select * from devices b) s;

+------+--+
| _c0  |
+------+--+
| 4    |
+------+--+

> select count(1)
  from (select * from devices a
        union distinct
        select * from devices b) s;

+------+--+
| _c0  |
+------+--+
| 2    |
+------+--+
Hive supports basic aggregation to group results by one or more columns, as well as more advanced windowing functions.
Basic aggregation in Hive is done with the group by clause, which defines one or more columns to aggregate by and supports standard SQL aggregation functions such as sum, avg, count, min, and max. You can use several functions for different columns in the same query.
In Code Listing 98 we group syslog entries by the process that generated them, selecting only processes with more than 1,000 entries and showing the process name, number of entries, and size of the largest message logged by the process.
Code Listing 98: Group by Aggregation
> select process, count(process) as entryCount, max(length(message)) largestMessage
  from syslogs
  group by process
  having count(process) > 1000
  order by largestMessage desc;
...
+-----------------+-------------+-----------------+--+
| process         | entrycount  | largestmessage  |
+-----------------+-------------+-----------------+--+
| NetworkManager  | 1863        | 201             |
| kernel          | 7444        | 191             |
| thermald        | 2224        | 136             |
| systemd         | 1232        | 93              |
+-----------------+-------------+-----------------+--+
Tip: You can't include functions like count and max in the order by clause of a query, but if you alias the results of those functions in the select clause, you can order by the aliases. In this example we use order by largestMessage, but if we tried order by max(length(message)), we'd get an error from the Hive compiler.
As in SQL, you cannot include columns in a group by query unless they appear in the group by clause or in an aggregate function over the grouped set. The SQL:2003 specification defines window functions that let you aggregate over multiple partitions of the data set in a single query.
Note: Hive supports SQL:2003 windowing, but beware of the terminology collision—partitions in window functions have no relation to Hive's table partitioning; they are separate features with the same name.
With analytical functions, you can get row-level data and aggregates in the same query. Code Listing 99 shows a query over syslogs that tells us which processes have log entries containing the word 'CRON', and how many times each message was logged (with abbreviated results).
Code Listing 99: Partitioning Queries
> select date_format(loggedat, 'HH:mm'), process, message,
         count() over(partition by message)
  from syslogs
  where upper(message) like '%CRON%';
...
+--------+----------+-------------------------------------+------+--+
|  _c0   | process  | message                             | _c3  |
+--------+----------+-------------------------------------+------+--+
| 19:53  | cron     | (CRON) INFO (Running @reboot jobs)  | 5    |
...
| 19:53  | anacron  | Job `cron.daily' started            | 1    |
| 19:53  | anacron  | Job `cron.daily' terminated         | 2    |
| 19:52  | anacron  | Job `cron.daily' terminated         | 2    |
+--------+----------+-------------------------------------+------+--+
Here we get complex results from a straightforward query. The query selects the message text and information about the log entry, partitioning by the message text and counting at that level. The results show the same aggregation as grouping by message text and counting, but they include row-level details: we can see that the anacron process logged the same message twice, and we can see the different times at which it was logged.
Analytic partitions can span multiple columns, and you can also order the rows within a partition, which is an interesting way to present the data. Adding an order by to the over clause of the previous query lets us view the results in time order, with a running total of how many identical messages have been logged up to that point, as shown in Code Listing 100.
Code Listing 100: Partitioned and Ordered Queries
> select date_format(loggedat, 'HH:mm:ss'), process, message,
         count() over(partition by message order by loggedat)
  from syslogs
  where upper(message) like '%CRON%REBOOT%';
...
+-----------+----------+-------------------------------------+------+--+
|    _c0    | process  | message                             | _c3  |
+-----------+----------+-------------------------------------+------+--+
| 19:52:21  | cron     | (CRON) INFO (Running @reboot jobs)  | 1    |
| 19:52:43  | cron     | (CRON) INFO (Running @reboot jobs)  | 2    |
| 19:53:21  | cron     | (CRON) INFO (Running @reboot jobs)  | 3    |
| 19:53:32  | cron     | (CRON) INFO (Running @reboot jobs)  | 4    |
| 19:54:09  | cron     | (CRON) INFO (Running @reboot jobs)  | 5    |
+-----------+----------+-------------------------------------+------+--+
Partitioning by message and ordering by the timestamp gives us an incremental view of when the message in question was logged. We can take that one stage further using windowing functions.
Windowing functions let you produce a result set in which values for a row are compared to values in the rows that precede or follow it. You can set the range for a window explicitly, or you can use functions that implicitly define a window (usually defaulting to a single row on either side of the current row).
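As a hedged sketch of an explicit window over the syslogs table from the earlier listings, the rows between clause below bounds the window to the two preceding rows plus the current row, giving a short moving average of message length:

> select date_format(loggedat, 'HH:mm:ss'), process, length(message),
         avg(length(message)) over(partition by process
                                   order by loggedat
                                   rows between 2 preceding and current row)
  from syslogs;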
You can combine scalar values, row-level aggregates, and windowing functions in the same query. Code Listing 101 repeats the previous query, but for each row it specifies the distance in time from this row to the one before it and the one after it.
Code Listing 101: Windowing with Lag and Lead
> select date_format(loggedat, 'HH:mm:ss'), process, message,
         count() over(partition by message order by loggedat),
         lag(loggedat) over(partition by message order by loggedat) - loggedat,
         lead(loggedat) over(partition by message order by loggedat) - loggedat
  from syslogs
  where upper(message) like '%CRON%REBOOT%';
...
+-----------+----------+-------------------------------------+------+------------------------+-----------------------+--+
|    _c0    | process  | message                             | _c3  | _c4                    | _c5                   |
+-----------+----------+-------------------------------------+------+------------------------+-----------------------+--+
| 19:52:21  | cron     | (CRON) INFO (Running @reboot jobs)  | 1    | NULL                   | 0 00:00:21.455000000  |
| 19:52:43  | cron     | (CRON) INFO (Running @reboot jobs)  | 2    | -0 00:00:21.455000000  | 0 00:00:37.773000000  |
| 19:53:21  | cron     | (CRON) INFO (Running @reboot jobs)  | 3    | -0 00:00:37.773000000  | 0 00:00:10.958000000  |
| 19:53:32  | cron     | (CRON) INFO (Running @reboot jobs)  | 4    | -0 00:00:10.958000000  | 0 00:00:37.695000000  |
| 19:54:09  | cron     | (CRON) INFO (Running @reboot jobs)  | 5    | -0 00:00:37.695000000  | NULL                  |
+-----------+----------+-------------------------------------+------+------------------------+-----------------------+--+
Here the lag function gets the loggedAt value for the previous row (which is NULL for the first row) and lead gets the loggedAt value for the next row. The current row's loggedAt value is subtracted, which gives us the time distance in the result set.
Windowing functions are a relatively simple way to get powerful results—like finding the change of values over time periods, identifying trends, and computing percentiles and ranks.
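As a minimal sketch of ranking, the query below (again over the syslogs table) ranks processes by how many entries they have logged and assigns each one to a quartile; rank and ntile are standard Hive windowing functions:

> select process, entries,
         rank() over(order by entries desc) as busiest,
         ntile(4) over(order by entries desc) as quartile
  from (select process, count(*) as entries
        from syslogs
        group by process) p;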
We've already seen many examples of HiveQL's built-in functions, and they are usually well named and syntactically clear enough to stand without much introduction. The built-in functions ship with the Hive runtime, which means they are the same no matter which platform you work with.
The Language Manual for Built-in Functions is a comprehensive reference that lists all available functions by the data type they apply to and includes the version from which they were introduced.
Hive includes more than 150 built-in functions; here I'll cover some of the most useful.
ETL and ELT processes almost always include date conversions, and Hive has good support for converting between the high-fidelity TIMESTAMP type and other common string or numeric representations of dates.
In Code Listing 102 we fetch the current UNIX timestamp (in seconds, using the system clock) and convert between string and long-integer date values.
Code Listing 102: Date Conversion Functions
> select unix_timestamp() from_clock,
         unix_timestamp('2016-02-07 21:00:00') from_string,
         from_unixtime(1454878996L) from_long;

+-------------+--------------+----------------------+--+
| from_clock  | from_string  | from_long            |
+-------------+--------------+----------------------+--+
| 1454879183  | 1454878800   | 2016-02-07 21:03:16  |
+-------------+--------------+----------------------+--+
Once we have a TIMESTAMP, we can extract parts of it, add or subtract other dates, or get the number of days between dates, as in Code Listing 103.
Code Listing 103: Date Manipulation Functions
> select weekofyear(to_date(current_timestamp)) week,
         date_add('2016-02-07 21:00:00', 10) addition,
         datediff('2016-02-07', '2016-01-31') difference;

+-------+-------------+-------------+--+
| week  | addition    | difference  |
+-------+-------------+-------------+--+
| 5     | 2016-02-17  | 7           |
+-------+-------------+-------------+--+
Hive includes all the usual functions for finding text within strings (instr), extracting substrings (substr), and joining strings (concat). It also includes some useful overloaded functions that handle common tasks in a single statement, as in Code Listing 104, which manipulates a URL.
Code Listing 104: String Manipulation Functions
> select concat_ws('.', 'blog', 'sixeyed', 'com') as blog,
         parse_url('https://blog.sixeyed.com', 'PROTOCOL') as protocol;

+-------------------+-----------+--+
| blog              | protocol  |
+-------------------+-----------+--+
| blog.sixeyed.com  | https     |
+-------------------+-----------+--+
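For the basic functions mentioned above, here is a quick sketch with made-up literal values:

> select instr('hive-succinctly', '-') as position,
         substr('hive-succinctly', 1, 4) as prefix,
         concat('hive', '-', 'succinctly') as joined,
         lower('HiveQL') as lowered;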
There are also string functions for richer semantic analysis (ngrams and context_ngrams analyze word frequency in sentences) and for standard language processing. Code Listing 105 shows the distance between two words (using the common Levenshtein measure of similarity) and the phonetic representation of a word.
Code Listing 105: Language Processing Functions
> select levenshtein('hive', 'hbase'), soundex('hive');

+------+-------+--+
| _c0  | _c1   |
+------+-------+--+
| 3    | H100  |
+------+-------+--+
As with strings, Hive supports all the usual mathematical functions (round, floor, abs), along with more unusual ones—such as trigonometric functions (sin, cos, tan). Having a wide range of built-in math functions makes complex analysis possible with simple queries.
Code Listing 106 shows some useful mathematical functions.
Code Listing 106: Mathematical Functions
> select pmod(14, 3) modulus, sqrt(91) root, factorial(6) factorial;

+----------+--------------------+------------+--+
| modulus  | root               | factorial  |
+----------+--------------------+------------+--+
| 2        | 9.539392014169456  | 720        |
+----------+--------------------+------------+--+
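And a brief sketch of the more everyday functions mentioned above, again with arbitrary input values:

> select round(9.539, 2) as rounded,
         floor(9.539) as floored,
         abs(-42) as absolute,
         sin(pi() / 2) as sine;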
With its support for collection data types, Hive provides functions for working with arrays and maps. Only a small number of functions are provided, but they cover all the functionality you're likely to need.
Code Listing 107 shows how to extract a value from an array and sort the array, then combine them to find the largest value (the array function is used to define the literal array).
Code Listing 107: Collection Functions
> select array(26, 54, 43)[0],
         sort_array(array(26, 54, 43)),
         sort_array(array(26, 54, 43))[size(array(26, 54, 43))-1];

+------+-------------+------+--+
| _c0  | _c1         | _c2  |
+------+-------------+------+--+
| 26   | [26,43,54]  | 54   |
+------+-------------+------+--+
Similar functions can be used with MAP column types, so you can also extract the set of keys or the set of values as arrays (with map_keys and map_values). A key function that works with either type is explode, which generates a table from a collection, as shown in Code Listing 108.
Code Listing 108: Exploding Collection Data
> select explode(split('Hive Succinctly', ' '));

+-------------+--+
| col         |
+-------------+--+
| Hive        |
| Succinctly  |
+-------------+--+
With explode, you can extract multiple rows from a single column value and use that to join against other tables.
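One wrinkle is that explode can't be mixed directly with other columns in the select list; you reference it through a lateral view instead. Here is a minimal sketch using the servers table and its ipaddresses array column from earlier in the chapter:

> select s.name, addr
  from servers s
  lateral view explode(s.ipaddresses) addresses as addr;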
Standard SQL functions are available in Hive, such as converting between types (cast), checking for nulls (isnull, isnotnull), and conditional logic (if). Functions that choose one value from a set of candidates are also standard SQL, as shown in Code Listing 109.
Code Listing 109: Choosing between Values
> select nvl(NULL, 'default') `nvl`,
         coalesce('first', 'second', NULL) `coalesce`,
         case true when cast(1 as boolean) then 'expected' else 'unexpected' end `case`;

+----------+-----------+-----------+--+
| nvl      | coalesce  | case      |
+----------+-----------+-----------+--+
| default  | first     | expected  |
+----------+-----------+-----------+--+
More unusual functions are also occasionally useful, as shown in Code Listing 110.
Code Listing 110: Miscellaneous Functions
> select pi() `pi`, current_user() `whoami`, hash('@eltonstoneman') `hash`;

+--------------------+---------+-------------+--+
| pi                 | whoami  | hash        |
+--------------------+---------+-------------+--+
| 3.141592653589793  | root    | 1326977505  |
+--------------------+---------+-------------+--+
Hive continues to add to the built-in set; AES encryption, SHA and MD5 hashing, and CRC32 calculations are all available from Hive 2.0.0.
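If you're on Hive 2.0.0 or later, a minimal sketch of those functions looks like this (the input string and encryption key are arbitrary; aes_encrypt expects a key of 16, 24, or 32 bytes):

> select md5('hive-succinctly') as md5,
         sha2('hive-succinctly', 256) as sha256,
         crc32('hive-succinctly') as crc,
         base64(aes_encrypt('hive-succinctly', '1234567890123456')) as encrypted;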
You can extend Hive yourself with your own User Defined Functions (UDFs). This is a neat way of encapsulating commonly used logic in a language other than HiveQL. The native language is Java, but Hive supports Hadoop streaming, so you can write functions in any language that can be invoked through the operating system command line.
That means you can write functions in your preferred language and ensure the quality of custom components with unit testing and versioning. Provided you can wrap functions in a command line that reads from standard input and writes to standard output, you can also use existing libraries of code.
Using a UDF involves two steps: making the library available to the Hive runtime, and registering the function so you can use it in Hive queries.
Java UDFs are the simplest option. You write a class that extends org.apache.hadoop.hive.ql.exec.UDF, then build a JAR and copy it to Hive's auxiliary folder (specified with the HIVE_AUX_JARS_PATH setting). Next, you register the class with create temporary function [alias] as [UDF_class_name]. With that, you can call the function using the UDF alias.
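As a hedged sketch of that flow, assume a JAR you've built containing a hypothetical class com.example.AddVat that extends UDF; the JAR path, class name, and the invoices table below are all made up for illustration, and add jar is an alternative to copying the JAR into the auxiliary folder:

> add jar /tmp/hive-udfs.jar;
> create temporary function add_vat as 'com.example.AddVat';
> select add_vat(net_amount) from invoices;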
Streaming console apps are a little more involved. We'll use Python as an example: Code Listing 111 shows a simple app that adds Value Added Tax (VAT) to an integer amount.
Code Listing 111: A Simple Python Script
#!/usr/bin/python
import sys

# read each value from standard input, strip whitespace,
# and write the value plus 20% VAT to standard output
for line in sys.stdin:
    line = line.strip()
    print int(line) * 1.2
Next we need to make the .py file available to Hive using the add file command, as in Code Listing 112.
Code Listing 112: Adding the Python Script to Hive
> add file /tmp/add_vat_udf.py;
INFO  : Added resources: [/tmp/add_vat_udf.py]
Now we can access the UDF with the transform … using clause, which streams each row of the query's result set through the command's standard input and reads the transformed rows from its standard output. Code Listing 113 shows the UDF being invoked.
Code Listing 113: Invoking the Python Script in HiveQL
> select transform(input) using 'python add_vat_udf.py' as vat_added
  from (select explode(array(10, 120, 1400)) as input) a;

+------------+--+
| vat_added  |
+------------+--+
| 12.0       |
| 144.0      |
| 1680.0     |
+------------+--+
Note: You can't mix ordinary result-set columns with the transformed columns: the final result has to come entirely from the transform. So if I wanted to include the original net value in my query, I couldn't simply add it to the select statement before the transform; I'd need to write the input out in my Python script, tab-separated from the output.
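As a sketch of that approach, if the script wrote the input value and the VAT-added value as two tab-separated fields, the HiveQL side could name both output columns:

> select transform(input) using 'python add_vat_udf.py' as (net, vat_added)
  from (select explode(array(10, 120, 1400)) as input) a;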
The familiarity of SQL, combined with complex analytical functions and the ability to wrap your own code in functions, makes HiveQL a powerful and extensible query language.
For the most part, you can run the exact same queries over any data source, whether that's an internal Hive table, an HDFS folder structure containing thousands of TSV files, or an HBase table with billions of rows.
The Hive compiler generates the most efficient set of map/reduce jobs that it can in order to represent the HiveQL query in a format that can be executed on Hadoop. Presenting a simple interface that can trigger hundreds of compute hours behind the scenes with no further user interaction makes Hive an attractive component in the Big Data stack.
We've covered a lot of ground in this short book, but there's plenty more to learn. In order to address Hive succinctly, I've focused on the functional parts of the language and the runtime, which means I haven't even touched on performance, query optimization, or use of Hive with other clients. Those are the obvious next steps if you want to learn more about Hive.
The hive-succinctly Docker image is a good place to get started. It's set up with Hadoop running in pseudo-distributed mode and configured for YARN, which means it's suitable for testing long-running queries. You can drop your own data onto the container using Docker's copy command, then load it into Hive using the tools we saw in Chapter 6 ETL with Hive.
After that, if you want to try out Hive in production to see how it performs with your data at a much bigger scale, you can easily fire up a Hadoop cluster running Hive in the cloud. The Elastic Map Reduce platform in Amazon Web Services and the HDInsight platform in Microsoft Azure both support Hive, and both will get you up and running in no time.