以Siddhi获取NATS流事件序列号

问题描述 投票:0回答:1

我有一个文本格式的推文流(TwitterStream)和每个推文的情感流(SentimentStream)。 SentimentStream订阅TwitterStream,进行情感分析并发布带有结果和TwitterStream序列号的新消息。

我正在尝试将这两个流加入,其中SentimentStream.seq等于这些推文的序号。我遇到的问题是我无法从TwitterStream获得序列号的“句柄”。

我一直试图找到一种获取事件“元数据”的方法,该方法可能会对事件的位置/序列号提供一些见解。

@App:name('SentimentJoin')
@App:description('Joins the RAW Tweets with the sentient for that tweet')

@sink(type = 'log', 
    @map(type = 'json'))
define stream AggregateStream (tweet string, sentiment string);

@source(type = 'nats', destination = "tweet-sentiment", bootstrap.servers = "nats://0.0.0.0:4222", cluster.id = "test-cluster", 
    @map(type = 'json'))
define stream SentimentStream (twitter_handle string, lib string, seq int, value string, confidence double);

@source(type = 'nats', destination = "iPhone", bootstrap.servers = "nats://0.0.0.0:4222", cluster.id = "test-cluster", 
    @map(type = 'text', fail.on.missing.attribute = 'true', regex.A='(.|\n)*', @attributes(tweet = 'A')))
define stream TwitterStream (tweet string);

-- https://docs.wso2.com/display/CEP400/SiddhiQL+Guide+3.0#SiddhiQLGuide3.0-Joins
@info(name = 'JoinOnSequenceNumber')
from every S=SentimentStream, T=TwitterStream(S.seq)
select T.tweet as tweet, S.value as sentiment
insert into AggregateStream;

任何帮助将不胜感激。

siddhi nats.io nats-streaming-server
1个回答
0
投票

我们已经通过https://github.com/siddhi-io/siddhi-io-nats/issues/25创建了功能改进请求以提供此支持。

© www.soinside.com 2019 - 2024. All rights reserved.