我正在尝试将我的代码从使用 join_all 处理一组期货转换为流,因为这似乎是一种更强大的方法,因为它允许我在未来的结果可用时立即处理它们,而不是等待所有结果。
然而,我遇到了一生无法解决的问题。
游乐场的最小复制:
use std::sync::Arc;
use futures::{Stream, StreamExt, stream::BoxStream};
struct Querier {
id: u32,
}
impl Querier {
fn query<'a>(&'a self) -> impl Stream<Item = u32> + 'a {
futures::stream::once(async move { self.id })
}
}
fn query_all<'a>() -> BoxStream<'a, u32> {
let queriers = vec![
Arc::new(Querier { id: 1 }),
Arc::new(Querier { id: 2 }),
];
futures::stream::iter(queriers.into_iter())
.flat_map(move |q| {
let q = q.clone();
q.query()
})
.boxed()
}
#[tokio::main]
async fn main() {
let _stream = query_all();
}
如您所见,
box
ing 流和/或克隆 Querier
没有帮助。我想我明白为什么了,这是因为查询器仍然在本地定义并且 query
的返回与它相关。
在查询函数中用
&self
替换 Arc<Self>
是可行的,但我想学习其他替代方法,因为这是一个我无法解决的反复出现的问题,并且查询可能来自图书馆。
有没有办法在保持查询器实例化本地的同时解决这个问题?
附言取得一些进展:this 编译但它不再返回流中的所有元素。
有些东西必须拥有
Querier
,它不可能是:
query_all
因为你想从中返回 Stream
所以本地值不能被引用|q| q.query
同样的原因Stream
futures::stream::iter(queriers)
因为它放弃了对其产生的价值的所有权query
或其中的任何内容,因为您只给它一个参考不幸的是,在
Querier
返回后,我们没有 query_all
的有效所有者,这意味着对它的任何引用都是悬而未决的和无效的,但是借用检查器已经支持你并且不允许你将代码编译为是。
要修复它,你必须确定什么应该拥有
Querier
,有2个选项:
Arc
,这是你已经找到的解决方案。query
生成的流所以你可以简单地给它项目而不是对它的引用:
fn query(self) -> impl Stream<Item = u32> {
use futures::stream::once;
once(async move { self.id }).chain(once( self.id * 2))
}