Azure Stream Analytics query to merge two events


I am trying to merge two different events (EventB and EventC) coming from the same EventHub input. What I want to achieve is to output (to an Azure Function) the merged event (EventB + EventC) whenever an EventC is received.

This is what the events look like:

{
    "EventB": {
        "Claim": {
            "EventAUri": "A/123",
            "Uri": "B/456"
        },
        "Metainfo": {
            "Color": "Green"
        }
    }   
}

{
    "EventC" : {
        "Claim": {
            "EventBUri": "B/456"
        },
        "Target": {
            "City": "Berlin",
            "Position": {
                "Latitude": 50.325096,
                "Longitude": 72.19710
            }
        }
    }
}

EventB is sent only once, while EventC is sent several times per minute. The desired output for the example above is:

    {
        "Claim": {
            "EventBUri": "B/456"
        },
        "Target": {
            "City": "Berlin",
            "Position": {
                "Latitude": 50.325096,
                "Longitude": 72.19710
            }
        },
        "BMetainfo": {
            "Color": "Green"
        }
    }

This is what I have tried so far:

WITH AllEvents AS (
    SELECT *
    FROM ehubinput
),
EventB AS (
    SELECT EventB
    FROM AllEvents
    WHERE EventB IS NOT NULL
),
EventC AS (
    SELECT EventC
    FROM AllEvents
    WHERE EventC IS NOT NULL
)

SELECT *
FROM EventB
INNER JOIN EventC
    ON DATEDIFF(day, EventB, EventC) BETWEEN 0 AND 5
    AND EventB.EventB.Claim.Uri = EventC.EventC.Claim.EventBUri

Unfortunately, my query outputs one merged event (EventB + EventC) for every EventC, instead of a single EventB + the last EventC.
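The multiplication described above can be reproduced outside of Stream Analytics. A minimal Python sketch (hypothetical in-memory events, not ASA code) shows that a plain inner join on the URI emits one row per matching EventC, while keeping only the newest EventC collapses the result to the single desired event:

```python
# Hypothetical in-memory model of the stream: one EventB, several EventCs.
event_b = {"Claim": {"Uri": "B/456"}, "Metainfo": {"Color": "Green"}}
event_cs = [
    {"Claim": {"EventBUri": "B/456"}, "Target": {"City": "Hamburg"}, "Time": 1},
    {"Claim": {"EventBUri": "B/456"}, "Target": {"City": "Berlin"}, "Time": 2},
]

# A plain inner join on the URI emits one merged event per EventC ...
joined = [dict(c, BMetainfo=event_b["Metainfo"])
          for c in event_cs
          if c["Claim"]["EventBUri"] == event_b["Claim"]["Uri"]]
print(len(joined))  # -> 2, one row per matching EventC

# ... whereas keeping only the newest EventC yields one merged event.
latest_c = max(event_cs, key=lambda c: c["Time"])
merged = dict(latest_c, BMetainfo=event_b["Metainfo"])
print(merged["Target"]["City"], merged["BMetainfo"]["Color"])  # -> Berlin Green
```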

Can anyone help me?

Update:

This is my input: [screenshot]

This is my current output: [screenshot] (I only want the latest EventC joined with EventB, not every event in the stream merged as is happening now.)

azure azure-stream-analytics
1 Answer

I reproduced your case and came up with the following query:

WITH AllEvents AS (
    SELECT *
    FROM Input
),
EventB AS (
    SELECT EventB
    FROM AllEvents
    WHERE EventB IS NOT NULL
),
EventC AS (
    SELECT EventC, EventC.Time
    FROM AllEvents
    WHERE EventC IS NOT NULL
),
test AS (
    SELECT *, EventC.*
    FROM EventB
    INNER JOIN EventC
        ON DATEDIFF(day, EventB, EventC) BETWEEN 0 AND 5
        AND EventB.EventB.Claim.Uri = EventC.EventC.Claim.EventBUri
)

SELECT TopOne() OVER (ORDER BY Time DESC)
FROM test
GROUP BY TumblingWindow(second, 10)

For a series of events, it will always return the last combination of a matching (EventC, EventB) pair. If this is not your expected output, could you write out the expected output for the input specified above?
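The effect of TopOne() over a tumbling window can be sketched in Python (a simplified, hypothetical re-implementation for illustration, not the ASA runtime): events are bucketed into fixed, non-overlapping windows, and only the record with the greatest Time per bucket survives:

```python
# Hypothetical joined (EventB + EventC) records with a Time property, in seconds.
rows = [
    {"City": "Hamburg", "Time": 3},
    {"City": "Berlin", "Time": 7},
    {"City": "Wasserburg", "Time": 14},
]

def topone_per_tumbling_window(rows, window_seconds):
    """Keep one record per fixed window: the one with the greatest Time."""
    buckets = {}
    for row in rows:
        bucket = row["Time"] // window_seconds  # tumbling: fixed, non-overlapping
        best = buckets.get(bucket)
        if best is None or row["Time"] > best["Time"]:
            buckets[bucket] = row
    return [buckets[b] for b in sorted(buckets)]

print([r["City"] for r in topone_per_tumbling_window(rows, 10)])
# -> ['Berlin', 'Wasserburg']
```

With a 10-second window, Hamburg and Berlin land in the same bucket and only Berlin (the later of the two) is emitted.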

I used VS2019 with the Stream Analytics extension, and specified a local input according to the description above.

Update

The query has been updated. I noticed that only the last EventC in the sample payload contains the property "Time". With that property present on every EventC, and using the query above, you get the result for "Wasserburg".

Of course, the output still has to be formatted, but the result in this case is correct.

Further update: I looked into this some more, because I found it really interesting, and came up with the following query, which differs conceptually from the previous one and is, I would say, more precise:

-- Timestamp by when events were enqueued
WITH AllEvents AS (
    SELECT Input
    FROM Input TIMESTAMP BY Input.EventEnqueuedUtcTime
),

-- Get the last EventB, because only the last EventB is relevant
EventB AS (
    SELECT LAST(AllEvents.Input) OVER (LIMIT DURATION(minute, 1) WHEN AllEvents.Input.EventB IS NOT NULL) AS EventB
    FROM AllEvents
),
LastB AS (
    SELECT TopOne() OVER (ORDER BY EventB.Time DESC)
    FROM EventB
    GROUP BY SlidingWindow(second, 60)
),

-- Get the last EventC
EventC AS (
    SELECT LAST(AllEvents.Input) OVER (LIMIT DURATION(minute, 1) WHEN AllEvents.Input.EventC IS NOT NULL) AS EventC
    FROM AllEvents
),
LastC AS (
    SELECT TopOne() OVER (ORDER BY EventC.Time DESC)
    FROM EventC
    GROUP BY SlidingWindow(second, 60)
),

-- Create the result if the join between the last EventB and the last EventC exists
ResultJoin AS (
    SELECT LastB.topone.*, LastC.topone.*
    FROM LastB
    INNER JOIN LastC
        ON DATEDIFF(second, LastB, LastC) BETWEEN 0 AND 60
        AND LastB.topone.EventB.EventB.Claim.Uri = LastC.topone.EventC.EventC.Claim.EventBUri
)

-- Get the last event that is a pair of (EventB, EventC)
SELECT TopOne() OVER (ORDER BY EventB.Time DESC) INTO Output
FROM ResultJoin
GROUP BY SlidingWindow(second, 60)

-- Just a cross-check of what the last EventB is
SELECT * INTO Output1 FROM LastB

I used time-window functions because you mentioned that the events arrive within one minute. So, in essence, the idea is to extract the last EventB and the last EventC, and then propagate the matching pair to the output.
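The idea behind this second query — remember the last EventB and the last EventC seen within the look-back window, and emit the merge whenever they match — can be sketched in Python (a hypothetical simplification of ASA's LAST ... OVER (LIMIT DURATION ...), not equivalent code):

```python
# Hypothetical flat event stream; each item is either an EventB or an EventC.
stream = [
    {"EventB": {"Claim": {"Uri": "B/456"}, "Metainfo": {"Color": "Green"}}, "Time": 0},
    {"EventC": {"Claim": {"EventBUri": "B/999"}, "Target": {"City": "Hamburg"}}, "Time": 10},
    {"EventC": {"Claim": {"EventBUri": "B/456"}, "Target": {"City": "Berlin"}}, "Time": 20},
]

def merge_last_pair(stream, lookback_seconds=60):
    """Track the last EventB and last EventC; emit the merge when they match."""
    last_b = last_c = None
    output = []
    for event in stream:
        if "EventB" in event:
            last_b = event
        elif "EventC" in event:
            last_c = event
        if (last_b and last_c
                and last_c["Time"] - last_b["Time"] <= lookback_seconds
                and last_b["EventB"]["Claim"]["Uri"]
                    == last_c["EventC"]["Claim"]["EventBUri"]):
            output.append(dict(last_c["EventC"], BMetainfo=last_b["EventB"]["Metainfo"]))
    return output

print(merge_last_pair(stream)[-1]["Target"]["City"])  # -> Berlin
```

Note that the EventC pointing at the non-matching URI "B/999" produces no output; only the matching pair is propagated.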

I tested it against a real Event Hub using an Event Hub message publisher, so that I could simulate an event stream similar to your example: [screenshot]

Then I observed the output locally to see whether I got the correct result after the last event:

[screenshot of the local output]

Also, I added the Time property to each event (B and C), as you can see from the message simulator, because that property is used to order the events in the query. Of course, you can replace it with some other property, such as EventEnqueuedUtcTime or similar.

I hope you will find one of these two different approaches useful for your final solution.
