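I'm trying to merge two different events (EventB and EventC) that come from the same Event Hub input. What I want to achieve is to output (to an Azure Function) the merged event (EventB + EventC) whenever EventC is received.
This is what the events look like: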
{
  "EventB": {
    "Claim": {
      "EventAUri": "A/123",
      "Uri": "B/456"
    },
    "Metainfo": {
      "Color": "Green"
    }
  }
}
and
{
  "EventC": {
    "Claim": {
      "EventBUri": "B/456"
    },
    "Target": {
      "City": "Berlin",
      "Position": {
        "Latitude": 50.325096,
        "Longitude": 72.19710
      }
    }
  }
}
EventB is sent only once, while EventC is sent several times a minute. The desired output for the example above is:
{
  "Claim": {
    "EventBUri": "B/456"
  },
  "Target": {
    "City": "Berlin",
    "Position": {
      "Latitude": 50.325096,
      "Longitude": 72.19710
    }
  },
  "BMetainfo": {
    "Color": "Green"
  }
}
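For clarity, the transformation described by the desired output can be sketched in Python as a plain dictionary merge. This only illustrates the expected shape, assuming the events arrive as parsed JSON dicts; `merge_b_into_c` is a hypothetical helper name and not part of Stream Analytics.

```python
import copy


def merge_b_into_c(event_b: dict, event_c: dict) -> dict:
    """Return EventC's body plus EventB's Metainfo under the key "BMetainfo".

    Hypothetical helper: assumes both arguments are the parsed JSON
    wrappers shown above ({"EventB": {...}} and {"EventC": {...}}).
    """
    b = event_b["EventB"]
    c = event_c["EventC"]
    # Only merge when EventC actually references this EventB.
    if c["Claim"]["EventBUri"] != b["Claim"]["Uri"]:
        raise ValueError("EventC does not reference this EventB")
    merged = copy.deepcopy(c)  # leave the input event untouched
    merged["BMetainfo"] = copy.deepcopy(b["Metainfo"])
    return merged


event_b = {"EventB": {"Claim": {"EventAUri": "A/123", "Uri": "B/456"},
                      "Metainfo": {"Color": "Green"}}}
event_c = {"EventC": {"Claim": {"EventBUri": "B/456"},
                      "Target": {"City": "Berlin",
                                 "Position": {"Latitude": 50.325096,
                                              "Longitude": 72.19710}}}}

print(merge_b_into_c(event_b, event_c))
```

The match on `Claim.Uri` / `Claim.EventBUri` is the same condition the join in the query below uses.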
This is what I have tried so far:
WITH AllEvents AS (
    SELECT *
    FROM ehubinput
),
EventB AS (
    SELECT EventB
    FROM AllEvents
    WHERE EventB IS NOT NULL
),
EventC AS (
    SELECT EventC
    FROM AllEvents
    WHERE EventC IS NOT NULL
)
SELECT *
FROM EventB
INNER JOIN EventC
    ON DATEDIFF(day, EventB, EventC) BETWEEN 0 AND 5
    AND EventB.EventB.Claim.Uri = EventC.EventC.Claim.EventBUri
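Unfortunately, my code outputs EventB combined with every EventC (one row per EventC) instead of EventB + the last EventC. ...
Can anyone help me?
Update:
This is my current output. (I only want the latest EventC to be combined with EventB, not every event in the stream as it is now.)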
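To see why the join above returns one row per EventC, here is a small Python model of the two behaviors. The event records are hypothetical, and the DATEDIFF time bound is left out to keep it short. `inner_join` reproduces what the query in the question does; `join_latest_c` keeps only the most recent matching EventC for each EventB, which is what the update asks for.

```python
from dataclasses import dataclass


@dataclass
class Event:
    time: int     # hypothetical timestamp in seconds
    kind: str     # "B" or "C"
    uri: str      # EventB's Claim.Uri, or EventC's Claim.EventBUri
    payload: str  # stand-in for the rest of the event


def inner_join(events):
    """Every (B, C) pair with a matching URI: the behavior of the JOIN."""
    bs = [e for e in events if e.kind == "B"]
    cs = [e for e in events if e.kind == "C"]
    return [(b.payload, c.payload) for b in bs for c in cs if b.uri == c.uri]


def join_latest_c(events):
    """One pair per EventB: combined only with its most recent matching C."""
    bs = [e for e in events if e.kind == "B"]
    cs = [e for e in events if e.kind == "C"]
    result = []
    for b in bs:
        matches = [c for c in cs if c.uri == b.uri]
        if matches:
            latest = max(matches, key=lambda c: c.time)
            result.append((b.payload, latest.payload))
    return result


stream = [
    Event(0, "B", "B/456", "Green"),
    Event(10, "C", "B/456", "c1"),
    Event(20, "C", "B/456", "c2"),
    Event(30, "C", "B/456", "c3"),
]
print(inner_join(stream))     # three rows, one per EventC
print(join_latest_c(stream))  # a single row with the latest EventC
```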
I reproduced your case and came up with the following query:
WITH AllEvents AS (
    SELECT *
    FROM Input
),
EventB AS (
    SELECT EventB
    FROM AllEvents
    WHERE EventB IS NOT NULL
),
EventC AS (
    SELECT EventC, EventC.Time
    FROM AllEvents
    WHERE EventC IS NOT NULL
),
test AS (
    SELECT *, EventC.*
    FROM EventB
    INNER JOIN EventC
        ON DATEDIFF(day, EventB, EventC) BETWEEN 0 AND 5
        AND EventB.EventB.Claim.Uri = EventC.EventC.Claim.EventBUri
)
SELECT topone() OVER (ORDER BY Time)
FROM test
GROUP BY TumblingWindow(second, 10)
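For each 10-second tumbling window, it always returns the last combination of a matching (EventC, EventB) pair. If this is not the output you expect, could you write out the expected output for the input specified above?
I used VS2019 and the Stream Analytics extension, and specified a local input based on the description above.
Update
The query has been updated. I noticed that only the last EventC in the sample payload contains the property "Time". With that property present in every EventC, the query above gives you the "Wasserburg" result.
Of course, the output still has to be formatted, but the result in this case is correct.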
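The tumbling-window step can be modeled in Python roughly as follows. This is a sketch of the intent described above (keep the latest joined pair in each 10-second window), not of Stream Analytics internals: the half-open bucket boundaries and the "largest timestamp wins" rule are assumptions of the sketch. In the query itself, the sort direction in `topone() OVER (ORDER BY Time)` decides whether the earliest or the latest pair is kept, so it is worth checking the TopOne documentation to see whether `ORDER BY Time DESC` is needed.

```python
from collections import defaultdict

WINDOW_SECONDS = 10  # mirrors TumblingWindow(second, 10) in the query


def last_pair_per_window(pairs, window=WINDOW_SECONDS):
    """Keep the latest joined pair in each fixed, non-overlapping window.

    pairs: iterable of (time_seconds, b_payload, c_payload) tuples, i.e. rows
    that already passed the join. Buckets are half-open [k*window, (k+1)*window).
    Returns {window_index: latest_pair}.
    """
    buckets = defaultdict(list)
    for pair in pairs:
        buckets[pair[0] // window].append(pair)
    # Within each bucket, the pair with the largest timestamp wins.
    return {k: max(v, key=lambda p: p[0]) for k, v in sorted(buckets.items())}


joined = [
    (1, "Green", "c1"),
    (4, "Green", "c2"),
    (12, "Green", "c3"),
    (15, "Green", "c4"),
]
print(last_pair_per_window(joined))
```

Because the windows do not overlap, you get one output per 10-second window that contains at least one joined pair, not one output per EventC.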
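Further update: I looked into this some more, because I found it really interesting, and came up with the following query. It is conceptually different from the previous one and, I would say, more precise: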
-- Timestamp events by the time they were enqueued
WITH AllEvents AS (
    SELECT Input
    FROM Input TIMESTAMP BY input.EventEnqueuedUtcTime
),
-- Get the last EventB, because only the last EventB is relevant
EventB AS (
    SELECT LAST(AllEvents.Input) OVER (LIMIT DURATION(minute, 1) WHEN AllEvents.input.EventB IS NOT NULL) AS EventB
    FROM AllEvents
),
LastB AS (
    SELECT topone() OVER (ORDER BY EventB.Time)
    FROM EventB
    GROUP BY SlidingWindow(second, 60)
),
-- Get the last EventC
EventC AS (
    SELECT LAST(AllEvents.Input) OVER (LIMIT DURATION(minute, 1) WHEN AllEvents.input.EventC IS NOT NULL) AS EventC
    FROM AllEvents
),
LastC AS (
    SELECT topone() OVER (ORDER BY EventC.Time)
    FROM EventC
    GROUP BY SlidingWindow(second, 60)
),
-- Create the result if a join between the last EventB and the last EventC exists
ResultJoin AS (
    SELECT LastB.topone.*, LastC.topone.*
    FROM LastB
    INNER JOIN LastC
        ON DATEDIFF(second, LastB, LastC) BETWEEN 0 AND 60
        AND LastB.topone.EventB.EventB.Claim.Uri = LastC.topone.EventC.EventC.Claim.EventBUri
)
-- Get the last event that is a pair of (EventB, EventC)
SELECT topone() OVER (ORDER BY EventB.Time)
INTO Output
FROM ResultJoin
GROUP BY SlidingWindow(second, 60)

-- Just a cross-check: what is the last EventB
SELECT *
INTO Output1
FROM LastB
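I used time window functions because you mentioned that the events arrive within a minute. So essentially, the idea is to extract the last EventB and the last EventC, and then propagate the matching pair to the output.
I tested it against a real Event Hub using an Event Hub message publisher, so that I could simulate a stream of events similar to your example:
Then I watched the output locally to check whether I get the correct result after the last event:
Also, as you can see from the message simulator, I added a Time property to every event (B and C), because that property is used to order the events in the query. Of course, you can replace it with some other property, such as EventEnqueuedUtcTime or similar.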
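The "last EventB, last EventC, emit if they match" idea can be sketched in Python like this. It is a simplified model, not the query engine: the 60-second lookback stands in for `LIMIT DURATION(minute, 1)` and `SlidingWindow(second, 60)`, the `0 <= C.time - B.time` check mirrors the `DATEDIFF(second, LastB, LastC) BETWEEN 0 AND 60` condition, and the event dicts and `last_matching_pair` name are hypothetical.

```python
def last_matching_pair(events, now, window=60):
    """Pair the most recent EventB with the most recent EventC, if they match.

    events: list of dicts with keys "time" (seconds), "kind" ("B" or "C"),
    "uri" (Claim.Uri for B, Claim.EventBUri for C) and "payload".
    Only events with now - window < time <= now are considered.
    """
    recent = [e for e in events if now - window < e["time"] <= now]
    last_b = max((e for e in recent if e["kind"] == "B"),
                 key=lambda e: e["time"], default=None)
    last_c = max((e for e in recent if e["kind"] == "C"),
                 key=lambda e: e["time"], default=None)
    if last_b is None or last_c is None:
        return None  # nothing to pair in this window
    if last_b["uri"] != last_c["uri"]:
        return None  # last EventC refers to a different EventB
    if not 0 <= last_c["time"] - last_b["time"] <= window:
        return None  # EventC must not be older than EventB
    return (last_b["payload"], last_c["payload"])


events = [
    {"time": 0, "kind": "B", "uri": "B/456", "payload": "Green"},
    {"time": 20, "kind": "C", "uri": "B/456", "payload": "c1"},
    {"time": 40, "kind": "C", "uri": "B/456", "payload": "c2"},
]
print(last_matching_pair(events, now=45))  # latest C paired with B
print(last_matching_pair(events, now=70))  # B is no longer in the window
```

Note that in this sketch a pair is only produced while the EventB is still inside the 60-second window; since EventB is sent only once, that limitation is worth checking against the real query's behavior.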
I hope you find one of these two approaches useful for your final solution.