Потоковая передача данных из REST API с использованием Spring Webflux

Потоковая передача данных из REST API с использованием Spring Webflux

26 апреля 2023 г.

В современном быстро меняющемся и управляемом данными мире эффективная обработка больших объемов данных может иметь решающее значение. Spring Webflux, часть среды Spring, обеспечивает неблокирующий реактивный подход к обработке потоков данных, обеспечивая низкое потребление ресурсов и высокую пропускную способность. В этой статье рассматриваются возможности Spring Webflux и его практическое применение.

В этом руководстве мы будем использовать общедоступный API 4chan в качестве источника данных. Выбор 4chan API — это не одобрение платформы, а практическое решение, основанное на ее общедоступности, отсутствии требований аутентификации и характере ее данных, который подходит для демонстрации возможностей Spring Webflux.

В этой статье вы узнаете, как получать данные из API 4chan, непрерывно транслировать их с помощью Spring Webflux и обрабатывать данные, чтобы получить представление о показателях платформы.

:::подсказка Репозиторий git для этого руководства можно найти здесь.

:::

Зависимости и структура проекта

Проект 4chan Metrics Collector состоит из двух подмодулей Gradle: app и api-client. Файл settings.gradle.kts отвечает за включение этих подмодулей в проект.

Файл build.gradle.kts служит основной конфигурацией сборки проекта. Он определяет плагины, совместимость с Java, конфигурации, репозитории и общие зависимости для всех подмодулей.

Примечательные зависимости для этого проекта включают:

  • Активатор Spring Boot
  • Весенняя загрузка WebFlux
  • АОП Spring Boot
  • Ломбок
  • Реестр Micrometer Prometheus

Подмодуль app, настроенный в app/build.gradle.kts, зависит от подмодуля api-client. Подмодуль app — это приложение Spring Boot, отвечающее за запуск сборщика метрик. Подмодуль api-client, настроенный в api-client/build.gradle.kts, представляет собой библиотеку, содержащую реализацию клиента API, используемого приложением<. /code> для доступа к общедоступному API 4chan.

Хотя основное внимание в этой статье уделяется использованию Spring Webflux для потоковой передачи данных из API 4chan, проект также демонстрирует несколько других ценных функций, таких как:

* Проект Gradle с конфигурацией на основе Kotlin * Запуск и остановка потоков при запуске и завершении работы приложения * Пользовательская конфигурация Spring * Использование Docker Compose для одновременного запуска приложения, Prometheus и Grafana. * Публикация метрик в Prometheus * Использование Grafana для построения графиков опубликованных показателей

Извлечение, потоковая передача и обработка данных с помощью Spring Webflux

В этом разделе мы рассмотрим, как приложение извлекает данные из API 4chan, выполняет потоковую передачу данных с помощью Spring Webflux и обрабатывает их для сбора и публикации показателей.

Обзор

Процесс начинается с класса MetricsCollector, который планирует периодические запросы к API 4chan на основе настроенных досок в файле application.yml. Для каждой платы создается Flux для передачи данных, и эти экземпляры Flux объединяются в один Flux, на который подписывается приложение. По мере прохождения данных по подписке приложение публикует метрики в Prometheus.

На приведенной ниже диаграмме показан поток приложения. Каждая настроенная доска постоянно отправляет сообщения, содержащие метаданные об их активных потоках, полученные из общедоступного API 4chan. Эти потоки объединяются в один непрерывный поток для всех активных потоков на настроенных досках, и каждый сообщение обрабатывается в подписке, метрики публикуются в Prometheus.

Создать Flux и подписаться на поток данных

@PostConstruct
public void produce() {
  boards.stream()
      .map(board -> Flux.create(emit(board)))
      .reduce(Flux::merge)
      .orElse(Flux.empty())
      .flatMap(Function.identity())
      .doOnNext(this::publishThreadMetrics)
      .subscribe();
}

Точкой входа приложения является метод product в классе MetricsCollector. Благодаря аннотации @PostConstruct этот метод выполняется при запуске приложения. В этом методе все настроенные платы передаются в потоковом режиме, и каждая из них сопоставляется с Flux, отвечающим за отправку данных активных потоков для платы.

На данный момент у нас есть поток отдельных объектов Flux, который затем сводится к одному Flux, содержащему все активные потоки для всех настроенных плат. Наконец, он подписывается, чтобы начать процесс потребления данных, и по мере того, как каждый следующий элемент проходит через поток, соответственно публикуются метрики потока.

Излучающие элементы в поток

Метод emit в классе MetricsCollector настраивает планировщик для периодического выполнения запросов:

private Consumer<FluxSink<Flux<FourChanThread>>> emit(final String board) {
  return emitter -> scheduler.scheduleAtFixedRate(
      execute(emitter, board),
      random.nextInt(REQUEST_DELAY_SECONDS),
      REQUEST_INTERVAL_SECONDS,
      TimeUnit.SECONDS);
}

Случайная задержка гарантирует, что не все потоки одновременно отправят запросы к API 4chan.

Метод execute вызывает метод getThreads класса ApiClient для получения данных из API 4chan:

private Runnable execute(final FluxSink<Flux<FourChanThread>> emitter, final String board) {
  return () -> {
    emitter.next(apiClient.getThreads(board));
  };
}

Получение данных из общедоступного API

Класс ApiClient использует экземпляр WebClient для выполнения реактивных HTTP-запросов к общедоступному API 4chan. Метод getThreads возвращает Flux<FourChanThread>, представляющий поток данных активных потоков для конкретной платы.

Каждый вызов конечной точки потоков возвращает список страниц, и каждая страница содержит список потоков. Вызов bodyToMono анализирует ответ JSON< /a> в Publisher одного списка страниц. Последующие вызовы сопоставления используются для распаковки потоков из содержащих их списков, создавая Flux потоков. Последнее сопоставление используется для обогащения метаданных треда конкретной доской, к которой принадлежит тред.

Тестирование приложений Webflux

Тестирование приложений Webflux может быть довольно сложным из-за асинхронного и неблокирующего характера реактивного программирования. Однако при правильном подходе и инструментах вы можете написать эффективные тесты, обеспечивающие надлежащую функциональность и производительность вашего приложения. В этом разделе мы обсудим проблемы тестирования приложений Webflux, используем модульные тесты нашего проекта в качестве примеров и дадим советы по написанию эффективных тестов.

Проблемы тестирования приложений Webflux

Некоторые проблемы при тестировании приложений Webflux включают:

  1. Асинхронное поведение. Поскольку приложения Webflux создаются с использованием реактивного программирования, обработка асинхронного поведения в тестах может быть сложной. Вам необходимо убедиться, что ваши тестовые примеры правильно учитывают асинхронный характер тестируемого кода.
  2. Обработка ошибок. Реактивные приложения могут иметь сложные сценарии обработки ошибок. Очень важно протестировать различные состояния ошибок, чтобы убедиться, что ваше приложение корректно обрабатывает сбои.
  3. Тайм-ауты. Тайм-ауты — это распространенная проблема в приложениях Webflux, и проверка того, как ваше приложение их обрабатывает, имеет решающее значение для обеспечения надежной работы.
  4. Веб-клиент: сложно имитировать реактивный WebClient, используемый для создания HTTP вызовы. Без надлежащей осторожности легко запутаться в сложной структуре методов, которые нужно заглушить, и последующих объектов, которые нужно имитировать.

Модульное тестирование

В нашем проекте у нас есть два тестовых класса, MetricsCollectorTest и ApiClientTest, которые охватывают различные аспекты функциональности приложения. Эти тесты служат примерами того, как справляться с проблемами, упомянутыми выше:

Асинхронное поведение

Класс MetricsCollectorTest проверяет асинхронное поведение метода produce(), используя Thread.sleep() для ожидания ожидаемого времени выполнения. прежде чем делать утверждения.

metricsCollector.produce();
Thread.sleep(TimeUnit.SECONDS.toMillis(REQUEST_DELAY_SECONDS) + 1L);

Класс ApiClientTest использует StepVerifier из библиотеки reactor-test, чтобы гарантировать ожидаемое поведение getThreads(). метод.

Flux<FourChanThread> result = apiClient.getThreads(board);
StepVerifier.create(result)
    .expectNext(...)
    .verifyComplete();

Обработка ошибок

Класс ApiClientTest проверяет обработку ошибок методом getThreads(), имитируя отсутствующую доску и гарантируя, что выдается исключение WebClientResponseException.

Flux<FourChanThread> result = apiClient.getThreads(board);
StepVerifier.create(result)
    .expectErrorMatches(throwable -> throwable instanceof WebClientResponseException)
    .verify();

Время ожидания

Класс ApiClientTest проверяет обработку тайм-аута метода getThreads(), имитируя отложенный ответ, гарантируя создание TimeoutException.

Flux<FourChanThread> result = apiClient.getThreads(board);
StepVerifier.create(result)
    .expectErrorMatches(throwable -> throwable instanceof TimeoutException)
    .verify();

Веб-клиент

В классе ApiClientTest мы внедряем адаптированный WebClient в экземпляр ApiClient, который тестируется с помощью указанной функции обмена. Это позволяет нам контролировать поведение WebClient и создавать тестовые примеры, подходящие для различных сценариев.

private WebClient createWebClient(HttpStatus httpStatus, Duration delay, String body, String contentType) {
  ClientResponse clientResponse = ClientResponse.create(httpStatus, ExchangeStrategies.withDefaults())
      .header(HttpHeaders.CONTENT_TYPE, contentType)
      .body(body)
      .build();

  ExchangeFunction exchangeFunction = request -> Mono
      .just(clientResponse)
      .delayElement(delay);

  return WebClient.builder()
      .exchangeFunction(exchangeFunction)
      .build();
}

Заключение

В этой статье мы рассмотрели возможности Spring Webflux для обработки больших объемов данных из REST API, используя общедоступный API 4chan в качестве источника данных. Мы узнали, как получать, передавать и обрабатывать данные с помощью Spring Webflux, а также изучили структуру проекта, зависимости и различные функции, такие как конфигурация проекта Gradle, запуск и остановка потоков и пользовательская конфигурация Spring.

Мы также обсудили проблемы тестирования приложений Webflux и предоставили примеры из модульных тестов нашего проекта для преодоления этих проблем. Мы подчеркнули важность обработки асинхронного поведения, обработки ошибок и тайм-аутов при тестировании и дали советы по написанию эффективных тестов для приложений Webflux.

Поняв и применив концепции, представленные в этой статье, вы будете лучше подготовлены к использованию возможностей Spring Webflux в своих собственных приложениях. В результате вы можете создавать эффективные приложения, управляемые данными, которые эффективно обрабатывают и анализируют большие объемы данных из REST API.


:::информация Избранное изображение Белоснежка

:::


Оригинал