Создайте более умные рабочие процессы данных с новой функцией прослушивателя событий Seatunnel

Создайте более умные рабочие процессы данных с новой функцией прослушивателя событий Seatunnel

31 июля 2025 г.

В сегодняшних все более сложных задачах интеграции данных способность отслеживать ключевые события во время выполнения работы в режиме реального времени - и запускать последующую логику на основе этих событий - стала неотъемлемой частью создания современных платформ данных. Начиная с версии 2.3.0, Apache Seatunnel представила совершенно новый механизм слушателей событий, предлагая гибкую систему крючков, которая позволяет пользователям создавать более разумные и более управляемые бизнесом рабочие процессы интеграции данных.

Эта статья будет углубляться в мощные возможности этого механизма, исследуя его общую архитектуру, основные концепции, методы использования и лучшие практики.

Что такое механизм слушателя событий?

Механизм слушателя событий в Seatunnel - это подключаемая система крючков, которая позволяет пользователям захватывать и реагировать на критические события во время выполнения работы, например:

  • До начала работы (JobStartEvent)
  • После успешного завершения работы или терпится терпимость (JobSuccessEvent / JobFailEvent)
  • События выполнения на уровне задачи (TaskStartEvent / TaskSuccessEventTaskFailEvent)

Каждый тип события может быть связан с соответствующими слушателями, которые выполняют различные действия, такие как отправка оповещений, ведение журналов данных аудита или вывод внешних систем. Это значительно повышает наблюдение и расширяемость Seatunnel.

Основные компоненты

Механизм слушателя событий в Seatunnel состоит из трех основных компонентов:

  1. **Событие \ Представляет определенное поведение или изменение состояния в системе. Каждое событие несет контекстную информацию, такую как идентификатор задания, имя задачи, статус и т. Д.
  2. ** Слушатель событий \ Подключаемый компонент, который получает события и выполняет соответствующую бизнес -логику.
  3. ** Диспетчер событий \ Автобус событий, который отправляет события зарегистрированным слушателям.

SeaTunnel Event Listener Architecture

Поддерживаемые типы событий

Тип события

Описание

JobStartEvent

Вызвано до начала работы

JobSuccessEvent

Вызвано, когда работа преуспевает

JobFailEvent

Вызвано, когда работа не удается

TaskStartEvent

Запускается, когда начинается задача

TaskSuccessEvent

Запускается, когда задача успешно завершается

TaskFailEvent

Запускается, когда задача не выполняется

Эти события позволяют пользователям вставлять логику управления на разных уровнях гранулярности, например, сообщать метрики на уровне задачи или отправлять оповещения на уровне работы.

Как использовать: пример конфигурации

Чтобы включить механизм слушателя событий, вам нужно добавитьevent_listenersКонфигурация в вашем файле конфигурации Seatunnel. Например:

env {
  execution.parallelism = 2
  job.name = "seatunnel-event-listener-demo"
  event_listeners = ["logging"]
}

Эта конфигурация позволяет прослушивателю событий с именемloggingПолем

Примечание:event_listenersэто массив струн, и вы можете настроить несколько слушателей.

Пример встроенного слушателя

Seatunnel в настоящее время включает в себя встроенный прослушитель журнала, который печатает информацию о событиях в журналах-используется для разработки и отладки.

  • Пример вывода:
[INFO] JobStartEvent triggered. Job Name: seatunnel-event-listener-demo  
[INFO] TaskStartEvent triggered. Task: mysql-source->hive-sink  
[INFO] TaskSuccessEvent triggered. Task: mysql-source->hive-sink  
[INFO] JobSuccessEvent triggered.  

Пользовательские слушатели событий

Seatunnel также поддерживает пользовательские слушатели событий, позволяя пользователям расширить логику обработки событий. Шаги реализации следующие:

  1. Реализуйте интерфейс слушателя
public class MyCustomListener implements EventListener {
    @Override
    public void onEvent(Event event) {
        if (event instanceof JobStartEvent) {
            // Send DingTalk or Feishu notifications
        } else if (event instanceof TaskFailEvent) {
            // Write to failure audit table
        }
    }
}
  1. Зарегистрироваться в SPI
    Добавьте путь к классу реализации в файлеMETA-INF/services/org.apache.seatunnel.api.event.EventListener:
com.example.MyCustomListener
  1. Включить его в конфигурацию
env {
  event_listeners = ["my-custom"]
}

Когда Seatunnel начнется, он автоматически загрузит и зарегистрирует вашего слушателя.

Пример вариантов использования

  • Отказ от неудачи: Настройте службы уведомлений Dingtalk или Feishu, чтобы немедленно подтолкнуть оповещения об отказе от операционных команд, когда задача не выполняется.
  • Регистрация аудита: Слушайте события начала и заканчивая работу и запишите ключевую информацию в базу данных аудита для соответствия и отслеживаемости.
  • Запустить вниз по течению рабочих мест: Используйте уведомления HTTP, чтобы запустить нисходящие системы, как только задание будет успешна.
  • Мониторинг продолжительности задачи: Захват временные метки вTaskStartEventиTaskSuccessEventРассчитать и сообщить о продолжительности задачи.

Примечания

  1. Несколько слушателей могут быть активными одновременно; Seatunnel вызовет каждый из них индивидуально.
  2. Если слушатель бросит исключение, это не повлияет на выполнение других слушателей, но ошибка будет зарегистрирована.
  3. Пользовательские слушатели должны быть безопасными и эффективными, чтобы не блокировать основной поток выполнения.

Планы на будущее

Сообщество Seatunnel планирует представить более встроенные плагины слушателя событий-для Dingtalk, Wecom (Enterprise WeChat), Feishu, Prometheus и т. Д.-и для поддержки дополнительных типов событий (таких как проверка данных и отчеты о метрике).

Взносы сообщества приветствуются! Не стесняйтесь отправлять пиар, чтобы помочь построить более мощную экосистему слушателя событий.

Краткое содержание

Механизм слушателя событий привносит большую гибкость и расширяемость в Seatunnel. Он хорошо вписывается в различные сценарии автоматизации, мониторинга и бизнеса. Если вы используете Seatunnel для оркестровки рабочего процесса или интеграции данных, дайте этой функции попытку повысить интеллект и наблюдения вашей платформы.


Оригинал
PREVIOUS ARTICLE
NEXT ARTICLE