如何以同步方式合并两个TPL DataFlow管道?

问题描述 投票:1回答:1

我想编写一个评估两个传感器的传感器数据的应用程序。两个传感器都在Package对象中发送数据,这些对象被拆分为Frame对象。 Package本质上是Tuple<Timestamp, Data[]>FrameTuple<Timestamp, Data>。然后,我需要始终使用两个来源中带有最早时间戳的Frame

所以基本上我的对象流是

Package -(1:n)-> Frame \
                        }-pair synchronized-> Tuple<Frame, Frame>
Package -(1:n)-> Frame /

示例

假设每个Package包含2个或3个值(真实度:5-7)和以1递增的整数时间戳(真实度:〜200Hz =>〜5ms增量)。为了简单起见,“数据”仅为timestamp * 100

Packages (timestamp, values[])

Source 1:
{(19, [1700, 1800, 1900]), (22, [2000, 2100, 2200]), (26, [2500, 2600]),
 (29, [2700, 2800, 2900]), ...}

Source 2:
{(17, [1500, 1600, 1700]), (19, [1800, 1900]), (21, [2000, 2100]),
 (26, [2400, 2500, 2600]), ...}

(1:n)步骤之后:

Frames (timestamp, value)

Source 1:
{(17, 1700), (18, 1800), (19, 1900), (20, 2000), (21, 2100),
 (22, 2200), (25, 2500), (26, 2600), (27, 2700), (28, 2800),
 (29, 2900), ...}

Source 2:
{(15, 1500), (16, 1600), (17, 1700), (18, 1800), (19, 1900),
 (20, 2000), (21, 2100), (24, 2400), (25, 2500), (26, 2600), ...}

pair synchronized步骤之后:

Merged tuples (timestamp, source1, source2)

{(15, null, 1500), (16, null, 1600), (17, 1700, 1700), (18, 1800, 1800),
 (19, 1900, 1900), (20, 2000, 2000), (21, 2100, 2100), (22, 2200, null),
 (24, null, 2400), (25, 2500, 2500), (26, 2600, 2600), ...}

请注意,由于两个来源的发送了一个值,因此缺少时间戳23。那只是副作用。我可以放一个空的元组,没关系。元组是(27, 2700, 2700)还是((27, 2700), (27, 2700))也无所谓,i。 e。 Tuple<Timestamp, Data, Data>Tuple<Frame, Frame>


我很确定,如果我没看错文档,(1:n)部分应该是TransformManyBlock<Package, Frame>

但是我要用哪个块TransformManyBlock<Package, Frame>part?起初,我以为pair synchronized是我要寻找的东西,但看起来它只是将两个元素索引配对了-明智的。但是,由于既不能确保两个流水线都以相同的时间戳开始,也不能确保两个流水线都将始终产生稳定的连续时间戳流(因为有时传输时会丢失几帧的数据包),所以这不是一种选择。因此,我需要的更多是“ MergeBlock”,并可以决定将两个输入流的哪个元素接下来传播到输出(如果有)。

我想我自己必须写类似的东西。但是我很难编写能正确处理两个ISourceBlock变量和一个ITargetBlock变量的代码。我基本上早就被卡住了:

JoinBlock<Frame, Frame>

我甚至对此草稿也不确定:方法应该是JoinBlock<Frame, Frame>,所以我可以使用private void MergeSynchronized( ISourceBlock<Frame> source1, ISourceBlock<Frame> source2, ITargetBlock<Tuple<Frame, Frame>> target) { var frame1 = source1.Receive(); var frame2 = source2.Receive(); //Loop { // Depending on the timestamp [mis]match, // either pair frame1+frame2 or frame1+null or null+frame2, and // replace whichever frame(s) was/were propagated already // with the next frame from the respective pipeline //} } 吗?循环的条件是什么?在哪里以及如何检查完成情况?如何解决明显的问题,即我的代码意味着我必须等到流中的间隙over才能意识到存在间隙?]]

我考虑过的替代方法是在管道中添加一个额外的块,确保每个传感器将足够的“前哨帧”放入管道中,以便始终将每个管道中的第一个对齐,将正确的两个对齐。我guess

是一种async,它读取一个Frame,将“预期”时间戳与实际时间戳进行比较,然后为缺少的时间戳插入前哨帧,直到该帧的时间戳再次正确。 >

或者var frame1 = await source1.ReceiveAsnyc();部分是在TPL Dataflow对象处停止并开始已经在TransformManyBlock部分工作的实际代码的地方?

我想编写一个评估两个传感器的传感器数据的应用程序。两个传感器都将其数据发送到Package对象中,Package对象将被拆分为Frame对象。包本质上是一个元组<...>

task-parallel-library tpl-dataflow
1个回答
0
投票

我知道使用反射不是一个好的解决方案。但是,TPL DataFlow API的主要问题是,一切都是内部/私有和密封的。这使您无法进行任何扩展。

无论如何,如果您遇到问题,您都需要手工实施或使用肮脏的骇客,如:

© www.soinside.com 2019 - 2024. All rights reserved.