Синхронизируйте данные MySQL с S3 всего за 3 шага, используя Apache Seatunnel

Синхронизируйте данные MySQL с S3 всего за 3 шага, используя Apache Seatunnel

25 июля 2025 г.

Как вы можете эффективно синхронизировать данные MySQL в S3file с Apache Seatunnel? Все шаги, подробно описанные здесь, были полностью протестированы и подходят для создания сценария платформы данных на основе хранения объектов. Они предлагают гибкое развертывание и сильную масштабируемость, что делает это руководство очень ценным для пользователей с потребностями в интеграции MySQL -S3. Дайте ему, сохраните, и начните учиться!

Шаг 1: Создайте таблицу улей

CREATE EXTERNAL TABLE ods_ekp.`ods_sys_notify_todo_bak` (
  `fd_id` STRING,
  `fd_app_name` STRING,
  `fd_model_name` STRING,
  `fd_model_id` STRING,
  `fd_key` STRING,
  `fd_parameter1` STRING,
  `fd_parameter2` STRING,
  `fd_create_time` TIMESTAMP,
  `fd_subject` STRING,
  `fd_type` INT,
  `fd_link` STRING,
  `fd_mobile_link` STRING,
  `fd_pad_link` STRING,
  `fd_bundle` STRING,
  `fd_replace_text` STRING,
  `fd_md5` STRING,
  `fd_del_flag` STRING,
  `fd_level` INT,
  `doc_creator_id` STRING,
  `fd_extend_content` STRING,
  `fd_lang` STRING,
  `fd_cate_name` STRING,
  `fd_cate_id` STRING,
  `fd_template_name` STRING,
  `fd_template_id` STRING,
  `fd_hierarchy_id` STRING
)
COMMENT 'sys_notify_todo_bak data'
PARTITIONED BY (
  `dctime` STRING COMMENT 'partition year‑month‑day'
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'
STORED AS PARQUET
LOCATION 's3a://seatunnel/doris/warehouse/ods_ekp/ods_sys_notify_todo_bak'
TBLPROPERTIES (
  'parquet.compression'='ZSTD'
);

Примечания:

  1. Настройка разделителяROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'должен быть настроен идентично позже в Seatunnel; В противном случае формат будет неправильным.
  2. Алгоритм сжатия'parquet.compression'='ZSTD'Также нужна та же конфигурация позже в Seatunnel.
  3. Формат файлаSTORED AS PARQUETАналогично должны соответствовать более поздней конфигурации Seatunnel.

Перед использованием удалите комментарии

env {
  job.mode = "BATCH"
  parallelism = 2
}

source {
    Jdbc {
        url = "jdbc:mysql://[server‑ip]:3306/[database]?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "[username]"
        password = "[password]",
        # dctime must be converted to string, because in the Hive table this field is a string; include the partition field in the query—SeaTunnel will handle it automatically in the sink
        query = "select fd_id, fd_app_name, fd_model_name, fd_model_id, fd_key, fd_parameter1, fd_parameter2, fd_create_time, fd_subject, fd_type, fd_link, fd_mobile_link, fd_pad_link, fd_bundle, fd_replace_text, fd_md5, fd_del_flag, fd_level, doc_creator_id, fd_extend_content, fd_lang, fd_cate_name, fd_cate_id, fd_template_name, fd_template_id, fd_hierarchy_id, cast(date_format(fd_create_time, '%Y-%m-%d') as char) as dctime from sys_notify_todo_bak"
    }
}

transform {
}

sink {
    S3File {
      bucket = "s3a://seatunnel"
      fs.s3a.endpoint = "[minio‑host/ip]:9000"
      access_key = "[username]"
      secret_key = "[password]"
      fs.s3a.aws.credentials.provider = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
      # directory path
      path = "/doris/warehouse/ods_ekp/ods_sys_notify_todo_bak"
      tmp_path = "/data/tmp/seatunnel"
      # Mandatory; because my MinIO is not SSL‑enabled, set as follows
      hadoop_s3_properties {
        "fs.s3a.connection.ssl.enabled" = "false"
        "fs.s3a.path.style.access" = "true"
      }
      # Parquet file format
      file_format_type = "parquet"
      # Must use \\ to represent \
      field_delimiter = "\\001"
      # Required for Parquet or it will fail
      parquet_avro_write_timestamp_as_int96 = true
      # Compression algorithm
      compress_codec = "zstd"
      have_partition = true
      partition_by = ["dctime"]
      partition_dir_expression = "${k0}=${v0}"
      is_partition_field_write_in_file = false
      schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
      data_save_mode = "APPEND_DATA"
      custom_filename = true
      file_name_expression = "${transactionId}_${now}"
      filename_time_format = "yyyy.MM.dd"
  }
}

Шаг 2: запустите синхронизацию Seatunnel и выполните в Hive

-- Refresh the physical directory mapping
MSCK REPAIR TABLE ods_ekp.ods_sys_notify_todo_bak;

-- Query the Hive table to confirm data
SELECT * FROM ods_ekp.ods_sys_notify_todo_bak LIMIT 100;

Шаг 3: Создайте внешнюю базу данных каталога Doris Hive

CREATE CATALOG hive PROPERTIES (
    'type' = 'hms',
    'hive.metastore.uris' = 'thrift://[hive‑metastore‑ip]:9083',
    "s3.endpoint" = "http://[minio‑host/ip]:9000",
    "s3.region" = "us-east-1",
    "s3.access_key" = "[username]",
    "s3.secret_key" = "[password]",
    "s3.connection.ssl.enabled" = "false",
    "use_path_style" = "true",
    "hive.version" = '2.1.1'
);

REFRESH CATALOG hive;

SHOW DATABASES FROM hive;

SELECT * FROM hive.ods_ekp.ods_sys_notify_todo_bak LIMIT 100;

Объяснение:

  1. Потому что я использую CDH6.3.2 и Hive2.1.1, вам нужно указать"hive.version" = '2.1.1'При создании каталога.
  2. Поскольку мой Minio не SSL -"s3.connection.ssl.enabled" = "false"Полем
  3. Minio использует адресацию в стиле пути, так что установите"use_path_style" = "true"Полем
  4. Seatunnel Версия: 2.3.11
  5. Дорис Версия: 2.0.15


Оригинал
PREVIOUS ARTICLE
NEXT ARTICLE