我的sql定义为
CREATE TABLE IF NOT EXISTS TABLE_1 (
headers VARCHAR NOT NULL,
id VARCHAR NOT NULL,
`timestamp` TIMESTAMP_LTZ(3) NULL,
type VARCHAR NOT NULL,
contentJson VARCHAR NOT NULL
) WITH (
'connector' = 'kafka',
'topic-pattern' = 'table_1__.+?',
'properties.bootstrap.servers' = 'localhost:29092',
'properties.group.id' = 'table_1__raw_step1_local_1',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601',
'scan.topic-partition-discovery.interval'= '60000',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);
CREATE TABLE IF NOT EXISTS TABLE_2 (
headers VARCHAR NOT NULL,
id VARCHAR NOT NULL,
`timestamp` TIMESTAMP_LTZ(3) NULL,
type VARCHAR NOT NULL,
contentJson VARCHAR NOT NULL
) WITH (
'connector' = 'kafka',
'topic-pattern' = 'table_2__.+?',
'properties.bootstrap.servers' = 'localhost:29092',
'properties.group.id' = 'table_2__raw_step1_local_1',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601',
'scan.topic-partition-discovery.interval'= '60000',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);
INSERT INTO FINAL_TABLE
select t1.headers, t2.contentJson
FROM TABLE_1 t1 left join TABLE_2 t2 on (t1.id = t2.id)
这里
TABLE_1 t1 left join TABLE_2 t2 on (t1.id = t2.id)
join 是存储在状态中的。我用streamExecutionEnvironment.setStateBackend(new EmbeddedRocksDBStateBackend(true));
。我可以看到检查点正在收集数据。我已将检查点指向 /tmp
文件夹。我看到与此 jon 相关的 _metadata 和 shared
文件夹
我想查询这个连接产生的状态,有这样的例子吗?
没有任何工具可以以合理的方式做到这一点。
如果深入研究 SQL 连接运算符的源代码,您可以找到相关状态对象使用的名称和序列化器。有了这些信息,您就可以使用状态处理器 API 从保存点读取此状态。
我不能保证这会起作用;我不知道有人尝试过使用 SQL 运算符的状态。