Как эффективно управлять очередями в базах данных SQL
2 марта 2023 г.Концепция очереди очень проста: есть отправитель и получатель, первый отправляет сообщения, а второй их получает. Но все же семантика подачи имеет решающее значение.
Есть 3 семантических варианта
- Не более одного раза;
- Хотя бы один раз;
- Ровно один раз.
Не более одного раза
также известен как "Выстрелил и забыл". Подходит для систем аналитики или отправки уведомлений - где потерянные сообщения не причиняют вреда. Но преимущество такой вещи заключается в том, что производительность поддерживается на более высоком уровне, поскольку нет необходимости отслеживать, получено ли сообщение и даже отправлено ли оно.
At-least-once
— здесь в случае ошибки сообщение не помечается как прочитанное и остается таковым до тех пор, пока оно не будет успешно прочитано. Это наиболее распространенная семантика из-за простоты и высокой надежности (сочетается с идемпотентной обработкой сообщений).
Ровно один раз
- когда сообщение будет доставлено (и обработано) не менее 1 раза и не более 1 раза. Этого действительно трудно добиться, и большинство брокеров сообщений не предоставляют такой возможности. Для этого требуется синхронизация между отправителем и получателем и атомарная обработка на стороне получателя. Например, обычная транзакция ACID используется как обработчиком сообщения, так и операцией, которая помечает сообщение как прочитанное.
Типичным вариантом использования решения, описанного ниже, могут быть системы, в которых требуется ровно один раз и хотя бы один раз. ИЛИ это может быть реализация шаблона Transactional Outbox. Другим распространенным явлением является аварийное задание (также известное как Очередь недоставленных писем) — процесс, в котором важно не потерять сообщения, но не останавливать обработку в случае сбоя.
Идея, которую я реализую ниже, не нова и уже обсуждалась много раз, но я делюсь своим практическим опытом и объясняю различия и компромиссы. Любое решение — это набор компромиссов и в каждом случае стоит опираться на то, к чему вы привыкли. Но если вы решите использовать БД в качестве очереди, то реализация в коде, скорее всего, будет такой же, как я предлагаю в этой статье.
Реализация
Отправитель сообщения
Отправка в очередь в базе данных SQL тривиальна, так как это можно сделать с помощью простой SQL-вставки. В простейшем виде это может выглядеть так:
private final ObjectMapper objectMapper;
private final SimpleJdbcInsert jdbcInsert;
public void send(String topic, T message) {
try {
MessageEntity entity = new MessageEntity();
entity.setTopic(topic);
entity.setPayload(objectMapper.writeValueAsString(message));
entity.setCreated(Instant.now().toEpochMilli());
jdbcInsert.execute(new BeanPropertySqlParameterSource(entity));
} catch (Exception e) {
log.error("Unable to send the message {}", message, e);
throw new RuntimeException(e);
}
}
Для отправки сообщения нам необходимо следующее:
- Знайте, куда отправлять — тема.
- Само сообщение можно сериализовать любым удобным способом. Здесь я выбрал json как удобочитаемый текст, но, возможно, можно использовать двоичный формат для экономии места на диске.
- Сообщение о дате создания может быть полезно для расследования проблем.
Чтобы использовать запланированные/отложенные сообщения, вы можете использовать дополнительное поле с датой, после которой сообщение может быть обработано. Использование обычной транзакции вместе с бизнес-логикой применит свойства ACID к «отправляющемуся» сообщению.
Получатель сообщений
Общей практикой является использование нескольких экземпляров приложений во избежание единой точки отказа или для масштабирования. Следовательно, нам нужно избегать одновременного доступа к конкретному сообщению. Лучший способ сделать это — использовать замок.
В зависимости от времени, необходимого для обработки сообщения, вы можете выбрать один из двух вариантов:
- Заблокировать строку базы данных с помощью
SELECT ... FOR UPDATE
— работает для коротких транзакций (не дольше пары секунд), потому что эта блокировка работает только в рамках открытой транзакции. Таким образом, сообщение должно быть помечено как выполненное или удалено в той же транзакции.
2. Блокировка на основе значения столбца состояния в базе данных: обновите строку, установив ее IN_PROGRESS. Запустите обработку и после завершения обработки установите статус ГОТОВО или удалите строку. Для этого потребуется несколько транзакций, а промежуточное состояние может быть сброшено из-за перезагрузки сервера или в результате ошибки. Следовательно, блокировка должна иметь срок действия, и нам нужно обновлять данные во время обработки сообщения. Также не стоит забывать про токен ограждения и другие нюансы распределенного локирования.
Ниже я реализовал первый вариант. Это простой, но эффективный способ, который подойдет для подавляющего большинства случаев использования очередей на основе SQL
private static final String SELECT =
"SELECT * FROM messages WHERE topic =:topic FOR UPDATE SKIP LOCKED LIMIT 1";
private static final String DELETE =
"DELETE FROM messages WHERE id =:id";
private final DataClassRowMapper<MessageEntity> rowMapper =
new DataClassRowMapper<>(MessageEntity.class);
private final String topic;
private final Class<T> messageType;
private final MessageHandler<T> handler;
private final ObjectMapper objectMapper;
private final NamedParameterJdbcOperations jdbcOperations;
@Transactional
public void processMessage() {
try {
List<MessageEntity> messages =
jdbcOperations.query(SELECT, Map.of("topic", topic), rowMapper);
if (messages.isEmpty()) {
return;
}
MessageEntity messageEntity = messages.get(0);
String payload = messageEntity.getPayload();
T message = objectMapper.readValue(payload, messageType);
handler.handle(message);
jdbcOperations.update(DELETE, Map.of("id", messageEntity.getId()));
} catch (Throwable e) {
log.error("Unable to handle message of {}", topic, e);
}
}
Рассмотрим подробнее запрос на получение.
SELECT * FROM messages WHERE topic =:topic FOR UPDATE SKIP LOCKED LIMIT 1;
Как уже было сказано выше, FOR UPDATE
получает блокировку на выбранную строку в рамках текущей транзакции. Конструкция SKIP LOCKED
позволяет нам немедленно вернуть одну из разблокированных строк или пустой набор, если все строки заблокированы или ничего не соответствует предложению where.
Намерение состоит в том, чтобы получить только одно сообщение, поэтому мы ограничиваем набор результатов, поэтому транзакция блокирует максимум одно сообщение.
Строка базы данных содержит сериализованные сообщения, поэтому ее следует десериализовать и передать обработчику. После успешной обработки приведенный выше код удаляет строку из базы данных и фиксирует транзакцию. Удаление обработанных сообщений без следа может затруднить расследование. Другой вариант — использовать флаги и отметку времени обработки.
Метод processMessage
должен быть зациклен со всеми соответствующими оптимизациями: например, уменьшение количества обработчиков, если очередь пуста, или увеличение числа обработчиков для обработки пиковой нагрузки.
Заключение
Я постоянно повторяю, что использование нестандартных решений не должно стать проблемой для растущего проекта. Главное — оценить нагрузки и подобрать для них возможные решения. Некоторые плюсы и минусы решения перечислены ниже.
Преимущества
- База данных предоставляет понятный механизм ACID, упрощающий написание и чтение сообщений. Можно не бояться сбоев и перезапусков сервера.
- Вы всегда можете запросить таблицу, не мешая процессу обработки.
- Если вы используете очередь в базе данных, вам не нужно знать другие системы и обслуживание отдельных компонентов.
- Вы можете легко улучшить свои функции, например обработку сообщений с задержкой.
Недостатки
- Относительно низкая пропускная способность. Можно масштабировать базу данных с помощью реплик только для чтения, но в случае с очередью чтение и запись нужно будет производить именно в мастер-узле. Таким образом, и получатель, и отправитель, и вообще все пользователи БД будут конкурировать за ресурсы, а использование памяти и количество подключений нужно очень четко контролировать.
- Нагрузка таблицы очереди — состоит из частых вставок и удалений (или мягких удалений), а во время самой обработки строка блокируется, что приводит либо к длительным транзакциям, либо к сложным механизмам блокировки. При использовании облачных решений это может привести к очень высоким затратам на обслуживание базы данных.
Оригинал