Scenario
My company asked me to provide a solution for syslog aggregation across all of our environments so that the logs can be analyzed for insights. The logs should first be captured, then retained, and finally made available to the analyst team in a way similar to how they already query and process data in a database. The requirements are not very clear, and the volume of data cannot be determined at this stage.
Solution
As the uncertain volume of data and the lack of clear requirements make a data warehouse a poor fit, we naturally arrived at Hadoop HDFS for storage, queried through Hive and Presto (an environment already set up in the company). Although we have Hive on Tez available for processing, we opted for Presto for even better query performance. Below is the high-level architecture of the solution. A Flume agent receives the syslog events from the rsyslog client machines and stores them in an HDFS location. A Hive external partitioned table is created over that HDFS location. A Presto view is then created on top of the partitioned table and queried per the analysts' requirements.
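In outline, the data flows as follows (a simplified text sketch of the pipeline just described):
rsyslog clients --TCP 12346--> Flume agent (syslogtcp source -> memory channel -> HDFS sink)
    -> HDFS /data/flume/syslogs2/<year>/<month>/
    -> Hive external partitioned table (flume.syslogs2)
    -> Presto view (presto_vw_syslogs2) -> analyst queries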
Assumptions
1- Flume agent is already configured and running under HDP; for details, please see my other posts:
Streaming Twitter Data using Apache Flume
Installing/Configuring Hortonworks Data Platform [HDP]
The following Flume agent configuration was used in HDP:
# Naming the components on the current agent.
Logagent2.sources=SourceSyslog2
Logagent2.channels=ChannelMem2
Logagent2.sinks=SinkHDFS2
# Describing/Configuring the source
# For UDP syslog instead of TCP, use type=syslogudp
Logagent2.sources.SourceSyslog2.type=syslogtcp
Logagent2.sources.SourceSyslog2.host=0.0.0.0
Logagent2.sources.SourceSyslog2.port=12346
Logagent2.sources.SourceSyslog2.keepFields=true
# Describing/Configuring the channel
Logagent2.channels.ChannelMem2.type=memory
Logagent2.channels.ChannelMem2.capacity = 1000000
Logagent2.channels.ChannelMem2.transactionCapacity = 100000
# Describing/Configuring the sink
Logagent2.sinks.SinkHDFS2.type=hdfs
Logagent2.sinks.SinkHDFS2.hdfs.path = /data/flume/syslogs2/%Y/%m/
Logagent2.sinks.SinkHDFS2.hdfs.fileType = DataStream
Logagent2.sinks.SinkHDFS2.hdfs.writeFormat = Text
Logagent2.sinks.SinkHDFS2.hdfs.batchSize = 1000
Logagent2.sinks.SinkHDFS2.hdfs.rollSize = 0
Logagent2.sinks.SinkHDFS2.hdfs.rollCount = 10000
Logagent2.sinks.SinkHDFS2.hdfs.filePrefix = syslog
# Binding the source and sink to the channel
Logagent2.sources.SourceSyslog2.channels = ChannelMem2
Logagent2.sinks.SinkHDFS2.channel = ChannelMem2
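Under HDP the agent is normally managed through Ambari, but for reference it can also be started by hand with flume-ng; the config file path below is an assumption, adjust it to wherever the properties above are saved:
flume-ng agent --conf /etc/flume/conf --conf-file /etc/flume/conf/logagent2.properties --name Logagent2 -Dflume.root.logger=INFO,console
Note that the memory channel trades durability for speed: events buffered in memory are lost if the agent dies, so consider a file channel if losing log events is unacceptable.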
You need to update /etc/rsyslog.conf on your rsyslog client machines (pointing it at the Flume agent host and port) and restart the rsyslog daemon, e.g., service rsyslog restart.
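For example, a single forwarding rule at the end of /etc/rsyslog.conf sends all facilities and priorities to the agent (flume-host is a placeholder for your Flume agent's hostname; @@ selects TCP to match the syslogtcp source, a single @ would select UDP):
*.* @@flume-host:12346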
2- Hive is already configured and running; for details, please see my other post:
Hive Installation and Configuration
3- Presto Cluster is already configured and running; for details, please see my other post:
Building Teradata Presto Cluster
Sample Data
Below is a sample of the syslog data:
<13>Jul 27 16:32:06 en01 root: Test-3 from Edge Node
<30>Jul 27 16:35:27 en01 systemd: Starting Cleanup of Temporary Directories...
<30>Jul 27 16:35:27 en01 systemd: Started Cleanup of Temporary Directories.
<30>Jul 27 16:40:01 en01 systemd: Started Session 4665 of user root.
<30>Jul 27 16:40:01 en01 systemd: Starting Session 4665 of user root.
<30>Aug 2 16:01:01 en01 systemd: Starting Session 5754 of user root.
Observe the two spaces in the 'Aug  2' record; single-digit days are padded with an extra space, and this will affect your parsing.
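To see why this matters, here is an illustrative check in Presto (using a record from the sample above): splitting on a single space produces an empty array element for single-digit days, which shifts every field after it.
presto> SELECT split('<30>Aug  2 16:01:01 en01', ' ');
-- result: [<30>Aug, , 2, 16:01:01, en01]  (note the empty second element)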
Structure for HDFS
After reviewing the syslog data, the following structure was proposed for the files generated by the Flume agent.
Month = Jul
Day = 27
Time = 16:32:06
Node = en01
Process = root or systemd
Log msg = Test-3 from Edge Node
Directory structure:
data
  flume
    syslogs2
      <year>
        <month>
          <<messages here>>
Create the folder in HDFS to hold the external table's data:
[hdfs@nn01 ~]$ hdfs dfs -mkdir /data/flume/syslogs2
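Once the Flume agent has been running for a while, you can confirm that files are landing under the year/month subdirectories it creates (the agent creates them automatically from the %Y/%m tokens in hdfs.path):
[hdfs@nn01 ~]$ hdfs dfs -ls -R /data/flume/syslogs2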
Create the external table over the HDFS location where the logs are stored:
CREATE EXTERNAL TABLE flume.syslogs2(
logline STRING
) PARTITIONED BY(year int, month int)
stored as textfile;
hive> CREATE EXTERNAL TABLE flume.syslogs2(
> logline STRING
> ) PARTITIONED BY(year int, month int)
> stored as textfile;
OK
Time taken: 0.158 seconds
Create the partitions in Hive, one per month of Flume output:
Alter table flume.syslogs2 Add IF NOT EXISTS partition(year=2017, month=08)
location '/data/flume/syslogs2/2017/08';
Alter table flume.syslogs2 Add IF NOT EXISTS partition(year=2017, month=09)
location '/data/flume/syslogs2/2017/09';
hive> SHOW PARTITIONS syslogs2;
OK
year=2017/month=8
Time taken: 0.172 seconds, Fetched: 1 row(s)
To drop a partition that is no longer needed:
Alter table flume.syslogs2 drop partition (year=2017, month=09);
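Because the Flume output layout (2017/08) does not follow Hive's year=2017/month=08 naming convention, MSCK REPAIR TABLE cannot discover these partitions automatically. A minimal cron-able sketch for adding the current month's partition instead (an assumed approach; adjust quoting and scheduling to your environment):
hive -e "ALTER TABLE flume.syslogs2 ADD IF NOT EXISTS PARTITION(year=$(date +%Y), month=$(date +%m)) LOCATION '/data/flume/syslogs2/$(date +%Y)/$(date +%m)'"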
Create a view in Presto that parses the syslog message/event line:
create or replace view presto_vw_syslogs2 as
select year,month,
substr(logline,2,2) Code
,substr(logline,5,3) Mnth
,cast(trim(substr(logline,9,2)) as integer) Day
,substr(logline,12,8) Time
,split(substr(logline,21),' ')[1] Host
,split(replace(split(substr(logline,21),' ')[2],':'),'[')[1] Process
,replace(split_part(replace(split(substr(logline,21),' ')[2],':'),'[',2),']') ProcessID
,split(ltrim(substr(logline,8)),' ',6)[6] Event
from flume.syslogs2
presto:flume> create or replace view presto_vw_syslogs2 as
-> select year,month,
-> substr(logline,2,2) Code
-> ,substr(logline,5,3) Mnth
-> ,cast(trim(substr(logline,9,2)) as integer) Day
-> ,substr(logline,12,8) Time
-> ,split(substr(logline,21),' ')[1] Host
-> ,split(replace(split(substr(logline,21),' ')[2],':'),'[')[1] Process
-> ,replace(split_part(replace(split(substr(logline,21),' ')[2],':'),'[',2),']') ProcessID
-> ,split(ltrim(substr(logline,8)),' ',6)[6] Event
-> from flume.syslogs2;
CREATE VIEW
presto:flume> select year,month,count(*) from presto_vw_syslogs2 group by year,month;
year | month | _col2
------+-------+-------
2017 | 8 | 16
(1 row)
You can also use regular expressions for your view query; Presto uses Java regex pattern syntax.
create or replace view presto_vw_syslogs_regex as
select year,month
,regexp_extract(logline, '\d+') Code
,regexp_extract(logline, '[a-zA-Z]+') Month_eng
,regexp_extract(logline, '\p{Blank}\d+') Day
,regexp_extract(logline, '\d+:\d+:\d+') Time
,regexp_extract(logline, '\w+-\w+-\w+-\w+') Host
,replace(regexp_extract(logline, '\D+\[|\D+:'),'[','') Process
,replace(regexp_extract(logline, '\d+]'),']','') ProcessID
,trim(replace(substr(logline,strpos(logline,regexp_extract(logline, ':\p{Blank}')) ),':','')) Event
,logline from flume.text_syslogs_current
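As a quick sanity check of the pattern syntax, regexp_extract can be run against one of the sample lines directly; with no group argument Presto returns the whole first match:
presto:flume> SELECT regexp_extract('<13>Jul 27 16:32:06 en01 root: Test-3 from Edge Node', '\d+:\d+:\d+');
-- returns 16:32:06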
The same can be done in Hive as well; an example is given below.
create or replace view flume.hive_vw_syslogs_current_regex as
select year,month
,regexp_extract(logline, '[0-9]+',0) Code
,regexp_extract(logline, '[a-zA-Z]+',0) Month_eng
,regexp_extract(logline, '\\s\\d\\d',0) Day
,regexp_extract(logline, '\\d*:\\d*:\\d*',0) Time
,regexp_extract(logline, '\\w{3}-\\w{3}-\\w{2}-\\w{4}',0) Host
,translate(regexp_extract(logline, '[a-zA-Z]+\\[|[a-zA-Z]+:',0), '[','') Process
,translate(regexp_extract(logline, '\\d*]',0),']','') ProcessID
--,regexp_extract(logline, ':\\s',0) Event
,trim(translate(substring(logline, instr(logline, regexp_extract(logline, ':\\s',0))), ':','')) Event
--,logline
from flume.text_syslogs_current t
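Note the doubled backslashes: unlike Presto, Hive treats the backslash as an escape character inside string literals, so \\d in the query reaches the regex engine as \d. An illustrative check against a sample line:
hive> SELECT regexp_extract('<13>Jul 27 16:32:06 en01 root: Test-3 from Edge Node', '\\d+:\\d+:\\d+', 0);
-- returns 16:32:06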
Test a few more queries
-- Year and Month specific result
SELECT * FROM flume.presto_vw_syslogs2
where year=2017 and month=08
order by time desc
-- Last 200 occurrences
SELECT * FROM flume.presto_vw_syslogs2
where year=2017 and month=08 and day = 7
order by time desc
limit 200
-- Group by Process
select process,count(1) tot_events
from flume.presto_vw_syslogs2 l
group by Process
-- Group by Code
select code,count(1) tot_events from flume.presto_vw_syslogs2 l
group by code
order by tot_events desc
-- By Date
select * from flume.presto_vw_syslogs2 l
where year = year(current_date) and month = month(current_date) and day = day(current_date) - 1
--By Event
select event,count(1)
from flume.presto_vw_syslogs2 l
group by event
We used Presto string functions for the parsing; a short description of each is given below for reference.
split(string, delimiter, limit) → array<varchar>
Splits string on delimiter and returns an array of size at most limit. The last element in the array always contains everything left in the string. limit must be a positive number.
split_part(string, delimiter, index) → varchar
Splits string on delimiter and returns the field index. Field indexes start with 1. If the index is larger than the number of fields, then null is returned.
replace(string, search) → varchar
Removes all instances of search from string.
replace(string, search, replace) → varchar
Replaces all instances of search with replace in string.
substr(string, start) → varchar
Returns the rest of string from the starting position start. Positions start with 1. A negative starting position is interpreted as being relative to the end of the string.
substr(string, start, length) → varchar
Returns a substring from string of length length from the starting position start. Positions start with 1. A negative starting position is interpreted as being relative to the end of the string.
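To make the reference concrete, here are a few illustrative one-off calls (runnable as-is in the Presto CLI; the inputs are taken from the sample data above):
SELECT split('en01 systemd: Started Session 4665 of user root.', ' ', 3);  -- [en01, systemd:, Started Session 4665 of user root.]
SELECT split_part('16:35:27', ':', 2);        -- 35
SELECT replace('systemd:', ':');              -- systemd
SELECT substr('<30>Jul 27 16:35:27', 5);      -- Jul 27 16:35:27
SELECT substr('<30>Jul 27 16:35:27', 12, 8);  -- 16:35:27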