如何将我的 PostgreSQL 数据 ETL 到 ClickHouse 数据仓库中?

问题描述 投票:0回答:4

我将数据存储在 postgreSQL 作为数据源,我想加载 clickhouse 数据仓库的维度和事实表,我是 clickhouse 的新手,习惯使用 Talend 和 Microsoft SSIS 等传统集成工具来执行 ETL (PS 我正在为 clickhouse 和 postgreSQL 使用 docker 镜像)

postgresql etl data-warehouse business-intelligence clickhouse
4个回答
3
投票

查看 PostgreSQL 引擎集成 here,您可以在 ClickHouse 上对远程 PostgreSQL 中存储的数据执行

SELECT
INSERT
查询。

您还可以使用表格功能


2
投票

您可以使用以下方法将数据从 Postgres 提取到 Clickhouse 中:

  • 外部 ETL 工具,例如 Airbyte,并创建从 Postgres 到 Clikhouse 的连接器

  • Clickhouse 集成表引擎 创建从 Clickhouse 到 Postgres 数据的视图,然后使用

    insert into
    查询将该视图中的数据插入到真正的 Clickhouse 表中


1
投票

这里是如何将 PostgreSQL 数据导入到相同形状的新 ClickHouse 表中的示例。假设您有一个名为

foo
的表,其中包含两列:
id
foo

  1. 使用 PostgreSQL 表引擎 创建一个表以查看您的 pg 表(用您自己的连接信息替换连接信息):
CREATE TABLE IF NOT EXISTS foo_pg
(
    id UUID,
    foo String
) ENGINE = PostgreSQL('host:port', 'database', 'tablename', 'username', 'password');
  1. 创建一个具有相同形状的新 ClickHouse 表(以避免在
    INSERT
    上进行类型转换):
CREATE TABLE IF NOT EXISTS foo
(
    id UUID,
    foo String
) ENGINE = MergeTree()
ORDER BY tuple()
PRIMARY KEY(id);
  1. 使用 INSERT INTO 导入数据:
INSERT INTO foo (id, foo)
SELECT id, foo FROM foo_pg;

0
投票

我们使用 Kafka 和 Debezium 通过 CDC(更改数据捕获)将数据从 PostgreSQL 流式传输到 ClickHouse。

  1. 配置 PostgreSQL 服务器设置和 Debezium 设置,如下所示:
  • 验证wal_level参数是否符合逻辑
  • 对于 PostgreSQL 版本 10 及更高版本,使用 pgoutput,因此不需要其他插件。
  • 根据说明,您必须创建一个具有最低必要权限(REPLICATION 和 LOGIN 权限)的 Debezium 用户,并且还需要其他权限才能使用 pgoutput。
  1. 连接到源数据库:

su - postgres 
psql -U debezium_user -d salary

  1. 创建表并将数据插入PostgreSQL:

CREATE TABLE IF NOT EXISTS employee
(
    id bigint NOT NULL,
    department character varying(255) NOT NULL,
    employee_number character varying(255),
    date_of_recruitment date,
    CONSTRAINT employee_pkey PRIMARY KEY (id)
);
INSERT INTO employee VALUES (1, 'full time', '776E', now());

  1. 配置 Debezium 容器:

echo '{
  "name": "shipments-connector",  
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector", 
    "plugin.name": "pgoutput",
    "database.hostname": "postgres", 
    "database.server.name": "postgres", 
    "database.port": "5432", 
    "database.user": "debezium_user", 
    "database.password": "debezium_pw", 
    "database.dbname" : "salary", 
    "table.include.list": "public.employee",
    "topic.prefix": "salary",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",  
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "snapshot.mode": "initial",
    "tombstones.on.delete": "false",
    "decimal.handling.mode": "double"
    }
}' > debezium.json

  1. 初始化 Debezium:

curl -H 'Content-Type: application/json' debezium:8083/connectors --data "@debezium.json"

您应该看到 Kafka 主题中生成了一个名为 salal.public.employee 的主题

  1. 检查Kafka容器

/usr/bin/kafka-topics --list  --bootstrap-server kafka:9092

/usr/bin/kafka-console-consumer  --bootstrap-server kafka:9092  --topic salary.public.employee --from-beginning

  1. 检查ClickHouse数据库

CREATE TABLE IF NOT EXISTS default.kafka_table
(
    `id` UInt64,
    `department` String,
    `employee_number` String,
    `date_of_recruitment` Date
)
ENGINE = ReplacingMergeTree
ORDER BY id;

这样,您就设置了从 PostgreSQL 到 ClickHouse 的测试 CDC 管道。您可以在this PostgreSQL > ClickHouse Streaming Guide看到更详细的解释。

© www.soinside.com 2019 - 2024. All rights reserved.