WSO2 SP - 具有JSON属性的Kafka源

问题描述 投票:4回答:1

我正在尝试使用以下代码从Kafka读取JSON数据:

@source(type = 'kafka', bootstrap.servers = 'localhost:9092', topic.list = 'TestTopic', 
group.id = 'test', threading.option = 'single.thread', @map(type = 'json'))

define stream myDataStream (json object);

但失败了以下错误:

[2019-03-27_11-39-32_103] ERROR {org.wso2.extension.siddhi.map.json.sourcemapper.JsonSourceMapper} - 流“myDataStream”没有名为“ABC”的属性,但收到的事件{“事件“:{”ABC“:”1“}}。因此放弃了消息。检查json字符串是否采用正确的默认映射格式。

我试过添加属性

@source(type = 'kafka', bootstrap.servers = 'localhost:9092', 
topic.list = 'TestTopic', group.id = 'test', 
threading.option = 'single.thread', 
@map(type = 'json', @attributes(ABC = '$.ABC')))

语法错误:

在'myDataStream'流中定义的'json'出错,未映射属性'json'

任何帮助将不胜感激。

apache-kafka wso2 kafka-consumer-api kafka-producer-api wso2sp
1个回答
1
投票

流的语法有错误,

define stream myDataStream (ABC string);

这里的属性名称是JSON消息的关键字,在本例中为ABC

© www.soinside.com 2019 - 2024. All rights reserved.