Создание распределенной очереди задач в Python с помощью Celery + RabbitMQ + Redis

Создание распределенной очереди задач в Python с помощью Celery + RabbitMQ + Redis

16 февраля 2023 г.

В этой статье мы собираемся использовать Celery, RabbitMQ и Redis для создания распределенной очереди задач. Но что такое распределенная очередь задач и зачем ее создавать?

Распределенная очередь задач позволяет вам переложить работу на другой процесс, чтобы она выполнялась асинхронно (как только вы помещаете работу в очередь, вы не ждете) и параллельно (можно использовать другие ядра для обработки работы).

Таким образом, это в основном дает вам возможность выполнять задачи в фоновом режиме, в то время как приложение продолжает решать другие задачи.

A system using workers with RabbitMQ and Redis

Случаи использования очередей задач

Самый простой и понятный пример – отправка электронных писем после регистрации пользователя. В этом случае вы не знаете, сколько времени потребуется, чтобы отправить электронное письмо пользователю, это может быть 1 мс, но может быть и больше, а иногда даже не будет отправлено вообще, потому что в этих случаях вы не несете ответственности или просто сказали, что не знаете, что задача будет успешно выполнена, потому что за вас это сделает другой провайдер. Итак, теперь, когда вы получили простое представление о том, как вы можете извлечь выгоду из очередей задач, идентифицировать такие задачи так же просто, как проверить, принадлежат ли они к одной из следующих категорий:

* Сторонние задачи — веб-приложение должно быстро обслуживать пользователей, не дожидаясь завершения других действий во время загрузки страницы, например отправки электронного письма или уведомления или распространения обновлений во внутренние инструменты (например, сбор данных). для A/B-тестирования или системного ведения журнала). * Длительные задания — задания, требовательные к ресурсам, когда пользователям приходится ждать, пока они вычислят свои результаты, например выполнение сложного рабочего процесса (рабочие процессы DAG), создание графа, задачи типа Map-Reduce, и обслуживание медиаконтента (видео, аудио). * Периодические задачи. Задания, которые вы запланируете запускать в определенное время или через определенный интервал, например, создание ежемесячного отчета или веб-парсер, который запускается два раза в день.

Настройка зависимостей для Celery

Celery требуется транспорт сообщений для отправки и получения сообщений. Некоторые кандидаты, которых вы можете использовать в качестве брокеров сообщений:

* RabbitMQ * Redis * Amazon SQS

В этом руководстве мы будем использовать RabbitMQ, вы можете использовать любой другой брокер сообщений (например, Redis).

Также хорошо упомянуть, что мы собираемся использовать Redis сейчас, поскольку для транспортера сообщений мы используем RabbitMQ. Когда задачи отправляются брокеру, а затем выполняются celery worker, мы хотим сохранить состояние, а также посмотреть, какие задачи выполнялись до этого. Для этого вам понадобится какое-то хранилище данных, и для этого мы будем использовать Redis.

Для хранилищ результатов у нас также есть много кандидатов:

* AMQP, Редис * Мемкэш, * SQLAlchemy, Django ORM * Apache Cassandra, Elasticsearch, Riak и т. д.

Для настройки этих служб мы будем использовать Docker, поскольку его легко настроить, это изолированная среда, и вы можете легко воспроизвести ту же среду, если у вас есть конфигурация (Dockerfile или docker-compose).

Настройка проекта

Давайте начнем новый проект Python с нуля. Сначала создадим новый каталог, создадим все файлы, необходимые для проекта, а затем инициализируем виртуальную среду.

mkdir celery-python && cd $_
touch __init__.py
touch tasks.py
touch docker-compose.yaml
touch requirements.txt

# create & activate the virtualenv

python -m venv env
source env/bin/activate

Теперь давайте установим проект requirements. Для этого проекта нам понадобятся только celery и Redis.

pip install celery redis 

Теперь пришло время настроить docker-compose для запуска RabbitMQ и Redis. В файл docker-compose.yaml вставьте следующую конфигурацию YAML.

Здесь мы просто запускаем две службы, определяя ключ изображения так, чтобы он указывал на изображение в dockerhub , сопоставляя порты host: docker и добавление переменных среды. Чтобы узнать, какие типы переменных среды вы можете использовать с вашим образом, вы можете просто перейти к соответствующему образу в dockerhub и просмотреть документацию. Например, вы можете узнать, как использовать образ RabbitMQ здесь.

Теперь давайте инициализируем приложение celery для использования RabbitMQ в качестве транспортера сообщений и Redis в качестве хранилища результатов. В файл tasks.py добавим следующий код:

Я старался сделать код как можно минимальным, чтобы вы могли понять цель этого руководства. Как видите, мы определили URL-адреса для RabbitMQ и Redis, а затем просто инициализируем приложение celery, используя эти конфигурации. Первым параметром задачи является имя текущего модуля.

Затем мы украсили функцию say_hello атрибутом @app.task, который говорит о том, что функция помечена как задача, а затем может быть вызвана с помощью .delay(), что мы вскоре увидим.

Обычно у нас был бы модуль celery_app.py только для инициализации экземпляра приложения celery, а затем отдельный модуль moduletasks.py, в котором мы определяли бы задачи, которые мы хотим запускать с помощью celery.< /сильный>


Создание и запуск сервисов с помощью Docker

Теперь нам нужно только запустить службы (RabbitMQ и Redis) с помощью docker. Чтобы запустить изображения внутри контейнера, мы просто запускаем:

docker-compose up -d 

Это займет некоторое время, если у вас нет этих изображений локально. Затем, чтобы убедиться, что контейнеры запущены и работают, мы пишем:

docker ps

И вы должны увидеть две запущенные службы и дополнительную информацию о каждой из них, если не проверить журналы на наличие возможных ошибок. Теперь давайте запустим celery worker, а затем попробуем запустить некоторые задачи с интерактивной оболочкой python.

# Starting the Celery worker

celery -A tasks worker -l info --pool=solo

Это запустит celery worker, и если вы увидите журналы, он должен сказать вам, что он успешно соединился с брокером. Теперь давайте запустим задачу:

# Running celery tasks

python
---------------------------------
Type "help", "copyright", "credits" or "license" for more information.
>>> from tasks import say_hello
>>> say_hello.delay("Valon")
<AsyncResult: 55ad96a9-f7ea-44f4-9a47-e15b90d6d8a2>

Мы видим, что мы вызвали функцию, используя .delay(), а затем передали аргумент имени. Этот метод на самом деле представляет собой ярлык со звездочкой для другого метода с именем apply_async(). Затем мы видим, что мы получаем обратно <AsyncResult, который представляет собой задачу, которая была передана брокеру, и после этого celery будет обработан и завершен в фоновом режиме.

Если вы сейчас посмотрите на своего воркера, то увидите в логах, что воркер получил задачу, а затем через 5 секунд сообщит вам, что задача успешно завершена.

Теперь запустим ту же задачу, но поместим хранилище результатов в игру. сейчас. В оболочке python давайте сохраним результат в переменной, а затем запустим ее свойства.

Если бы серверная часть не была настроена в celery (Redis), мы не смогли бы получить доступ к этим свойствам или функциям, потому что по умолчанию она не хранила бы никакого состояния, но поскольку она у нас есть, мы можем видеть и получить фрагменты информации о наших задачах. Если вы хотите копнуть глубже, вы можете получить доступ к своей базе данных Redis с помощью такого инструмента, как TablePlus, или вы можете настроить Flower для мониторинга Redis и КроликMQ.

Как видно на изображении выше, все задачи хранятся в Redis.

Подведение итогов

В этой статье мы создали приложение Python с Celery, RabbitMQ и Redis с нуля. Цель статьи состояла в том, чтобы показать вам, что такое очередь задач, что мы можем из нее извлечь и как ее реализовать. Примеры задачи приведены только для демонстрации, но вы можете использовать ту же конфигурацию, что и я, добавляя задачи в модуль tasks и конфигурацию в celery_app.py. См. документацию здесь.

Я настоятельно рекомендую вам использовать сельдерей в своих приложениях, так как он весьма полезен, когда у вас есть дела, которые требуют больше времени, вам нужно планировать задачи и т. д.

Если вы прочитали статью и нашли ее полезной, не забудьте поставить лайк и поделиться ею с друзьями и друзьями. сообщество. Если у вас есть какие-либо вопросы, не стесняйтесь обращаться ко мне. Свяжитесь со мной в 👉 LinkedIn, Гитхаб

:::информация Также опубликовано здесь.

:::


Оригинал