Note: All the posts are based on practical approach avoiding lengthy theory. All have been tested on some development servers. Please don’t test any post on production servers until you are sure.

Thursday, January 25, 2018

Custom Merge Utility for Flume Generated Files


Problem:

My client is streaming tweets to an HDFS location, where Flume is creating thousands of small files. A Hive external table has been defined over this location, and its query performance degrades badly when there are too many small files.
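To gauge the scale of the problem before merging, the file count can be checked directly on HDFS. A quick sketch (the path is illustrative, substitute your Flume sink directory):

```shell
# Directory count, file count, and content size for the Flume sink directory
hdfs dfs -count /flume/twitter

# Count only the finished files, skipping the in-progress .tmp files
hdfs dfs -ls /flume/twitter | grep -v '\.tmp$' | wc -l
```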


Pre-Requisites

NFS Gateway is running 

Configuring NFS Gateway for HDFS [HDP]
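With the NFS Gateway running, the HDFS namespace can be mounted like a local filesystem, which is what lets the script below use plain `cat` and `rm` on HDFS files. A sketch of the mount (hostname and mount point are illustrative; see the HDP guide linked above for the exact options for your version):

```shell
# Mount HDFS root via the NFS Gateway host (here assumed to be en01)
mkdir -p /data/hdfsloc
mount -t nfs -o vers=3,proto=tcp,nolock,sync en01:/ /data/hdfsloc

# HDFS paths now appear under the mount point, e.g. /data/hdfsloc/flume/twitter
```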

Solution

I wrote a small bash utility and handed it to the client's technical team to modify further as per their requirements. Sharing it below in case someone finds it helpful.
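As a side note, Hadoop itself ships a built-in merge for this pattern. It writes the merged result to the local filesystem and does not delete the source files, which is why a custom script was still needed here, but it is worth knowing (paths are illustrative):

```shell
# Concatenate every file under an HDFS directory into one local file
hdfs dfs -getmerge /flume/twitter /tmp/AllTweets.json
```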


#!/bin/bash
##############################################################################

search_dir=/data/hdfsloc/flume/twitter
merge_dir=/data/hdfsloc/flume/ora_ext_tables
#search_dir=/data/log
#merge_dir=/data/log

merge_utility_log_dir=/data/log
dt=$(date +%y%m%d%H%M%S)
LOGFILE="$merge_utility_log_dir/bigdata_merge_$dt.log"

merge_file_name="AllTweetsP1.json"
msg="Merging Flume Twitter Small Files to <<\e[35m$merge_file_name\e[0m>>"


clear
echo "Inam's Big Data Merge Utility Ver. 1.0"
echo
echo "Inam's Big Data Merge Utility Ver. 1.0" >> $LOGFILE
echo "======================================" >> $LOGFILE
echo >> $LOGFILE
echo "Log Opened at   : $LOGFILE" >> $LOGFILE
echo "Log Opened at   : $LOGFILE"
echo "Log Time        : $(date)" >> $LOGFILE
echo "Log Time        : $(date)"
echo "Search Path     : $search_dir" >> $LOGFILE
echo "Search Path     : $search_dir"
echo "Merge Path      : $merge_dir" >> $LOGFILE
echo "Merge Path      : $merge_dir"
echo "Merged File     : $merge_file_name" >> $LOGFILE
echo "Merged File     : $merge_file_name"
echo >> $LOGFILE
echo
echo -e "                         \e[41m\e[97m W   A   R   N   I   N   G \e[0m                            "
echo -e "                         ***************************                            "

fcount=$(ls -1 -I "*.tmp" -I "*.json" -I "bigdata_merge*" $search_dir | wc -l)
echo -e "There are \e[33m$fcount\e[0m files pending to be merged and removed in folder $search_dir"
echo "There are $fcount files pending to be merged and removed in folder $search_dir" >> $LOGFILE
echo "More files are being generated by Flume continuously even right now ... "
echo "More files are being generated by Flume continuously even right now ... " >> $LOGFILE
echo
echo "Do you really want to continue? Press (Y):"
read really_continue

if [ "$really_continue" = "Y" ]; then

time_start=$(date +%s)
echo >> $LOGFILE
echo >> $LOGFILE
echo
echo
echo -e $msg
echo "Merging Flume Twitter Small Files to <<$merge_file_name>>"  >> $LOGFILE
echo "***********************************************************************"
echo "***********************************************************************" >> $LOGFILE
echo >> $LOGFILE
echo

for f in $(ls -I "*.tmp" -I "*.json" -I "bigdata_merge*" $search_dir)
do

cat "$search_dir/$f" >> "$merge_dir/$merge_file_name"
rm -f "$search_dir/$f"
echo -e "File <<\e[33m$f\e[0m>> \e[92mmerged\e[0m and \e[31mremoved\e[0m successfully"
echo "File <<$f>> merged and removed successfully" >> $LOGFILE
#sleep 0.1s
done

echo
echo "***********************************************************************"
echo "***********************************************************************" >> $LOGFILE
echo "Merged File Size is given below"
echo "Merged File Size is given below" >> $LOGFILE
du -h "$merge_dir/$merge_file_name"
du -h "$merge_dir/$merge_file_name" >> $LOGFILE
echo
echo "Activity Log generated at: $LOGFILE"
echo >> $LOGFILE
time_end=$(date +%s)
diff=$((time_end - time_start))
echo "Time Taken               : $diff Seconds"
echo "Time Taken               : $diff Seconds" >> $LOGFILE
echo "***********************************************************************"
echo "***********************************************************************" >> $LOGFILE
echo
echo
else
echo "User Cancelled Operation"
echo "User Cancelled Operation" >> $LOGFILE
fi
##############################################################################
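A footnote for anyone adapting the script: parsing `ls` output breaks on unusual filenames. A `find -print0` loop is a more robust form of the same merge-and-remove step. Below is a minimal, self-contained local sketch of that idea (the demo files are stand-ins for Flume's FlumeData.* output; it does not touch the HDFS location):

```shell
#!/bin/bash
# Self-contained demo of the merge-and-remove loop using find -print0,
# which handles filenames with spaces or special characters safely.
demo=$(mktemp -d)
printf 'a' > "$demo/FlumeData.1"
printf 'b' > "$demo/FlumeData.2"
printf 'x' > "$demo/skip.tmp"        # excluded, like the script's "*.tmp"
merge_file="$demo/AllTweetsP1.json"

find "$demo" -maxdepth 1 -type f ! -name '*.tmp' ! -name '*.json' -print0 |
  sort -z |
  while IFS= read -r -d '' f; do
      cat "$f" >> "$merge_file"
      rm -f "$f"
      echo "File <<$(basename "$f")>> merged and removed successfully"
  done
```

The `! -name` tests mirror the script's `ls -I` ignore patterns, and `sort -z` keeps the merge order deterministic.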

Test it

[root@en01 bash]# sh merge_n_delete.sh
Inam's Big Data Merge Utility Ver. 1.0

Log Opened at   : /data/log/bigdata_merge_180109164002.log
Log Time        : Tue Jan  9 16:40:02 AST 2018
Search Path     : /data/hdfsloc/flume/twitter
Merge Path      : /data/hdfsloc/flume/twitter
Merged File     : AllTweetsP1.json

                          W   A   R   N   I   N   G
                         *******************
There are 124 files pending to be merged and removed in folder /data/hdfsloc/flume/twitter
More files are being generated by Flume continuously even right now ...

Do you really want to continue? Press (Y):
Y


Merging Flume Twitter Small Files to <<AllTweetsP1.json>>
********************************************************************
.....
.....
File <<FlumeData.1515504976833>> merged and removed successfully
File <<FlumeData.1515505014427>> merged and removed successfully
File <<FlumeData.1515505055678>> merged and removed successfully
File <<FlumeData.1515505089314>> merged and removed successfully
File <<FlumeData.1515505120942>> merged and removed successfully
File <<FlumeData.1515505165930>> merged and removed successfully

***********************************************************************
Merged File Size is given below
2.0G /data/hdfsloc/flume/twitter/AllTweetsP1.json

Activity Log generated at: /data/log/bigdata_merge_180109164002.log
Time Taken               : 16 Seconds
***********************************************************************
