如何处理传入事件并基于事件中的字段,使用wso2写入不同的流?

问题描述 投票:1回答:1

我正在尝试使用单个流,处理传入的json格式,并根据事件中的属性写入不同的流。例如,如果输入流包含以下内容:

{ "event_type" : "temperature",
  "json" : {    
            "type": "Temperature",
            "DeviceID":"xyz",
            "temperature": "32",
            "timestamp" :  "2019-03-19T12:37:43.356119Z"
            }
 }

另一个事件如下:

{ "event_type" : "location",
  "json" : {    
        "type": "GPS",
          "DeviceID":"xyz",
         "location": {"coordinates": [-73.856077, 40.848447]},
         "timestamp" :  "2019-09-22T00:00:00+05:30"
           }
 }

这两个事件都被推送到一个http端点(这是我面临的限制)

如何使用单个http源流,处理这些事件,如果event_typetemperature插入mongo db中的temperature_collection,如果event_typelocation插入mongo db中的location_collection?

  1. 是否可以使用单个流执行此操作?
  2. 如果不是,我怎样才能避免编写多个端点,每个端点对应一种事件类型?
wso2 siddhi stream-processing wso2sp event-stream-processing
1个回答
0
投票

是的,可以只定义一个流,并使用Siddhi filters路由每个流,

@source(type='http' , receiver.url='http://localhost:8000/SensorStream', 
    @map(type='json', fail.on.missing.attribute='false', 
        @attributes(eventType='$.event_type', type='$.json.type',deviceID='$.json.DeviceID', temperature='$.json.temperature', location='$.json.location', timestamp='$.json.timestamp' ) ) ) 
define stream SensorStream(eventType string, type string, deviceID string, temperature string, location string, timestamp string);

from SensorStream[eventType=='temperature']
select deviceID, temperature, timestamp  
insert into TemperatureStream;

from SensorStream[eventType=='location']
select deviceID, location, timestamp  
insert into LocationStream;

如上例所示,源映射属性'fail.on.missing.attribute'用于确保可以将不同的格式与自定义映射一起映射到单个流。在事件到达端点之后,流然后基于使用过滤器的属性的值来划分。

© www.soinside.com 2019 - 2024. All rights reserved.