system.reactive 相关问题

System.Reactive是指.NET的Reactive Extensions,也称为Rx。 Rx为开发人员提供了通用IObservable <T>接口的反应式编程模型,而不是传统的命令式编程模型或严格依赖.NET事件或特定API的其他反应式编程模型。

如何为一组可观察对象实现 Observable.Publish

我正在寻找实现以下功能: 公共静态 IObservable 发布( 这个 IObservable[] 来源, 函数 我正在寻找实现以下功能: public static IObservable<TResult> Publish<TSource, TResult>( this IObservable<TSource>[] sources, Func<IObservable<TSource>[], IObservable<TResult>> selector) { // how? } 行为应该与现有的相同 IObservable<TResult> Publish<TSource, TResult>( IObservable<TSource> source, Func<IObservable<TSource>, Iobservable<TResult>> selector) 当然唯一的区别是源(和选择器的参数)是一个可观察数组。基本上,“给定一组可观察对象,产生一个结果可观察对象,同时共享对数组的每个可观察对象的单个订阅”。 这可以用任何巧妙的方式完成吗? 到目前为止,我唯一的想法是通过链接单个可观察的 .Publish() 调用的表达式树来实现这一点——出于性能原因,这是不可行的。 或者去做这样的事情。但这看起来很乱,我发现很难推理出正确性 public static IObservable<TResult> Publish<TSource, TResult>( this IObservable<TSource>[] sources, Func<IObservable<TSource>[], IObservable<TResult>> selector) { return Observable.Create<TResult>(observer => { var published = sources.Select(x => x.Publish()).ToArray(); var result = new CompositeDisposable(); result.Add(selector(published).Subscribe(observer)); foreach(var p in published) result.Add(p.Connect()); return result; }); } 我现在想出了一个解决方案,但很高兴听到更好的想法! public static IObservable<TResult> Publish<TSource, TResult>(this IReadOnlyList<IObservable<TSource>> sources, Func<IReadOnlyList<IObservable<TSource>>, IObservable<TResult>> selector) { IObservable<TResult> Generator(ImmutableArray<IObservable<TSource>> published) { return published.Length == sources.Count ? selector(published) : sources[published.Length].Publish(x => Generator(published.Add(x))); } return Generator(ImmutableArray<IObservable<TSource>>.Empty); } 为什么这个有用的问题出现了。为了清楚起见,我附上了我打算用它做的事情之一。以下函数与现有的 CombineLatest() 工作方式相同,但集成了节流功能:项目的第一个组合会立即发出,之后的所有内容都会去抖动,如果结果选择器的调用开销很大,这会很有帮助。 public static IObservable<TResult> CombineLatest<TSource, TResult>(this IReadOnlyList<IObservable<TSource>> sources, Func<IList<TSource>, TResult> resultSelector, TimeSpan timeSpan, IScheduler? scheduler = null) { scheduler ??= Scheduler.Default; return sources.Publish<TSource, TResult>(publishedSources => publishedSources .CombineLatest(_ => Unit.Default) .Publish(x => x.Take(1).Merge(x.Skip(1).Sample(timeSpan, scheduler)) .Publish(signal => publishedSources .Select(source => signal.WithLatestFrom(source, (_,value) => value)) .Zip() .Select(resultSelector))); } 在我看来,您使用的selector是错误的。 这是您当前的实现selector: public static IObservable<TResult> Publish<TSource, TResult>( this IObservable<TSource>[] sources, Func<IObservable<TSource>[], IObservable<TResult>> selector) => selector(sources); 这是微不足道的。这也有点同义反复。你的 selector 是你想要创造的东西。 唯一明智的选择器是Func<IObservable<TSource>, IObservable<TResult>> selector. 可以这样吗: public static IObservable<TResult> Publish<TSource, TResult>( this IObservable<TSource>[] sources, Func<IObservable<TSource>, IObservable<TResult>> selector) => sources.Select(x => x.Publish(selector)).Merge();

回答 2 投票 0

使用 AtomicIntegers 构造的可观察对象在 RxJava 中的意外行为

这是测试用例: 导入 io.reactivex.rxjava3.core.*; 导入 java.util.concurrent.TimeUnit; 导入 java.util.concurrent.atomic.AtomicInteger; 公共课 MainTest { 公共静态 AtomicInte ...

回答 1 投票 0

System.Reactive:实现一个 IObservable<T>

我需要创建一个自定义的 IObservable。我在那里读了一些书,最后我不应该直接实施 IObservable。 我注意到有一个 ObservableBase。这是……

回答 2 投票 0

使用计时器和 Rx 生成整数列表

这对其他人来说似乎是一个基本问题,但由于我刚刚开始使用 Rx,我想知道是否有人可以指出正确的方向。我正在尝试订阅

回答 2 投票 0

将异步函数传递给类型为 Action<X> 的参数? [重复]

应用有如下定义 IDisposable IObservable.Subscribe(Action onNext) 它被传递了一个异步函数: 异步任务流程(X通知,

回答 1 投票 0

IObservable.Subscribe OnNext 操作不会在 blazor webassembly 中阻塞

我正在使用 https://github.com/dotnet/reactive 我使用 CombineLatest 将 observableA(通过计时器发出项目)与 observableB(手动插入)结合起来。 在 OnNext 方法中,有时我会向

回答 1 投票 0

我们如何从 IObservable<T>> 创建一个 IObservableList<IList<T>?

问题 我们如何从 IObservable> 到 IObservableList(来自 DynamicData)? 语境 我在我的项目中同时使用 Reactive 扩展和 DynamicData。 我

回答 3 投票 0

IObservable.Subscribe ONext 操作不会在 blazor webassembly 中阻塞

我正在使用 https://github.com/dotnet/reactive 我使用 CombineLatest 将 observableA(通过计时器发出项目)与 observableB(手动插入)结合起来。 在 OnNext 方法中,有时我会向

回答 0 投票 0

如何在 Rx.Net 中实现 exhaustMap 处理程序?

我正在从 rxjs 中寻找类似于 exhaustMap 运算符的东西,但 RX.NET 似乎没有这样的运算符。 我需要实现的是,在源流的每个元素上,...

回答 1 投票 0

在 Observable 集合的 Scan 方法中跳过空值

我有一个带有时间戳的 Observable 项目集合。 我使用 Scan 方法包装每个项目,并添加对集合中最后一个有效项目的引用。 可观察

回答 0 投票 0

RxJS 每秒钟拨打电话不超过一次,但不要丢失任何电话。

基本上我想创建一个队列。像const queue = new BehaviorSubject([]) queue.subscribe((args) => someCall(args))这样的东西,我可能会在几个地方调用queue.next({arg1, arg2, arg3})。

回答 1 投票 0

用TestScheduler测试一个IConnectableObservable。

好吧,虽然已经很晚了,但我还是无法解决为什么会发生以下情况。我正在尝试测试以下(简化的)IConnectableObservable。 : Private const int ...

回答 1 投票 1

如何等待一个observable完成?

在Node.js中,你可以设置一个服务器,只要服务器还活着,并且在事件循环中处于活跃状态,这个进程就不会终止。我想知道是否可以用......做这样的事情。

回答 1 投票 0

在异步中调用一个PInvoke,执行后不会返回主线程。

我正在使用一个非托管库,它要求所有对其API的调用都在同一个线程上运行。我们希望使用Reactive扩展的EventLoopScheduler来实现这一目标,因为我们将...

回答 1 投票 4

带有单个任务的异步OnNext

我有一个主题,每100毫秒得到一个int。这个数字在每次迭代时递增。我订阅这个主题,得到50的块回,并等待几秒钟。例如:...

回答 1 投票 0

反应式扩展中的最大线程数 并行扩展中的最大线程数

因为微软可能杀了所有的并行扩展CTP的下载链接,我完全迷茫了。我想指定在某个时间运行的最大任务数,因为我希望线程数多于处理器数。

回答 2 投票 2

当一个属性发生变化时,重新启动一个可观察的订阅。

我有一个简单的订阅,是在MyClass中创建的。/myService.Connect返回IObservable。 myService.Connect(requestParameters) .Subscribe(DoSomething); 有一个 ...

回答 1 投票 3

我如何使用Rx.Net来处理可能是突发或垃圾邮件的事件?

我正在编写处理来自设备的事件的代码,这些事件可能会以突发的方式发生,这是常见的情况,也可能在错误的情况下非常频繁地发生。我想要的行为是 永不转发...

回答 1 投票 1

将带有IO操作的无限循环转化为可观察的循环。

你好,我正试图将一个长期运行的任务(其中我从一个套接字上每次迭代读取)转换为一个Observable.This任务接收一个CancellToken.I'm afraid I do not know how to transform ....

回答 1 投票 0

IObservable<T>使用linq查询。

为什么是IObservable 不是IEnumerable的实现 和IQueryable 但我可以使用LINQ查询吗?IObservable是一种特殊类型吗?我曾经以为只有LINQ查询......

回答 4 投票 1

© www.soinside.com 2019 - 2024. All rights reserved.