Как использовать сообщения Kafka с помощью NestJS
29 декабря 2023 г.Многие из нас используют Kafka для публикации сообщений, но как мы их получаем? В этой статье мы напишем небольшое приложение для получения сообщений от Kafka. И, конечно же, e2e-тесты.
Давайте сначала разберемся, как работает Kafka и что это такое.
Kafka — это брокер сообщений, в котором одни службы генерируют сообщения, а другие их получают. Брокеры в основном используются в системах с микросервисной архитектурой для передачи сообщений между сервисами.
Сообщения хранятся в темах. При отправке сообщения производитель указывает название темы, а также само сообщение, состоящее из ключа и значения. Вот и все; работа продюсера закончена.
Тогда в дело вступают потребители, они подписываются на нужную тему и начинают читать сообщения. У каждого приложения есть своя очередь, читая из которой потребитель перемещает указатель смещения.
n Отличительными особенностями Kafka являются:
- Гарантия того, что все сообщения будут упорядочены именно в той последовательности, в которой они поступили в тему.
* Kafka некоторое время сохраняет прочитанные сообщения
* Высокая пропускная способность
Теперь давайте поработаем с Kafka, используя фреймворк NestJs. Во-первых, нам нужно создать контроллер, который будет обрабатывать сообщения.
@Controller()
export class AppController{
constructor(
private readonly appService: AppService,
) {
}
@EventPattern(config.get('kafka.topics.exampleTopic'), Transport.KAFKA)
handleEvent(
@Payload() payload: ExamplePayloadDto,
): Promise<void> {
return this.appService.handleExampleEvent(payload.message);
}
}
Обратите внимание на атрибут @EventPattern
, который указывает, что наша функция handleEvent()
будет получать сообщения из темы, указанной в файле конфигурации config.get('kafka .topics.exampleTopic')
. Атрибут @Payload()
помогает получить значение из сообщения темы.
Чтобы подключить ваше приложение к брокерам Kafka, вам нужно сделать две вещи. Для начала подключите микросервис в файле запуска:
app.connectMicroservice({
transport: Transport.KAFKA,
options: {
client: {
clientId: config.get('kafka.clientId'),
brokers: config.get('kafka.brokers'),
retry: {
retries: config.get('kafka.retryCount'),
},
},
consumer: {
groupId: config.get('kafka.consumer.groupId'),
},
},
});
А затем запустите микросервисы в main.ts:
async function bootstrap() {
const app = await NestFactory.create(AppModule, {
bufferLogs: true,
});
appStartup(app);
await app.startAllMicroservices();
await app.listen(config.get('app.port'));
};
void bootstrap();
Для тестирования приложения я использую пакет @testcontainers/kafka. С помощью этого я создал контейнер ZooKeeper, а затем контейнер Kafka:
export async function kafkaSetup(): Promise<StartedTestContainer[]> {
const network = await new Network().start();
const zooKeeperHost = "zookeeper";
const zooKeeperPort = 2181;
const zookeeperContainer = await new GenericContainer("confluentinc/cp-zookeeper:7.3.2")
.withNetwork(network)
.withNetworkAliases(zooKeeperHost)
.withEnvironment({ ZOOKEEPER_CLIENT_PORT: zooKeeperPort.toString() })
.withExposedPorts(zooKeeperPort)
.start();
const kafkaPort = 9093;
const kafkaContainer = await new KafkaContainer()
.withNetwork(network)
.withZooKeeper(zooKeeperHost, zooKeeperPort)
.withExposedPorts(kafkaPort)
.start();
const externalPort = kafkaContainer.getMappedPort(kafkaPort);
config.set('kafka.brokers', [`localhost:${externalPort}`]);
return [
zookeeperContainer,
kafkaContainer,
];
}
Обратите внимание, что в этом файле я переопределил адрес брокера для вновь созданного контейнера.
В самом тестовом файле в функции beforeAll
я создаю клиент Kafka. С продюсером я тоже создаю тему и запускаю наше приложение.
beforeAll(async () => {
kafkaContainers = await kafkaSetup();
kafka = new Kafka({
clientId: 'mock',
brokers: config.get('kafka.brokers'),
logLevel: logLevel.NOTHING,
});
producer = kafka.producer();
await producer.connect();
const admin = kafka.admin();
await admin.connect();
await admin.createTopics({
topics: [{ topic: config.get('kafka.topics.exampleTopic') }],
});
appService = mockDeep<AppService>();
const module: TestingModule = await Test.createTestingModule({
imports: [AppModule],
})
.overrideProvider(AppService)
.useValue(appService)
.compile();
app = module.createNestApplication();
appStartup(app);
await app.startAllMicroservices();
await app.init();
}, 30 * 1000);
Разумеется, в функции afterAll
нужно останавливать контейнеры:
afterAll(async () => {
await app.close();
await Promise.all(kafkaContainers.map(c => c.stop()));
}, 15 * 1000);
Я написал тест, который проверяет, что при поступлении сообщения в топик наша функция-обработчик из контроллера вызывает необходимую сервисную функцию. Для этого я переопределяю реализацию функции handleExampleEvent
и жду ее вызова.
describe('handleEvent', () => {
it('should call appService', async () => {
let resolve: (value: unknown) => void;
const promise = new Promise((res) => {
resolve = res;
});
appService.handleExampleEvent.mockImplementation(async () => {
resolve(0);
});
const event: ExamplePayloadDto = {
message: 'Hello World!',
};
await producer.send({
topic: config.get('kafka.topics.exampleTopic'),
messages: [{
key: 'key',
value: JSON.stringify(event),
}]
});
await promise;
await kafka.producer().disconnect();
});
});
Вот и все. Работать с Kafka невероятно легко, если вы используете фреймворк NestJs. Надеюсь, мой опыт будет вам полезен. Пример приложения можно увидеть по адресу https://github.com/waksund/kafka
Оригинал