Потоковая передача данных из REST API с использованием Spring Webflux
26 апреля 2023 г.В современном быстро меняющемся и управляемом данными мире эффективная обработка больших объемов данных может иметь решающее значение. Spring Webflux, часть среды Spring, обеспечивает неблокирующий реактивный подход к обработке потоков данных, обеспечивая низкое потребление ресурсов и высокую пропускную способность. В этой статье рассматриваются возможности Spring Webflux и его практическое применение.
В этом руководстве мы будем использовать общедоступный API 4chan в качестве источника данных. Выбор 4chan API — это не одобрение платформы, а практическое решение, основанное на ее общедоступности, отсутствии требований аутентификации и характере ее данных, который подходит для демонстрации возможностей Spring Webflux.
В этой статье вы узнаете, как получать данные из API 4chan, непрерывно транслировать их с помощью Spring Webflux и обрабатывать данные, чтобы получить представление о показателях платформы.
:::подсказка Репозиторий git для этого руководства можно найти здесь.
:::
Зависимости и структура проекта
Проект 4chan Metrics Collector состоит из двух подмодулей Gradle: app и api-client. Файл settings.gradle.kts отвечает за включение этих подмодулей в проект.
Файл build.gradle.kts служит основной конфигурацией сборки проекта. Он определяет плагины, совместимость с Java, конфигурации, репозитории и общие зависимости для всех подмодулей.
Примечательные зависимости для этого проекта включают:
- Активатор Spring Boot
- Весенняя загрузка WebFlux
- АОП Spring Boot
- Ломбок
- Реестр Micrometer Prometheus
Подмодуль app, настроенный в app/build.gradle.kts, зависит от подмодуля api-client. Подмодуль app — это приложение Spring Boot, отвечающее за запуск сборщика метрик. Подмодуль api-client, настроенный в api-client/build.gradle.kts, представляет собой библиотеку, содержащую реализацию клиента API, используемого приложением<. /code> для доступа к общедоступному API 4chan.
Хотя основное внимание в этой статье уделяется использованию Spring Webflux для потоковой передачи данных из API 4chan, проект также демонстрирует несколько других ценных функций, таких как:
* Проект Gradle с конфигурацией на основе Kotlin * Запуск и остановка потоков при запуске и завершении работы приложения * Пользовательская конфигурация Spring * Использование Docker Compose для одновременного запуска приложения, Prometheus и Grafana. * Публикация метрик в Prometheus * Использование Grafana для построения графиков опубликованных показателей
Извлечение, потоковая передача и обработка данных с помощью Spring Webflux
В этом разделе мы рассмотрим, как приложение извлекает данные из API 4chan, выполняет потоковую передачу данных с помощью Spring Webflux и обрабатывает их для сбора и публикации показателей.
Обзор
Процесс начинается с класса MetricsCollector, который планирует периодические запросы к API 4chan на основе настроенных досок в файле application.yml. Для каждой платы создается Flux для передачи данных, и эти экземпляры Flux объединяются в один Flux, на который подписывается приложение. По мере прохождения данных по подписке приложение публикует метрики в Prometheus.
На приведенной ниже диаграмме показан поток приложения. Каждая настроенная доска постоянно отправляет сообщения, содержащие метаданные об их активных потоках, полученные из общедоступного API 4chan. Эти потоки объединяются в один непрерывный поток для всех активных потоков на настроенных досках, и каждый сообщение обрабатывается в подписке, метрики публикуются в Prometheus.

Создать Flux и подписаться на поток данных
@PostConstruct
public void produce() {
boards.stream()
.map(board -> Flux.create(emit(board)))
.reduce(Flux::merge)
.orElse(Flux.empty())
.flatMap(Function.identity())
.doOnNext(this::publishThreadMetrics)
.subscribe();
}
Точкой входа приложения является метод product в классе MetricsCollector. Благодаря аннотации @PostConstruct этот метод выполняется при запуске приложения. В этом методе все настроенные платы передаются в потоковом режиме, и каждая из них сопоставляется с Flux, отвечающим за отправку данных активных потоков для платы.
На данный момент у нас есть поток отдельных объектов Flux, который затем сводится к одному Flux, содержащему все активные потоки для всех настроенных плат. Наконец, он подписывается, чтобы начать процесс потребления данных, и по мере того, как каждый следующий элемент проходит через поток, соответственно публикуются метрики потока.
Излучающие элементы в поток
Метод emit в классе MetricsCollector настраивает планировщик для периодического выполнения запросов:
private Consumer<FluxSink<Flux<FourChanThread>>> emit(final String board) {
return emitter -> scheduler.scheduleAtFixedRate(
execute(emitter, board),
random.nextInt(REQUEST_DELAY_SECONDS),
REQUEST_INTERVAL_SECONDS,
TimeUnit.SECONDS);
}
Случайная задержка гарантирует, что не все потоки одновременно отправят запросы к API 4chan.
Метод execute вызывает метод getThreads класса ApiClient для получения данных из API 4chan:
private Runnable execute(final FluxSink<Flux<FourChanThread>> emitter, final String board) {
return () -> {
emitter.next(apiClient.getThreads(board));
};
}
Получение данных из общедоступного API
Класс ApiClient использует экземпляр WebClient для выполнения реактивных HTTP-запросов к общедоступному API 4chan. Метод getThreads возвращает Flux<FourChanThread>, представляющий поток данных активных потоков для конкретной платы.
Каждый вызов конечной точки потоков возвращает список страниц, и каждая страница содержит список потоков. Вызов bodyToMono анализирует ответ JSON< /a> в Publisher одного списка страниц. Последующие вызовы сопоставления используются для распаковки потоков из содержащих их списков, создавая Flux потоков. Последнее сопоставление используется для обогащения метаданных треда конкретной доской, к которой принадлежит тред.
public Flux<FourChanThread> getThreads(String board) {
log.debug("Issuing request to get threads for /{} board", board);
Endpoint endpoint = Endpoint.THREADS; // Endpoint is an enum that defines the API endpoints and is used to build the endpoint path
ParameterizedTypeReference<List<FourChanThreadList>> typeReference =
new ParameterizedTypeReference<>() {
};
return webClient.get()
.uri(endpoint.getEndpoint(board))
.retrieve()
.bodyToMono(typeReference)
.timeout(TIMEOUT)
.doOnError(throwable -> log.warn("Error when issuing request to get threads for /{} board", board, throwable))
.flatMapMany(Flux::fromIterable)
.flatMap(threadList -> Flux.fromIterable(threadList.getThreads()))
.flatMap(thread -> Mono.just(thread.withBoard(board)));
}
Тестирование приложений Webflux
Тестирование приложений Webflux может быть довольно сложным из-за асинхронного и неблокирующего характера реактивного программирования. Однако при правильном подходе и инструментах вы можете написать эффективные тесты, обеспечивающие надлежащую функциональность и производительность вашего приложения. В этом разделе мы обсудим проблемы тестирования приложений Webflux, используем модульные тесты нашего проекта в качестве примеров и дадим советы по написанию эффективных тестов.
Проблемы тестирования приложений Webflux
Некоторые проблемы при тестировании приложений Webflux включают:
- Асинхронное поведение. Поскольку приложения Webflux создаются с использованием реактивного программирования, обработка асинхронного поведения в тестах может быть сложной. Вам необходимо убедиться, что ваши тестовые примеры правильно учитывают асинхронный характер тестируемого кода.
- Обработка ошибок. Реактивные приложения могут иметь сложные сценарии обработки ошибок. Очень важно протестировать различные состояния ошибок, чтобы убедиться, что ваше приложение корректно обрабатывает сбои.
- Тайм-ауты. Тайм-ауты — это распространенная проблема в приложениях Webflux, и проверка того, как ваше приложение их обрабатывает, имеет решающее значение для обеспечения надежной работы.
- Веб-клиент: сложно имитировать реактивный
WebClient, используемый для создания HTTP вызовы. Без надлежащей осторожности легко запутаться в сложной структуре методов, которые нужно заглушить, и последующих объектов, которые нужно имитировать.
Модульное тестирование
В нашем проекте у нас есть два тестовых класса, MetricsCollectorTest и ApiClientTest, которые охватывают различные аспекты функциональности приложения. Эти тесты служат примерами того, как справляться с проблемами, упомянутыми выше:
Асинхронное поведение
Класс MetricsCollectorTest проверяет асинхронное поведение метода produce(), используя Thread.sleep() для ожидания ожидаемого времени выполнения. прежде чем делать утверждения.
metricsCollector.produce();
Thread.sleep(TimeUnit.SECONDS.toMillis(REQUEST_DELAY_SECONDS) + 1L);
Класс ApiClientTest использует StepVerifier из библиотеки reactor-test, чтобы гарантировать ожидаемое поведение getThreads(). метод.
Flux<FourChanThread> result = apiClient.getThreads(board);
StepVerifier.create(result)
.expectNext(...)
.verifyComplete();
Обработка ошибок
Класс ApiClientTest проверяет обработку ошибок методом getThreads(), имитируя отсутствующую доску и гарантируя, что выдается исключение WebClientResponseException.
Flux<FourChanThread> result = apiClient.getThreads(board);
StepVerifier.create(result)
.expectErrorMatches(throwable -> throwable instanceof WebClientResponseException)
.verify();
Время ожидания
Класс ApiClientTest проверяет обработку тайм-аута метода getThreads(), имитируя отложенный ответ, гарантируя создание TimeoutException.
Flux<FourChanThread> result = apiClient.getThreads(board);
StepVerifier.create(result)
.expectErrorMatches(throwable -> throwable instanceof TimeoutException)
.verify();
Веб-клиент
В классе ApiClientTest мы внедряем адаптированный WebClient в экземпляр ApiClient, который тестируется с помощью указанной функции обмена. Это позволяет нам контролировать поведение WebClient и создавать тестовые примеры, подходящие для различных сценариев.
private WebClient createWebClient(HttpStatus httpStatus, Duration delay, String body, String contentType) {
ClientResponse clientResponse = ClientResponse.create(httpStatus, ExchangeStrategies.withDefaults())
.header(HttpHeaders.CONTENT_TYPE, contentType)
.body(body)
.build();
ExchangeFunction exchangeFunction = request -> Mono
.just(clientResponse)
.delayElement(delay);
return WebClient.builder()
.exchangeFunction(exchangeFunction)
.build();
}
Заключение
В этой статье мы рассмотрели возможности Spring Webflux для обработки больших объемов данных из REST API, используя общедоступный API 4chan в качестве источника данных. Мы узнали, как получать, передавать и обрабатывать данные с помощью Spring Webflux, а также изучили структуру проекта, зависимости и различные функции, такие как конфигурация проекта Gradle, запуск и остановка потоков и пользовательская конфигурация Spring.
Мы также обсудили проблемы тестирования приложений Webflux и предоставили примеры из модульных тестов нашего проекта для преодоления этих проблем. Мы подчеркнули важность обработки асинхронного поведения, обработки ошибок и тайм-аутов при тестировании и дали советы по написанию эффективных тестов для приложений Webflux.
Поняв и применив концепции, представленные в этой статье, вы будете лучше подготовлены к использованию возможностей Spring Webflux в своих собственных приложениях. В результате вы можете создавать эффективные приложения, управляемые данными, которые эффективно обрабатывают и анализируют большие объемы данных из REST API.
:::информация Избранное изображение Белоснежка
:::
Оригинал