Zeppelin 笔记本中出现错误,指出无法创建接收器

问题描述 投票:0回答:1

我正在尝试在 Apache Flink 中处理后从 1 个 KDS 写入另一个 KDS。我使用 Zeppelin 笔记本通过以下查询创建接收器表:

%flink.ssql

CREATE TABLE seller_revenue (
    seller_id    VARCHAR,
    window_end   TIMESTAMP,
    sales        DOUBLE
) 
WITH (
    'connector'           = 'kinesis',
    'stream'              = 'seller_stream_window',
    'aws.region'          = 'ap-south-1',
    'scan.stream.initpos' = 'LATEST',
    'format'              = 'json'
)

然后我使用以下方式写入数据

%flink.ssql(parallelism=1)

INSERT INTO seller_revenue
SELECT
    seller_id,
    TUMBLE_END(proctime, INTERVAL '30' SECONDS) AS window_end,
    SUM(product_quantity * product_price) AS sales
FROM seller_sales
GROUP BY
    TUMBLE(proctime, INTERVAL '30' SECONDS),
    seller_id

但是出现以下错误:

Unable to create a sink for writing table 'hive.flink2.seller_revenue'.
Caused by: org.apache.flink.table.api.ValidationException: Unsupported options found for 'kinesis'.

Unsupported options:

scan.stream.initpos

有人可以帮忙解决吗?

我尝试删除不受支持的选项

scan.stream.initpos
但是此后没有数据被写入。

apache-flink amazon-kinesis apache-zeppelin
1个回答
0
投票

如果您将 Zeppelin 笔记本部署为流应用程序,那么代码将可以工作。

在 Zeppelin 笔记本本身中,您无法执行这些步骤,我也遇到了类似的问题。

© www.soinside.com 2019 - 2024. All rights reserved.