Как создать масштабируемую систему доставки веб-перехватчиков с помощью Kafka, SQS и S3
1 июня 2022 г.Поскольку мир становится все более и более связанным, программные приложения, конечно же, не являются исключением. Приложениям нужен способ, чтобы 2 или более полностью разделенных систем обменивались информацией.
Веб-перехватчики — это автоматические сообщения, отправляемые приложениями, когда что-то происходит. У них есть сообщение (или полезная нагрузка), и они отправляются на уникальный URL-адрес. Веб-перехватчики почти всегда быстрее, чем опрос, что делает их идеальным решением для передачи событий из системы во внешний мир.
Они широко используются такими гигантами индустрии, как Shopify, Stripe, Twitter и Twilio. Если вы посмотрите на PayPal, веб-хуки — это то, как PayPal сообщает вашему приложению электронной коммерции, что ваши клиенты заплатили вам.
В этом посте я представлю решение системы доставки веб-перехватчиков с использованием Apache Kafka, AWS SQS и S3. В прошлом у Yotpo были разные реализации для отправки событий через веб-перехватчики в разных продуктах. Мы решили объединить их в единое унифицированное решение.
Итак, если вы планируете создать собственную систему доставки веб-перехватчиков, этот блог для вас!
Определение требований
Давайте посмотрим, что мы хотели получить от новой системы доставки вебхуков и почему:
- Масштабируемость — система должна быть способна адаптироваться к изменениям и различным нагрузкам, используя горизонтальную масштабируемость.
- Retry Support — отправка веб-перехватчиков иногда может завершиться ошибкой из-за проблемы на принимающей стороне (ошибка, сбой системы и т. д.). Мы могли бы просто отбросить неудачные сообщения, но это привело бы к потере данных для получателя. Нам нужен механизм для повторной попытки операции с экспоненциальной задержкой и в течение 24 часов.
- Поддержка большой полезной нагрузки — события веб-перехватчика могут иметь большую полезную нагрузку в несколько МБ.
- Payload Agnostic — знание смысла содержания сообщения не требуется (и даже невозможно). Полезная нагрузка сообщения доставляется как есть. Это крайне важно для того, чтобы иметь одно решение для множества различных вариантов использования в бизнесе.
- Дедупликация доставки — все события веб-перехватчиков будут доставлены по назначению, но, возможно, более одного раза. У нас будет уникальный идентификатор события для целей дедупликации.
- Public REST API — необходим для регистрации вебхуков
Мы позволили себе некоторую снисходительность:
- Отсутствие гарантии заказа — мы не гарантируем порядок событий веб-перехватчика.
- Обрабатывать только связь HTTP/S — отправка веб-перехватчиков любым другим способом связи (например, Apache Kafka, TCP и т. д.) не будет поддерживаться.
Создание функциональности
С самого начала нам было ясно, что наша система будет иметь 2 основных уровня: API (используется для конфигураций) и Brain (фактическая отправка событий веб-перехватчика). Рассмотрим каждый из них по отдельности…
Уровень API
Используется для регистрации и настройки вебхуков. Мы решили выбрать самое простое решение и использовать REST API. API может использоваться конечным пользователем (подписчиком веб-перехватчика) или другим приложением.
URL-адрес конечных точек структурирован как [базовый путь]/webhooks/<entity>
Определено 3 типа объектов:
1. Цель веб-перехватчика — объект, который инкапсулирует целевой URL-адрес веб-перехватчика.
```javascript
"id": 111111111,
"url": "http://www.dummy.com",
"createdAt": "2021-10-03T17:14:23Z",
"updatedAt": "2021-10-03T17:14:23Z"
2. Webhook Filter — объект, содержащий список типов событий, которые пользователь хочет прослушать (по сути, группа событий).
```javascript
"id": 222222222,
"События": [
"core.orders.created.v1",
"core.orders.updated.v2"
"createdAt": "2021-10-03T17:14:23Z",
"updatedAt": "2021-10-03T17:14:23Z"
3. Подписка на веб-перехватчик — объект, который содержит комбинацию целевого веб-перехватчика и фильтра веб-перехватчика. События Webhooks будут отправляться только активным подпискам.
```javascript
"id": 123456789,
"целевой идентификатор": 111111111,
"идентификатор фильтра": 222222222,
«активный»: правда,
"createdAt": "2021-10-03T17:14:23Z",
"updatedAt": "2021-10-03T17:14:23Z"
Каждая сущность поддерживает следующие операции:
- ПОЛУЧИТЬ ПО ID (ПОЛУЧИТЬ
/webhooks/<entity>/<entity_id>
)
- ПОЛУЧИТЬ ВСЕ (ПОЛУЧИТЬ
/webhooks/<entity>
)
- СОЗДАТЬ (POST
/webhooks/<entity>
)
- ОБНОВЛЕНИЕ (ИСПРАВЛЕНИЕ
/webhooks/<entity>/<entity_id>
)
- УДАЛИТЬ (УДАЛИТЬ
/webhooks/<entity>/<entity_id>
)
Чтобы создать вебхук, нужно выполнить 3 шага:
- Создайте Цель веб-перехватчика, которая будет указывать на место, куда вы хотите отправить сообщение веб-перехватчика.
- Создайте Фильтр веб-перехватчиков, который сообщит, какие события следует отправлять в виде сообщений веб-перехватчиков.
- Создайте подписку на веб-перехватчик, чтобы связать цель и фильтр. Только когда этот шаг будет завершен, фактический веб-перехватчик сообщения будут отправлены
Уровень мозга
Уровень, который будет отвечать за фактическую отправку веб-перехватчиков. Целесообразно разбить его на еще более мелкие части, что позволит нам масштабировать различные области системы независимо от других областей, которые не нуждаются в таком же масштабировании.
Во-первых, давайте определимся с простой терминологией:
- Webhook Event — событие, полученное от других приложений.
- Доставка веб-перехватчика — комбинация цели веб-перехватчика и события веб-перехватчика. Поскольку каждое событие может заинтересовать несколько целей, оно дублируется для каждой цели.
- Webhook Message — фактические данные, отправленные на целевой URL.
Потребитель
Потребитель обрабатывает события веб-перехватчиков от Kafka и решает (на основе данных о подписке), какие цели веб-перехватчиков применимы. Приняв это решение, он создает Webhook Delivery для каждой цели, которой необходимо получить событие, и публикует ее в очереди AWS SQS.
Поскольку SQS имеет ограничение в 256 КБ на сообщение, а у нас есть сообщения большего размера, мы сохраняем полезную нагрузку сообщения в S3 для будущего извлечения.
Кэш
События происходят в системе очень часто, и для каждого из них нам нужно сопоставить все соответствующие подписки, определенные в БД данных подписок. Чтобы не нагружать БД, мы держим в памяти локальный кеш с конфигурацией подписки.
Диспетчер
Диспетчер — это часть системы, которая выполняет фактическую отправку полезной нагрузки события в виде сообщения веб-перехватчика на соответствующий URL-адрес. Он получает доставку, созданную потребителем, из очереди AWS SQS и выполняет вызов HTTP POST на нужный URL-адрес с полезной нагрузкой доставки. Это отдельный модуль, позволяющий решать проблемы, влияющие на производительность, но об этом позже.
Менеджер повторных попыток
Как упоминалось в требованиях, отправка веб-перехватчиков может завершиться ошибкой, поэтому у нас должен быть механизм для повторной попытки операции (экспоненциальная отсрочка в течение 24 часов). Не сумев найти существующее решение, отвечающее нашим требованиям, мы решили реализовать его самостоятельно, используя возможности отложенных сообщений SQS и S3 для хранения полезной нагрузки. Для более глубокого погружения в эту тему, пожалуйста, обратитесь к моему следующему сообщению в блоге (ссылка в конце этого сообщения).
Собираем все вместе
Системный поток
1. С помощью API определяется новый веб-перехватчик (цель, фильтр и подписка) и сохраняется в базе данных данных подписки. Это делается один раз, и с этого момента соответствующие события будут отправляться в виде сообщений веб-перехватчика.
2. Как только что-то происходит в системе, использующей систему доставки веб-перехватчиков (обозначенную как App1 на диаграмме 2), она должна отправить сообщение в специальную тему Kafka с полезной нагрузкой события и типом события. Вот схема такого сообщения:
```javascript
"тип": "запись",
"имя": "Вебхуксобытие",
"namespace": "com.yotpo.platform.webhookdelivery.messages",
"поля": [
"имя": "тип_события",
"тип": "строка"
"имя": "полезная нагрузка",
"тип": "строка"
3. Потребитель опрашивает сообщение из темы Kafka и использует определения, которые были сделаны ранее в API и сохранены в локальном кеше. Затем он решает, существует ли подписка, использующая фильтр, в котором определен текущий тип события. Если это так, он собирает все совпадающие подписки и создает сообщение о доставке для всех определенных в них целей. Созданные доставки отправляются в очередь SQS.
4. Диспетчер опрашивает доставки веб-перехватчиков из очереди SQS и отправляет их адресату. Если диспетчер получает ссылку на S3 для полезной нагрузки, он извлекает ее перед отправкой доставки по URL-адресу.
5. В случае неудачной отправки доставка возвращается менеджеру повторных попыток, который отвечает за повторную отправку ее диспетчеру. Это будет продолжаться до тех пор, пока доставка не будет успешной или пока не будет исчерпано количество попыток.
Последние мысли
В этой статье затрагиваются только такие темы, как то, как мы решали проблемы, связанные с реальной жизнью (например, управление повторными попытками) или производительностью (кэширование и операции ввода-вывода).
Кроме того, очень интересными и сложными темами, которые следует рассмотреть, является использование в многопользовательской системе SaaS. Есть и другие вопросы, которые следует учитывать при интеграции с такой системой. Такие проблемы, как голодание и помехи производительности, безопасность (CRC, проверка заголовка подписи), разделение данных и многое другое.
По всем этим вопросам я приглашаю вас ознакомиться с моей следующей публикацией — * Решение проблем системы доставки веб-перехватчиков в многопользовательской системе SaaS *.
Также опубликовано [Здесь] (https://medium.com/yotpoengineering/building-a-scalable-webhook-delivery-system-using-kafka-sqs-s3-e9808eaa8a04)
Оригинал