Интеграция пользовательского процессора журналов — Fluentd против Vector.dev

Интеграция пользовательского процессора журналов — Fluentd против Vector.dev

10 февраля 2023 г.

Если у вас есть приложение, вы создаете журналы. И если у вас есть несколько приложений, журналы накапливаются. Объем и сложность этих журналов иногда могут быть настолько огромными, что даже стандартные функции популярных конвейеров журналирования, таких как Fluentd или Vector, не работают. Вот почему вы решили создать свой инструмент, двоичный файл, который вы планировали поместить в середину вашей цепочки обработки. На первый взгляд, это может показаться сомнительным выбором, поскольку обычно лучше всего использовать доменно-ориентированные языки (DSL), когда это возможно. Но, написав свой инструмент, вы получите больше возможностей настройки, универсальности и других расширенных возможностей, которые могут обогатить вашу обработку.

Флуэт

На одном из моих предыдущих рабочих мест у нас был конвейер Fluent с исполняемым файлом в рамках стандартных правил. Конвейер запускается, как правило, путем захвата журналов из Filebeat и их анализа в формате JSON.

<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>

<source>
  @type tail
  path /app/filebeat/logs/json
  pos_file /app/filebeat/logs/json.pos
  read_lines_limit 100
  tag filebeat.json
  <parse>
    @type json
  </parse>
</source>

Итак, вот в чем дело. Мы добавляем наш двоичный файл logprocessor для обработки каждой строки ввода из файлов, заканчивающихся расширением .log. Таким образом, директива match генерирует выходные данные, которые затем можно передать на последующие этапы конвейера журналов.

<match **.log>
  @type exec_filter
  child_respawn -1
  command sh -c "logprocessor --json"
  <format>
    @type json
  </format>
  <parse>
    @type json
  </parse>
</match>

Давайте прервемся на мгновение. Зачем нам может понадобиться собственный обработчик журналов? Ну, например, этот инструмент может читать текстовые журналы с использованием регулярных выражений и комбинировать неочевидные многострочные сообщения. Он также может проверять формат даты/времени и уровни журнала журналов, созданных различными приложениями, написанными на разных языках, с их разнообразными стандартными библиотечными регистраторами, что в конечном итоге выводит журналы в унифицированном формате.

Вектор

Пришло время присоединиться к тренду и отправиться туда, где крутятся все крутые ребята. Используйте движок Vector (написанный на Rust), чтобы повысить производительность и стать настоящим хипстером. Вы можете подумать, что единственное, что вам нужно здесь, это заменить XML на TOML, но я так не думаю. И проблема в том, что в Vector нет опции command для направления журналов в stdin вашего исполняемого файла. Он имеет только исходные коды exec, которые работают немного по-другому.

Давайте посмотрим пример. Вы определили некоторые источники и получили журналы от Docker:

[sources.docker_json_logs]
  type = "docker_logs"
  docker_host = "unix:///var/run/docker.sock"
  include_labels = ["format=json"]

И у вас есть источник exec:

[sources.format_json_logs]
  type = "exec"
  command = ["sh", "-c", "logprocessor --json"]

Вы заметили что-то странное? Оба фрагмента кода говорят об источнике. И этот пример точно не сработает. Нам нужно каким-то образом передать журналы в stdin нашего исполняемого файла.

Подходящим решением было бы сохранить первый исходник в файл и использовать его следующим образом:

[sinks.write_json_logs]
  type = "file"
  inputs = ["docker_json_logs"]
  path = "/app/file"
  encoding.codec = "ndjson"

[sources.format_json_logs]
  type = "exec"
  command = ["sh", "-c", "tail -f /app/file | logprocessor --json"]
  mode = "streaming"
  working_directory = "/logs/json"
  streaming.respawn_on_exit = true

Мы используем источник exec с режимом streaming для непрерывного извлечения журналов и перенаправления их в нужное место, logprocessor стандартный ввод. И, конечно же, передавать логи дальше по нашему пайплайну.

Точная проблема в том, что у нас есть файл, а это значит, что мы должны вращать и обрабатывать его всеми возможными способами, что неверно. Но нам на помощь придет вечная классика, специальный файл Linux, известный как канал и называемый mkfifo.

Давайте прочитаем точное описание из справочных страниц Linux:

<цитата>

После того как вы таким образом создали специальный файл FIFO, любой процесс может открыть его для чтения или записи точно так же, как и обычный файл. Однако он должен быть открыт с обоих концов одновременно, прежде чем вы сможете выполнять какие-либо операции ввода или вывода на нем. Открытие FIFO для чтения обычно блокируется до тех пор, пока какой-либо другой процесс не откроет тот же FIFO для записи, и наоборот.

Проще говоря, вы можете писать и читать из файла, не используя дискового пространства. Он работает как конвейер, но с некоторыми ограничениями по объему (обычно ограничивается буфером в 1 МБ). Однако использование команды tail с каналом, скорее всего, никогда не достигнет этих пределов.

Следите за буквой p в выводе ls -l:

mkfifo pipe
ls -l pipe
prw-r--r-- 0 hackernooner hackernoon 4 Feb 00:36 -- pipe

Давайте сделаем это немного умнее и сделаем оболочку bash, добавив перенаправления stderr на случай, если что-то пойдет не так с бинарником (мы также можем добавить файл logprocessor_error.log в часть файла конвейер журнала):

#!/usr/bin/env bash
tail -f -n +1 pipe | RUST_LOG=debug logprocessor $@ 2>logprocessor_error.log

И добавьте его в раздел настроек вектора:

[sinks.write_json_logs]
  type = "file"
  inputs = ["docker_json_logs"]
  path = "/app/pipe"
  encoding.codec = "ndjson"

[sources.format_json_logs]
  type = "exec"
  command = ["sh", "-c", "logprocessor.sh --json"]
  mode = "streaming"
  working_directory = "/app"
  streaming.respawn_on_exit = true

И вот оно! Обладая этими знаниями, вы теперь готовы обрабатывать каждую входящую строку журнала с помощью специального исполняемого процессора журнала для инструментов конвейера Fluentd и Vector. Сила в ваших руках!


Оригинал