How to do content-based deduplication with Flink SQL

Problem description · Votes: 0 · Answers: 1

My Flink SQL statements are as follows:

CREATE OR REPLACE TABLE table_one /** mode('streaming')*/
(
        `pk` string,
        `id` string,
        `segments` ARRAY<STRING>,
        `headers` MAP<STRING, BYTES> METADATA,
        `kafka_key` STRING,
        `ts` timestamp(3) METADATA FROM 'timestamp' VIRTUAL,
         WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
        `hard_deleted` boolean
) WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = 'kafka:29092',
        'properties.group.id' = 'grp1',
        'topic-pattern' = 'topic_one',
        'value.format' = 'json',
        'key.format' = 'raw',
        'key.fields' = 'kafka_key',
        'value.fields-include' = 'EXCEPT_KEY',
        'scan.startup.mode' = 'earliest-offset',
        'json.timestamp-format.standard' = 'ISO-8601',
        'json.fail-on-missing-field' = 'false',
        'json.ignore-parse-errors' = 'true'
);

create or replace view table_one_source  as (
SELECT cast(`headers`['pod'] as varchar)            as pod,
       cast(`headers`['org'] as varchar)            as org,
       cast(`headers`['tenantId'] as varchar)       as tenantId,
       kafka_key                                    as pk,
       COALESCE(id, SPLIT_INDEX(kafka_key, '#', 1)) as id,
       segments,
       ts
FROM table_one
WHERE `headers`['tenantId'] is not null
  AND `headers`['pod'] is not null
  AND `headers`['org'] is not null
);

Create or replace view table_one_source_keyed as (
    WITH table_one_source_hash AS (
        SELECT
            pod, org, tenantId, pk, id, segments, ts,
            HASH_CODE(tenantId || id || CAST(segments AS STRING)) AS data_hash
        FROM table_one_source
    ),
    entitlement_source_deduped as (
         SELECT *
         FROM (
             SELECT *,
                    LAG(data_hash) OVER (PARTITION BY tenantId, id ORDER BY ts) AS prev_data_hash
             FROM table_one_source_hash
             )
         WHERE data_hash IS DISTINCT FROM prev_data_hash OR prev_data_hash IS NULL
    )
    
select * from entitlement_source_deduped
);

The goal here is that I only want rows that are new, or whose values of

id
segments

differ from the previous row, to flow downstream from table_one. The SQL above works. It produces a DAG like this:

[screenshot of the execution plan]

It uses

OverAggregate(partitionBy=[$5, $6], orderBy=[ts ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[segments, kafka_key, ts, $3, $4, $5, $6, $7, LAG($7) AS w0$o0])

The window of the OverAggregate appears to be unbounded, and I am worried that this operator's state can grow very large.

Question: Is there another way to deduplicate based on the message content?

apache-flink flink-sql
1 Answer

0 votes

If you don't need to output prev_data_hash, then you can try Flink's deduplication SQL (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/deduplication/). It will create a RankOperator with a Top1Function, which is still unbounded (like the OverAggregate in your example), but may be somewhat more efficient (especially if a primary key is defined on the source table).
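
As a rough sketch (assuming the ts rowtime attribute survives the view chain, and noting the semantics differ slightly from the LAG approach: this keeps only the first row per hash, so it drops every repeated hash, not just consecutive repeats), the table_one_source_keyed view from the question could be rewritten with that deduplication pattern like this:

-- hypothetical rewrite using Flink's deduplication pattern:
-- keep the first row seen for each (tenantId, id, data_hash) combination
CREATE OR REPLACE VIEW table_one_source_deduped AS (
    WITH table_one_source_hash AS (
        SELECT
            pod, org, tenantId, pk, id, segments, ts,
            HASH_CODE(tenantId || id || CAST(segments AS STRING)) AS data_hash
        FROM table_one_source
    )
    SELECT pod, org, tenantId, pk, id, segments, ts, data_hash
    FROM (
        SELECT *,
               ROW_NUMBER() OVER (PARTITION BY tenantId, id, data_hash ORDER BY ts ASC) AS row_num
        FROM table_one_source_hash
    )
    WHERE row_num = 1
);

For Flink to recognize this as a deduplication query (rather than a generic rank), the ORDER BY column must be a time attribute, which ts is in the original table definition.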

Or you can configure a state TTL to limit the size of the unbounded state - https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/overview/#different-ways-to-configure-state-ttl
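
For example, a minimal sketch of a pipeline-level TTL set through the SQL client (the 1-day value is just an assumption; once state expires, an old duplicate can pass through downstream again):

-- assumption: 1 day of state retention is acceptable for this use case
SET 'table.exec.state.ttl' = '1 d';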
