Как эффективно управлять очередями в базах данных SQL

Как эффективно управлять очередями в базах данных SQL

2 марта 2023 г.

Концепция очереди очень проста: есть отправитель и получатель, первый отправляет сообщения, а второй их получает. Но все же семантика подачи имеет решающее значение.

Есть 3 семантических варианта

  • Не более одного раза;
  • Хотя бы один раз;
  • Ровно один раз.

Не более одного раза также известен как "Выстрелил и забыл". Подходит для систем аналитики или отправки уведомлений - где потерянные сообщения не причиняют вреда. Но преимущество такой вещи заключается в том, что производительность поддерживается на более высоком уровне, поскольку нет необходимости отслеживать, получено ли сообщение и даже отправлено ли оно.

At-least-once — здесь в случае ошибки сообщение не помечается как прочитанное и остается таковым до тех пор, пока оно не будет успешно прочитано. Это наиболее распространенная семантика из-за простоты и высокой надежности (сочетается с идемпотентной обработкой сообщений).

Ровно один раз  - когда сообщение будет доставлено (и обработано) не менее 1 раза и не более 1 раза. Этого действительно трудно добиться, и большинство брокеров сообщений не предоставляют такой возможности. Для этого требуется синхронизация между отправителем и получателем и атомарная обработка на стороне получателя. Например, обычная транзакция ACID используется как обработчиком сообщения, так и операцией, которая помечает сообщение как прочитанное.

Типичным вариантом использования решения, описанного ниже, могут быть системы, в которых требуется ровно один раз и хотя бы один раз. ИЛИ это может быть реализация шаблона Transactional Outbox. Другим распространенным явлением является аварийное задание (также известное как Очередь недоставленных писем)  — процесс, в котором важно не потерять сообщения, но не останавливать обработку в случае сбоя.

Идея, которую я реализую ниже, не нова и уже обсуждалась много раз, но я делюсь своим практическим опытом и объясняю различия и компромиссы. Любое решение — это набор компромиссов и в каждом случае стоит опираться на то, к чему вы привыкли. Но если вы решите использовать БД в качестве очереди, то реализация в коде, скорее всего, будет такой же, как я предлагаю в этой статье.

Реализация

Отправитель сообщения

Отправка в очередь в базе данных SQL тривиальна, так как это можно сделать с помощью простой SQL-вставки. В простейшем виде это может выглядеть так:

private final ObjectMapper objectMapper;
private final SimpleJdbcInsert jdbcInsert;

public void send(String topic, T message) {
   try {
       MessageEntity entity = new MessageEntity();
       entity.setTopic(topic);
       entity.setPayload(objectMapper.writeValueAsString(message));
       entity.setCreated(Instant.now().toEpochMilli());
       jdbcInsert.execute(new BeanPropertySqlParameterSource(entity));
   } catch (Exception e) {
       log.error("Unable to send the message {}", message, e);
       throw new RuntimeException(e);
   }
}

Для отправки сообщения нам необходимо следующее:

  • Знайте, куда отправлять — тема.
  • Само сообщение можно сериализовать любым удобным способом. Здесь я выбрал json как удобочитаемый текст, но, возможно, можно использовать двоичный формат для экономии места на диске.
  • Сообщение о дате создания может быть полезно для расследования проблем.

Чтобы использовать запланированные/отложенные сообщения, вы можете использовать дополнительное поле с датой, после которой сообщение может быть обработано. Использование обычной транзакции вместе с бизнес-логикой применит свойства ACID к «отправляющемуся» сообщению.

Получатель сообщений

Общей практикой является использование нескольких экземпляров приложений во избежание единой точки отказа или для масштабирования. Следовательно, нам нужно избегать одновременного доступа к конкретному сообщению. Лучший способ сделать это — использовать замок.

В зависимости от времени, необходимого для обработки сообщения, вы можете выбрать один из двух вариантов:

  1. Заблокировать строку базы данных с помощью SELECT ... FOR UPDATE — работает для коротких транзакций (не дольше пары секунд), потому что эта блокировка работает только в рамках открытой транзакции. Таким образом, сообщение должно быть помечено как выполненное или удалено в той же транзакции.

2. Блокировка на основе значения столбца состояния в базе данных: обновите строку, установив ее IN_PROGRESS. Запустите обработку и после завершения обработки установите статус ГОТОВО или удалите строку. Для этого потребуется несколько транзакций, а промежуточное состояние может быть сброшено из-за перезагрузки сервера или в результате ошибки. Следовательно, блокировка должна иметь срок действия, и нам нужно обновлять данные во время обработки сообщения. Также не стоит забывать про токен ограждения и другие нюансы распределенного локирования.

Ниже я реализовал первый вариант. Это простой, но эффективный способ, который подойдет для подавляющего большинства случаев использования очередей на основе SQL

private static final String SELECT = 
        "SELECT * FROM messages WHERE topic =:topic FOR UPDATE SKIP LOCKED LIMIT 1";
private static final String DELETE = 
        "DELETE FROM messages WHERE id =:id";

private final DataClassRowMapper<MessageEntity> rowMapper = 
        new DataClassRowMapper<>(MessageEntity.class);

private final String topic;
private final Class<T> messageType;
private final MessageHandler<T> handler;

private final ObjectMapper objectMapper;
private final NamedParameterJdbcOperations jdbcOperations;

@Transactional
public void processMessage() {
   try {
       List<MessageEntity> messages = 
               jdbcOperations.query(SELECT, Map.of("topic", topic), rowMapper);
       if (messages.isEmpty()) {
           return;
       }
       MessageEntity messageEntity = messages.get(0);
       String payload = messageEntity.getPayload();
       T message = objectMapper.readValue(payload, messageType);
       handler.handle(message);
       jdbcOperations.update(DELETE, Map.of("id", messageEntity.getId()));
   } catch (Throwable e) {
       log.error("Unable to handle message of {}", topic, e);
   }
}

Рассмотрим подробнее запрос на получение.

SELECT * FROM messages WHERE topic =:topic FOR UPDATE SKIP LOCKED LIMIT 1;

Как уже было сказано выше, FOR UPDATE получает блокировку на выбранную строку в рамках текущей транзакции. Конструкция SKIP LOCKED позволяет нам немедленно вернуть одну из разблокированных строк или пустой набор, если все строки заблокированы или ничего не соответствует предложению where.

Намерение состоит в том, чтобы получить только одно сообщение, поэтому мы ограничиваем набор результатов, поэтому транзакция блокирует максимум одно сообщение.

Строка базы данных содержит сериализованные сообщения, поэтому ее следует десериализовать и передать обработчику. После успешной обработки приведенный выше код удаляет строку из базы данных и фиксирует транзакцию. Удаление обработанных сообщений без следа может затруднить расследование. Другой вариант — использовать флаги и отметку времени обработки.

Метод processMessage должен быть зациклен со всеми соответствующими оптимизациями: например, уменьшение количества обработчиков, если очередь пуста, или увеличение числа обработчиков для обработки пиковой нагрузки.

Заключение

Я постоянно повторяю, что использование нестандартных решений не должно стать проблемой для растущего проекта. Главное — оценить нагрузки и подобрать для них возможные решения. Некоторые плюсы и минусы решения перечислены ниже.

Преимущества

  • База данных предоставляет понятный механизм ACID, упрощающий написание и чтение сообщений. Можно не бояться сбоев и перезапусков сервера.
  • Вы всегда можете запросить таблицу, не мешая процессу обработки.
  • Если вы используете очередь в базе данных, вам не нужно знать другие системы и обслуживание отдельных компонентов.
  • Вы можете легко улучшить свои функции, например обработку сообщений с задержкой.

Недостатки

  • Относительно низкая пропускная способность. Можно масштабировать базу данных с помощью реплик только для чтения, но в случае с очередью чтение и запись нужно будет производить именно в мастер-узле. Таким образом, и получатель, и отправитель, и вообще все пользователи БД будут конкурировать за ресурсы, а использование памяти и количество подключений нужно очень четко контролировать.
  • Нагрузка таблицы очереди — состоит из частых вставок и удалений (или мягких удалений), а во время самой обработки строка блокируется, что приводит либо к длительным транзакциям, либо к сложным механизмам блокировки. При использовании облачных решений это может привести к очень высоким затратам на обслуживание базы данных.


Оригинал
PREVIOUS ARTICLE
NEXT ARTICLE