在 amphp 中流式传输结果

问题描述 投票:0回答:1

我的目标是创建一个迭代器中可以包含数万个项目的管道。由于这可能需要很长时间来处理,因此我想在生成结果时将结果返回给客户。

这不是一个网络服务器,它是我控制的两个系统之间的通信。

认为我需要为每个项目创建一个未来,并使用一个写入流的闭包。看起来……很大。我仍在摆弄它,实际上

Pipeline::tap()
可能可以完成这项工作。

不过,我最大的问题是如何建立流。看起来我应该能够将

Response
返回给客户端,并为正文返回
WriteableStream
,并继续写入流,直到处理完所有项目。

但是,即使在服务器上,

Response
也只接受
ReadableStream
作为主体。为了获得
WriteableStream
,我似乎需要使用套接字,但这样做我就失去了
Request
Response
的所有有用的抽象。我想在使用套接字时我不受所有 http 要求的约束,所以我可以制作自己的抽象,但我宁愿不这样做。

http-server
是建立在套接字上的,所以看起来这应该是可能的。

任何将管道结果流式传输回客户端的示例都将非常受欢迎!

这是我目前拥有的代码。流从请求处理程序传入,徒劳地希望它可以在管道完成之前通过

Response
返回。我的代码的这次迭代使用了
tap()
,这可能不是它的预期用途:

    $pipeline = Pipeline::fromIterable($bigList)
        ->concurrent(Router::MAX_CONCURRENT_PROCESSES_PER_REQUEST)
        ->unordered()
        ->map(fn (ListItem $item) => $this->getResultForItem($item))
        ->filter(fn (?Result $r) => $r instanceof Result);

    // if there's a stream, don't do anything here that will block until the pipeline is finished.
    if (!is_null($stream)) {
        $streamStarted = false;

        $pipeline->tap(function (Result $r) use ($stream, &$streamStarted) {

            $stream->write($streamStarted ? ',' : '[');
            $streamStarted = true;
            $stream->write(json_encode($r));

        });

        // something here to send the `end()` message when the pipeline has been completed for all items
        //$future = async()

    } else {
        foreach($pipeline as $profile) {
            $indexed[$profile->dsid] = $profile;
        }
    }

如果我可以将 WriteableStream 添加到响应中,我想我会处于良好状态。

php stream amphp
1个回答
0
投票

似乎没有办法像我上面想象的那样打开流并写入它。相反,可以创建一个

ReadableStream
来自动完成这项工作。

$pipeline = Pipeline::fromIterable($bigList)
    ->concurrent(Router::MAX_CONCURRENT_PROCESSES_PER_REQUEST)
    ->unordered()
    ->map(fn (ListItem $item) => $this->getResultForItem($item))
    ->filter(fn (?Result $r) => $r instanceof Result);
    ->map(fn(Result $r) => json_encode($r));

$stream = new ReadableIterableStream($pipeline->getIterator());

这会将每个

Result
对象写入可用的流中。

客户然后会执行以下操作:

while($text = $response->getBody()->read()) {
    $response[] = json_decode($text);
}
© www.soinside.com 2019 - 2024. All rights reserved.