Мы потеряли миллионы записей: вот как я его поймал, исправил и построил систему мониторинга в одном

Мы потеряли миллионы записей: вот как я его поймал, исправил и построил систему мониторинга в одном

11 июня 2025 г.

Когда вы инженер по поддержке, аналитик и пожарный в одном человеке.

Контекст: большие данные, реальные клиенты, реальные проблемы

Я работаю в команде Data Platform, поддерживающей клиентов. Наш стек включает в себяСнежинкаВBigQuery,и продукт разоблачаетмиллионы точек данных ежедневно: Метрики клиентов, аналитика на уровне магазина, исторические тенденции и т. Д.

Итак, когда кто -то сказал,«Почему моя панель пуста?»Это была не просто ошибка пользовательского интерфейса. Что -то было сломано глубоко в трубопроводе данных.

Спойлер:Целые куски данных никогда не попадали в нашу платформуПолем

Шаг 1: Исследование черной дыры

Первая подсказка пришла от команды доставки продукта: в некоторых учетных записях были неполные наборы данных. Мы быстро проследили поток:

  • Необработанные данные введеныMysql
  • Он должен был перейти кGoogle Cloud Storage (GCS)
  • Но не попал вСнежинка

Нет тревоги. Нет оповещений. Просто ... безмолв.

Сигнал в передаче GCS → Snowflake молча пропускал записи из -за неэлементной схемы и отсутствия проверки.

Шаг 2: Создайте систему мониторинга (через неделю)

Я решил построить легкий, но мощныйМониторинг трубопровода с использованием Python и SnowflakeПолем Не ожидание долгосрочного исправления продукта, потому что это должно было работатьсейчасПолем

Шаг 2.1: Подключитесь к снежинкам через Python

#a connector
import snowflake.connector
import pandas as pd

conn = snowflake.connector.connect(
    user='user',
    password='password',
    account='account_id',
    warehouse='warehouse',
    database='db',
    schema='schema'
)
#just an example
query = """
SELECT customer_id, module, max(event_date) as last_date
FROM analytics_table
GROUP BY customer_id, module
"""
df = pd.read_sql(query, conn)

Шаг 2.2: Обнаружение задержек с помощью панд

df["lag_days"] = (pd.Timestamp.today() - pd.to_datetime(df["last_date"])).dt.days
df["status"] = df["lag_days"].apply(lambda x: "valid" if x <= 1 else "delayed")

Теперь у меня было четкое представление о том, какие модули были свежими, а какие отставали на недели.

Шаг 2.3: храните результаты в новой таблице мониторинга

from sqlalchemy import create_engine
#example
engine = create_engine(
    'snowflake://user:password@account/db/schema?warehouse=warehouse'
)

df.to_sql("monitoring_status", con=engine, if_exists="replace", index=False)

Шаг 3: Покажите боль визуально

С использованиемLooker StudioЯ подключился прямо к таблице мониторинга.

Я построил приборную панель, которая показала:

  • % модулей, задержанных на одного клиента
  • Время с момента последнего успешного синхронизации
  • Новые проблемы, когда они появились

Это сделало лидерствовидетьРазрыв и распределяет ресурсы.

Шаг 4: Исправьте основную причину

Тем временем мы установили сломанный анализатор и перезапустили все затронутые трубопроводы. Мы также задним числом обработали все потерянные данные. Благодаря мониторингу, он подтвердил свой полный.

Результаты

  • Восстановлен100% потерянных данных
  • Создал аВ реальном времени, всегда мониторинг мониторинга
  • Снижение будущих слепых пятен за счет автоматической проверки
  • ПредотвращеноКлиентские эскалацииеще до того, как они заметили

Последние мысли

Иногда все, что вам нужно, - это интеллектуальный инженер поддержки, сценарий Python и четкое представление о том, что не так.

Если на вашей платформе данных есть миллионы движущихся частей - предположим, что что -то сломано. Тогда иди построить что -нибудь, чтобы поймать это.


Оригинал
PREVIOUS ARTICLE
NEXT ARTICLE