我有创建语句FLINK SQL的记录。
CREATE TABLE en_trans (
`transid` INTEGER,
`productname` INTEGER,
PRIMARY KEY (transid) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'en_trans',
'properties.bootstrap.servers' = '....:9092',
'properties.group.id' = 'en_trans_group_test',
'key.format' = 'avro-confluent',
'value.format' = 'avro-confluent',
'key.avro-confluent.url' = 'http://kafka-netlex-cp-schema-registry:8081',
'value.avro-confluent.url' = 'http://kafka-netlex-cp-schema-registry:8081'
);
select * from en_trans where transid=123;
我有错误
org.apache.flink.table.api.ValidationException: Querying an unbounded table 'default_catalog.default_database.en_trans' in batch mode is not allowed. The table source is unbounded.
我不需要流模式,因为我需要将其提供为休息服务、分页目的。不是
STREAMING
模式。
切换到流模式,您需要创建一个
StreamTableEnvironment
而不是BatchTableEnvironment
。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
然后,注册您的表并运行查询
tEnv.executeSql("CREATE TABLE en_trans (...) WITH (...)");
tEnv.sqlQuery("SELECT * FROM en_trans WHERE transid=123").execute().print();