Logstash

Server-side data processing pipeline that ingests data from multiple sources, transforms it, and then sends the results to one or more destinations.

Part of the Elastic Stack along with Beats, Elasticsearch and Kibana.

  1. TL;DR
  2. Create plugins
  3. Troubleshooting
    1. Check a pipeline is processing data
    2. Log pipeline data to stdout
  4. Further readings
    1. Sources

TL;DR

Setup
dnf install 'logstash'
docker pull 'logstash:7.17.27'
yum install 'logstash'
Usage
# Start the service.
docker run --rm --detach --name 'logstash' --publish '5044:5044' 'logstash:7.17.27'

# Get a shell in the docker image.
docker run --rm -ti --name 'logstash' --entrypoint 'bash' 'logstash:7.17.27'

# Validate configuration files.
logstash -tf 'config.conf'
logstash --config.test_and_exit --path.config 'config.conf' --api.enabled='false'
docker run --rm --name 'logstash' -v "$PWD:/usr/share/logstash/custom-config" 'logstash:7.17.27' \
  --api.enabled='false' --config.test_and_exit --path.config 'custom-config/staging.conf'
# Should `path.config` be a directory, Logstash loads and checks *all* files in it as if they were a *single* pipeline.
logstash --config.test_and_exit --path.config 'configDir' --log.level='debug'
docker run --rm -ti -v "$PWD:/usr/share/logstash/custom-dir" 'docker.io/library/logstash:7.17.27' -tf 'custom-dir'

# Automatically reload configuration files on change.
# Default interval is '3s'.
logstash … --config.reload.automatic
logstash … --config.reload.automatic --config.reload.interval '5s'

# Force configuration files reload and restart the pipelines.
# Does not really seem to work, honestly. Just restart the whole service.
kill -SIGHUP '14175'
pkill -HUP 'logstash'


# Install plugins.
logstash-plugin install 'logstash-output-loki'

# List installed plugins.
logstash-plugin list
logstash-plugin list --verbose
logstash-plugin list '*namefragment*'
logstash-plugin list --group 'output'


# Get Logstash's status.
curl -fsS 'localhost:9600/_health_report?pretty'

# Get pipelines' statistics.
curl -fsS 'localhost:9600/_node/stats/pipelines?pretty'
curl -fsS 'localhost:9600/_node/stats/pipelines/somePipeline?pretty'
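
Example pipeline definition putting the pieces together: it reads events from a file, syslog, and raw TCP, enriches and filters them, then ships them to both Loki and OpenSearch while also dumping them to stdout and a local file for debugging:
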
input {
  file {
    path => "/var/log/logstash/logstash-plain.log"
  }
  syslog {
    port => 9292
    codec => "json"
  }
  tcp {
    port => 9191
    codec => "json"
  }
}

filter {
  grok {
    match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\]\[%{LOGLEVEL:loglevel}\] .+" }
  }
  json {
    skip_on_invalid_json => true
    source => "message"
    add_tag => ["json_body"]
  }
  mutate {
    add_field => {
      "cluster" => "eu-west-1"
      "job" => "logstash"
    }
    replace => { "type" => "stream"}
    remove_field => [ "src" ]
  }

  if [loglevel] != "ERROR" and [loglevel] != "WARN" {
    drop { }
  }
}

output {
  loki {
    url => "http://loki.example.org:3100/loki/api/v1/push"
  }
  opensearch {
    hosts => [ "https://os.example.org:443" ]
    auth_type => {
      type => 'aws_iam'
      region => 'eu-west-1'
    }
    index => "something-%{+YYYY.MM.dd}"
    action => "create"
  }
  stdout { codec => rubydebug }
  file {
    path => "/tmp/debug.json"
  }
}
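
Save the pipeline to a file and validate it with the commands from the TL;DR (e.g. `logstash -tf 'pipeline.conf'`, with 'pipeline.conf' being a placeholder name) before starting the service.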

Create plugins

Refer to How to write a Logstash input plugin for input plugins.
Refer to How to write a Logstash codec plugin for codec plugins.
Refer to How to write a Logstash filter plugin for filter plugins.
Refer to How to write a Logstash output plugin for output plugins.

Whatever the type of plugin, it will need to be a self-contained Ruby gem.

logstash-plugin generate creates a foundation for new Logstash plugins with files from templates.
It creates the standard directory structure, gemspec files, and dependencies a new plugin needs to get started.

The directory structure should look something like the following.
Replace filter/filters with codec/codecs, input/inputs, or output/outputs accordingly.

$ logstash-plugin generate --type 'filter' --name 'test'

$ tree 'logstash-filter-test'
logstash-filter-test
├── CHANGELOG.md
├── CONTRIBUTORS
├── DEVELOPER.md
├── docs
│   └── index.asciidoc
├── Gemfile
├── lib
│   └── logstash
│       └── filters
│           └── test.rb
├── LICENSE
├── logstash-filter-test.gemspec
├── Rakefile
├── README.md
└── spec
    ├── filters
    │   └── test_spec.rb
    └── spec_helper.rb

Plugins:

  • Require parent classes defined in logstash/filters/base (or the appropriate plugin type's) and logstash/namespace.

    require "logstash/filters/base"
    require "logstash/namespace"
    
  • Shall be a subclass of LogStash::Filters::Base (or the appropriate plugin type's).
    The class name shall closely mirror the plugin name.

    class LogStash::Filters::Test < LogStash::Filters::Base
    
  • Shall set their config_name to their own name inside the configuration block.

    class LogStash::Filters::Test < LogStash::Filters::Base
      config_name "test"
    
  • Shall include a configuration section defining as many parameters as needed to enable Logstash to process events.

    class LogStash::Filters::Test < LogStash::Filters::Base
      config_name "test"
      config :message, :validate => :string, :default => "Hello World!"
    
  • Must implement the register method, plus one or more other methods specific to the plugin's type: filter(event) for filters, run(queue) for inputs, receive(event) for outputs, decode and encode for codecs. See the sketch after this list.
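
A minimal sketch of how the generated lib/logstash/filters/test.rb can be fleshed out, mirroring the snippets above and the official filter plugin guide (the filter just overwrites the message field with the configured string):

require "logstash/filters/base"
require "logstash/namespace"

class LogStash::Filters::Test < LogStash::Filters::Base
  config_name "test"

  # The only setting this filter accepts.
  config :message, :validate => :string, :default => "Hello World!"

  def register
    # One-time setup (compile patterns, open connections, validate settings).
  end

  def filter(event)
    # Overwrite the event's 'message' field with the configured string.
    event.set("message", @message)

    # Fire the add_field/add_tag/... common options configured on this filter.
    filter_matched(event)
  end
end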

Once ready:

  1. Fix the gemspec file.
  2. Build the Ruby gem.

     gem build 'logstash-filter-test.gemspec'

  3. Install the plugin in Logstash.

$ logstash-plugin install 'logstash-filter-test-0.1.0.gem'
Using bundled JDK: /usr/share/logstash/jdk
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
io/console on JRuby shells out to stty for most operations
Validating logstash-filter-test-0.1.0.gem
Installing logstash-filter-test
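
Verify the installation with the list command from the TL;DR:

logstash-plugin list '*test*'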

Troubleshooting

Check a pipeline is processing data

Steps, in order of likelihood:
  1. Check the Logstash process is running correctly

    systemctl status 'logstash.service'
    journalctl -xefu 'logstash.service'
    
    docker ps
    docker logs 'logstash'
    
  2. Check the Logstash process is getting and/or sending data:

    tcpdump 'dst port 8765 or dst host opensearch.example.org'
    
  3. Check the pipeline's statistics are changing:

    curl -fsS 'localhost:9600/_node/stats/pipelines/somePipeline' \
    | jq '.pipelines."somePipeline"|{"events":.events,"queue":.queue}' -
    
    {
      "events": {
        "in": 20169,
        "out": 20169,
        "queue_push_duration_in_millis": 11,
        "duration_in_millis": 257276,
        "filtered": 20169
      },
      "queue": {
        "type": "memory",
        "events_count": 0,
        "queue_size_in_bytes": 0,
        "max_queue_size_in_bytes": 0
      }
    }
    
  4. Check the pipeline's input and output plugins' statistics are changing:

    curl -fsS 'localhost:9600/_node/stats/pipelines/somePipeline' \
    | jq '.pipelines."somePipeline".plugins|{"in":.inputs,"out":.outputs[]|select(.name=="opensearch")}' -
    
  5. Log the pipeline's data to stdout to check data is parsed correctly.

Log pipeline data to stdout

Leverage the stdout output plugin in any pipeline's configuration file:

output {
  stdout {
    codec => rubydebug {
      metadata => true   # also print the events' @metadata field in the console
    }
  }
}
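
Events will then show up wherever Logstash's stdout is attached, e.g. in `journalctl -xefu 'logstash.service'` or `docker logs 'logstash'` as seen in the checks above.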

Further readings

Sources