Сбор данных об изменениях в PostgreSQL и пример кода Golang
11 мая 2022 г.Архитектура современных веб-приложений состоит из нескольких программных компонентов, таких как информационные панели, аналитика, базы данных, озера данных, кэши, поиск и т. д.
База данных обычно является основной частью любого приложения. Обновления данных в режиме реального времени обеспечивают непрерывную синхронизацию разрозненных систем данных и быстро реагируют на новую информацию. Так как же синхронизировать экосистему приложений? Как эти другие компоненты получают информацию об изменениях в базе данных? Change Data Capture или CDC относится к любому решению, которое идентифицирует новые или измененные данные.
Это сообщение о PostgreSQL CDC и способах достижения этого.
Сбор данных об изменениях (CDC) — это подход к интеграции данных для обнаружения, сбора и доставки изменений, внесенных в источники данных базы данных.
В целом интеграция данных на основе CDC состоит из следующих шагов:
- Зафиксируйте данные об изменениях в исходной базе данных.
- Преобразуйте измененные данные в формат, приемлемый для ваших потребителей.
- Опубликуйте данные для потребителей или целевой базы данных.
PostgreSQL предлагает два встроенных способа сделать CDC возможным:
- Из журналов транзакций PostgreSQL WAL, также известных как журналы опережающей записи.
- Триггеры базы данных.
Давайте кратко обсудим плюсы и минусы использования журналов транзакций (WAL) и триггеров для регистрации изменений данных.
Триггеры.
Методы на основе триггеров включают создание триггеров аудита в базе данных для захвата всех событий, связанных с методами INSERT, UPDATE и DELETE.
Триггеры могут быть прикреплены к таблицам (разделенным или нет) или представлениям.
Триггеры также могут срабатывать для операторов TRUNCATE. Если происходит событие триггера, функция триггера вызывается в соответствующее время для обработки события.
- 😄 Самое главное преимущество этого метода в том, что все это можно сделать на уровне SQL, в отличие от журналов транзакций.
- 😕 Однако использование триггеров оказывает существенное влияние на производительность исходной базы данных, поскольку эти триггеры необходимо запускать в базе данных приложения при внесении изменений в данные.
Журналы транзакций
С другой стороны, для современных СУБД журналы транзакций (WAL для PostgreSQL) обычно используются для регистрации транзакций и репликации.
В PostgreSQL все транзакции, такие как INSERT, UPDATE, DELETE, записываются в WAL до того, как клиент получит результат транзакции.
- Преимущество такого подхода в том, что это никоим образом не влияет на производительность базы данных.
- Это также не требует модификации таблиц БД или приложения. Нет необходимости создавать дополнительные таблицы в исходной базе данных.
- CDC на основе журналов обычно считается лучшим подходом к сбору измененных данных, применимым ко всем возможным сценариям, включая системы с чрезвычайно большими объемами транзакций.
Обратите внимание, что в настоящее время большинство операторов DDL, таких как CREATE, DROP, ALTER, не отслеживаются. Однако команда TRUNCATE находится в логическом потоке репликации.
Если вам нужна построчная потоковая передача изменений данных Postgres по мере их возникновения, вам потребуется функция логического декодирования или логической репликации Postgres.
Использование логического декодирования Postgres.
[Логическое декодирование] (https://www.postgresql.org/docs/current/logicaldecoding-example.html) — это официальное название основанного на журналах CDC (логическая репликация) PostgreSQL.
Логическое декодирование использует содержимое журнала упреждающей записи PostgreSQL для хранения всех действий, происходящих в базе данных. Write Ahead Log — это внутренний журнал, который описывает изменения базы данных на уровне хранилища.
1. Первым шагом в использовании логического декодирования является установка следующих параметров в конфигурации Postgres «postgresql.conf».
wal_level = логический
max_replication_slots = 5
max_wal_senders = 10
- Установка для
wal_level
значенияlogical
позволяет WAL записывать информацию, необходимую для логического декодирования.
- Убедитесь, что ваше значение
max_replication_slots
равно или превышает количество коннекторов PostgreSQL, использующих WAL плюс количество других слотов репликации, которые использует ваша база данных.
- Убедитесь, что параметр max_wal_senders, определяющий максимальное количество одновременных подключений к WAL, как минимум в два раза превышает количество слотов логической репликации. Например, если ваша база данных использует всего 5 слотов репликации, значение
max_wal_senders
должно быть 10 или больше.
Перезапустите сервер Postgres, чтобы изменения вступили в силу.
2. Второй шаг — настроить логическую репликацию с помощью выходного плагина test_decoding
.
Создайте слот логической репликации для базы данных, которую вы хотите синхронизировать, выполнив следующую команду.
SELECT pg_create_logical_replication_slot('replication_slot', 'test_decoding');
Примечание. Каждая репликация слот имеет имя, которое может содержать строчные буквы, цифры и символ подчеркивания.
Чтобы убедиться, что слот успешно создан, выполните следующую команду.
ВЫБЕРИТЕ имя_слота, подключаемый модуль, тип_слота, базу данных, активный, номер_стартапа, номер_подтвержденного_флэша ИЗ pg_replication_slots;
3. На следующем шаге создайте публикацию для всех ваших таблиц или только для определенных. Если вы укажете таблицы, вы добавите или удалите таблицы из публикации позже.
СОЗДАТЬ ПУБЛИКАЦИЯ ДЛЯ ВСЕХ ТАБЛИЦ;
или
CREATE PUBLICATION pub FOR TABLE table1, table2, table3;
При желании вы можете выбрать, какие операции включить в публикацию. Например, следующая публикация включает только операции INSERT и UPDATE для table1
.
СОЗДАТЬ ПУБЛИКАЦИЯ insert_update_only_pub FOR TABLE table1 WITH (publish = 'INSERT, UPDATE');
4. Убедитесь, что выбранные вами таблицы есть в публикации.
psql-stream=# SELECT * FROM pg_publication_tables WHERE pubname='pub';
Выход
имя паба | имя схемы | имя таблицы
паб | общественный | Таблица 1
паб | общественный | Таблица 2
паб | общественный | таблица3
(3 ряда)
С тех пор наша публикация pub
отслеживает все изменения таблиц в базе данных psql-stream
.
5. Давайте создадим абстрактную таблицу t
и заполним ее некоторыми записями.
создать таблицу t (id int, текст имени);
INSERT INTO t(id, name) SELECT g.id, k.name FROM generate_series(1, 10) as g(id), substr(md5(random()::text), 0, 25) as k(name) ;
В результате имеем 10 записей в таблице t
.
psql-stream=# SELECT count(*) FROM t;
считать
(1 ряд)
6. Наконец, пришло время проверить, работает ли наша логическая репликация.
Выполните следующую команду в консоли PostgreSQL, чтобы увидеть записи Postgres WAL.
SELECT * FROM pg_logical_slot_get_changes('replication_slot', NULL, NULL);
В результате вы получите что-то вроде:
номер | сид | данные
0/19EA2C0 | 1045 | НАЧАЛО 1045
0/19EA2C0 | 1045 | таблица public.t: INSERT: id [целое число]: 1 имя [текст]: 51459cbc211647e7b31c8720
0/19EA300 | 1045 | таблица public.t: INSERT: id [целое число]: 2 имя [текст]: 51459cbc211647e7b31c8720
0/19EA340 | 1045 | таблица public.t: INSERT: id[integer]:3 name[text]:51459cbc211647e7b31c8720
0/19EA380 | 1045 | таблица public.t: INSERT: id[integer]:4 name[text]:51459cbc211647e7b31c8720
0/19EA3C0 | 1045 | таблица public.t: INSERT: id[integer]:5 name[text]:51459cbc211647e7b31c8720
0/19EA400 | 1045 | таблица public.t: INSERT: id [целое число]: 6 имя [текст]: 51459cbc211647e7b31c8720
0/19EA440 | 1045 | таблица public.t: INSERT: id [целое число]: 7 имя [текст]: 51459cbc211647e7b31c8720
0/19EA480 | 1045 | таблица public.t: INSERT: id[integer]:8 name[text]:51459cbc211647e7b31c8720
0/19EA4C0 | 1045 | таблица public.t: INSERT: id[integer]:9 name[text]:51459cbc211647e7b31c8720
0/19EA500 | 1045 | таблица public.t: INSERT: id [целое число]: 10 имя [текст]: 51459cbc211647e7b31c8720
0/19EA5B0 | 1045 | СОВЕРШИТЬ 1045
(13 рядов)
pg_logical_slot_peek_changes
— еще одна команда PostgreSQL для просмотра изменений из записей WAL без их использования. Таким образом, многократный вызов pg_logical_slot_peek_changes
каждый раз возвращает один и тот же результат.
С другой стороны, pg_logical_slot_get_changes возвращает результаты только в первый раз. Следующие вызовы pg_logical_slot_get_changes
возвращают пустые наборы результатов. Это означает, что когда выполняется команда get
, результаты обслуживаются и удаляются, что значительно расширяет наши возможности по написанию логики для использования этих событий для создания реплики таблицы.
7. Не забудьте уничтожить слот, который вам больше не нужен, чтобы остановить его потребление
ВЫБРАТЬ pg_drop_replication_slot('replication_slot');
Плагины вывода.
Мы уже говорили о плагине вывода test_decoding
, доступном в Postgres 9.4+. Несмотря на то, что он создан как пример подключаемого модуля вывода, он все же полезен, если ваш потребитель его поддерживает.
Наряду с плагином test_decoding
, еще один плагин pgoutput
изначально поставляется с PostgreSQL. pgoutput
доступен, начиная с Postgres 10. Некоторые потребители поддерживают его для декодирования (например, Debezium).
Выполните следующую команду, чтобы создать плагин на основе pgoutput
, как в шаге 2 выше.
SELECT * FROM pg_create_logical_replication_slot('replication_slot', 'pgoutput');
Следующая команда использует изменения данных, аналогичные описанным в шаге 6.
psql-stream=# SELECT * FROM pg_logical_slot_peek_binary_changes('replication_slot', null, null, 'proto_version', '1', 'publication_names', 'pub');
номер | сид | данные
0/19A15F8 | 1038 | \x4200000000019a1d9000027de20a91a0ea0000040e
0/19A15F8 | 1038 | \x52000080387075626c69630074006400020169640000000017ffffffff006e616d650000000019ffffffff
0/19A15F8 | 1038 | \x49000080384e0002740000000234306e
0/19A1890 | 1038 | \x49000080384e0002740000000234316e
0/19A1910 | 1038 | \x49000080384e0002740000000234326e
0/19A1990 | 1038 | \x49000080384e0002740000000234336e
0/19A1A10 | 1038 | \x49000080384e0002740000000234346e
0/19A1A90 | 1038 | \x49000080384e0002740000000234356e
0/19A1B10 | 1038 | \x49000080384e0002740000000234366e
0/19A1B90 | 1038 | \x49000080384e0002740000000234376e
0/19A1C10 | 1038 | \x49000080384e0002740000000234386e
0/19A1C90 | 1038 | \x49000080384e0002740000000234396e
0/19A1DC0 | 1038 | \x430000000000019a1d9000000000019a1dc000027de20a91a0ea
(13 рядов)
Здесь вы можете заметить, что результаты возвращаются в двоичном формате. Плагин pgoutput
производит двоичный вывод.
wal2json — еще один популярный плагин вывода для логического декодирования.
Вот пример вывода плагина wal2json.
"сдача":[
"вид": "вставить",
"схема": "общедоступный",
"таблица": "т",
"имена столбцов": [
"я бы",
"название"
"типы столбцов": [
"целое",
"переменный характер(255)"
"значения столбца": [
1,
"сдача":[
"вид": "обновление",
"схема": "общедоступный",
"таблица": "т",
"имена столбцов": [
"я бы",
"название"
"типы столбцов": [
"целое",
"переменный характер(255)"
"значения столбца": [
1,
«Новая ценность»
"старые ключи": {
"ключевые имена": [
"я бы"
"типы ключей": [
"целое число"
"ключевые значения": [
1
"сдача":[
"вид":"удалить",
"схема": "общедоступный",
"таблица": "т",
"старые ключи": {
"ключевые имена": [
"я бы"
"типы ключей": [
"целое число"
"ключевые значения": [
1
Важные советы по слотам.
При работе со слотами помните следующее:
- Каждый слот имеет только один выходной плагин (какой именно вы выбираете сами).
- Каждый слот предоставляет изменения только из одной базы данных.
- Одна база данных может иметь несколько слотов.
- Каждое изменение данных обычно передается один раз на слот.
- Но слот может повторно передать изменения при перезапуске экземпляра Postgres. Потребитель должен разобраться в этой ситуации.
- Неиспользованный слот угрожает доступности вашего экземпляра Postgres. Postgres сохранит все файлы WAL для этих неиспользованных изменений. Это может привести к переполнению хранилища.
Потребители PostgreSQL WAL.
Потребитель — это любое приложение, которое может принимать поток логического декодирования Postgres. pg_recvlogical — это приложение PostgreSQL, которое может управлять слотами и потреблять из них поток. Он включен в дистрибутив Postgres, поэтому, вероятно, он уже установлен вместе с PostgreSQL.
![Двоичный код атаки хакера. Сделано с использованием Canon 5d Mark III и аналогового винтажного объектива Leica APO Macro Elmarit-R 2.8 100mm (Год: 1993)] -cl30qa9w400070as6cslg41me)
Фото Маркус Списке / Unsplash
Пример кода Golang.
В следующем примере кода Golang показано, как приступить к созданию собственного потребителя Postgress WAL. Он использует логическую репликацию PostgreSQL-10.x для потоковой передачи изменений базы данных (декодированных сообщений WAL) из исходной базы данных.
основной пакет
импорт (
"контекст"
"ФМТ"
"Операционные системы"
"ОС/сигнал"
"струны"
"время"
"github.com/jackc/pgconn"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgproto3/v2"
// Обратите внимание, что параметр времени выполнения "replication=database" в строке подключения обязателен
// слот репликации не будет создан, если опущено replication=database
const CONN = "postgres://postgres:postgres@localhost/psql-streamer?replication=database"
const SLOT_NAME = "replication_slot"
константа OUTPUT_PLUGIN = "pgoutput"
const INSERT_TEMPLATE = "создать таблицу t (id int, name text);"
переменная событие = структура {
Строка отношения
Столбцы [] строка
основная функция () {
ctx, отмена := signal.NotifyContext(context.Background(), os.Interrupt)
отложить отмену()
соединение, ошибка: = pgconn.Connect (ctx, CONN)
если ошибка != ноль {
паника (ошибка)
отложить соединение. Закрыть (ctx)
// 1. Создаем таблицу
если _, ошибка := conn.Exec(ctx, INSERT_TEMPLATE).ReadAll(); ошибка != ноль {
fmt.Errorf("не удалось создать таблицу: %v", ошибка)
// 2. убедиться, что публикация существует
if _, err := conn.Exec(ctx, "УДАЛИТЬ ПУБЛИКАЦИЯ, ЕСЛИ ПУБЛИКАЦИЯ СУЩЕСТВУЕТ;").ReadAll(); ошибка != ноль {
fmt.Errorf("не удалось удалить публикацию: %v", ошибка)
if _, err := conn.Exec(ctx, "СОЗДАТЬ ПУБЛИКАЦИЯ ДЛЯ ВСЕХ ТАБЛИЦ;").ReadAll(); ошибка != ноль {
fmt.Errorf("не удалось создать публикацию: %v", ошибка)
// 3. создаем временный слот-сервер репликации
если _, err = pglogrepl.CreateReplicationSlot(ctx, conn, SLOT_NAME, OUTPUT_PLUGIN, pglogrepl.CreateReplicationSlotOptions{Temporary: true}); ошибка != ноль {
fmt.Errorf("не удалось создать слот репликации: %v", ошибка)
var msgPointer pglogrepl.LSN
pluginArguments := []string{"proto_version '1'", "publication_names 'pub'"}
// 4. установить соединение
err = pglogrepl.StartReplication(ctx, conn, SLOT_NAME, msgPointer, pglogrepl.StartReplicationOptions{PluginArgs: pluginArguments})
если ошибка != ноль {
fmt.Errorf("не удалось установить запуск репликации: %v", ошибка)
var pingTime время.Время
для ctx.Err() != context.Canceled {
если time.Now().After(pingTime) {
если err = pglogrepl.SendStandbyStatusUpdate(ctx, conn, pglogrepl.StandbyStatusUpdate{WALWritePosition: msgPointer}); ошибка != ноль {
fmt.Errorf("не удалось отправить резервное обновление: %v", ошибка)
pingTime = время.Сейчас().Добавить(10 * время.Секунда)
//fmt.Println("Клиент: подождите")
ctx, отмена := context.WithTimeout(ctx, время.Секунда*10)
отложить отмену()
сообщение, ошибка := conn.ReceiveMessage(ctx)
если pgconn.Timeout(ошибка) {
Продолжать
если ошибка != ноль {
fmt.Errorf("что-то пошло не так при прослушивании сообщения: %v", ошибка)
переключатель msg := msg.(type) {
случай *pgproto3.CopyData:
переключатель msg.Data[0] {
случай pglogrepl.PrimaryKeepaliveMessageByteID:
// fmt.Println("сервер: подтвержден режим ожидания")
случай pglogrepl.XLogDataByteID:
walLog, ошибка := pglogrepl.ParseXLogData(msg.Data[1:])
если ошибка != ноль {
fmt.Errorf("не удалось проанализировать логический журнал WAL: %v", ошибка)
var msg pglogrepl.Message
если сообщение, ошибка = pglogrepl.Parse(walLog.WALData); ошибка != ноль {
fmt.Errorf("не удалось проанализировать сообщение логической репликации: %v", ошибка)
переключатель m := сообщение.(тип) {
case *pglogrepl.RelationMessage:
Событие.Столбцы = []строка{}
для _, столбец: = диапазон m.Columns {
Event.Columns = добавить (Event.Columns, col.Name)
Событие.Отношение = m.RelationName
case *pglogrepl.InsertMessage:
var sb strings.Builder
sb.WriteString(fmt.Sprintf("ВСТАВИТЬ %s(", Event.Relation))
для я := 0; я < len(Событие.Столбцы); я++ {
sb.WriteString(fmt.Sprintf("%s: %s ", Event.Columns[i], string(m.Tuple.Columns[i].Data)))
sb.WriteString(")")
fmt.Println(sb.String())
case *pglogrepl.UpdateMessage:
var sb strings.Builder
sb.WriteString(fmt.Sprintf("ОБНОВЛЕНИЕ %s(", Event.Relation))
для я := 0; я < len(Событие.Столбцы); я++ {
sb.WriteString(fmt.Sprintf("%s: %s ", Event.Columns[i], string(m.NewTuple.Columns[i].Data)))
sb.WriteString(")")
fmt.Println(sb.String())
case *pglogrepl.DeleteMessage:
var sb strings.Builder
sb.WriteString(fmt.Sprintf("УДАЛИТЬ %s(", Event.Relation))
для я := 0; я < len(Событие.Столбцы); я++ {
sb.WriteString(fmt.Sprintf("%s: %s ", Event.Columns[i], string(m.OldTuple.Columns[i].Data)))
sb.WriteString(")")
fmt.Println(sb.String())
case *pglogrepl.TruncateMessage:
fmt.Println("ВСЕ ПРОШЛО (ОБРЕЗАНО)")
По умолчанию:
fmt.Printf("получено неожиданное сообщение: %T", msg)
Этот код просто регистрирует входящие события, но в производственной среде вы можете легко отправить их в очередь сообщений или целевую базу данных.
Вывод
Логическое декодирование в PostgreSQL предоставляет другим компонентам приложения эффективный способ оставаться в курсе изменений данных в вашей базе данных Postgres.
Традиционно использовалась модель уведомления по запросу, в которой каждый компонент приложения опрашивает Postgres с определенным интервалом. Логическое кодирование использует модель push-уведомлений, когда Postgres уведомляет другие части приложения о каждом изменении, как только оно происходит.
События изменения данных теперь можно отправлять потребителям за миллисекунды без запроса к базе данных. Благодаря логическому декодированию база данных PostgreSQL становится центральной частью вашего современного динамического приложения реального времени.
Оригинал