Взгляд на шаблоны связи для реализации в распределенных приложениях через NATS

Взгляд на шаблоны связи для реализации в распределенных приложениях через NATS

7 марта 2022 г.

В этой статье мы рассмотрим различные шаблоны связи, которые мы можем реализовать в наших распределенных приложениях с помощью NATS.


  • В этой статье мы будем использовать Go, но NATS имеет более 40 реализаций клиентских языков, таких как JAVA, C#, Python, Rust, Elixir и многие другие. Как всегда, весь код будет доступен в этом репозитории.*

Настраивать


Давайте инициализируем простой проект go.


Примечание. Этот пример основан на Linux/MacOS, но NATS также поддерживается в Windows.


``` ударить


Пример инициализации $go mod


Установите пакет nats.go


$ установить github.com/nats-io/nats.go/@latest


Мы будем использовать такую ​​структуру папок


├── команда


│ ├── публиковать-подписываться


│ │ └── main.go


│ ├── запрос-ответ


│ │ └── main.go


│ └── очереди-группы


│ └── main.go


├── go.mod


└── go.sum


Последний шаг — запустить наш локальный [nats-сервер] (https://github.com/nats-io/nats-server)


$ nats-сервер


[18661] 2022/03/02 00:41:46.170012 [INF] Запуск nats-сервера


[18661] 2022/03/02 00:41:46.170585 [INF] Версия: 2.7.3


[18661] 2022/03/02 00:41:46.170589 [INF] Git: [не установлено]


[18661] 2022/03/02 00:41:46.170592 [INF] Имя: NAIAIQUT4426EPG4BEPKOUGMAVWS52GIFPWFZEO7ELXF3FVI5EVSLMTY


[18661] 2022/03/02 00:41:46.170595 [INF] ID: NAIAIQUT4426EPG4BEPKOUGMAVWS52GIFPWFZEO7ELXF3FVI5EVSLMTY


[18661] 2022/03/02 00:41:46.171426 [INF] Прослушивание клиентских подключений на 0.0.0.0:4222


[18661] 2022/03/02 00:41:46.171920 [INF] Сервер готов


Опубликовать-подписаться


опубликовать-подписаться


NATS реализует модель распространения сообщений «публикация-подписка» для связи «один ко многим». Издатель отправляет сообщение по теме, и любой активный подписчик, слушающий эту тему, получает сообщение. Этот шаблон 1:N (один ко многим) иногда называют разветвлением.


Подписчики также могут проявлять интерес к темам с подстановочными знаками, которые немного похожи на регулярное выражение (но только немного). Например,


  • foo.* соответствует foo.bar и foo.baz.

  • foo.*.bar соответствует foo.a.bar и foo.b.bar.

  • foo.> соответствует любому из приведенных выше.

*Сообщения имеют максимальный размер (устанавливается в конфигурации сервера с помощью * max_payload). По умолчанию установлен размер 1 МБ, но при необходимости его можно увеличить до 64 МБ (хотя команда NATS рекомендует сохранять максимальный размер сообщения на более разумном уровне, например 8 МБ).


Зачем нам это надо?


Publish-Subscribe — довольно распространенный вариант использования, как следует из названия, мы можем использовать его для разветвления сообщений на разные сервисы.


Код


Давайте напишем код в cmd/publish-subscribe/main.go, чтобы лучше понять это, начнем с инициализации нашего клиента NATS.


```иди


nc, ошибка := nats.Connect(nats.DefaultURL)


если ошибка != ноль {


log.Fatalln(ошибка)


отложить nc.Close()


Подпишитесь на нашу тему foo, используя 3 подписчиков, чтобы мы могли увидеть разветвление сообщения в действии.


```иди


nc.Subscribe ("foo", func (msg *nats.Msg) {


log.Println("Абонент 1:", строка(msg.Data))


nc.Subscribe ("foo", func (msg *nats.Msg) {


log.Println("Абонент 2:", строка(msg.Data))


nc.Subscribe ("foo", func (msg *nats.Msg) {


log.Println("Абонент 3:", строка(msg.Data))


Опубликуйте наше сообщение в теме foo и подождите.


```иди


if err := nc.Publish("foo", []byte("Вот кое-что")); ошибка != ноль {


log.Fatalln(ошибка)


время.Сон(2 * время.Секунда)


Полный пример должен выглядеть так. Да, распределенный обмен сообщениями с NATS — это так просто!


```иди


основной пакет


импорт (


"журнал"


"время"


"github.com/nats-io/nats.go"


основная функция () {


nc, ошибка := nats.Connect(nats.DefaultURL)


если ошибка != ноль {


log.Fatalln(ошибка)


отложить nc.Close()


nc.Subscribe ("foo", func (msg *nats.Msg) {


log.Println("Абонент 1:", строка(msg.Data))


nc.Subscribe ("foo", func (msg *nats.Msg) {


log.Println("Абонент 2:", строка(msg.Data))


nc.Subscribe ("foo", func (msg *nats.Msg) {


log.Println("Абонент 3:", строка(msg.Data))


если ошибка := nc.Publish("foo", []byte("Message")); ошибка != ноль {


log.Fatalln(ошибка)


время.Сон(2 * время.Секунда)


Выход


Как видим, наше сообщение было разослано всем подписчикам.


$ go run cmd/publish-subscribe/main.go


01.03.2022 22:42:56 Подписчик 1: Сообщение


01.03.2022 22:42:56 Подписчик 3: Сообщение


01.03.2022 22:42:56 Подписчик 2: Сообщение


Запрос-Ответ


запрос-ответ


Запрос-ответ — распространенный шаблон в современных распределенных системах. Отправляется запрос, и приложение либо ждет ответа с определенным таймаутом, либо получает ответ


асинхронно.


NATS делает запрос-ответ простым и эффективным, а также обеспечивает такие мощные функции, как прозрачность местоположения, увеличение и уменьшение масштаба, возможность наблюдения и многое другое.


Зачем нам это надо?


Иногда между сервисами требуется прямая связь, для этого отлично подходит шаблон Request-Reply.


Код


Давайте начнем с написания кода в cmd/request-reply/main.go. Как и раньше, давайте инициализируем наш клиент NATS.


```иди


nc, ошибка := nats.Connect(nats.DefaultURL)


если ошибка != ноль {


log.Fatalln(ошибка)


отложить nc.Close()


Подпишитесь на нашу тему foo и добавьте логирование.


```иди


nc.Subscribe ("foo", func (msg *nats.Msg) {


log.Println("Запрос получен:", строка(msg.Data))


msg.Respond([]byte("Вот оно!"))


Мы также можем использовать уникальные темы ответов, клиенты могут делать запросы к службам, которые отвечают только запрашивающей стороне, создавая отношения 1 к 1.


```иди


nc.Publish(msg.Reply, []byte("Вот, пожалуйста!"))


Теперь мы будем использовать метод Request на клиенте NATS. У него три аргумента: первый — тема, второй — данные в массиве байтов, а последний — тайм-аут запроса.


```иди


ответ, ошибка := nc.Request("foo", []byte("Дайте мне данные"), 4*time.Second)


если ошибка != ноль {


log.Fatalln(ошибка)


log.Println("Получил ответ:", строка(ответ.Данные))


Итак, наш полный пример должен выглядеть так.


```иди


основной пакет


импорт (


"журнал"


"время"


"github.com/nats-io/nats.go"


основная функция () {


nc, ошибка := nats.Connect(nats.DefaultURL)


если ошибка != ноль {


log.Fatalln(ошибка)


отложить nc.Close()


nc.Subscribe ("foo", func (msg *nats.Msg) {


log.Println("Запрос получен:", строка(msg.Data))


msg.Respond([]byte("Вот оно!"))


ответ, ошибка := nc.Request("foo", []byte("Дайте мне данные"), 4*time.Second)


если ошибка != ноль {


log.Fatalln(ошибка)


log.Println("Получил ответ:", строка(ответ.Данные))


Выход


Как и ожидалось, наш запрос был получен, и наш подписчик ответил некоторыми данными.


$ запустить cmd/запрос-ответ/main.go


01.03.2022, 20:14:53 Получен запрос: Дайте мне данные


01.03.2022 20:14:53 Получил ответ: Вот!


Группы очередей


группы очередей


NATS предоставляет встроенную функцию балансировки нагрузки, называемую распределенными очередями. Использование подписчиков очереди позволит сбалансировать доставку сообщений по группе подписчиков, что можно использовать для обеспечения отказоустойчивости приложения и масштабирования обработки рабочей нагрузки.


Зачем нам это надо?


Подписчики очереди идеально подходят для масштабирования сервисов. Масштабирование так же просто, как запуск другого приложения, масштабирование завершает приложение с сигналом, который истощает текущие запросы. Эта гибкость и отсутствие каких-либо изменений конфигурации делают NATS превосходной технологией передачи услуг, которая может работать со всеми технологиями платформы. Одной из замечательных особенностей NATS является то, что группы очередей определяются приложением и их подписчиками очередей, а не конфигурацией сервера.


Код


Чтобы создать подписку на очередь, подписчики регистрируют имя очереди. Все подписчики с одинаковым именем очереди образуют группу очереди. Это не требует настройки. По мере публикации сообщений на зарегистрированную тему случайным образом выбирается один член группы для получения сообщения. Хотя группы очередей имеют несколько подписчиков, каждое сообщение потребляется только одним.


Мы начнем с нашего кода в cmd/queue-groups/main.go, и, как и раньше, наш код подключения к клиенту такой же:


```иди


nc, ошибка := nats.Connect(nats.DefaultURL)


если ошибка != ноль {


log.Fatalln(ошибка)


отложить nc.Close()


Далее мы создадим 3 подписчика очереди с темой foo и именем очереди queue.foo


```иди


nc.QueueSubscribe ("foo", "queue.foo", func (msg *nats.Msg) {


log.Println("Абонент 1:", строка(msg.Data))


nc.QueueSubscribe ("foo", "queue.foo", func (msg *nats.Msg) {


log.Println("Абонент 2:", строка(msg.Data))


nc.QueueSubscribe ("foo", "queue.foo", func (msg *nats.Msg) {


log.Println("Абонент 3:", строка(msg.Data))


Наконец, мы создадим цикл и будем публиковать отдельные сообщения в теме foo, чтобы мы могли видеть, как они будут получены.


```иди


для я := 1; я <= 3; я++ {


сообщение := fmt.Sprintf("Сообщение %d", я)


если ошибка := nc.Publish("foo", []byte(message)); ошибка != ноль {


log.Fatalln(ошибка)


время.Сон(2 * время.Секунда)


Вот наш полный пример


```иди


основной пакет


импорт (


"ФМТ"


"журнал"


"время"


"github.com/nats-io/nats.go"


основная функция () {


nc, ошибка := nats.Connect(nats.DefaultURL)


если ошибка != ноль {


log.Fatalln(ошибка)


отложить nc.Close()


nc.QueueSubscribe ("foo", "queue.foo", func (msg *nats.Msg) {


log.Println("Абонент 1:", строка(msg.Data))


nc.QueueSubscribe ("foo", "queue.foo", func (msg *nats.Msg) {


log.Println("Абонент 2:", строка(msg.Data))


nc.QueueSubscribe ("foo", "queue.foo", func (msg *nats.Msg) {


log.Println("Абонент 3:", строка(msg.Data))


для я := 1; я <= 3; я++ {


сообщение := fmt.Sprintf("Сообщение %d", я)


если ошибка := nc.Publish("foo", []byte(message)); ошибка != ноль {


log.Fatalln(ошибка)


время.Сон(2 * время.Секунда)


Выход


Как мы видим, наши сообщения случайным образом распределялись между подписчиками. Таким образом, NATS может действовать как балансировщик нагрузки уровня 7 для служб.


$ запустить cmd/queue-groups/main.go


01.03.2022 22:53:59 Подписчик 3: Сообщение 2


01.03.2022 22:53:59 Подписчик 1: Сообщение 3


01.03.2022 22:53:59 Подписчик 2: Сообщение 1


Заключение


В этой статье мы рассмотрели различные шаблоны связи, которые демонстрируют возможности распределенного обмена сообщениями в реальном времени с помощью NATS. Кроме того, JetStream можно использовать в сочетании с этими шаблонами, когда требуется надежный обмен сообщениями и политика хотя бы однократной доставки.


Я надеюсь, что эта статья была полезной. Если у вас есть какие-либо вопросы, не стесняйтесь обращаться к ним или публиковать их в NATS slack community.


Также опубликовано здесь



Оригинал
PREVIOUS ARTICLE
NEXT ARTICLE