Asyncio: как попрощаться без потери данных

Asyncio: как попрощаться без потери данных

28 марта 2023 г.

Привет народ! Сегодня я хочу проиллюстрировать, насколько запутанным может быть изящное закрытие приложения asyncio без потери чего-либо важного.

При завершении работы приложения asyncio очень важно убедиться, что все запущенные задачи завершили выполнение, прежде чем закрывать приложение. Жесткий выход может привести к потере данных и другим непредвиденным проблемам. Поэтому рекомендуется использовать механизм корректного завершения работы, который позволяет завершить выполнение запущенных задач перед завершением работы приложения.

Для этого модуль asyncio предоставляет функцию shield(), которую можно использовать для обеспечения завершения выполнения задачи, даже если приложение закрывается. Однако в этой статье я покажу, что это не так просто, как может показаться.

1. Без какой-либо защиты

Давайте начнем с простой программы, которая запускает несколько задач в фоновом режиме и ожидает их завершения. Затем я попытаюсь прервать его посередине.

# example1.py

import asyncio


async def worker(n: int) -> None:
    print(f"[{n}] Started!")
    try:
        # this is a task that shouldn't be canceled in the middle
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        print(f"[{n}] Canceled (this is bad)!")
    else:
        print(f"[{n}] Successfully done!")


async def main() -> None:
    # create 6 unprotected tasks
    tasks = []
    for n in range(6):
        tasks.append(asyncio.create_task(worker(n)))

    # wait for all tasks to finish
    await asyncio.gather(*tasks)


if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("App was interrupted")
    else:
        print("App was finished gracefully")

Давайте запустим example1.py и прервем его потом:

> python3 example1.py
[0] Started!
[1] Started!
[2] Started!
[3] Started!
[4] Started!
[5] Started!
^C[0] Canceled (this is bad)!
[1] Canceled (this is bad)!
[2] Canceled (this is bad)!
[3] Canceled (this is bad)!
[4] Canceled (this is bad)!
[5] Canceled (this is bad)!
App was interrupted

Как видите, когда я прервал выполнение скрипта (знак ^C появляется при нажатии ctrl+C), все задачи были немедленно отменены, не дожидаясь их завершения. Но это вполне ожидаемо, так как мы даже не удосужились защитить задачи.

2. Со «щитом»

Давайте обновим main() с помощью asyncio.shield(), чтобы предотвратить отмену 3 из 6 задач, как описано в документация.

# example2.py

import asyncio


async def worker(n: int) -> None:
    ...


async def main() -> None:
    # create 6 tasks, shield only first 3
    tasks = []
    for n in range(6):
        task = asyncio.create_task(worker(n))
        if n < 3:
            # protect task from being canceled 
            # (spoiler: it will be canceled anyway)
            task = asyncio.shield(task)

        tasks.append(task)

    # wait for all tasks to finish
    await asyncio.gather(*tasks)


if __name__ == '__main__':
    ...
> python3 example2.py
[0] Started!
[1] Started!
[2] Started!
[3] Started!
[4] Started!
[5] Started!
^C[3] Canceled (this is bad)!
[4] Canceled (this is bad)!
[5] Canceled (this is bad)!
[2] Canceled (this is bad)!
[0] Canceled (this is bad)!
[1] Canceled (this is bad)!
App was interrupted

Видите ли вы разницу с предыдущим примером (example1.py)? Нет ни одного. Ничего не изменилось. Почему так?

Это связано с тем, что shield() защищает сопрограмму только, если родительская сопрограмма (внутри которой используется shield()) отменяется. Таким образом, это не защищает экранированную сопрограмму от прямой отмены.

Позволь мне объяснить. Если вы закроете приложение, скорее всего, вы сделаете это обычным способом, выполнив следующие действия:

  1. tasks = asyncio.all_tasks() — получить все задачи
  2. [t.cancel() для t в задачах] — отменить все задачи
  3. loop.run_until_complete(gather(*tasks)) — дождаться завершения отмененных задач

Поскольку shield() создает внутреннюю задачу, которая также включается в вызов all_tasks(), она также получает исключение отмены, как и все остальное.

Теперь давайте взглянем на фрагмент кода из asyncio (Python 3.11.2), который вызывается перед закрытием цикла:

def _cancel_all_tasks(loop):
    to_cancel = tasks.all_tasks(loop)
    if not to_cancel:
        return

    for task in to_cancel:
        task.cancel()

    loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))

Как видим, ничего особенного, все те же три шага.

3. Обработка сигналов

Пришло время защитить задачи от неожиданной отмены. Мы делаем это, реализуя следующие шаги:

  1. Создайте набор задач, которые мы хотим защитить.
  2. Обработка основных сигналов прерывания (SIGHUP, SIGTERM, SIGINT) для реализации собственной логики завершения работы.
  3. В рамках функции выключения отмените только незащищенные задачи.

# example3.py

import asyncio
import signal

# tasks that shouldn't be canceled
_DO_NOT_CANCEL_TASKS: set[asyncio.Task] = set()


def protect(task: asyncio.Task) -> None:
    _DO_NOT_CANCEL_TASKS.add(task)


def shutdown(sig: signal.Signals) -> None:
    print(f"Received exit signal {sig.name}")

    all_tasks = asyncio.all_tasks()
    tasks_to_cancel = all_tasks - _DO_NOT_CANCEL_TASKS

    for task in tasks_to_cancel:
        task.cancel()

    print(f"Cancelled {len(tasks_to_cancel)} out of {len(all_tasks)} tasks")


def setup_signal_handler() -> None:
    loop = asyncio.get_running_loop()

    for sig in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, shutdown, sig)

async def worker(n: int) -> None:
    ...


async def main() -> None:
    # setup graceful shutdown
    setup_signal_handler()

    # protect main task from being canceled, 
        # otherwise it will cancel all other tasks
    protect(asyncio.current_task())

    # create 6 tasks, shield only first 3
    tasks = []
    for n in range(6):
        task = asyncio.create_task(worker(n))
        if n < 3:
            protect(task)

        tasks.append(task)

    # wait for all tasks to finish
    await asyncio.gather(*tasks)


if __name__ == '__main__':
    ...
> python3 example3.py
[0] Started!
[1] Started!
[2] Started!
[3] Started!
[4] Started!
[5] Started!
^CReceived exit signal SIGINT
Cancelled 3 out of 7 tasks
[5] Canceled (this is bad)!
[3] Canceled (this is bad)!
[4] Canceled (this is bad)!
[0] Successfully done!
[1] Successfully done!
[2] Successfully done!
App was finished gracefully

Вот так! Теперь группа защищенных задач не прерывалась посередине.

Однако убедитесь, что задачи, которые вы ожидаете, не являются долгосрочными. В противном случае существует риск принудительного закрытия приложения (SIGKILL).

Заключение

Подводя итог, можно сказать, что когда дело доходит до закрытия приложения asyncio, важно защитить его от возможных непредвиденных проблем. Хотя asyncio предоставляет функцию shield() для обеспечения завершения задачи, одного этого недостаточно для обеспечения корректного завершения работы. Вместо этого необходима настраиваемая логика завершения работы, которая защищает задачи от неожиданной отмены. Обрабатывая сигналы прерывания, вы можете гарантировать, что ваше приложение asyncio будет корректно завершено.

P.S. Весь код, упомянутый в этой статье, можно получить из этого репозитория.


Оригинал
PREVIOUS ARTICLE
NEXT ARTICLE