Кафка — ключ? Эволюция Highlight Ingest
4 декабря 2022 г.В Highlight мы сталкиваемся с уникальной проблемой, когда приходится адаптироваться к большому количеству клиентов. Поскольку мы записываем веб-приложения наших клиентов, нам необходимо иметь возможность обрабатывать всплески трафика, поступающего от их пользователей.
Когда мы только начинали, ввод наших данных был простым: мы передавали данные от нашего записывающего клиента на наш сервер и выполняли обновления по мере их поступления. Наши обработчики данных выполняли обновления метаданных и записывали записи сеансов в PostgreSQL с помощью SQL-запросов, и они потребляться и удаляться из таблиц базы данных для постоянного хранения.
Первоначальный подход некоторое время работал нормально. Но по мере роста нашей клиентской базы мы столкнулись с ключевыми проблемами в этой реализации. Нам пришлось выполнить несколько миграций базы данных, которые запускали команду ALTER с агрессивной блокировкой таблицы. Сразу же мы увидели волну заблокированных команд, ожидающих завершения миграции. В течение примерно 30 минут, которые это заняло, мы были мертвы в воде из-за входящих запросов на сеансы и не могли сохранить ни одну из записей наших клиентов. Время ожидания всего серверного трафика истекло, поскольку у нас не было места для хранения данных в это время.
Сохранение всех записей наших клиентов всегда является нашим главным приоритетом, поэтому мы знали, что должны найти способ улучшить процесс загрузки.
Возможные решения
Поскольку нам нужен был способ буферизации данных, мы рассмотрели различные методы, которые могли бы выполнить эту задачу. Одна из идей заключалась в том, чтобы абстрагировать каждую полезную нагрузку серверной части в «сообщение», которое будет помещено в буфер и обработано исполнителем. У нас было несколько соображений, которые были обусловлены деталями наших данных:
- Данные нашей задачи могут варьироваться от ~10 КБ до ~100 МБ.
- Задачи должны обрабатываться по порядку и последовательно для данной записи.
- Задачи в разных записях можно обрабатывать параллельно.
- Мы не можем потерять сообщения для записи, так как это приведет к аннулированию всей записи.
Apache Cassandra / MongoDB / DynamoDB / Другие базы данных NoSQL
- Есть ограничения на размер строки, которые различаются, но обычно не поддерживают наши сообщения размером ~100 МБ.
- Потребуется гораздо больше логики на уровне приложения для записи и использования данных.
Redis
- Срок действия данных может истечь, поэтому сообщения могут быть потеряны.
- Упорядочение сообщений сложнее поддерживать без ограничений производительности.
RabbitMQ
- Полностью упорядоченный, поэтому масштабирование без изменения архитектуры на уровне приложения может быть проблемой.
- Брокеры повторяются, но плохо масштабируются по горизонтали.
Apache Kafka
- Сообщения могут иметь размер до ~500 МБ.
- Сообщения упорядочены внутри раздела.
- Один потребитель на раздел обеспечивает согласованное упорядочение с линейным параллельным масштабированием.
- Срок действия сообщений истекает только после применения политики хранения на основе времени или пространства.
Создание масштабируемой очереди задач
Оценивая варианты, мы остановились на стратегии обмена сообщениями между производителем и потребителем. В нашей архитектуре наши серверные обработчики HTTP-запросов будут создавать сообщения «задачи», которые будут блокироваться до тех пор, пока данные не будут отправлены выбранному нами брокеру. Сообщения будут быстро поступать к брокеру и оставаться там до тех пор, пока рабочая машина не сможет их принять и обработать их данные. Это эффективно отделяет наш входящий поток данных от обработки, позволяя нам буферизовать данные столько, сколько мы хотим (если позволяет пространство в зависимости от конфигурации нашего брокера).
Учитывая особенности Apache Kafka, нам нужно было определить разделы, в которых будут храниться наборы сообщений. В рамках этого процесса нам нужно было придумать ключ, который позволил бы решить, какие сообщения попадают в один и тот же раздел и упорядочены относительно друг друга. Поскольку одним из наших требований является обеспечение последовательной обработки задач для данной записи, мы выбрали идентификатор сеанса в качестве ключа раздела.
Как на стороне производства, так и на стороне потребления мы могли бы произвольно увеличивать количество параллельных процессов. Поскольку сообщения одной и той же записи обрабатываются по порядку, масштабирование потребителя ограничено. Это основано на количестве разделов, поскольку для сохранения этого свойства у каждого раздела есть только один потребитель.
Например, чтобы иметь возможность масштабирования до 128 потребителей, нам потребуется как минимум 128 разделов. Обратите внимание, что увеличение количества разделов увеличивает нагрузку на узлы брокера Kafka и требует более мощной машины (в первую очередь больше памяти). Для справки: мы легко потребляем/производим ~5 тыс. сообщений в секунду (каждое в среднем ~100 КБ) в 768 разделах кластера AWS kafka.m5.2xlarge с 3 узлами.
Наши 5 основных уроков по Apache Kafka
Управлять нашим кластером Apache Kafka было довольно просто, но в процессе мы извлекли много уроков. Для управления кластером мы устанавливаем конфигурацию на стороне сервера через AWS MSK и отслеживаем/редактируем темы через контейнер ECS, работающий под управлением provectuslabs/. кафка-уи. Для некоторых действий обычно требуется использование интерфейса командной строки Kafka, работающего в VPC кластера, но мы обнаружили, что этот пользовательский интерфейс хорошо справляется с большинством этих задач.
Порядок сообщений
Производители должны отправлять сообщения с настройкой Kafka SDK acks=all, чтобы гарантировать порядок сообщений. Эта настройка означает, что каждый производитель будет отправлять сообщение ВСЕМ брокерам и подождите, пока они все получат сообщение, прежде чем продолжить. Сообщения доставляются на все реплики определенного раздела в одинаковой последовательности, что гарантирует, что потребители этого раздела просматривают данные в идентичном порядке.
Сжатие журнала
Как мы узнали на собственном горьком опыте, включение сжатия журналов плохая идея для брокера сообщений. Сжатие журнала заменит старые сообщения данного ключа будущими сообщениями того же ключа. Это может быть полезно, если сообщения представляют собой состояние, которое обновляется расширенным состоянием, но такой подход приведет к потере данных, если каждое сообщение представляет собой уникальный фрагмент данных.
Коэффициент репликации/минимальная синхронная реплика
Одной из важных функций Apache Kafka является возможность репликации данных между несколькими брокерами для обеспечения высокой доступности. Каждый раздел можно реплицировать между N брокерами, чтобы обеспечить надежное чтение/запись, когда брокер отключен для обслуживания или из-за проблем.
Например, если у вас есть кластер Kafka с тремя узлами и коэффициентом репликации, равным трем, все брокеры будут иметь копии всех данных. Вы можете потерять узел, не беспокоясь о потере данных и с минимальным влиянием на производительность чтения/записи. Коэффициент репликации можно настроить для всего сервера или для каждой темы данных. Дополнительную информацию о том, как это настроить, можно найти здесь.
Настройка min.insync.replicas определяет, сколько брокеров должны подтверждать сообщение до того, как оно будет считаться отправленным. Вы можете увеличить значение этого параметра, чтобы убедиться, что данные реплицируются и упорядочиваются правильно в случае сбоя брокера. Однако будьте осторожны, чтобы не установить значение для общего числа посредников, так как это будет означать, что запись будет заблокирована, если хотя бы один посредник отключен для обслуживания.
Сжатие данных
Если вы думаете о том, как лучше всего сжать данные, у вас есть два варианта: сжать на узлах производителя/потребителя или поручить брокерам Kafka сделать это за вас. Мы выбрали первый вариант сжатия перед отправкой данных и распаковки при извлечении, чтобы снизить нагрузку на наш кластер.
Увеличение количества производителей/потребителей оказалось более рентабельным, чем добавление дополнительных ЦП/брокеров в кластер Kafka. Для этого мы использовали клиентскую библиотеку segmentio/kafka-go, которая обеспечивает превосходную абстракцию для взаимодействия с кластером и позволяет обрабатывать сжатие данных полностью прозрачно.
Тайм-аут перебалансировки
Как мы уже говорили, группа потребителей распределяет разделы между всеми процессами таким образом, чтобы у каждого раздела был ровно один потребитель. Для этого потребители отправляют брокеру тактовые импульсы, чтобы убедиться, что они все еще могут получать сообщения. Когда процесс-потребитель останавливается, например, при развертывании обновления или при сбое контейнера рабочего ECS, Kafka должен обнаружить пропущенное сердцебиение и назначить раздел новому потребителю, чтобы его сообщения не копировались.
Два ключевых параметра определяют конфигурацию ребалансировки. По умолчанию параметр session.timeout.ms регулирует продолжительность без тактов потребителя, когда брокер считает потребителя мертвым и инициирует повторную балансировку, а group.initial.rebalance.delay.ms устанавливает количество времени, в течение которого координатор группы будет ожидать присоединения новых участников. Это означает, что после остановки процесса-потребителя количество времени до использования новых данных равно сумме двух параметров.
Нам пришлось изменить значения этих двух параметров по умолчанию, чтобы гарантировать, что время перебалансировки будет достаточно коротким, чтобы быстро возобновить использование сообщений, и в то же время дать достаточную задержку для запуска новых контейнеров и процессов и подключения к кластеру. р>
Первоначально опубликовано здесь
Оригинал