Использование потоков Redis с NestJS: часть 3 — группы потребителей
23 февраля 2023 г.Введение
Это третья часть серии из трех частей, в которой мы рассмотрим, как использовать потоки Redis с NestJS.
Он состоит из 3 частей:
- Настройка приложения NestJS и подключение к Redis
- Заполнение потоков Redis и чтение из них в режиме разветвления
- Использование групп потребителей для обработки одного потока от нескольких участников таким образом, что одно сообщение отправляется и обрабатывается только одним участником (потребителем)
К концу этой серии вы будете обладать знаниями и инструментами, необходимыми для создания собственного приложения NestJS, использующего потоки Redis для обработки данных в реальном времени.
Полный код доступен на github
.Что у нас есть
В предыдущих частях мы создали приложение NestJS и подключили его к серверу Redis. Затем мы реализовали функциональность для добавления сообщений в поток Redis и чтения из него в разветвленном виде. Наконец, мы рассмотрели, как можно использовать структуру данных потока Redis для обработки потока данных в реальном времени путем непрерывной выборки сообщений из потока Redis.
В этом посте мы будем развивать эту идею, используя группы потребителей, тем самым добавляя возможность распределенной обработки данных.
Понимание групп потребителей
В Redis группы потребителей — это способ распределения сообщений или задач между несколькими потребителями, при этом каждое сообщение обрабатывается только один раз. Когда у вас есть много сообщений для обработки, одному потребителю может потребоваться помощь, чтобы не отставать от скорости входящих сообщений. В этом сценарии вы можете создать несколько экземпляров потребителей, чтобы разделить рабочую нагрузку.
Группы потребителей Redis позволяют распространять сообщения нескольким потребителям скоординированным и отказоустойчивым способом. Каждому потребителю в группе назначается уникальное имя и подмножество сообщений потока для обработки. Redis использует механизм, называемый «подтверждением сообщения», чтобы гарантировать, что каждое сообщение обрабатывается только один раз, даже если в группе несколько потребителей.
Использование групп потребителей
Мы использовали команду XREAD
для извлечения данных из потока. Если мы хотим использовать группы потребителей, нам нужно будет использовать другую команду — XREADGROUP
. Общую информацию можно найти в документах Redis
Чтение как потребитель
Давайте создадим метод в RedisService
для чтения данных в режиме группы потребителей:
// redis.service.ts
public async readConsumerGroup({
streamName,
group,
consumer,
blockMs,
count,
}: CosnumeStreamParams): Promise<RedisStreamMessage[] | null> {
let response: RedsXReadGroupResponse = null;
try {
response = await this.redis.xReadGroup(
commandOptions({ isolated: true }), // uses new connection from pool not to block other redis calls
group,
consumer,
{
key: streamName,
id: '>',
},
{ BLOCK: blockMs, COUNT: count },
);
} catch (error) {
if (error instanceof ClientClosedError) {
console.log(`${error.message} ...RECONNECTING`);
await this.connectToRedis();
return null;
}
if (error.message.includes('NOGROUP')) {
console.log(`${error.message} ...CREATING GROUP`);
await this.createConsumerGroup(streamName, group);
return null;
}
console.error(
`Failed to xReadGroup from Redis stream: ${error.message}`,
error,
);
return null;
}
const messages = response?.[0]?.messages; // returning first stream (since only 1 stream used)
return messages || null;
}
Мы представили новый интерфейс для параметров CosnumeStreamParams
. Поскольку он во многом похож на ReadStreamParams
, мы извлекли их в базовый интерфейс, StreamParamsBase
, и расширили их из обоих этих интерфейсов.
// interfaces.ts
export interface StreamParamsBase {
/** Name of stream to read from */
streamName: string;
/** Max time in ms for how long to block Redis connection before returning
* If 0 is passed, it will block until at least one message is fetched, or timeout happens
* */
blockMs: number;
/** Max how many messages to fetch at a time from Redis */
count: number;
}
export interface ReadStreamParams extends StreamParamsBase {
/** ID of last fetched message */
lastMessageId: string;
}
export interface CosnumeStreamParams extends StreamParamsBase {
/** Name of consumer group */
group: string;
/** Name of consumer, must be unique within group */
consumer: string;
}
// --snip--
Результирующее потоковое сообщение - RedisStreamMessage
остается таким же, как и при чтении потока. Точно так же, как и для XREAD
, мы извлекли тип ответа RedsXReadGroupResponse< /код>
// redis-client.type.ts
export type RedsXReadGroupResponse = Awaited<
ReturnType<RedisClient['xReadGroup']>
Код должен быть простым:
// redis.service.ts
// --snip--
let response: RedsXReadGroupResponse = null;
try {
response = await this.redis.xReadGroup(
commandOptions({ isolated: true }), // uses new connection from pool not to block other redis calls
group,
consumer,
{
key: streamName,
id: '>',
},
{ BLOCK: blockMs, COUNT: count },
);
}
// --snip--
Мы вызываем метод xReadGroup
клиента node-redis
и оборачиваем его в try-catch для обработки любой ошибки, которую мы можем получить от RedisClient
. р>
Как и в случае с xRead, при использовании блокирующих команд мы хотим использовать пул соединений и выполнять эти команды изолированно с помощью commandOptions({solid: true})
Затем мы передаем имя потребителя group
, к которому принадлежит этот потребитель и с которым он обменивается сообщениями.
Далее нам нужно идентифицировать потребителя
— сущность, читающую это сообщение.
Далее мы указываем, из какого потока мы хотим получить поток с помощью параметра key
и начиная с какого идентификатора сообщения. >
— это специальный символ, означающий, что мы хотим получить только те сообщения, которые никогда не были доставлены другому потребителю. Если бы мы использовали 0 или любой другой действительный идентификатор, мы рискуем также получить сообщения, которые были переданы другим пользователям, но еще не были подтверждены.
:::подсказка Здесь мы указываем только 1 поток. Однако мы могли бы читать из нескольких потоков, добавив массив объектов, содержащих значения ключа и идентификатора.
:::
Наконец, мы устанавливаем параметры чтения. BLOCK
для максимального времени блокировки в мс и COUNT
для максимального количества сообщений. Они работают точно так же, как и с опцией XREAD
.
Далее нам нужно обработать ошибки:
// redis.service.ts
// --snip--
} catch (error) {
if (error instanceof ClientClosedError) {
console.log(`${error.message} ...RECONNECTING`);
await this.connectToRedis();
return null;
}
if (error.message.includes('NOGROUP')) {
console.log(`${error.message} ...CREATING GROUP`);
await this.createConsumerGroup(streamName, group);
return null;
}
console.error(
`Failed to xReadGroup from Redis stream: ${error.message}`,
error,
);
return null;
}
// --snip--
Как и в случае с другими вызовами Redis, мы проверяем закрытое соединение — ClientClosedError
и в этом случае пытаемся переподключиться. Затем мы проверяем наличие ошибки с сообщением, включающим NOGROUP
.
Полная ошибка может выглядеть примерно так: NOGROUP Нет такого ключа «example-stream» или группы потребителей «example-group» в XREADGROUP с параметром GROUP
, но если мы видим NOGROUP
, мы знаем, что в этом потоке нет группы, частью которой мы хотим, чтобы наши потребители были.
Итак, нам нужно создать эту группу.: await this.createConsumerGroup(streamName, group);
// redis.service.ts
private async createConsumerGroup(streamName: string, group: string) {
try {
await this.redis.xGroupCreate(
streamName,
group,
'0', // use 0 to create group from the beginning of the stream, use '$' to create group from the end of the stream
{
MKSTREAM: true,
},
);
} catch (error) {
if (error.message.includes('BUSYGROUP')) {
// Consumer group already exists
return;
}
if (error instanceof ClientClosedError) {
console.log(`${error.message} ...RECONNECTING`);
await this.connectToRedis();
return null;
}
console.error(`Failed to xGroupCreate: ${error.message}`);
return null;
}
}
Мы вызываем команду XGROUP CREATE
, указав streamName
, имя group
, которое мы хотим создать. Мы также можем выбрать начальную позицию, с которой мы хотим начать потреблять сообщения. Здесь установлено значение 0
, что означает последнее сообщение в потоке. Это может быть любой действительный идентификатор или специальный символ $
— это приведет к тому, что сообщения, добавленные только после создания группы, начнут потребляться.
Мы также устанавливаем для параметра MKSTREAM
значение true, поэтому, если у нас еще нет этого потока по какой-либо причине, например, сообщения не были добавлены, Redis перезапускается/переключается, или в случае плохого управления памятью, поток вытеснен, мы создадим новый поток с этим именем (а также добавим группу потребителей).
Мы добавили обработку ошибок для BUSYGROUP
— это означает, что эта группа (и поток) все-таки существует (например, если она была только что создана другим потребителем группы). Остальная часть обработки ошибок остается такой же, как для других вызовов Redis.
Наконец, возвращаясь к readConsumerGroup
, после получения сообщений и обработки ошибок мы извлекаем сообщения первого потока (поскольку мы добавили только один key
и id
объект) и вернуть их.
// redis.service.ts
// --snip--
const messages = response?.[0]?.messages; // returning first stream (since only 1 stream used)
return messages || null;
// --snip--
Подтверждение сообщения
Группы потребителей — это отличный способ распространять потоковые данные между разными потребителями только один раз для каждой группы. Но как гарантировать, что сообщение, отправленное сервером Redis, действительно было получено и обработано потребителем, которому оно было отправлено? Ответ "подтверждение".
Когда сообщение отправляется потребителю, в качестве побочного эффекта Redis помещает идентификатор сообщения и этого потребителя в список ожидающих записей (PEL) этой группы потребителей потока.
Потребитель должен отправить подтверждение (также известное как «подтверждение») на сервер Redis, чтобы сигнализировать об успешной обработке сообщения. Это подтверждение отправляется с помощью команды XACK
, которая принимает имя группы потребителей, имя потока и идентификаторы обработанных сообщений.
Как только сервер Redis получает подтверждение от потребителя, он обновляет свои внутренние данные отслеживания, чтобы пометить сообщение как обработанное этим потребителем. Сервер также удаляет сообщение из PEL для этого потребителя.
Если потребитель не может отправить подтверждение в течение настраиваемого периода, сообщение считается неподтвержденным. Затем сервер Redis снова доставит сообщение другому потребителю в той же группе потребителей. Это гарантирует, что сообщения не будут потеряны, если потребитель выйдет из строя или выйдет из строя до того, как сможет обработать сообщение.
Реализация подтверждения
Давайте обработаем подтверждение с помощью метода xAck
:
// redis.service.ts
public async acknowledgeMessages({
streamName,
group,
messageIds,
}: AcknowledgeMessageParams) {
try {
await this.redis.xAck(streamName, group, messageIds);
} catch (error) {
if (error instanceof ClientClosedError) {
console.log(`${error.message} ...RECONNECTING`);
await this.connectToRedis();
return null;
}
console.error(`Failed to xAck from Redis stream: ${error.message}`);
return null;
}
}
Для этого мы создали еще один интерфейс — AcknowledgeMessageParams
// interfaces.ts
export interface AcknowledgeMessageParams {
/** Name of stream to acknowledge message in */
streamName: string;
/** Name of consumer group */
group: string;
/** ID of messages to acknowledge */
messageIds: string[];
}
XACK
принимает один или несколько идентификаторов сообщений, поэтому мы можем пакетировать несколько подтверждений за один вызов.
Остальное просто — мы отправляем команду и обрабатываем ошибки так же, как и для других методов Redis.
(Авто)Заявка на неподтвержденные сообщения
XAUTOCLAIM
— это команда в Redis, которая автоматизирует процесс получения неподтвержденных сообщений от группы потребителей. Она похожа на команды XPENDING
и XCLAIM
, но упрощает процесс, автоматически позволяя Redis запрашивать неподтвержденные сообщения от имени потребителя.
Потребитель отправляет команду XAUTOCLAIM
с именем группы потребителей, именем потребителя и именем потока для потребления. Redis возвращает потребителю неподтвержденные сообщения, которые могут быть обработаны и подтверждены, как сообщение, полученное от XREADGROUP
. Другой потребитель может запросить неподтвержденные сообщения, если время подтверждения истекло. В последующих командах XAUTOCLAIM
Redis возвращает только неподтвержденные сообщения, которые ранее не возвращались потребителю и еще не достигли максимального количества попыток доставки.
Реализация xAutoClaim
Здесь код почти такой же, как и для xReadGroup
:
// redis.service.ts
public async autoClaimMessage({
streamName,
group,
consumer,
minIdleTimeMs,
count,
}: AutoclaimMessageParams) {
let response: RedsXAutoClaimResponse = null;
try {
response = await this.redis.xAutoClaim(
streamName,
group,
consumer,
minIdleTimeMs,
'0-0', // use 0-0 to claim all messages. In case of multiple consumers, this will be used to claim messages from other consumers
{
COUNT: count,
},
);
} catch (error) {
if (error instanceof ClientClosedError) {
console.log(`${error.message} ...RECONNECTING`);
await this.connectToRedis();
return null;
}
console.error(`Failed to xAutoClaim from Redis stream: ${error.message}`);
return null;
}
return response?.messages || null;
}
Мы создали интерфейс параметров AutoclaimMessageParams
// interfaces.ts
export interface AutoclaimMessageParams {
streamName: string;
group: string;
consumer: string;
minIdleTimeMs: number;
count: number;
}
Здесь отдельные параметры: minIdleTimeMs
— это минимальное время простоя в мс, в течение которого сообщение может быть поддано автозаявке.
Использование генераторов для обработки сообщений
У нас есть функции для приема, подтверждения и даже повторного запроса сообщений. Давайте добавим все это вместе и абстрагируем всю эту логику внутри генератора, чтобы клиентам StreamHandlerService
не нужно было беспокоиться обо всех этих механизмах.
// stream-handler.service
public async *getConsumerMessageGenerator({
streamName,
group,
consumer,
count,
autoClaimMinIdleTimeMs,
autoAck = true,
}: ReadConsumerGroupParams): AsyncRedisStreamGenerator {
let fetchNewMessages = true; // Toggle for switching between fetching new messages and auto claiming messages
while (this.isAlive) {
let response: RedisStreamMessage[];
if (fetchNewMessages) {
response = await this.redisService.readConsumerGroup({
streamName,
group,
consumer,
blockMs: 0, // 0 = infinite blocking until at least one message is fetched, or timeout happens
count,
});
} else {
// Try to auto claim messages that are idle for a certain amount of time
response = await this.redisService.autoClaimMessage({
streamName,
group,
consumer,
count,
minIdleTimeMs:
autoClaimMinIdleTimeMs || StreamHandlerService.DEFAULT_IDLE_TIME_MS,
});
}
// Acknowledge messages if autoAck is enabled
if (autoAck && response?.length > 0) {
await this.redisService.acknowledgeMessages({
streamName,
group,
messageIds: response.map((m) => m.id),
});
}
// Toggle between fetching new messages and auto claiming messages
fetchNewMessages = !fetchNewMessages;
// If no messages returned, continue to next iteration without yielding
if (!response || response.length === 0) {
continue;
}
for (const message of response) {
yield message;
}
}
}
Вот новый интерфейс параметров ReadConsumerGroupParams
// interfaces.ts
export interface ReadConsumerGroupParams {
streamName: string;
group: string;
consumer: string;
count: number;
autoClaimMinIdleTimeMs?: number;
autoAck?: boolean;
}
Мы ожидаем streamName
, имя потребителя group
и уникальное имя consumer
. счетчик
того, сколько сообщений нужно попытаться получить за один вызов, а также
autoClaimMinIdleTimeMs
— минимальное время простоя в мс, по истечении которого сообщение может быть автоматически затребовано, повторно затребовано от другого потребителя, если оно не подтверждено.
И, наконец, autoAck
— должны ли сообщения автоматически подтверждаться после прочтения. Если установлено значение false, клиент должен вручную подтверждать сообщения, используя acknowledgeMessage
Мы попробуем попеременно получать новые сообщения и автоматически запрашивать забытые. Для этого вводится переменная fetchNewMessages
.
// --snip--
let fetchNewMessages = true;
while (this.isAlive) {
// --snip--
Что касается нашего генератора разветвления, мы будем использовать цикл while, который станет недействительным при уничтожении модуля. Затем мы чередуем новые сообщения и автозаявку и присваиваем результат переменной response
.
// --snip--
let response: RedisStreamMessage[];
if (fetchNewMessages) {
response = await this.redisService.readConsumerGroup({
streamName,
group,
consumer,
blockMs: 0, // 0 = infinite blocking until at least one message is fetched, or timeout happens
count,
});
} else {
// Try to auto claim messages that are idle for a certain amount of time
response = await this.redisService.autoClaimMessage({
streamName,
group,
consumer,
count,
minIdleTimeMs:
autoClaimMinIdleTimeMs || StreamHandlerService.DEFAULT_IDLE_TIME_MS,
});
}
// --snip--
Далее мы займемся функцией автоматического подтверждения. Это позволит нам не забыть подтвердить сообщение после обработки.
В производственных приложениях это может иметь или не иметь смысла. Например, если вам нужно убедиться, что сообщение было получено и успешно обработано, вам нужно будет выполнить подтверждение вручную после обработки.
// --snip--
if (autoAck && response?.length > 0) {
await this.redisService.acknowledgeMessages({
streamName,
group,
messageIds: response.map((m) => m.id),
});
// --snip--
Затем мы хотим переключить «режим выборки»
// --snip--
fetchNewMessages = !fetchNewMessages;
// --snip--
И, наконец, только если есть какие-либо сообщения, мы хотим их выдать
:
// --snip--
if (!response || response.length === 0) {
continue;
}
for (const message of response) {
yield message;
}
}
Использование сообщений
Сложная часть позади. Теперь мы можем с удовольствием читать поток Redis через группы потребителей.
Давайте сделаем это из нашего AppService
:
// app.service.ts
public async consumeMessageFromGroup(
group: string,
consumer: string,
count: number,
) {
const generator = this.streamService.getConsumerMessageGenerator({
streamName: EXAMPLE_STREAM_NAME,
group,
consumer,
count,
});
const messages: Record<string, string>[] = [];
let counter = 0;
for await (const messageObj of generator) {
messages.push(this.parseMessage(messageObj.message));
counter++;
if (counter >= count) {
break;
}
}
return {
group,
consumer,
messages,
};
}
Код почти такой же, как и для чтения нескольких новых сообщений в режиме разветвления, который мы создали в части 2. Единственное добавление — это group
и consumer
.
Давайте также добавим простую конечную точку, из которой мы можем получать сообщения, и указать группу, имя получателя и количество:
// app.controller
@Get('consume/:group/:consumer/:count')
consumeMessages(
@Param('group') group: string,
@Param('consumer') consumer: string,
@Param('count') count: number,
) {
return this.appService.consumeMessageFromGroup(group, consumer, count);
}
В этом случае мы не будем создавать несколько экземпляров нашего сервиса, но он будет работать одинаково. Вот пример:
Если мы откроем нашу службу с помощью http: // localhost: 8081/sucume/example-group/example-consumer/3
, мы получаем сообщения из потока, начинающегося для группы с именем Пример группы
и для потребителя с именем Пример-consumer
.
Если мы попробуем другого получателя, например, example-consumer-2
, мы получим новые сообщения, которые не ожидают обработки и еще не были подтверждены ни одним из получателей.
Однако, если мы попытаемся получить сообщения из другой группы, они будут получены с самого начала, независимо от того, подтвердила ли их уже другая группа или даже тот же потребитель в другой группе.
Вот пример с example-consumer
, потребляющим сообщения из того же потока, но в example-group-2
Обратите внимание, что эти сообщения совпадают с сообщениями, полученными сначала example-group
. Это потому, что они были поглощены и признаны только этой группой. Разные группы не используют PEL.
Это все, друзья!
В этой серии статей из трех частей мы успешно создали приложение NestJS, которое подключается к серверу Redis и использует структуру данных потока Redis.
В части 1 мы настроили сервер Redis, установили клиентской библиотеки Redis и установил соединение между приложением NestJS и Redis.
Во второй части мы изучили основы потоков Redis и узнали, как использовать клиент Redis для добавления сообщений в поток. Мы также реализовали режим разветвления, который позволяет нескольким клиентам получать сообщения из одного потока.
В части 3 мы подняли нашу реализацию на новый уровень, используя группы потребителей Redis. Мы узнали, как создать группу потребителей и использовать ее для более масштабируемого и надежного использования сообщений.
Мы также реализовали некоторую обработку ошибок, чтобы наше приложение могло восстановиться после ошибок и продолжить обработку сообщений без перерыва.
В целом, эта серия статей представляет собой исчерпывающее руководство по созданию приложения NestJS, использующего потоки Redis и группы потребителей. Следуя инструкциям и экспериментируя с примерами кода, вы теперь должны понимать, как реализовать системы реального времени, которые могут легко обрабатывать большие объемы данных благодаря мощности и гибкости Redis и NestJS.
Оригинал