Хотите создать автоматические выключатели данных с воздушным потоком? Вот как!

Хотите создать автоматические выключатели данных с воздушным потоком? Вот как!

21 октября 2022 г.

Я большой поклонник Apache Airflow и того, как этот инструмент с открытым исходным кодом позволяет инженерам данных масштабировать конвейеры данных за счет более точного управления рабочими нагрузками.< /p>

Но что происходит, когда тестирование Airflow не выявляет все ваши неверные данные? Что, если «неизвестные неизвестные» проблемы с качеством данных не будут устранены и повлияют на ваши задания Airflow?

Одним из полезных, но малоиспользуемых решений является использование Airflow ShortCircuitOperator для создания автоматических выключателей данных, предотвращающих передачу неверных данных по конвейерам данных.

Автоматические выключатели данных — это мощное средство, но, как и в случае с большинством тактик контроля качества данных, нюансы их реализации имеют решающее значение. В противном случае вы можете усугубить серьезную проблему.

Что такое автоматический выключатель Airflow и как он помогает обеспечить надежность данных?

В электротехнике автоматический выключатель — это защитное устройство, которое защищает ваш дом от повреждений, вызванных перегрузкой по току или коротким замыканием. Когда прерыватель сталкивается с такими электрическими инцидентами, он отключает ток, чтобы предотвратить возникновение еще более серьезной проблемы, например пожара.

Автоматические выключатели данных — это, по сути, тесты данных на стероидах, и философия та же. Когда данные не соответствуют установленным вами пороговым значениям качества или целостности в вашем воздушном потоке chains-part-iv">DAG конвейер остановлен, что предотвращает худший результат, например получение генеральным директором недостоверной информации.

Хотя автоматические выключатели данных чаще всего используются для предотвращения попадания неверных данных на уровень хранения, их можно развертывать на нескольких этапах до обновления информационных панелей BI — например, между этапами преобразования или после выполнения задания ETL или ELT.

Using the Airflow ShortCircuitOperator to create circuit breakers sits within the prevention stage of the data reliability lifecycle. Image courtesy Monte Carlo.

Использование Airflow ShortCircuitOperator для создания автоматических выключателей находится на этапе предотвращения жизненного цикла надежности данных.

Использование Airflow ShortCircuitOperator для создания прерывателя цепи — это тактика, которая находится на этапе предотвращения жизненного цикла надежности данных. И так же, как люди не могут предвидеть и писать тесты для определения всех способов взлома данных, также невозможно установить автоматические выключатели для предотвращения каждого экземпляра неверных данных, которые будут проходить через конвейер (это также нецелесообразно, но подробнее об этом). это позже).

Common use cases and technologies for reducing data downtime. Image courtesy of Monte Carlo.

По этой причине как тестирование данных, так и автоматические выключатели данных лучше всего сокращают время простоя данных. в сочетании с наблюдаемостью данных или сквозным мониторингом данных и оповещением.

Упреждающий мониторинг и оповещение также могут дополнять и помогать преодолевать проблемы благодаря собственным возможностям мониторинга и ведения журналов Apache Airflow в масштабе. В частности, конвейеры Airflow не поддерживают данные. Они выполняют задачи, но не знают, что в них содержится, что требует от вас изучения данных о выполнении, которых редко бывает достаточно для разрешения инцидентов.

This task in the DAG is green, but nothing updated in the underlying table because of the bogus query in example_job_2. Image courtesy of Monte Carlo.

Проблемы автоматического выключателя воздушного потока

Размыкатели цепи, использующие Airflow ShortCircuitOperator, должны быть наиболее важными из ваших тестов по сравнению с базовой операцией запроса и состоять только из наиболее четко определенной логики, которая требует, чтобы ваш конвейер прекратил работу.

Вы также должны использовать автоматические выключатели, только если вы полностью понимаете историю и какие типы инцидентов и пороговые значения представляют собой триггер.

Например, модель данных, не требующая абсолютного отсутствия нулевых столбцов, может быть идеальным средством защиты, но если небольшой диапазон пустых столбцов допустим, это, скорее всего, плохое средство защиты.

Причина этого в том, что оператор Airflow ShortCircuitOperator по своей конструкции вводит время простоя данных, когда автоматический выключатель срабатывает и его необходимо сбросить. Это также происходит в вашем конвейере полностью автоматически — вы не сравниваете результаты в консоли или не смотрите в электронную таблицу.

Это означает, что, хотя AirflowShortCircuitOperator может предотвратить возникновение проблем с данными, он также может нанести ущерб вашему конвейеру, откладывая задания, вызывая цепную реакцию сбоев данных ниже по течению.

Bad data enters even the most well designed data pipelines. https://xkcd.com/2054/

Некоторые рекомендации по снижению этих рисков – итеративное начало работы с менее важными конвейерами и использование преимуществ тестовой среды перед развертыванием в рабочей среде.

Кроме того, убедитесь, что правила — будь то запросы или какая-то пользовательская логика — имеют разумные тайм-ауты. Если для выполнения вашего пакетного задания требуется один час, вероятно, неразумно, чтобы ваш прерыватель цепи выполнял еще один час, так как вы только что удвоили SLA вашего конвейер.

Вы можете сойти с ума от количества тестов данных, которые у вас есть, но прерывайте цепь с Apache ShortCircuitOperator экономно и только на предположениях, которые не могут быть нарушены, чтобы ваши данные считались достоверными.

Спросите себя: «Если этот тест не пройден, хочу ли я, чтобы вся команда по сбору данных немедленно вызвала команду, чтобы решить эту проблему?»

Ладно, хватит предостережения. Давайте построим автоматический выключатель.

Как создать автоматический выключатель с помощью ShortCircuitOperator в группах обеспечения доступности баз данных Airflow

Image 1: Example Airflow ShortCircuitOperator circuit breaker DAG.

На изображении 1 выше у нас есть простая группа обеспечения доступности баз данных с двумя прерывателями цепи always_false и always_true между example_elt_job_1 и example_elt_job_2. Когда данные переходят по каналу always_false_circuit, example_elt_job_2 будет пропущен. Давайте посмотрим на код.

Image 2: Example Airflow circuit breaker code using the Airflow ShortCircuitOperator.

Код на Рисунке 2 очень прост, но он иллюстрирует, где нужно разместить автоматические выключатели в вашем конвейере. Вы бы заменили автоматические выключатели выше своей собственной бизнес-логикой, и у нас есть заполнитель для команды bash.

Вы также можете использовать парадигму API TaskFlow в Airflow 2.X, как показано ниже.

Image 3: An example of a Task Flow API circuit breaker in Python following an extract, load, transform pattern.

Код на изображении 3 извлекает элементы из нашей поддельной базы данных (в долларах) и отправляет их.

Затем мы преобразуем их с помощью переменной, называемой коэффициентом конвертации долларов США в евро, которая в реальном мире, скорее всего, будет введена в результате вызова какого-либо стороннего API, таблицы или другого объекта. Коэффициент конверсии здесь отрицательный, что является очевидной ошибкой и просто невозможно.

Это тип инцидента, для которого вы хотели бы написать автоматический выключатель. Таким образом, код здесь перебирает элементы и проверяет, был ли какой-либо из них отрицательным. Если это условие когда-либо выполняется, цепь размыкается.

Конечно, можно было бы добавить и другие автоматические выключатели с дополнительными порогами. Например, наличие нулевого коэффициента преобразования также, вероятно, потребует автоматического выключателя. Так что же происходит, когда вы начинаете масштабирование и используете несколько автоматических выключателей?

Изображение 4. Использование групп задач Airflow для улучшения управляемости автоматических выключателей в Airflow 2.X.

группы задач Airflow спешат на помощь! С помощью групп задач можно наглядно представить 10 различных автоматических выключателей, и вместо того, чтобы чередовать или анализировать их, с помощью групп задач вы можете просто визуально преобразовать их в одну цепь.

Рекомендации по работе с автоматическим выключателем воздушного потока

У меня есть несколько предложений о том, как добавить автоматические выключатели в ваш конвейер, например:

* Не ограничивайтесь одним типом оператора: вы можете использовать любой оператор, а не только ShortCircuitOperator, для создания пользовательского автоматического выключателя. Вы также можете использовать такие инструменты, как dbt, Great Expectations или Monte Carlo, в качестве прерывателя цепи.

* Попробуйте вызвать AirflowSkipException вместо исключения AirflowException при закрытии канала: это повышает видимость и предотвращает автоматические повторные попытки.

* Не объединяйте несколько каналов в одного оператора. Это затрудняет точное отслеживание того, какая проблема или порог отключили цепь.

* Не объединяйте автоматический выключатель с оператором, выполняющим задачу для оценки. Это затрудняет определение того, произошел ли сбой задания или была отключена цепь.

* Включите механизм обхода или пропуска автоматического выключателя.

* Обобщайте повторяющиеся шаблоны в виде подключаемых модулей или пользовательских операторов.

* Используйте данные хранилища метаданных и журналов в хранилищах и озерах, чтобы создать автоматические выключатели свежести, чтобы увидеть, действительно ли ваша таблица обновлена. В Snowflake: ВЫБЕРИТЕ CONVERT_TIMEZONE( ‘UTC’, last_altered) last_altered FROM information_schema.tables… С Delta Lake: DESCRIBE DETAIL… или через deltaLog.snapshot

Как платформы наблюдения за данными помогают реализовать автоматический выключатель Airflow

Вы можете использовать платформу наблюдения за данными, такую ​​как Monte Carlo, для автоматизации, настройки и упрощения схемы. взлом ваших групп доступности баз данных Airflow или других оркестраторов. Вот как:

  1. Создайте токен API для Монте-Карло. Подробнее см. здесь.

2. Создайте новое HTTP-соединение Airflow с идентификатором API MCD и токеном «extra» в следующем формате:

  1. Установите airflow-mcd из PyPI. Обычно это можно сделать, добавив пакет python в файл requirements.txt для Airflow.

4. Добавьте оператора SimpleCircuitBreakerOperator в свою DAG Airflow. Например:

Этот оператор может использовать любые новые или существующие мониторы правил, созданные вами в Монте-Карло с помощью панели инструментов, API, SDK или IaC (мониторы как код). Ознакомьтесь с полной документацией здесь.

Обратите внимание, что SimpleCircuitBreakerOperator поддерживает несколько параметров, и вы можете найти все подробности здесь. При желании вы также можете использовать SDK Monte Carlo (pycarlo) и/или наш API для отключения цепи.

Не мы подожгли

Оператор Airflow ShortCircuitOperator и автоматические выключатели в целом полезны и просты в реализации, но они являются лишь одним из инструментов в вашем стеке надежности данных. Для максимальной эффективности их необходимо сочетать с другими функциями мониторинга, оповещения, родословной и тестирования.

Сила, которой они обладают, значительна, поэтому будьте осторожны с тем, как вы их реализуете, чтобы они сокращали, а не усугубляли время простоя данных. Но самое главное, получайте удовольствие от того, как вы подключаете автоматические выключатели!

Первоначально опубликовано здесь


Оригинал
PREVIOUS ARTICLE
NEXT ARTICLE