如何使用kafka主题在FlinkSQL中运行批处理模式

问题描述 投票:0回答:1

我有创建语句FLINK SQL的记录。

CREATE TABLE en_trans (
  `transid` INTEGER,
  `productname` INTEGER,
   PRIMARY KEY (transid) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'en_trans',
  'properties.bootstrap.servers' = '....:9092',
  'properties.group.id' = 'en_trans_group_test',
  'key.format' = 'avro-confluent',
  'value.format' = 'avro-confluent',
  'key.avro-confluent.url' = 'http://kafka-netlex-cp-schema-registry:8081',
  'value.avro-confluent.url' = 'http://kafka-netlex-cp-schema-registry:8081'
);

select * from en_trans where transid=123;

我有错误

org.apache.flink.table.api.ValidationException: Querying an unbounded table 'default_catalog.default_database.en_trans' in batch mode is not allowed. The table source is unbounded.

我不需要流模式,因为我需要将其提供为休息服务、分页目的。不是

STREAMING
模式。

apache-kafka apache-flink avro flink-sql
1个回答
0
投票

切换到流模式,您需要创建一个

StreamTableEnvironment
而不是
BatchTableEnvironment

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

然后,注册您的表并运行查询

tEnv.executeSql("CREATE TABLE en_trans (...) WITH (...)");
tEnv.sqlQuery("SELECT * FROM en_trans WHERE transid=123").execute().print();

© www.soinside.com 2019 - 2024. All rights reserved.