我的目标是创建一个迭代器中可以包含数万个项目的管道。由于这可能需要很长时间来处理,因此我想在生成结果时将结果返回给客户。
这不是一个网络服务器,它是我控制的两个系统之间的通信。
我认为我需要为每个项目创建一个未来,并使用一个写入流的闭包。看起来……很大。我仍在摆弄它,实际上
Pipeline::tap()
可能可以完成这项工作。
不过,我最大的问题是如何建立流。看起来我应该能够将
Response
返回给客户端,并为正文返回 WriteableStream
,并继续写入流,直到处理完所有项目。
但是,即使在服务器上,
Response
也只接受ReadableStream
作为主体。为了获得 WriteableStream
,我似乎需要使用套接字,但这样做我就失去了 Request
和 Response
的所有有用的抽象。我想在使用套接字时我不受所有 http 要求的约束,所以我可以制作自己的抽象,但我宁愿不这样做。
http-server
是建立在套接字上的,所以看起来这应该是可能的。
任何将管道结果流式传输回客户端的示例都将非常受欢迎!
这是我目前拥有的代码。流从请求处理程序传入,徒劳地希望它可以在管道完成之前通过
Response
返回。我的代码的这次迭代使用了 tap()
,这可能不是它的预期用途:
$pipeline = Pipeline::fromIterable($bigList)
->concurrent(Router::MAX_CONCURRENT_PROCESSES_PER_REQUEST)
->unordered()
->map(fn (ListItem $item) => $this->getResultForItem($item))
->filter(fn (?Result $r) => $r instanceof Result);
// if there's a stream, don't do anything here that will block until the pipeline is finished.
if (!is_null($stream)) {
$streamStarted = false;
$pipeline->tap(function (Result $r) use ($stream, &$streamStarted) {
$stream->write($streamStarted ? ',' : '[');
$streamStarted = true;
$stream->write(json_encode($r));
});
// something here to send the `end()` message when the pipeline has been completed for all items
//$future = async()
} else {
foreach($pipeline as $profile) {
$indexed[$profile->dsid] = $profile;
}
}
如果我可以将 WriteableStream 添加到响应中,我想我会处于良好状态。
似乎没有办法像我上面想象的那样打开流并写入它。相反,可以创建一个
ReadableStream
来自动完成这项工作。
$pipeline = Pipeline::fromIterable($bigList)
->concurrent(Router::MAX_CONCURRENT_PROCESSES_PER_REQUEST)
->unordered()
->map(fn (ListItem $item) => $this->getResultForItem($item))
->filter(fn (?Result $r) => $r instanceof Result);
->map(fn(Result $r) => json_encode($r));
$stream = new ReadableIterableStream($pipeline->getIterator());
这会将每个
Result
对象写入可用的流中。
客户然后会执行以下操作:
while($text = $response->getBody()->read()) {
$response[] = json_decode($text);
}