定期批处理混合流 - 玩具问题说明我的问题

问题描述 投票:0回答:1

这是一个组成的玩具示例,试图获得有关我的问题的更难部分的帮助。假设我有来自Kafka流的销售数据:

...
Period: 5, SalesPersonId: 78, Sale: TRUE, Timestamp: ...,  
Period: 5, SalesPersonId: 43, Sale: FALSE, Timestamp: ...,  
Period: 5, SalesPersonId: 33, Sale: TRUE, Timestamp: ...,  
...

每行代表特定销售人员的销售机会(在特定时期内)。

以下是期间的工作方式:期间大约持续2-3周。但是,这些时期不在我的控制之下;它们在到达流时已被分配。在期间之间的过渡期间,我可能仍然会在最后一段时间内收到一两天的数据(例如,日本的销售地点可能仍然处于旧时期)。梁聊天的人建议我可以在这种情况下使用会话窗口,如果我只是在我的密钥中包含句点并且大约延长2天的间隔时间。似乎这样可行。

我很清楚如何做这样的事情:每个时期的销售机会总数,每个销售人员每个时期的平均销售率等。例如,调用以下查询产生的PCollection:

SELECT
    period,
    salesPersonId,
    COUNT(*) as totalSalesOpportunities,
    COUNT(*) FILTER(WHERE sale) as totalSales,  
    ROUND(COUNT(*) FILTER (WHERE SALE)/COUNT(*),2) as salesRate  
FROM stream  
GROUP BY period, salesPersonId  

我的要求比这更复杂。假设我们公司有一个假设,即在一段时间内拥有更多销售机会的销售人员将获得更好的销售率。也许总的销售机会是动机的一种表现,或者额外的机会让更多的实践尝试出售那个时期出售的任何产品。所以,该公司想要这个统计数据:

在此期间(目前为止)销售机会的销售额为90%或更高的销售人员的总销售额是多少? 10%或更低的百分位数?即,

(TOTAL SALES MADE BY PEOPLE WITH 90%+ SALES OPPORTUNITIES)/(TOTAL SALES OPPORTUNITIES BY PEOPLE WITH 90%+ SALES OPPORTUNITIES)

当然,在一个时期的早期,第90百分位可能只有3个机会。但是,随着时间的推移,分布将分散,可能有40个机会。那么,如果这个统计数据更新,比如每小时,那也没关系。

据我所知,我需要做以下事情,称之为B:

Rekey A, apply ApproximateQuantiles, feed it back to filter A, reaggregate A.

但是,我不认为这可以逐步完成。那么我如何表达“逐步进行A,但每小时做一次B批量操作”?

或者,有没有更好的方法来处理梁的这种情况?

google-cloud-dataflow apache-beam dataflow
1个回答
0
投票

如果我正确理解您的问题,您需要对相同数据进行2种类型的聚合。

  1. A的增量聚合
  2. B的每小时聚合

这里需要注意的一点是,你不能让A依赖于B广告B依赖于A,因为这会在你的管道图中创建一个循环。

您可以从包含原始输入流的PC1开始。

PC2:PC1 - >做A'(与A相同) - >做B

PC3:PC1 - >执行A,PC2作为侧输入。

您可以阅读更多关于侧输入here的信息

© www.soinside.com 2019 - 2024. All rights reserved.