我正在尝试使用本地的 Kinesis 流在本地运行 Flink 应用程序。
以下代码可以正常运行(例如,能在 sink 输出路径下看到记录),但当我把水印定义从
event_ts
改为 event_ts - INTERVAL '10' SECOND
后,Flink 作业持续运行,却不再向 sink 写出任何记录。
有什么想法吗?
谢谢。
# Kinesis source table descriptor (PyFlink Table API).
# The schema declares the watermark to be event_ts itself, i.e. zero allowed
# out-of-orderness. NOTE(review): switching the watermark expression to
# "event_ts - INTERVAL '10' SECOND" makes the watermark trail the newest seen
# record by 10 seconds; on a low-traffic local stream no later record may ever
# arrive to push the watermark past the open event-time window, so downstream
# operators never fire and the sink stays empty — presumably the cause of the
# reported symptom. Confirm by continuing to send fresh records after the change.
source = (
TableDescriptor.for_connector("kinesis")
.schema(
Schema.new_builder()
.column("x", DataTypes.INT())
# some more columns...
.column("event_ts", DataTypes.TIMESTAMP(3))
# Watermark strategy: watermark == event_ts (no bounded delay).
.watermark("event_ts", "event_ts")
.build()
)
.option("stream.name", "some_stream")
# Local endpoint with dummy credentials — looks like a LocalStack setup
# (http://localhost:4566); TODO confirm.
.option("aws.endpoint", "http://localhost:4566")
.option("aws.trust.all.certificates", "true")
.option("aws.credentials.basic.accesskeyid", "nokey")
.option("aws.credentials.basic.secretkey", "nokey")
# NOTE(review): interval values are bare "200" with no unit — verify the
# connector interprets these as milliseconds; some Flink duration options
# require a unit suffix such as "200ms".
.option("scan.shard.idle.interval", "200")
.option("scan.watermark.sync.interval", "200")
.option("scan.watermark.sync.queue.capacity", "1")
# Read the stream from its earliest available record.
.option("scan.stream.initpos", "TRIM_HORIZON")
.option("format", "json")
)
# Filesystem sink descriptor: JSON files, rolled over aggressively (1KB / 1s)
# so records become visible on disk quickly while testing locally. A
# _SUCCESS marker file is written when a partition commits.
sink_schema = (
Schema.new_builder()
.column("x", DataTypes.INT())
# some more columns...
.column("event_ts", DataTypes.TIMESTAMP(3))
.build()
)
sink = (
TableDescriptor.for_connector("filesystem")
.schema(sink_schema)
.option("path", "file://some_path/")
.option("format", "json")
.option("sink.rolling-policy.file-size", "1KB")
.option("sink.rolling-policy.rollover-interval", "1s")
.option("sink.partition-commit.policy.kind", "success-file")
)
# Submit the continuous INSERT pipeline.
# NOTE(review): the query references "sink_table"/"source_table" while the
# descriptors above are bound to the local names source/sink — presumably they
# are registered under those table names via
# table_env.create_temporary_table(...) elsewhere; confirm, the registration
# is not shown in this snippet.
# The OVER window is an event-time range window ordered by event_ts: a row is
# only emitted once the source watermark passes its event_ts, so the watermark
# strategy chosen in the source schema directly controls whether any output
# appears.
result = table_env.execute_sql("""
INSERT INTO sink_table
SELECT
some_other_columns...,
AVG(X, temp) OVER (
PARTITION BY some column
ORDER BY event_ts
RANGE BETWEEN INTERVAL '5' SECOND PRECEDING AND CURRENT ROW
) AS x2,
event_ts
FROM source_table
""")