


Thursday, July 28, 2022

Hello Lakehouse! Building Your First On-Prem Data Lakehouse

As the concept of a data lakehouse continues to gain traction, I thought I'd write the "hello world" for it, which I've named Hello Lakehouse. In this post I'll first cover some necessary concepts and then move on to the implementation using open-source technologies.

Introduction

A data lakehouse is an open data management architecture that combines the flexibility and cost-efficiency of data lakes with the data management and structure features of data warehouses, all on one data platform. So it is worth discussing the data warehouse a little first, then the data lake, and finally the lakehouse.

Data Warehouse

A data warehouse is purpose-built for BI and reporting, but it doesn't provide much support for video, audio, text, data science, or ML, and it has only limited support for streaming. It is a closed system that stores data in proprietary formats, which is why most data today ends up in data lakes. A data warehouse also has high implementation and maintenance costs, becomes outdated, and requires regular maintenance.


Data Lake

A data lake is a centralized, highly flexible storage repository that holds all data for data science and ML in its raw, original, unformatted form. It enables organizations to gain advanced insight from unstructured data. It is focused on analyzing all types of data (structured, semi-structured, unstructured), OLAP, schema-on-read, API connectivity, and low-cost object storage for data in open file formats (e.g. Apache Parquet, ORC).


However, a data lake is complex to set up and notoriously prone to problems with data quality, transactional support, data governance, and query performance. Data lakes built without the vital skills, key capabilities, and specialized technologies inevitably turn into a "data swamp" over time.

Data lakes also lack some critical features: they do not support transactions, they do not enforce data quality, and their lack of consistency/isolation makes it almost impossible to mix appends and reads, or batch and streaming jobs. A common approach is therefore to use multiple systems – a data lake, several data warehouses, and other specialized systems such as streaming, time-series, graph, and image databases. Having a multitude of systems introduces complexity and, more importantly, delay, as data professionals invariably need to move or copy data between them.



Because of the respective advantages of the data warehouse and the data lake, most companies opt for a hybrid solution. However, this approach can lead to data duplication, which is costly.

Data Lakehouse - Fixing Data Lake Problems

A lakehouse is a combined approach enabled by a new system design: it implements data structures and data management features similar to those of a data warehouse directly on top of low-cost cloud/object storage in open formats. A lakehouse gives you data versioning, governance, security, reduced data duplication, and the ACID properties that are needed even for unstructured data.

Data lakehouses usually start as data lakes containing all data types; the data is then converted to an open table format (e.g. Delta Lake, Apache Hudi, or Apache Iceberg), an open-source storage layer that brings reliability to data lakes by adding the ACID transactional guarantees of traditional data warehouses on top of them.

Data lakehouses provide direct access for the most widely used business intelligence tools (Tableau, Power BI) to enable advanced analytics. Additionally, data lakehouses use open data formats (such as Parquet) with APIs and machine learning libraries, including Python/R, making it straightforward for data scientists and machine learning engineers to utilize the data.

Data lakehouse architecture enforces schema and data integrity, making it easier to implement robust data security and governance mechanisms.

The main disadvantage of a data lakehouse is that it is still a relatively new technology. As such, it is unclear whether it will live up to all of its promises. It may be years before data lakehouses can compete with mature big-data storage solutions.



Quick Comparison - DWH, Data Lake, Lakehouse




Lakehouse Implementation                 

For the lakehouse implementation I've considered the below environment/technologies.

Environment:

1- CentOS

2- S3 Object Store (Minio), the storage layer for our lakehouse

3- Hive metastore service (HMS), the service that stores table metadata and is used by Trino

4- Apache Iceberg, open table format for huge analytic datasets

5- Trino 391, distributed SQL query engine, requires Java 17+


Step-1: Check that your S3 object store is working

I've chosen Minio for the storage layer of my lakehouse. If you want to install Minio, you can check the post Create Data Lake Without Hadoop. Pay particular attention to the section "Configuring Hive Metastore" for the S3-related configuration of the metastore.

[hdpsysuser@hdpmaster ~]$ /data/apps/minio/minio server /data/miniolocal --console-address ":9001"




Step-2: Check HMS is working 

HMS must be running with the S3-related configuration in place.

[hdpsysuser@hdpmaster ~]$ hive --service metastore



Step-3: Configure Trino for Iceberg 

This post assumes that your Trino (Presto) is already installed and configured. If you want to install it, please refer to the post Installing/Configuring PrestoDB. You will configure the Iceberg connector to write data in an open format.

What is Iceberg?

Iceberg is an open table format for huge analytic datasets, designed to overcome the known scalability limitations of Hive, which stores table metadata in a metastore backed by a relational database such as PostgreSQL or MySQL. Hive tracks partition locations in the metastore, but not individual data files. Trino queries using the Hive connector must first call the metastore to get partition locations, then call the underlying filesystem to list all data files inside each partition, and finally read metadata from each data file.

The Iceberg table state is maintained in metadata files. All changes to table state create a new metadata file and replace the old metadata with an atomic swap. The table metadata file tracks the table schema, partitioning config, custom properties, and snapshots of the table contents.

Iceberg data files can be stored in either Parquet or ORC format, as determined by the format property in the table definition; the file format defaults to ORC. Since Iceberg stores the paths to data files in its metadata files, it only consults the underlying file system for files that must be read.

Apache Iceberg has a notion of data files and delete files. Data files are the files Iceberg uses behind the scenes to keep the actual data. Delete files are immutable files that encode rows that have been deleted in existing data files. This is how Iceberg deletes/replaces individual rows in immutable data files without rewriting them.
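To make this concrete, here is a minimal sketch of a row-level delete that relies on delete files. The events table and its columns are made up for this illustration, and it assumes the format_version table property (used later in this post with ALTER TABLE) can also be set at creation time to get an Iceberg v2 table:

-- Hypothetical v2 table; row-level deletes are encoded as delete files
CREATE TABLE events (id integer, payload varchar)
WITH (format_version = 2);

INSERT INTO events VALUES (1, 'a'), (2, 'b'), (3, 'c');

-- Marks only the matching row as deleted via a delete file;
-- the original immutable data file is not rewritten
DELETE FROM events WHERE id = 2;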

Iceberg scales to petabytes independently of the underlying storage layer and the access engine layer.

What is Trino Iceberg Connector?

The Iceberg connector allows querying data stored in files written in the Iceberg format. It supports Apache Iceberg table spec versions 1 and 2. The connector supports two Iceberg catalog types: a Hive metastore service (HMS) or AWS Glue. The catalog type is determined by the iceberg.catalog.type property, which can be set to either HIVE_METASTORE or GLUE.

When you use HIVE_METASTORE catalog, the Iceberg connector supports the same metastore configuration properties as the Hive connector.

Configuration

Below is the Trino 391 catalog configuration for Iceberg. Create the Trino catalog properties file, place it in the catalog configuration folder (e.g. /data/apps/trino-server-391/etc/catalog), and restart the Trino cluster (coordinator and all workers).

-- /data/apps/trino-server-391/etc/catalog/iceberg.properties
connector.name=iceberg
hive.metastore.uri=thrift://hdpmaster:9083
iceberg.catalog.type=HIVE_METASTORE
hive.s3.endpoint=http://hdpmaster:9000
hive.s3.aws-access-key=minioadmin
hive.s3.aws-secret-key=minioadmin
hive.s3.ssl.enabled=false
hive.s3.path-style-access=true
#hive.s3select-pushdown.enabled=true

If you have run all the steps above correctly, you now have a working lakehouse environment and it is time to test it.

Step-4: Test with Iceberg tables

Connect to Trino using the iceberg catalog configured in the previous step.

--Run trino if not running
[trino391@hdpmaster ~]$ launcher run
[trino391@hdpmaster bin]$ trinocli --server http://hdpmaster:6060 --catalog iceberg --schema icebergdb
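Once connected, a quick sanity check (just a sketch, using the catalog name configured in Step-3) confirms that the iceberg catalog is registered:

-- The iceberg catalog from Step-3 should appear in the list
trino:icebergdb> SHOW CATALOGS;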




CREATE SCHEMA dbiceberg WITH ( LOCATION = 's3a://datalake/dbiceberg' );



Verify from the Minio console that the location exists.
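You can also check from Trino itself; a minimal sketch (assuming SHOW CREATE SCHEMA is available in your Trino version, and using the schema created above):

-- Shows the CREATE SCHEMA statement including the s3a:// location property
trino:icebergdb> SHOW CREATE SCHEMA iceberg.dbiceberg;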


Creating test datasets on Iceberg

Let's create the test datasets now to experiment with Iceberg.

DROP TABLE dbiceberg.dept;
CREATE  TABLE dbiceberg.dept( deptno integer,  dname  varchar,  loc    varchar)
with (FORMAT = 'orc');

INSERT INTO dbiceberg.dept
VALUES(10,'ACCOUNTING','NEW YORK'),
(20,'RESEARCH','DALLAS'),
(30,'SALES','CHICAGO'),
(40,'OPERATIONS','BOSTON');

-- You can create table with parquet format if required.
CREATE  TABLE icebergdb.deptparq( deptno integer,  dname  varchar,  loc    varchar) with (FORMAT = 'parquet');


DROP TABLE dbiceberg.emp;
CREATE TABLE dbiceberg.emp(empno INTEGER,ename VARCHAR,job VARCHAR,mgr INTEGER,hiredate VARCHAR, sal INTEGER,comm INTEGER,deptno INTEGER) 
WITH (FORMAT = 'orc',partitioning = ARRAY['deptno']);

INSERT INTO dbiceberg.emp
VALUES
(7369,'SMITH' ,'CLERK'    ,7902,'17-DEC-80',800 ,NULL,20),
(7499,'ALLEN' ,'SALESMAN' ,7698,'20-FEB-81',1600,300 ,30),
(7521,'WARD'  ,'SALESMAN' ,7698,'22-FEB-81',1250,500 ,30),
(7566,'JONES' ,'MANAGER'  ,7839,'02-APR-81',2975,NULL,20),
(7654,'MARTIN','SALESMAN' ,7698,'28-SEP-81',1250,1400,30),
(7698,'BLAKE' ,'MANAGER'  ,7839,'01-MAY-81',2850,NULL,30),
(7782,'CLARK' ,'MANAGER'  ,7839,'09-JUN-81',2450,NULL,10),
(7788,'SCOTT' ,'ANALYST'  ,7566,'19-APR-87',3000,NULL,20),
(7839,'KING'  ,'PRESIDENT',NULL,'17-NOV-81',5000,NULL,10),
(7844,'TURNER','SALESMAN' ,7698,'08-SEP-81',1500,0   ,30),
(7876,'ADAMS' ,'CLERK'    ,7788,'23-MAY-87',1100,NULL,20),
(7900,'JAMES' ,'CLERK'    ,7698,'03-DEC-81',950 ,NULL,30),
(7902,'FORD'  ,'ANALYST'  ,7566,'03-DEC-81',3000,NULL,20),
(7934,'MILLER','CLERK'    ,7782,'23-JAN-82',1300,NULL,10);




Observe how Iceberg did the automatic partition management.
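If you want to see the partitions Iceberg created without scanning any data, you can query the "$partitions" metadata table (covered in more detail later in this post); a quick sketch:

-- One row per deptno partition, with record and file counts taken from Iceberg metadata
trino:dbiceberg> SELECT * FROM "emp$partitions";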

SQL support - Run Test Queries

The Iceberg connector provides read and write access to data and metadata in Iceberg. In addition to the globally available and read operation statements, the connector supports INSERT, DELETE, UPDATE, schema and table management, and materialized view/view management.

-- Test join
trino:dbiceberg> select d.deptno,d.dname,e.ename,e.job,e.sal from dept d,emp e where d.deptno=e.deptno;

-- Top N rows
trino:dbiceberg> select deptno,ename,sal from emp order by sal desc limit 5;

--Calculated field
trino:dbiceberg> select ename,sal,comm,sal+comm total from emp where comm > 0;

-- Aggregate function
trino:dbiceberg> select deptno,count(*),max(sal),min(sal) from emp group by deptno;

-- CTAS
trino:dbiceberg> create table emp2 WITH (FORMAT = 'orc') as select * from emp ;

-- DML
trino:dbiceberg> insert into emp(empno,ename,comm,deptno) values(1,'INAM',NULL,50);
trino:dbiceberg> update emp set sal=4000 where empno=1;
trino:dbiceberg> delete from emp where empno=50 and deptno =50;



--Prepare statement
trino:icebergdb> PREPARE my_select1 FROM SELECT * FROM hive.default.pvw_test;
trino:icebergdb> EXECUTE my_select1;
trino:icebergdb>  PREPARE my_select2 FROM SELECT * FROM hive.default.pvw_test WHERE deptno = ?;
trino:icebergdb> EXECUTE my_select2 USING 10;

--Explain
trino:icebergdb> EXPLAIN select * from hive.default.pvw_test;
                                                                                              Query Plan
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SOURCE]
     Output layout: [deptno_0, dname, ename, job, sal]
     Output partitioning: SINGLE []
     Output[columnNames = [deptno, dname, ename, job, sal]]
     │   Layout: [deptno_0:integer, dname:varchar, ename:varchar, job:varchar, sal:integer]
     │   Estimates: {rows: ? (?), cpu: ?, memory: 276B, network: 276B}
     │   deptno := deptno_0
     └─ InnerJoin[criteria = ("deptno_0" = "deptno"), hash = [$hashvalue, $hashvalue_2], distribution = REPLICATED]
        │   Layout: [ename:varchar, job:varchar, sal:integer, deptno_0:integer, dname:varchar]
        │   Estimates: {rows: ? (?), cpu: ?, memory: 276B, network: 276B}
        │   Distribution: REPLICATED
        │   dynamicFilterAssignments = {deptno -> #df_288}
        ├─ ScanFilterProject[table = iceberg:icebergdb.emp$data@7644006307127811815, filterPredicate = true, dynamicFilters = {"deptno_0" = #df_288}]
        │      Layout: [ename:varchar, job:varchar, sal:integer, deptno_0:integer, $hashvalue:bigint]
        │      Estimates: {rows: 42 (5.29kB), cpu: 4.92k, memory: 0B, network: 0B}/{rows: 42 (5.29kB), cpu: 9.84k, memory: 0B, network: 0B}/{rows: 42 (5.29kB), cpu: 15.13k, memory: 0B,
        │      $hashvalue := combine_hash(bigint '0', COALESCE("$operator$hash_code"("deptno_0"), 0))
        │      ename := 2:ename:varchar
        │      deptno_0 := 8:deptno:integer
        │      job := 3:job:varchar
        │      sal := 6:sal:integer
        └─ LocalExchange[partitioning = SINGLE]
           │   Layout: [deptno:integer, dname:varchar, $hashvalue_2:bigint]
           │   Estimates: {rows: 4 (276B), cpu: 516, memory: 0B, network: 276B}
           └─ RemoteSource[sourceFragmentIds = [1]]
                  Layout: [deptno:integer, dname:varchar, $hashvalue_3:bigint]
                  Estimates:

 Fragment 1 [SOURCE]
     Output layout: [deptno, dname, $hashvalue_4]
     Output partitioning: BROADCAST []
     ScanProject[table = iceberg:icebergdb.dept$data@666769454999505708]
         Layout: [deptno:integer, dname:varchar, $hashvalue_4:bigint]
         Estimates: {rows: 4 (276B), cpu: 240, memory: 0B, network: 0B}/{rows: 4 (276B), cpu: 516, memory: 0B, network: 0B}
         $hashvalue_4 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("deptno"), 0))
         dname := 2:dname:varchar
         deptno := 1:deptno:integer


(1 row)

trino:icebergdb> EXPLAIN (TYPE IO, FORMAT JSON) select * from hive.default.pvw_test;
trino:icebergdb> EXPLAIN ANALYZE select * from hive.default.pvw_test;


-- Table info
trino:dbiceberg> describe dbiceberg.emp;

--ANALYZE
trino:icebergdb> ANALYZE emptrino;
Query 20220728_050643_00008_ddudz failed: This connector does not support analyze

-- Stats
trino:icebergdb> SHOW STATS FOR emp;
 column_name | data_size | distinct_values_count |   nulls_fraction    | row_count | low_value | high_value
-------------+-----------+-----------------------+---------------------+-----------+-----------+------------
 empno       |      NULL |                  NULL |                 0.0 |      NULL | 7369      | 7934
 ename       |      NULL |                  NULL |                 0.0 |      NULL | NULL      | NULL
 job         |      NULL |                  NULL |                 0.0 |      NULL | NULL      | NULL
 mgr         |      NULL |                  NULL | 0.07142857142857142 |      NULL | 7566      | 7902
 hiredate    |      NULL |                  NULL |                 0.0 |      NULL | NULL      | NULL
 sal         |      NULL |                  NULL |                 0.0 |      NULL | 800       | 5000
 comm        |      NULL |                  NULL |  0.7142857142857143 |      NULL | 0         | 1400
 deptno      |      NULL |                  NULL |                 0.0 |      NULL | 10        | 30
 NULL        |      NULL |                  NULL |                NULL |      42.0 | NULL      | NULL
(9 rows)

trino:icebergdb> SHOW STATS FOR (select * from emp where deptno=10);

-- TRANSACTION
trino:icebergdb> START TRANSACTION READ WRITE;
trino:icebergdb> insert into emp(deptno,empno,ename,comm) values(10,3,'IJK',NULL);
Query 20220728_052402_00023_ddudz failed: Catalog only supports writes using autocommit: iceberg

-- DDL
trino:icebergdb> ALTER TABLE emptrino EXECUTE optimize(file_size_threshold => '1MB');
ALTER TABLE EXECUTE

Query 20220727_112424_00006_42be2, FINISHED, 1 node
Splits: 31 total, 31 done (100.00%)
4:55 [14.7M rows, 5.03MB] [49.8K rows/s, 17.5KB/s]

The optimize command rewrites the active content of the specified table so that it is merged into fewer but larger files. If the table is partitioned, the data compaction acts separately on each partition selected for optimization. This operation improves read performance.
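One way to see the effect of optimize is to compare the number and size of active data files before and after, using the "$files" metadata table described later in this post; a minimal sketch:

-- Count of active data files and their total size for the table
trino:icebergdb> SELECT count(*) AS data_files, sum(file_size_in_bytes) AS total_bytes FROM "emptrino$files";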

trino:icebergdb> ALTER TABLE example EXECUTE expire_snapshots(retention_threshold => '0d');

Query 20220727_114119_00014_42be2, FAILED, 1 node
Splits: 1 total, 0 done (0.00%)
0.31 [0 rows, 0B] [0 rows/s, 0B/s]

Query 20220727_114119_00014_42be2 failed: Retention specified (0.00d) is shorter than the minimum retention configured in the system (7.00d). Minimum retention can be changed with iceberg.expire_snapshots.min-retention configuration property or iceberg.expire_snapshots_min_retention session property

The expire_snapshots command removes snapshots, along with their related metadata and data files, that are older than the time period configured with the retention_threshold parameter. Regularly expiring snapshots is recommended to delete data files that are no longer needed and to keep the size of table metadata small.
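As the error above suggests, the minimum retention can be lowered for the current session with the iceberg.expire_snapshots_min_retention session property; a hedged sketch (use with care, since this allows removing very recent snapshots):

trino:icebergdb> SET SESSION iceberg.expire_snapshots_min_retention = '0d';
trino:icebergdb> ALTER TABLE example EXECUTE expire_snapshots(retention_threshold => '0d');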


trino:icebergdb> ALTER TABLE emptrino EXECUTE remove_orphan_files(retention_threshold => '7d');
The remove_orphan_files command removes all files from the table's data directory that are not linked from metadata files and that are older than the value of the retention_threshold parameter.

trino:icebergdb> ALTER TABLE emptrino SET PROPERTIES format_version = 2;


Rolling back to a previous snapshot
Iceberg supports a "snapshot" model of data, where table snapshots are identified by snapshot IDs. The connector provides a system snapshots table for each Iceberg table.


trino:icebergdb> SELECT snapshot_id FROM icebergdb."emptrino$snapshots" ORDER BY committed_at DESC LIMIT 1;
     snapshot_id
---------------------
 6064586986348643724
(1 row)


A SQL procedure system.rollback_to_snapshot allows the caller to roll back the state of the table to a previous snapshot id:

trino:icebergdb> CALL iceberg.system.rollback_to_snapshot('icebergdb', 'emptrino', 6064586986348643724);
CALL

Schema evolution

Iceberg and the Iceberg connector support schema evolution, with safe column add, drop, reorder and rename operations, including in nested structures. Table partitioning can also be changed and the connector can still query data created before the partitioning change.

presto:icebergdb> ALTER TABLE emp2 RENAME TO empnew;
presto:icebergdb> ALTER TABLE empnew ADD COLUMN zip varchar;
presto:icebergdb> ALTER TABLE empnew DROP COLUMN zip;
presto:icebergdb> ALTER TABLE empnew RENAME COLUMN empno TO _id;

ALTER TABLE table_name SET PROPERTIES partitioning = ARRAY[<existing partition columns>, 'my_new_partition_column'];
trino:icebergdb> ALTER TABLE emptrino SET PROPERTIES partitioning = ARRAY['deptno', 'job']; -- existing data is not touched; the new partitioning takes effect for subsequent inserts, e.g.:
insert into emptrino(empno,ename,job,deptno) values(1,'inam','admin',50);

Metadata columns/Tables

In addition to the defined columns, the Iceberg connector automatically exposes path metadata as a hidden column ("$path") in each table:

trino:icebergdb> SELECT *, "$path"
              -> FROM icebergdb.dept;



The connector exposes several metadata tables for each Iceberg table. These metadata tables contain information about the internal structure of the Iceberg table. You can query each metadata table by appending the metadata table name to the table name:

trino:icebergdb> SELECT * FROM "dept$data"; -- alias for the Iceberg table itself

trino:icebergdb> SELECT * FROM "dept$properties"; -- retrieve the properties of the current snapshot

         key          | value
----------------------+-------
 write.format.default | ORC
(1 row)

trino:icebergdb> SELECT * FROM "dept$history"; -- log of the metadata changes performed
           made_current_at           |     snapshot_id     |      parent_id      | is_current_ancestor
-------------------------------------+---------------------+---------------------+---------------------
 2022-07-26 13:32:40.131 Asia/Riyadh | 1589435948958886067 |                NULL | true
 2022-07-26 13:33:27.920 Asia/Riyadh |  666769454999505708 | 1589435948958886067 | true
(2 rows)

trino:icebergdb> SELECT * FROM "dept$snapshots"; -- information about the snapshots
            committed_at             |     snapshot_id     |      parent_id      | operation |                                                manifest_list
-------------------------------------+---------------------+---------------------+-----------+------------------------------------------------------------------------------------------
 2022-07-26 13:32:40.131 Asia/Riyadh | 1589435948958886067 |                NULL | append    | s3a://datalake/icebergdb/dept/metadata/snap-1589435948958886067-1-404c6c10-f5c3-4a95-8fa3
 2022-07-26 13:33:27.920 Asia/Riyadh |  666769454999505708 | 1589435948958886067 | append    | s3a://datalake/icebergdb/dept/metadata/snap-666769454999505708-1-37de0496-f752-41f8-9de5-
(2 rows)

trino:icebergdb> SELECT * FROM "dept$manifests"; -- detailed overview of the manifests corresponding to the snapshots
                                        path                                         | length | partition_spec_id | added_snapshot_id  | added_data_files_count | added_rows_count | exi
-------------------------------------------------------------------------------------+--------+-------------------+--------------------+------------------------+------------------+----
 s3a://datalake/icebergdb/dept/metadata/37de0496-f752-41f8-9de5-e775dfc3dd89-m0.avro |   5626 |                 0 | 666769454999505708 |                      1 |                4 |
(1 row)

trino:icebergdb> SELECT * FROM "emptrino$partitions"; -- detailed overview of the partitions 
              partition              | record_count | file_count | total_size |
-------------------------------------+--------------+------------+------------+---------------------------------------------------------------------------------------------------------
 {empno=NULL, deptno=NULL, job=NULL} |     14680064 |          1 |    5531545 | {empno={min=7369, max=7934, null_count=0, nan_count=NULL}, ename={min=ADAMS, max=WARD, null_count=0, nan
 {empno=NULL, deptno=50, job=admin}  |            1 |          1 |        799 | {empno={min=1, max=1, null_count=0, nan_count=NULL}, ename={min=inam, max=inam, null_count=0, nan_count=
(2 rows)

trino:icebergdb> SELECT * FROM "dept$files"; -- detailed overview of the data files

 content |                                  file_path                                  | file_format | record_count | file_size_in_bytes | column_sizes |  value_counts   | null_value_c
---------+-----------------------------------------------------------------------------+-------------+--------------+--------------------+--------------+-----------------+-------------
       0 | s3a://datalake/icebergdb/dept/data/021c3231-602a-403c-8396-620c94c09572.orc | ORC         |            4 |                602 | NULL         | {1=4, 2=4, 3=4} | {1=0, 2=0, 3
(1 row)


Views
trino:icebergdb> create view pvw_test as select d.deptno,d.dname,e.ename,e.job,e.sal from dept d,emp e where d.deptno=e.deptno;


Materialized views

The Iceberg connector supports materialized view management. In the underlying system, each materialized view consists of a view definition and an Iceberg storage table. You can use the Iceberg table properties to control the created storage table and therefore its layout and performance.

CREATE MATERIALIZED VIEW pmvw_emp
COMMENT 'Testing materialized view'
WITH ( format = 'ORC', partitioning = ARRAY['deptno'] )
AS
SELECT d.deptno,d.dname,e.ename,e.job,e.sal + coalesce(e.comm,0) "Total Salary"
FROM dept d,emp e
WHERE d.deptno=e.deptno;

-- Show defined materialized view properties

trino:icebergdb> SELECT * FROM system.metadata.materialized_view_properties;

-- Show metadata about the materialized views 

trino:icebergdb> SELECT * FROM system.metadata.materialized_views;

trino:icebergdb> select * from pmvw_emp;

-- Show the SQL statement that creates the specified materialized view 

trino:icebergdb> SHOW CREATE MATERIALIZED VIEW pmvw_emp;

-- Refresh view 
trino:icebergdb> REFRESH MATERIALIZED VIEW pmvw_emp;


Congratulations!  Our implementation ends here. 

Optional: Enabling Iceberg in Hive      

In order to use Iceberg with Hive 2.3.x or Hive 3.1.x, you must load the Iceberg Hive runtime jar and enable Iceberg support, either globally or for an individual table using a table property.

Loading runtime jar
To enable Iceberg support in Hive, the HiveIcebergStorageHandler and supporting classes need to be made available on Hive's classpath. These are provided by the iceberg-hive-runtime jar file.

To use Iceberg in Hive 2 or Hive 3, download the Hive runtime JAR (iceberg-hive-runtime-0.14.0.jar) from the location below and add it to Hive using ADD JAR, for example from the Hive shell:

https://iceberg.apache.org/releases/

I copied the jar file to /data/apps/apache-hive-3.1.2-bin/iceberg-hive-runtime-0.14.0.jar

0: jdbc:hive2://> add JAR file:///data/apps/apache-hive-3.1.2-bin/iceberg-hive-runtime-0.14.0.jar;
Added [file:///data/apps/apache-hive-3.1.2-bin/iceberg-hive-runtime-0.14.0.jar] to class path
Added resources: [file:///data/apps/apache-hive-3.1.2-bin/iceberg-hive-runtime-0.14.0.jar]
No rows affected (0.005 seconds)
0: jdbc:hive2://>

Enabling support
There are two ways to enable Hive support: globally in the Hadoop configuration, or per table using a table property. As I'm not using Hadoop for this lakehouse implementation, I enabled support at the table level. Please note that this table (created in Hive) cannot be queried by Presto/Trino.


CREATE TABLE iceberg_hivetbl (
  id bigint, name string
) PARTITIONED BY (
  dept string
) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
  LOCATION 's3a://datalake/iceberg/iceberg_t1'
  TBLPROPERTIES ( 'write.format.default'='parquet');


Test Using Iceberg Table as Hive External Table:
For experimental purposes, I created an external table in Hive on top of the data files of a table created via Presto/Trino with the Iceberg connector, and found that it worked for ORC but failed for Parquet.

CREATE  external TABLE extbl_on_ibfile( deptno int,  dname  string,  loc    string) 
STORED AS ORC LOCATION 's3a://datalake/icebergdb/dept/data';
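A quick verification from Hive (a sketch, using the external table created above):

-- Hive reads the ORC data files written by the Iceberg connector directly
0: jdbc:hive2://> select * from extbl_on_ibfile;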




