No records appear in the Flink sink after adding a delay to the watermark


I'm trying to run a Flink application locally against a local Kinesis stream.

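The code below works perfectly (for example, I can see records under the sink table's path), but when I change the watermark expression from `event_ts` to `event_ts - INTERVAL '10' SECOND`, the Flink job keeps running without writing any records to the sink table.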
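For context, here is a simplified model of the event-time mechanics involved: a watermark declared as `event_ts - INTERVAL '10' SECOND` trails the largest `event_ts` seen so far by 10 seconds, and an event-time OVER window only emits a row once the watermark has reached that row's timestamp (rows at or behind the watermark are treated as late and dropped). The sketch is plain Python, not PyFlink; `emitted_rows` and the sample timestamps are hypothetical, and the model assumes the watermark is updated after every row, ignoring parallelism, idle shards and Flink's periodic watermark emission.

```python
from datetime import datetime, timedelta


def emitted_rows(event_times, delay):
    """Hypothetical helper: which rows could an event-time operator have emitted
    after consuming `event_times` in order, if the watermark is
    max(event_ts seen so far) - delay and is updated after every row?"""
    watermark = None
    max_ts = None
    buffered = []  # rows waiting for the watermark to reach their timestamp
    emitted = []
    for ts in event_times:
        if watermark is not None and ts <= watermark:
            continue  # late row: at or behind the watermark, dropped
        buffered.append(ts)
        max_ts = ts if max_ts is None else max(max_ts, ts)
        watermark = max_ts - delay
        # Rows the watermark has reached are complete and can be emitted.
        emitted.extend(sorted(t for t in buffered if t <= watermark))
        buffered = [t for t in buffered if t > watermark]
    return emitted


base = datetime(2024, 1, 1, 12, 0, 0)
burst = [base + timedelta(seconds=s) for s in range(8)]    # 8 rows over 7 seconds
longer = [base + timedelta(seconds=s) for s in range(20)]  # 20 rows over 19 seconds

print(len(emitted_rows(burst, timedelta(seconds=0))))    # 8: every row is emitted
print(len(emitted_rows(burst, timedelta(seconds=10))))   # 0: watermark never reaches any row
print(len(emitted_rows(longer, timedelta(seconds=10))))  # 10: only rows up to max_ts - 10 s
```

Under this model a zero delay lets every row through, while a 10-second delay holds back the most recent 10 seconds of data until newer records push the watermark forward. If a local test stream holds only a short burst of records and then goes quiet, those rows may never be emitted.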

Any ideas?

Thanks.

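from pyflink.table import (
    DataTypes,
    EnvironmentSettings,
    Schema,
    TableDescriptor,
    TableEnvironment,
)

# Streaming-mode table environment (assumed setup; the Kinesis SQL connector jar
# must also be available to the job, e.g. via the pipeline.jars option)
table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

source = (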
    TableDescriptor.for_connector("kinesis")
    .schema(
        Schema.new_builder()
        .column("x", DataTypes.INT())
        # some more columns... 
        .column("event_ts", DataTypes.TIMESTAMP(3))
        .watermark("event_ts", "event_ts")  # with "event_ts - INTERVAL '10' SECOND" no rows reach the sink
        .build()
    )
    .option("stream.name", "some_stream")
    .option("aws.endpoint", "http://localhost:4566")
    .option("aws.trust.all.certificates", "true")
    .option("aws.credentials.basic.accesskeyid", "nokey")
    .option("aws.credentials.basic.secretkey", "nokey")
    .option("scan.shard.idle.interval", "200")
    .option("scan.watermark.sync.interval", "200")
    .option("scan.watermark.sync.queue.capacity", "1")
    .option("scan.stream.initpos", "TRIM_HORIZON")
    .option("format", "json")
)

sink = (
    TableDescriptor.for_connector("filesystem")
    .schema(
        Schema.new_builder()
        .column("x", DataTypes.INT()) 
        # some more columns... 
        .column("event_ts", DataTypes.TIMESTAMP(3))
        .build()
    )
    .option("sink.partition-commit.policy.kind", "success-file")
    .option("sink.rolling-policy.file-size", "1KB")
    .option("sink.rolling-policy.rollover-interval", "1s")
    .option("path", "file://some_path/")
    .option("format", "json")
)

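# Register the descriptors under the names used in the SQL statement
table_env.create_temporary_table("source_table", source)
table_env.create_temporary_table("sink_table", sink)

result = table_env.execute_sql("""
    INSERT INTO sink_table
    SELECT
        some_other_columns...,
        AVG(x) OVER (
            PARTITION BY some column
            ORDER BY event_ts
            RANGE BETWEEN INTERVAL '5' SECOND PRECEDING AND CURRENT ROW
        ) AS x2,
        event_ts
    FROM source_table
""")

# Keep the local Python process alive while the unbounded streaming job runs
result.wait()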
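What the OVER clause computes can be sketched in plain Python with event time left aside: for each row, the average of the averaged column over rows with the same partition key whose `event_ts` lies within the 5 seconds up to and including that row's timestamp. This assumes the averaged column is `x` and uses a hypothetical `range_over_avg` helper with made-up data; in the real streaming job each of these results is only produced once the watermark reaches the row's `event_ts`, which is where the watermark delay comes in.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def range_over_avg(rows, preceding=timedelta(seconds=5)):
    """Hypothetical helper mirroring
    AVG(x) OVER (PARTITION BY key ORDER BY event_ts
                 RANGE BETWEEN <preceding> PRECEDING AND CURRENT ROW).
    `rows` is a list of (key, event_ts, x) tuples; returns (key, event_ts, avg)."""
    by_key = defaultdict(list)
    for key, ts, x in rows:
        by_key[key].append((ts, x))
    result = []
    for key, ts, _ in rows:
        # Same key, event_ts within [ts - preceding, ts]; peers with equal ts are included.
        window = [v for t, v in by_key[key] if ts - preceding <= t <= ts]
        result.append((key, ts, sum(window) / len(window)))
    return result


t0 = datetime(2024, 1, 1, 12, 0, 0)
rows = [
    ("a", t0, 10),
    ("a", t0 + timedelta(seconds=3), 20),
    ("a", t0 + timedelta(seconds=9), 30),
    ("b", t0 + timedelta(seconds=1), 5),
]
for key, ts, avg in range_over_avg(rows):
    print(key, ts.time(), avg)  # averages: 10.0, 15.0, 30.0, 5.0
```

As in standard SQL RANGE frames, the lower bound is inclusive in this sketch: a row exactly 5 seconds earlier still falls inside the frame.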

apache-flink flink-streaming flink-sql pyflink amazon-kinesis-analytics