My Flink SQL statements are as follows:
CREATE OR REPLACE TABLE table_one /** mode('streaming')*/
(
`pk` string,
`id` string,
`segments` ARRAY<STRING>,
`headers` MAP<STRING, BYTES> METADATA,
`kafka_key` STRING,
`ts` timestamp(3) METADATA FROM 'timestamp' VIRTUAL,
WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
`hard_deleted` boolean
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = 'kafka:29092',
'properties.group.id' = 'grp1',
'topic-pattern' = 'topic_one',
'value.format' = 'json',
'key.format' = 'raw',
'key.fields' = 'kafka_key',
'value.fields-include' = 'EXCEPT_KEY',
'scan.startup.mode' = 'earliest-offset',
'json.timestamp-format.standard' = 'ISO-8601',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);
create or replace view table_one_source as (
SELECT cast(`headers`['pod'] as varchar) as pod,
cast(`headers`['org'] as varchar) as org,
cast(`headers`['tenantId'] as varchar) as tenantId,
kafka_key as pk,
COALESCE(id, SPLIT_INDEX(kafka_key, '#', 1)) as id,
segments,
ts
FROM table_one
WHERE `headers`['tenantId'] is not null
AND `headers`['pod'] is not null
AND `headers`['org'] is not null
);
Create or replace view table_one_source_keyed as (
WITH table_one_source_hash AS (
SELECT
pod, org, tenantId, pk, id, segments, ts,
HASH_CODE(tenantId || id || CAST(segments AS STRING)) AS data_hash
FROM table_one_source
),
entitlement_source_deduped as (
SELECT *
FROM (
SELECT *,
LAG(data_hash) OVER (PARTITION BY tenantId, id ORDER BY ts) AS prev_data_hash
FROM table_one_source_hash
)
WHERE data_hash IS DISTINCT FROM prev_data_hash OR prev_data_hash IS NULL
)
select * from entitlement_source_deduped
);
The goal here is that I only want rows that are new, or whose `id` and `segments` values differ from what came before, to flow from table_one downstream. The SQL above works. It produces a DAG that uses:
OverAggregate(partitionBy=[$5, $6], orderBy=[ts ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[segments, kafka_key, ts, $3, $4, $5, $6, $7, LAG($7) AS w0$o0])

The OverAggregate window appears to be unbounded, and I'm worried this operator's state can grow very large.
Question: is there another way to deduplicate based on the message content?
If you don't need to output prev_data_hash, then you can try Flink's deduplication SQL (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/deduplication/). It will create a RankOperator with a Top1Function, which is still unbounded (like the OverAggregate in your example), but may be somewhat more efficient (especially if a primary key is defined on the source table).
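For reference, a minimal sketch of that deduplication pattern applied to your query (assuming the table_one_source_hash CTE is kept as-is; partitioning on the content hash keeps only the first row per distinct hash). Note the slightly different semantics: if a key's hash changes A -> B -> A, this emits two rows, while the LAG version emits three.

```sql
-- Deduplication via ROW_NUMBER (the pattern from the Flink docs):
-- keep the first row per (tenantId, id, data_hash) combination.
SELECT pod, org, tenantId, pk, id, segments, ts
FROM (
  SELECT *,
         ROW_NUMBER() OVER (
           PARTITION BY tenantId, id, data_hash
           ORDER BY ts ASC
         ) AS rn
  FROM table_one_source_hash
)
WHERE rn = 1;
```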
Alternatively, you can configure a state TTL to limit the size of the unbounded state - https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/overview/#different-ways-to-configure-state-ttl
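For example, the pipeline-level TTL can be set before submitting the job (a sketch: `table.exec.state.ttl` takes a duration, and keep in mind that once state for a key expires, the next message for that key will be emitted again even if its content is unchanged):

```sql
-- Drop state for keys that have been idle longer than 7 days.
SET 'table.exec.state.ttl' = '7 d';
```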