我有一个非常简单的kafka生产者,使用以下方式发送数据。
ProducerRecord producerRecord = new ProducerRecord<String, Integer>(topic, symbol, value);
producer.send(producerRecord);
基本上是一个key -value对,值是一个整数。
我如何在ksql中为这个定义一个流?我试了一下。
create stream mystream (symbol varchar) with (value_format='integer', kafka_topic='myTopic');
这给了我一个错误... 看了一下文档,我发现value_format只支持delimited, json或者avro. 而且,delimited对我来说没有用。
是我运气不好吗?
* 更新
它接受了。
create stream myStream(value INT) with (value_format='kafka', kafka_topic='myTopic');
但是当我这样做时,我没有得到任何数据。
select * from myStream;
所以我的问题有两个方面。
1) 我确实需要使用:
create stream myStream(value INT) with (value_format='kafka', kafka_topic='myTopic');
2) 我需要添加Emit Changes;
select * from myStream Emit Changes;