Взгляд на использование Elixir Streams, Elasticsearch и AWS S3

Взгляд на использование Elixir Streams, Elasticsearch и AWS S3

30 ноября 2022 г.

В качестве штатного инженера в группе Lob подтверждения адресов мне недавно было поручено настроить конечную точку для клиентов, которая предоставила им все данные, которые мы сохранили для определенного почтового индекса. Мы довольно активно используем Elasticsearch для хранения наших данных, поэтому все сводилось к запросу ES по почтовому индексу и записи данных в Amazon S3. Но у обоих сервисов есть тонкие ограничения на то, как вы их читаете/записываете: ES ограничивает ваш результирующий набор до 10 000 записей, а Amazon S3 настаивает на том, чтобы вы записывали в свои корзины кусками не менее 4 мегабайт. Это оказалось идеальным вариантом использования для потоков Elixir!

Эликсирные потоки

Прежде чем мы углубимся в мельчайшие детали Elasticsearch и AWS S3, давайте сначала кратко вспомним Elixir Streams.

Что такое поток? Поток — это компонуемое перечисляемое, которое вычисляется лениво. Обычно это достигается путем сохранения небольшого количества состояния, которое описывает, где вы в данный момент находитесь в своем перечислимом, и метод вычисления следующего элемента. Тривиальным примером может служить поток натуральных чисел:

Stream.iterate(0, fn x -> x + 1 end)

Кровавые подробности того, как это реализовано под капотом, — тема для другого дня. Вам редко нужно получать этот низкий уровень — в библиотеке Stream есть множество функций, похожих на Enum, которые вы можете использовать. в качестве строительных блоков для создания собственных потоков. А некоторые функции библиотеки Elixir изначально возвращают потоки: IO.stream превратит файл (или другой источник) в поток, что позволит вам обрабатывать очень большие файлы без необходимости сначала считывать их в память целиком. Ленивый аспект Streams означает, что вычисляется минимальное количество Stream, необходимое для генерации вашего ответа.

Представьте, что мы хотим найти 100-е натуральное число, которое является палиндромным и также делится на 3:

1 
|> Stream.iterate(fn n -> n + 1 end) 
|> Stream.filter(fn n -> s = Integer.to_string(n); s == String.reverse(s) end)
|> Stream.filter(fn n -> rem(n, 3) == 0 end)
|> Enum.at(99)

Что я нахожу элегантным (и эффективным) в этом решении, так это то, что никакое число, кроме ответа 20202, не будет вычислено. Аккуратно!

Если вы хотите погрузиться глубже, есть отличный раздел «Начало работы» на Enumerables и Streams на веб-сайте языка Elixir. документация Elixir API также полна поучительных примеров.

Эластичный поиск

Теперь перейдите на Elasticsearch. Первое, что нужно знать, это то, что ES ограничивает размер результирующего набора 10 тысячами элементов. После тщательного изучения документации стало ясно, что исходный метод, появившийся в моем переполнении стека (Scroll API) устарел, и новый одобренный метод заключался в использовании идентификаторов PointInTime. Идентификаторы PIT, по сути, дают вам моментальный снимок вашего индекса в определенный момент времени, чтобы вы могли получить согласованное представление результатов поиска по нескольким запросам. Процесс следующий:

* получить PIT для вашего индекса * выполнять запросы с использованием PIT (без индекса — теперь он неявно указан в PID) * удалить PIT

Этот последний шаг имеет решающее значение — PIT чрезвычайно требовательны к ресурсам, поэтому очень важно удалить их, как только вы закончите. Я прервал нашу промежуточную ES в своих ранних экспериментах с PIT, потому что я не удалял их, а параметр истечения срока действия, казалось, не имел никакого эффекта.

Другая вещь, которую мы хотели бы сделать, — это абстрагироваться от лежащего в основе PIT и механизма подкачки для конечного пользователя, поэтому мы предоставляем интерфейс, который просто принимает запрос ES и генерирует поток обращений. Начнем с создания и удаления PID.

Создание PID — это просто запрос PID, связанного с индексом, к которому мы хотим выполнять запросы.

@spec create(ES.index_t()) :: {:ok, pit_t()} | {:error, ES.response_t()}
def create(index) do
  :post
  |> HTTP.request(url(index), HTTP.no_payload(), HTTP.json_encoding())
  |> HTTP.on_200(& &1["id"])
end

Удаление PID — это простое удаление HTTP, за исключением того, что PID огромны, поэтому вам нужно указать их в теле, а не в качестве параметров URL. Некоторые HTTP-библиотеки не позволяют вам это сделать. Спецификация HTTP немного запутана в этом вопросе, и в последний раз, когда я проверял Stack Overflow, были здоровые дебаты по этому вопросу. Вот почему мы используем HTTPoison — он позволяет использовать полезные данные в запросах DELETE.

@spec delete(pit_t()) :: :ok | {:error, HTTPoison.AsyncResponse | HTTPoison.MaybeRedirect | HTTPoison.Response}
def delete(pit) do
  url = HTTP.generate_url("_pit", ES.no_index(), HTTP.no_options())

  with {:ok, payload} <- Poison.encode(%{"id" => pit}),
        %HTTPoison.Response{status_code: 200} <-
          HTTPoison.request!(:delete, url, payload, HTTP.headers(HTTP.json_encoding())) do
    :ok
  else
    error -> {:error, error}
  end
end

Теперь, когда с PID разобрались, наша следующая задача – выяснить, как использовать их в наших запросах. Наш базовый запрос дополнен тремя дополнительными параметрами:

* размер 10 000 (максимально допустимый для эластичного поиска) * pit, хэш, содержащий %{id: , keep_alive: «1m»} * sort — хеш, содержащий %{_shard_doc: «desc»}

(Для сортировки вам нужно что-то указать, а _shard_doc встраивается в каждый поисковый индекс Elastic, что делает его удобным и переносимым.)

@spec initial(PIT.pit_t(), ES.query_t()) :: ES.query_t()
def initial(pit, query) do
  %{
    query: query,
    pit: %{id: pit, keep_alive: PIT.expiration()},
    size: ES.max_results(),
    sort: %{"_shard_doc" => "desc"}
  }
end

После того, как наш базовый запрос завершен, мы можем сосредоточиться на том, как сгенерировать последовательность запросов, которая будет извлекать все элементы, удовлетворяющие нашим критериям поиска. Хитрость здесь заключается в том, что мы передаем в следующий запрос значение наше поле сортировки от последнего совпадения с предыдущим результатом, например так:

@spec update(ES.query_t(), ES.response_t()) :: ES.query_t()
def update(query, %Req.Response{body: %{"hits" => %{"hits" => hits}}, status: 200}) do
  Map.put(query, :search_after, List.last(hits)["sort"])
end

Вооружившись этими двумя функциями генерации запросов, мы можем написать Stream.iterator, который будет извлекать все результаты из нужного нам индекса:

@spec streamer(ES.query_t()) :: {:ok, Enumerable.t()} | {:error, any}
def streamer(initial) do
  {:ok,
    initial
    |> search()
    |> Stream.iterate(fn previous -> search(Query.update(initial, previous)) end)
   }
rescue
  error -> {:error, error}
end

Нам нужна высокоуровневая функция, которая берет результаты этой функции и создает поток обращений. Но есть нюанс — мы должны удалить PIT, когда закончим с потоком. Но как мы узнаем? Решение состоит в том, чтобы передать потребителя, который потребляет поток, а затем мы удаляем PIT после этого, например:

@spec stream_many(ES.query_t(), ES.index_t(), ES.consumer_t(), non_neg_integer()) :: {:error, any()} | {:ok, Enumerable.t()}
def stream_many(query, index, consumer, count) do
  case PIT.create(index) do
    {:ok, pit_id} ->
      try do
        case pit_id |> Query.initial(query) |> streamer() do
          {:ok, stream} ->
            stream
            |> Stream.flat_map(& &1.body["hits"]["hits"])
            |> Stream.take(count)
            |> then(consumer)
          error -> error
        end
      after
        PIT.delete(pit_id)
      end
    error -> error
  end
rescue
  error -> {:error, error}
end

Но если наш запрос возвращает менее 10 тысяч результатов, нам не нужен весь механизм PID/sort/search_after — достаточно одного запроса.

@spec stream_one(ES.query_t(), ES.index_t(), ES.consumer_t()) :: {:ok, any()} | {:error, any()}
def stream_one(query, index, consumer) do
  query
  |> search(index)
  |> HTTP.on_200(fn body -> consumer.(body["hits"]["hits"]) end)
end

Теперь нам нужна функция верхнего уровня, которая запрашивает размер результата и выбирает между stream_one или stream_many в зависимости от значения, например так:

@spec stream(ES.query_t(), ES.index_t(), ES.consumer_t()) :: {:ok, any()} | {:error, any()}
def stream(query, index, consumer) do
  case count(query, index) do
    {:ok, count} ->
      if count > ES.max_results() do
        stream_many(query, index, consumer, count)
      else
        stream_one(query, index, consumer)
      end
    error ->
      error
  end
end

АМС S3

Для доступа к Amazon S3 мы используем библиотеку ex_aws_s3. уровне для нас, но у него есть одно требование: входной поток данных должен состоять из фрагментов размером не менее 4 Мбайт.

Для этого мы используем функцию потока chunk_while. Это требует четырех входных данных:

  1. перечислимое, которое нужно разделить
  2. начальное значение аккумулятора
  3. шаговая функция
  4. функция финализатора

Наше первое решение: как должен выглядеть аккумулятор, который передается функцией step? Очевидно, он должен содержать список элементов в текущем чанке. Но что более тонко, он также должен содержать размер текущего фрагмента, чтобы мы не тратили ресурсы на его пересчет каждый раз. Таким образом, мы получаем двухэлементный кортеж, содержащий список и целое число.

Далее мы обратим внимание на функцию step. Он должен проверить, больше ли текущий размер желаемого размера фрагмента или равен ему. Если это так, мы должны взять список элементов из накопителя и преобразовать их в фрагмент с помощью convert; если нет, мы должны добавить его в текущий фрагмент (и обновить размер) с помощью add_chunk.

Что должен делать add_chunk? Просто поместите элемент в начало списка и увеличьте значение size на размер текущего фрагмента.

Поведение convert зависит от того, заботимся ли мы о том, чтобы порядок элементов в блоке сохранялся в выходных данных, потому что элементы в списке будут в обратном порядке, поэтому их необходимо изменить. Но если нам все равно, мы можем пропустить это преобразование. Собрав все вместе, мы получим:

@spec chunker(Enumerable.t(), non_neg_integer(), boolean(), (any() -> non_neg_integer()), (Enumerable.t() -> any())) :: Enumerable.t()
def chunker(
        chunks,
        chunk_size,
        ordered  true,
        sizer  &String.length/1,
        joiner  &Enum.join/1
      ) do
  zero = {0, []}
  convert =
    if ordered do
      fn chunks -> chunks |> Enum.reverse() |> then(joiner) end
    else
      joiner
    end

  final = fn {_, chunks} -> {:cont, convert.(chunks), zero} end
  add_chunk = fn {size, chunks}, chunk -> {size + sizer.(chunk), [chunk | chunks]} end

  step = fn chunk, accum = {size, chunks} ->
    if size >= chunk_size do
      {:cont, convert.(chunks), add_chunk.(zero, chunk)}
    else
      {:cont, add_chunk.(accum, chunk)}
    end
  end

  Stream.chunk_while(chunks, zero, step, final)
end

Теперь мы можем объединить все написанные нами компоненты, чтобы создать простое действие контроллера, которое запрашивает выходной поток обращений из нашего раздела Elasticsearch в фрагменты по 4 М, которые удовлетворят требованиям модуля ExAWS.S3:< /p>

defmodule Controller do
  @four_megabytes 4*1024*1024

  def find_all(params) do
    query(params) |> ElasticSearch.stream(params.index, &consumer(params, &1))
  end

  def query(params) do
    %{term: %{params.query_term => params.query_value}}
  end

  def consumer(params, stream) do
    stream
    |> Stream.map(&Poison.encode!(&1["_source"]))
    |> chunker(@four_megabytes, false)
    |> S3.upload(params.bucket, params.filename)
    |> ExAws.request()
  end
end

На этом мы завершаем наше исследование потоков Elixir, Elasticsearch и AWS S3. Надеюсь, вам понравилось!

Гай Арго, штатный инженер Lob.


Оригинал
PREVIOUS ARTICLE
NEXT ARTICLE