Note: All posts take a practical approach and avoid lengthy theory. All have been tested on development servers. Please don’t try any post on production servers until you are sure.

Thursday, July 07, 2022

Using Filebeat/Logstash to send logs to Minio Data Lake

To aggregate logs directly into an object store such as Minio, you can use the Logstash S3 output plugin. Logstash buffers incoming events and periodically writes them as objects to S3-compatible storage, where they are available for later analysis. For more information, please review the related posts at the end of this post.


Logstash Configuration

The example Logstash config below shows the settings needed to connect to Minio and send logs to the bucket “logstash-output”, which must already exist.



[hdpsysuser@hdpmaster config]$ vim minio-pipeline.conf

input {
    beats {
        port => "5044"
    }
}
# The filter part of this file is commented out to indicate that it is
# optional.
# filter {
#
# }
output {
    s3 {
        access_key_id => "minioadmin"
        secret_access_key => "minioadmin"
        region => "myregion"
        endpoint => "http://hdpmaster:9000"
        bucket => "logstash-output"
        size_file => 20480   ##bytes
        time_file => 5       ##minutes
        #codec => "line"
        codec => "json_lines"
        prefix => "rawlogs/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}"   ## simulate directories on S3
        #restore => true     ## enable recovery after crash/abnormal termination
        #temporary_directory => "/tmp/LogStash/raw" ## if you provide restore
        #upload_workers_count => 2   ## how many workers to use to upload the files to S3
        #server_side_encryption => true
        #server_side_encryption_algorithm => "AES256"
        #validate_credentials_on_root_bucket => false
        #canned_acl => "bucket-owner-full-control"
        additional_settings => { "force_path_style" => "true" }
    }
    stdout { codec => rubydebug }
}
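The prefix setting uses Logstash's date formatting against each event's timestamp, which Logstash evaluates in UTC. As a rough sketch for previewing where a given event should land, the snippet below reproduces the same layout with GNU date. The helper name s3_prefix_for is hypothetical, and the -d option assumes GNU coreutils.

```shell
#!/bin/sh
# Preview the S3 key prefix that the "prefix" setting above would produce
# for a given event timestamp. Logstash formats %{+...} in UTC.
# s3_prefix_for is a hypothetical helper; it relies on GNU date (-d option).
s3_prefix_for() {
    # $1: timestamp understood by GNU date, e.g. "2022-08-11 07:15:00 UTC"
    date -u -d "$1" '+rawlogs/year=%Y/month=%m/day=%d/hour=%H'
}

s3_prefix_for "2022-08-11 07:15:00 UTC"
# prints: rawlogs/year=2022/month=08/day=11/hour=07
```

Because the prefix is built from UTC, an event logged in a local time zone can land in a different hour (or day) folder than the wall-clock time suggests.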

Note that the force_path_style setting is required: a Minio endpoint needs path-style addressing rather than virtual-host addressing. Path-style addressing does not require coordinating with DNS servers and is therefore simpler in on-premises environments.
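To make the difference concrete, here is a small sketch that only builds the two URL forms for the same object. Both helper functions and the object key rawlogs/sample.txt are hypothetical; the endpoint and bucket come from the config above.

```shell
#!/bin/sh
# Illustrate the two S3 addressing styles for the same object.
# Both helpers are hypothetical and only build URL strings.
path_style_url() {
    # $1: scheme://host:port  $2: bucket  $3: object key
    printf '%s/%s/%s\n' "$1" "$2" "$3"
}

virtual_host_url() {
    scheme="${1%%://*}"      # e.g. "http"
    hostport="${1#*://}"     # e.g. "hdpmaster:9000"
    printf '%s://%s.%s/%s\n' "$scheme" "$2" "$hostport" "$3"
}

path_style_url   "http://hdpmaster:9000" "logstash-output" "rawlogs/sample.txt"
# prints: http://hdpmaster:9000/logstash-output/rawlogs/sample.txt
virtual_host_url "http://hdpmaster:9000" "logstash-output" "rawlogs/sample.txt"
# prints: http://logstash-output.hdpmaster:9000/rawlogs/sample.txt
```

The second form only works if the name logstash-output.hdpmaster resolves to the Minio server, which is the DNS setup that path-style addressing avoids.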

Logstash can trade off efficiency of writing to S3 with the possibility of data loss through the two configuration options “time_file” and “size_file,” which control the frequency of flushing lines to an object. Larger flushes result in more efficient writes and object sizes, but result in a larger window of possible data loss if a node fails. The maximum amount of data loss is the smaller of “size_file” and “time_file” worth of data.
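As a back-of-the-envelope sketch of this trade-off, the snippet below estimates how many seconds pass before a file is rotated, given size_file, time_file and an assumed average log rate. The helper rotation_seconds and the example rates are hypothetical, and the result is only an approximation using integer arithmetic.

```shell
#!/bin/sh
# Estimate how often the s3 output rotates a file, and therefore the largest
# window of buffered data, for a given average log rate.
# rotation_seconds is a hypothetical helper; the rate is an assumed input.
rotation_seconds() {
    size_file_bytes=$1     # size_file from the config (bytes)
    time_file_minutes=$2   # time_file from the config (minutes)
    rate_bytes_per_sec=$3  # assumed average log volume, must be > 0
    by_size=$(( size_file_bytes / rate_bytes_per_sec ))
    by_time=$(( time_file_minutes * 60 ))
    # Whichever limit is reached first triggers the rotation.
    if [ "$by_size" -lt "$by_time" ]; then
        echo "$by_size"
    else
        echo "$by_time"
    fi
}

rotation_seconds 20480 5 100   # prints 204 (size limit reached first)
rotation_seconds 20480 5 10    # prints 300 (time limit reached first)
```

With the values from the config above, a busy host rotates on size well before the five-minute timer, while a quiet host rotates on time.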

-- Test the configuration
[hdpsysuser@hdpmaster config]$ logstash -f $LS_HOME/config/minio-pipeline.conf --config.test_and_exit
-- Run logstash
[hdpsysuser@hdpmaster config]$ logstash -f $LS_HOME/config/minio-pipeline.conf --config.reload.automatic



Filebeat Configuration

Now that Logstash is running, we can configure Filebeat to send logs to it.

-- Relevant Filebeat configuration for this test
vim /etc/filebeat/filebeat.yml
# ============================== Filebeat inputs =========================

filebeat.inputs:

# filestream is an input for collecting log messages from files.
- type: filestream

  # Unique ID among all inputs, an ID is required.
  id: my-filestream-id

  # Change to true to enable this input configuration.
  enabled: true

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /var/log/messages

# ================= Outputs ==============================

# Configure what output to use when sending the data collected by the beat.

# ---------------------------- Logstash Output ----------------------------
output.logstash:
  hosts: ["hdpmaster:5044"]

-- Test filebeat configuration

[hdpsysuser@hdpmaster ~]$ sudo filebeat test config -c /etc/filebeat/filebeat.yml
-- Test filebeat output
[hdpsysuser@hdpmaster ~]$ sudo filebeat test output
logstash: hdpmaster:5044...
  connection...
    parse host... OK
    dns lookup... OK
    addresses: 192.168.56.104
    dial up... OK
  TLS... WARN secure connection disabled
  talk to server... OK

Test the Flow

-- Start filebeat
[hdpsysuser@hdpmaster ~]$ sudo systemctl start filebeat.service

Check the Logstash console and the Minio console to verify that messages are arriving.
You may need to wait for some Linux system activity, since that is what gets written to /var/log/messages (the file Filebeat monitors in our configuration).



-- verify using command line
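One possible way to check from the command line is the Minio client (mc); this is a sketch under assumptions, not necessarily the command used for the original test. The alias name myminio and the helper list_uploaded_logs are hypothetical; the endpoint, credentials, bucket and rawlogs prefix are taken from the Logstash config above.

```shell
#!/bin/sh
# List the objects Logstash has written, using the Minio client (mc).
# Assumptions: mc is installed; "myminio" is a hypothetical alias name.
MINIO_ALIAS="myminio"
MINIO_ENDPOINT="http://hdpmaster:9000"
MINIO_ACCESS_KEY="minioadmin"
MINIO_SECRET_KEY="minioadmin"
BUCKET="logstash-output"

list_uploaded_logs() {
    # Register the endpoint under the alias, then list everything under rawlogs/.
    mc alias set "$MINIO_ALIAS" "$MINIO_ENDPOINT" "$MINIO_ACCESS_KEY" "$MINIO_SECRET_KEY" &&
    mc ls --recursive "$MINIO_ALIAS/$BUCKET/rawlogs/"
}
```

Call list_uploaded_logs after Filebeat has shipped some events; new objects should appear under the year=/month=/day=/hour= folders once size_file or time_file has been reached.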


-- Using Minio console


Select any file and preview it to see its contents. The format follows the codec setting in the Logstash output plugin.




Optional: Analyze the data in Data Lake using Presto

-- Create external table using the location where logstash is sending data in minio bucket.

CREATE EXTERNAL TABLE messages_log(
  actual_message string
)
STORED AS TEXTFILE
LOCATION  's3a://logstash-output/';

--partitioned Table
DROP TABLE IF EXISTS log4dremio;
CREATE EXTERNAL TABLE log4dremio(
  actual_message string
)
PARTITIONED BY(year int,month int,day int,hour int)
STORED AS TEXTFILE
LOCATION  's3a://logstash-output/rawlogs';

0: jdbc:hive2://> ALTER TABLE log4dremio ADD PARTITION (year=2022,month=08,day=11,hour=07) LOCATION 's3a://logstash-output/rawlogs/year=2022/month=08/day=11/hour=07';

0: jdbc:hive2://> SHOW PARTITIONS log4dremio;
0: jdbc:hive2://> select * from log4dremio limit 1;
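Adding partitions one hour at a time gets tedious. As a sketch, the snippet below prints one ALTER TABLE statement per hour of a given day, using the bucket and rawlogs prefix from the Logstash config. The helper gen_add_partitions is hypothetical; review its output before running it in Hive.

```shell
#!/bin/sh
# Print one ALTER TABLE ... ADD PARTITION statement per hour of a given day,
# pointing at the prefix layout written by Logstash.
# gen_add_partitions is a hypothetical helper.
gen_add_partitions() {
    table=$1; year=$2; month=$3; day=$4
    base="s3a://logstash-output/rawlogs"
    for hour in 00 01 02 03 04 05 06 07 08 09 10 11 \
                12 13 14 15 16 17 18 19 20 21 22 23; do
        printf "ALTER TABLE %s ADD IF NOT EXISTS PARTITION (year=%s,month=%s,day=%s,hour=%s) LOCATION '%s/year=%s/month=%s/day=%s/hour=%s';\n" \
            "$table" "$year" "$month" "$day" "$hour" \
            "$base" "$year" "$month" "$day" "$hour"
    done
}

gen_add_partitions log4dremio 2022 08 11
```

Redirect the output to a file and run that file from your Hive client. Hours for which Logstash wrote no objects simply become empty partitions.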







-- Access the table from Presto and hive to verify the contents
[trino391@hdpmaster ~]$  trinocli --server http://hdpmaster:6060 --catalog minio

presto:myschema> select * from messages_log;






Note:
If the objects are .gz files, you can create a table on their location in the same way.


--- External table on gz FILE in Hive
create external table mytable
(
    col   string
)
stored as textfile
LOCATION  's3a://logstash-output/logs';

--query from Hive 
0: jdbc:hive2://> select * from mytable limit 1;


-- query from Presto
select * from minio.default.mytable limit 1;

-- create external table in Presto
create table minio.default.mytable2 (col varchar(65535)) 
with (format='TEXTFILE', external_location='s3a://logstash-output/logs');



-- select from presto
select * from minio.default.mytable2 limit 1;



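-- create view (Parsed)
-- regexp_extract pulls the JSON object (first "{" through last "}") out of each line;
-- json_extract then reads individual fields from that object.
Create view pvw_myt2 As
SELECT 
json_extract(regexp_extract(col, '{.*}'),'$.cpu_p') as cpu_p
,json_extract(regexp_extract(col, '{.*}'),'$.user_p') as user_p
,regexp_extract(col, '{.*}') as stmt
from minio.default.mytable2 ;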
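To see what the view's regular expression does before creating it, the same extraction can be tried locally on a single line. This is only a sketch: the sample line is made up, both helpers are hypothetical, and the sed-based field lookup handles flat, unquoted values only (it is not a JSON parser).

```shell
#!/bin/sh
# Try the view's '{.*}' pattern on one log line from the shell.
# The sample line and both helpers are hypothetical.
extract_json() {
    # Keep only the text from the first "{" to the last "}" (same as '{.*}').
    printf '%s\n' "$1" | grep -o '{.*}'
}

json_field() {
    # $1: flat JSON object  $2: field name; prints the raw (unquoted) value.
    printf '%s\n' "$1" | sed -n "s/.*\"$2\":\([^,}]*\).*/\1/p"
}

sample='2022-08-11T07:15:00 host1 metrics {"cpu_p":0.42,"user_p":0.17}'
stmt=$(extract_json "$sample")
echo "$stmt"                  # prints: {"cpu_p":0.42,"user_p":0.17}
json_field "$stmt" cpu_p      # prints: 0.42
json_field "$stmt" user_p     # prints: 0.17
```

If a line contains no braces at all, extract_json prints nothing, which corresponds to the NULL that regexp_extract returns in the view.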





Related Posts:
