Как использовать события, отправленные сервером (SSE) с FastAPI

Как использовать события, отправленные сервером (SSE) с FastAPI

23 февраля 2022 г.

Введение


Отправленные сервером события (SSE) — это способ отправки данных в браузер без перезагрузки страницы. Это позволяет использовать потоковые данные и создавать приложения реального времени, которые можно использовать в различных сценариях.


FastAPI — это среда Python, упрощающая создание API.


В этом руководстве мы будем использовать FastAPI для создания простого сервера SSE, который будет отправлять сообщения каждую секунду.


Предпосылки


Чтобы следовать этому руководству, на вашем компьютере должны быть установлены Python и pip:


https://www.python.org/downloads/


Установка FastAPI


Чтобы установить FastAPI и все его зависимости, вы можете использовать следующую команду:


``` оболочка


pip установить "fastapi [все]"


Это также будет включать сервер uvicorn, который используется для запуска сервера.


Установка sse-starlette


После того, как вы установили FastAPI, вы можете установить расширение sse-starlette, чтобы добавить поддержку SSE в ваш проект FastAPI:


pip установить sse-starlette


Давайте также добавим в наш проект asyncio:


pip установить asyncio


Создание простой конечной точки hello world


После того, как вы установили FastAPI, вы можете создать простую конечную точку hello world, чтобы начать работу.


Создайте новый файл с именем main.py и добавьте следующий код:


```питон


импортировать асинхронный


импортировать увикорн


из fastapi импортировать FastAPI, Запрос


приложение = FastAPI()


@приложение.получить("/")


асинхронный корень определения ():


return {"сообщение": "Привет, мир"}


Запуск сервера uvicorn


Для запуска сервера вы можете использовать следующую команду:


``` оболочка


uvicorn main:app --reload


Это запустит сервер на порту 8000. Флаг --reload автоматически перезагружает сервер, когда вы вносите изменения в код, поэтому вам не нужно перезапускать сервер каждый раз, когда вы вносите изменения.


Зайдите на сервер в своем браузере, и вы должны увидеть следующий вывод:


```json


"message": "Привет, мир"


FastAPI автоматически создаст конечную точку /docs, которая покажет вам документацию по API. Если бы вы посетили /docs, вы бы увидели следующее:


Конечная точка документации FastAPI


Добавление поддержки SSE в ваш проект FastAPI


Далее давайте расширим файл main.py, чтобы добавить поддержку SSE. Для этого вы можете добавить поддержку SSE в свой проект, добавив следующую строку в файл main.py:


```питон


из sse_starlette.sse импортировать EventSourceResponse


Затем вы можете использовать класс EventSourceResponse для создания ответа, который будет отправлять события SSE. Давайте создадим новую конечную точку, которая будет отправлять событие каждую секунду:


```питон


STREAM_DELAY = 1 # секунда


RETRY_TIMEOUT = 15000 # миллисекунд


@app.get('/поток')


async def message_stream (запрос: запрос):


определить новые_сообщения():


Добавьте сюда логику для проверки новых сообщений


выход «Привет, мир»


асинхронное определение event_generator():


пока верно:


Если клиент закрывает соединение, прекратить отправку событий


если ожидать request.is_disconnected():


перерыв


Проверяет наличие новых сообщений и возвращает их клиенту, если они есть


если новые_сообщения():


урожай {


"событие": "новое_сообщение",


"идентификатор": "идентификатор_сообщения",


"повторить": RETRY_TIMEOUT,


"данные": "содержимое_сообщения"


ожидание asyncio.sleep(STREAM_DELAY)


вернуть EventSourceResponse (event_generator())


Теперь, если вы посетите конечную точку /stream в своем браузере, вы увидите событие, отправляемое каждую секунду, без перезагрузки страницы.


FastAPI с потоковыми данными и Materialize


Чтобы узнать больше о потоковой передаче данных, вы можете ознакомиться с этим руководством по использованию FastAPI с Materialize:


Как использовать FastAPI с Materialise для обработки данных в реальном времени


Учебник также включает демонстрационный проект, который вы можете запустить, чтобы понять, как все это работает.


Вот краткая схема проекта:


FastAPI с Materialize


Что такое Материализация?


Materialise — это потоковая база данных, которая берет данные, поступающие из разных источников, таких как Kafka, PostgreSQL, корзины S3 и т. д., и позволяет пользователям создавать представления, которые агрегируют/материализуют эти данные, и позволяет запрашивать эти представления, используя чистый SQL с очень низкой задержкой.


Потоковая передача данных с помощью Materialize


Для демонстрационного проекта мы используем оператор [TAIL](https://materialize.com/docs/sql/tail/#conceptual-framework). TAIL передает обновления из источника, таблицы или представления по мере их появления, что позволяет вам запрашивать данные по мере их обновления и идеально подходит для примера SSE.


Вот код для конечной точки /stream, которая использует TAIL для потоковой передачи данных:


```питон


@app.get('/поток')


async def message_stream (запрос: запрос):


определить новые_сообщения():


Проверяем, есть ли данные в таблице


results = engine.execute('ВЫБЕРИТЕ количество(*) FROM sensor_view_1s')


если results.fetchone()[0] == 0:


возврат Нет


еще:


вернуть Истина


асинхронное определение event_generator():


пока верно:


Если клиент был закрыт, соединение


если ожидать request.is_disconnected():


перерыв


Проверяет наличие новых сообщений и возвращает их клиенту, если они есть


если новые_сообщения():


соединение = engine.raw_connection()


с connection.cursor() как cur:


cur.execute("ОБЪЯВЛЕНИЕ КУРСОРА ДЛЯ ХВОСТА Sensors_view_1s")


cur.execute («ВЫБЕРИТЕ ВСЕ c»)


для строки в cur:


доходность строки


ожидание asyncio.sleep(MESSAGE_STREAM_DELAY)


вернуть EventSourceResponse (event_generator())


Как видите, мы только что расширили функцию new_message, чтобы проверить, есть ли новые сообщения в представлении sensors_view_1s. Если новых сообщений нет, мы вернем None, а EventSourceResponse не будет отправлять никаких событий. Если есть новые сообщения, мы вернем True, и EventSourceResponse отправит новые сообщения.


Затем в асинхронной функции «event_generator» мы используем «TAIL» с оператором «FETCH ALL», чтобы получить все сообщения в представлении «sensors_view_1s». Мы используем оператор DECLARE CURSOR, чтобы создать курсор, который будет передавать данные по мере их обновления.


Заключение


Чтобы узнать больше о FastAPI, проверьте [документации FastAPI] (https://fastapi.tiangolo.com/tutorial/index.html).


Для получения дополнительной информации о том, как использовать FastAPI с Materialise, ознакомьтесь с этим [учебником] (https://devdojo.com/bobbyiliyev/how-to-use-fastapi-with-materialize-for-real-time-data-processing) .


Чтобы узнать больше о Materialise, ознакомьтесь с [документацией по Materialise] (https://materialize.com/docs).


Также опубликовано [Здесь] (https://blog.bobby.sh/how-to-use-server-sent-events-sse-with-fastapi)



Оригинал
PREVIOUS ARTICLE
NEXT ARTICLE