

Note: All posts take a practical approach and avoid lengthy theory. All have been tested on development servers. Please don’t try anything from these posts on production servers until you are sure.

Thursday, January 25, 2018

Using Avro with Hive and Presto



Prerequisites


The AvroSerde allows users to read or write Avro data as Hive tables. Its key features:


  • Infers the schema of the Hive table from the Avro schema. Starting in Hive 0.14, the Avro schema can be inferred from the Hive table schema.
  • Reads all Avro files within a table against a specified schema, taking advantage of Avro's backward-compatibility rules.
  • Supports arbitrarily nested schemas.
  • Translates all Avro data types into equivalent Hive types. Most types map exactly, but some Avro types don't exist in Hive and are automatically converted by the AvroSerde.
  • Understands compressed Avro files.
  • Transparently converts the Avro idiom of handling nullable types as Union[T, null] into just T and returns null when appropriate.
  • Writes any Hive table to Avro files.
  • Has worked reliably against our most convoluted Avro schemas in our ETL process.
  • Starting in Hive 0.14, columns can be added to an Avro-backed Hive table using the ALTER TABLE statement.
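To illustrate the Union[T, null] bullet, here is a minimal Python sketch that takes an Avro record schema already parsed from JSON and reports each field as Hive would see it: the plain type plus a nullable flag. It only mimics the idea and is not the AvroSerde's code; the function names unwrap_nullable and hive_view are hypothetical.

```python
def unwrap_nullable(avro_type):
    """Return (inner_type, nullable) for an Avro field type.

    A two-branch union in which one branch is "null" is treated as a
    nullable T; any other type is returned unchanged with nullable=False.
    """
    if isinstance(avro_type, list) and len(avro_type) == 2 and "null" in avro_type:
        inner = avro_type[0] if avro_type[1] == "null" else avro_type[1]
        return inner, True
    return avro_type, False


def hive_view(record_schema):
    """Map each field of an Avro record schema to (name, type, nullable)."""
    return [(f["name"],) + unwrap_nullable(f["type"]) for f in record_schema["fields"]]


schema = {
    "type": "record",
    "name": "emp",
    "fields": [
        {"name": "empno", "type": "int"},
        {"name": "comm", "type": ["null", "int"]},  # nullable int
    ],
}
print(hive_view(schema))  # [('empno', 'int', False), ('comm', 'int', True)]
```

Either branch order (["null", "int"] or ["int", "null"]) is accepted, since Avro writers use both.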

Hive Table - backed by Avro data files


To create an Avro-backed table, specify the serde as org.apache.hadoop.hive.serde2.avro.AvroSerDe, specify the inputformat as org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat, and the outputformat as org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat. Also provide a location from which the AvroSerde will pull the most current schema for the table.


DROP TABLE scott.avro_hive_table;

CREATE EXTERNAL TABLE scott.avro_hive_table
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' 
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '/tmp/avroTestData'
TBLPROPERTIES('avro.schema.url'='/tmp/avroSchema/emp.avsc');

Query the table. An Avro-backed table can be used in Hive like any other table:


0: jdbc:hive2://dn04:10000/flume> select * from scott.avro_hive_table;
+------------------------+------------------------+----------------------+-----------------------+--+
| avro_hive_table.empno  | avro_hive_table.ename  | avro_hive_table.sal  | avro_hive_table.comm  |
+------------------------+------------------------+----------------------+-----------------------+--+
| 7369                   | SMITH                  | 800                  | 0                     |
| 7499                   | ALLEN                  | 1600                 | 300                   |
| 7521                   | WARD                   | 1250                 | 500                   |
| 7566                   | JONES                  | 2975                 | 0                     |
| 7654                   | MARTIN                 | 1250                 | 1400                  |
| 7698                   | BLAKE                  | 2850                 | 0                     |
| 7782                   | CLARK                  | 2450                 | 0                     |
| 7788                   | SCOTT                  | 3000                 | 0                     |
| 7839                   | KING                   | 5000                 | 0                     |
| 7844                   | TURNER                 | 1500                 | 0                     |
| 7876                   | ADAMS                  | 1100                 | 0                     |
| 7900                   | JAMES                  | 950                  | 0                     |
| 7902                   | FORD                   | 3000                 | 0                     |
| 7934                   | MILLER                 | 1300                 | 0                     |
+------------------------+------------------------+----------------------+-----------------------+--+
14 rows selected (0.131 seconds)

Inserting into the table works as well. Here the table's rows are appended to the table itself:

0: jdbc:hive2://dn04:10000/flume> insert into scott.avro_hive_table select * from scott.avro_hive_table;
INFO  : Tez session hasn't been created yet. Opening session
INFO  : Dag name: insert into scott.avr...cott.avro_hive_table(Stage-1)
INFO  : Status: Running (Executing on YARN cluster with App id application_1514186075960_0046)

--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED      1          1        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 01/01  [==========================>>] 100%  ELAPSED TIME: 5.90 s
--------------------------------------------------------------------------------
INFO  : Loading data to table scott.avro_hive_table from hdfs://nn01:8020/tmp/avroTestData/.hive-staging_hive_2018-01-25_10-06-20_434_5749761851837272530-5/-ext-10000
INFO  : Table scott.avro_hive_table stats: [numFiles=3, totalSize=1333]
No rows affected (12.793 seconds)



Starting in Hive 0.14, Avro-backed tables can simply be created by using "STORED AS AVRO" in a DDL statement. AvroSerDe takes care of creating the appropriate Avro schema from the Hive table schema, a big win in terms of Avro usability in Hive.
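With STORED AS AVRO, Hive derives the Avro schema from the column list. The Python sketch below approximates that derivation for the emp columns. The type mapping covers only a few primitive types and is an assumption based on the Hive AvroSerDe type table; making every field a ["null", T] union with a null default is how we understand Hive-generated schemas to look, but the exact output (record name, namespace, defaults) may differ between Hive versions. The function name schema_from_columns is hypothetical, and the defaults "emp"/"myns" are borrowed from the schema used later in this post.

```python
import json

# Assumed mapping for a few Hive primitive types only; decimal, date,
# timestamp and complex types need more care and are not handled here.
HIVE_TO_AVRO = {
    "boolean": "boolean",
    "int": "int",
    "bigint": "long",
    "float": "float",
    "double": "double",
    "string": "string",
    "binary": "bytes",
}


def schema_from_columns(columns, name="emp", namespace="myns"):
    """Build an Avro record schema from (column, hive_type) pairs.

    Every field becomes a nullable ["null", T] union with a null default,
    so files written without a column can still be read against the schema.
    """
    fields = []
    for col, hive_type in columns:
        avro_type = HIVE_TO_AVRO[hive_type.lower()]
        fields.append({"name": col, "type": ["null", avro_type], "default": None})
    return {"type": "record", "name": name, "namespace": namespace, "fields": fields}


cols = [("empno", "int"), ("ename", "string"), ("sal", "int"), ("comm", "int")]
print(json.dumps(schema_from_columns(cols), indent=2))
```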





DROP TABLE scott.avro_hive_table2;

CREATE EXTERNAL TABLE scott.avro_hive_table2
(
empno int,
ename string,
sal int,
comm int
)
STORED AS AVRO
LOCATION '/tmp/avroTestData';


0: jdbc:hive2://dn04:10000/flume> desc scott.avro_hive_table2;

+-----------+------------+----------+--+
| col_name  | data_type  | comment  |
+-----------+------------+----------+--+
| empno     | int        |          |
| ename     | string     |          |
| sal       | int        |          |
| comm      | int        |          |
+-----------+------------+----------+--+
4 rows selected (0.197 seconds)

0: jdbc:hive2://dn04:10000/flume> select * from scott.avro_hive_table2 limit 2;

+-------------------------+-------------------------+-----------------------+------------------------+--+
| avro_hive_table2.empno  | avro_hive_table2.ename  | avro_hive_table2.sal  | avro_hive_table2.comm  |
+-------------------------+-------------------------+-----------------------+------------------------+--+
| 7369                    | SMITH                   | 800                   | 0                      |
| 7499                    | ALLEN                   | 1600                  | 300                    |
+-------------------------+-------------------------+-----------------------+------------------------+--+
2 rows selected (0.096 seconds)

Writing tables to Avro files
The AvroSerde can serialize any Hive table to Avro files, which makes it effectively an any-Hive-type to Avro converter. To write a table to Avro files, you must first create an appropriate Avro schema (in Hive 0.14.0 and later, STORED AS AVRO creates it for you). CREATE TABLE ... AS SELECT statements are not currently supported. Instead, create an Avro-backed Hive table and then insert data into it from another table (for example, one stored as TEXTFILE).
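When this has to be done for many tables, the two statements can be generated. Here is a small Python sketch that only builds the HiveQL text (it does not connect to Hive); the helper avro_copy_statements and the table names scott.emp_text and scott.emp_avro are hypothetical.

```python
def avro_copy_statements(src_table, dst_table, columns):
    """Return (create, insert): a CREATE TABLE ... STORED AS AVRO statement
    for dst_table and an INSERT ... SELECT that copies src_table into it."""
    col_defs = ",\n  ".join(f"{name} {hive_type}" for name, hive_type in columns)
    col_list = ", ".join(name for name, _ in columns)
    create = f"CREATE TABLE {dst_table} (\n  {col_defs}\n)\nSTORED AS AVRO;"
    insert = f"INSERT INTO TABLE {dst_table}\nSELECT {col_list} FROM {src_table};"
    return create, insert


cols = [("empno", "int"), ("ename", "string"), ("sal", "int"), ("comm", "int")]
for stmt in avro_copy_statements("scott.emp_text", "scott.emp_avro", cols):
    print(stmt)
```

Listing the columns explicitly in the SELECT, rather than using SELECT *, keeps the copy correct even if the source table's column order differs from the Avro table's.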

HBase Integration
Hive 0.14.0 onward supports storing and querying Avro objects in HBase columns by making them visible as structs to Hive. This allows Hive to perform ad hoc analysis of HBase data which can be deeply structured. Prior to 0.14.0, the HBase Hive integration only supported querying primitive data types in columns.
Exception Management
Hive tends to swallow exceptions from the AvroSerde that occur before job submission. To make Hive more verbose, start it with hive --hiveconf hive.root.logger=INFO,console, which prints far more information to the console and will likely include whatever the AvroSerde is trying to tell you about what went wrong. If the AvroSerde encounters an error during MapReduce, the stack trace will be in the failed task log, which can be examined from the JobTracker's web interface (on a YARN cluster such as the one used here, through the ResourceManager web UI). The AvroSerde only emits the AvroSerdeException; look for these, and include them in any bug reports. The most common error is expected to be an exception raised while trying to serialize a type that is incompatible with what Avro expects.
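To pull the AvroSerde exceptions out of a long console or task log, a simple pattern match is enough. This Python sketch is an assumption-level helper (the name avro_serde_errors is hypothetical): it flags any line naming an exception class from the org.apache.hadoop.hive.serde2.avro package, such as AvroSerdeException or BadSchemaException.

```python
import re

# Any exception class in the AvroSerde package, e.g.
# org.apache.hadoop.hive.serde2.avro.AvroSerdeException
AVRO_SERDE_EXC = re.compile(r"org\.apache\.hadoop\.hive\.serde2\.avro\.\w*Exception")


def avro_serde_errors(lines):
    """Yield (line_number, line) for log lines that mention an AvroSerde exception."""
    for number, line in enumerate(lines, start=1):
        if AVRO_SERDE_EXC.search(line):
            yield number, line.rstrip("\n")
```

Usage, with a hypothetical log file name: with open("task.log") as f: followed by for n, line in avro_serde_errors(f): print(n, line). Stack-frame lines such as "at org.apache.hadoop.hive.serde2.avro.AvroSerDe.deserialize(...)" are not matched, because only class names ending in Exception are.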



Observations [Optional]:


A. Querying the Avro-backed Hive table fails in Presto, while the same query runs fine in Hive

presto:flume> select count(*) from scott.avro_hive_table;

Query 20180125_070542_00003_ab4e5, FAILED, 2 nodes
Splits: 41 total, 0 done (0.00%)
0:00 [0 rows, 0B] [0 rows/s, 0B/s]

Query 20180125_070542_00003_ab4e5 failed: HIVE_CURSOR_ERROR

The error log from the Presto web UI:

com.facebook.presto.spi.PrestoException: HIVE_CURSOR_ERROR
at com.facebook.presto.hive.GenericHiveRecordCursor.advanceNextPosition(GenericHiveRecordCursor.java:218)
at com.facebook.presto.hive.HiveRecordCursor.advanceNextPosition(HiveRecordCursor.java:179)
at com.facebook.presto.spi.RecordPageSource.getNextPage(RecordPageSource.java:99)
at com.facebook.presto.operator.TableScanOperator.getOutput(TableScanOperator.java:256)
at com.facebook.presto.operator.Driver.processInternal(Driver.java:416)
at com.facebook.presto.operator.Driver.processFor(Driver.java:310)
at com.facebook.presto.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:622)
at com.facebook.presto.execution.TaskExecutor$PrioritizedSplitRunner.process(TaskExecutor.java:586)
at com.facebook.presto.execution.TaskExecutor$Runner.run(TaskExecutor.java:734)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.hive.serde2.avro.BadSchemaException
at org.apache.hadoop.hive.serde2.avro.AvroSerDe.deserialize(AvroSerDe.java:199)
at com.facebook.presto.hive.GenericHiveRecordCursor.advanceNextPosition(GenericHiveRecordCursor.java:212)
... 11 more

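I worked around it by embedding the schema in the CREATE EXTERNAL TABLE statement with avro.schema.literal, instead of pointing to the .avsc file with avro.schema.url. (The AvroSerde throws BadSchemaException when it could not determine the table schema during initialization, so the Presto workers were apparently unable to load the schema file from avro.schema.url.)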

Following is the schema that I used


{
   "namespace": "myns",
   "type": "record",
   "name": "emp",
   "fields": [
      {"name": "empno", "type": "int"},
      {"name": "ename", "type": "string"},
      {"name": "sal", "type": "int"},
      {"name": "comm", "type": "int"}
   ]
}
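Pasting a pretty-printed schema into avro.schema.literal by hand is error-prone. Here is a Python sketch that compacts the .avsc JSON onto one line and wraps it in the TBLPROPERTIES clause. The helper name schema_literal_property is hypothetical, and the escaping assumes Hive's backslash escapes inside single-quoted string literals (so backslashes are doubled and single quotes become \').

```python
import json


def schema_literal_property(avsc_text):
    """Turn a pretty-printed .avsc document into a TBLPROPERTIES clause
    that embeds the schema with avro.schema.literal."""
    compact = json.dumps(json.loads(avsc_text), separators=(",", ":"))
    # Escape for a single-quoted HiveQL string literal.
    escaped = compact.replace("\\", "\\\\").replace("'", "\\'")
    return "TBLPROPERTIES('avro.schema.literal'='" + escaped + "')"
```

For example, print(schema_literal_property(open("emp.avsc").read())) prints a one-line clause equivalent to the one used in the CREATE EXTERNAL TABLE statement below, without the extra spaces.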


Re-create the table with avro.schema.literal:

DROP TABLE scott.avro_hive_table;

CREATE EXTERNAL TABLE scott.avro_hive_table
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' 
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '/tmp/avroTestData'
TBLPROPERTIES('avro.schema.literal'='{"namespace": "myns","type": "record","name": "emp","fields": [{"name": "empno", "type": "int"},{"name": "ename", "type": "string"},{"name": "sal", "type": "int"},   {"name": "comm", "type": "int"}]}')
;

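Query the re-created table again (the session below is from Beeline; the Presto count query in observation B also succeeds on this table):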

0: jdbc:hive2://dn04:10000/flume> select * from scott.avro_hive_table limit 2;

+------------------------+------------------------+----------------------+-----------------------+--+
| avro_hive_table.empno  | avro_hive_table.ename  | avro_hive_table.sal  | avro_hive_table.comm  |
+------------------------+------------------------+----------------------+-----------------------+--+
| 7369                   | SMITH                  | 800                  | 0                     |
| 7499                   | ALLEN                  | 1600                 | 300                   |
+------------------------+------------------------+----------------------+-----------------------+--+




B. Performance degradation


I ran a count query from Presto and found a performance problem with the table created with STORED AS AVRO. This needs further investigation; I am noting the observation here so I don't forget it.



avro_hive_table (created with avro.schema.literal): the count query is fast.
avro_hive_table2 (created with STORED AS AVRO, no schema literal): the count query is slow, and the first attempt failed.
Both tables read the same files under /tmp/avroTestData.

On Presto
*************

presto:flume> select count(*) from scott.avro_hive_table;
   _col0
-----------
 117440484
(1 row)

Query 20180125_123438_00046_ab4e5, FINISHED, 1 node
Splits: 76 total, 76 done (100.00%)

0:10 [117M rows, 1.23GB] [11.2M rows/s, 120MB/s]




presto:flume> select count(*) from scott.avro_hive_table2;

Query 20180125_094259_00040_ab4e5, RUNNING, 2 nodes
Splits: 62 total, 17 done (27.42%)
1:12 [1.14M rows, 12.2MB] [15.7K rows/s, 173KB/s]

Query is gone (server restarted?)


presto:flume> select count(*) from scott.avro_hive_table2;
   _col0
-----------
 117440484
(1 row)

Query 20180125_123508_00047_ab4e5, FINISHED, 1 node
Splits: 76 total, 76 done (100.00%)

12:34 [117M rows, 1.23GB] [156K rows/s, 1.67MB/s]

There is a huge difference: 10 seconds for the table created with a schema literal versus 12 minutes 34 seconds (about 75 times slower) for the table created with STORED AS AVRO.


On Hive
**********
I then ran the same test in Hive. The results follow the same pattern as in Presto (about 58 seconds versus about 264 seconds, roughly 4.5 times slower), so I recommend using a schema literal when creating Avro-backed Hive tables.


0: jdbc:hive2://te1-hdp-rp-dn04:10000/flume> select count(*) from scott.avro_hive_table;

INFO  : Session is already open
INFO  : Dag name: select count(*) from scott.avro_hive_table(Stage-1)
INFO  : Status: Running (Executing on YARN cluster with App id application_1514186075960_0047)

--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED      3          3        0        0       0       0
Reducer 2 ......   SUCCEEDED      1          1        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 57.69 s
--------------------------------------------------------------------------------
+------------+--+
|    _c0     |
+------------+--+
| 117440484  |
+------------+--+

1 row selected (58.29 seconds)



0: jdbc:hive2://te1-hdp-rp-dn04:10000/flume> select count(*) from scott.avro_hive_table2;

INFO  : Session is already open
INFO  : Dag name: select count(*) from scott.avro_hive_table2(Stage-1)
INFO  : Status: Running (Executing on YARN cluster with App id application_1514186075960_0047)

--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED      3          3        0        0       0       0
Reducer 2 ......   SUCCEEDED      1          1        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 263.57 s
--------------------------------------------------------------------------------
+------------+--+
|    _c0     |
+------------+--+
| 117440484  |
+------------+--+
1 row selected (264.178 seconds)

