Note: All posts take a practical approach and avoid lengthy theory. All have been tested on development servers. Please don’t try any post on production servers until you are sure.

Tuesday, April 24, 2018

Configuring Logstash with Elasticsearch


Introduction



Logstash is an open source data collection engine with real-time pipelining capabilities. Logstash can dynamically unify data from disparate sources and normalize the data into destinations of your choice. Cleanse and democratize all your data for diverse advanced downstream analytics and visualization use cases. You can clean and transform your data during ingestion to gain near real-time insights immediately at index or output time. Logstash comes out-of-box with many aggregations and mutations along with pattern matching, geo mapping, and dynamic lookup capabilities.

Logstash provides Grok, which is a great way to parse unstructured log data into something structured and queryable. It is well suited to syslog logs, Apache and other web server logs, MySQL logs, and in general any log format written for humans rather than for computer consumption.

The Logstash agent processes a pipeline with three stages: inputs → filters → outputs. Inputs generate events (each with a set of properties), filters modify them, and outputs ship them elsewhere.
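
The three stages can be pictured with a small Python sketch. This is only an illustration of the event flow, not how Logstash is implemented; the function names (stdin_like_input, uppercase_filter, run_pipeline) are made up for this example.

```python
import socket
from datetime import datetime, timezone

def stdin_like_input(lines):
    """Input stage: turn each raw line into an event (a dict of properties)."""
    for line in lines:
        yield {
            "message": line,
            "@timestamp": datetime.now(timezone.utc).isoformat(),
            "host": socket.gethostname(),
        }

def uppercase_filter(event):
    """Filter stage: modify the event and hand it on."""
    event["message"] = event["message"].upper()
    return event

def run_pipeline(lines, filters, output):
    """Push every event through the filters in order, then ship it."""
    for event in stdin_like_input(lines):
        for f in filters:
            event = f(event)
        output(event)

collected = []
run_pipeline(["Hi"], [uppercase_filter], collected.append)
print(collected[0]["message"])  # prints "HI"
```

Here the output stage is just a list append; in a real pipeline it would be a destination such as stdout or Elasticsearch.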

Installation

Logstash requires Java 8. Download the Logstash installation file that matches your host environment. Unpack the file, and Logstash is ready to use.

Test the Installation


Test your Logstash installation by running the most basic Logstash pipeline. A Logstash pipeline has two required elements, input and output, and one optional element, filter. The input plugins consume data from a source, the filter plugins modify the data as you specify, and the output plugins write the data to a destination. You can set the OS environment variables for your installation.


export LS_HOME=/usr/hadoopsw/elk/logstash-6.2.3
export PATH=$PATH:$LS_HOME/bin

[hdpsysuser@hdpmaster bin]$ logstash -e 'input { stdin { } } output { stdout {} }'
Sending Logstash's logs to /usr/hadoopsw/elk/logstash-6.2.3/logs which is now configured via log4j2.properties
[2018-04-23T11:15:04,818][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/usr/hadoopsw/elk/logstash-6.2.3/modules/fb_apache/configuration"}
[2018-04-23T11:15:04,943][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/usr/hadoopsw/elk/logstash-6.2.3/modules/netflow/configuration"}
[2018-04-23T11:15:07,951][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2018-04-23T11:15:12,524][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.2.3"}
[2018-04-23T11:15:14,563][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2018-04-23T11:15:22,118][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-04-23T11:15:22,580][INFO ][logstash.pipeline        ] Pipeline started succesfully {:pipeline_id=>"main", :thread=>"#<Thread:0x23773bb4 run>"}
The stdin plugin is now waiting for input:
[2018-04-23T11:15:23,385][INFO ][logstash.agent           ] Pipelines running {:count=>1, :pipelines=>["main"]}
Hi
2018-04-23T11:15:45.166Z hdpmaster Hi



The -e flag enables you to specify a configuration directly from the command line.

After starting Logstash, wait until the log shows the "Pipeline started" message and then enter "Hi" at the prompt.

Logstash adds a timestamp and the host name to the message, as the last line of the output above shows. Exit Logstash by pressing CTRL-D in the shell where Logstash is running.




Logstash Pipeline

Before you create the Logstash pipeline, you’ll configure Filebeat to send log lines to Logstash. The Filebeat client, designed for reliability and low latency, is a lightweight, resource-friendly tool that collects logs from files on the server and forwards them to your Logstash instance for processing. In a typical use case, Filebeat runs on a separate machine from the one running your Logstash instance. The default Logstash installation includes the Beats input plugin.

To install Filebeat on your data source machine, download the appropriate package from the Filebeat product page https://www.elastic.co/downloads/beats/filebeat and unpack it.

export FB_HOME=/usr/hadoopsw/elk/filebeat-6.2.4-linux-x86_64

If you have the RPM package, you can install it as follows:

[hdpsysuser@hdpmaster apps]$ sudo yum localinstall filebeat-8.3.1-x86_64.rpm
[hdpsysuser@hdpmaster apps]$ systemctl status filebeat.service
[hdpsysuser@hdpmaster apps]$ sudo systemctl start filebeat.service
[hdpsysuser@hdpmaster apps]$ sudo vim /etc/filebeat/filebeat.yml


Configure Filebeat

After installing Filebeat, you need to configure it. Open the filebeat.yml file located in your Filebeat installation directory, and replace the contents with the following lines.

Make sure paths points to the example Apache log file, logstash-tutorial.log, which can be downloaded from https://download.elastic.co/demos/logstash/gettingstarted/logstash-tutorial.log.gz

filebeat.yml

filebeat.prospectors:
- type: log
  paths:
    - /usr/hadoopsw/mydata/elkdata/logstash-tutorial-dataset 
output.logstash:
  hosts: ["localhost:5044"]


At the data source machine, run Filebeat with the following command:

sudo ./filebeat -e -c filebeat.yml -d "publish"

[hdpsysuser@hdpmaster filebeat-6.2.4-linux-x86_64]$ ./filebeat -e -c filebeat.yml -d "publish"

2018-04-23T12:14:10.757Z        INFO    instance/beat.go:468    Home path: [/usr/hadoopsw/elk/filebeat-6.2.4-linux-x86_64] Config path: [/usr/hadoopsw/elk/filebeat-6.2.4-linux-x86_64] Data path: [/usr/hadoopsw/elk/filebeat-6.2.4-linux-x86_64/data] Logs path: [/usr/hadoopsw/elk/filebeat-6.2.4-linux-x86_64/logs]
2018-04-23T12:14:10.758Z        INFO    instance/beat.go:475    Beat UUID: 97f293dd-9903-4644-8892-6bb4d9e8999b
2018-04-23T12:14:10.758Z        INFO    instance/beat.go:213    Setup Beat: filebeat; Version: 6.2.4
2018-04-23T12:14:10.761Z        INFO    pipeline/module.go:76   Beat name: hdpmaster
2018-04-23T12:14:10.764Z        INFO    [monitoring]    log/log.go:97   Starting metrics logging every 30s
2018-04-23T12:14:10.764Z        INFO    instance/beat.go:301    filebeat start running.
2018-04-23T12:14:10.765Z        INFO    registrar/registrar.go:73       No registry file found under: /usr/hadoopsw/elk/filebeat-6.2.4-linux-x86_64/data/registry. Creating a new registry file.
2018-04-23T12:14:10.869Z        INFO    registrar/registrar.go:110      Loading registrar data from /usr/hadoopsw/elk/filebeat-6.2.4-linux-x86_64/data/registry
2018-04-23T12:14:10.870Z        INFO    registrar/registrar.go:121      States Loaded from registrar: 0
2018-04-23T12:14:10.870Z        WARN    beater/filebeat.go:261  Filebeat is unable to load the Ingest Node pipelines for the configured modules because the Elasticsearch output is not configured/enabled. If you have already loaded the Ingest Node pipelines or are using Logstash pipelines, you can ignore this warning.
2018-04-23T12:14:10.870Z        INFO    crawler/crawler.go:48   Loading Prospectors: 1
2018-04-23T12:14:10.871Z        INFO    log/prospector.go:111   Configured paths: [/path/to/file/logstash-tutorial.log]
2018-04-23T12:14:10.872Z        INFO    crawler/crawler.go:82   Loading and starting Prospectors completed. Enabled prospectors: 1


Filebeat will attempt to connect on port 5044. Until Logstash starts with an active Beats plugin, there won’t be any answer on that port, so any messages you see regarding failure to connect on that port are normal for now.
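
To tell "Logstash is not listening yet" apart from a real network problem, a quick TCP probe of the port can help. The following is a minimal sketch using only the Python standard library; port_is_open is a hypothetical helper name, and localhost:5044 is taken from the filebeat.yml above.

```python
import socket

def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timeout, unreachable host, and so on.
        return False

if __name__ == "__main__":
    # Expected to be False until Logstash starts with the Beats input.
    print(port_is_open("localhost", 5044))
```

A successful connection only shows that something is listening on the port; it does not prove that the listener is the Beats input.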

Configure Logstash for Filebeat Input

Create a Logstash configuration pipeline that uses the Beats input plugin to receive events from Beats.

The following text represents the skeleton of a configuration pipeline:

# The # character at the beginning of a line indicates a comment. Use
# comments to describe your configuration.
input {
}
# The filter part of this file is commented out to indicate that it is
# optional.
# filter {
#
# }
output {
}

Configure your Logstash instance to use the Beats input plugin by adding the following lines to the input section of the first-pipeline.conf file:

vi /usr/hadoopsw/elk/logstash-6.2.3/config/first-pipeline.conf

input {
    beats {
        port => "5044"
    }
}
# The filter part of this file is commented out to indicate that it is
# optional.
# filter {
#
# }
output {
    stdout { codec => rubydebug }
}

We’ll configure Logstash to write to Elasticsearch later. For now, the stdout line in the output section above prints each event to the console when you run Logstash.

Verify Configuration

To verify your configuration, run the following command:

logstash -f /usr/hadoopsw/elk/logstash-6.2.3/config/first-pipeline.conf --config.test_and_exit


[hdpsysuser@hdpmaster ~]$ logstash -f $LS_HOME/config/first-pipeline.conf --config.test_and_exit
Sending Logstash's logs to /usr/hadoopsw/elk/logstash-6.2.3/logs which is now configured via log4j2.properties
[2018-04-23T12:55:09,135][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/usr/hadoopsw/elk/logstash-6.2.3/modules/fb_apache/configuration"}
[2018-04-23T12:55:09,261][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/usr/hadoopsw/elk/logstash-6.2.3/modules/netflow/configuration"}
[2018-04-23T12:55:11,904][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
Configuration OK
[2018-04-23T12:55:25,976][INFO ][logstash.runner          ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash


The --config.test_and_exit option parses your configuration file and reports any errors.

Start Logstash

If the configuration file passes the configuration test, start Logstash with the following command:

[hdpsysuser@hdpmaster ~]$ logstash -f $LS_HOME/config/first-pipeline.conf --config.reload.automatic

Sending Logstash's logs to /usr/hadoopsw/elk/logstash-6.2.3/logs which is now configured via log4j2.properties
[2018-04-23T13:02:20,437][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/usr/hadoopsw/elk/logstash-6.2.3/modules/fb_apache/configuration"}
[2018-04-23T13:02:20,473][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/usr/hadoopsw/elk/logstash-6.2.3/modules/netflow/configuration"}
[2018-04-23T13:02:21,322][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2018-04-23T13:02:22,715][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.2.3"}
[2018-04-23T13:02:25,156][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2018-04-23T13:02:39,277][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-04-23T13:02:40,391][INFO ][logstash.inputs.beats    ] Beats inputs: Starting input listener {:address=>"0.0.0.0:5044"}
[2018-04-23T13:02:40,609][INFO ][logstash.pipeline        ] Pipeline started succesfully {:pipeline_id=>"main", :thread=>"#<Thread:0x35f0421 run>"}
[2018-04-23T13:02:40,853][INFO ][org.logstash.beats.Server] Starting server on port: 5044
[2018-04-23T13:02:41,071][INFO ][logstash.agent           ] Pipelines running {:count=>1, :pipelines=>["main"]}


The --config.reload.automatic option enables automatic config reloading so that you don’t have to stop and restart Logstash every time you modify the configuration file.


If our pipeline is working correctly, we should see a series of events like the following written to the console:

{
        "source" => "/usr/hadoopsw/mydata/elkdata/logstash-tutorial-dataset",
        "offset" => 24464,
          "beat" => {
            "name" => "hdpmaster",
         "version" => "6.2.4",
        "hostname" => "hdpmaster"
    },
    "@timestamp" => 2018-04-23T13:34:17.165Z,
          "host" => "hdpmaster",
      "@version" => "1",
    "prospector" => {
        "type" => "log"
    },
          "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
       "message" => "86.1.76.62 - - [04/Jan/2015:05:30:37 +0000] \"GET /style2.css HTTP/1.1\" 200 4877 \"http://www.semicomplete.com/projects/xdotool/\" \"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\""
}

Parsing Web Logs with the Grok Filter Plugin

Parse the log messages to create specific, named fields from the logs. The grok filter plugin is one of several plugins that are available by default in Logstash. The grok filter plugin enables you to parse the unstructured log data into something structured and queryable. 


Because the grok filter plugin looks for patterns in the incoming log data, configuring the plugin requires you to make decisions about how to identify the patterns that are of interest to your use case. A representative line from the web server log sample looks like this:

83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"



The IP address at the beginning of the line is easy to identify, as is the timestamp in brackets. To parse the data, you can use the %{COMBINEDAPACHELOG} grok pattern, which structures lines from the Apache log using the following schema:

Information            Field Name
IP Address             clientip
User ID                ident
User Authentication    auth
Timestamp              timestamp
HTTP Verb              verb
Request body           request
HTTP Version           httpversion
HTTP Status Code       response
Bytes served           bytes
Referrer URL           referrer
User agent             agent
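
To get a feel for what the pattern extracts, here is a rough Python equivalent applied to the representative log line above. The regular expression is a simplified stand-in written for this example, not the actual definition of %{COMBINEDAPACHELOG}, which is more permissive.

```python
import re

# Simplified stand-in for %{COMBINEDAPACHELOG}; group names follow the schema above.
COMBINED = re.compile(
    r'(?P<clientip>\S+) (?P<ident>\S+) (?P<auth>\S+) '
    r'\[(?P<timestamp>[^\]]+)\] '
    r'"(?P<verb>\S+) (?P<request>\S+) HTTP/(?P<httpversion>[\d.]+)" '
    r'(?P<response>\d{3}) (?P<bytes>\d+|-) '
    r'(?P<referrer>"[^"]*") (?P<agent>"[^"]*")'
)

line = (
    '83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] '
    '"GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" '
    '200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" '
    '"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 '
    '(KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"'
)

fields = COMBINED.match(line).groupdict()
print(fields["clientip"], fields["verb"], fields["response"])
# prints "83.149.9.216 GET 200"
```

As in the Logstash output later in this post, every captured value is a string, and the referrer and agent values keep their surrounding quotes.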

Edit the first-pipeline.conf file and replace the entire filter section with the following text:

first-pipeline.conf

input {
    beats {
        port => "5044"
    }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
}
output {
    stdout { codec => rubydebug }
}

Save changes. Because we’ve enabled automatic config reloading, we don’t have to restart Logstash to pick up our changes. However, we do need to force Filebeat to read the log file from scratch. To do this, go to the terminal window where Filebeat is running and press Ctrl+C to shut down Filebeat. Then delete the Filebeat registry file. Since Filebeat stores the state of each file it harvests in the registry, deleting the registry file forces Filebeat to read all the files it’s harvesting from scratch. Next, restart Filebeat.
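
The registry reset described above can be scripted. This sketch assumes the registry is a single file at data/registry under the Filebeat home directory, as the Filebeat 6.2.4 startup log earlier in this post reports; other versions or package installs may keep it elsewhere. reset_registry is a hypothetical helper name, and Filebeat must be stopped before calling it.

```python
import tempfile
from pathlib import Path

def reset_registry(filebeat_home):
    """Delete data/registry under the Filebeat home; return True if a file was removed."""
    registry = Path(filebeat_home) / "data" / "registry"
    if registry.is_file():
        registry.unlink()
        return True
    return False

# Demo against a throwaway directory instead of a real Filebeat installation.
with tempfile.TemporaryDirectory() as home:
    (Path(home) / "data").mkdir()
    (Path(home) / "data" / "registry").write_text("[]")
    first = reset_registry(home)   # the registry file existed and was removed
    second = reset_registry(home)  # nothing left to remove
```

On the setup used in this post the call would be reset_registry("/usr/hadoopsw/elk/filebeat-6.2.4-linux-x86_64").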


After Logstash applies the grok pattern, the events will have the following JSON representation:

{
        "message" => ".1.76.62 - - [04/Jan/2015:05:30:37 +0000] \"GET /MyURL-3 HTTP/1.1\" 200 4877 \"http://www.semicomplete.com/projects/xdotool/\" \"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\"",
    "httpversion" => "1.1",
          "agent" => "\"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\"",
          "ident" => "-",
           "beat" => {
        "hostname" => "hdpmaster",
            "name" => "hdpmaster",
         "version" => "6.2.4"
    },
         "source" => "/usr/hadoopsw/mydata/elkdata/logstash-tutorial-dataset",
           "verb" => "GET",
      "timestamp" => "04/Jan/2015:05:30:37 +0000",
           "auth" => "-",
       "referrer" => "\"http://www.semicomplete.com/projects/xdotool/\"",
       "@version" => "1",
           "host" => "hdpmaster",
         "offset" => 24672,
       "response" => "200",
     "prospector" => {
        "type" => "log"
    },
        "request" => "/MyURL-3",
          "bytes" => "4877",
       "clientip" => "1.76.62",
     "@timestamp" => 2018-04-23T14:05:20.967Z,
           "tags" => [
        [0] "beats_input_codec_plain_applied"
    ]
}

Notice that the event includes the original message, but the log message is also broken down into specific fields.

Enhancing Data with the Geoip Filter Plugin

In addition to parsing log data for better searches, filter plugins can derive supplementary information from existing data. As an example, the geoip plugin looks up IP addresses, derives geographic location information from the addresses, and adds that location information to the logs. Configure your Logstash instance to use the geoip filter plugin. The geoip plugin configuration requires you to specify the name of the source field that contains the IP address to look up.


Since filters are evaluated in sequence, make sure that the geoip section is after the grok section of the configuration file and that both the grok and geoip sections are nested within the filter section.
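
Why the order matters can be shown with a toy model in Python: the geoip step needs the clientip field that grok creates. GEO_DB is a made-up lookup table standing in for the GeoIP database, the IP address is from a documentation range, and the grok and geoip functions below are simplified imitations written for this example.

```python
import re

# Hypothetical lookup table; the real plugin consults a GeoIP database.
GEO_DB = {"203.0.113.7": {"city_name": "Exampleville"}}

def grok(event):
    """Extract the leading IP address of the message into clientip."""
    m = re.match(r"(\d{1,3}(?:\.\d{1,3}){3}) ", event["message"])
    if m:
        event["clientip"] = m.group(1)
    return event

def geoip(event):
    """Look up clientip; adds nothing if the field is not there yet."""
    info = GEO_DB.get(event.get("clientip"))
    if info:
        event["geoip"] = dict(info)
    return event

def apply(filters, message):
    """Run the filters over a fresh event in the given order."""
    event = {"message": message}
    for f in filters:
        event = f(event)
    return event

line = '203.0.113.7 - - [04/Jan/2015:05:30:37 +0000] "GET / HTTP/1.1" 200 4877'
right = apply([grok, geoip], line)
wrong = apply([geoip, grok], line)
print("geoip" in right, "geoip" in wrong)  # prints "True False"
```

With geoip first, the lookup runs before clientip exists, so no location data is added.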

When we’re done, the contents of first-pipeline.conf should look like this:

first-pipeline.conf

input {
    beats {
        port => "5044"
    }
}
 filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
    geoip {
        source => "clientip"
    }
}
output {
    stdout { codec => rubydebug }
}

Save changes. To force Filebeat to read the log file from scratch, shut down Filebeat (press Ctrl+C), delete the registry file, and then restart Filebeat.

Notice that the event now contains geographic location information:

{
        "message" => "1.76.62 - - [04/Jan/2015:05:30:37 +0000] \"GET /MyURL-4 HTTP/1.1\" 200 4877 \"http://www.semicomplete.com/projects/xdotool/\" \"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\"",
    "httpversion" => "1.1",
          "agent" => "\"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\"",
          "ident" => "-",
           "beat" => {
        "hostname" => "hdpmaster",
            "name" => "hdpmaster",
         "version" => "6.2.4"
    },
         "source" => "/usr/hadoopsw/mydata/elkdata/logstash-tutorial-dataset",
           "verb" => "GET",
      "timestamp" => "04/Jan/2015:05:30:37 +0000",
           "auth" => "-",
       "referrer" => "\"http://www.semicomplete.com/projects/xdotool/\"",
       "@version" => "1",
           "host" => "hdpmaster",
         "offset" => 24885,
          "geoip" => {
           "region_name" => "Tokyo",
             "city_name" => "Tokyo",
          "country_name" => "Japan",
              "location" => {
            "lon" => 139.7514,
            "lat" => 35.685
        },
           "region_code" => "13",
           "postal_code" => "190-0031",
         "country_code2" => "JP",
             "longitude" => 139.7514,
              "timezone" => "Asia/Tokyo",
         "country_code3" => "JP",
                    "ip" => "1.76.0.62",
        "continent_code" => "AS",
              "latitude" => 35.685
    },
       "response" => "200",
     "prospector" => {
        "type" => "log"
    },
        "request" => "/MyURL-4",
          "bytes" => "4877",
       "clientip" => "1.76.62",
     "@timestamp" => 2018-04-23T14:19:33.037Z,
           "tags" => [
        [0] "beats_input_codec_plain_applied"
    ]
}

Indexing Data into Elasticsearch

Now that the web logs are broken down into specific fields, the Logstash pipeline can index the data into an Elasticsearch cluster. Edit the first-pipeline.conf file and replace the entire output section so that the file looks like this:

first-pipeline.conf

input {
    beats {
        port => "5044"
    }
}
 filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
    geoip {
        source => "clientip"
    }
}
output {
    elasticsearch {
        hosts => [ "localhost:9200" ]
    }
}

Save changes. To force Filebeat to read the log file from scratch, shut down Filebeat (press Ctrl+C), delete the registry file, and then restart Filebeat.


Testing Pipeline

Now that the Logstash pipeline is configured to index the data into an Elasticsearch cluster, we can query Elasticsearch.

First list all indices. Notice that Logstash has created an index named logstash- followed by the current date, e.g. logstash-2018.04.23.

[hdpsysuser@hdpmaster ~]$ curl -XGET 'localhost:9200/_cat/indices?v&pretty'
health status index               uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   .kibana             4YKQ6kwWSCyQHhM1Ks_ZIQ   1   0         23            3     90.3kb         90.3kb
yellow open   logstash-2018.04.23 MELDbsEdQvSlDL8nXj-lyQ   5   1          1            0       23kb           23kb
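
The index name in the listing follows the daily pattern logstash-YYYY.MM.dd, so the index and search URL for a given day can be built programmatically. In this Python sketch, daily_index and search_url are hypothetical helpers, localhost:9200 is taken from the pipeline configuration above, and the query uses the field:value form of the query-string syntax, which is an assumption of this example.

```python
from datetime import date
from urllib.parse import urlencode

def daily_index(day, prefix="logstash"):
    """Return the daily index name, e.g. logstash-2018.04.23."""
    return f"{prefix}-{day:%Y.%m.%d}"

def search_url(index, field, value, host="localhost:9200"):
    """Build a URI search URL that looks for value in a single field."""
    query = urlencode({"pretty": "true", "q": f"{field}:{value}"})
    return f"http://{host}/{index}/_search?{query}"

index = daily_index(date(2018, 4, 23))
print(search_url(index, "response", "200"))
# prints "http://localhost:9200/logstash-2018.04.23/_search?pretty=true&q=response%3A200"
```

The resulting URL can be passed to curl or opened with any HTTP client.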

Try a test query to Elasticsearch based on the fields created by the grok filter plugin. Replace $DATE with the date suffix of your index, for example 2018.04.23:

curl -XGET 'localhost:9200/logstash-$DATE/_search?pretty&q=response=200'
curl -XGET 'localhost:9200/logstash-2018.04.23/_search?pretty&q=response=200'

[hdpsysuser@hdpmaster ~]$ curl -XGET 'localhost:9200/logstash-2018.04.23/_search?pretty&q=response=200'

{
  "took" : 48,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : 0.2876821,
    "hits" : [
      {
        "_index" : "logstash-2018.04.23",
        "_type" : "doc",
        "_id" : "eOfu8mIBFjVpawNmVgj1",
        "_score" : 0.2876821,
        "_source" : {
          "message" : "1.76.62 - - [04/Jan/2015:05:30:37 +0000] \"GET /MyURL-5 HTTP/1.1\" 200 4877 \"http://www.semicomplete.com/projects/xdotool/\" \"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\"",
          "httpversion" : "1.1",
          "agent" : "\"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\"",
          "ident" : "-",
          "beat" : {
            "hostname" : "hdpmaster",
            "name" : "hdpmaster",
            "version" : "6.2.4"
          },
          "source" : "/usr/hadoopsw/mydata/elkdata/logstash-tutorial-dataset",
          "verb" : "GET",
          "timestamp" : "04/Jan/2015:05:30:37 +0000",
          "auth" : "-",
          "referrer" : "\"http://www.semicomplete.com/projects/xdotool/\"",
          "@version" : "1",
          "host" : "hdpmaster",
          "offset" : 25098,
          "geoip" : {
            "region_name" : "Tokyo",
            "city_name" : "Tokyo",
            "country_name" : "Japan",
            "location" : {
              "lon" : 139.7514,
              "lat" : 35.685
            },
            "region_code" : "13",
            "postal_code" : "190-0031",
            "country_code2" : "JP",
            "longitude" : 139.7514,
            "timezone" : "Asia/Tokyo",
            "country_code3" : "JP",
            "ip" : "1.76.0.62",
            "continent_code" : "AS",
            "latitude" : 35.685
          },
          "response" : "200",
          "prospector" : {
            "type" : "log"
          },
          "request" : "/MyURL-5",
          "bytes" : "4877",
          "clientip" : "1.76.62",
          "@timestamp" : "2018-04-23T14:35:17.340Z",
          "tags" : [
            "beats_input_codec_plain_applied"
          ]
        }
      }
    ]
  }
}


Try another search for the geographic information derived from the IP address.

[hdpsysuser@hdpmaster ~]$ curl -XGET 'localhost:9200/logstash-2018.04.23/_search?pretty&q=geoip.city_name=Tokyo'

{
  "took" : 27,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : 0.2876821,
    "hits" : [
      {
        "_index" : "logstash-2018.04.23",
        "_type" : "doc",
        "_id" : "eOfu8mIBFjVpawNmVgj1",
        "_score" : 0.2876821,
        "_source" : {
          "message" : "1.76.62 - - [04/Jan/2015:05:30:37 +0000] \"GET /MyURL-5 HTTP/1.1\" 200 4877 \"http://www.semicomplete.com/projects/xdotool/\" \"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\"",
          "httpversion" : "1.1",
          "agent" : "\"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\"",
          "ident" : "-",
          "beat" : {
            "hostname" : "hdpmaster",
            "name" : "hdpmaster",
            "version" : "6.2.4"
          },
          "source" : "/usr/hadoopsw/mydata/elkdata/logstash-tutorial-dataset",
          "verb" : "GET",
          "timestamp" : "04/Jan/2015:05:30:37 +0000",
          "auth" : "-",
          "referrer" : "\"http://www.semicomplete.com/projects/xdotool/\"",
          "@version" : "1",
          "host" : "hdpmaster",
          "offset" : 25098,
          "geoip" : {
            "region_name" : "Tokyo",
            "city_name" : "Tokyo",
            "country_name" : "Japan",
            "location" : {
              "lon" : 139.7514,
              "lat" : 35.685
            },
            "region_code" : "13",
            "postal_code" : "190-0031",
            "country_code2" : "JP",
            "longitude" : 139.7514,
            "timezone" : "Asia/Tokyo",
            "country_code3" : "JP",
            "ip" : "1.76.0.62",
            "continent_code" : "AS",
            "latitude" : 35.685
          },
          "response" : "200",
          "prospector" : {
            "type" : "log"
          },
          "request" : "/MyURL-5",
          "bytes" : "4877",
          "clientip" : "1.76.62",
          "@timestamp" : "2018-04-23T14:35:17.340Z",
          "tags" : [
            "beats_input_codec_plain_applied"
          ]
        }
      }
    ]
  }
}

If you are using Kibana to visualize your data, you can also explore the Filebeat data in Kibana.





Stashing to HDFS


Webhdfs output plugin


This plugin sends Logstash events into files in HDFS via the webhdfs REST API. The HTTP REST API supports the complete FileSystem interface for HDFS.


vi /usr/hadoopsw/elk/logstash-6.2.3/config/logstash2hdfs.conf

input {
    beats {
        port => "5044"
    }
}
 filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
    geoip {
        source => "clientip"
    }
}
output {
    webhdfs {
        host => "127.0.0.1"     # (required)
        port => 50070           # (optional, default: 50070)
        path => "/user/logstash/%{+YYYY-MM-dd}logstash-%{+HH}.log"    # (required) JodaFmt
        user => "hdpsysuser"    # (required)
    }
    elasticsearch {
        hosts => [ "localhost:9200" ]
    }
}

Run Logstash with the new file:

[hdpsysuser@hdpmaster ~]$ logstash -f $LS_HOME/config/logstash2hdfs.conf --config.reload.automatic


Create the target folder in HDFS, then add a line to the file monitored by Filebeat, e.g. /usr/hadoopsw/mydata/elkdata/logstash-tutorial-dataset. A new file is then generated in HDFS. Check its contents.

[hdpsysuser@hdpmaster ~]$ hdfs dfs -mkdir /user/logstash

[hdpsysuser@hdpmaster ~]$ hdfs dfs -ls /user/logstash

Found 1 items
-rwxr-xr-x   1 hdpsysuser supergroup        248 2018-04-23 17:03 /user/logstash/2018-04-23logstash-17.log

[hdpsysuser@hdpmaster ~]$ hdfs dfs -cat /user/logstash/2018-04-23logstash-17.log
2018-04-23T17:03:18.660Z hdpmaster .76.62 - - [04/Jan/2015:05:30:37 +0000] "GET /MyURL-HDFS1 HTTP/1.1" 200 4877 "http://www.semicomplete.com/projects/xdotool/" "Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0"
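
The file name in the listing above comes from the date references in the path setting, which are filled in from the event timestamp. The Python sketch below imitates that expansion for the four Joda tokens used in this post only; render_path and the token table are written for this example and do not cover the full Joda format.

```python
import re
from datetime import datetime, timezone

# Only the Joda tokens that appear in this post are mapped.
JODA_TO_STRFTIME = {"YYYY": "%Y", "MM": "%m", "dd": "%d", "HH": "%H"}

def render_path(template, timestamp):
    """Expand %{+...} date references using the given timestamp."""
    def expand(match):
        fmt = match.group(1)
        for joda, py in JODA_TO_STRFTIME.items():
            fmt = fmt.replace(joda, py)
        return timestamp.strftime(fmt)
    return re.sub(r"%\{\+([^}]+)\}", expand, template)

ts = datetime(2018, 4, 23, 17, 3, 18, tzinfo=timezone.utc)
print(render_path("/user/logstash/%{+YYYY-MM-dd}logstash-%{+HH}.log", ts))
# prints "/user/logstash/2018-04-23logstash-17.log"
```

Because the hour is part of the name, events from a different hour end up in a different file.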


Managing Multi Input and Output

You can configure a Logstash pipeline with multiple inputs and outputs. A sample configuration is provided below.

/data/elk/logstash-6.2.3/config/multifile.cfg 

##Input Plugin

input {
    file {
        type => "technical"
        path => "/data/elk/elkdata/tech.log"
    }

    file {
        type => "business"
        path => "/data/elk/elkdata/buss.log"
    }

    file {
        type => "application"
        path => "/data/elk/elkdata/app.log"
    }
}
##input ends

##Filter Plugin
filter {
    if [type] == "technical" {
            # processing .......
    }

    if [type] == "business" {
            # processing .......
    }

    if [type] == "application" {
            # processing .......
    }


}
###filter ends
###Output Plugin
output {
    if [type] == "technical" {
        webhdfs {
            codec => "json"
            host => "nn01"    # (required)
            port => 50070     # (optional, default: 50070)
            path => "/logstash/techlog/%{+YYYY-MM-dd}-technical-%{+HH}.log"    # (required) JodaFmt
            user => "elk"     # (required)
        }
    } ##technical

    if [type] == "business" {
        webhdfs {
            host => "nn01"    # (required)
            port => 50070     # (optional, default: 50070)
            path => "/logstash/busslog/%{+YYYY-MM-dd}-business-%{+HH}.log"    # (required) JodaFmt
            user => "elk"     # (required)
        }
    } ##business

    if [type] == "application" {
        webhdfs {
            host => "nn01"    # (required)
            port => 50070     # (optional, default: 50070)
            path => "/logstash/applog/%{+YYYY-MM-dd}-application-%{+HH}.log"    # (required) JodaFmt
            user => "elk"     # (required)
        }
    } ##application
}
##output ends


Managing Multi Input and Output with beats

Here is the sample configuration to manage multiple inputs and outputs with beats.

First configure Filebeat (filebeat.yml) to harvest different log files from different locations, then configure the Logstash pipeline to process the events it sends.


/data/elk/filebeat-6.2.4-linux-x86_64/filebeat.yml

filebeat.prospectors:
- type: log
  paths:
    - /data/elk/elkdata/apache.log
  fields:
    logtype: apache

- type: log
  paths:
    - /data/elk/elkdata/jboss.log
  fields:
    logtype: jboss

output.logstash:
  hosts: ["dn04:5044"]


/data/elk/logstash-6.2.3/config/multi-beats-pipeline.cfg

input {
    beats {
        port => "5044"
    }
}
# The filter part of this file is commented out to indicate that it is
# optional.
# filter {
#
# }
output {
    if [fields][logtype] == "apache" {
        webhdfs {
            host => "nn01"    # (required)
            port => 50070     # (optional, default: 50070)
            path => "/logstash/apache/%{+YYYY-MM-dd}-technical-%{+HH}.log"    # (required) JodaFmt
            user => "elk"     # (required)
        }
    }

    if [fields][logtype] == "jboss" {
        webhdfs {
            host => "nn01"    # (required)
            port => 50070     # (optional, default: 50070)
            path => "/logstash/jboss/%{+YYYY-MM-dd}-technical-%{+HH}.log"    # (required) JodaFmt
            # path => "/logstash/jboss/%{+YYYY}/%{+MM}/%{+YYYYMMdd}-technical-%{+HH}.log"    # (required) JodaFmt
            user => "elk"     # (required)
        }
    }
}
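
The conditionals above use the field reference [fields][logtype] because Filebeat places the custom values from filebeat.yml under a nested fields key of each event. The following Python sketch mimics that lookup on a plain dictionary; get_field and hdfs_dir are hypothetical helpers written for this example, not part of Logstash.

```python
import re

def get_field(event, reference):
    """Resolve a reference such as "[fields][logtype]" against a nested dict."""
    keys = re.findall(r"\[([^\]]+)\]", reference) or [reference]
    value = event
    for key in keys:
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value

def hdfs_dir(event):
    """Pick an output directory the way the two conditionals above do."""
    logtype = get_field(event, "[fields][logtype]")
    return {"apache": "/logstash/apache", "jboss": "/logstash/jboss"}.get(logtype)

event = {"message": "...", "fields": {"logtype": "jboss"}}
print(hdfs_dir(event))  # prints "/logstash/jboss"
```

An event without a matching logtype gets no directory here, just as it would match neither conditional and be written to neither webhdfs output.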


Logstash Configuration for different types of Logs

##verify config: logstash -f $LS_HOME/config/servers-master-pipeline.cfg --config.test_and_exit
##Start logstash: logstash -f $LS_HOME/config/servers-master-pipeline.cfg --config.reload.automatic
## sudo /opt/filebeat-6.3.1-linux-x86_64/filebeat -e -c /opt/filebeat-6.3.1-linux-x86_64/master-filebeat-log-shipper.yml -d "publish"
## nohup logstash -f $LS_HOME/config/servers-master-pipeline.cfg --config.reload.automatic > "/opt/logstash-6.3.1/logs/start_logstash_nohup_`date +%Y%m%d_%H%M`.log" &

############## LS Input: Configuration for OS beats ################

input {

    beats {
        id => "linux"
        port => "5044"
    }##For Linux

    beats {
        id => "windows"
        port => "6044"
        type => "windows_events"
    }##For Winlogbeat

}##input ends

############## LS Input: Configuration for OS beats ends ################




###Filter to process/parse log
filter {
############## LS Filter: Configuration for Linux Audit,Messages,Secure ################

##add common fields to every log received so that the log collector can be identified when there is more than one
mutate { add_field => [ "lc_host", "dn01" ] }
mutate { add_field => [ "lc_suffix", "dn01" ] }

##below field to for the Kafka as HDFS partitions are different than Kafka 
mutate { add_field => [ "year", "%{+YYYY}" ] }
mutate { add_field => [ "month", "%{+MM}" ] }
mutate { add_field => [ "day", "%{+dd}" ] }
mutate { add_field => [ "hour", "%{+HH}" ] }


ruby { 
    code => "event.set('lc_received_at', Time.now());"
}

##Parsing syslog/messages file
  if [fields][logtype] == "syslog" {
   if [source] == "/var/log/messages"
   {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
#      add_field => [ "received_at", "%{@timestamp}" ]
#      add_field => [ "received_from", "%{host}" ]

    }
    syslog_pri { }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
   }##messages if

##Parsing syslog/audit file

   if [source] == "/var/log/audit/audit.log"
   {
##here processing for audit log
#"message" => "type=SERVICE_STOP msg=audit(1532417364.894:205361): pid=1 uid=0 auid=4294967295 ses=4294967295 msg='unit=xagt comm=\"systemd\" exe=\"/usr/lib/systemd/systemd\" hostname=? addr=? terminal=? res=success'",
kv{ 
allow_duplicate_values => "true"
recursive => "true"
}##kv
mutate {
rename => {
"type" => "audit_type"
"pid" => "process_id"
"uid" => "user_id"
"auid" => "audit_user_id"
"ses" => "session_id"
"comm" => "cmd_line_name"
"exe" => "cmd_line_path"
"addr" => "host_ip"
"terminal" => "terminal_name"
"res" => "result"
}
}##mutate

   }##audit log if



   if [source] == "/var/log/secure"
   {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }

#      add_field => [ "received_at", "%{@timestamp}" ]
#      add_field => [ "received_from", "%{host}" ]
    }
    syslog_pri { }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }

   }##secure filter if ends


  }##if logtype=syslog

############## LS Filter: Configuration for Linux Audit,Messages,Secure ends ################



############## LS Filter: Configuration for Windows Event Viewer ################
## if [@metadata][beat] == "winlogbeat"
## {
##
##       mutate { add_field => [ "received_at", "%{@timestamp}" ] }
##       mutate { add_field => [ "received_from", "%{host}" ] }
##
##
## }##winlogbeat if ends
##
############## LS Filter: Configuration for Windows Event Viewer ends ################


############## LS Filter: Configuration for IIS events Starts ################
##Parsing IIS file
  if [fields][logtype] == "iis" 
  {

##ignore log comments
if [message] =~ "^#"
{
        drop {}
}


###date time s-ip cs-method cs-uri-stem cs-uri-query s-port cs-username c-ip cs(User-Agent) cs(Referer) sc-status 
###sc-substatus sc-win32-status time-taken
###2018-10-22 00:00:45 ::1 POST /powershell clientApplication=ActiveMonitor;PSVersion=4.0&CorrelationID=<empty>;&cafeReqId=e89d03ed-f465-4df6-8c16-8ec78edf5420; 80 - ::1 Microsoft+WinRM+Client - 200 0 0 15

### date time s-sitename s-computername s-ip cs-method cs-uri-stem cs-uri-query s-port cs-username c-ip cs-version cs(User-Agent) cs(Cookie) cs(Referer) cs-host sc-status sc-substatus sc-win32-status sc-bytes cs-bytes time-taken Original-IP
grok {
            match => { "message" => [
                                        ###MAS PROD IIS Pattern
                                        #"%{TIMESTAMP_ISO8601:log_timestamp} %{IPORHOST:s_sitename} %{NOTSPACE:s_computername} %{IPORHOST:s_ip} %{WORD:cs_method} %{URIPATH:cs_uri_stem} %{NOTSPACE:cs_uri_query} %{NUMBER:s_port} %{NOTSPACE:cs_username} %{IPORHOST:c_ip} %{NOTSPACE:cs_version} %{NOTSPACE:cs_useragent} %{NOTSPACE:cs_cookie} %{NOTSPACE:cs_referer} %{IPORHOST:cs_host} %{NUMBER:sc_status} %{NUMBER:sc_substatus} %{NUMBER:sc_win32_status} %{NUMBER:sc_bytes} %{NUMBER:cs_bytes} %{NUMBER:timetaken:int} %{IPORHOST:originalip}"
                                        #,
                                        "%{TIMESTAMP_ISO8601:log_timestamp} %{IPORHOST:s_sitename} %{NOTSPACE:s_computername} %{IPORHOST:s_ip} %{WORD:cs_method} %{URIPATH:cs_uri_stem} %{NOTSPACE:cs_uri_query} %{NUMBER:s_port} %{NOTSPACE:cs_username} %{IPORHOST:c_ip} %{NOTSPACE:cs_version} %{NOTSPACE:cs_useragent} %{NOTSPACE:cs_cookie} %{NOTSPACE:cs_referer} %{IPORHOST:cs_host} %{NUMBER:sc_status} %{NUMBER:sc_substatus} %{NUMBER:sc_win32_status} %{NUMBER:sc_bytes} %{NUMBER:cs_bytes} %{NUMBER:timetaken:int} %{IPORHOST:originalip}"
                                        ,
                                        "%{TIMESTAMP_ISO8601:log_timestamp} %{IPORHOST:s_ip} %{WORD:cs_method} %{URIPATH:cs_uri_stem} %{NOTSPACE:cs_uri_query} %{NUMBER:s_port} %{NOTSPACE:cs_username} %{IPORHOST:c_ip} %{NOTSPACE:cs_useragent} %{NOTSPACE:cs_referer} %{NUMBER:sc_status} %{NUMBER:sc_substatus} %{NUMBER:sc_win32_status} %{NUMBER:timetaken:int} %{IPORHOST:originalip}"
                                        ,
                                        ### MAS STG IIS Pattern
                                        "%{TIMESTAMP_ISO8601:log_timestamp} %{IPORHOST:s_ip} %{WORD:cs_method} %{URIPATH:cs_uri_stem} %{NOTSPACE:cs_uri_query} %{NUMBER:s_port} %{NOTSPACE:cs_username} %{IPORHOST:c_ip} %{NOTSPACE:cs_useragent} %{NOTSPACE:cs_referer} %{NUMBER:sc_status} %{NUMBER:sc_substatus} %{NUMBER:sc_win32_status} %{NUMBER:timetaken:int}"
                                    ] 
                     }
# add_field => [ "received_at", "%{@timestamp}" ]
}##grok ends   

        ### original user ip details    
        geoip {
                source => "originalip"
              }##geoip ends

    ###Useragent details
useragent {
source => "cs_useragent"
prefix => "cs_useragent_"
}##useragent ends
    
    ### split query string
    kv {
            source => "cs_uri_query"
            field_split => "&"
            prefix => "qs_"
    }##kv ends

    
  }##[fields][logtype] IIS if ends

############## LS Filter: Configuration for IIS events ends   ################

############## LS Filter: Configuration for Apache Tomcat Catalina Access events Starts ################
##Parsing Apache Tomcat Catalina Access Log
  if [fields][logtype] == "tomcat" 
  {


   if [source] =~ "access" {
         mutate { replace => { type => "catalina_access" } }
         grok {
           #"192.168.67.233--[12/Apr/2018:00:00:19+0300]POST/view/unit/unitRegistration.xhtmlHTTP/1.120033662"
           # 10.0.0.7 - - [03/Sep/2017:10:58:19 +0000] "GET /pki/scep/pkiclient.exe?operation=GetCACaps&message= HTTP/1.1" 200 39
#"%{TIMESTAMP_ISO8601:log_timestamp} %{IPORHOST:s_sitename} %{NOTSPACE:s_computername} %{IPORHOST:s_ip} %{WORD:cs_method} %{URIPATH:cs_uri_stem} %{NOTSPACE:cs_uri_query} %{NUMBER:s_port} %{NOTSPACE:cs_username} %{IPORHOST:c_ip} %{NOTSPACE:cs_version} %{NOTSPACE:cs_useragent} %{NOTSPACE:cs_cookie} %{NOTSPACE:cs_referer} %{IPORHOST:cs_host} %{NUMBER:sc_status} %{NUMBER:sc_substatus} %{NUMBER:sc_win32_status} %{NUMBER:sc_bytes} %{NUMBER:cs_bytes} %{NUMBER:timetaken:int} %{IPORHOST:originalip}"           
# %{NOTSPACE:csURIquery}
           # Both patterns sit under one "message" key; grok tries them in order and stops at the first match
           match => { "message" => [
                       "%{IPORHOST:remoteHost} %{NOTSPACE:remoteLogicalUserName} %{NOTSPACE:remoteAuthUserName} \[%{DATA:dateTime}\] %{WORD:reqMethod} %{URIPATH:targetURLstem} ?%{NOTSPACE:queryString} %{DATA:targetProtocol} %{NUMBER:httpStatusCode} %{NUMBER:bytesSent}"
                       ,
                       "%{IPORHOST:remoteHost} %{NOTSPACE:remoteLogicalUserName} %{NOTSPACE:remoteAuthUserName} \[%{DATA:dateTime}\] \"%{WORD:reqMethod} %{URIPATH:targetURLstem} ?%{NOTSPACE:queryString} %{DATA:targetProtocol}\" %{NUMBER:httpStatusCode} %{NUMBER:bytesSent}"
                     ]
                    }
         }
   } else if [source] =~ "error" {
         mutate { replace => { type => "catalina_error" } }
   } else {
      mutate { replace => { type => "random_logs" } }
   }
   
        ### original user ip details    
        geoip {
                source => "remoteHost"
              }##geoip ends

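    ###Useragent details
    ### NOTE: the grok patterns above do not capture a "targetURL" field or a user agent,
    ### so this filter has no effect unless the access log pattern is extended to include one
useragent {
source => "targetURL"
prefix => "cs_useragent_"
}##useragent ends
    
    ### split query string (the grok patterns above capture it as "queryString")
    kv {
            source => "queryString"
            field_split => "&"
            prefix => "qs_"
    }##kv ends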

    
  }##[fields][logtype] tomcat if ends

############## LS Filter: Configuration for Apache Tomcat Catalina Access events Ends ################




}##filter


###Sending output to HDFS
## syslog output
output {

  ##stdout { codec => rubydebug }

############## LS Output: Configuration for Linux Audit,Messages,Secure ################

  if [fields][logtype] == "syslog" {
if [source] == "/var/log/messages"
{


        webhdfs 
{
workers => 1
host => "dn04"                 # (required)   
port => 50070                       # (optional, default: 50070)
#standby_host => "pn3-hdp-rv-nn02"                 # (required)   
#standby_port => 50070                       # (optional, default: 50070)
path => "/lme/logs/os/linux/var/log/messages/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}/%{+YYYYMMdd}-%{+HH}-%{lc_suffix}-messages" #(required) JodaFmt 
user => "hdfs"                       # (required)
codec=>"json"
}
        
#       kafka 
#       {
#         
#         bootstrap_servers => "dn04:6667"
#         codec => json
#         compression_type => "snappy"
#         topic_id => "messages"
#         acks => "0"
#       }
#        
}##messages if ends


if [source] == "/var/log/audit/audit.log"
{
#stdout { codec => rubydebug } 

webhdfs 
{
workers =>1
host => "dn04"                 # (required)   
port => 50070                       # (optional, default: 50070)
#standby_host => "pn3-hdp-rv-nn02"                 # (required)   
#standby_port => 50070                       # (optional, default: 50070)
path => "/lme/logs/os/linux/var/log/audit/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}/%{+YYYYMMdd}-%{+HH}-%{lc_suffix}-audit" #(required) JodaFmt 
user => "hdfs"                       # (required)
codec=>"json"
                 compression => "snappy"
}
        
        kafka 
        {
          bootstrap_servers => "dn04:6667"
          codec => json
          compression_type => "snappy"
          topic_id => "audit"
          acks => "0"
        }

}##audit if ends


if [source] == "/var/log/secure"
{
#stdout { codec => rubydebug } 

webhdfs 
{
         workers =>1
host => "dn04"                 # (required)   
port => 50070                       # (optional, default: 50070)
#standby_host => "pn3-hdp-rv-nn02"                 # (required)   
#standby_port => 50070                       # (optional, default: 50070)
path => "/lme/logs/os/linux/var/log/secure/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}/%{+YYYYMMdd}-%{+HH}-%{lc_suffix}-secure" #(required) JodaFmt 
user => "hdfs"                       # (required)
codec=>"json"
}
        
#        kafka 
#        {
#          bootstrap_servers => "dn04:6667"
#          codec => json
#          compression_type => "snappy"
#          topic_id => "secure"
#          acks => "0"
#        }
#
}##secure if ends

#############APACHE ACCESS LOG #####################    
if [source] == "/var/log/apache"
{
#stdout { codec => rubydebug } 

webhdfs 
{
         workers =>1
host => "dn04"                 # (required)   
port => 50070                       # (optional, default: 50070)
#standby_host => "pn3-hdp-rv-nn02"                 # (required)   
#standby_port => 50070                       # (optional, default: 50070)
path => "/lme/logs/os/linux/var/log/apache/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}/%{+YYYYMMdd}-%{+HH}-%{lc_suffix}-apache" #(required) JodaFmt 
user => "hdfs"                       # (required)
codec=>"json"
}
        

}##apache if ends

    
  }##[fields][logtype] if ends

############## LS Output: Linux Configuration ends ################


############## LS Output: Configuration for Windows Event Viewer Starts ################

if [@metadata][beat] == "winlogbeat"
{
##stdout { codec => rubydebug }
webhdfs 
{
workers =>1
host => "dn04"                 # (required)   
port => 50070                       # (optional, default: 50070)
                 #standby_host => "pn3-hdp-rv-nn02"                 # (required)   
#standby_port => 50070                       # (optional, default: 50070)
path => "/lme/logs/os/win/ev/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}/%{+YYYYMMdd}-%{+HH}-%{lc_suffix}-ev" #(required) JodaFmt 
user => "hdfs"                       # (required)
codec=>"json"
}##webhdfs

}##windows event viewer if ends

############## LS Output: Windows event viewer Configuration ends ################


############## LS Output: Configuration for IIS Starts ################

if [fields][logtype] == "iis"
{
##stdout { codec => rubydebug }
webhdfs 
{
workers =>1
host => "dn04"                 # (required)   
port => 50070                       # (optional, default: 50070)
                 #standby_host => "pn3-hdp-rv-nn02"                 # (required)   
#standby_port => 50070                       # (optional, default: 50070)
path => "/lme/logs/os/win/iis/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}/%{+YYYYMMdd}-%{+HH}-%{lc_suffix}-iis" #(required) JodaFmt 
user => "hdfs"                       # (required)
codec=>"json"
}##webhdfs

}##windows event IIS if ends



############## LS Output: Configuration for IIS Ends ################

############## LS Output: Configuration for Apache Tomcat Catalina Starts ################
if [fields][logtype] == "tomcat"
{
##stdout { codec => rubydebug }
webhdfs 
{
workers =>1
host => "dn04"                 # (required)   
port => 50070                       # (optional, default: 50070)
                 #standby_host => "pn3-hdp-rv-nn02"                 # (required)   
#standby_port => 50070                       # (optional, default: 50070)
path => "/lme/logs/os/linux/tomcat/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}/%{+YYYYMMdd}-%{+HH}-%{lc_suffix}-accesslog" #(required) JodaFmt 
user => "hdfs"                       # (required)
codec=>"json"
}##webhdfs

}##Tomcat Catalina if ends

############## LS Output: Configuration for Apache Tomcat Catalina Ends ################
   

}##output ends
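
Before pointing Filebeat at a configuration this large, it can help to try a single filter in isolation. The sketch below reuses the syslog grok and date filters from the configuration above with the stdin input and the rubydebug codec, so a sample line pasted into the terminal comes back as a parsed event. The file name test-syslog.cfg is hypothetical, and the [fields][logtype] and [source] conditionals are left out because events typed on stdin do not carry those Filebeat fields.

```
# test-syslog.cfg (hypothetical file name), run with: logstash -f test-syslog.cfg
input {
  stdin { }
}

filter {
  grok {
    match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
  }
  date {
    # Two spaces before "d" cover days that syslog pads with a space, e.g. "Jul  4"
    match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
  }
}

output {
  stdout { codec => rubydebug }
}
```

If the pattern does not match, the printed event carries the _grokparsefailure tag instead of the syslog_* fields.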



Multi Pipelines

If you need to run more than one pipeline in the same process, Logstash provides a way to do this through a configuration file called pipelines.yml. This file must be placed in the path.settings folder.



pipelines.yml

## Linux pipeline for Audit, Secure, Messages and Tomcat Catalina
- pipeline.id: linux-pipeline
  path.config: "/usr/hadoopsw/elk/logstash-6.3.1/config/linux-pipeline.cfg"

## Windows pipeline for Windows Event Viewer and IIS
- pipeline.id: windows-pipeline
  path.config: "/usr/hadoopsw/elk/logstash-6.3.1/config/windows-pipeline.cfg"


This file is formatted in YAML and contains a list of dictionaries, where each dictionary describes a pipeline, and each key/value pair specifies a setting for that pipeline.


linux-pipeline.cfg


##verify config: logstash -f $LS_HOME/config/servers-master-pipeline.cfg --config.test_and_exit
##Start logstash: logstash -f $LS_HOME/config/servers-master-pipeline.cfg --config.reload.automatic
## sudo /opt/filebeat-6.3.1-linux-x86_64/filebeat -e -c /opt/filebeat-6.3.1-linux-x86_64/master-filebeat-log-shipper.yml -d "publish"
## nohup logstash -f $LS_HOME/config/servers-master-pipeline.cfg --config.reload.automatic > "/opt/logstash-6.3.1/logs/start_logstash_nohup_`date +%Y%m%d_%H%M`.log" &
############## LS Input: Configuration for OS beats ################
input {
beats {
id => "linux"
port => "5044"
}##For Linux
}##input ends

############## LS Input: Configuration for OS beats ends ################
###Filter to process/parse log
filter {
############## LS Filter: Configuration for Linux Audit,Messages,Secure ################
## Add common fields to every event so that the originating log collector can be identified when more than one is running
mutate { add_field => [ "lc_host", "dn01" ] }
mutate { add_field => [ "lc_suffix", "dn01" ] }
## The fields below are added for Kafka, because HDFS partitions are different from Kafka partitions
mutate { add_field => [ "year", "%{+YYYY}" ] }
mutate { add_field => [ "month", "%{+MM}" ] }
mutate { add_field => [ "day", "%{+dd}" ] }
mutate { add_field => [ "hour", "%{+HH}" ] }
ruby {
code => "event.set('lc_received_at', Time.now());"
}
##Parsing syslog/messages file
if [fields][logtype] == "syslog" {
if [source] == "/var/log/messages"
{
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
}
syslog_pri { }
date {
match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
}##messages if


##Parsing syslog/audit file
if [source] == "/var/log/audit/audit.log"
{
##here processing for audit log
#"message" => "type=SERVICE_STOP msg=audit(1532417364.894:205361): pid=1 uid=0 auid=4294967295 ses=4294967295 msg='unit=xagt comm=\"systemd\" exe=\"/usr/lib/systemd/systemd\" hostname=? addr=? terminal=? res=success'",
kv{
allow_duplicate_values => "true"
recursive => "true"
}##kv
mutate {
rename => {
"type" => "audit_type"
"pid" => "process_id"
"uid" => "user_id"
"auid" => "audit_user_id"
"ses" => "session_id"
"comm" => "cmd_line_name"
"exe" => "cmd_line_path"
"addr" => "host_ip"
"terminal" => "terminal_name"
"res" => "result"
}
}##mutate
}##audit log if

if [source] == "/var/log/secure"
{
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }

# add_field => [ "received_at", "%{@timestamp}" ]
# add_field => [ "received_from", "%{host}" ]
}
syslog_pri { }
date {
match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
}

}##secure filter if ends
}##if logtype=syslog

############## LS Filter: Configuration for Linux Audit,Messages,Secure ends ################

############## LS Filter: Configuration for Apache Tomcat Catalina Access events Starts ################
##Parsing Apache Tomcat Catalina Access Log
if [fields][logtype] == "tomcat"
{
if [source] =~ "access" {
mutate { replace => { type => "catalina_access" } }
grok {
#"192.168.67.233--[12/Apr/2018:00:00:19+0300]POST/view/unit/unitRegistration.xhtmlHTTP/1.120033662"
# 10.0.0.7 - - [03/Sep/2017:10:58:19 +0000] "GET /pki/scep/pkiclient.exe?operation=GetCACaps&message= HTTP/1.1" 200 39

#"%{TIMESTAMP_ISO8601:log_timestamp} %{IPORHOST:s_sitename} %{NOTSPACE:s_computername} %{IPORHOST:s_ip} %{WORD:cs_method} %{URIPATH:cs_uri_stem} %{NOTSPACE:cs_uri_query} %{NUMBER:s_port} %{NOTSPACE:cs_username} %{IPORHOST:c_ip} %{NOTSPACE:cs_version} %{NOTSPACE:cs_useragent} %{NOTSPACE:cs_cookie} %{NOTSPACE:cs_referer} %{IPORHOST:cs_host} %{NUMBER:sc_status} %{NUMBER:sc_substatus} %{NUMBER:sc_win32_status} %{NUMBER:sc_bytes} %{NUMBER:cs_bytes} %{NUMBER:timetaken:int} %{IPORHOST:originalip}"

# %{NOTSPACE:csURIquery}

# Both patterns sit under one "message" key; grok tries them in order and stops at the first match
match => { "message" => [
"%{IPORHOST:remoteHost} %{NOTSPACE:remoteLogicalUserName} %{NOTSPACE:remoteAuthUserName} \[%{DATA:dateTime}\] %{WORD:reqMethod} %{URIPATH:targetURLstem} ?%{NOTSPACE:queryString} %{DATA:targetProtocol} %{NUMBER:httpStatusCode} %{NUMBER:bytesSent}"
,
"%{IPORHOST:remoteHost} %{NOTSPACE:remoteLogicalUserName} %{NOTSPACE:remoteAuthUserName} \[%{DATA:dateTime}\] \"%{WORD:reqMethod} %{URIPATH:targetURLstem} ?%{NOTSPACE:queryString} %{DATA:targetProtocol}\" %{NUMBER:httpStatusCode} %{NUMBER:bytesSent}"
]
}
}
} else if [source] =~ "error" {
mutate { replace => { type => "catalina_error" } }
} else {
mutate { replace => { type => "random_logs" } }
}

### original user ip details
geoip {
source => "remoteHost"
}##geoip ends

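###Useragent details
### NOTE: the grok patterns above do not capture a "targetURL" field or a user agent,
### so this filter has no effect unless the access log pattern is extended to include one
useragent {
source => "targetURL"
prefix => "cs_useragent_"
}##useragent ends
### split query string (the grok patterns above capture it as "queryString")
kv {
source => "queryString"
field_split => "&"
prefix => "qs_"
}##kv ends

}##[fields][logtype] tomcat if ends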

############## LS Filter: Configuration for Apache Tomcat Catalina Access events Ends ################

}##filter

###Sending output to HDFS
## syslog output
output {
##stdout { codec => rubydebug }

############## LS Output: Configuration for Linux Audit,Messages,Secure ################
if [fields][logtype] == "syslog" {
if [source] == "/var/log/messages"
{
webhdfs
{
workers => 1
host => "dn04" # (required)
port => 50070 # (optional, default: 50070)
#standby_host => "pn3-hdp-rv-nn02" # (required)
#standby_port => 50070 # (optional, default: 50070)
path => "/lme/logs/os/linux/var/log/messages/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}/%{+YYYYMMdd}-%{+HH}-%{lc_suffix}-messages" #(required) JodaFmt
user => "hdfs" # (required)
codec=>"json"
}

}##messages if ends

if [source] == "/var/log/audit/audit.log"
{
#stdout { codec => rubydebug }
webhdfs
{
workers =>1
host => "dn04" # (required)
port => 50070 # (optional, default: 50070)
#standby_host => "pn3-hdp-rv-nn02" # (required)
#standby_port => 50070 # (optional, default: 50070)
path => "/lme/logs/os/linux/var/log/audit/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}/%{+YYYYMMdd}-%{+HH}-%{lc_suffix}-audit" #(required) JodaFmt
user => "hdfs" # (required)
codec=>"json"
#compression => "snappy"
}
}##audit if ends

if [source] == "/var/log/secure"
{
#stdout { codec => rubydebug }
webhdfs
{
workers =>1
host => "dn04" # (required)
port => 50070 # (optional, default: 50070)
#standby_host => "pn3-hdp-rv-nn02" # (required)
#standby_port => 50070 # (optional, default: 50070)
path => "/lme/logs/os/linux/var/log/secure/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}/%{+YYYYMMdd}-%{+HH}-%{lc_suffix}-secure" #(required) JodaFmt
user => "hdfs" # (required)
codec=>"json"
}
}##secure if ends


}##[fields][logtype] == "syslog" if ends

############## LS Output: Linux Syslog Configuration ends ################
############## LS Output: Configuration for Apache Tomcat Catalina Starts ################
if [fields][logtype] == "tomcat"
{
##stdout { codec => rubydebug }
webhdfs
{
workers =>1
host => "dn04" # (required)
port => 50070 # (optional, default: 50070)
#standby_host => "pn3-hdp-rv-nn02" # (required)
#standby_port => 50070 # (optional, default: 50070)
path => "/lme/logs/os/linux/tomcat/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}/%{+YYYYMMdd}-%{+HH}-%{lc_suffix}-accesslog" #(required) JodaFmt
user => "hdfs" # (required)
codec=>"json"
}##webhdfs

}##Tomcat Catalina if ends

############## LS Output: Configuration for Apache Tomcat Catalina Ends ################
}##output ends
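
One thing the Tomcat filter above does not do is use the dateTime field captured by grok, so @timestamp (and with it the year/month/day/hour parts of the HDFS path) reflects when Logstash received the event rather than when the request was logged. A date filter could close that gap. This is a minimal sketch, assuming the access log timestamp looks like the sample in the comments above (03/Sep/2017:10:58:19 +0000); it would sit inside the filter section after the grok block.

```
filter {
  # Only events that were parsed successfully have a dateTime field
  if [fields][logtype] == "tomcat" and [dateTime] {
    date {
      # Joda pattern for e.g. 03/Sep/2017:10:58:19 +0000
      match => [ "dateTime", "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
  }
}
```

When the pattern does not match, the date filter leaves @timestamp unchanged and tags the event with _dateparsefailure.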



windows-pipeline.cfg

##verify config: logstash -f $LS_HOME/config/servers-master-pipeline.cfg --config.test_and_exit
##Start logstash: logstash -f $LS_HOME/config/servers-master-pipeline.cfg --config.reload.automatic
## sudo /opt/filebeat-6.3.1-linux-x86_64/filebeat -e -c /opt/filebeat-6.3.1-linux-x86_64/master-filebeat-log-shipper.yml -d "publish"
## nohup logstash -f $LS_HOME/config/servers-master-pipeline.cfg --config.reload.automatic > "/opt/logstash-6.3.1/logs/start_logstash_nohup_`date +%Y%m%d_%H%M`.log" &

############## LS Input: Configuration for OS beats ################
input {
    beats {
id => "windows"
        port => "6044"
type => "windows_events"
    }##For Winlogbeat
}##input ends

############## LS Input: Configuration for OS beats ends ################

###Filter to process/parse log
filter {
############## LS Filter: Common fields added to every event ################


## Add common fields to every event so that the originating log collector can be identified when more than one is running
mutate { add_field => [ "lc_host", "dn01" ] }
mutate { add_field => [ "lc_suffix", "dn01" ] }

## The fields below are added for Kafka, because HDFS partitions are different from Kafka partitions
mutate { add_field => [ "year", "%{+YYYY}" ] }
mutate { add_field => [ "month", "%{+MM}" ] }
mutate { add_field => [ "day", "%{+dd}" ] }
mutate { add_field => [ "hour", "%{+HH}" ] }


ruby {
    code => "event.set('lc_received_at', Time.now());"
}


############## LS Filter: Configuration for Windows Event Viewer ################
## if [@metadata][beat] == "winlogbeat"
## {
##
##       mutate { add_field => [ "received_at", "%{@timestamp}" ] }
##       mutate { add_field => [ "received_from", "%{host}" ] }
##
##
## }##winlogbeat if ends
##
############## LS Filter: Configuration for Windows Event Viewer ends ################


############## LS Filter: Configuration for IIS events Starts ################
##Parsing IIS file
  if [fields][logtype] == "iis"
  {
##ignore log comments
if [message] =~ "^#"
{
        drop {}
}


###date time s-ip cs-method cs-uri-stem cs-uri-query s-port cs-username c-ip cs(User-Agent) cs(Referer) sc-status
###sc-substatus sc-win32-status time-taken
###2018-10-22 00:00:45 ::1 POST /powershell clientApplication=ActiveMonitor;PSVersion=4.0&CorrelationID=<empty>;&cafeReqId=e89d03ed-f465-4df6-8c16-8ec78edf5420; 80 - ::1 Microsoft+WinRM+Client - 200 0 0 15

### date time s-sitename s-computername s-ip cs-method cs-uri-stem cs-uri-query s-port cs-username c-ip cs-version cs(User-Agent) cs(Cookie) cs(Referer) cs-host sc-status sc-substatus sc-win32-status sc-bytes cs-bytes time-taken Original-IP
grok {
            match => { "message" => [
                                        ###MAS PROD IIS Pattern
                                        #"%{TIMESTAMP_ISO8601:log_timestamp} %{IPORHOST:s_sitename} %{NOTSPACE:s_computername} %{IPORHOST:s_ip} %{WORD:cs_method} %{URIPATH:cs_uri_stem} %{NOTSPACE:cs_uri_query} %{NUMBER:s_port} %{NOTSPACE:cs_username} %{IPORHOST:c_ip} %{NOTSPACE:cs_version} %{NOTSPACE:cs_useragent} %{NOTSPACE:cs_cookie} %{NOTSPACE:cs_referer} %{IPORHOST:cs_host} %{NUMBER:sc_status} %{NUMBER:sc_substatus} %{NUMBER:sc_win32_status} %{NUMBER:sc_bytes} %{NUMBER:cs_bytes} %{NUMBER:timetaken:int} %{IPORHOST:originalip}"
                                        #,
                                        "%{TIMESTAMP_ISO8601:log_timestamp} %{IPORHOST:s_sitename} %{NOTSPACE:s_computername} %{IPORHOST:s_ip} %{WORD:cs_method} %{URIPATH:cs_uri_stem} %{NOTSPACE:cs_uri_query} %{NUMBER:s_port} %{NOTSPACE:cs_username} %{IPORHOST:c_ip} %{NOTSPACE:cs_version} %{NOTSPACE:cs_useragent} %{NOTSPACE:cs_cookie} %{NOTSPACE:cs_referer} %{IPORHOST:cs_host} %{NUMBER:sc_status} %{NUMBER:sc_substatus} %{NUMBER:sc_win32_status} %{NUMBER:sc_bytes} %{NUMBER:cs_bytes} %{NUMBER:timetaken:int} %{IPORHOST:originalip}"
                                        ,
                                        "%{TIMESTAMP_ISO8601:log_timestamp} %{IPORHOST:s_ip} %{WORD:cs_method} %{URIPATH:cs_uri_stem} %{NOTSPACE:cs_uri_query} %{NUMBER:s_port} %{NOTSPACE:cs_username} %{IPORHOST:c_ip} %{NOTSPACE:cs_useragent} %{NOTSPACE:cs_referer} %{NUMBER:sc_status} %{NUMBER:sc_substatus} %{NUMBER:sc_win32_status} %{NUMBER:timetaken:int} %{IPORHOST:originalip}"
                                        ,
                                        ### MAS STG IIS Pattern
                                        "%{TIMESTAMP_ISO8601:log_timestamp} %{IPORHOST:s_ip} %{WORD:cs_method} %{URIPATH:cs_uri_stem} %{NOTSPACE:cs_uri_query} %{NUMBER:s_port} %{NOTSPACE:cs_username} %{IPORHOST:c_ip} %{NOTSPACE:cs_useragent} %{NOTSPACE:cs_referer} %{NUMBER:sc_status} %{NUMBER:sc_substatus} %{NUMBER:sc_win32_status} %{NUMBER:timetaken:int}"
                                    ]
                     }
# add_field => [ "received_at", "%{@timestamp}" ]
}##grok ends 

        ### original user ip details 
        geoip {
                source => "originalip"
              }##geoip ends

    ###Useragent details
useragent {
source => "cs_useragent"
prefix => "cs_useragent_"
}##useragent ends
 
    ### split query string
    kv {
            source => "cs_uri_query"
            field_split => "&"
            prefix => "qs_"
    }##kv ends
  }##[fields][logtype] IIS if ends
############## LS Filter: Configuration for IIS events ends   ################
}##filter


###Sending output to HDFS
## Windows Event Viewer and IIS output
output {

  ##stdout { codec => rubydebug }

############## LS Output: Configuration for Windows Event Viewer Starts ################

if [@metadata][beat] == "winlogbeat"
{
##stdout { codec => rubydebug }
webhdfs
{
workers =>1
host => "dn04"                 # (required)
port => 50070                       # (optional, default: 50070)
                 #standby_host => "pn3-hdp-rv-nn02"                 # (required)
#standby_port => 50070                       # (optional, default: 50070)
path => "/lme/logs/os/win/ev/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}/%{+YYYYMMdd}-%{+HH}-%{lc_suffix}-ev" #(required) JodaFmt
user => "hdfs"                       # (required)
codec=>"json"
}##webhdfs
}##windows event viewer if ends
############## LS Output: Windows event viewer Configuration ends ################

############## LS Output: Configuration for IIS Starts ################

if [fields][logtype] == "iis"
{
##stdout { codec => rubydebug }
webhdfs
{
workers =>1
host => "dn04"                 # (required)
port => 50070                       # (optional, default: 50070)
                 #standby_host => "pn3-hdp-rv-nn02"                 # (required)
#standby_port => 50070                       # (optional, default: 50070)
path => "/lme/logs/os/win/iis/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}/%{+YYYYMMdd}-%{+HH}-%{lc_suffix}-iis" #(required) JodaFmt
user => "hdfs"                       # (required)
codec=>"json"
}##webhdfs
}##windows event IIS if ends

############## LS Output: Configuration for IIS Ends ################
}##output ends
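
With several grok patterns per log type, some lines will inevitably match none of them. Grok marks such events with the _grokparsefailure tag, and in the configurations above they still go to HDFS unparsed. One way to keep an eye on them is to add an output that also writes them to a local file. The sketch below is only an illustration; the path under /tmp is a hypothetical location.

```
output {
  # Events that no grok pattern matched carry this tag
  if "_grokparsefailure" in [tags] {
    file {
      path => "/tmp/logstash-grok-failures-%{+YYYY-MM-dd}.log"   # hypothetical location
      codec => json_lines
    }
  }
}
```

Reviewing this file from time to time shows which log formats still need a pattern.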

Output for Kafka

output {
      kafka {
bootstrap_servers => "x.x.44.135:9092"
        codec => json
        topic_id => "topic_all_logs"
      }
}
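
The output above sends every event to one topic. If separate topics per log type are preferred, the same [fields][logtype] conditionals used in the filter sections can be applied here too. In this sketch the topic names topic_syslog and topic_iis are hypothetical; only topic_all_logs comes from the configuration above.

```
output {
  if [fields][logtype] == "syslog" {
    kafka {
      bootstrap_servers => "x.x.44.135:9092"
      codec => json
      topic_id => "topic_syslog"      # hypothetical topic name
    }
  } else if [fields][logtype] == "iis" {
    kafka {
      bootstrap_servers => "x.x.44.135:9092"
      codec => json
      topic_id => "topic_iis"         # hypothetical topic name
    }
  } else {
    # Everything else keeps going to the original catch-all topic
    kafka {
      bootstrap_servers => "x.x.44.135:9092"
      codec => json
      topic_id => "topic_all_logs"
    }
  }
}
```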


Run the pipelines

[hdpsysuser@dn01 ~]$ logstash --config.reload.automatic

