

Note: All the posts are based on practical approach avoiding lengthy theory. All have been tested on some development servers. Please don’t test any post on production servers until you are sure.

Tuesday, March 13, 2018

Spooling Files to HBase using Flume


One of my teams wants to upload the contents of files placed in a specific directory (the spooling dir) to HBase for analysis. For this purpose we will use Flume's spooldir source, which lets users and applications drop files into the spooling dir and processes each line as one event to put into HBase. It is assumed that the Hadoop cluster and HBase are running; our environment is HDP 2.6.
Components Used
Spooling Directory Source [spooldir] :

This source watches the specified directory for new files and parses events out of them as they appear. Once a file has been fully read into the channel, it is either renamed with the .COMPLETED suffix or deleted. Only uniquely named files should be dropped into the spooling directory, and a file must not be modified after it has been placed there.
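Because the source expects every file to have a unique name and to be complete when it shows up, one common way to feed the directory is to copy into a staging directory first and then rename into the spooling directory. The following is only a minimal sketch under assumptions: the function name spool_put and the staging directory are hypothetical (they are not part of the setup in this post), and the staging directory is assumed to be on the same filesystem as the spooling directory so that mv is a plain rename.

```shell
# spool_put SRC SPOOL_DIR STAGING_DIR
# Hypothetical helper: copies SRC under a unique name into STAGING_DIR,
# then renames it into SPOOL_DIR so the agent never sees a partial file.
spool_put() {
    sp_base=$(basename "$1")
    # mktemp replaces XXXXXX with random characters; the timestamp keeps
    # names readable and unique across repeated uploads of the same file.
    sp_tmp=$(mktemp "$3/${sp_base}.$(date +%Y%m%d%H%M%S).XXXXXX") || return 1
    cp "$1" "$sp_tmp" || return 1
    # mktemp creates the file readable by its owner only; open it up so a
    # different user running the agent can read it.
    chmod 644 "$sp_tmp" || return 1
    mv "$sp_tmp" "$2/" || return 1
    # Print the final path for logging or follow-up checks.
    echo "$2/$(basename "$sp_tmp")"
}
```

A call could look like spool_put /tmp/sample.txt /data/flume/spooldir /data/flume/staging, where /data/flume/staging is an assumed directory you would have to create yourself.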

Below is the property table for the spooling directory source; required properties are marked (required).

Property Name | Default | Description
type (required) | - | The component type name; needs to be spooldir.
spoolDir (required) | - | The directory from which to read files.
fileSuffix | .COMPLETED | Suffix to append to completely ingested files.
deletePolicy | never | When to delete completed files: never or immediate.
fileHeader | false | Whether to add a header storing the absolute path filename.
fileHeaderKey | file | Header key to use when appending the absolute path filename to the event header.
basenameHeader | false | Whether to add a header storing the basename of the file.
basenameHeaderKey | basename | Header key to use when appending the basename of the file to the event header.
ignorePattern | ^$ | Regular expression specifying which files to ignore (skip).
trackerDir | .flumespool | Directory to store metadata related to processing of files. If this path is not absolute, it is interpreted as relative to the spoolDir.
consumeOrder | oldest | Order in which files in the spooling directory are consumed: oldest, youngest or random. For oldest and youngest, the last modified time of the files is used to compare them. In case of a tie, the file with the smallest lexicographical order is consumed first. For random, any file is picked randomly.
maxBackoff | 4000 | The maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full.
batchSize | 100 | Granularity at which to batch transfer to the channel.
inputCharset | UTF-8 | Character set used by deserializers that treat the input file as text.
decodeErrorPolicy | FAIL | What to do when a non-decodable character is seen in the input file. FAIL: throw an exception and fail to parse the file. REPLACE: replace the unparseable character with the "replacement character", typically Unicode U+FFFD. IGNORE: drop the unparseable character sequence.
deserializer | LINE | The deserializer used to parse the file into events. Defaults to parsing each line as an event. The class specified must implement EventDeserializer.Builder.

Spillable Memory Channel:

The Spillable Memory Channel can be treated as a combination of the memory channel and the file channel. It was introduced to overcome the memory channel's limitation of losing events when the memory queue fills up. It uses two storage mechanisms: an in-memory queue and disk.

It stores events primarily in the in-memory queue; once the queue is full, additional incoming events are stored on disk, backed by the file channel. This channel is ideal for flows that need the high throughput of the memory channel during normal operation but also need the larger capacity of the file channel to better tolerate intermittent sink-side outages.

Below is the property table for the spillable memory channel; required properties are marked (required).

Property Name | Default | Description
type (required) | - | The component type name; needs to be SPILLABLEMEMORY.
memoryCapacity | 10000 | Maximum number of events stored in the memory queue. To disable use of the in-memory queue, set this to zero.
overflowCapacity | 100000000 | Maximum number of events stored on overflow disk (i.e. the file channel). To disable use of overflow, set this to zero.
overflowTimeout | 3 | The number of seconds to wait before enabling disk overflow when memory fills up.
byteCapacityBufferPercentage | 20 | The percentage of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers.
byteCapacity | see description | Maximum bytes of memory allowed as a sum of all events in the memory queue.
avgEventSize | 500 | Estimated average size of events, in bytes, going into the channel.
  • To disable the in-memory queue and make the channel behave like a file channel, set memoryCapacity = 0 and provide the overflowCapacity, checkpointDir and dataDirs properties of the channel.
  • To disable the overflow disk and make the channel behave like an in-memory channel, set overflowCapacity = 0. The checkpointDir and dataDirs properties can then be omitted, but memoryCapacity must be set to a non-zero value.
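
The two modes above can be sketched as configuration fragments. This is only an illustration under assumptions: the agent and channel names and the directory paths are borrowed from the agent configuration used later in this post, while the fragment file names and the OUT_DIR variable are made up for the example.

```shell
# Hypothetical output directory for the example fragments; adjust as needed.
OUT_DIR="${OUT_DIR:-.}"

# Variant 1: in-memory queue disabled, channel behaves like a file channel.
cat > "$OUT_DIR/channel-file-only.conf" <<'EOF'
SpoolAgent.channels.spillmem-channel.type = SPILLABLEMEMORY
SpoolAgent.channels.spillmem-channel.memoryCapacity = 0
SpoolAgent.channels.spillmem-channel.overflowCapacity = 1000000
SpoolAgent.channels.spillmem-channel.checkpointDir = /data/flume/checkpoint/
SpoolAgent.channels.spillmem-channel.dataDirs = /data/flume/data/
EOF

# Variant 2: overflow disabled, channel behaves like a memory channel.
# checkpointDir and dataDirs are left out on purpose.
cat > "$OUT_DIR/channel-memory-only.conf" <<'EOF'
SpoolAgent.channels.spillmem-channel.type = SPILLABLEMEMORY
SpoolAgent.channels.spillmem-channel.memoryCapacity = 10000
SpoolAgent.channels.spillmem-channel.overflowCapacity = 0
EOF
```

Only one of the two fragments would go into an agent configuration at a time, in place of the channel section.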

HBase Sink:

This sink reads events from a channel and writes them to HBase. The HBase configuration is picked up from the first hbase-site.xml encountered in the classpath. A class implementing HbaseEventSerializer, specified in the configuration, is used to convert the events into HBase puts and/or increments. These puts and increments are then written to HBase. This sink supports batch reading of events from the channel to minimize the number of flushes on the HBase tables.

This sink will commit each transaction if the table’s write buffer size is reached or if the number of events in the current transaction reaches the batch size, whichever comes first. Flume provides two serializers for HBase sink. 

The SimpleHbaseEventSerializer (org.apache.flume.sink.hbase.SimpleHbaseEventSerializer) writes the event body as-is to HBase and optionally increments a column in HBase. This is primarily an example implementation.

The RegexHbaseEventSerializer (org.apache.flume.sink.hbase.RegexHbaseEventSerializer) breaks the event body up based on the given regex and writes each part into a different column.

Below is the property table for the HBase sink; required properties are marked (required).

Property Name | Default | Description
type (required) | - | The component type name; needs to be hbase.
table (required) | - | The name of the table in HBase to write to.
columnFamily (required) | - | The column family in HBase to write to.
zookeeperQuorum | - | The quorum spec. This is the value of the property hbase.zookeeper.quorum in hbase-site.xml.
znodeParent | /hbase | The base path for the znode for the -ROOT- region. Value of zookeeper.znode.parent in hbase-site.xml.
batchSize | 100 | Number of events to be written per txn.
coalesceIncrements | false | Whether the sink should coalesce multiple increments to a cell per batch.
serializer | see description | Defaults to org.apache.flume.sink.hbase.SimpleHbaseEventSerializer (default increment column = "iCol", payload column = "pCol").
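
To write each part of a line into its own column, the sink can be pointed at the RegexHbaseEventSerializer through its serializer.regex and serializer.colNames sub-properties. The sketch below is an assumption-laden example, not the setup used in this post: it supposes input lines of the form id,name,city, and the column names and the preview_split helper are invented for illustration. The sed call only previews how the capture groups split a line; Flume itself applies the pattern with Java's regex engine.

```shell
# Assumed input format (not from the original post): id,name,city per line.
REGEX='([^,]*),([^,]*),([^,]*)'
COL_NAMES='id,name,city'

# Print the sink properties that would be added to the agent configuration.
# The number of capture groups must match the number of column names.
cat <<EOF
SpoolAgent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
SpoolAgent.sinks.hbase-sink.serializer.regex = ${REGEX}
SpoolAgent.sinks.hbase-sink.serializer.colNames = ${COL_NAMES}
EOF

# Hypothetical helper: show which part of a sample line each group captures.
preview_split() {
    printf '%s\n' "$1" | sed -E "s/^${REGEX}\$/id=\1 name=\2 city=\3/"
}

preview_split '101,Alice,Riyadh'   # prints: id=101 name=Alice city=Riyadh
```

Checking a few real lines this way before starting the agent is a cheap way to catch a pattern that does not match the file layout.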


1- Prepare Flume Agent Configuration

Add the configuration properties below to the SpoolAgent.conf file to create SpoolAgent with a Spooling Directory source, a Spillable Memory channel and an HBase sink. Create the necessary folders on the OS as per your configuration and grant proper rights to the user who will run the Flume agent.
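
Creating the folders could look roughly like the sketch below. The helper name prepare_flume_dirs is hypothetical, and the sub-directory names simply mirror the paths used in the configuration in this post (spooldir, checkpoint, data); the owner argument is optional because chown normally needs root.

```shell
# prepare_flume_dirs BASE_DIR [OWNER]
# Hypothetical helper: creates the spooling, checkpoint and data directories
# under BASE_DIR and optionally hands them over to OWNER.
prepare_flume_dirs() {
    pf_base="$1"
    pf_owner="${2:-}"
    mkdir -p "$pf_base/spooldir" "$pf_base/checkpoint" "$pf_base/data" || return 1
    if [ -n "$pf_owner" ]; then
        # Usually requires root privileges.
        chown -R "$pf_owner" "$pf_base" || return 1
    fi
    # Make sure the owner can read, write and traverse the directories.
    chmod -R u+rwX "$pf_base"
}
```

For the paths in this post that would be prepare_flume_dirs /data/flume hbase, run as root; the hbase user is an assumption taken from the shell prompt shown in the start-up step.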


### SpoolAgent-Spooling Directory Source, Spillable Memory Channel and HBase Sink ###
# Name the components on this agent 
SpoolAgent.sources = spooldir-source  
SpoolAgent.channels = spillmem-channel
SpoolAgent.sinks = hbase-sink
# Describe/configure Source
SpoolAgent.sources.spooldir-source.type = spooldir
SpoolAgent.sources.spooldir-source.spoolDir = /data/flume/spooldir
SpoolAgent.sources.spooldir-source.fileHeader = false
# Describe the sink
SpoolAgent.sinks.hbase-sink.type = hbase
SpoolAgent.sinks.hbase-sink.table = spooled_table
SpoolAgent.sinks.hbase-sink.columnFamily = spool_cf
# Use a channel which buffers events in memory and spills over to disk
SpoolAgent.channels.spillmem-channel.type = SPILLABLEMEMORY
SpoolAgent.channels.spillmem-channel.memoryCapacity = 10000
SpoolAgent.channels.spillmem-channel.overflowCapacity = 1000000
SpoolAgent.channels.spillmem-channel.byteCapacity = 80000
SpoolAgent.channels.spillmem-channel.checkpointDir = /data/flume/checkpoint/
SpoolAgent.channels.spillmem-channel.dataDirs = /data/flume/data/
# Bind the source and sink to the channel
SpoolAgent.sources.spooldir-source.channels = spillmem-channel
SpoolAgent.sinks.hbase-sink.channel = spillmem-channel

2- Create Table in HBase

You need to create the table using HBase Shell as per agent configuration. 

hbase(main):014:0> create 'spooled_table', 'spool_cf'
0 row(s) in 2.3170 seconds

=> Hbase::Table - spooled_table

3- Start Flume Agent

As I'm on HDP and want to run Flume from the command prompt rather than through Ambari, I need to set the required environment variables for HBase access.

[hbase@dn04 ~]$ export HIVE_HOME=/usr/hdp/current/hive-server2
[hbase@dn04 ~]$ export HCAT_HOME=/usr/hdp/current/hive-webhcat
[hbase@dn04 ~]$ export FLUME_HOME=/usr/hdp/

[hbase@dn04 ~]$ flume-ng agent --conf $FLUME_HOME/conf/ -f $FLUME_HOME/conf/SpoolAgent.conf -Dflume.root.logger=DEBUG,console -n SpoolAgent

4- Verify the process

Now place some text files in the spool directory; the Flume agent will read each line as an event and put it into the HBase table. After a file has been fully ingested, the agent renames it with the .COMPLETED suffix.
Check the data using the HBase shell as well.
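
The check can be scripted roughly as follows. This is a sketch, not a tested procedure from this setup: wait_completed is a hypothetical helper that polls for the renamed file, and the sample file name is made up. The commented lines show how it might be combined with the spool directory and table from this post; they need a running agent and HBase, so they are left commented out.

```shell
# wait_completed FILE [TIMEOUT_SECONDS] [SUFFIX]
# Hypothetical helper: polls once per second until FILE plus SUFFIX exists,
# which is how the agent marks a fully ingested file.
wait_completed() {
    wc_file="$1"
    wc_timeout="${2:-60}"
    wc_suffix="${3:-.COMPLETED}"
    wc_waited=0
    while :; do
        if [ -e "${wc_file}${wc_suffix}" ]; then
            echo "ingested: ${wc_file}${wc_suffix}"
            return 0
        fi
        if [ "$wc_waited" -ge "$wc_timeout" ]; then
            break
        fi
        sleep 1
        wc_waited=$((wc_waited + 1))
    done
    echo "timed out waiting for ${wc_file}${wc_suffix}" >&2
    return 1
}

# Example run (for larger files, stage on the same filesystem as the spool
# directory so that mv is a rename):
#   printf 'first line\nsecond line\n' > /tmp/sample-001.txt
#   mv /tmp/sample-001.txt /data/flume/spooldir/
#   wait_completed /data/flume/spooldir/sample-001.txt 60
#   echo "scan 'spooled_table', {LIMIT => 5}" | hbase shell
```

With the default SimpleHbaseEventSerializer, each line should show up in the scan as one row with the payload in the pCol column of spool_cf.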
