Scenario:
One of my team members wants to upload the contents of files placed in a specific directory (a spooling directory) to HBase for some analysis. For this purpose we will use Flume's spooldir source, which allows users and applications to drop files into the spooling directory and processes each line as one event to put into HBase. It is assumed that the Hadoop cluster and HBase are running; our environment is HDP 2.6.
Components Used
Spooling Directory Source [spooldir]:
This source watches the specified directory for new files and parses events out of new files as they appear. Once a given file has been fully read into the channel, it is either renamed with a .COMPLETED suffix or deleted. Only uniquely named files must be dropped into the spooling directory.
Below is the property table for spooling directory source and required properties are in bold.
Property Name | Default | Description |
type | – | The component type name, needs to be spooldir. |
spoolDir | – | The directory from which to read files. |
fileSuffix | .COMPLETED | Suffix to append to completely ingested files |
deletePolicy | never | When to delete completed files: never or immediate |
fileHeader | FALSE | Whether to add a header storing the absolute path filename. |
fileHeaderKey | file | Header key to use when appending absolute path filename to event header. |
basenameHeader | FALSE | Whether to add a header storing the basename of the file. |
basenameHeaderKey | basename | Header Key to use when appending basename of file to event header. |
ignorePattern | ^$ | Regular expression specifying which files to ignore (skip) |
trackerDir | .flumespool | Directory to store metadata related to processing of files. If this path is not an absolute path, then it is interpreted as relative to the spoolDir. |
consumeOrder | oldest | The order in which files in the spooling directory will be consumed: oldest, youngest or random. In case of oldest and youngest, the last modified time of the files will be used to compare the files. In case of a tie, the file with the smallest lexicographical order will be consumed first. In case of random, any file will be picked randomly. |
maxBackoff | 4000 | The maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full. |
batchSize | 100 | Granularity at which to batch transfer to the channel |
inputCharset | UTF-8 | Character set used by deserializers that treat the input file as text. |
decodeErrorPolicy | FAIL | What to do when we see a non-decodable character in the input file. FAIL: Throw an exception and fail to parse the file. REPLACE: Replace the unparseable character with the "replacement character", typically Unicode U+FFFD. IGNORE: Drop the unparseable character sequence. |
deserializer | LINE | Specify the deserializer used to parse the file into events. Defaults to parsing each line as an event. The class specified must implement EventDeserializer.Builder. |
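As an illustration only (the agent name, source name and paths below are placeholders, not the configuration used later in this article), a spooldir source that sets a few of the optional properties could be declared like this:
# spooldir source sketch (names and paths are placeholders)
agent.sources = src-1
agent.sources.src-1.type = spooldir
# directory to watch for new files
agent.sources.src-1.spoolDir = /var/flume/spool
# rename fully ingested files instead of deleting them
agent.sources.src-1.fileSuffix = .COMPLETED
agent.sources.src-1.deletePolicy = never
# skip files that are still being written (assumes writers use a .tmp extension)
agent.sources.src-1.ignorePattern = ^.*\.tmp$
# consume the oldest files first
agent.sources.src-1.consumeOrder = oldest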
Spillable Memory Channel:
Spillable Memory Channel can be treated as a combination of a memory channel and a file channel. It was introduced to overcome the memory channel's limitation of losing events when the memory queue is filled. It uses two storage mechanisms: an in-memory queue and disk.
It stores events primarily in an in-memory queue, and once the queue is filled, additional incoming events are stored on disk backed by a file channel. This channel is ideal for flows that need the high throughput of a memory channel during normal operation, but at the same time need the larger capacity of a file channel for better tolerance of intermittent sink-side outages.
Below is the property table for spillable memory channel and required properties are in bold.
Property Name | Default | Description |
type | – | The component type name, needs to be SPILLABLEMEMORY |
memoryCapacity | 10000 | Maximum number of events stored in memory queue. To disable use of in-memory queue, set this to zero. |
overflowCapacity | 100000000 | Maximum number of events stored in the overflow disk (i.e. file channel). To disable use of overflow, set this to zero. |
overflowTimeout | 3 | The number of seconds to wait before enabling disk overflow when memory fills up. |
byteCapacityBufferPercentage | 20 | Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers. See below. |
byteCapacity | see description | Maximum bytes of memory allowed as a sum of all events in the memory queue. |
avgEventSize | 500 | Estimated average size of events, in bytes, going into the channel |
- To disable the use of the in-memory queue and function like a file channel, set memoryCapacity = 0 and provide the overflowCapacity, checkpointDir & dataDirs properties of the channel.
- To disable the use of the overflow disk and function as an in-memory channel, set overflowCapacity = 0 and omit the checkpointDir & dataDirs properties, but set memoryCapacity to a non-zero value. Both modes are sketched below.
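Purely as an illustration (the agent and channel names here are placeholders), those two modes would look roughly like this:
# Mode 1: behave like a file channel (no in-memory queue)
agent.channels.ch-file.type = SPILLABLEMEMORY
agent.channels.ch-file.memoryCapacity = 0
agent.channels.ch-file.overflowCapacity = 1000000
agent.channels.ch-file.checkpointDir = /var/flume/checkpoint
agent.channels.ch-file.dataDirs = /var/flume/data
# Mode 2: behave like an in-memory channel (no disk overflow)
agent.channels.ch-mem.type = SPILLABLEMEMORY
agent.channels.ch-mem.memoryCapacity = 10000
agent.channels.ch-mem.overflowCapacity = 0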
HBase Sink:
This sink reads events from a channel and writes them to HBase. The HBase configuration is picked up from the first hbase-site.xml encountered in the classpath. A class implementing HbaseEventSerializer, specified by the configuration, is used to convert the events into HBase puts and/or increments. These puts and increments are then written to HBase. This sink supports batch reading of events from the channel, to minimize the number of flushes on the HBase tables.
This sink will commit each transaction if the table’s write buffer size is reached or if the number of events in the current transaction reaches the batch size, whichever comes first. Flume provides two serializers for HBase sink.
The SimpleHbaseEventSerializer (org.apache.flume.sink.hbase.SimpleHbaseEventSerializer) writes the event body as-is to HBase, and optionally increments a column in HBase. This is primarily an example implementation.
The RegexHbaseEventSerializer (org.apache.flume.sink.hbase.RegexHbaseEventSerializer) breaks the event body up based on the given regex and writes each part into a different column, as sketched below.
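As a rough sketch of the second serializer (the sink name, table, column family, regex and column names below are assumptions for illustration only, not the configuration used later), a RegexHbaseEventSerializer that splits a comma-separated line into two columns might be configured like this:
agent.sinks.hbase-1.type = hbase
agent.sinks.hbase-1.table = example_table
agent.sinks.hbase-1.columnFamily = cf
agent.sinks.hbase-1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
# regex with one capture group per column
agent.sinks.hbase-1.serializer.regex = ^([^,]+),(.+)$
# column qualifiers, one per capture group
agent.sinks.hbase-1.serializer.colNames = id,payload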
Below is the property table for HBase sink and required properties are in bold.
Property Name | Default | Description |
type | – | The component type name, needs to be hbase |
table | – | The name of the table in Hbase to write to. |
columnFamily | – | The column family in Hbase to write to. |
zookeeperQuorum | – | The quorum spec. This is the value of the property hbase.zookeeper.quorum in hbase-site.xml |
znodeParent | /hbase | The base path for the znode for the -ROOT- region. Value of zookeeper.znode.parent in hbase-site.xml |
batchSize | 100 | Number of events to be written per txn. |
coalesceIncrements | FALSE | Should the sink coalesce multiple increments to a cell per batch. |
serializer | see description | org.apache.flume.sink.hbase.SimpleHbaseEventSerializer. Default increment column = "iCol", payload column = "pCol" |
Steps
1- Prepare Flume Agent Configuration
Add the configuration properties below to the SpoolAgent.conf file to create SpoolAgent with a Spooling Directory source, a Spillable Memory channel and an HBase sink. Create the necessary folders on the OS as per your configuration and grant the proper rights to the user who will be running the Flume agent; a sketch of this preparation follows.
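For example, assuming the directories used in the SpoolAgent.conf below and that the agent will be run as the hbase user shown in the later steps (adjust the owner, group and permissions to your environment), the folders could be prepared like this:
[root@dn04 ~]# mkdir -p /data/flume/spooldir /data/flume/checkpoint /data/flume/data
[root@dn04 ~]# chown -R hbase:hadoop /data/flume
[root@dn04 ~]# chmod -R 750 /data/flume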
SpoolAgent.conf
### SpoolAgent-Spooling Directory Source, Spillable Memory Channel and HBase Sink ###
##
# Name the components on this agent
SpoolAgent.sources = spooldir-source
SpoolAgent.channels = spillmem-channel
SpoolAgent.sinks = hbase-sink
# Describe/configure Source
SpoolAgent.sources.spooldir-source.type = spooldir
SpoolAgent.sources.spooldir-source.spoolDir = /data/flume/spooldir
SpoolAgent.sources.spooldir-source.fileHeader = false
# Describe the sink
SpoolAgent.sinks.hbase-sink.type = hbase
SpoolAgent.sinks.hbase-sink.table = spooled_table
SpoolAgent.sinks.hbase-sink.columnFamily = spool_cf
# Use a channel which buffers events in memory and spills overflow to disk
SpoolAgent.channels.spillmem-channel.type = SPILLABLEMEMORY
SpoolAgent.channels.spillmem-channel.memoryCapacity = 10000
SpoolAgent.channels.spillmem-channel.overflowCapacity = 1000000
SpoolAgent.channels.spillmem-channel.byteCapacity = 80000
SpoolAgent.channels.spillmem-channel.checkpointDir = /data/flume/checkpoint/
SpoolAgent.channels.spillmem-channel.dataDirs = /data/flume/data/
# Bind the source and sink to the channel
SpoolAgent.sources.spooldir-source.channels = spillmem-channel
SpoolAgent.sinks.hbase-sink.channel = spillmem-channel
2- Create Table in HBase
You need to create the table using HBase Shell as per agent configuration.
hbase(main):014:0> create 'spooled_table', 'spool_cf'
0 row(s) in 2.3170 seconds
=> Hbase::Table - spooled_table
3- Start Flume Agent
As I'm on HDP and want to use Flume on command prompt rather than Ambari, I need to set the required environment variables for HBase access.
[hbase@dn04 ~]$ export HIVE_HOME=/usr/hdp/current/hive-server2
[hbase@dn04 ~]$ export HCAT_HOME=/usr/hdp/current/hive-webhcat
[hbase@dn04 ~]$ export FLUME_HOME=/usr/hdp/2.6.1.0-129/flume
[hbase@dn04 ~]$ flume-ng agent --conf $FLUME_HOME/conf/ -f $FLUME_HOME/conf/SpoolAgent.conf -Dflume.root.logger=DEBUG,console -n SpoolAgent
4- Verify the process
Now place some text files in the spool directory; the Flume agent will read each line as an event and put it into the HBase table. After fully ingesting a file, the agent will rename it with the .COMPLETED suffix.
You can also check the data using the HBase shell, as sketched below.
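For example (the file name below is just a placeholder), drop a file into the spool directory, confirm it gets renamed, and scan the table:
[hbase@dn04 ~]$ cp /tmp/sample_data.txt /data/flume/spooldir/
[hbase@dn04 ~]$ ls /data/flume/spooldir/
sample_data.txt.COMPLETED
[hbase@dn04 ~]$ hbase shell
hbase(main):001:0> scan 'spooled_table', {LIMIT => 5}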