Как передавать данные из MySQL в Postgres
29 января 2023 г.Потоковая передача данных из базы данных MySQL в базу данных PostgreSQL может быть полезным способом перемещения данных между системами или создания реплики базы данных в реальном времени для составления отчетов и анализа. Один из способов добиться этого — использовать Инструменты Change Data Capture (CDC).
CDC — это метод отслеживания изменений, внесенных в базу данных, и записи их в отдельный поток. Затем этот поток можно использовать для репликации изменений в другую базу данных.
DBConvert Streams помогает реплицировать ваши данные MySQL в PostgreSQL в режиме реального времени. Он фиксирует изменения данных из исходной базы данных MySQL и применяет их к целевой базе данных PostgreSQL. Это можно сделать, настроив источник для чтения двоичного журнала базы данных MySQL и преобразования изменений в формат, который может быть использован целевой базой данных PostgreSQL.
[Примеры и конфигурации потоковой передачи базы данных, репозиторий GitHub](https://github.com/slotix/dbconvert-streams-public)
Для начала давайте клонируем репозиторий GitHub, содержащий пример потоковой передачи MySQL в PostgreSQL.
git clone git@github.com:slotix/dbconvert-streams-public.git && cd dbconvert-streams-public/examples/mysql2postgres/sales-db/
Конфигурация Docker Compose.
Поскольку DBConvert Streams использует несколько служб, наиболее эффективным способом запуска контейнеров является использование Docker Compose. р>
Файл docker-compose.yml из репозитория приведен ниже.
version: '3.9'
services:
dbs-api:
container_name: api
image: slotix/dbs-api
entrypoint:
- ./dbs-api
- --nats=nats:4222
- --source=source-reader:8021
- --target=target-writer:8022
ports:
- 8020:8020
depends_on:
- nats
volumes:
- ./mysql2pg.json:/mysql2pg.json:ro
dbs-source-reader:
container_name: source-reader
image: slotix/dbs-source-reader
entrypoint:
- ./dbs-source-reader
- --nats=nats:4222
ports:
- 8021:8021
depends_on:
- dbs-api
dbs-target-writer:
container_name: target-writer
image: slotix/dbs-target-writer
entrypoint:
- ./dbs-target-writer
- --nats=nats:4222
- --prometheus=http://prometheus:9090
ports:
- 8022:8022
depends_on:
- dbs-source-reader
nats:
container_name: nats
image: nats
entrypoint: /nats-server
command: "--jetstream -m 8222 --store_dir /data/nats-server"
ports:
- 4222:4222
- 8222:8222
prometheus:
image: slotix/dbs-prometheus:latest
container_name: prom
user: root
ports:
- 9090:9090
mysql-source:
container_name: mysql-source
build: ./source
environment:
- MYSQL_ROOT_PASSWORD=123456
ports:
- '3306:3306'
postgres-target:
container_name: postgres-target
image: postgres:15-alpine
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
command: postgres
ports:
- '5432:5432'
Файл Docker-compose для потоков DBConvert
Этот файл docker-compose будет вызывать несколько сервисов и связывать их вместе, чтобы они могли взаимодействовать друг с другом. При такой настройке процесс репликации происходит между базами данных mysql-source
и postgres-target
и контролируется dbs-api
, < code>dbs-source-reader, dbs-target-writer
и службы nats
.
Сервисы DBConvert Streams.
-
Служба
dbs-api
является точкой входа в DBConvert Streams. Сюда отправляются запросы с параметрами конфигурации для исходной и целевой баз данных. В нем указаны сведения о подключении к другим компонентам системы, таким какsource-reader
иtarget-write
.
Служба dbs-source-reader
отвечает за мониторинг и регистрацию изменений в исходной базе данных, а затем за отправку пакетов записей в концентратор событий.
Служба dbs-target-writer
используется для получения изменений из концентратора событий и их применения к целевой базе данных.
Служба nats
является ядром концентратора событий и обеспечивает связь между другими службами DBS.
Служба prometheus
используется для мониторинга метрик служб DBS.
Службы базы данных.
Структура таблиц, которые будут использоваться в нашем примере, показана на диаграмме ниже.
Структура исходных таблиц MySQL взята из jdaarevalo/docker_postgres_with_data репозитория GitHub.
Исходная и целевая базы данных.
Образ базы данныхmysql-source
основан на slotix/dbs-mysql:8
, который включает в себя все необходимые настройки для включения репликации MySQL CDC. Этот образ также содержит initdb.sql
script, который создает таблицы со структурами, показанными выше.
База данных postgres-target
, с другой стороны, основана на официальном облегченном образе postgres:15-alpine
. База данных postgres-target
будет получать все изменения, внесенные в базу данных mysql-source
.
<цитата>
Обе эти базы данных, mysql-source
и postgres-target
, обычно располагаются на отдельных физических серверах в производственной среде. Однако в этом примере мы запустим их на одном компьютере в разных контейнерах в демонстрационных целях.
Выполнение.
Чтобы запустить все службы, описанные выше, выполните следующую команду:
docker-compose up --build -d
Эта команда будет использовать файл docker-compose.yml
для сборки и запуска необходимых контейнеров в автономном режиме (опция -d). Флаг --build
инициирует перестроение образов перед запуском контейнеров.
Чтобы проверить, есть ли в базе данных MySQL внутри работающего контейнера все таблицы, успешно созданные сценарием при запуске, вы можете выполнить команду:
docker exec -it mysql-source mysql -uroot -p123456 source -e 'SHOW TABLES;'
Выполнив приведенную выше команду, вы увидите список таблиц, созданных в базе данных source
внутри контейнера, что подтвердит, успешно ли скрипт создал все таблицы при запуске.
+------------------+
| Tables_in_source |
+------------------+
| city |
| country |
| order_status |
| product |
| sale |
| status_name |
| store |
| users |
+------------------+
Список таблиц, созданных в исходном коде
Конфигурация трансляции
Это файл конфигурации потока mysql2pg.json
, который используется для настройки процесса репликации базы данных.
{
"source": {
"type": "mysql",
"connection": "root:123456@tcp(mysql-source:3306)/source",
"filter": {
"tables": [
{ "name": "product", "operations": ["insert"]},
{ "name": "country", "operations": ["insert"]},
{ "name": "city", "operations": ["insert"]},
{ "name": "store", "operations": ["insert"]},
{ "name": "users", "operations": ["insert"]},
{ "name": "status_name", "operations": ["insert"]},
{ "name": "sale", "operations": ["insert"]},
{ "name": "order_status", "operations": ["insert"]}
]
}
},
"target": {
"type": "postgresql",
"connection": "postgres://postgres:postgres@postgres-target:5432/postgres"
},
"limits": {
"numberOfEvents": 0,
"elapsedTime": 0
}
}
* Поле source
указывает тип исходной базы данных как "mysql" и сведения о соединении для подключения к базе данных, включая имя пользователя и пароль, а также хост и порт. .
* Поле filter
в поле источника указывает, что будут реплицированы только определенные таблицы и соответствующие им операции (в данном случае "insert").
* Поле target
указывает тип целевой базы данных как "postgresql" и сведения о соединении для подключения к целевой базе данных, включая имя пользователя и пароль, а также хост и порт.
* Поле limits
указывает, что количество событий и прошедшее время равны нулю (0), что означает отсутствие ограничений для этого процесса репликации.
Также стоит отметить, что в соответствии с этим конфигом будут фиксироваться только операции вставки в таблицы, указанные в поле фильтра.
Отправить конфигурацию в DBConvert Streams API.
docker run -t --rm
--network sales-db_default
curlimages/curl
--request POST
--url http://dbs-api:8020/api/v1/streams?file=./mysql2pg.json
Эта команда запускает команду docker run
для запуска нового контейнера из образа curlimages/curl
. Он указывает, что контейнер должен присоединиться к сети с именем sales-db_default
с помощью параметра --network
.
Затем этот контейнер выполнит команду curl
, чтобы сделать HTTP-запрос POST к URL-адресу http://dbs-api:8020/api/v1/streams?file=./mysql2pg.json. код>. URL-адрес содержит конечную точку, которая является службой dbs-api, которая работает на порту 8020 и ожидает файл json в качестве параметра запроса. Эта команда создает новый поток в службе dbs-api с конфигурацией, указанной в файле
mysql2pg.json
.
Важно отметить, что эта команда предполагает, что сеть sales-db_default
и служба dbs-api
уже созданы и запущены. Также предполагается, что файл mysql2pg.json
находится в текущем рабочем каталоге, из которого запускается команда.
Это ответ JSON, указывающий на успешное создание потока.
{"status":"success",
"data":{
"id":"2KGlt8BCHLT0lXklrs5wqM6n7BQ",
"source":{...},
"target":{...},
"limits":{}
}
}
Он содержит следующие поля:
* статус
: в этом поле указывается статус запроса, в данном случае "успешно"
* data
: это поле содержит сведения о созданном потоке.
* id
: это поле содержит уникальный идентификатор потока, в данном случае «2KGlt8BCHLT0lXklrs5wqM6n7BQ».
* source
: это поле содержит сведения об исходной базе данных, включая тип, сведения о соединении и параметры фильтра.
* target
: это поле содержит сведения о целевой базе данных, включая сведения о типе и соединении.
* limits
: это поле содержит ограничения для процесса репликации, такие как количество событий и истекшее время.
Обратите внимание, что сведения об исходном и целевом полях не приводятся здесь для краткости, они просто показаны как ...
Проверьте, успешно ли созданы таблицы в целевом объекте.
<цитата>💡DBConvert Streams создает таблицы с той же структурой, что и источник на цели, если они отсутствуют. На этом этапе все таблицы, указанные в фильтре, должны существовать в целевой базе данных Postgres.
Чтобы подключиться к контейнеру Docker postgres-target
и проверить, существуют ли таблицы, вы можете выполнить следующую команду:
docker exec -it postgres-target psql -U postgres -d postgres -c 'dt'
Выполнив приведенную выше команду, вы увидите список таблиц, созданных в базе данных postgres-target
, что подтвердит, что DBConvert Streams успешно создал все таблицы.
List of relations
Schema | Name | Type | Owner
--------+--------------+-------+----------
public | city | table | postgres
public | country | table | postgres
public | order_status | table | postgres
public | product | table | postgres
public | sale | table | postgres
public | status_name | table | postgres
public | store | table | postgres
public | users | table | postgres
(8 rows)
Заполните источник образцами данных.
Теперь, когда базы данных mysql-source
и postgres-target
имеют идентичные наборы таблиц с одинаковой структурой, пришло время выяснить, правильно ли работает потоковая передача данных. . Это можно сделать, вставив данные в базу данных mysql-source
и проверив, реплицируются ли те же данные в базу данных postgres-target
.
-- Set params
SET @number_of_sales = '100';
SET @number_of_users = '100';
SET @number_of_products = '100';
SET @number_of_stores = '100';
SET @number_of_countries = '100';
SET @number_of_cities = '30';
SET @status_names = '5';
SET @start_date = '2023-01-01 00:00:00';
SET @end_date = '2023-02-01 00:00:00';
USE source;
TRUNCATE TABLE city ;
TRUNCATE TABLE product ;
TRUNCATE TABLE country ;
TRUNCATE TABLE status_name;
TRUNCATE TABLE users;
TRUNCATE TABLE order_status;
TRUNCATE TABLE sale;
TRUNCATE TABLE store;
-- Filling of products
INSERT INTO product
WITH RECURSIVE t(id) AS (
SELECT 1
UNION ALL
SELECT id + 1
FROM t
WHERE id + 1 <= @number_of_products
)
SELECT id, CONCAT_WS(' ','Product', id)
FROM t;
-- Filling of countries
INSERT INTO country
WITH RECURSIVE t(id) AS (
SELECT 1
UNION ALL
SELECT id + 1
FROM t
WHERE id + 1 <= @number_of_countries
)
SELECT id, CONCAT('Country ', id)
FROM t;
-- Filling of cities
INSERT INTO city
WITH RECURSIVE t(id) AS (
SELECT 1
UNION ALL
SELECT id + 1
FROM t
WHERE id + 1 <= @number_of_cities
)
SELECT id
, CONCAT('City ', id)
, FLOOR(RAND() * (@number_of_countries + 1))
FROM t;
-- Filling of stores
INSERT INTO store
WITH RECURSIVE t(id) AS (
SELECT 1
UNION ALL
SELECT id + 1
FROM t
WHERE id + 1 <= @number_of_stores
)
SELECT id
, CONCAT('Store ', id)
, FLOOR(RAND() * (@number_of_cities + 1))
FROM t;
-- Filling of users
INSERT INTO users
WITH RECURSIVE t(id) AS (
SELECT 1
UNION ALL
SELECT id + 1
FROM t
WHERE id + 1 <= @number_of_users
)
SELECT id
, CONCAT('User ', id)
FROM t;
-- Filling of status_names
INSERT INTO status_name
WITH RECURSIVE t(status_name_id) AS (
SELECT 1
UNION ALL
SELECT status_name_id + 1
FROM t
WHERE status_name_id + 1 <= @status_names
)
SELECT status_name_id
, CONCAT('Status Name ', status_name_id)
FROM t;
-- Filling of sales
INSERT INTO sale
WITH RECURSIVE t(sale_id) AS (
SELECT 1
UNION ALL
SELECT sale_id + 1
FROM t
WHERE sale_id + 1 <= @number_of_sales
)
SELECT UUID() AS sale_id
, ROUND(RAND() * 10, 3) AS amount
, DATE_ADD(@start_date, INTERVAL RAND() * 5 DAY) AS date_sale
, FLOOR(RAND() * (@number_of_products + 1)) AS product_id
, FLOOR(RAND() * (@number_of_users + 1)) AS user_id
, FLOOR(RAND() * (@number_of_stores + 1)) AS store_id
FROM t;
-- Filling of order_status
INSERT INTO order_status
WITH RECURSIVE t(order_status_id) AS (
SELECT 1
UNION ALL
SELECT order_status_id + 1
FROM t
WHERE order_status_id + 1 <= @number_of_sales
)
SELECT UUID() AS order_status_id
, DATE_ADD(@start_date, INTERVAL RAND() * 5 DAY) AS update_at
, FLOOR(RAND() * (@number_of_sales + 1)) AS sale_id
, FLOOR(RAND() * (@status_names + 1)) AS status_name_id
FROM t;
скрипт fill_tables.sql
Этот вышеприведенный сценарий начинается с усечения всех таблиц в базе данных source
, чтобы очистить все предыдущие данные, а затем вставляет образцы данных в таблицы. Каждая таблица заполняется демонстрационными данными с использованием различных наборов параметров, заданных в верхней части скрипта.
Чтобы выполнить этот сценарий SQL, вы можете запустить следующую команду:
docker exec -i
mysql-source
mysql -uroot -p123456 -D source < $PWD/fill_tables.sql
Сравнение количества записей в исходной и целевой базах данных.
Сравним количество строк в таблицах исходной и целевой баз данных.
docker exec -it mysql-source mysql -uroot -p123456 -D source -e "SELECT (SELECT COUNT(*) FROM product) as 'product_count',(SELECT COUNT(*) FROM country) as 'country_count',(SELECT COUNT(*) FROM city) as 'city_count',(SELECT COUNT(*) FROM store) as 'store_count',(SELECT COUNT(*) FROM users) as 'users_count',(SELECT COUNT(*) FROM status_name) as 'status_name_count',(SELECT COUNT(*) FROM sale) as 'sale_count',(SELECT COUNT(*) FROM order_status) as 'order_status_count';"
Сколько записей в каждой таблице базы данных mysql-source?
docker exec -it postgres-target psql -U postgres -d postgres -c "SELECT (SELECT COUNT(*) FROM product) as product_count,(SELECT COUNT(*) FROM country) as country_count,(SELECT COUNT(*) FROM city) as city_count,(SELECT COUNT(*) FROM store) as store_count,(SELECT COUNT(*) FROM users) as users_count,(SELECT COUNT(*) FROM status_name) as status_name_count,(SELECT COUNT(*) FROM sale) as sale_count,(SELECT COUNT(*) FROM order_status) as order_status_count;"
сколько записей в каждой таблице postgres-target db?
<цитата>
Как видно из итогового вывода, все таблицы как в исходной, так и в целевой базах данных имеют одинаковое количество записей в каждой таблице.
Проверить статистику.
Следующая команда отправляет запрос GET в конечную точку API DBConvert /api/v1/streams/stat
, которая извлекает статистику текущего потока данных. Команда jq
используется для форматирования вывода JSON для лучшей читабельности.
docker run -t --rm
--network sales-db_default
curlimages/curl
--request GET
--url http://dbs-api:8020/api/v1/streams/stat | jq
{
"streamID": "2KHTjpsZAUCb8y1BZny3YKoX5qO",
"source": {
"counter": 635,
"elapsed": "0s",
"started": "2023-01-13T17:24:50.089257721Z",
"status": "RUNNING"
},
"target": {
"counter": 635,
"elapsed": "0s",
"started": "2023-01-13T17:24:50.089649312Z",
"status": "RUNNING"
}
}
Приведенный выше вывод показывает статистику текущего потока данных. Поле «источник» показывает статистику исходной базы данных, а поле «цель» показывает статистику целевой базы данных.
Поле «счетчик» показывает количество событий, которые были обработаны потоком, поле «истекшее» показывает время, прошедшее с момента запуска потока, поле «начало» показывает дату и время запуска потока, а « поле состояния» показывает статус текущего потока.
Метрики Прометея.
Prometheus – это система мониторинга, которая собирает данные показателей из различных источников и сохраняет их в базе данных временных рядов. DBConvert Streams собирает свои внутренние показатели в формате Prometheus, что позволяет вам просматривать и визуализировать данные в режиме реального времени на информационных панелях. Чтобы просмотреть собранные показатели, посетите http://127.0.0.1:9090 в веб-браузере, чтобы получить доступ к пользовательскому интерфейсу Prometheus.
Заключение
Системы Change Data Capture (CDC), такие как DBConvert Streams, можно использовать для потоковой передачи данных из базы данных MySQL в базу данных PostgreSQL в режиме реального времени, что позволяет синхронизировать две системы и получать преимущества. уникальных особенностей и возможностей каждой базы данных.
В этом руководстве представлена информация о потоковой передаче данных в одном направлении, из MySQL Binlog в PostgreSQL. Репозиторий DBConvert Streams на Github содержит дополнительные примеры настройки потоков данных, в том числе из PostgreSQL Wals в MySQL и другие конфигурации. Эти примеры могут послужить отправной точкой для настройки собственного потока данных с учетом ваших конкретных потребностей и вариантов использования.
Получение ваших отзывов о DBConvert Streams очень важно для команды разработчиков, чтобы улучшить программное обеспечение и сделать его более полезным для сообщества. Делясь своими идеями, сообщая об ошибках и запрашивая новые функции, вы можете активно участвовать в разработке программного обеспечения и помогать делать его более надежным и полезным для всех. Ваше участие высоко ценится командой разработчиков и сообществом.
Также опубликовано здесь
Оригинал