Визуализация обнаружения аномалий в реальном времени с помощью Python

Визуализация обнаружения аномалий в реальном времени с помощью Python

21 апреля 2023 г.

Открытие исходного кода Rerun в феврале стало важным шагом для тех, кто ищет доступный, но мощный библиотеки визуализации Python.

Почему важна визуализация?

Визуализация необходима, поскольку такие компании, как Scale.ai, Weights & Biases и Hugging Face упростили глубокое обучение, обратившись к маркировке наборов данных, отслеживанию экспериментов и предварительно обученным моделям. Однако в быстром сборе и визуализации данных по-прежнему существует пробел.

Многие компании разрабатывают собственные решения для визуализации данных, но часто в итоге получают неоптимальные инструменты из-за высоких затрат на разработку. Кроме того, визуализация Python для потоковых данных — проблема, которая также не решается должным образом, что приводит к решения на основе JavaScript в ноутбуках. Rerun использует интерфейс Python в высокопроизводительном механизме визуализации Rust ( очень похоже на Bytewax!), который упрощает анализ потоковых данных.

В этом сообщении блога мы рассмотрим, как использовать Bytwax и Rerun для визуализации потоковых данных в реальном времени в Python и создания визуализации обнаружения аномалий в реальном времени.

Мы выбрали обнаружение аномалий, также известное как обнаружение выбросов, потому что оно является важным компонентом во многих приложениях, таких как кибербезопасность, обнаружение мошенничества и мониторинг промышленных процессов. Визуализация этих аномалий в режиме реального времени может помочь быстро выявить потенциальные проблемы и принять необходимые меры для их устранения.

Тем, кто хочет погрузиться в работу, ознакомьтесь с нашим комплексным решением Python на нашем сайте . Гитхаб. Не забудьте отметить Bytwax звездочкой!

Обзор

Вот что мы рассмотрим:

  • Мы рассмотрим код и кратко обсудим объекты верхнего уровня.
  • Затем мы более подробно обсудим каждый этап потока данных: инициализация нашего потока данных, источник ввода, обнаружение аномалий с отслеживанием состояния, визуализация данных и т.д. вывод и как создать кластер
  • Наконец, мы научимся запускать его и увидим красивую визуализацию, все на Python <3
  • В качестве бонуса мы подумаем о других вариантах использования.

Поехали!

Настройте среду

Это сообщение в блоге основано на следующих версиях Bytewax и Rerun:

bytewax==0.15.1
rerun-sdk==0.4.0

Rerun и Bytewax устанавливаются как

pip install rerun-sdk
pip install bytewax

Следите за новостями Bytewax, поскольку мы готовим новую версию, которая еще больше упростит разработку приложений для потоковой передачи данных на Python.

Код

Решение относительно компактное, поэтому мы скопировали сюда весь пример кода. Пожалуйста, не стесняйтесь пропустить этот большой кусок, если он кажется чрезмерным; мы обсудим каждую функцию позже.

import random
# pip install rerun-sdk
import rerun as rr

from time import sleep
from datetime import datetime

from bytewax.dataflow import Dataflow
from bytewax.execution import spawn_cluster
from bytewax.inputs import ManualInputConfig, distribute
from bytewax.outputs import ManualOutputConfig


rr.init("metrics")
rr.spawn()

start = datetime.now()

def generate_random_metrics(worker_index, worker_count, resume_state):
    assert resume_state is None
    keys = ["1", "2", "3", "4", "5", "6"]
    this_workers_keys = distribute(keys, worker_index, worker_count)

    for _ in range(1000):
        for key in this_workers_keys:
            value = random.randrange(0, 10)
            if random.random() > 0.9:
                value *= 2.0
            yield None, (key, (key, value, (datetime.now() - start).total_seconds()))
            sleep(random.random() / 10.0)


class ZTestDetector:
    """Anomaly detector.

    Use with a call to flow.stateful_map().

    Looks at how many standard deviations the current item is away
    from the mean (Z-score) of the last 10 items. Mark as anomalous if
    over the threshold specified.
    """

    def __init__(self, threshold_z):
        self.threshold_z = threshold_z

        self.last_10 = []
        self.mu = None
        self.sigma = None

    def _push(self, value):
        self.last_10.insert(0, value)
        del self.last_10[10:]

    def _recalc_stats(self):
        last_len = len(self.last_10)
        self.mu = sum(self.last_10) / last_len
        sigma_sq = sum((value - self.mu) ** 2 for value in self.last_10) / last_len
        self.sigma = sigma_sq**0.5

    def push(self, key__value__t):
        key, value, t = key__value__t
        is_anomalous = False
        if self.mu and self.sigma:
            is_anomalous = abs(value - self.mu) / self.sigma > self.threshold_z

        self._push(value)
        self._recalc_stats()
        rr.log_scalar(f"temp_{key}/data", value, color=[155, 155, 155])
        if is_anomalous:
            rr.log_point(f"3dpoint/anomaly/{key}", [t, value, float(key) * 10], radius=0.3, color=[255,100,100])
            rr.log_scalar(
                f"temp_{key}/data/anomaly",
                value,
                scattered=True,
                radius=3.0,
                color=[255, 100, 100],
            )
        else:
            rr.log_point(f"3dpoint/data/{key}", [t, value, float(key) * 10], radius=0.1)

        return self, (value, self.mu, self.sigma, is_anomalous)


def output_builder(worker_index, worker_count):
    def inspector(input):
        metric, (value, mu, sigma, is_anomalous) = input
        print(
            f"{metric}: "
            f"value = {value}, "
            f"mu = {mu:.2f}, "
            f"sigma = {sigma:.2f}, "
            f"{is_anomalous}"
        )

    return inspector


if __name__ == '__main__':
    flow = Dataflow()
    flow.input("input", ManualInputConfig(generate_random_metrics))
    # ("metric", value)
    flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push)
    # ("metric", (value, mu, sigma, is_anomalous))
    flow.capture(ManualOutputConfig(output_builder))
    spawn_cluster(flow)

Предоставленный код демонстрирует, как создать конвейер обнаружения аномалий в реальном времени с помощью Bytewax и Rerun.

Давайте разберем основные компоненты этого кода:

  • generate_random_metrics: эта функция генерирует случайные метрики, имитирующие реальные потоки данных. Он создает точки данных с небольшой вероятностью аномалии (значения удваиваются).

* ZTestDetector: этот класс реализует детектор аномалий, использующий метод Z-оценки. Он поддерживает среднее значение и стандартное отклонение последних 10 значений и помечает значение как аномальное, если его Z-показатель превышает указанный порог.

* output_builder: эта функция используется для определения поведения вывода для конвейера данных. В этом случае он печатает имя метрики, значение, среднее значение, стандартное отклонение и является ли значение аномальным.

* Поток данных: основная часть кода создает поток данных с использованием Bytewax, соединяя RandomMetricInput, ZTestDetector и построитель вывода.

* Визуализация повторного запуска: визуализация повторного запуска интегрирована в класс ZTestDetector. Функции rr.log_scalar и rr.log_point используются для построения точек данных и их соответствующего статуса аномалии.

Теперь, поняв основные компоненты кода, давайте обсудим, как шаг за шагом создается визуализация.

Построение потока данных

Чтобы создать конвейер потока данных, необходимо:

  1. Инициализируйте новый поток данных с помощью flow = Dataflow().
  2. Определите источник ввода с помощью flow.input("input", ManualInputConfig(generate_random_metrics)).
  3. Примените детектор аномалий с отслеживанием состояния с помощью flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push).
  4. Настройте поведение вывода с помощью flow.capture(ManualOutputConfig(output_builder)).
  5. Наконец, создайте кластер для выполнения потока данных с помощью spawn_cluster(flow, proc_count=3).

Результирующий поток данных считывает случайно сгенерированные значения метрик из input_builder, передает их через ZTestDetector для обнаружения аномалий и выводит результаты с помощью функции output_builder. . Давайте уточним детали для каждого шага.

Функция

generate_random_metrics

Функция generate_random_metrics служит альтернативным источником входных данных для конвейера потока данных, генерируя случайные значения метрик распределенным образом между несколькими рабочими процессами. Он принимает три параметра: worker_index, worker_count и resume_state.

def generate_random_metrics(worker_index, worker_count, resume_state):
    assert resume_state is None
    keys = ["1", "2", "3", "4", "5", "6"]
    this_workers_keys = distribute(keys, worker_index, worker_count)

    for _ in range(1000):
        for key in keys:
            value = random.randrange(0, 10)
            if random.random() > 0.9:
                value *= 2.0
            yield None, (key, (key, value, (datetime.now() - start).total_seconds()))
            sleep(random.random() / 10.0)

* worker_index: индекс текущего рабочего процесса в конвейере потока данных. * worker_count: общее количество рабочих процессов в конвейере потока данных. * resume_state: состояние источника ввода, из которого следует возобновить работу. В этом случае устанавливается значение None, указывающее, что источник ввода не поддерживает возобновление работы из предыдущего состояния.

Вот пошаговое описание generate_random_metrics функции:

  1. Подтвердите, что resume_state имеет значение None.
  2. Определите список ключей, представляющих показатели.
  3. Распределите ключи между рабочими процессами с помощью функции распределения (отсутствует во фрагменте кода). Распределенные ключи для текущего рабочего процесса назначаются this_workers_keys.
  4. Выполнить 1000 итераций и для каждой итерации выполнить итерацию по списку ключей:
  5. Создать случайное значение от 0 до 10.
  6. С вероятностью 10 % удвойте значение, чтобы имитировать аномалию.
  7. Возвратите кортеж, содержащий None (чтобы указать отсутствие конкретного ключа раздела), ключ, сгенерированное значение и время, прошедшее с момента запуска (не указано во фрагменте кода).
  8. Введите время ожидания между каждым сгенерированным значением, чтобы имитировать создание данных в реальном времени.

Функция generate_random_metrics используется в потоке данных в качестве источника ввода со следующей строкой кода:

flow.input("input", ManualInputConfig(generate_random_metrics))

Эта строка указывает потоку данных использовать класс RandomMetricInput для создания входных данных для конвейера.

ZTestDetector Класс

Класс ZTestDetector — это детектор аномалий, использующий метод Z-оценки для определения того, является ли точка данных аномальной или нет. Z-оценка — это количество стандартных отклонений точки данных от среднего значения набора данных. Если Z-оценка точки данных выше указанного порога, она считается аномальной.

У класса есть следующие методы:

  • __init__(self, threshold_z): конструктор инициализирует ZTestDetector пороговым значением Z-оценки. Он также инициализирует список последних 10 значений (self.last_10), среднее значение (self.mu) и стандартное отклонение (self.sigma).

* _push(self, value): этот закрытый метод используется для обновления списка последних 10 значений новым значением. Он вставляет новое значение в начало списка и удаляет самое старое значение, сохраняя длину списка равной 10.

* _recalc_stats(self): этот частный метод пересчитывает среднее значение и стандартное отклонение на основе текущих значений в списке self.last_10.

* push(self, key__value__t): этот общедоступный метод принимает в качестве входных данных кортеж, содержащий ключ, значение и отметку времени. Он вычисляет Z-показатель для значения, обновляет список последних 10 значений и пересчитывает среднее значение и стандартное отклонение. Он также регистрирует точку данных и ее аномальный статус, используя функции визуализации Rerun. Наконец, он возвращает обновленный экземпляр класса ZTestDetector и кортеж, содержащий значение, среднее значение, стандартное отклонение и статус аномалии.

Класс ZTestDetector используется в конвейере потока данных как карта с отслеживанием состояния со следующим кодом:

flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push)

Эта строка указывает потоку данных применить ZTestDetector с порогом Z-показателя 2,0 и использовать метод push для обработки точек данных.< /p>

Визуализация аномалий

Чтобы визуализировать аномалии, класс ZTestDetector регистрирует точки данных и соответствующий им статус аномалии, используя функции визуализации Rerun. В частности, rr.log_scalar используется для построения скалярного значения, а rr.log_point — для построения трехмерных точек.

В следующем фрагменте кода показано, как создается визуализация:

rr.log_scalar(f"temp_{key}/data", value, color=[155, 155, 155])
if is_anomalous:
    rr.log_point(f"3dpoint/anomaly/{key}", [t, value, float(key) * 10], radius=0.3, color=[255,100,100])
    rr.log_scalar(
        f"temp_{key}/data/anomaly",
        value,
        scattered=True,
        radius=3.0,
        color=[255, 100, 100],
    )
else:
    rr.log_point(f"3dpoint/data/{key}", [t, value, float(key) * 10], radius=0.1)

Здесь мы сначала регистрируем скалярное значение, представляющее метрику. Затем, в зависимости от того, является ли значение аномальным, мы регистрируем 3D-точку с другим радиусом и цветом. Аномальные точки регистрируются красным цветом с большим радиусом, а неаномальные точки регистрируются с меньшим радиусом.

output_builder Функция

Функция output_builder используется для определения поведения вывода для конвейера данных. В этом конкретном примере он отвечает за печать имени метрики, значения, среднего значения, стандартного отклонения и того, является ли значение аномальным.

Функция принимает два аргумента: worker_index и worker_count. Эти аргументы помогают функции понять индекс рабочего процесса и общее количество рабочих процессов в конвейере потока данных.

Вот определение output_builder функции:

def output_builder(worker_index, worker_count):
    def inspector(input):
        metric, (value, mu, sigma, is_anomalous) = input
        print(
            f"{metric}: "
            f"value = {value}, "
            f"mu = {mu:.2f}, "
            f"sigma = {sigma:.2f}, "
            f"{is_anomalous}"
        )

    return inspector

Эта функция является функцией более высокого порядка, что означает, что она возвращает другую функцию с именем spector. Функция spector отвечает за обработку кортежа входных данных и печать желаемого вывода.

Функция построителя вывода позже используется в конвейере потока данных при настройке поведения вывода с помощью

flow.capture(ManualOutputConfig(output_builder)).

Запуск потока данных

Bytewax может работать как один процесс или как несколько процессов. Этот поток данных был создан для масштабирования нескольких процессов, но мы начнем запускать его как единый процесс с исполнительным модулем spawn_cluster.

spawn_cluster(flow)

Если бы мы хотели увеличить параллелизм, мы бы просто добавили больше процессов в качестве аргументов.

Например - spawn_cluster(flow, proc_count=3).

Чтобы запустить предоставленный код, мы можем просто запустить его как скрипт Python, но сначала нам нужно установить зависимости.

Создайте новый файл в том же каталоге, что и dataflow.py, и назовите его requirements.txt.

Добавьте следующее содержимое в файл requirements.txt:

bytewax==0.15.1
rerun-sdk==0.4.0

Откройте терминал в каталоге, содержащем файлы requirements.txt и dataflow.py.

Установите зависимости с помощью следующей команды:

pip install -r requirements.txt

И запустите поток данных!

python dataflow.py

Расширение варианта использования

Предоставленный код служит базовым примером обнаружения аномалий в реальном времени, но вы можете расширить этот конвейер, чтобы учесть более сложные сценарии.

Например:

  • Включение реальных источников данных. Замените класс RandomMetricInput настраиваемым классом, который считывает данные из реального источника, например датчиков Интернета вещей, файлов журналов или потоковых API.

* Внедрение более сложных методов обнаружения аномалий. Вы можете заменить класс ZTestDetector другими методами обнаружения аномалий с отслеживанием состояния, такими как скользящее среднее значение, экспоненциальное сглаживание или подходы на основе машинного обучения.

* Настройка визуализации: улучшите визуализацию повторного запуска, добавив больше измерений данных, настроив цветовые схемы или изменив стили графика в соответствии со своими потребностями.

* Интеграция с системами оповещения и мониторинга: вместо того, чтобы просто распечатывать результаты обнаружения аномалий, вы можете интегрировать конвейер с системами оповещения или мониторинга, чтобы уведомлять соответствующие заинтересованные стороны при обнаружении аномалии.

Настраивая и расширяя конвейер потока данных, вы можете создать мощное решение для обнаружения и визуализации аномалий в реальном времени, адаптированное к вашему конкретному варианту использования. Сочетание Bytwax и Rerun предлагает универсальную и масштабируемую основу для создания систем обработки и визуализации данных в реальном времени.

Заключение

В этом сообщении блога показано, как использовать Bytewax и Rerun для создания визуализации обнаружения аномалий в реальном времени. Создав конвейер потока данных с помощью Bytewax и интегрировав мощные возможности визуализации Rerun, мы можем отслеживать и выявлять аномалии в наших данных по мере их возникновения.


:::информация Первоначально написано Зандером Мэтисоном здесь

:::

Присоединяйтесь к нашему сообществу: Slack Github


Оригинал
PREVIOUS ARTICLE
NEXT ARTICLE