Как передавать данные из MySQL в Postgres

Как передавать данные из MySQL в Postgres

29 января 2023 г.

Потоковая передача данных из базы данных MySQL в базу данных PostgreSQL может быть полезным способом перемещения данных между системами или создания реплики базы данных в реальном времени для составления отчетов и анализа. Один из способов добиться этого — использовать Инструменты Change Data Capture (CDC).

CDC — это метод отслеживания изменений, внесенных в базу данных, и записи их в отдельный поток. Затем этот поток можно использовать для репликации изменений в другую базу данных.

DBConvert Streams помогает реплицировать ваши данные MySQL в PostgreSQL в режиме реального времени. Он фиксирует изменения данных из исходной базы данных MySQL и применяет их к целевой базе данных PostgreSQL. Это можно сделать, настроив источник для чтения двоичного журнала базы данных MySQL и преобразования изменений в формат, который может быть использован целевой базой данных PostgreSQL.

[Примеры и конфигурации потоковой передачи базы данных, репозиторий GitHub](https://github.com/slotix/dbconvert-streams-public)

Для начала давайте клонируем репозиторий GitHub, содержащий пример потоковой передачи MySQL в PostgreSQL.

git clone git@github.com:slotix/dbconvert-streams-public.git && cd dbconvert-streams-public/examples/mysql2postgres/sales-db/

Конфигурация Docker Compose.

Поскольку DBConvert Streams использует несколько служб, наиболее эффективным способом запуска контейнеров является использование Docker Compose. р>

Файл docker-compose.yml из репозитория приведен ниже.

version: '3.9'
services:
  dbs-api:
    container_name: api
    image: slotix/dbs-api
    entrypoint:
      - ./dbs-api
      - --nats=nats:4222
      - --source=source-reader:8021
      - --target=target-writer:8022
    ports:
      - 8020:8020
    depends_on:
      - nats
    volumes:
      - ./mysql2pg.json:/mysql2pg.json:ro

  dbs-source-reader:
    container_name: source-reader
    image: slotix/dbs-source-reader
    entrypoint:
      - ./dbs-source-reader
      - --nats=nats:4222
    ports:
      - 8021:8021
    depends_on:
      - dbs-api

  dbs-target-writer:
    container_name: target-writer
    image: slotix/dbs-target-writer
    entrypoint:
      - ./dbs-target-writer
      - --nats=nats:4222
      - --prometheus=http://prometheus:9090
    ports:
      - 8022:8022
    depends_on:
      - dbs-source-reader

  nats:
    container_name: nats
    image: nats
    entrypoint: /nats-server
    command: "--jetstream -m 8222 --store_dir /data/nats-server"
    ports:
      - 4222:4222
      - 8222:8222

  prometheus:
    image: slotix/dbs-prometheus:latest
    container_name: prom
    user: root
    ports:
      - 9090:9090

  mysql-source:
    container_name: mysql-source
    build: ./source 
    environment:
      - MYSQL_ROOT_PASSWORD=123456
    ports:
      - '3306:3306'


  postgres-target:
    container_name: postgres-target
    image: postgres:15-alpine
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    command: postgres
    ports:
      - '5432:5432'

Файл Docker-compose для потоков DBConvert

Этот файл docker-compose будет вызывать несколько сервисов и связывать их вместе, чтобы они могли взаимодействовать друг с другом. При такой настройке процесс репликации происходит между базами данных mysql-source и postgres-target и контролируется dbs-api, < code>dbs-source-reader, dbs-target-writer и службы nats.

Сервисы DBConvert Streams.

    Служба
  • dbs-api является точкой входа в DBConvert Streams. Сюда отправляются запросы с параметрами конфигурации для исходной и целевой баз данных. В нем указаны сведения о подключении к другим компонентам системы, таким как source-reader и target-write.
  • Служба
  • dbs-source-reader отвечает за мониторинг и регистрацию изменений в исходной базе данных, а затем за отправку пакетов записей в концентратор событий.
  • Служба
  • dbs-target-writer используется для получения изменений из концентратора событий и их применения к целевой базе данных.
  • Служба
  • nats является ядром концентратора событий и обеспечивает связь между другими службами DBS.
  • Служба
  • prometheus используется для мониторинга метрик служб DBS.

Службы базы данных.

Структура таблиц, которые будут использоваться в нашем примере, показана на диаграмме ниже.

ER Diagram of DB

Структура исходных таблиц MySQL взята из jdaarevalo/docker_postgres_with_data репозитория GitHub.

Исходная и целевая базы данных.

Образ базы данных

mysql-source основан на slotix/dbs-mysql:8, который включает в себя все необходимые настройки для включения репликации MySQL CDC. Этот образ также содержит initdb.sql script, который создает таблицы со структурами, показанными выше.

База данных postgres-target, с другой стороны, основана на официальном облегченном образе postgres:15-alpine. База данных postgres-target будет получать все изменения, внесенные в базу данных mysql-source.

<цитата>

Обе эти базы данных, mysql-source и postgres-target, обычно располагаются на отдельных физических серверах в производственной среде. Однако в этом примере мы запустим их на одном компьютере в разных контейнерах в демонстрационных целях.

Выполнение.

Чтобы запустить все службы, описанные выше, выполните следующую команду:

docker-compose up --build -d

Эта команда будет использовать файл docker-compose.yml для сборки и запуска необходимых контейнеров в автономном режиме (опция -d). Флаг --build инициирует перестроение образов перед запуском контейнеров.


Чтобы проверить, есть ли в базе данных MySQL внутри работающего контейнера все таблицы, успешно созданные сценарием при запуске, вы можете выполнить команду:

docker exec -it mysql-source mysql -uroot -p123456 source -e 'SHOW TABLES;'

Выполнив приведенную выше команду, вы увидите список таблиц, созданных в базе данных source внутри контейнера, что подтвердит, успешно ли скрипт создал все таблицы при запуске.

+------------------+
| Tables_in_source |
+------------------+
| city             |
| country          |
| order_status     |
| product          |
| sale             |
| status_name      |
| store            |
| users            |
+------------------+

Список таблиц, созданных в исходном коде

Конфигурация трансляции

Это файл конфигурации потока mysql2pg.json, который используется для настройки процесса репликации базы данных.

{
  "source": {
    "type": "mysql",
    "connection": "root:123456@tcp(mysql-source:3306)/source",
    "filter": {
      "tables": [
        { "name": "product", "operations": ["insert"]},
        { "name": "country", "operations": ["insert"]},
        { "name": "city", "operations": ["insert"]},
        { "name": "store", "operations": ["insert"]},
        { "name": "users", "operations": ["insert"]},
        { "name": "status_name", "operations": ["insert"]},
        { "name": "sale", "operations": ["insert"]},
        { "name": "order_status", "operations": ["insert"]}
      ]
    }
  },
  "target": {
    "type": "postgresql",
    "connection": "postgres://postgres:postgres@postgres-target:5432/postgres"
  },
  "limits": {
    "numberOfEvents": 0,
    "elapsedTime": 0
  }
}

* Поле source указывает тип исходной базы данных как "mysql" и сведения о соединении для подключения к базе данных, включая имя пользователя и пароль, а также хост и порт. . * Поле filter в поле источника указывает, что будут реплицированы только определенные таблицы и соответствующие им операции (в данном случае "insert"). * Поле target указывает тип целевой базы данных как "postgresql" и сведения о соединении для подключения к целевой базе данных, включая имя пользователя и пароль, а также хост и порт. * Поле limits указывает, что количество событий и прошедшее время равны нулю (0), что означает отсутствие ограничений для этого процесса репликации.

Также стоит отметить, что в соответствии с этим конфигом будут фиксироваться только операции вставки в таблицы, указанные в поле фильтра.

Отправить конфигурацию в DBConvert Streams API.

docker run -t --rm 
    --network sales-db_default 
    curlimages/curl  
    --request POST 
    --url http://dbs-api:8020/api/v1/streams?file=./mysql2pg.json

Эта команда запускает команду docker run для запуска нового контейнера из образа curlimages/curl. Он указывает, что контейнер должен присоединиться к сети с именем sales-db_default с помощью параметра --network.

Затем этот контейнер выполнит команду curl, чтобы сделать HTTP-запрос POST к URL-адресу http://dbs-api:8020/api/v1/streams?file=./mysql2pg.json. . URL-адрес содержит конечную точку, которая является службой dbs-api, которая работает на порту 8020 и ожидает файл json в качестве параметра запроса. Эта команда создает новый поток в службе dbs-api с конфигурацией, указанной в файле mysql2pg.json.

Важно отметить, что эта команда предполагает, что сеть sales-db_default и служба dbs-api уже созданы и запущены. Также предполагается, что файл mysql2pg.json находится в текущем рабочем каталоге, из которого запускается команда.

Это ответ JSON, указывающий на успешное создание потока.

{"status":"success",
    "data":{
        "id":"2KGlt8BCHLT0lXklrs5wqM6n7BQ",
        "source":{...},
        "target":{...},
        "limits":{}
     }
}

Он содержит следующие поля:

* статус: в этом поле указывается статус запроса, в данном случае "успешно" * data: это поле содержит сведения о созданном потоке. * id: это поле содержит уникальный идентификатор потока, в данном случае «2KGlt8BCHLT0lXklrs5wqM6n7BQ». * source: это поле содержит сведения об исходной базе данных, включая тип, сведения о соединении и параметры фильтра. * target: это поле содержит сведения о целевой базе данных, включая сведения о типе и соединении. * limits: это поле содержит ограничения для процесса репликации, такие как количество событий и истекшее время.

Обратите внимание, что сведения об исходном и целевом полях не приводятся здесь для краткости, они просто показаны как ...

Проверьте, успешно ли созданы таблицы в целевом объекте.

<цитата>

💡DBConvert Streams создает таблицы с той же структурой, что и источник на цели, если они отсутствуют. На этом этапе все таблицы, указанные в фильтре, должны существовать в целевой базе данных Postgres.

Чтобы подключиться к контейнеру Docker postgres-target и проверить, существуют ли таблицы, вы можете выполнить следующую команду:

 docker exec -it postgres-target psql -U postgres -d postgres -c 'dt'

Выполнив приведенную выше команду, вы увидите список таблиц, созданных в базе данных postgres-target, что подтвердит, что DBConvert Streams успешно создал все таблицы.

            List of relations
 Schema |     Name     | Type  |  Owner
--------+--------------+-------+----------
 public | city         | table | postgres
 public | country      | table | postgres
 public | order_status | table | postgres
 public | product      | table | postgres
 public | sale         | table | postgres
 public | status_name  | table | postgres
 public | store        | table | postgres
 public | users        | table | postgres
(8 rows)

Заполните источник образцами данных.

Теперь, когда базы данных mysql-source и postgres-target имеют идентичные наборы таблиц с одинаковой структурой, пришло время выяснить, правильно ли работает потоковая передача данных. . Это можно сделать, вставив данные в базу данных mysql-source и проверив, реплицируются ли те же данные в базу данных postgres-target.

-- Set params
SET @number_of_sales = '100';
SET @number_of_users = '100';
SET @number_of_products = '100';
SET @number_of_stores = '100';
SET @number_of_countries = '100';
SET @number_of_cities = '30';
SET @status_names = '5';
SET @start_date = '2023-01-01 00:00:00';
SET @end_date = '2023-02-01 00:00:00';

USE source;

TRUNCATE TABLE city ;
TRUNCATE TABLE product ;
TRUNCATE TABLE country ;
TRUNCATE TABLE status_name;
TRUNCATE TABLE users;
TRUNCATE TABLE order_status;
TRUNCATE TABLE sale;
TRUNCATE TABLE store;

-- Filling of products
INSERT INTO product
WITH RECURSIVE t(id) AS (
    SELECT 1
    UNION ALL
    SELECT id + 1
    FROM t
    WHERE id + 1 <= @number_of_products
)
SELECT id, CONCAT_WS(' ','Product', id)
FROM t;


-- Filling of countries
INSERT INTO country
WITH RECURSIVE t(id) AS (
    SELECT 1
    UNION ALL
    SELECT id + 1
    FROM t
    WHERE id + 1 <= @number_of_countries
)
SELECT id, CONCAT('Country ', id)
FROM t;


-- Filling of cities
INSERT INTO city
WITH RECURSIVE t(id) AS (
    SELECT 1
    UNION ALL
    SELECT id + 1
    FROM t
    WHERE id + 1 <= @number_of_cities
)
SELECT id
    , CONCAT('City ', id)
    , FLOOR(RAND() * (@number_of_countries + 1))
FROM t;


-- Filling of stores
INSERT INTO store
WITH RECURSIVE t(id) AS (
    SELECT 1
    UNION ALL
    SELECT id + 1
    FROM t
    WHERE id + 1 <= @number_of_stores
)
SELECT id
    , CONCAT('Store ', id)
    , FLOOR(RAND() * (@number_of_cities + 1))
FROM t;

-- Filling of users
INSERT INTO users
WITH RECURSIVE t(id) AS (
    SELECT 1
    UNION ALL
    SELECT id + 1
    FROM t
    WHERE id + 1 <= @number_of_users
)
SELECT id
    , CONCAT('User ', id)
FROM t;

-- Filling of status_names
INSERT INTO status_name
WITH RECURSIVE t(status_name_id) AS (
    SELECT 1
    UNION ALL
    SELECT status_name_id + 1
    FROM t
    WHERE status_name_id + 1 <= @status_names
)
SELECT status_name_id
    , CONCAT('Status Name ', status_name_id)
FROM t;

-- Filling of sales  
INSERT INTO sale
WITH RECURSIVE t(sale_id) AS (
    SELECT 1
    UNION ALL
    SELECT sale_id + 1
    FROM t
    WHERE sale_id + 1 <= @number_of_sales
)
SELECT UUID() AS sale_id
    , ROUND(RAND() * 10, 3) AS amount
    , DATE_ADD(@start_date, INTERVAL RAND() * 5 DAY) AS date_sale
    , FLOOR(RAND() * (@number_of_products + 1)) AS product_id
    , FLOOR(RAND() * (@number_of_users + 1)) AS user_id
    , FLOOR(RAND() * (@number_of_stores + 1)) AS store_id
FROM  t;

-- Filling of order_status
INSERT INTO order_status
WITH RECURSIVE t(order_status_id) AS (
    SELECT 1
    UNION ALL
    SELECT order_status_id + 1
    FROM t
    WHERE order_status_id + 1 <= @number_of_sales
)
SELECT UUID() AS order_status_id
    , DATE_ADD(@start_date, INTERVAL RAND() * 5 DAY) AS update_at
    , FLOOR(RAND() * (@number_of_sales + 1)) AS sale_id
    , FLOOR(RAND() * (@status_names + 1)) AS status_name_id
FROM t;

скрипт fill_tables.sql

Этот вышеприведенный сценарий начинается с усечения всех таблиц в базе данных source, чтобы очистить все предыдущие данные, а затем вставляет образцы данных в таблицы. Каждая таблица заполняется демонстрационными данными с использованием различных наборов параметров, заданных в верхней части скрипта.

Чтобы выполнить этот сценарий SQL, вы можете запустить следующую команду:

docker exec -i 
    mysql-source 
    mysql -uroot -p123456 -D source < $PWD/fill_tables.sql

Сравнение количества записей в исходной и целевой базах данных.

Сравним количество строк в таблицах исходной и целевой баз данных.

docker exec -it mysql-source mysql -uroot -p123456 -D source -e "SELECT (SELECT COUNT(*) FROM product) as 'product_count',(SELECT COUNT(*) FROM country) as 'country_count',(SELECT COUNT(*) FROM city) as 'city_count',(SELECT COUNT(*) FROM store) as 'store_count',(SELECT COUNT(*) FROM users) as 'users_count',(SELECT COUNT(*) FROM status_name) as 'status_name_count',(SELECT COUNT(*) FROM sale) as 'sale_count',(SELECT COUNT(*) FROM order_status) as 'order_status_count';"

Сколько записей в каждой таблице базы данных mysql-source?

COUNTs for mysql-source tables.

docker exec -it postgres-target psql -U postgres -d postgres -c "SELECT (SELECT COUNT(*) FROM product) as product_count,(SELECT COUNT(*) FROM country) as country_count,(SELECT COUNT(*) FROM city) as city_count,(SELECT COUNT(*) FROM store) as store_count,(SELECT COUNT(*) FROM users) as users_count,(SELECT COUNT(*) FROM status_name) as status_name_count,(SELECT COUNT(*) FROM sale) as sale_count,(SELECT COUNT(*) FROM order_status) as order_status_count;"

сколько записей в каждой таблице postgres-target db?

COUNTs for postgres-target tables.

<цитата>

Как видно из итогового вывода, все таблицы как в исходной, так и в целевой базах данных имеют одинаковое количество записей в каждой таблице.

Проверить статистику.

Следующая команда отправляет запрос GET в конечную точку API DBConvert /api/v1/streams/stat, которая извлекает статистику текущего потока данных. Команда jq используется для форматирования вывода JSON для лучшей читабельности.

docker run -t --rm 
    --network sales-db_default 
    curlimages/curl  
    --request GET 
    --url http://dbs-api:8020/api/v1/streams/stat | jq

{
  "streamID": "2KHTjpsZAUCb8y1BZny3YKoX5qO",
  "source": {
    "counter": 635,
    "elapsed": "0s",
    "started": "2023-01-13T17:24:50.089257721Z",
    "status": "RUNNING"
  },
  "target": {
    "counter": 635,
    "elapsed": "0s",
    "started": "2023-01-13T17:24:50.089649312Z",
    "status": "RUNNING"
  }
}

Приведенный выше вывод показывает статистику текущего потока данных. Поле «источник» показывает статистику исходной базы данных, а поле «цель» показывает статистику целевой базы данных.

Поле «счетчик» показывает количество событий, которые были обработаны потоком, поле «истекшее» показывает время, прошедшее с момента запуска потока, поле «начало» показывает дату и время запуска потока, а « поле состояния» показывает статус текущего потока.

Метрики Прометея.

Prometheus – это система мониторинга, которая собирает данные показателей из различных источников и сохраняет их в базе данных временных рядов. DBConvert Streams собирает свои внутренние показатели в формате Prometheus, что позволяет вам просматривать и визуализировать данные в режиме реального времени на информационных панелях. Чтобы просмотреть собранные показатели, посетите http://127.0.0.1:9090 в веб-браузере, чтобы получить доступ к пользовательскому интерфейсу Prometheus.

Заключение

Системы Change Data Capture (CDC), такие как DBConvert Streams, можно использовать для потоковой передачи данных из базы данных MySQL в базу данных PostgreSQL в режиме реального времени, что позволяет синхронизировать две системы и получать преимущества. уникальных особенностей и возможностей каждой базы данных.

В этом руководстве представлена ​​информация о потоковой передаче данных в одном направлении, из MySQL Binlog в PostgreSQL. Репозиторий DBConvert Streams на Github содержит дополнительные примеры настройки потоков данных, в том числе из PostgreSQL Wals в MySQL и другие конфигурации. Эти примеры могут послужить отправной точкой для настройки собственного потока данных с учетом ваших конкретных потребностей и вариантов использования.

Получение ваших отзывов о DBConvert Streams очень важно для команды разработчиков, чтобы улучшить программное обеспечение и сделать его более полезным для сообщества. Делясь своими идеями, сообщая об ошибках и запрашивая новые функции, вы можете активно участвовать в разработке программного обеспечения и помогать делать его более надежным и полезным для всех. Ваше участие высоко ценится командой разработчиков и сообществом.


Также опубликовано здесь


Оригинал