CHAPTER 6
When you have existing data that you want to load into Hive, rather than creating a link to the source data with an external table, you have many options. In fact, Hive has several commands to support ETL, all of which result in populating an internal Hive table.
You can populate Hive from a subset of data from HBase and split the row key into parts for indexing, or you can load files from either HDFS or the local file system. As part of the load, you can transform the data into more usable representations, or you can strip out only the parts you need.
Hive offers multiple options for ETL, and all of the processing is done through map/reduce jobs, which means loading data of any size is possible. Hive is particularly well suited to the alternative ELT approach (extract, load, transform) because it supports very efficient loading of data in its native format, which can then be transformed by reading and writing Hive tables.
In this chapter we’ll cover the major commands for getting data into Hive in the format of your choosing.
The simplest ETL tool is the load command. Simply put, it loads a file or set of files into an existing internal Hive table. Running load is suitable only when the data in the source files matches the target table schema, because you cannot include any transformations in this option.
You should also know that the load statement will not do any verification, which means you can load files with the wrong format into a table and the command will run, but you won’t be able to read the data back again.
With load you can specify the source file(s) using either a local filepath or an HDFS path. Code Listing 47 shows an example that loads a syslog file from the local /tmp directory into the syslogs_flat table and then fetches the first row.
Code Listing 47: Loading Data into Hive
> load data local inpath '/tmp/sample-data/syslogs/syslog' into table syslogs_flat;
…
INFO  : Loading data to table default.syslogs_flat from file:/tmp/syslogs/syslog
INFO  : Table default.syslogs_flat stats: [numFiles=1, totalSize=462587]
No rows affected (0.153 seconds)

> select * from syslogs_flat limit 1;
+-------------------------------------------------------------------------+--+
|                           syslogs_flat.entry                            |
+-------------------------------------------------------------------------+--+
| Jan 28 20:35:00 sc-ub-xps thermald[785]: Dropped below poll threshold   |
+-------------------------------------------------------------------------+--+
The load command has two qualifiers that change its behavior: local, which tells Hive the source path is on the local file system rather than in HDFS, and overwrite, which replaces the existing contents of the table instead of appending to them.
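As a minimal sketch (the paths and table name here are only placeholders), the two variants look like this:

-- copy from the local file system, appending to the table's existing data:
load data local inpath '/tmp/staging/events.log' into table events_flat;

-- copy from HDFS, replacing the table's existing files:
load data inpath '/landing/events' overwrite into table events_flat;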
Code Listing 48 shows an alternative use of the load command: appending data from a file in HDFS to the existing data in the syslogs_flat table, with row counts before and after the load to show the effect on the table.
Code Listing 48: Appending Data to Hive
> select count(*) from syslogs_flat;
+-------+--+
|  _c0  |
+-------+--+
| 3942  |
+-------+--+
1 row selected (16.474 seconds)

> load data inpath 'hdfs://localhost:9000/tmp/syslog.1' into table syslogs_flat;
INFO  : Loading data to table default.syslogs_flat from hdfs://localhost:9000/tmp/syslog.1
INFO  : Table default.syslogs_flat stats: [numFiles=2, totalSize=1753418]
No rows affected (0.195 seconds)

> select count(*) from syslogs_flat;
+--------+--+
|  _c0   |
+--------+--+
| 15642  |
+--------+--+
Note: The source file path is specified as a full URI here in order to show that a remote HDFS cluster can be used, but you can also specify relative or absolute paths for files in the home HDFS cluster used by Hive.
The load command can only be used with internal tables because its single function is to copy files from the specified source to the underlying folder in HDFS for the Hive table. Code Listing 49 shows the contents of the table folder after the two preceding load operations.
Code Listing 49: Listing Files Loaded into Hive
root@hive:/hive-setup# hdfs dfs -ls /user/hive/warehouse/syslogs_flat
Found 2 items
-rwxrwxr-x   1 root supergroup     462587 2016-02-02 07:28 /user/hive/warehouse/syslogs_flat/syslog
-rwxrwxr-x   1 root supergroup    1290831 2016-02-02 07:32 /user/hive/warehouse/syslogs_flat/syslog.1
With the overwrite flag, Hive deletes the existing files before copying the new source files. Code Listing 50 shows the result of overwriting the table with a new file and also shows the HDFS file listing after the load.
Code Listing 50: Load with Overwrite
> load data inpath 'hdfs://localhost:9000/tmp/syslog.2.gz' overwrite into table syslogs_flat;
INFO  : Loading data to table default.syslogs_flat from hdfs://localhost:9000/tmp/syslog.2.gz
INFO  : Table default.syslogs_flat stats: [numFiles=1, numRows=0, totalSize=253984, rawDataSize=0]
No rows affected (0.154 seconds)

> select count(*) from syslogs_flat;
+--------+--+
|  _c0   |
+--------+--+
| 15695  |
…

root@hive:/hive-setup# hdfs dfs -ls /user/hive/warehouse/syslogs_flat
Found 1 items
-rwxrwxr-x   1 root supergroup     253984 2016-02-02 07:44 /user/hive/warehouse/syslogs_flat/syslog.2.gz
The load statement is a powerful and quick way of loading data into Hive because it simply does an HDFS put to copy the files from the source into the Hive table structure. However, its use is limited to cases in which the source files are in the correct format for the table and no transformation can occur as part of the load.
Note: Hive also has import and export functions that let you save tables to and load them from an HDFS location. These are different from load in that the Hive metadata is exported and imported along with the data, so you can effectively use these functions to back up a table on one Hive instance and recreate it on another.
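As a minimal sketch of the syntax (the backup path and the restored table name here are only examples), export writes the table's data and metadata to an HDFS folder, and import recreates a table from that folder:

-- write data plus metadata to an HDFS folder:
export table syslogs_flat to '/tmp/backup/syslogs_flat';

-- recreate the table, under a new name, from the exported folder:
import table syslogs_flat_restored from '/tmp/backup/syslogs_flat';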
We can see the limitations of loading untransformed data in the syslogs_flat table. Syslog files from a Linux machine are easy to load, but the format doesn't map easily to Hive columns: each entry uses space-separated fields for the date, machine name, and event type, with a colon before the message. Because data can't be transformed as part of a load, I have a Hive table with a single string column in which each row holds the entire log entry as one string value.
Rows loaded that way aren't well suited to querying, but now that the data is in Hive we can use other options to transform it into a more usable format and load it into other tables.
This is more of an extract, load, transform (ELT) approach: the data is first loaded into Hive in its native format, which is fast, and then transformed, with Hive running the transformations as map/reduce jobs.
Hive supports loading a table with the results of a query on other objects, and you can include transformation functions in the query. You can also populate internal or external tables this way, with the query parser doing some basic validation in order to ensure that the columns from the source query will fit into the target table.
Columns are positionally matched between source and target, which means you need to craft your select statement so that the columns in the result set are in the same order as the columns defined in the target table.
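As a minimal sketch with hypothetical tables (people_staging and people are not part of the chapter's examples), note that the select list follows the column order of the target, not of the source:

-- hypothetical schemas: people_staging(name string, id int) and people(id int, name string)
insert into table people
select id, name
from people_staging;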
If there are too many or too few columns in the query, Hive won't try to match them to the target table and you'll get an error, as in Code Listing 51.
Code Listing 51: Invalid Insert Statement
> insert into table server_logs select 's1', 123L, 'E' as loglevel from dual;
Error: Error while compiling statement: FAILED: SemanticException [Error 10044]: Line 1:18 Cannot insert into target table because column number/types are different 'server_logs': Table insclause-0 has 4 columns, but query has 3 columns. (state=42000,code=10044)
Tip: You must include a from clause in an insert…select statement, which means you can't use literal expressions such as select 1, 'a'. But you can create a table with one row and one column (I name it dual after the convention in Oracle databases), and then you can use dual in the from clause. You will find that insert into [table] select 'a', 'b', 3 will fail, but insert into [table] select 'a', 'b', 3 from dual will succeed.
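A minimal way to set up such a table (the name dual and the placeholder value are just conventions, not anything Hive requires):

create table dual (dummy string);
insert into table dual values ('X');

-- literal-only selects now have a from clause to satisfy the parser:
select 'a', 'b', 3 from dual;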
Hive doesn't verify the data types of the columns, which means you can load data into the wrong columns without any errors. The type mismatch only manifests itself when you query the data and Hive can't map the values, so the results contain nulls.
The query clause can contain any valid HiveQL, including joins and unions between internal and external tables, function calls, and aggregation, which allows for a lot of extract and transform logic. In order to make a set of syslog files more usable, I've defined a new table with separate columns for the data items, as in Code Listing 52.
Code Listing 52: A Structured Table for Syslogs
> describe syslogs;
+-----------+------------+----------+--+
| col_name  | data_type  | comment  |
+-----------+------------+----------+--+
| loggedat  | timestamp  |          |
| host      | string     |          |
| process   | string     |          |
| pid       | int        |          |
| message   | string     |          |
+-----------+------------+----------+--+
We'll see more HiveQL functions in Chapter 9, Querying with HiveQL, but one useful string function is sentences, which takes an input string and returns it tokenized as an array of sentences, each of which is an array of words. I can use that to pull specific words out of the log entry, as in Code Listing 53, where I also cast strings to other types.
Code Listing 53: Splitting String Fields
> select sentences(entry)[0][5] as host,
         sentences(entry)[0][6] as process,
         cast(sentences(entry)[0][7] as int) as pid
  from syslogs_flat limit 5;
+------------+------------------------------------+-------+--+
|    host    |              process               |  pid  |
+------------+------------------------------------+-------+--+
| sc-ub-xps  | anacron                            | 804   |
| sc-ub-xps  | anacron                            | 804   |
| sc-ub-xps  | org.gnome.zeitgeist.SimpleIndexer  | 1395  |
| sc-ub-xps  | systemd-timesyncd                  | 625   |
| sc-ub-xps  | systemd                            | 1293  |
+------------+------------------------------------+-------+--+
The timestamp of the log entry is a little trickier, especially because Ubuntu doesn't record the year in the log. If we assume the current year, we can use a combination of string and date functions to prepend the year to the date and time in the entry, then convert it all to a timestamp, as in Code Listing 54.
Code Listing 54: Transforming Strings to Timestamps
> select unix_timestamp(concat(cast(year(current_date) as string), ' ', substr(entry, 0, 15)), 'yyyy MMM dd hh:mm:ss')
  from syslogs_flat limit 1;
+-------------+--+
|     _c0     |
+-------------+--+
| 1453755712  |
+-------------+--+
The final column is the log message, which is a straightforward substring starting after the closing square bracket that follows the process ID, as in Code Listing 55.
Code Listing 55: Extracting Substrings
> select trim(substr(entry, instr(entry, ']')+2)) from syslogs_flat limit 1;
+------------------------------+--+
|             _c0              |
+------------------------------+--+
| Job 'cron.daily' terminated  |
+------------------------------+--+
Putting it all together, we can populate the new syslogs table from the raw syslogs_flat table with a single insert … select. The query is clunky because of the nature of the input data, but once it runs we can make much more useful selections over the formatted syslogs table, as in Code Listing 56.
Code Listing 56: Transforming Raw Data
> insert into syslogs
  select unix_timestamp(concat(cast(year(current_date) as string), ' ', substr(entry, 0, 15)), 'yyyy MMM dd hh:mm:ss'),
         sentences(entry)[0][5],
         sentences(entry)[0][6],
         cast(sentences(entry)[0][7] as int),
         trim(substr(entry, instr(entry, ']')+2))
  from syslogs_flat;
...
No rows affected (16.757 seconds)

> select process, count(process) as entries from syslogs where host = 'sc-ub-xps' group by process order by entries desc limit 5;
…
+-----------------+----------+--+
|     process     | entries  |
+-----------------+----------+--+
| kernel          | 7862     |
| thermald        | 2906     |
| systemd         | 1450     |
| NetworkManager  | 1245     |
| avahi-daemon    | 310      |
+-----------------+----------+--+
Inserting query results to tables also supports the overwrite clause, which effectively truncates the target table before inserting the results of the query.
Hive supports an extended version of insert … select in which multiple insert statements, each with its own query over the same source, can be chained together. In this variation, you begin by specifying the source table (or view) and then add the inserts.
Multiple inserts are useful when you need to populate different Hive projections from a single source because they run very efficiently: Hive scans the source data once, then runs each query over the scanned data and inserts the results into its target.
In Code Listing 57, we use the unformatted syslogs_flat table as the source and load the formatted table and a summary table at the same time.
Code Listing 57: Multiple Inserts
from syslogs_flat sf
insert overwrite table syslogs
  select unix_timestamp(concat(cast(year(current_date) as string), ' ', substr(sf.entry, 0, 15)), 'yyyy MMM dd hh:mm:ss'),
         sentences(sf.entry)[0][5],
         sentences(sf.entry)[0][6],
         cast(sentences(sf.entry)[0][7] as int),
         trim(substr(sf.entry, instr(sf.entry, ']')+2))
insert overwrite table syslog_summaries
  select unix_timestamp(),
         sentences(sf.entry)[0][5] as host,
         count(sentences(sf.entry)[0][5]) as entries
  group by sentences(sf.entry)[0][5]
…

> select * from syslog_summaries limit 1;
+-------------------------------+------------------------+---------------------------+--+
| syslog_summaries.processedat  | syslog_summaries.host  | syslog_summaries.entries  |
+-------------------------------+------------------------+---------------------------+--+
| 2016-02-02 20:00:48.062       | sc-ub-xps              | 15695                     |
+-------------------------------+------------------------+---------------------------+--+
A final variation on inserting data from query results is create table as select (CTAS), which lets us define a table and populate it in a single statement. The table can only be internal, but we can specify the usual stored as clauses for internal tables.
In a CTAS table, the structure is inferred from the query, which means we don't need to explicitly specify the columns. Queries should cast values into the data types we want for the target table and should specify aliases that will be used as the column names.
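For example, as a sketch (the syslog_hosts table name is hypothetical), the aliases and casts in the query determine the column names and types of the new table:

-- CTAS: column names come from the aliases, column types from the expressions
create table syslog_hosts stored as orc as
select sentences(entry)[0][5] as host,
       cast(sentences(entry)[0][7] as int) as pid
from syslogs_flat;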
The CTAS operation is atomic, which means the table will not appear as available for querying until the CTAS completes and the table is populated.
Code Listing 58 shows a CTAS statement for loading the individual words from syslog entries into a new table. The select statement parses the log message as sentences and extracts the first sentence as an array of strings, so Hive defines the column as array<string>.
Code Listing 58: Create Table as Select
> create table syslog_sentences stored as orc
  as select sentences(trim(substr(entry, instr(entry, ']')+2)))[0] words
  from syslogs_flat;
...
No rows affected (12.758 seconds)

> describe syslog_sentences;
+-----------+----------------+----------+--+
| col_name  |   data_type    | comment  |
+-----------+----------------+----------+--+
| words     | array<string>  |          |
+-----------+----------------+----------+--+
1 row selected (0.064 seconds)

> select * from syslog_sentences limit 2;
+------------------------------------+--+
|       syslog_sentences.words       |
+------------------------------------+--+
| ["Job","cron.daily","terminated"]  |
| ["Normal","exit","1","job","run"]  |
+------------------------------------+--+
Hive supports temporary tables, which are very useful for interim data transformations in ETL/ELT workloads that cannot achieve the full transform in a single step. Temporary tables don't support indexes, and they exist only for the life of the Hive session—they are automatically deleted when the session ends.
Temporary tables are internal tables whose data is stored in the user's working (scratch) directory in HDFS. They can be created with any of the supported file formats, so you benefit from efficient storage while you use the table, but note that not all table features are available.
Code Listing 59 shows a temporary table being created to store the progress of an ETL job.
Code Listing 59: Using Temporary Tables
> create temporary table etl_progress(status string, stage string, processedat timestamp, rowcount bigint) stored as orc;
No rows affected (0.079 seconds)

> insert into etl_progress(status, stage, processedat, rowcount) values('Done', 'Transform.1', '2016-02-02 07:03:01', 328648);
No rows affected (14.853 seconds)

> select * from etl_progress;
+----------------------+---------------------+---------------------------+------------------------+--+
| etl_progress.status  | etl_progress.stage  | etl_progress.processedat  | etl_progress.rowcount  |
+----------------------+---------------------+---------------------------+------------------------+--+
| Done                 | Transform.1         | 2016-02-02 07:03:01.0     | 328648                 |
+----------------------+---------------------+---------------------------+------------------------+--+
In this case, the insert statement uses a literal date because Hive doesn't support functions in the values clause of an insert statement. If you try to use a built-in function such as unix_timestamp to get the current time, you'll receive an error.
However, you can use functions in a select clause, which means you can use the same trick of selecting literals from dual, as shown in Code Listing 60.
Code Listing 60: Inserting from Functions
> insert into etl_progress(status, stage, processedat, rowcount) values('Done', 'Transform.1', unix_timestamp(), 328648);
Error: Error while compiling statement: FAILED: SemanticException [Error 10293]: Unable to create temp file for insert values Expression of type TOK_FUNCTION not supported in insert/values (state=42000,code=10293)

> insert into etl_progress select 'Done', 'Transform.2', unix_timestamp(), 12435358 from dual;
...
No rows affected (12.437 seconds)
Whether the session is interactive with Beeline or a job submitted through an external interface, Hive deletes the table at the end of the session. The temporary table is visible only in the session that created it. Other sessions, even for the same user, will not see the table.
When the session that created the temporary table ends, the data and metadata for the table are removed, and when the next session starts the table will be gone, as we see in Code Listing 61.
Code Listing 61: Temporary Tables Being Removed
> select * from etl_progress;
+----------------------+---------------------+---------------------------+------------------------+--+
| etl_progress.status  | etl_progress.stage  | etl_progress.processedat  | etl_progress.rowcount  |
+----------------------+---------------------+---------------------------+------------------------+--+
| Done                 | Transform.1         | 2016-02-02 07:03:01.0     | 328648                 |
| Done                 | Transform.2         | 1970-01-17 20:01:23.674   | 12435358               |
+----------------------+---------------------+---------------------------+------------------------+--+
2 rows selected (0.091 seconds)

> !close
Closing: 0: jdbc:hive2://127.0.0.1:10000

beeline> !connect jdbc:hive2://127.0.0.1:10000 -n root
Connecting to jdbc:hive2://127.0.0.1:10000
…

> select * from etl_progress;
Error: Error while compiling statement: FAILED: SemanticException [Error 10001]: Line 1:14 Table not found 'etl_progress' (state=42S02,code=10001)
In this example, when the Beeline user disconnects with the !close command, the session ends, and the Hive server deletes the temporary table. When the user connects again, a new session begins, and the temporary table will be gone.
With the basic load, insert, and CTAS statements, Hive supports the major patterns for getting data into the warehouse. If you have existing processes for extracting and transforming data, you can load their output directly into Hive tables. That fast operation results in data being securely stored in HDFS and available for querying through Hive.
For new data loads, an ELT process makes better use of Hive: first use load to put the raw data into Hive tables, then transform it with HiveQL. The insert … select and create table … as select statements let you craft complex queries with functions that transform your data and populate Hive tables through scalable map/reduce jobs.
Once you have the data as tables in Hive, you can create views and indexes to make the information approachable. When your next set of data is extracted, simply repeat the inserts and the new data will be appended to the existing tables.
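As a hedged sketch (the view name is only an example), a view over the formatted table captures a common query so that it stays current as new data is appended:

create view if not exists process_activity as
select host, process, count(*) as entries
from syslogs
group by host, process;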
In the next chapter we’ll look more closely at defining objects and modifying data using HiveQL, the Hive Query Language.