Introduction
Logstash is an open source data collection engine with real-time pipelining capabilities. It can dynamically unify data from disparate sources and normalize it into destinations of your choice, cleansing and democratizing all your data for diverse downstream analytics and visualization use cases. You can clean and transform your data during ingestion to gain near real-time insights at index or output time. Logstash comes out of the box with many aggregations and mutations along with pattern matching, geo mapping, and dynamic lookup capabilities.
Logstash provides Grok, which is a great way to parse unstructured log data into something structured and queryable. It is well suited to syslog, Apache and other web server logs, MySQL logs, and in general any log format that is written for humans rather than for computer consumption.
The Logstash agent processes a pipeline with three stages: inputs → filters → outputs. Inputs generate events (which have properties), filters modify them, and outputs ship them elsewhere.
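As a minimal sketch of such a pipeline (the file path and the added field are illustrative only):
input {
  file { path => "/var/log/messages" }              # stage 1: inputs generate events
}
filter {
  mutate { add_field => { "stage" => "example" } }  # stage 2: filters modify events
}
output {
  stdout { codec => rubydebug }                     # stage 3: outputs ship events
}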
Installation
Logstash requires Java 8. Download the Logstash archive that matches your host environment, unpack it, and it is ready to use.
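As a sketch of these steps (the archive name and target directory are assumptions based on the paths used later in this walkthrough):
java -version                                    # confirm a Java 8 runtime is available
tar -xzf logstash-6.2.3.tar.gz -C /usr/hadoopsw/elk
export LS_HOME=/usr/hadoopsw/elk/logstash-6.2.3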
Test the Installation
Test your Logstash installation by running the most basic Logstash pipeline. A Logstash pipeline has two required elements, input and output, and one optional element, filter. The input plugins consume data from a source, the filter plugins modify the data as you specify, and the output plugins write the data to a destination. You can set OS environment variables to point to your installation:
export PATH=$PATH:$LS_HOME/bin
[hdpsysuser@hdpmaster bin]$ logstash -e 'input { stdin { } } output { stdout {} }'
Sending Logstash's logs to /usr/hadoopsw/elk/logstash-6.2.3/logs which is now configured via log4j2.properties
[2018-04-23T11:15:04,818][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/usr/hadoopsw/elk/logstash-6.2.3/modules/fb_apache/configuration"}
[2018-04-23T11:15:04,943][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/usr/hadoopsw/elk/logstash-6.2.3/modules/netflow/configuration"}
[2018-04-23T11:15:07,951][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2018-04-23T11:15:12,524][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.2.3"}
[2018-04-23T11:15:14,563][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2018-04-23T11:15:22,118][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-04-23T11:15:22,580][INFO ][logstash.pipeline ] Pipeline started succesfully {:pipeline_id=>"main", :thread=>"#<Thread:0x23773bb4 run>"}
The stdin plugin is now waiting for input:
[2018-04-23T11:15:23,385][INFO ][logstash.agent ] Pipelines running {:count=>1, :pipelines=>["main"]}
Hi
2018-04-23T11:15:45.166Z hdpmaster Hi
The -e flag enables you to specify a configuration directly from the command line.
After starting Logstash, wait until you see "Pipeline main started" and then enter "Hi"
Logstash adds timestamp and IP address information to the message. Exit Logstash by issuing a CTRL-D command in the shell where Logstash is running.
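The -e flag can also carry a filter stage inline. As a sketch (the uppercase mutation is purely illustrative), the following command uppercases each message before printing it with the rubydebug codec:
logstash -e 'input { stdin { } } filter { mutate { uppercase => [ "message" ] } } output { stdout { codec => rubydebug } }'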
Logstash Pipeline
Before you create the Logstash pipeline, you’ll configure Filebeat to send log lines to Logstash. The Filebeat client, designed for reliability and low latency, is a lightweight, resource-friendly tool that collects logs from files on the server and forwards them to your Logstash instance for processing. In a typical use case, Filebeat runs on a separate machine from the one running your Logstash instance. The default Logstash installation includes the Beats input plugin.
To install Filebeat on your data source machine, download the appropriate package from the Filebeat product page https://www.elastic.co/downloads/beats/filebeat and unpack it.
export FB_HOME=/usr/hadoopsw/elk/filebeat-6.2.4-linux-x86_64
If you have the RPM package, you can install it as shown below:
[hdpsysuser@hdpmaster apps]$ sudo yum localinstall filebeat-8.3.1-x86_64.rpm
[hdpsysuser@hdpmaster apps]$ systemctl status filebeat.service
[hdpsysuser@hdpmaster apps]$ sudo systemctl start filebeat.service
[hdpsysuser@hdpmaster apps]$ sudo vim /etc/filebeat/filebeat.yml
Configure Filebeat
After installing Filebeat, you need to configure it. Open the filebeat.yml file located in your
Filebeat installation directory, and replace the contents with the following lines.
Make sure paths points to the example Apache log file, logstash-tutorial.log, which can be downloaded from https://download.elastic.co/demos/logstash/gettingstarted/logstash-tutorial.log.gz
filebeat.yml
filebeat.prospectors:
- type: log
  paths:
    - /usr/hadoopsw/mydata/elkdata/logstash-tutorial-dataset
output.logstash:
  hosts: ["localhost:5044"]
At the data source machine, run Filebeat with the following command:
sudo ./filebeat -e -c filebeat.yml -d "publish"
[hdpsysuser@hdpmaster filebeat-6.2.4-linux-x86_64]$ ./filebeat -e -c filebeat.yml -d "publish"
2018-04-23T12:14:10.757Z INFO instance/beat.go:468 Home path: [/usr/hadoopsw/elk/filebeat-6.2.4-linux-x86_64] Config path: [/usr/hadoopsw/elk/filebeat-6.2.4-linux-x86_64] Data path: [/usr/hadoopsw/elk/filebeat-6.2.4-linux-x86_64/data] Logs path: [/usr/hadoopsw/elk/filebeat-6.2.4-linux-x86_64/logs]
2018-04-23T12:14:10.758Z INFO instance/beat.go:475 Beat UUID: 97f293dd-9903-4644-8892-6bb4d9e8999b
2018-04-23T12:14:10.758Z INFO instance/beat.go:213 Setup Beat: filebeat; Version: 6.2.4
2018-04-23T12:14:10.761Z INFO pipeline/module.go:76 Beat name: hdpmaster
2018-04-23T12:14:10.764Z INFO [monitoring] log/log.go:97 Starting metrics logging every 30s
2018-04-23T12:14:10.764Z INFO instance/beat.go:301 filebeat start running.
2018-04-23T12:14:10.765Z INFO registrar/registrar.go:73 No registry file found under: /usr/hadoopsw/elk/filebeat-6.2.4-linux-x86_64/data/registry. Creating a new registry file.
2018-04-23T12:14:10.869Z INFO registrar/registrar.go:110 Loading registrar data from /usr/hadoopsw/elk/filebeat-6.2.4-linux-x86_64/data/registry
2018-04-23T12:14:10.870Z INFO registrar/registrar.go:121 States Loaded from registrar: 0
2018-04-23T12:14:10.870Z WARN beater/filebeat.go:261 Filebeat is unable to load the Ingest Node pipelines for the configured modules because the Elasticsearch output is not configured/enabled. If you have already loaded the Ingest Node pipelines or are using Logstash pipelines, you can ignore this warning.
2018-04-23T12:14:10.870Z INFO crawler/crawler.go:48 Loading Prospectors: 1
2018-04-23T12:14:10.871Z INFO log/prospector.go:111 Configured paths: [/path/to/file/logstash-tutorial.log]
2018-04-23T12:14:10.872Z INFO crawler/crawler.go:82 Loading and starting Prospectors completed. Enabled prospectors: 1
Filebeat will attempt to connect on port 5044. Until Logstash starts with an active Beats plugin, there won’t be any answer on that port, so any messages you see regarding failure to connect on that port are normal for now.
Configure Logstash for Filebeat Input
Create a Logstash configuration pipeline that uses the Beats input plugin to receive events from Beats.
The following text represents the skeleton of a configuration pipeline:
# The # character at the beginning of a line indicates a comment. Use
# comments to describe your configuration.
input {
}
# The filter part of this file is commented out to indicate that it is
# optional.
# filter {
#
# }
output {
}
Configure your Logstash instance to use the Beats input plugin by adding the following lines to the input section of the first-pipeline.conf file:
vi /usr/hadoopsw/elk/logstash-6.2.3/config/first-pipeline.conf
input {
beats {
port => "5044"
}
}
# The filter part of this file is commented out to indicate that it is
# optional.
# filter {
#
# }
output {
stdout { codec => rubydebug }
}
We’ll configure Logstash to write to Elasticsearch later. For now, the stdout line in the output section prints the events to the console when you run Logstash.
Verify Configuration
To verify your configuration, run the following command:
logstash -f /usr/hadoopsw/elk/logstash-6.2.3/config/first-pipeline.conf --config.test_and_exit
[hdpsysuser@hdpmaster ~]$ logstash -f $LS_HOME/config/first-pipeline.conf --config.test_and_exit
Sending Logstash's logs to /usr/hadoopsw/elk/logstash-6.2.3/logs which is now configured via log4j2.properties
[2018-04-23T12:55:09,135][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/usr/hadoopsw/elk/logstash-6.2.3/modules/fb_apache/configuration"}
[2018-04-23T12:55:09,261][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/usr/hadoopsw/elk/logstash-6.2.3/modules/netflow/configuration"}
[2018-04-23T12:55:11,904][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
Configuration OK
[2018-04-23T12:55:25,976][INFO ][logstash.runner ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash
The --config.test_and_exit option parses your configuration file and reports any errors.
Start Logstash
If the configuration file passes the configuration test, start Logstash with the following command:
[hdpsysuser@hdpmaster ~]$ logstash -f $LS_HOME/config/first-pipeline.conf --config.reload.automatic
Sending Logstash's logs to /usr/hadoopsw/elk/logstash-6.2.3/logs which is now configured via log4j2.properties
[2018-04-23T13:02:20,437][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/usr/hadoopsw/elk/logstash-6.2.3/modules/fb_apache/configuration"}
[2018-04-23T13:02:20,473][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/usr/hadoopsw/elk/logstash-6.2.3/modules/netflow/configuration"}
[2018-04-23T13:02:21,322][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2018-04-23T13:02:22,715][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.2.3"}
[2018-04-23T13:02:25,156][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2018-04-23T13:02:39,277][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-04-23T13:02:40,391][INFO ][logstash.inputs.beats ] Beats inputs: Starting input listener {:address=>"0.0.0.0:5044"}
[2018-04-23T13:02:40,609][INFO ][logstash.pipeline ] Pipeline started succesfully {:pipeline_id=>"main", :thread=>"#<Thread:0x35f0421 run>"}
[2018-04-23T13:02:40,853][INFO ][org.logstash.beats.Server] Starting server on port: 5044
[2018-04-23T13:02:41,071][INFO ][logstash.agent ] Pipelines running {:count=>1, :pipelines=>["main"]}
The --config.reload.automatic option enables automatic config reloading so that you don’t have to stop and restart Logstash every time you modify the configuration file.
If our pipeline is working correctly, we should see a series of events like the following written to the console:
{
"source" => "/usr/hadoopsw/mydata/elkdata/logstash-tutorial-dataset",
"offset" => 24464,
"beat" => {
"name" => "hdpmaster",
"version" => "6.2.4",
"hostname" => "hdpmaster"
},
"@timestamp" => 2018-04-23T13:34:17.165Z,
"host" => "hdpmaster",
"@version" => "1",
"prospector" => {
"type" => "log"
},
"tags" => [
[0] "beats_input_codec_plain_applied"
],
"message" => "86.1.76.62 - - [04/Jan/2015:05:30:37 +0000] \"GET /style2.css HTTP/1.1\" 200 4877 \"http://www.semicomplete.com/projects/xdotool/\" \"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\""
}
Parsing Web Logs with the Grok Filter Plugin
Parse the log messages to create specific, named fields from the logs. The grok filter plugin is one of several plugins that are available by default in Logstash. The grok filter plugin enables you to parse the unstructured log data into something structured and queryable.
Because the grok filter plugin looks for patterns in the incoming log data, configuring the plugin requires you to make decisions about how to identify the patterns that are of interest to your use case. A representative line from the web server log sample looks like this:
83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"
The IP address at the beginning of the line is easy to identify, as is the timestamp in brackets. To parse the data, you can use the %{COMBINEDAPACHELOG} grok pattern, which structures lines from the Apache log using the following schema:
Information          | Field Name
---------------------|------------
IP Address           | clientip
User ID              | ident
User Authentication  | auth
Timestamp            | timestamp
HTTP Verb            | verb
Request body         | request
HTTP Version         | httpversion
HTTP Status Code     | response
Bytes served         | bytes
Referrer URL         | referrer
User agent           | agent
Edit the first-pipeline.conf file and replace the entire filter section with the following text:
first-pipeline.conf
input {
beats {
port => "5044"
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}"}
}
}
output {
stdout { codec => rubydebug }
}
Save changes. Because we’ve enabled automatic config reloading, we don’t have to restart Logstash to pick up our changes. However, we do need to force Filebeat to read the log file from scratch. To do this, go to the terminal window where Filebeat is running and press Ctrl+C to shut down Filebeat. Then delete the Filebeat registry file. Since Filebeat stores the state of each file it harvests in the registry, deleting the registry file forces Filebeat to read all the files it’s harvesting from scratch. Next, restart Filebeat.
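As a sketch of those steps (the registry location assumes the tar.gz layout shown in the Filebeat startup log above):
# stop Filebeat with Ctrl+C, then remove the registry and restart
rm /usr/hadoopsw/elk/filebeat-6.2.4-linux-x86_64/data/registry
./filebeat -e -c filebeat.yml -d "publish"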
After Logstash applies the grok pattern, the events will have the following JSON representation:
{
"message" => ".1.76.62 - - [04/Jan/2015:05:30:37 +0000] \"GET /MyURL-3 HTTP/1.1\" 200 4877 \"http://www.semicomplete.com/projects/xdotool/\" \"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\"",
"httpversion" => "1.1",
"agent" => "\"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\"",
"ident" => "-",
"beat" => {
"hostname" => "hdpmaster",
"name" => "hdpmaster",
"version" => "6.2.4"
},
"source" => "/usr/hadoopsw/mydata/elkdata/logstash-tutorial-dataset",
"verb" => "GET",
"timestamp" => "04/Jan/2015:05:30:37 +0000",
"auth" => "-",
"referrer" => "\"http://www.semicomplete.com/projects/xdotool/\"",
"@version" => "1",
"host" => "hdpmaster",
"offset" => 24672,
"response" => "200",
"prospector" => {
"type" => "log"
},
"request" => "/MyURL-3",
"bytes" => "4877",
"clientip" => "1.76.62",
"@timestamp" => 2018-04-23T14:05:20.967Z,
"tags" => [
[0] "beats_input_codec_plain_applied"
]
}
Notice that the event includes the original message, but the log message is also broken down into specific fields.
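Lines that do not match the pattern are left unparsed and tagged with _grokparsefailure. If you want to handle them explicitly, one possible (illustrative) approach is to drop them in the filter section:
filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}"}
  }
  if "_grokparsefailure" in [tags] {
    drop { }    # or route these events to a separate output instead
  }
}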
Enhancing Data with the Geoip Filter Plugin
In addition to parsing log data for better searches, filter plugins can derive supplementary information from existing data. As an example, the geoip plugin looks up IP addresses, derives geographic location information from the addresses, and adds that location information to the logs. Configure your Logstash instance to use the geoip filter plugin. The geoip plugin configuration requires you to specify the name of the source field that contains the IP address to look up.
Since filters are evaluated in sequence, make sure that the geoip section is after the grok section of the configuration file and that both the grok and geoip sections are nested within the filter section.
When we’re done, the contents of first-pipeline.conf should look like this:
first-pipeline.conf
input {
beats {
port => "5044"
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}"}
}
geoip {
source => "clientip"
}
}
output {
stdout { codec => rubydebug }
}
Save changes. To force Filebeat to read the log file from scratch, shut down Filebeat (press Ctrl+C), delete the registry file, and then restart Filebeat.
Notice that the event now contains geographic location information:
{
"message" => "1.76.62 - - [04/Jan/2015:05:30:37 +0000] \"GET /MyURL-4 HTTP/1.1\" 200 4877 \"http://www.semicomplete.com/projects/xdotool/\" \"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\"",
"httpversion" => "1.1",
"agent" => "\"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\"",
"ident" => "-",
"beat" => {
"hostname" => "hdpmaster",
"name" => "hdpmaster",
"version" => "6.2.4"
},
"source" => "/usr/hadoopsw/mydata/elkdata/logstash-tutorial-dataset",
"verb" => "GET",
"timestamp" => "04/Jan/2015:05:30:37 +0000",
"auth" => "-",
"referrer" => "\"http://www.semicomplete.com/projects/xdotool/\"",
"@version" => "1",
"host" => "hdpmaster",
"offset" => 24885,
"geoip" => {
"region_name" => "Tokyo",
"city_name" => "Tokyo",
"country_name" => "Japan",
"location" => {
"lon" => 139.7514,
"lat" => 35.685
},
"region_code" => "13",
"postal_code" => "190-0031",
"country_code2" => "JP",
"longitude" => 139.7514,
"timezone" => "Asia/Tokyo",
"country_code3" => "JP",
"ip" => "1.76.0.62",
"continent_code" => "AS",
"latitude" => 35.685
},
"response" => "200",
"prospector" => {
"type" => "log"
},
"request" => "/MyURL-4",
"bytes" => "4877",
"clientip" => "1.76.62",
"@timestamp" => 2018-04-23T14:19:33.037Z,
"tags" => [
[0] "beats_input_codec_plain_applied"
]
}
Indexing Data into Elasticsearch
Now that the web logs are broken down into specific fields, the Logstash pipeline can index the data into an Elasticsearch cluster. Edit the first-pipeline.conf file and replace the entire output section.
first-pipeline.conf
input {
beats {
port => "5044"
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}"}
}
geoip {
source => "clientip"
}
}
output {
elasticsearch {
hosts => [ "localhost:9200" ]
}
}
Save changes. To force Filebeat to read the log file from scratch, shut down Filebeat (press Ctrl+C), delete the registry file, and then restart Filebeat.
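By default the elasticsearch output writes to a daily logstash-%{+YYYY.MM.dd} index, as you will see in the next section. If you prefer a different index name, the output also accepts an index setting; a minimal sketch (the weblogs prefix is an arbitrary example):
output {
  elasticsearch {
    hosts => [ "localhost:9200" ]
    index => "weblogs-%{+YYYY.MM.dd}"
  }
}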
Testing Pipeline
Now that the Logstash pipeline is configured to index the data into an Elasticsearch cluster, we can query Elasticsearch.
First list all indices and notice that a logstash-CURRENTDATE index is created by Logstash, e.g., logstash-2018.04.23
[hdpsysuser@hdpmaster ~]$ curl -XGET 'localhost:9200/_cat/indices?v&pretty'
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
green open .kibana 4YKQ6kwWSCyQHhM1Ks_ZIQ 1 0 23 3 90.3kb 90.3kb
yellow open logstash-2018.04.23 MELDbsEdQvSlDL8nXj-lyQ 5 1 1 0 23kb 23kb
Try a test query to Elasticsearch based on the fields created by the grok filter plugin.
curl -XGET 'localhost:9200/logstash-$DATE/_search?pretty&q=response=200'
curl -XGET 'localhost:9200/logstash-2018.04.23/_search?pretty&q=response=200'
[hdpsysuser@hdpmaster ~]$ curl -XGET 'localhost:9200/logstash-2018.04.23/_search?pretty&q=response=200'
{
"took" : 48,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 1,
"max_score" : 0.2876821,
"hits" : [
{
"_index" : "logstash-2018.04.23",
"_type" : "doc",
"_id" : "eOfu8mIBFjVpawNmVgj1",
"_score" : 0.2876821,
"_source" : {
"message" : "1.76.62 - - [04/Jan/2015:05:30:37 +0000] \"GET /MyURL-5 HTTP/1.1\" 200 4877 \"http://www.semicomplete.com/projects/xdotool/\" \"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\"",
"httpversion" : "1.1",
"agent" : "\"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\"",
"ident" : "-",
"beat" : {
"hostname" : "hdpmaster",
"name" : "hdpmaster",
"version" : "6.2.4"
},
"source" : "/usr/hadoopsw/mydata/elkdata/logstash-tutorial-dataset",
"verb" : "GET",
"timestamp" : "04/Jan/2015:05:30:37 +0000",
"auth" : "-",
"referrer" : "\"http://www.semicomplete.com/projects/xdotool/\"",
"@version" : "1",
"host" : "hdpmaster",
"offset" : 25098,
"geoip" : {
"region_name" : "Tokyo",
"city_name" : "Tokyo",
"country_name" : "Japan",
"location" : {
"lon" : 139.7514,
"lat" : 35.685
},
"region_code" : "13",
"postal_code" : "190-0031",
"country_code2" : "JP",
"longitude" : 139.7514,
"timezone" : "Asia/Tokyo",
"country_code3" : "JP",
"ip" : "1.76.0.62",
"continent_code" : "AS",
"latitude" : 35.685
},
"response" : "200",
"prospector" : {
"type" : "log"
},
"request" : "/MyURL-5",
"bytes" : "4877",
"clientip" : "1.76.62",
"@timestamp" : "2018-04-23T14:35:17.340Z",
"tags" : [
"beats_input_codec_plain_applied"
]
}
}
]
}
}
Try another search for the geographic information derived from the IP address.
[hdpsysuser@hdpmaster ~]$ curl -XGET 'localhost:9200/logstash-2018.04.23/_search?pretty&q=geoip.city_name=Tokyo'
{
"took" : 27,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 1,
"max_score" : 0.2876821,
"hits" : [
{
"_index" : "logstash-2018.04.23",
"_type" : "doc",
"_id" : "eOfu8mIBFjVpawNmVgj1",
"_score" : 0.2876821,
"_source" : {
"message" : "1.76.62 - - [04/Jan/2015:05:30:37 +0000] \"GET /MyURL-5 HTTP/1.1\" 200 4877 \"http://www.semicomplete.com/projects/xdotool/\" \"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\"",
"httpversion" : "1.1",
"agent" : "\"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\"",
"ident" : "-",
"beat" : {
"hostname" : "hdpmaster",
"name" : "hdpmaster",
"version" : "6.2.4"
},
"source" : "/usr/hadoopsw/mydata/elkdata/logstash-tutorial-dataset",
"verb" : "GET",
"timestamp" : "04/Jan/2015:05:30:37 +0000",
"auth" : "-",
"referrer" : "\"http://www.semicomplete.com/projects/xdotool/\"",
"@version" : "1",
"host" : "hdpmaster",
"offset" : 25098,
"geoip" : {
"region_name" : "Tokyo",
"city_name" : "Tokyo",
"country_name" : "Japan",
"location" : {
"lon" : 139.7514,
"lat" : 35.685
},
"region_code" : "13",
"postal_code" : "190-0031",
"country_code2" : "JP",
"longitude" : 139.7514,
"timezone" : "Asia/Tokyo",
"country_code3" : "JP",
"ip" : "1.76.0.62",
"continent_code" : "AS",
"latitude" : 35.685
},
"response" : "200",
"prospector" : {
"type" : "log"
},
"request" : "/MyURL-5",
"bytes" : "4877",
"clientip" : "1.76.62",
"@timestamp" : "2018-04-23T14:35:17.340Z",
"tags" : [
"beats_input_codec_plain_applied"
]
}
}
]
}
}
If you are using Kibana to visualize your data, you can also explore the Filebeat data there.
Stashing to HDFS
Webhdfs output plugin
This plugin sends Logstash events into files in HDFS via the webhdfs REST API. The HTTP REST API supports the complete FileSystem interface for HDFS.
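The webhdfs output ships with the default Logstash distribution in recent versions; if it is missing from yours, it can be added with the plugin manager (a sketch, assuming $LS_HOME is set as above):
$LS_HOME/bin/logstash-plugin list | grep webhdfs        # check whether the plugin is already installed
$LS_HOME/bin/logstash-plugin install logstash-output-webhdfs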
vi /usr/hadoopsw/elk/logstash-6.2.3/config/logstash2hdfs.conf
input {
beats {
port => "5044"
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}"}
}
geoip {
source => "clientip"
}
}
output {
webhdfs {
host => "127.0.0.1" # (required)
port => 50070 # (optional, default: 50070)
path => "/user/logstash/%{+YYYY-MM-dd}logstash-%{+HH}.log" #(required) JodaFmt
user => "hdpsysuser" # (required)
}
elasticsearch {
hosts => [ "localhost:9200" ]
}
}
Run Logstash with the new configuration file:
[hdpsysuser@hdpmaster ~]$ logstash -f $LS_HOME/config/logstash2hdfs.conf --config.reload.automatic
Create the related folder in HDFS and then append one line to the file monitored by Filebeat, e.g., /usr/hadoopsw/mydata/elkdata/logstash-tutorial-dataset
You will see the new file generated in HDFS. Check its contents.
[hdpsysuser@hdpmaster ~]$ hdfs dfs -mkdir /user/logstash
[hdpsysuser@hdpmaster ~]$ hdfs dfs -ls /user/logstash
Found 1 items
-rwxr-xr-x 1 hdpsysuser supergroup 248 2018-04-23 17:03 /user/logstash/2018-04-23logstash-17.log
[hdpsysuser@hdpmaster ~]$ hdfs dfs -cat /user/logstash/2018-04-23logstash-17.log
2018-04-23T17:03:18.660Z hdpmaster .76.62 - - [04/Jan/2015:05:30:37 +0000] "GET /MyURL-HDFS1 HTTP/1.1" 200 4877 "http://www.semicomplete.com/projects/xdotool/" "Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0"
Managing Multiple Inputs and Outputs
You can configure a Logstash pipeline with multiple inputs and outputs. A sample configuration is provided below.
/data/elk/logstash-6.2.3/config/multifile.cfg
##Input Plugin
input {
file {
type => "technical"
path => "/data/elk/elkdata/tech.log"
}
file {
type => "business"
path => "/data/elk/elkdata/buss.log"
}
file {
type => "application"
path => "/data/elk/elkdata/app.log"
}
}
##input ends
##Filter Plugin
filter {
if [type] == "technical" {
# processing .......
}
if [type] == "business" {
# processing .......
}
if [type] == "application" {
# processing .......
}
}
###filter ends
###Output Plugin
output {
if [type] == "technical" {
webhdfs {
codec=>"json"
host => "nn01" # (required)
port => 50070 # (optional, default: 50070)
path => "/logstash/techlog/%{+YYYY-MM-dd}-technical-%{+HH}.log" #(required) JodaFmt
user => "elk" # (required)
}
} ##technical
if [type] == "business" {
webhdfs {
host => "nn01" # (required)
port => 50070 # (optional, default: 50070)
path => "/logstash/busslog/%{+YYYY-MM-dd}-business-%{+HH}.log" #(required) JodaFmt
user => "elk" # (required)
}
} ##business
if [type] == "application" {
webhdfs {
host => "nn01" # (required)
port => 50070 # (optional, default: 50070)
path => "/logstash/applog/%{+YYYY-MM-dd}-application-%{+HH}.log" #(required) JodaFmt
user => "elk" # (required)
}
} ##application
}
##output ends
Managing Multiple Inputs and Outputs with Beats
Here is a sample configuration for managing multiple inputs and outputs with Beats.
First configure Filebeat (filebeat.yml) to harvest different log files in different locations, then configure the Logstash pipeline to process the Beats input.
/data/elk/filebeat-6.2.4-linux-x86_64/filebeat.yml
filebeat.prospectors:
- type: log
  paths:
    - /data/elk/elkdata/apache.log
  fields:
    logtype: apache
- type: log
  paths:
    - /data/elk/elkdata/jboss.log
  fields:
    logtype: jboss
output.logstash:
  hosts: ["dn04:5044"]
/data/elk/logstash-6.2.3/config/multi-beats-pipeline.cfg
input {
beats {
port => "5044"
}
}
# The filter part of this file is commented out to indicate that it is
# optional.
# filter {
#
# }
output {
if [fields][logtype] == "apache" {
webhdfs {
host => "nn01" # (required)
port => 50070 # (optional, default: 50070)
path => "/logstash/apache/%{+YYYY-MM-dd}-technical-%{+HH}.log" #(required) JodaFmt
user => "elk" # (required)
}
}
if [fields][logtype] == "jboss" {
webhdfs {
host => "nn01" # (required)
port => 50070 # (optional, default: 50070)
path => "/logstash/jboss/%{+YYYY-MM-dd}-technical-%{+HH}.log" #(required) JodaFmt
# path => "/logstash/jboss/%{+YYYY}/%{+MM}/%{+YYYYMMdd}-technical-%{+HH}.log" #(required) JodaFmt
user => "elk" # (required)
}
}
}
Logstash Configuration for different types of Logs
##verify config: logstash -f $LS_HOME/config/servers-master-pipeline.cfg --config.test_and_exit
##Start logstash: logstash -f $LS_HOME/config/servers-master-pipeline.cfg --config.reload.automatic
## sudo /opt/filebeat-6.3.1-linux-x86_64/filebeat -e -c /opt/filebeat-6.3.1-linux-x86_64/master-filebeat-log-shipper.yml -d "publish"
## nohup logstash -f $LS_HOME/config/servers-master-pipeline.cfg --config.reload.automatic > "/opt/logstash-6.3.1/logs/start_logstash_nohup_`date +%Y%m%d_%H%M`.log" &
############## LS Input: Configuration for OS beats ################
input {
beats {
id => "linux"
port => "5044"
}##For Linux
beats {
id => "windows"
port => "6044"
type => "windows_events"
}##For Winlogbeat
}##input ends
############## LS Input: Configuration for OS beats ends ################
###Filter to process/parse log
filter {
############## LS Filter: Configuration for Linux Audit,Messages,Secure ################
##add a common column to all type of logs received so that we can identify in case we have more than one log collector
mutate { add_field => [ "lc_host", "dn01" ] }
mutate { add_field => [ "lc_suffix", "dn01" ] }
##below field to for the Kafka as HDFS partitions are different than Kafka
mutate { add_field => [ "year", "%{+YYYY}" ] }
mutate { add_field => [ "month", "%{+MM}" ] }
mutate { add_field => [ "day", "%{+dd}" ] }
mutate { add_field => [ "hour", "%{+HH}" ] }
ruby {
code => "event.set('lc_received_at', Time.now());"
}
##Parsing syslog/messages file
if [fields][logtype] == "syslog" {
if [source] == "/var/log/messages"
{
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
# add_field => [ "received_at", "%{@timestamp}" ]
# add_field => [ "received_from", "%{host}" ]
}
syslog_pri { }
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
}##messages if
##Parsing syslog/audit file
if [source] == "/var/log/audit/audit.log"
{
##here processing for audit log
#"message" => "type=SERVICE_STOP msg=audit(1532417364.894:205361): pid=1 uid=0 auid=4294967295 ses=4294967295 msg='unit=xagt comm=\"systemd\" exe=\"/usr/lib/systemd/systemd\" hostname=? addr=? terminal=? res=success'",
kv{
allow_duplicate_values => "true"
recursive => "true"
}##kv
mutate {
rename => {
"type" => "audit_type"
"pid" => "process_id"
"uid" => "user_id"
"auid" => "audit_user_id"
"ses" => "session_id"
"comm" => "cmd_line_name"
"exe" => "cmd_line_path"
"addr" => "host_ip"
"terminal" => "terminal_name"
"res" => "result"
}
}##mutate
}##audit log if
if [source] == "/var/log/secure"
{
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
# add_field => [ "received_at", "%{@timestamp}" ]
# add_field => [ "received_from", "%{host}" ]
}
syslog_pri { }
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
}##secure filter if ends
}##if logtype=syslog
############## LS Filter: Configuration for Linux Audit,Messages,Secure ends ################
############## LS Filter: Configuration for Windows Event Viewer ################
## if [@metadata][beat] == "winlogbeat"
## {
##
## mutate { add_field => [ "received_at", "%{@timestamp}" ] }
## mutate { add_field => [ "received_from", "%{host}" ] }
##
##
## }##winlogbeat if ends
##
############## LS Filter: Configuration for Windows Event Viewer ends ################
############## LS Filter: Configuration for IIS events Starts ################
##Parsing IIS file
if [fields][logtype] == "iis"
{
##ignore log comments
if [message] =~ "^#"
{
drop {}
}
###date time s-ip cs-method cs-uri-stem cs-uri-query s-port cs-username c-ip cs(User-Agent) cs(Referer) sc-status
###sc-substatus sc-win32-status time-taken
###2018-10-22 00:00:45 ::1 POST /powershell clientApplication=ActiveMonitor;PSVersion=4.0&CorrelationID=<empty>;&cafeReqId=e89d03ed-f465-4df6-8c16-8ec78edf5420; 80 - ::1 Microsoft+WinRM+Client - 200 0 0 15
### date time s-sitename s-computername s-ip cs-method cs-uri-stem cs-uri-query s-port cs-username c-ip cs-version cs(User-Agent) cs(Cookie) cs(Referer) cs-host sc-status sc-substatus sc-win32-status sc-bytes cs-bytes time-taken Original-IP
grok {
match => { "message" => [
###MAS PROD IIS Pattern
#"%{TIMESTAMP_ISO8601:log_timestamp} %{IPORHOST:s_sitename} %{NOTSPACE:s_computername} %{IPORHOST:s_ip} %{WORD:cs_method} %{URIPATH:cs_uri_stem} %{NOTSPACE:cs_uri_query} %{NUMBER:s_port} %{NOTSPACE:cs_username} %{IPORHOST:c_ip} %{NOTSPACE:cs_version} %{NOTSPACE:cs_useragent} %{NOTSPACE:cs_cookie} %{NOTSPACE:cs_referer} %{IPORHOST:cs_host} %{NUMBER:sc_status} %{NUMBER:sc_substatus} %{NUMBER:sc_win32_status} %{NUMBER:sc_bytes} %{NUMBER:cs_bytes} %{NUMBER:timetaken:int} %{IPORHOST:originalip}"
#,
"%{TIMESTAMP_ISO8601:log_timestamp} %{IPORHOST:s_sitename} %{NOTSPACE:s_computername} %{IPORHOST:s_ip} %{WORD:cs_method} %{URIPATH:cs_uri_stem} %{NOTSPACE:cs_uri_query} %{NUMBER:s_port} %{NOTSPACE:cs_username} %{IPORHOST:c_ip} %{NOTSPACE:cs_version} %{NOTSPACE:cs_useragent} %{NOTSPACE:cs_cookie} %{NOTSPACE:cs_referer} %{IPORHOST:cs_host} %{NUMBER:sc_status} %{NUMBER:sc_substatus} %{NUMBER:sc_win32_status} %{NUMBER:sc_bytes} %{NUMBER:cs_bytes} %{NUMBER:timetaken:int} %{IPORHOST:originalip}"
,
"%{TIMESTAMP_ISO8601:log_timestamp} %{IPORHOST:s_ip} %{WORD:cs_method} %{URIPATH:cs_uri_stem} %{NOTSPACE:cs_uri_query} %{NUMBER:s_port} %{NOTSPACE:cs_username} %{IPORHOST:c_ip} %{NOTSPACE:cs_useragent} %{NOTSPACE:cs_referer} %{NUMBER:sc_status} %{NUMBER:sc_substatus} %{NUMBER:sc_win32_status} %{NUMBER:timetaken:int} %{IPORHOST:originalip}"
,
### MAS STG IIS Pattern
"%{TIMESTAMP_ISO8601:log_timestamp} %{IPORHOST:s_ip} %{WORD:cs_method} %{URIPATH:cs_uri_stem} %{NOTSPACE:cs_uri_query} %{NUMBER:s_port} %{NOTSPACE:cs_username} %{IPORHOST:c_ip} %{NOTSPACE:cs_useragent} %{NOTSPACE:cs_referer} %{NUMBER:sc_status} %{NUMBER:sc_substatus} %{NUMBER:sc_win32_status} %{NUMBER:timetaken:int}"
]
}
# add_field => [ "received_at", "%{@timestamp}" ]
}##grok ends
### original user ip details
geoip {
source => "originalip"
}##geoip ends
###Useragent details
useragent {
source => "cs_useragent"
prefix => "cs_useragent_"
}##useragent ends
### split query string
kv {
source => "cs_uri_query"
field_split => "&"
prefix => "qs_"
}##kv ends
}##[fields][logtype] IIS if ends
############## LS Filter: Configuration for IIS events ends ################
############## LS Filter: Configuration for Apache Tomcat Catalina Access events Starts ################
##Parsing Apache Tomcat Catalina Access Log
if [fields][logtype] == "tomcat"
{
if [source] =~ "access" {
mutate { replace => { type => "catalina_access" } }
grok {
#"192.168.67.233--[12/Apr/2018:00:00:19+0300]POST/view/unit/unitRegistration.xhtmlHTTP/1.120033662"
# 10.0.0.7 - - [03/Sep/2017:10:58:19 +0000] "GET /pki/scep/pkiclient.exe?operation=GetCACaps&message= HTTP/1.1" 200 39
#"%{TIMESTAMP_ISO8601:log_timestamp} %{IPORHOST:s_sitename} %{NOTSPACE:s_computername} %{IPORHOST:s_ip} %{WORD:cs_method} %{URIPATH:cs_uri_stem} %{NOTSPACE:cs_uri_query} %{NUMBER:s_port} %{NOTSPACE:cs_username} %{IPORHOST:c_ip} %{NOTSPACE:cs_version} %{NOTSPACE:cs_useragent} %{NOTSPACE:cs_cookie} %{NOTSPACE:cs_referer} %{IPORHOST:cs_host} %{NUMBER:sc_status} %{NUMBER:sc_substatus} %{NUMBER:sc_win32_status} %{NUMBER:sc_bytes} %{NUMBER:cs_bytes} %{NUMBER:timetaken:int} %{IPORHOST:originalip}"
# %{NOTSPACE:csURIquery}
match => [ "message" ,"%{IPORHOST:remoteHost} %{NOTSPACE:remoteLogicalUserName} %{NOTSPACE:remoteAuthUserName} \[%{DATA:dateTime}\] %{WORD:reqMethod} %{URIPATH:targetURLstem} ?%{NOTSPACE:queryString} %{DATA:targetProtocol} %{NUMBER:httpStatusCode} %{NUMBER:bytesSent}"
,"message" ,"%{IPORHOST:remoteHost} %{NOTSPACE:remoteLogicalUserName} %{NOTSPACE:remoteAuthUserName} \[%{DATA:dateTime}\] \"%{WORD:reqMethod} %{URIPATH:targetURLstem} ?%{NOTSPACE:queryString} %{DATA:targetProtocol}\" %{NUMBER:httpStatusCode} %{NUMBER:bytesSent}"
]
}
} else if [source] =~ "error" {
mutate { replace => { type => "catalina_error" } }
} else {
mutate { replace => { type => "random_logs" } }
}
### original user ip details
geoip {
source => "remoteHost"
}##geoip ends
###Useragent details
useragent {
source => "targetURL"
prefix => "cs_useragent_"
}##useragent ends
### split query string
kv {
source => "csURIquery"
field_split => "&"
prefix => "qs_"
}##kv ends
}##[fields][logtype] apache if ends
############## LS Filter: Configuration for Apache Tomcat Catalina Access events Ends ################
}##filter
###Sending output to HDFS
## syslog output
output {
##stdout { codec => rubydebug }
############## LS Output: Configuration for Linux Audit,Messages,Secure ################
if [fields][logtype] == "syslog" {
if [source] == "/var/log/messages"
{
webhdfs
{
workers => 1
host => "dn04" # (required)
port => 50070 # (optional, default: 50070)
#standby_host => "pn3-hdp-rv-nn02" # (required)
#standby_port => 50070 # (optional, default: 50070)
path => "/lme/logs/os/linux/var/log/messages/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}/%{+YYYYMMdd}-%{+HH}-%{lc_suffix}-messages" #(required) JodaFmt
user => "hdfs" # (required)
codec=>"json"
}
# kafka
# {
#
# bootstrap_servers => "dn04:6667"
# codec => json
# compression_type => "snappy"
# topic_id => "messages"
# acks => "0"
# }
#
}##messages if ends
if [source] == "/var/log/audit/audit.log"
{
#stdout { codec => rubydebug }
webhdfs
{
workers =>1
host => "dn04" # (required)
port => 50070 # (optional, default: 50070)
#standby_host => "pn3-hdp-rv-nn02" # (required)
#standby_port => 50070 # (optional, default: 50070)
path => "/lme/logs/os/linux/var/log/audit/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}/%{+YYYYMMdd}-%{+HH}-%{lc_suffix}-audit" #(required) JodaFmt
user => "hdfs" # (required)
codec=>"json"
compression => "snappy"
}
kafka
{
bootstrap_servers => "dn04:6667"
codec => json
compression_type => "snappy"
topic_id => "audit"
acks => "0"
}
}##audit if ends
if [source] == "/var/log/secure"
{
#stdout { codec => rubydebug }
webhdfs
{
workers =>1
host => "dn04" # (required)
port => 50070 # (optional, default: 50070)
#standby_host => "pn3-hdp-rv-nn02" # (required)
#standby_port => 50070 # (optional, default: 50070)
path => "/lme/logs/os/linux/var/log/secure/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}/%{+YYYYMMdd}-%{+HH}-%{lc_suffix}-secure" #(required) JodaFmt
user => "hdfs" # (required)
codec=>"json"
}
# kafka
# {
# bootstrap_servers => "dn04:6667"
# codec => json
# compression_type => "snappy"
# topic_id => "secure"
# acks => "0"
# }
#
}##secure if ends
#############APACHE ACCESS LOG #####################
if [source] == "/var/log/apache"
{
#stdout { codec => rubydebug }
webhdfs
{
workers =>1
host => "dn04" # (required)
port => 50070 # (optional, default: 50070)
#standby_host => "pn3-hdp-rv-nn02" # (required)
#standby_port => 50070 # (optional, default: 50070)
path => "/lme/logs/os/linux/var/log/apache/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}/%{+YYYYMMdd}-%{+HH}-%{lc_suffix}-apache" #(required) JodaFmt
user => "hdfs" # (required)
codec=>"json"
}
}##apache if ends
}##[fields][logtype] if ends
############## LS Output: Linux Configuration ends ################
############## LS Output: Configuration for Windows Event Viewer Starts ################
if [@metadata][beat] == "winlogbeat"
{
##stdout { codec => rubydebug }
webhdfs
{
workers =>1
host => "dn04" # (required)
port => 50070 # (optional, default: 50070)
#standby_host => "pn3-hdp-rv-nn02" # (required)
#standby_port => 50070 # (optional, default: 50070)
path => "/lme/logs/os/win/ev/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}/%{+YYYYMMdd}-%{+HH}-%{lc_suffix}-ev" #(required) JodaFmt
user => "hdfs" # (required)
codec=>"json"
}##webhdfs
}##windows event viewer if ends
############## LS Output: Windows event viewer Configuration ends ################
############## LS Output: Configuration for IIS Starts ################
if [fields][logtype] == "iis"
{
##stdout { codec => rubydebug }
webhdfs
{
workers =>1
host => "dn04" # (required)
port => 50070 # (optional, default: 50070)
#standby_host => "pn3-hdp-rv-nn02" # (required)
#standby_port => 50070 # (optional, default: 50070)
path => "/lme/logs/os/win/iis/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}/%{+YYYYMMdd}-%{+HH}-%{lc_suffix}-iis" #(required) JodaFmt
user => "hdfs" # (required)
codec=>"json"
}##webhdfs
}##windows event IIS if ends
############## LS Output: Configuration for IIS Ends ################
############## LS Output: Configuration for Apache Tomcat Catalina Starts ################
if [fields][logtype] == "tomcat"
{
##stdout { codec => rubydebug }
webhdfs
{
workers =>1
host => "dn04" # (required)
port => 50070 # (optional, default: 50070)
#standby_host => "pn3-hdp-rv-nn02" # (required)
#standby_port => 50070 # (optional, default: 50070)
path => "/lme/logs/os/linux/tomcat/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}/%{+YYYYMMdd}-%{+HH}-%{lc_suffix}-accesslog" #(required) JodaFmt
user => "hdfs" # (required)
codec=>"json"
}##webhdfs
}##Tomcat Catalina if ends
############## LS Output: Configuration for Apache Tomcat Catalina Ends ################
}##output ends
Multiple Pipelines
If you need to run more than one pipeline in the same process, Logstash provides a way to do this through a configuration file called pipelines.yml. This file must be placed in the path.settings folder.
pipelines.yml
##LINUX pipeline for Audit, Secure, Messages, Tomcat Catalina
- pipeline.id: linux-pipeline
  path.config: "/usr/hadoopsw/elk/logstash-6.3.1/config/linux-pipeline.cfg"
## WINDOWS pipeline for Windows Event Viewer and IIS
- pipeline.id: windows-pipeline
  path.config: "/usr/hadoopsw/elk/logstash-6.3.1/config/windows-pipeline.cfg"
This file is formatted in YAML and contains a list of dictionaries, where each dictionary describes a pipeline, and each key/value pair specifies a setting for that pipeline.
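Besides path.config, each dictionary may carry other per-pipeline settings such as the worker count or queue type. A brief sketch (the values shown are illustrative, not recommendations):
- pipeline.id: linux-pipeline
  path.config: "/usr/hadoopsw/elk/logstash-6.3.1/config/linux-pipeline.cfg"
  pipeline.workers: 2
  queue.type: persisted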
linux-pipeline.cfg
##verify config: logstash -f $LS_HOME/config/servers-master-pipeline.cfg --config.test_and_exit
##Start logstash: logstash -f $LS_HOME/config/servers-master-pipeline.cfg --config.reload.automatic
## sudo /opt/filebeat-6.3.1-linux-x86_64/filebeat -e -c /opt/filebeat-6.3.1-linux-x86_64/master-filebeat-log-shipper.yml -d "publish"
## nohup logstash -f $LS_HOME/config/servers-master-pipeline.cfg --config.reload.automatic > "/opt/logstash-6.3.1/logs/start_logstash_nohup_`date +%Y%m%d_%H%M`.log" &
############## LS Input: Configuration for OS beats ################
input {
beats {
id => "linux"
port => "5044"
}##For Linux
}##input ends
############## LS Input: Configuration for OS beats ends ################
###Filter to process/parse log
filter {
############## LS Filter: Configuration for Linux Audit,Messages,Secure ################
##add a common column to all type of logs received so that we can identify in case we have more than one log collector
mutate { add_field => [ "lc_host", "dn01" ] }
mutate { add_field => [ "lc_suffix", "dn01" ] }
##below field to for the Kafka as HDFS partitions are different than Kafka
mutate { add_field => [ "year", "%{+YYYY}" ] }
mutate { add_field => [ "month", "%{+MM}" ] }
mutate { add_field => [ "day", "%{+dd}" ] }
mutate { add_field => [ "hour", "%{+HH}" ] }
ruby {
code => "event.set('lc_received_at', Time.now());"
}
##Parsing syslog/messages file
if [fields][logtype] == "syslog" {
if [source] == "/var/log/messages"
{
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
}
syslog_pri { }
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
}##messages if
##Parsing syslog/audit file
if [source] == "/var/log/audit/audit.log"
{
##here processing for audit log
#"message" => "type=SERVICE_STOP msg=audit(1532417364.894:205361): pid=1 uid=0 auid=4294967295 ses=4294967295 msg='unit=xagt comm=\"systemd\" exe=\"/usr/lib/systemd/systemd\" hostname=? addr=? terminal=? res=success'",
kv{
allow_duplicate_values => "true"
recursive => "true"
}##kv
mutate {
rename => {
"type" => "audit_type"
"pid" => "process_id"
"uid" => "user_id"
"auid" => "audit_user_id"
"ses" => "session_id"
"comm" => "cmd_line_name"
"exe" => "cmd_line_path"
"addr" => "host_ip"
"terminal" => "terminal_name"
"res" => "result"
}
}##mutate
}##audit log if
if [source] == "/var/log/secure"
{
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
# add_field => [ "received_at", "%{@timestamp}" ]
# add_field => [ "received_from", "%{host}" ]
}
syslog_pri { }
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
}##secure filter if ends
}##if logtype=syslog
############## LS Filter: Configuration for Linux Audit,Messages,Secure ends ################
############## LS Filter: Configuration for Apache Tomcat Catalina Access events Starts ################
##Parsing Apache Tomcat Catalina Access Log
if [fields][logtype] == "tomcat"
{
if [source] =~ "access" {
mutate { replace => { type => "catalina_access" } }
grok {
#"192.168.67.233--[12/Apr/2018:00:00:19+0300]POST/view/unit/unitRegistration.xhtmlHTTP/1.120033662"
# 10.0.0.7 - - [03/Sep/2017:10:58:19 +0000] "GET /pki/scep/pkiclient.exe?operation=GetCACaps&message= HTTP/1.1" 200 39
#"%{TIMESTAMP_ISO8601:log_timestamp} %{IPORHOST:s_sitename} %{NOTSPACE:s_computername} %{IPORHOST:s_ip} %{WORD:cs_method} %{URIPATH:cs_uri_stem} %{NOTSPACE:cs_uri_query} %{NUMBER:s_port} %{NOTSPACE:cs_username} %{IPORHOST:c_ip} %{NOTSPACE:cs_version} %{NOTSPACE:cs_useragent} %{NOTSPACE:cs_cookie} %{NOTSPACE:cs_referer} %{IPORHOST:cs_host} %{NUMBER:sc_status} %{NUMBER:sc_substatus} %{NUMBER:sc_win32_status} %{NUMBER:sc_bytes} %{NUMBER:cs_bytes} %{NUMBER:timetaken:int} %{IPORHOST:originalip}"
# %{NOTSPACE:csURIquery}
match => [ "message" ,"%{IPORHOST:remoteHost} %{NOTSPACE:remoteLogicalUserName} %{NOTSPACE:remoteAuthUserName} \[%{DATA:dateTime}\] %{WORD:reqMethod} %{URIPATH:targetURLstem} ?%{NOTSPACE:queryString} %{DATA:targetProtocol} %{NUMBER:httpStatusCode} %{NUMBER:bytesSent}"
,"message" ,"%{IPORHOST:remoteHost} %{NOTSPACE:remoteLogicalUserName} %{NOTSPACE:remoteAuthUserName} \[%{DATA:dateTime}\] \"%{WORD:reqMethod} %{URIPATH:targetURLstem} ?%{NOTSPACE:queryString} %{DATA:targetProtocol}\" %{NUMBER:httpStatusCode} %{NUMBER:bytesSent}"
]
}
} else if [source] =~ "error" {
mutate { replace => { type => "catalina_error" } }
} else {
mutate { replace => { type => "random_logs" } }
}
### original user ip details
geoip {
source => "remoteHost"
}##geoip ends
###Useragent details
useragent {
source => "targetURL"
prefix => "cs_useragent_"
}##useragent ends
### split query string
kv {
source => "csURIquery"
field_split => "&"
prefix => "qs_"
}##kv ends
}##[fields][logtype] apache if ends
############## LS Filter: Configuration for Apache Tomcat Catalina Access events Ends ################
}##filter
###Sending output to HDFS
## syslog output
output {
##stdout { codec => rubydebug }
############## LS Output: Configuration for Linux Audit,Messages,Secure ################
if [fields][logtype] == "syslog" {
if [source] == "/var/log/messages"
{
webhdfs
{
workers => 1
host => "dn04" # (required)
port => 50070 # (optional, default: 50070)
#standby_host => "pn3-hdp-rv-nn02" # (required)
#standby_port => 50070 # (optional, default: 50070)
path => "/lme/logs/os/linux/var/log/messages/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}/%{+YYYYMMdd}-%{+HH}-%{lc_suffix}-messages" #(required) JodaFmt
user => "hdfs" # (required)
codec=>"json"
}
}##messages if ends
if [source] == "/var/log/audit/audit.log"
{
#stdout { codec => rubydebug }
webhdfs
{
workers =>1
host => "dn04" # (required)
port => 50070 # (optional, default: 50070)
#standby_host => "pn3-hdp-rv-nn02" # (required)
#standby_port => 50070 # (optional, default: 50070)
path => "/lme/logs/os/linux/var/log/audit/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}/%{+YYYYMMdd}-%{+HH}-%{lc_suffix}-audit" #(required) JodaFmt
user => "hdfs" # (required)
codec=>"json"
#compression => "snappy"
}
}##audit if ends
if [source] == "/var/log/secure"
{
#stdout { codec => rubydebug }
webhdfs
{
workers =>1
host => "dn04" # (required)
port => 50070 # (optional, default: 50070)
#standby_host => "pn3-hdp-rv-nn02" # (required)
#standby_port => 50070 # (optional, default: 50070)
path => "/lme/logs/os/linux/var/log/secure/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}/%{+YYYYMMdd}-%{+HH}-%{lc_suffix}-secure" #(required) JodaFmt
user => "hdfs" # (required)
codec=>"json"
}
}##secure if ends
}##[fields][logtype] == "syslog" if ends
############## LS Output: Linux Syslog Configuration ends ################
############## LS Output: Configuration for Apache Tomcat Catalina Starts ################
if [fields][logtype] == "tomcat"
{
##stdout { codec => rubydebug }
webhdfs
{
workers =>1
host => "dn04" # (required)
port => 50070 # (optional, default: 50070)
#standby_host => "pn3-hdp-rv-nn02" # (required)
#standby_port => 50070 # (optional, default: 50070)
path => "/lme/logs/os/linux/tomcat/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}/%{+YYYYMMdd}-%{+HH}-%{lc_suffix}-accesslog" #(required) JodaFmt
user => "hdfs" # (required)
codec=>"json"
}##webhdfs
}##Tomcat Catalina if ends
############## LS Output: Configuration for Apache Tomcat Catalina Ends ################
}##output ends
windows-pipeline.cfg
##verify config: logstash -f $LS_HOME/config/servers-master-pipeline.cfg --config.test_and_exit
##Start logstash: logstash -f $LS_HOME/config/servers-master-pipeline.cfg --config.reload.automatic
## sudo /opt/filebeat-6.3.1-linux-x86_64/filebeat -e -c /opt/filebeat-6.3.1-linux-x86_64/master-filebeat-log-shipper.yml -d "publish"
## nohup logstash -f $LS_HOME/config/servers-master-pipeline.cfg --config.reload.automatic > "/opt/logstash-6.3.1/logs/start_logstash_nohup_`date +%Y%m%d_%H%M`.log" &
############## LS Input: Configuration for OS beats ################
input {
beats {
id => "windows"
port => "6044"
type => "windows_events"
}##For Winlogbeat
}##input ends
############## LS Input: Configuration for OS beats ends ################
###Filter to process/parse log
filter {
############## LS Filter: Configuration for Linux Audit,Messages,Secure ################
##add a common column to all type of logs received so that we can identify in case we have more than one log collector
mutate { add_field => [ "lc_host", "dn01" ] }
mutate { add_field => [ "lc_suffix", "dn01" ] }
##below field to for the Kafka as HDFS partitions are different than Kafka
mutate { add_field => [ "year", "%{+YYYY}" ] }
mutate { add_field => [ "month", "%{+MM}" ] }
mutate { add_field => [ "day", "%{+dd}" ] }
mutate { add_field => [ "hour", "%{+HH}" ] }
ruby {
code => "event.set('lc_received_at', Time.now());"
}
############## LS Filter: Configuration for Windows Event Viewer ################
## if [@metadata][beat] == "winlogbeat"
## {
##
## mutate { add_field => [ "received_at", "%{@timestamp}" ] }
## mutate { add_field => [ "received_from", "%{host}" ] }
##
##
## }##winlogbeat if ends
##
############## LS Filter: Configuration for Windows Event Viewer ends ################
############## LS Filter: Configuration for IIS events Starts ################
##Parsing IIS file
if [fields][logtype] == "iis"
{
##ignore log comments
if [message] =~ "^#"
{
drop {}
}
###date time s-ip cs-method cs-uri-stem cs-uri-query s-port cs-username c-ip cs(User-Agent) cs(Referer) sc-status
###sc-substatus sc-win32-status time-taken
###2018-10-22 00:00:45 ::1 POST /powershell clientApplication=ActiveMonitor;PSVersion=4.0&CorrelationID=<empty>;&cafeReqId=e89d03ed-f465-4df6-8c16-8ec78edf5420; 80 - ::1 Microsoft+WinRM+Client - 200 0 0 15
### date time s-sitename s-computername s-ip cs-method cs-uri-stem cs-uri-query s-port cs-username c-ip cs-version cs(User-Agent) cs(Cookie) cs(Referer) cs-host sc-status sc-substatus sc-win32-status sc-bytes cs-bytes time-taken Original-IP
grok {
match => { "message" => [
###MAS PROD IIS Pattern
#"%{TIMESTAMP_ISO8601:log_timestamp} %{IPORHOST:s_sitename} %{NOTSPACE:s_computername} %{IPORHOST:s_ip} %{WORD:cs_method} %{URIPATH:cs_uri_stem} %{NOTSPACE:cs_uri_query} %{NUMBER:s_port} %{NOTSPACE:cs_username} %{IPORHOST:c_ip} %{NOTSPACE:cs_version} %{NOTSPACE:cs_useragent} %{NOTSPACE:cs_cookie} %{NOTSPACE:cs_referer} %{IPORHOST:cs_host} %{NUMBER:sc_status} %{NUMBER:sc_substatus} %{NUMBER:sc_win32_status} %{NUMBER:sc_bytes} %{NUMBER:cs_bytes} %{NUMBER:timetaken:int} %{IPORHOST:originalip}"
#,
"%{TIMESTAMP_ISO8601:log_timestamp} %{IPORHOST:s_sitename} %{NOTSPACE:s_computername} %{IPORHOST:s_ip} %{WORD:cs_method} %{URIPATH:cs_uri_stem} %{NOTSPACE:cs_uri_query} %{NUMBER:s_port} %{NOTSPACE:cs_username} %{IPORHOST:c_ip} %{NOTSPACE:cs_version} %{NOTSPACE:cs_useragent} %{NOTSPACE:cs_cookie} %{NOTSPACE:cs_referer} %{IPORHOST:cs_host} %{NUMBER:sc_status} %{NUMBER:sc_substatus} %{NUMBER:sc_win32_status} %{NUMBER:sc_bytes} %{NUMBER:cs_bytes} %{NUMBER:timetaken:int} %{IPORHOST:originalip}"
,
"%{TIMESTAMP_ISO8601:log_timestamp} %{IPORHOST:s_ip} %{WORD:cs_method} %{URIPATH:cs_uri_stem} %{NOTSPACE:cs_uri_query} %{NUMBER:s_port} %{NOTSPACE:cs_username} %{IPORHOST:c_ip} %{NOTSPACE:cs_useragent} %{NOTSPACE:cs_referer} %{NUMBER:sc_status} %{NUMBER:sc_substatus} %{NUMBER:sc_win32_status} %{NUMBER:timetaken:int} %{IPORHOST:originalip}"
,
### MAS STG IIS Pattern
"%{TIMESTAMP_ISO8601:log_timestamp} %{IPORHOST:s_ip} %{WORD:cs_method} %{URIPATH:cs_uri_stem} %{NOTSPACE:cs_uri_query} %{NUMBER:s_port} %{NOTSPACE:cs_username} %{IPORHOST:c_ip} %{NOTSPACE:cs_useragent} %{NOTSPACE:cs_referer} %{NUMBER:sc_status} %{NUMBER:sc_substatus} %{NUMBER:sc_win32_status} %{NUMBER:timetaken:int}"
]
}
# add_field => [ "received_at", "%{@timestamp}" ]
}##grok ends
### original user ip details
geoip {
source => "originalip"
}##geoip ends
###Useragent details
useragent {
source => "cs_useragent"
prefix => "cs_useragent_"
}##useragent ends
### split query string
kv {
source => "cs_uri_query"
field_split => "&"
prefix => "qs_"
}##kv ends
}##[fields][logtype] IIS if ends
############## LS Filter: Configuration for IIS events ends ################
}##filter
###Sending output to HDFS
## syslog output
output {
##stdout { codec => rubydebug }
############## LS Output: Configuration for Windows Event Viewer Starts ################
if [@metadata][beat] == "winlogbeat"
{
##stdout { codec => rubydebug }
webhdfs
{
workers =>1
host => "dn04" # (required)
port => 50070 # (optional, default: 50070)
#standby_host => "pn3-hdp-rv-nn02" # (required)
#standby_port => 50070 # (optional, default: 50070)
path => "/lme/logs/os/win/ev/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}/%{+YYYYMMdd}-%{+HH}-%{lc_suffix}-ev" #(required) JodaFmt
user => "hdfs" # (required)
codec=>"json"
}##webhdfs
}##windows event viewer if ends
############## LS Output: Windows event viewer Configuration ends ################
############## LS Output: Configuration for IIS Starts ################
if [fields][logtype] == "iis"
{
##stdout { codec => rubydebug }
webhdfs
{
workers =>1
host => "dn04" # (required)
port => 50070 # (optional, default: 50070)
#standby_host => "pn3-hdp-rv-nn02" # (required)
#standby_port => 50070 # (optional, default: 50070)
path => "/lme/logs/os/win/iis/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}/%{+YYYYMMdd}-%{+HH}-%{lc_suffix}-iis" #(required) JodaFmt
user => "hdfs" # (required)
codec=>"json"
}##webhdfs
}##windows event IIS if ends
############## LS Output: Configuration for IIS Ends ################
}##output ends
Output for Kafka
The same events can also be published to a Kafka topic instead of (or in addition to) WebHDFS:
output {
kafka {
bootstrap_servers => "x.x.44.135:9092"
codec => json
topic_id => "topic_all_logs"
}
}
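To verify that events are reaching the topic, a throw-away pipeline can read it back. This is only a minimal sketch, reusing the broker address and topic configured above:
input {
kafka {
bootstrap_servers => "x.x.44.135:9092"
topics => ["topic_all_logs"]
codec => json
}
}
output { stdout { codec => rubydebug } }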
Run the pipelines
[hdpsysuser@dn01 ~]$ logstash --config.reload.automatic
if [fields][logtype] == "syslog" {
if [source] == "/var/log/messages"
{
webhdfs
{
workers => 1
host => "dn04" # (required)
port => 50070 # (optional, default: 50070)
#standby_host => "pn3-hdp-rv-nn02" # (required)
#standby_port => 50070 # (optional, default: 50070)
path => "/lme/logs/os/linux/var/log/messages/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}/%{+YYYYMMdd}-%{+HH}-%{lc_suffix}-messages" #(required) JodaFmt
user => "hdfs" # (required)
codec=>"json"
}
}##messages if ends
if [source] == "/var/log/audit/audit.log"
{
#stdout { codec => rubydebug }
webhdfs
{
workers =>1
host => "dn04" # (required)
port => 50070 # (optional, default: 50070)
#standby_host => "pn3-hdp-rv-nn02" # (required)
#standby_port => 50070 # (optional, default: 50070)
path => "/lme/logs/os/linux/var/log/audit/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}/%{+YYYYMMdd}-%{+HH}-%{lc_suffix}-audit" #(required) JodaFmt
user => "hdfs" # (required)
codec=>"json"
#compression => "snappy"
}
}##audit if ends
if [source] == "/var/log/secure"
{
#stdout { codec => rubydebug }
webhdfs
{
workers =>1
host => "dn04" # (required)
port => 50070 # (optional, default: 50070)
#standby_host => "pn3-hdp-rv-nn02" # (required)
#standby_port => 50070 # (optional, default: 50070)
path => "/lme/logs/os/linux/var/log/secure/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}/%{+YYYYMMdd}-%{+HH}-%{lc_suffix}-secure" #(required) JodaFmt
user => "hdfs" # (required)
codec=>"json"
}
}##secure if ends
}##[fields][logtype] == "syslog" if ends
############## LS Output: Linux Syslog Configuration ends ################
############## LS Output: Configuration for Apache Tomcat Catalina Starts ################
if [fields][logtype] == "tomcat"
{
##stdout { codec => rubydebug }
webhdfs
{
workers =>1
host => "dn04" # (required)
port => 50070 # (optional, default: 50070)
#standby_host => "pn3-hdp-rv-nn02" # (required)
#standby_port => 50070 # (optional, default: 50070)
path => "/lme/logs/os/linux/tomcat/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}/%{+YYYYMMdd}-%{+HH}-%{lc_suffix}-accesslog" #(required) JodaFmt
user => "hdfs" # (required)
codec=>"json"
}##webhdfs
}##Tomcat Catalina if ends
############## LS Output: Configuration for Apache Tomcat Catalina Ends ################
}##output ends
windows-pipeline.cfg
This pipeline receives Windows Event Viewer and IIS events from Beats on port 6044, parses the IIS entries, and writes everything to HDFS.
##verify config: logstash -f $LS_HOME/config/windows-pipeline.cfg --config.test_and_exit
##Start logstash: logstash -f $LS_HOME/config/windows-pipeline.cfg --config.reload.automatic
## sudo /opt/filebeat-6.3.1-linux-x86_64/filebeat -e -c /opt/filebeat-6.3.1-linux-x86_64/master-filebeat-log-shipper.yml -d "publish"
## nohup logstash -f $LS_HOME/config/windows-pipeline.cfg --config.reload.automatic > "/opt/logstash-6.3.1/logs/start_logstash_nohup_`date +%Y%m%d_%H%M`.log" &
############## LS Input: Configuration for OS beats ################
input {
beats {
id => "windows"
port => "6044"
type => "windows_events"
}##For Winlogbeat
}##input ends
############## LS Input: Configuration for OS beats ends ################
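## On the Windows hosts, Winlogbeat (or Filebeat for IIS logs) ships to this port; a minimal sketch, where the host name dn01 is an assumption for the machine running this pipeline:
## output.logstash:
##   hosts: ["dn01:6044"]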
###Filter to process/parse log
filter {
############## LS Filter: Configuration for Linux Audit,Messages,Secure ################
##add a common field to every log received so that the log collector can be identified when more than one collector is in use
mutate { add_field => [ "lc_host", "dn01" ] }
mutate { add_field => [ "lc_suffix", "dn01" ] }
##the fields below are added for Kafka; the HDFS output takes its partitions from the path, while Kafka consumers need them as event fields
mutate { add_field => [ "year", "%{+YYYY}" ] }
mutate { add_field => [ "month", "%{+MM}" ] }
mutate { add_field => [ "day", "%{+dd}" ] }
mutate { add_field => [ "hour", "%{+HH}" ] }
ruby {
code => "event.set('lc_received_at', Time.now());"
}
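## lc_received_at records when this Logstash instance processed the event; @timestamp keeps the value set by the shipping beat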
############## LS Filter: Configuration for Windows Event Viewer ################
## if [@metadata][beat] == "winlogbeat"
## {
##
## mutate { add_field => [ "received_at", "%{@timestamp}" ] }
## mutate { add_field => [ "received_from", "%{host}" ] }
##
##
## }##winlogbeat if ends
##
############## LS Filter: Configuration for Windows Event Viewer ends ################
############## LS Filter: Configuration for IIS events Starts ################
##Parsing IIS file
if [fields][logtype] == "iis"
{
##ignore log comments
if [message] =~ "^#"
{
drop {}
}
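## IIS writes W3C header lines that start with '#' (e.g. "#Software: Microsoft Internet Information Services", "#Fields: date time s-ip cs-method ..."); they carry no event data, so they are dropped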
###date time s-ip cs-method cs-uri-stem cs-uri-query s-port cs-username c-ip cs(User-Agent) cs(Referer) sc-status
###sc-substatus sc-win32-status time-taken
###2018-10-22 00:00:45 ::1 POST /powershell clientApplication=ActiveMonitor;PSVersion=4.0&CorrelationID=<empty>;&cafeReqId=e89d03ed-f465-4df6-8c16-8ec78edf5420; 80 - ::1 Microsoft+WinRM+Client - 200 0 0 15
### date time s-sitename s-computername s-ip cs-method cs-uri-stem cs-uri-query s-port cs-username c-ip cs-version cs(User-Agent) cs(Cookie) cs(Referer) cs-host sc-status sc-substatus sc-win32-status sc-bytes cs-bytes time-taken Original-IP
grok {
match => { "message" => [
###MAS PROD IIS Pattern
#"%{TIMESTAMP_ISO8601:log_timestamp} %{IPORHOST:s_sitename} %{NOTSPACE:s_computername} %{IPORHOST:s_ip} %{WORD:cs_method} %{URIPATH:cs_uri_stem} %{NOTSPACE:cs_uri_query} %{NUMBER:s_port} %{NOTSPACE:cs_username} %{IPORHOST:c_ip} %{NOTSPACE:cs_version} %{NOTSPACE:cs_useragent} %{NOTSPACE:cs_cookie} %{NOTSPACE:cs_referer} %{IPORHOST:cs_host} %{NUMBER:sc_status} %{NUMBER:sc_substatus} %{NUMBER:sc_win32_status} %{NUMBER:sc_bytes} %{NUMBER:cs_bytes} %{NUMBER:timetaken:int} %{IPORHOST:originalip}"
#,
"%{TIMESTAMP_ISO8601:log_timestamp} %{IPORHOST:s_sitename} %{NOTSPACE:s_computername} %{IPORHOST:s_ip} %{WORD:cs_method} %{URIPATH:cs_uri_stem} %{NOTSPACE:cs_uri_query} %{NUMBER:s_port} %{NOTSPACE:cs_username} %{IPORHOST:c_ip} %{NOTSPACE:cs_version} %{NOTSPACE:cs_useragent} %{NOTSPACE:cs_cookie} %{NOTSPACE:cs_referer} %{IPORHOST:cs_host} %{NUMBER:sc_status} %{NUMBER:sc_substatus} %{NUMBER:sc_win32_status} %{NUMBER:sc_bytes} %{NUMBER:cs_bytes} %{NUMBER:timetaken:int} %{IPORHOST:originalip}"
,
"%{TIMESTAMP_ISO8601:log_timestamp} %{IPORHOST:s_ip} %{WORD:cs_method} %{URIPATH:cs_uri_stem} %{NOTSPACE:cs_uri_query} %{NUMBER:s_port} %{NOTSPACE:cs_username} %{IPORHOST:c_ip} %{NOTSPACE:cs_useragent} %{NOTSPACE:cs_referer} %{NUMBER:sc_status} %{NUMBER:sc_substatus} %{NUMBER:sc_win32_status} %{NUMBER:timetaken:int} %{IPORHOST:originalip}"
,
### MAS STG IIS Pattern
"%{TIMESTAMP_ISO8601:log_timestamp} %{IPORHOST:s_ip} %{WORD:cs_method} %{URIPATH:cs_uri_stem} %{NOTSPACE:cs_uri_query} %{NUMBER:s_port} %{NOTSPACE:cs_username} %{IPORHOST:c_ip} %{NOTSPACE:cs_useragent} %{NOTSPACE:cs_referer} %{NUMBER:sc_status} %{NUMBER:sc_substatus} %{NUMBER:sc_win32_status} %{NUMBER:timetaken:int}"
]
}
# add_field => [ "received_at", "%{@timestamp}" ]
}##grok ends
### original user ip details
geoip {
source => "originalip"
}##geoip ends
###Useragent details
useragent {
source => "cs_useragent"
prefix => "cs_useragent_"
}##useragent ends
### split query string
kv {
source => "cs_uri_query"
field_split => "&"
prefix => "qs_"
}##kv ends
}##[fields][logtype] IIS if ends
############## LS Filter: Configuration for IIS events ends ################
}##filter
###Sending output to HDFS
## Windows Event Viewer / IIS output
output {
##stdout { codec => rubydebug }
############## LS Output: Configuration for Windows Event Viewer Starts ################
if [@metadata][beat] == "winlogbeat"
{
##stdout { codec => rubydebug }
webhdfs
{
workers =>1
host => "dn04" # (required)
port => 50070 # (optional, default: 50070)
#standby_host => "pn3-hdp-rv-nn02" # (required)
#standby_port => 50070 # (optional, default: 50070)
path => "/lme/logs/os/win/ev/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}/%{+YYYYMMdd}-%{+HH}-%{lc_suffix}-ev" #(required) JodaFmt
user => "hdfs" # (required)
codec=>"json"
}##webhdfs
}##windows event viewer if ends
############## LS Output: Windows event viewer Configuration ends ################
############## LS Output: Configuration for IIS Starts ################
if [fields][logtype] == "iis"
{
##stdout { codec => rubydebug }
webhdfs
{
workers =>1
host => "dn04" # (required)
port => 50070 # (optional, default: 50070)
#standby_host => "pn3-hdp-rv-nn02" # (required)
#standby_port => 50070 # (optional, default: 50070)
path => "/lme/logs/os/win/iis/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}/%{+YYYYMMdd}-%{+HH}-%{lc_suffix}-iis" #(required) JodaFmt
user => "hdfs" # (required)
codec=>"json"
}##webhdfs
}##windows event IIS if ends
############## LS Output: Configuration for IIS Ends ################
}##output ends
Output for Kafka
output {
kafka {
bootstrap_servers => "x.x.44.135:9092"
codec => json
topic_id => "topic_all_logs"
}
}
Run the pipelines
With no -f flag, Logstash reads its pipeline definitions from config/pipelines.yml (a minimal sketch of that file follows the command below); --config.reload.automatic makes Logstash reload the configuration whenever the files change.
[hdpsysuser@dn01 ~]$ logstash --config.reload.automatic
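A minimal config/pipelines.yml for the two pipelines shown in this post might look like the following; the pipeline IDs and file paths are assumptions, so adjust them to your installation:
- pipeline.id: servers
  path.config: "/opt/logstash-6.3.1/config/servers-master-pipeline.cfg"
- pipeline.id: windows
  path.config: "/opt/logstash-6.3.1/config/windows-pipeline.cfg"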