Как использовать события, отправленные сервером (SSE) с FastAPI
23 февраля 2022 г.Введение
Отправленные сервером события (SSE) — это способ отправки данных в браузер без перезагрузки страницы. Это позволяет использовать потоковые данные и создавать приложения реального времени, которые можно использовать в различных сценариях.
FastAPI — это среда Python, упрощающая создание API.
В этом руководстве мы будем использовать FastAPI для создания простого сервера SSE, который будет отправлять сообщения каждую секунду.
Предпосылки
Чтобы следовать этому руководству, на вашем компьютере должны быть установлены Python и pip:
https://www.python.org/downloads/
Установка FastAPI
Чтобы установить FastAPI и все его зависимости, вы можете использовать следующую команду:
``` оболочка
pip установить "fastapi [все]"
Это также будет включать сервер uvicorn, который используется для запуска сервера.
Установка sse-starlette
После того, как вы установили FastAPI, вы можете установить расширение sse-starlette
, чтобы добавить поддержку SSE в ваш проект FastAPI:
pip установить sse-starlette
Давайте также добавим в наш проект asyncio
:
pip установить asyncio
Создание простой конечной точки hello world
После того, как вы установили FastAPI, вы можете создать простую конечную точку hello world, чтобы начать работу.
Создайте новый файл с именем main.py
и добавьте следующий код:
```питон
импортировать асинхронный
импортировать увикорн
из fastapi импортировать FastAPI, Запрос
приложение = FastAPI()
@приложение.получить("/")
асинхронный корень определения ():
return {"сообщение": "Привет, мир"}
Запуск сервера uvicorn
Для запуска сервера вы можете использовать следующую команду:
``` оболочка
uvicorn main:app --reload
Это запустит сервер на порту 8000
. Флаг --reload
автоматически перезагружает сервер, когда вы вносите изменения в код, поэтому вам не нужно перезапускать сервер каждый раз, когда вы вносите изменения.
Зайдите на сервер в своем браузере, и вы должны увидеть следующий вывод:
```json
"message": "Привет, мир"
FastAPI автоматически создаст конечную точку /docs
, которая покажет вам документацию по API. Если бы вы посетили /docs
, вы бы увидели следующее:
Добавление поддержки SSE в ваш проект FastAPI
Далее давайте расширим файл main.py, чтобы добавить поддержку SSE. Для этого вы можете добавить поддержку SSE в свой проект, добавив следующую строку в файл main.py:
```питон
из sse_starlette.sse импортировать EventSourceResponse
Затем вы можете использовать класс EventSourceResponse для создания ответа, который будет отправлять события SSE. Давайте создадим новую конечную точку, которая будет отправлять событие каждую секунду:
```питон
STREAM_DELAY = 1 # секунда
RETRY_TIMEOUT = 15000 # миллисекунд
@app.get('/поток')
async def message_stream (запрос: запрос):
определить новые_сообщения():
Добавьте сюда логику для проверки новых сообщений
выход «Привет, мир»
асинхронное определение event_generator():
пока верно:
Если клиент закрывает соединение, прекратить отправку событий
если ожидать request.is_disconnected():
перерыв
Проверяет наличие новых сообщений и возвращает их клиенту, если они есть
если новые_сообщения():
урожай {
"событие": "новое_сообщение",
"идентификатор": "идентификатор_сообщения",
"повторить": RETRY_TIMEOUT,
"данные": "содержимое_сообщения"
ожидание asyncio.sleep(STREAM_DELAY)
вернуть EventSourceResponse (event_generator())
Теперь, если вы посетите конечную точку /stream
в своем браузере, вы увидите событие, отправляемое каждую секунду, без перезагрузки страницы.
FastAPI с потоковыми данными и Materialize
Чтобы узнать больше о потоковой передаче данных, вы можете ознакомиться с этим руководством по использованию FastAPI с Materialize:
Как использовать FastAPI с Materialise для обработки данных в реальном времени
Учебник также включает демонстрационный проект, который вы можете запустить, чтобы понять, как все это работает.
Вот краткая схема проекта:
Что такое Материализация?
Materialise — это потоковая база данных, которая берет данные, поступающие из разных источников, таких как Kafka, PostgreSQL, корзины S3 и т. д., и позволяет пользователям создавать представления, которые агрегируют/материализуют эти данные, и позволяет запрашивать эти представления, используя чистый SQL с очень низкой задержкой.
Потоковая передача данных с помощью Materialize
Для демонстрационного проекта мы используем оператор [TAIL](https://materialize.com/docs/sql/tail/#conceptual-framework)
. TAIL
передает обновления из источника, таблицы или представления по мере их появления, что позволяет вам запрашивать данные по мере их обновления и идеально подходит для примера SSE.
Вот код для конечной точки /stream
, которая использует TAIL
для потоковой передачи данных:
```питон
@app.get('/поток')
async def message_stream (запрос: запрос):
определить новые_сообщения():
Проверяем, есть ли данные в таблице
results = engine.execute('ВЫБЕРИТЕ количество(*) FROM sensor_view_1s')
если results.fetchone()[0] == 0:
возврат Нет
еще:
вернуть Истина
асинхронное определение event_generator():
пока верно:
Если клиент был закрыт, соединение
если ожидать request.is_disconnected():
перерыв
Проверяет наличие новых сообщений и возвращает их клиенту, если они есть
если новые_сообщения():
соединение = engine.raw_connection()
с connection.cursor() как cur:
cur.execute("ОБЪЯВЛЕНИЕ КУРСОРА ДЛЯ ХВОСТА Sensors_view_1s")
cur.execute («ВЫБЕРИТЕ ВСЕ c»)
для строки в cur:
доходность строки
ожидание asyncio.sleep(MESSAGE_STREAM_DELAY)
вернуть EventSourceResponse (event_generator())
Как видите, мы только что расширили функцию new_message
, чтобы проверить, есть ли новые сообщения в представлении sensors_view_1s
. Если новых сообщений нет, мы вернем None, а EventSourceResponse не будет отправлять никаких событий. Если есть новые сообщения, мы вернем True, и EventSourceResponse отправит новые сообщения.
Затем в асинхронной функции «event_generator» мы используем «TAIL» с оператором «FETCH ALL», чтобы получить все сообщения в представлении «sensors_view_1s». Мы используем оператор DECLARE CURSOR
, чтобы создать курсор, который будет передавать данные по мере их обновления.
Заключение
Чтобы узнать больше о FastAPI, проверьте [документации FastAPI] (https://fastapi.tiangolo.com/tutorial/index.html).
Для получения дополнительной информации о том, как использовать FastAPI с Materialise, ознакомьтесь с этим [учебником] (https://devdojo.com/bobbyiliyev/how-to-use-fastapi-with-materialize-for-real-time-data-processing) .
Чтобы узнать больше о Materialise, ознакомьтесь с [документацией по Materialise] (https://materialize.com/docs).
Также опубликовано [Здесь] (https://blog.bobby.sh/how-to-use-server-sent-events-sse-with-fastapi)
Оригинал