使用flink sql join 2 source时如何读取rocksdb状态

问题描述 投票:0回答:1

我的sql定义为

CREATE TABLE IF NOT EXISTS TABLE_1 (
    headers     VARCHAR NOT NULL,
    id          VARCHAR NOT NULL,
    `timestamp` TIMESTAMP_LTZ(3) NULL,
    type        VARCHAR NOT NULL,
    contentJson VARCHAR NOT NULL
) WITH (
    'connector' = 'kafka',
    'topic-pattern' = 'table_1__.+?',
    'properties.bootstrap.servers' = 'localhost:29092',
    'properties.group.id' = 'table_1__raw_step1_local_1',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601',
    'scan.topic-partition-discovery.interval'= '60000',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);


CREATE TABLE IF NOT EXISTS TABLE_2 (
    headers     VARCHAR NOT NULL,
    id          VARCHAR NOT NULL,
    `timestamp` TIMESTAMP_LTZ(3) NULL,
    type        VARCHAR NOT NULL,
    contentJson VARCHAR NOT NULL
) WITH (
'connector' = 'kafka',
'topic-pattern' = 'table_2__.+?',
'properties.bootstrap.servers' = 'localhost:29092',
'properties.group.id' = 'table_2__raw_step1_local_1',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601',
'scan.topic-partition-discovery.interval'= '60000',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);

INSERT INTO FINAL_TABLE
select t1.headers, t2.contentJson
FROM TABLE_1 t1 left join TABLE_2 t2 on (t1.id = t2.id)

这里

TABLE_1 t1 left join TABLE_2 t2 on (t1.id = t2.id)
join 是存储在状态中的。我用
streamExecutionEnvironment.setStateBackend(new EmbeddedRocksDBStateBackend(true));
。我可以看到检查点正在收集数据。我已将检查点指向
/tmp
文件夹。我看到与此 jon 相关的 _metadata 和
shared
文件夹

我想查询这个连接产生的状态,有这样的例子吗?

apache-flink flink-streaming flink-sql
1个回答
0
投票

没有任何工具可以以合理的方式做到这一点。

如果深入研究 SQL 连接运算符的源代码,您可以找到相关状态对象使用的名称和序列化器。有了这些信息,您就可以使用状态处理器 API 从保存点读取此状态。

我不能保证这会起作用;我不知道有人尝试过使用 SQL 运算符的状态。

© www.soinside.com 2019 - 2024. All rights reserved.