如何在 Vapor Swift 中传输数据?

问题描述 投票:0回答:1

我正在使用 Vapor Swift 构建一个开放式 AI 聊天流后端。它使用 MacPaw 的 OpenAI 包装器连接到 Open AI API。但我不确定如何使用 SSE 将结果流式传输给客户端,而不是作为单个响应。

我当前的代码如下所示:

    func postChatStreamHandler(_ req: Request) throws -> EventLoopFuture<Response> {
        let openAI = OpenAI(configuration: configuration)
        let promise = req.eventLoop.makePromise(of: Data.self)
        let query = ChatQuery(model: .gpt4, messages: messages)
        openAI.chatsStream(query: query) { partialResult in
            switch partialResult {
            case .success(let result):
                if let detla = result.choices.first?.delta,
                   let data = try? JSONEncoder().encode(result) {
                    promise.succeed(data)
                }
            case .failure(let error):
                ...
            }
        } completion: { error in
            ...
        }
        return promise.futureResult.map { data in
            let response = Response()
            response.body = .init(buffer: ByteBuffer(data: data))
            return response
        }
    }
swift server-sent-events openai-api vapor
1个回答
0
投票
struct GenericAIController: RouteCollection {
    private let service = AIService()
    
    func boot(routes: RoutesBuilder) throws {
        let routes = routes.grouped("api", "ai")
        routes.get("completion", "stream", use: generatePoemStream(req:))
    }
    
    func generatePoemStream(req: Request) async throws -> AsyncThrowingStream<String, Error> {
        let query = ChatQuery(
            model: .gpt4_1106_preview,
            messages: [
                .init(role: .user, content: "Write a poem")
            ]
        )
        
        let originalStream = service.chatsStream(query: query)
        
        return AsyncThrowingStream<String, Error> { continuation in
            Task {
                for try await element in originalStream {
                    if let stringElement = element.choices.first?.delta.content {
                        continuation.yield(stringElement)
                    }
                }

                continuation.finish(throwing: nil)
            }
        }
    }

}

extension AsyncThrowingStream: AsyncResponseEncodable where Element: Encodable {
    public func encodeResponse(for request: Request) async throws -> Response {
        let response = Response(status: .ok)
        let body = Response.Body(stream: { writer in
            Task {
                do {
                    for try await element in self {
                        let data = try JSONEncoder().encode(element)
                        _ = writer.write(.buffer(.init(data: data)))
                    }
                    
                    _ = writer.write(.end)
                } catch {
                    // Handle errors as needed
                }
            }
        })

        response.body = body
        return response
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.