快速合并具有多个值的Future?

问题描述 投票:-1回答:1

我可能会采用错误的方式,但是我有一个函数,随着时间的推移,我想使用该函数发出多个值。但是我不希望它开始发光,直到某个对象被订阅。我将从RxSwift进行合并,因此我基本上是想在RxSwift世界中复制Observable.create()。我发现最接近的是返回期货,但是期货只会成功或失败(因此它们基本上就像RxSwift中的Single。)

我在这里缺少一些基本的东西吗?我的最终目标是创建一个函数,该函数处理视频文件并发出进度事件,直到完成为止,然后为完成的文件发出URL。

swift rx-swift combine
1个回答
1
投票

通常,您可以使用PassthroughSubject发布自定义输出。您可以在自己的PassthroughSubject实现中包装一个PassthroughSubject(或多个Publisher),以确保只有您的进程才能通过该主题发送事件。

为了示例目的,让我们模拟VideoFrame类型和一些输入帧:

typealias VideoFrame = String
let inputFrames: [VideoFrame] = ["a", "b", "c"]

现在,我们要编写一个函数来同步处理这些帧。我们的函数应该以某种方式报告进度,最后,它应该返回输出帧。要报告进度,我们的函数将获取一个PassthroughSubject<Double, Never>,并将其进度(从0到1的分数)发送给主题:

func process(_ inputFrames: [VideoFrame], progress: PassthroughSubject<Double, Never>) -> [VideoFrame] {
    var outputFrames: [VideoFrame] = []
    for input in inputFrames {
        progress.send(Double(outputFrames.count) / Double(inputFrames.count))
        outputFrames.append("output for \(input)")
    }
    return outputFrames
}

好的,现在我们要把它变成出版商。发布者需要输出进度和最终结果。因此,我们将使用此enum作为其输出:

public enum ProgressEvent<Value> {
    case progress(Double)
    case done(Value)
}

现在我们可以定义我们的Publisher类型。我们称它为SyncPublisher,因为当它收到Subscriber时,它立即(同步)执行其整个计算。

public struct SyncPublisher<Value>: Publisher {
    public init(_ run: @escaping (PassthroughSubject<Double, Never>) throws -> Value) {
        self.run = run
    }

    public var run: (PassthroughSubject<Double, Never>) throws -> Value

    public typealias Output = ProgressEvent<Value>
    public typealias Failure = Error

    public func receive<Downstream: Subscriber>(subscriber: Downstream) where Downstream.Input == Output, Downstream.Failure == Failure {
        let progressSubject = PassthroughSubject<Double, Never>()
        let doneSubject = PassthroughSubject<ProgressEvent<Value>, Error>()
        progressSubject
            .setFailureType(to: Error.self)
            .map { ProgressEvent<Value>.progress($0) }
            .append(doneSubject)
            .subscribe(subscriber)
        do {
            let value = try run(progressSubject)
            progressSubject.send(completion: .finished)
            doneSubject.send(.done(value))
            doneSubject.send(completion: .finished)
        } catch {
            progressSubject.send(completion: .finished)
            doneSubject.send(completion: .failure(error))
        }
    }
}

现在我们可以将process(_:progress:)函数变成这样的SyncPublisher

let inputFrames: [VideoFrame] = ["a", "b", "c"]
let pub = SyncPublisher<[VideoFrame]> { process(inputFrames, progress: $0) }

run闭包为{ process(inputFrames, progress: $0) }。请记住,$0是一个PassthroughSubject<Double, Never>,正好是process(_:progress:)作为其第二个参数。

[当我们订阅此pub时,它将首先创建两个主题。一个主题是进度主题,并传递给关闭。我们将使用另一个主题来发布最终结果和.finished完成,或者如果.failure闭包引发错误,则仅发布run完成。

之所以使用两个单独的主题,是因为它确保我们的发布者行为规范。如果run关闭正常返回,发布者将发布零个或多个进度报告,然后发布单个结果,然后发布.finished。如果run闭包引发错误,则发布者将发布零个或多个进度报告,然后发布.failedrun闭包无法使发布者发出多个结果,或在发出结果后发出更多进度报告。

最后,我们可以订阅pub以查看其是否正常运行:

pub
    .sink(
        receiveCompletion: { print("completion: \($0)") },
        receiveValue: { print("output: \($0)") })

这是输出:

output: progress(0.0)
output: progress(0.3333333333333333)
output: progress(0.6666666666666666)
output: done(["output for a", "output for b", "output for c"])
completion: finished
© www.soinside.com 2019 - 2024. All rights reserved.