Using SSE, I need to convert a `BroadcastStream<String>` into a `BroadcastStream<sse::Event>`. Is that possible?


With the code below, I can't work out how to convert from

mystream: BroadcastStream<String>

to

mystream: BroadcastStream<sse::Event>

REPL: https://www.rustexplorer.com/b/8kla6t

Can you help me understand?

As you can see, the problem is in the function

sse_handler_as_string()

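// Imports assumed from the identifiers used below (axum 0.6, tokio,
// tokio-stream with its "sync" feature); the linked REPL may differ.
use std::sync::Arc;

use axum::{
    extract::State,
    response::{
        sse::{Event, KeepAlive, Sse},
        Html,
    },
    routing::get,
    Router,
};
use tokio::sync::broadcast;
use tokio_stream::{
    wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
    Stream,
};

struct AppState {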
    tx: broadcast::Sender<Event>,
    tx_as_string: broadcast::Sender<String>,
}

#[tokio::main]
async fn main() {
    let (tx, _) = broadcast::channel::<Event>(100);

    let (tx_as_string, _) = broadcast::channel::<String>(100);

    let app_state = Arc::new(AppState { tx, tx_as_string });

    let app = Router::new()
        .route("/send_message", get(send_message))
        .route("/sse", get(sse_handler))
        .route("/send_message_as_string", get(send_message_as_string))
        .route("/sse_as_string", get(sse_handler_as_string))
        .with_state(app_state);

    axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
        .serve(app.into_make_service())
        .await
        .unwrap();
}

async fn send_message(State(app_state): State<Arc<AppState>>) -> Html<&'static str> {
    app_state
        .tx
        .send(Event::default().data("custom_data"))
        .expect("send message");

    Html("<h1>Hello, World!</h1>")
}

async fn sse_handler(
    State(app_state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, BroadcastStreamRecvError>>> {
    let rx = app_state.tx.subscribe();

    let mystream = BroadcastStream::new(rx);

    Sse::new(mystream).keep_alive(KeepAlive::default())
}

async fn send_message_as_string(State(app_state): State<Arc<AppState>>) -> Html<&'static str> {
    app_state
        .tx_as_string
        .send("custom_data".to_string())
        .expect("send message");

    Html("<h1>Hello, World!</h1>")
}

async fn sse_handler_as_string(
    State(app_state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<String, BroadcastStreamRecvError>>> {
    let rx = app_state.tx_as_string.subscribe();

    let mystream = BroadcastStream::new(rx);

    // HOW TO TRANSFORM mystream<String> TO axum::response::sse::Event here???

    Sse::new(mystream).keep_alive(KeepAlive::default())
}
Tags: rust, server-sent-events, rust-axum

1 Answer

Of course, I can simply use `mystream.map()` to turn each `Ok(String)` item of the stream into an `Event`.
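To make the shape of that `map` call concrete, here is a minimal sketch that runs without external crates. It assumes a stand-in `Event` struct and a stand-in `Lagged` error (both hypothetical, standing in for `axum::response::sse::Event` and `BroadcastStreamRecvError`), and uses a plain iterator in place of the stream. The closure has the same form as one passed to a stream's `map`.

```rust
// Hypothetical stand-in for axum::response::sse::Event, for illustration only.
#[derive(Debug, Default, PartialEq)]
struct Event {
    data: String,
}

impl Event {
    // Mirrors the builder style of the real `Event::data`.
    fn data(mut self, data: impl AsRef<str>) -> Self {
        self.data = data.as_ref().to_owned();
        self
    }
}

// Hypothetical stand-in for BroadcastStreamRecvError.
#[derive(Debug, PartialEq)]
struct Lagged(u64);

// Outer `map` visits every item; inner `Result::map` converts only the
// Ok(String) case into Ok(Event) and lets errors pass through unchanged.
fn to_events<I>(items: I) -> impl Iterator<Item = Result<Event, Lagged>>
where
    I: Iterator<Item = Result<String, Lagged>>,
{
    items.map(|item| item.map(|s| Event::default().data(s)))
}

fn main() {
    let items = vec![Ok("custom_data".to_string()), Err(Lagged(3))];
    let events: Vec<_> = to_events(items.into_iter()).collect();

    assert_eq!(events[0], Ok(Event { data: "custom_data".into() }));
    assert_eq!(events[1], Err(Lagged(3)));
}
```

In `sse_handler_as_string()` this would presumably amount to `let mystream = BroadcastStream::new(rx).map(|item| item.map(|s| Event::default().data(s)));` with `use tokio_stream::StreamExt;` in scope. The return type would also change to `Sse<impl Stream<Item = Result<Event, BroadcastStreamRecvError>>>`, since the stream now yields `Event` rather than `String`.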
