Logstash
If you are using Logstash as a log collector and/or parser, you can configure it to push your logs to Sekoia.io. This is done with the HTTP output plugin, which is bundled with the default distribution of Logstash.
To push logs, you have to configure filters in Logstash that add the proper intake key according to the type of logs being processed.
Example
In the following example, multiple inputs handle logs collected via Syslog (Apache HTTP Server and NGINX logs) and via Beats (Winlogbeat), and forward them to Sekoia.io.
Logstash relies on tags to filter events. Make sure to replace the placeholder CHANGE_ME_INTAKE_KEY below with your actual intake key. You can also add as many filters as you need within the filter
section.
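The tags tested in the filter section below must already be present on incoming events. If your sources do not set them, one possible approach (an illustrative sketch, not part of the reference configuration; the port numbers are arbitrary examples) is to assign tags at input time, for instance by dedicating one port per source:

```
input {
  tcp {
    port => 514
    tags => ["apache2"]   # example: tag everything received on this port
  }
  tcp {
    port => 515
    tags => ["nginx"]
  }
}
```

With this layout, the conditionals on [tags] in the filter section will match without any extra parsing.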
Tip
Adding filters lets you customize how Logstash processes and routes events to suit your requirements.
Note
Beats agents require a specific output configuration, because the complete JSON event must be forwarded to Sekoia.io.
input {
  beats {
    port => 5044
  }
  tcp {
    port => 514
  }
}

filter {
  # Edit this filter to adapt to your own needs.
  if "apache2" in [tags] {
    mutate {
      add_field => { "[@metadata][intake_key]" => "CHANGE_ME_INTAKE_KEY" }
    }
  }
  else if "nginx" in [tags] {
    mutate {
      add_field => { "[@metadata][intake_key]" => "CHANGE_ME_INTAKE_KEY" }
    }
  }
}

output {
  # `if "" in [field]` is the Logstash idiom to check that a field exists.
  if "" in [@metadata][intake_key] {
    http {
      format => "json"
      http_method => "post"
      url => "https://intake.sekoia.io"
      mapping => {
        "@timestamp" => "%{@timestamp}"
        "json" => "%{message}"
        "intake_key" => "%{[@metadata][intake_key]}"
      }
    }
  }
  else if "winlogbeat" in [agent][type] {
    http {
      format => "json"
      http_method => "post"
      url => "https://intake.sekoia.io/plain"
      codec => "json_lines"
      headers => {
        "X-SEKOIAIO-INTAKE-KEY" => "CHANGE_ME_INTAKE_KEY"
        "X-SEKOIAIO-EVENT-TIMESTAMP" => "%{@timestamp}"
      }
    }
  }
}
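For reference, the first http output above serializes each event into a small JSON object defined by its mapping option. The following Python sketch is illustrative only (the requests are normally built and sent by Logstash itself) and reproduces the shape of that body:

```python
import json
from datetime import datetime, timezone

def build_payload(message: str, intake_key: str) -> str:
    """Reproduce the body built by the `mapping` option of the http output."""
    return json.dumps({
        "@timestamp": datetime.now(timezone.utc).isoformat(),  # %{@timestamp}
        "json": message,                                       # %{message}
        "intake_key": intake_key,           # %{[@metadata][intake_key]}
    })

# Example raw Apache access log line (hypothetical sample data).
body = build_payload('127.0.0.1 - - "GET / HTTP/1.1" 200', "CHANGE_ME_INTAKE_KEY")
```

This makes explicit that the raw log line travels in the json field, while the intake key resolved by the filter section ends up in intake_key.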
Advanced configuration
Warning
This advanced configuration is provided as-is by Sekoia.io for experienced Logstash administrators. Sekoia.io provides best-effort support for this configuration.
The above configuration sends logs one at a time (one HTTP request per log), which is suitable for a single pipeline with low log throughput.
For more advanced use cases, such as sending logs to both Sekoia.io and an Elasticsearch instance, a more advanced Logstash configuration is recommended to achieve higher throughput. It uses multiple pipelines and pipeline-to-pipeline communication to duplicate events and format them into the payload expected by Sekoia.io. Events are sent in batches, which provides better performance.
Note
Beats events do not need to be duplicated into a second pipeline as the complete JSON event is sent to Sekoia.io.
pipelines.yml
- pipeline.id: my-pipeline_1
  path.config: "/etc/path/to/p1.cfg"
- pipeline.id: my-other-pipeline
  path.config: "/etc/different/path/p2.cfg"
- pipeline.id: sekoiaio-apache2
  path.config: "/etc/path/to/sekoiaio-apache2.cfg"
- pipeline.id: sekoiaio-nginx
  path.config: "/etc/path/to/sekoiaio-nginx.cfg"
p1.cfg
input {
  beats {
    port => 5044
  }
}

output {
  elasticsearch {
    # Your elasticsearch configuration
  }
  if "winlogbeat" in [agent][type] {
    http {
      format => "json_batch"
      http_method => "post"
      url => "https://intake.sekoia.io/jsons"
      codec => "json_lines"
      headers => {
        "X-SEKOIAIO-INTAKE-KEY" => "CHANGE_ME_INTAKE_KEY"
      }
    }
  }
}
p2.cfg
input {
  tcp {
    port => 514
  }
}

output {
  if "apache2" in [tags] {
    elasticsearch {
      # Your elasticsearch configuration
    }
    pipeline {
      send_to => [sekoiaio-apache2]
    }
  }
  else if "nginx" in [tags] {
    elasticsearch {
      # Your elasticsearch configuration
    }
    pipeline {
      send_to => [sekoiaio-nginx]
    }
  }
}
sekoiaio-apache2.cfg
input {
  pipeline {
    # Must match the `send_to` value used in p2.cfg.
    address => sekoiaio-apache2
  }
}

filter {
  # When sending events in batch mode, we cannot choose in the output
  # configuration which fields to send, so keep only the message field.
  prune {
    whitelist_names => ["^message$"]
  }
  # Rename the message field to json (the field name expected by the HTTP
  # endpoint) and add the intake key.
  mutate {
    rename => { "message" => "json" }
    add_field => { "intake_key" => "CHANGE_ME_INTAKE_KEY_APACHE2" }
  }
}

output {
  http {
    format => "json_batch"
    http_method => "post"
    url => "https://intake.sekoia.io/array"
    codec => "json_lines"
  }
}
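After the prune and mutate filters above, each event carries only a json field and an intake_key, and format => "json_batch" posts them as a single JSON array. The following Python sketch is illustrative only (Logstash builds this body itself) and shows the resulting batch shape:

```python
import json

# Each event, after prune + mutate, is reduced to these two fields.
# The log lines below are hypothetical sample data.
events = ["raw apache log line 1", "raw apache log line 2"]
batch_body = json.dumps([
    {"json": line, "intake_key": "CHANGE_ME_INTAKE_KEY_APACHE2"}
    for line in events
])
```

Sending many events per request, rather than one request per log as in the basic configuration, is what gives this setup its higher throughput.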
sekoiaio-nginx.cfg
input {
  pipeline {
    # Must match the `send_to` value used in p2.cfg.
    address => sekoiaio-nginx
  }
}

filter {
  # When sending events in batch mode, we cannot choose in the output
  # configuration which fields to send, so keep only the message field.
  prune {
    whitelist_names => ["^message$"]
  }
  # Rename the message field to json (the field name expected by the HTTP
  # endpoint) and add the intake key.
  mutate {
    rename => { "message" => "json" }
    add_field => { "intake_key" => "CHANGE_ME_INTAKE_KEY_NGINX" }
  }
}

output {
  http {
    format => "json_batch"
    http_method => "post"
    url => "https://intake.sekoia.io/array"
    codec => "json_lines"
  }
}