Как использовать сообщения Kafka с помощью NestJS

Как использовать сообщения Kafka с помощью NestJS

29 декабря 2023 г.

Многие из нас используют Kafka для публикации сообщений, но как мы их получаем? В этой статье мы напишем небольшое приложение для получения сообщений от Kafka. И, конечно же, e2e-тесты.

Давайте сначала разберемся, как работает Kafka и что это такое.

Kafka — это брокер сообщений, в котором одни службы генерируют сообщения, а другие их получают. Брокеры в основном используются в системах с микросервисной архитектурой для передачи сообщений между сервисами.

Сообщения хранятся в темах. При отправке сообщения производитель указывает название темы, а также само сообщение, состоящее из ключа и значения. Вот и все; работа продюсера закончена.

Тогда в дело вступают потребители, они подписываются на нужную тему и начинают читать сообщения. У каждого приложения есть своя очередь, читая из которой потребитель перемещает указатель смещения.

n Отличительными особенностями Kafka являются:

  • Гарантия того, что все сообщения будут упорядочены именно в той последовательности, в которой они поступили в тему.

* Kafka некоторое время сохраняет прочитанные сообщения

* Высокая пропускная способность

Теперь давайте поработаем с Kafka, используя фреймворк NestJs. Во-первых, нам нужно создать контроллер, который будет обрабатывать сообщения.

@Controller()
export class AppController{

    constructor(
        private readonly appService: AppService,
    ) {
    }

    @EventPattern(config.get('kafka.topics.exampleTopic'), Transport.KAFKA)
    handleEvent(
        @Payload() payload: ExamplePayloadDto,
    ): Promise<void> {
        return this.appService.handleExampleEvent(payload.message);
    }
}

Обратите внимание на атрибут @EventPattern, который указывает, что наша функция handleEvent() будет получать сообщения из темы, указанной в файле конфигурации config.get('kafka .topics.exampleTopic'). Атрибут @Payload() помогает получить значение из сообщения темы.

Чтобы подключить ваше приложение к брокерам Kafka, вам нужно сделать две вещи. Для начала подключите микросервис в файле запуска:

app.connectMicroservice({
        transport: Transport.KAFKA,
        options: {
            client: {
                clientId: config.get('kafka.clientId'),
                brokers: config.get('kafka.brokers'),
                retry: {
                    retries: config.get('kafka.retryCount'),
                },
            },
            consumer: {
                groupId: config.get('kafka.consumer.groupId'),
            },
        },
    });

А затем запустите микросервисы в main.ts:

async function bootstrap() {
    const app = await NestFactory.create(AppModule, {
        bufferLogs: true,
    });
    appStartup(app);

    await app.startAllMicroservices();
    await app.listen(config.get('app.port'));
};

void bootstrap();

Для тестирования приложения я использую пакет @testcontainers/kafka. С помощью этого я создал контейнер ZooKeeper, а затем контейнер Kafka:

export async function kafkaSetup(): Promise<StartedTestContainer[]> {
    const network = await new Network().start();

    const zooKeeperHost = "zookeeper";
    const zooKeeperPort = 2181;
    const zookeeperContainer = await new GenericContainer("confluentinc/cp-zookeeper:7.3.2")
        .withNetwork(network)
        .withNetworkAliases(zooKeeperHost)
        .withEnvironment({ ZOOKEEPER_CLIENT_PORT: zooKeeperPort.toString() })
        .withExposedPorts(zooKeeperPort)
        .start();

    const kafkaPort = 9093;
    const kafkaContainer = await new KafkaContainer()
        .withNetwork(network)
        .withZooKeeper(zooKeeperHost, zooKeeperPort)
        .withExposedPorts(kafkaPort)
        .start();

    const externalPort = kafkaContainer.getMappedPort(kafkaPort);

    config.set('kafka.brokers', [`localhost:${externalPort}`]);

    return [
        zookeeperContainer,
        kafkaContainer,
    ];
}

Обратите внимание, что в этом файле я переопределил адрес брокера для вновь созданного контейнера.

В самом тестовом файле в функции beforeAll я создаю клиент Kafka. С продюсером я тоже создаю тему и запускаю наше приложение.

beforeAll(async () => {
        kafkaContainers = await kafkaSetup();

        kafka = new Kafka({
            clientId: 'mock',
            brokers: config.get('kafka.brokers'),
            logLevel: logLevel.NOTHING,
        });
        producer = kafka.producer();
        await producer.connect();

        const admin = kafka.admin();
        await admin.connect();
        await admin.createTopics({
            topics: [{ topic: config.get('kafka.topics.exampleTopic') }],
        });

        appService = mockDeep<AppService>();

        const module: TestingModule = await Test.createTestingModule({
            imports: [AppModule],
        })
            .overrideProvider(AppService)
            .useValue(appService)
            .compile();

        app = module.createNestApplication();
        appStartup(app);
        await app.startAllMicroservices();

        await app.init();

    }, 30 * 1000);

Разумеется, в функции afterAll нужно останавливать контейнеры:

afterAll(async () => {
        await app.close();
        await Promise.all(kafkaContainers.map(c => c.stop()));
    }, 15 * 1000);

Я написал тест, который проверяет, что при поступлении сообщения в топик наша функция-обработчик из контроллера вызывает необходимую сервисную функцию. Для этого я переопределяю реализацию функции handleExampleEvent и жду ее вызова.

describe('handleEvent', () => {
        it('should call appService', async () => {
            let resolve: (value: unknown) => void;
            const promise = new Promise((res) => {
                resolve = res;
            });
            appService.handleExampleEvent.mockImplementation(async () => {
                resolve(0);
            });

            const event: ExamplePayloadDto = {
                message: 'Hello World!',
            };

            await producer.send({
                topic: config.get('kafka.topics.exampleTopic'),
                messages: [{
                    key: 'key',
                    value: JSON.stringify(event),
                }]
            });

            await promise;

            await kafka.producer().disconnect();
        });
    });

Вот и все. Работать с Kafka невероятно легко, если вы используете фреймворк NestJs. Надеюсь, мой опыт будет вам полезен. Пример приложения можно увидеть по адресу https://github.com/waksund/kafka


Оригинал
PREVIOUS ARTICLE
NEXT ARTICLE