合并 subscribe(on:options:) 运算符

问题描述 投票:0回答:2

我有关于 subscribe(on:options:) 运算符的问题。如果有人能帮助我解决这个问题,我将不胜感激。

我们从文档中得到了什么:

指定执行订阅、取消和请求操作的调度程序。 与影响下游消息的 receive(on:options:) 相比, subscribe(on:options:) 会更改上游消息的执行上下文。

此外,我从不同的文章中得到的是,除非我们明确指定

Scheduler
来接收下游消息(使用
receive(on:options:)
),否则消息将在用于接收订阅的
Scheduler
上发送。

此信息与我在执行过程中实际得到的信息不一致。

我有下一个代码:

Just("Some text")
    .map { _ in
        print("Map: \(Thread.isMainThread)")
    }
    .subscribe(on: DispatchQueue.global())
    .sink { _ in
        print("Sink: \(Thread.isMainThread)")
    }
    .store(in: &subscriptions)

我期望下一个输出:

Map: false
Sink: false

但我得到的是:

Map: true
Sink: false

当我使用

Sequence
出版商时,也会发生同样的事情。

如果我交换

map
运算符和
subscribe
运算符的位置,我会收到我想要的:

Just("Some text")
    .subscribe(on: DispatchQueue.global())
    .map { _ in
        print("Map: \(Thread.isMainThread)")
    }
    .sink { _ in
        print("Sink: \(Thread.isMainThread)")
    }
    .store(in: &subscriptions)

输出:

Map: false
Sink: false

有趣的事实是,当我在自定义发布商中使用第一个列表中相同顺序的运算符时,我收到了我想要的行为:

struct TestJust<Output>: Publisher {
    typealias Failure = Never
    
    private let value: Output
    
    init(_ output: Output) {
        self.value = output
    }
    
    func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
        subscriber.receive(subscription: Subscriptions.empty)
        _ = subscriber.receive(value)
        subscriber.receive(completion: .finished)
    }
}

TestJust("Some text")
    .map { _ in
        print("Map: \(Thread.isMainThread)")
    }
    .subscribe(on: DispatchQueue.global())
    .sink { _ in
        print("Sink: \(Thread.isMainThread)")
    }
    .store(in: &subscriptions)

输出:

Map: false
Sink: false

所以我认为要么是我对所有这些机制的完全误解,要么是一些发布者故意选择线程来发布值(

Just
Sequence
->
Main
URLSession.DataTaskPublisher
->
Some of Background
),这对我来说没有意义,因为在这种情况下我们为什么需要这个
subscribe(on:options:)

您能帮助我了解我缺少什么吗?预先感谢您。

swift combine frp
2个回答
12
投票

首先要理解的是,消息既沿管道向上流动,又沿管道向下流动。沿管道(“上游”)向上流动的消息有: 订阅实际表现(接收订阅)

  • 订阅者向上游发布者请求新值

  • 取消消息(这些消息从最终订阅者向上渗透)

  • 沿管道(“下游”)

    向下
  • 流动的消息是:

价值观

  • 完成,包括失败(错误)或按顺序完成(报告发布者发出了其最后一个值)

  • 好吧,正如文档明确指出的那样,

    subscribe(on:)
  • 是关于前者的:
向上游流动的消息

。但您实际上并没有在测试中跟踪

那些
消息中的任何,因此您的结果都没有反映有关它们的任何信息!在订阅点上方插入适当的 handleEvents 运算符,以查看内容沿着管道向上流动(例如实现其 receiveRequest: 参数):
Just("Some text")
    .handleEvents(receiveRequest: {
        _ in print("Handle1: \(Thread.isMainThread)")
    })
    .map // etc.
同时,您应该对消息将在其上流向

下游
的线程做出

假设(即值和完成)。你说: 此外,我从不同的文章中得到的是,除非我们明确指定调度程序来接收下游消息(使用receive(on:options:)

),否则消息将在用于接收订阅的调度程序上发送。

但这似乎是一个虚假的假设。您的代码中没有任何内容以明确的方式确定下游发送线程。正如你所说,你
可以

receive(on:)

来控制这件事,但如果你不这样做,我会说你必须对此事不承担任何。某些发布者确实会在后台线程上生成值,例如数据任务发布者,这非常有意义(数据任务完成处理程序也会发生同样的情况)。其他人则没有。

可以假设除

receive(on:)

之外的运算符通常不会更改值传递线程。但是操作员是否以及如何使用订阅线程来确定接收线程,这是您不应该假设的事情。要控制接收线程,就控制它!致电 receive(on:) 或假设什么都不做。

举个例子,如果您将开头更改为
Just("Some text")
    .receive(on: DispatchQueue.main)

然后您的

map
sink

都会报告它们正在主线程上接收值。为什么?因为你控制了接收线程。无论您在任何

subscribe(on:)
命令中说什么,这都有效。它们是完全不同的事情。
也许如果你调用
subscribe(on:)
但不调用

receive(on:)

,有关下游发送线程的一些事情是由

subscribe(on:)
线程决定的,但我肯定不会依赖于任何硬性和快速的规则关于它;文档中没有任何说明!相反,不要这样做。如果您实施
subscribe(on:)
,也实施
receive(on:)
,以便
负责发生的事情。
    
我玩了这个例子并弄清楚了这一切。


0
投票

The result: Handle Events: false Map: true Sink: false

© www.soinside.com 2019 - 2024. All rights reserved.