KSQL无法创建一个键为字符串,值为Integer的流。

问题描述 投票:1回答:1

我有一个非常简单的kafka生产者,使用以下方式发送数据。

ProducerRecord producerRecord = new ProducerRecord<String, Integer>(topic, symbol, value);
producer.send(producerRecord);

基本上是一个key -value对,值是一个整数。

我如何在ksql中为这个定义一个流?我试了一下。

create stream mystream (symbol varchar) with (value_format='integer', kafka_topic='myTopic');

这给了我一个错误... 看了一下文档,我发现value_format只支持delimited, json或者avro. 而且,delimited对我来说没有用。

是我运气不好吗?

* 更新

它接受了。

create stream myStream(value INT) with (value_format='kafka', kafka_topic='myTopic');

但是当我这样做时,我没有得到任何数据。

select * from myStream; 
apache-kafka ksql
1个回答
0
投票

所以我的问题有两个方面。

1) 我确实需要使用:

create stream myStream(value INT) with (value_format='kafka', kafka_topic='myTopic');

2) 我需要添加Emit Changes;

select * from myStream Emit Changes; 
© www.soinside.com 2019 - 2024. All rights reserved.