How can I advance the watermark for a Kafka source in Flink SQL when no new data arrives in any partition?


I have a simple Flink (v1.17) SQL streaming job that uses Kafka as its source. I have set a few watermark-related options, but I cannot figure out how to force the watermark to advance when no new events are arriving.

The end goal is to compute a "last 1 hour" SUM over a field (amountUSD). Here are the relevant parts of the Flink job (in Java) and the SQL statements I am running:

public class StreamingJob {

// ... 
public static void main(String[] args) throws Exception {
    // ...
    List<String> sqlStatements = ...

    final EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
    final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);

    streamEnv.getConfig().setAutoWatermarkInterval(1000);
    streamEnv.getConfig().setLatencyTrackingInterval(10000);

    tableEnv.getConfig().getConfiguration().setString("pipeline.auto-watermark-interval", "5000ms");
    tableEnv.getConfig().getConfiguration().setString("table.exec.source.idle-timeout", "5000ms");
    tableEnv.getConfig().getConfiguration().setString("table.exec.emit.allow-lateness", "1d");

    for (String statement : sqlStatements) {
        tableEnv.executeSql(statement);
    }

    streamEnv.execute(jobId);
}
}

CREATE TABLE trades_stream_by_hour (
    `entityId` STRING,
    `poolId` STRING,
    `amountUSD` FLOAT,
    `blockNumber` BIGINT,
    `timestamp` BIGINT,
    `rowtime` as TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
    WATERMARK FOR `rowtime` AS `rowtime` - INTERVAL '5' SECONDS
) WITH (
    'connector' = 'stream',
    'topic' = 'trades',
    'scan.startup.mode' = 'timestamp',
    'scan.startup.timestamp-millis' = '1705851977000', -- 2 hours ago in unix timestamp milliseconds
    'format' = 'debezium-json',
    'debezium-json.schema-include' = 'true',
    'debezium-json.map-null-key.mode' = 'DROP',
    'debezium-json.encode.decimal-as-plain-number' = 'true'
);

-- Suppressed the "my_sink" CREATE TABLE (JDBC sink table) statement for brevity
-- Suppressed the "pools_store" CREATE TABLE (JDBC lookup table) statement for brevity

CREATE VIEW last_1_hour_volumes AS
SELECT *, PROCTIME() as proc_time FROM (
    SELECT 
        *,
        ROW_NUMBER() OVER (
            PARTITION BY poolId
            ORDER BY window_time DESC
        ) AS rn
    FROM (
        SELECT
            poolId,
            SUM(COALESCE(amountUSD, 0)) AS totalVolumeUSD,
            COUNT(*) AS totalVolumeSwaps,
            MAX(`timestamp`) AS `timestamp`,
            MAX(`blockNumber`) AS `maxBlockNumber`,
            MIN(`blockNumber`) AS `minBlockNumber`,
            HOP_ROWTIME(`rowtime`, INTERVAL '1' MINUTE, INTERVAL '1' HOUR) as window_time
        FROM
            trades_stream_by_hour
        WHERE
            amountUSD IS NOT NULL AND amountUSD > 0 AND `rowtime` >= CAST(
                CURRENT_TIMESTAMP - INTERVAL '1' HOUR AS TIMESTAMP(3)
            )
        GROUP BY
            poolId,
            HOP(`rowtime`, INTERVAL '1' MINUTE, INTERVAL '1' HOUR)
    )
) WHERE rn = 1;

CREATE VIEW prev_1_hour_volumes AS
SELECT *, PROCTIME() as proc_time FROM (
    SELECT 
        *,
        ROW_NUMBER() OVER (
            PARTITION BY poolId
            ORDER BY window_time DESC
        ) AS rn
    FROM (
        SELECT
            poolId,
            SUM(COALESCE(amountUSD, 0)) AS totalVolumeUSD,
            COUNT(*) AS totalVolumeSwaps,
            MAX(`timestamp`) AS `timestamp`,
            MAX(`blockNumber`) AS `maxBlockNumber`,
            MIN(`blockNumber`) AS `minBlockNumber`,
            HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '1' HOUR) as window_time
        FROM
            trades_stream_by_hour
        WHERE
            amountUSD IS NOT NULL AND amountUSD > 0 AND `rowtime` >= CAST(
                CURRENT_TIMESTAMP - INTERVAL '2' HOUR AS TIMESTAMP(3)
            )
        GROUP BY
            poolId,
            HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '1' HOUR)
    )
) WHERE rn = 12;

INSERT INTO
    my_sink
SELECT
    COALESCE(lv1h.poolId, pv1h.poolId) as entityId,
    lv1h.totalVolumeSwaps as volumeSwaps1h,
    lv1h.minBlockNumber as volumeMinBlock1h,
    lv1h.maxBlockNumber as volumeMaxBlock1h,
    lv1h.totalVolumeUSD as volumeUSD1h,
    lv1h.`timestamp` as volumeUSDTimestamp1h,
    pv1h.totalVolumeSwaps as prevVolumeSwaps1h,
    pv1h.minBlockNumber as prevVolumeMinBlock1h,
    pv1h.maxBlockNumber as prevVolumeMaxBlock1h,
    pv1h.totalVolumeUSD as prevVolumeUSD1h,
    pv1h.`timestamp` as prevVolumeUSDTimestamp1h,
    lv1h.totalVolumeUSD - pv1h.totalVolumeUSD as volumeUSDChange1h,
    (lv1h.totalVolumeUSD - pv1h.totalVolumeUSD) / pv1h.totalVolumeUSD as volumeUSDChangePercent1h
FROM last_1_hour_volumes lv1h
LEFT JOIN prev_1_hour_volumes AS pv1h ON lv1h.poolId = pv1h.poolId
LEFT JOIN pools_store FOR SYSTEM_TIME AS OF lv1h.proc_time AS pool ON pool.entityId = lv1h.poolId
WHERE
    -- The goal here is to hold back output until the watermark has caught up to within 5 minutes of the current time
    (CURRENT_WATERMARK(lv1h.window_time) IS NOT NULL AND CURRENT_WATERMARK(lv1h.window_time) >= CAST(CURRENT_TIMESTAMP - INTERVAL '5' MINUTES AS TIMESTAMP(3))) OR
    (CURRENT_WATERMARK(pv1h.window_time) IS NOT NULL AND CURRENT_WATERMARK(pv1h.window_time) >= CAST(CURRENT_TIMESTAMP - INTERVAL '5' MINUTES AS TIMESTAMP(3)))

What I observe is that as soon as a new trade arrives, the expected values are written correctly. But if, say, 20 minutes pass without any new trades, the "last 1 hour" total volume should still be updated, yet the watermark appears to stay stuck at the timestamp of the last event.

I am wondering whether the Kafka source is not advancing the watermark (or is misconfigured), or whether the problem lies in how my query computes the last-1-hour and previous-1-hour sums.

I have already tried these settings, but none of them helped:

  • setAutoWatermarkInterval() set to 5 seconds
  • pipeline.auto-watermark-interval set to 5 seconds
  • table.exec.source.idle-timeout set to 5 seconds
  • table.exec.emit.allow-lateness set to 5 seconds
  • setting both the parallelism and the number of partitions to 1
apache-kafka apache-flink flink-streaming flink-sql
1 Answer

Without any incoming events, it is not possible to trigger the final output using the built-in watermark support. You have a few options:

(1) Use "stop with drain" to stop the job cleanly. This sends a watermark with the maximum timestamp through the pipeline, which fires all pending timers and closes all open windows.

(2) Run the query in batch mode, or use a source that supports bounded reading, such as Kafka.
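For option (1), the CLI equivalent is flink stop --drain <jobId>, which sends the final watermark before stopping with a savepoint. For option (2), here is a minimal sketch in the same Java/Table API style as the job above, assuming the standard Kafka SQL connector and its 'scan.bounded.mode' option (the bootstrap.servers value is a placeholder); once the bounded offsets are consumed the source finishes and the final watermark closes all open windows:

// Sketch only: same columns as the question's DDL, with a bounded scan added.
// 'properties.bootstrap.servers' is a placeholder value.
tableEnv.executeSql(
        "CREATE TABLE trades_stream_bounded (\n"
                + "  `entityId` STRING,\n"
                + "  `poolId` STRING,\n"
                + "  `amountUSD` FLOAT,\n"
                + "  `blockNumber` BIGINT,\n"
                + "  `timestamp` BIGINT,\n"
                + "  `rowtime` AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),\n"
                + "  WATERMARK FOR `rowtime` AS `rowtime` - INTERVAL '5' SECOND\n"
                + ") WITH (\n"
                + "  'connector' = 'kafka',\n"
                + "  'topic' = 'trades',\n"
                + "  'properties.bootstrap.servers' = 'kafka:9092',\n"
                + "  'scan.startup.mode' = 'timestamp',\n"
                + "  'scan.startup.timestamp-millis' = '1705851977000',\n"
                + "  'scan.bounded.mode' = 'latest-offset',\n"  // read up to the offsets present at startup, then finish
                + "  'format' = 'debezium-json',\n"
                + "  'debezium-json.schema-include' = 'true'\n"
                + ")");

The same DDL can also be used with a batch-mode TableEnvironment, provided the rest of the pipeline supports batch execution.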

(3) Use the DataStream API to implement a custom watermark strategy that uses processing-time timers to detect when all sources have become idle and then keeps advancing the watermark. After converting the stream to a table, configure the SQL table to use SOURCE_WATERMARK() as its SQL WATERMARK expression.
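For option (3), here is a minimal sketch under a few assumptions: a hypothetical Trade POJO mirroring the DDL columns, a DataStream<Trade> named `trades` built from a KafkaSource, an arbitrary 10-second idleness threshold, and event times that roughly track the wall clock. The generator falls back to processing time in onPeriodicEmit() once no event has arrived for a while, and the resulting table reuses those watermarks via SOURCE_WATERMARK():

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Schema;

// Hypothetical POJO mirroring the DDL columns (`timestamp` is epoch seconds).
public class Trade {
    public String entityId;
    public String poolId;
    public Float amountUSD;
    public Long blockNumber;
    public Long timestamp;
}

// Advances the watermark from the wall clock when no events have arrived recently.
public class IdleAdvancingWatermarks implements WatermarkGenerator<Trade> {

    private static final long MAX_OUT_OF_ORDERNESS_MS = 5_000; // mirrors the 5s in the DDL
    private static final long IDLE_TIMEOUT_MS = 10_000;        // assumed idleness threshold

    private long maxEventTimeMs = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS_MS + 1;
    private long lastEventWallClockMs = System.currentTimeMillis();

    @Override
    public void onEvent(Trade trade, long eventTimestampMs, WatermarkOutput output) {
        maxEventTimeMs = Math.max(maxEventTimeMs, eventTimestampMs);
        lastEventWallClockMs = System.currentTimeMillis();
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        long nowMs = System.currentTimeMillis();
        if (nowMs - lastEventWallClockMs > IDLE_TIMEOUT_MS) {
            // No events for a while: keep the watermark moving with processing time.
            output.emitWatermark(new Watermark(nowMs - MAX_OUT_OF_ORDERNESS_MS));
        } else {
            output.emitWatermark(new Watermark(maxEventTimeMs - MAX_OUT_OF_ORDERNESS_MS - 1));
        }
    }
}

// Wiring it up (inside main), assuming `trades` is a DataStream<Trade> read with a KafkaSource:
DataStream<Trade> withWatermarks = trades.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<Trade>forGenerator(ctx -> new IdleAdvancingWatermarks())
                .withTimestampAssigner((trade, ts) -> trade.timestamp * 1000L));

tableEnv.createTemporaryView(
        "trades_stream_by_hour",
        tableEnv.fromDataStream(
                withWatermarks,
                Schema.newBuilder()
                        .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
                        .watermark("rowtime", "SOURCE_WATERMARK()") // reuse the DataStream watermarks
                        .build()));

One caveat with this approach: once the idle branch takes over, the watermark is driven by the machine's clock, so any events that later arrive with older timestamps will be treated as late.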
    
