我有一个简单的 Flink (v1.17) SQL 流作业,使用 Kafka 作为源。我配置了一些与水印相关的参数,但即使没有新事件到达,我也不知道如何让水印继续前进。
最终目标是计算某个字段(金额)的Last 1h SUM。以下是 Flink 作业(Java 中)的相关部分以及我正在运行的 SQL 语句:
public class StreamingJob {
    // ...
    public static void main(String[] args) throws Exception {
        // ...
        List<String> sqlStatements = ...
        final EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        // FIX: tableEnv was referenced below but never created, so none of the
        // table-level configuration was actually applied to this job. Bridge it
        // off streamEnv (requires
        // org.apache.flink.table.api.bridge.java.StreamTableEnvironment).
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);

        // DataStream-level watermark interval: 1000 ms here.
        // NOTE(review): the question text mentions 5 seconds — confirm which
        // value is intended; for SQL sources, pipeline.auto-watermark-interval
        // (set below) is the one that matters.
        streamEnv.getConfig().setAutoWatermarkInterval(1000);
        streamEnv.getConfig().setLatencyTrackingInterval(10000);
        tableEnv.getConfig().getConfiguration().setString("pipeline.auto-watermark-interval", "5000ms");
        // IMPORTANT: source idle-timeout only lets *other, still-active*
        // partitions advance the combined watermark while one partition is
        // silent. It does NOT push the watermark past the last observed event
        // time when the whole source is idle — Flink has no built-in
        // wall-clock-driven watermark advancement for a fully idle source.
        tableEnv.getConfig().getConfiguration().setString("table.exec.source.idle-timeout", "5000ms");
        tableEnv.getConfig().getConfiguration().setString("table.exec.emit.allow-lateness", "1d");

        for (String statement : sqlStatements) {
            tableEnv.executeSql(statement);
        }
        // NOTE(review): executeSql(INSERT ...) submits its own job; calling
        // streamEnv.execute() as well is only needed when DataStream operators
        // were added, and may otherwise fail with "no operators defined" —
        // confirm against the actual deployment.
        streamEnv.execute(jobId);
    }
-- Kafka-backed source of trade events; event time is derived from the
-- epoch-seconds `timestamp` column (FROM_UNIXTIME takes seconds, while
-- scan.startup.timestamp-millis below is milliseconds — two different units,
-- both correct for their respective APIs).
CREATE TABLE trades_stream_by_hour (
  `entityId` STRING,
  `poolId` STRING,
  `amountUSD` FLOAT, -- NOTE(review): FLOAT for money loses precision; DECIMAL would be safer, but changing it would break downstream consumers — confirm.
  `blockNumber` BIGINT,
  `timestamp` BIGINT, -- epoch seconds
  `rowtime` as TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
  -- FIX: interval time-unit qualifiers are singular in Flink/Calcite SQL
  -- (SECOND, not SECONDS). Watermark = max observed rowtime - 5s; it only
  -- advances when new events arrive.
  WATERMARK FOR `rowtime` AS `rowtime` - INTERVAL '5' SECOND
) WITH (
  'connector' = 'stream',
  'topic' = 'trades',
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1705851977000', -- 2 hours ago in unix timestamp milliseconds
  'format' = 'debezium-json',
  'debezium-json.schema-include' = 'true',
  'debezium-json.map-null-key.mode' = 'DROP',
  'debezium-json.encode.decimal-as-plain-number' = 'true'
);
-- Suppressed the "my_sink" CREATE TABLE (jdbc sink table) statement for brevity
-- Suppressed the "pools" CREATE TABLE (jdbc lookup table) statement for brevity
-- Rolling "last 1 hour" volume per pool: 1h hop windows sliding every minute;
-- ROW_NUMBER keeps only the most recent window (rn = 1) per pool.
CREATE VIEW last_1_hour_volumes AS
SELECT *, PROCTIME() as proc_time FROM (
  SELECT
    *,
    -- Rank hop windows newest-first within each pool.
    ROW_NUMBER() OVER (
      PARTITION BY poolId
      ORDER BY window_time DESC
    ) AS rn
  FROM (
    SELECT
      poolId,
      SUM(COALESCE(amountUSD, 0)) AS totalVolumeUSD,
      -- FIX: was totalVolumeTrades, but both the final INSERT and the sibling
      -- prev_1_hour_volumes view reference totalVolumeSwaps — unify the name.
      COUNT(*) AS totalVolumeSwaps,
      MAX(`timestamp`) AS `timestamp`,
      MAX(`blockNumber`) AS `maxBlockNumber`,
      MIN(`blockNumber`) AS `minBlockNumber`,
      -- Event-time attribute of the hop window (1h size, 1min slide).
      HOP_ROWTIME(`rowtime`, INTERVAL '1' MINUTE, INTERVAL '1' HOUR) as window_time
    FROM
      trades_stream_by_hour
    WHERE
      -- NOTE(review): CURRENT_TIMESTAMP is non-deterministic and evaluated at
      -- processing time; filtering rowtime against it makes results depend on
      -- when each record is processed — confirm this pre-filter is intentional.
      amountUSD IS NOT NULL AND amountUSD > 0 AND `rowtime` >= CAST(
        CURRENT_TIMESTAMP - INTERVAL '1' HOUR AS TIMESTAMP(3)
      )
    GROUP BY
      poolId,
      HOP(`rowtime`, INTERVAL '1' MINUTE, INTERVAL '1' HOUR)
  )
) WHERE rn = 1;
-- "Previous 1 hour" volume per pool: same 1h hop aggregation but with a
-- 5-minute slide, keeping the 12th most recent window (rn = 12), i.e. the
-- window whose end is 55 minutes before the latest one.
-- NOTE(review): if the intent is the window exactly one hour before the
-- latest (12 slides * 5 min = 60 min), rn = 13 may be wanted — confirm.
CREATE VIEW prev_1_hour_volumes AS
SELECT *, PROCTIME() as proc_time FROM (
SELECT
*,
-- Rank hop windows newest-first within each pool.
ROW_NUMBER() OVER (
PARTITION BY poolId
ORDER BY window_time DESC
) AS rn
FROM (
SELECT
poolId,
SUM(COALESCE(amountUSD, 0)) AS totalVolumeUSD,
COUNT(*) AS totalVolumeSwaps,
MAX(`timestamp`) AS `timestamp`,
MAX(`blockNumber`) AS `maxBlockNumber`,
MIN(`blockNumber`) AS `minBlockNumber`,
-- Event-time attribute of the hop window (1h size, 5min slide).
HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '1' HOUR) as window_time
FROM
trades_stream_by_hour
WHERE
-- NOTE(review): CURRENT_TIMESTAMP is non-deterministic and evaluated at
-- processing time; filtering rowtime against it makes results depend on
-- when each record is processed — confirm this pre-filter is intentional.
amountUSD IS NOT NULL AND amountUSD > 0 AND `rowtime` >= CAST(
CURRENT_TIMESTAMP - INTERVAL '2' HOUR AS TIMESTAMP(3)
)
GROUP BY
poolId,
HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '1' HOUR)
)
) WHERE rn = 12;
-- Join the current-hour and previous-hour rollups per pool and write the
-- deltas to the JDBC sink. pv1h comes from a LEFT JOIN, so all pv1h.* columns
-- (and the derived deltas) are NULL when no previous-hour window exists.
INSERT INTO
  my_sink
SELECT
  COALESCE(lv1h.poolId, pv1h.poolId) as entityId,
  lv1h.totalVolumeSwaps as volumeSwaps1h,
  lv1h.minBlockNumber as volumeMinBlock1h,
  lv1h.maxBlockNumber as volumeMaxBlock1h,
  lv1h.totalVolumeUSD as volumeUSD1h,
  lv1h.`timestamp` as volumeUSDTimestamp1h,
  pv1h.totalVolumeSwaps as prevVolumeSwaps1h,
  pv1h.minBlockNumber as prevVolumeMinBlock1h,
  pv1h.maxBlockNumber as prevVolumeMaxBlock1h,
  pv1h.totalVolumeUSD as prevVolumeUSD1h,
  pv1h.`timestamp` as prevVolumeUSDTimestamp1h,
  lv1h.totalVolumeUSD - pv1h.totalVolumeUSD as volumeUSDChange1h,
  -- FIX: guard the division — pv1h.totalVolumeUSD can be 0; NULLIF yields
  -- NULL instead of a division-by-zero error.
  (lv1h.totalVolumeUSD - pv1h.totalVolumeUSD) / NULLIF(pv1h.totalVolumeUSD, 0) as volumeUSDChangePercent1h
FROM last_1_hour_volumes lv1h
LEFT JOIN prev_1_hour_volumes AS pv1h ON lv1h.poolId = pv1h.poolId
-- NOTE(review): no pool.* column is selected, so this lookup join appears
-- unused here (possibly trimmed for the question) — confirm before removing.
LEFT JOIN pools_store FOR SYSTEM_TIME AS OF lv1h.proc_time AS pool ON pool.entityId = lv1h.poolId
WHERE
  -- Gate output until the watermark has caught up to within 5 minutes of
  -- wall-clock time, to avoid writing partial windows during catch-up.
  -- CURRENT_WATERMARK is NULL before any watermark has been emitted.
  -- FIX: interval time-unit qualifiers are singular (MINUTE, not MINUTES).
  (CURRENT_WATERMARK(lv1h.window_time) IS NOT NULL AND CURRENT_WATERMARK(lv1h.window_time) >= CAST(CURRENT_TIMESTAMP - INTERVAL '5' MINUTE AS TIMESTAMP(3))) OR
  (CURRENT_WATERMARK(pv1h.window_time) IS NOT NULL AND CURRENT_WATERMARK(pv1h.window_time) >= CAST(CURRENT_TIMESTAMP - INTERVAL '5' MINUTE AS TIMESTAMP(3)))
观察结果是,一旦有新交易,它就会正确写入预期值。但例如 20 分钟后,如果没有新交易,它必须正确更新“最后 1 小时”的总交易量,但水印似乎停留在最后一个事件时间戳。
我想知道 Kafka 源是否没有提前水印或配置不正确,或者可能是我的查询计算过去 1 小时和前 1 小时 SUM 的方式。
我已经尝试过这些配置,但没有一个有帮助:
- setAutoWatermarkInterval() 设置为 1 秒(代码中是 1000 毫秒)
- pipeline.auto-watermark-interval 设置为 5 秒(5000ms)
- table.exec.source.idle-timeout 设置为 5 秒(5000ms)
- table.exec.emit.allow-lateness 设置为 1 天(1d)