我在服务器端使用 Rust(actix-web)定义了一个 SSE(Server-Sent Events)接口,代码如下:
use actix_web::{App, HttpResponse, HttpServer, Responder, post, get};
use rust_wheel::common::util::net::sse_stream::SseStream;
use std::time::{Duration, SystemTime};
use tokio::{
sync::mpsc::{UnboundedReceiver, UnboundedSender},
task,
};
/// Handles `GET /sse/producer` by streaming five timestamp messages, one per second.
///
/// A background task pushes the messages into an unbounded channel; the receiving
/// half is wrapped in `SseStream` and used as the streaming response body with the
/// `text/event-stream` content type.
#[get("/sse/producer")]
pub async fn sse() -> impl Responder {
    let (tx, rx): (UnboundedSender<String>, UnboundedReceiver<String>) =
        tokio::sync::mpsc::unbounded_channel();
    task::spawn(async move {
        for _ in 0..5 {
            let message = format!("Current time: {:?}", SystemTime::now());
            // `send` fails only once the receiving half is gone (presumably because
            // the client disconnected and the response body was dropped). Stop
            // producing instead of panicking inside the spawned task.
            if tx.send(message).is_err() {
                break;
            }
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    });
    HttpResponse::Ok()
        .content_type("text/event-stream")
        .streaming(SseStream { receiver: Some(rx) })
}
/// Starts the HTTP server on `127.0.0.1:8000` and serves the `sse` handler.
///
/// # Errors
///
/// Returns the I/O error if binding the address fails or the server stops with an error.
#[actix_web::main]
async fn main() -> Result<(), std::io::Error> {
    // A handler only needs to be registered once per `App`.
    HttpServer::new(|| App::new().service(sse))
        .bind("127.0.0.1:8000")?
        .run()
        .await
}
这是 Cargo.toml 中的依赖项:
tokio = { version = "1.17.0", features = ["full"] }
serde = { version = "1.0.64", features = ["derive"] }
serde_json = "1.0.64"
rust_wheel = { git = "https://github.com/jiangxiaoqiang/rust_wheel.git", branch = "diesel2.0" }
actix-web = "4"
使用curl来测试api:
curl -N http://localhost:8000/sse/producer
用 curl 测试时一切正常。然后我使用 `"event-source-polyfill": "^1.0.31"` 编写客户端来接收 SSE 数据:
import React from 'react';
import { EventSourcePolyfill } from 'event-source-polyfill';
/**
 * Root component that opens an SSE connection to `/sse/producer` when it mounts
 * and logs every message and error to the console. It renders an empty `<div>`.
 */
const App: React.FC = () => {
  React.useEffect(() => {
    const eventSource = doSse();
    // Close the connection when the component unmounts so the stream is not leaked.
    return () => {
      eventSource.close();
    };
  }, []);

  /**
   * Opens the event source, attaches the handlers and returns the instance so
   * that the caller can close it later.
   */
  const doSse = (): EventSourcePolyfill => {
    const eventSource: EventSourcePolyfill = new EventSourcePolyfill('/sse/producer', {
      headers: {
      }
    });
    eventSource.onopen = () => {
    };
    eventSource.onerror = (error: any) => {
      console.log(error);
      // Closing here stops any further reconnection attempts after the first error.
      eventSource.close();
    };
    eventSource.onmessage = (msg: any) => {
      console.log(msg);
    };
    return eventSource;
  };

  return (
    <div>
    </div>
  );
}
export default App;
客户端没有收到任何数据。我是不是遗漏了什么?应该怎样做才能让它正常工作?这是客户端的代理配置:
proxy: {
'/sse': 'http://localhost:8000/'
}
客户端日志显示此消息:
我无法收到任何有用的消息,非常感谢任何建议。我还尝试把事件源数据改为标准格式,如下所示:
/// Handles `GET /sse/producer` by streaming five `SSEMessage` values, one per second.
///
/// A background task builds each message with `SSEMessage::from_data` and pushes it
/// into an unbounded channel; the receiving half is wrapped in `SseStream` and used
/// as the streaming response body with the `text/event-stream` content type.
#[get("/sse/producer")]
pub async fn sse() -> impl Responder {
    let (tx, rx): (UnboundedSender<SSEMessage>, UnboundedReceiver<SSEMessage>) =
        tokio::sync::mpsc::unbounded_channel();
    task::spawn(async move {
        for _ in 0..5 {
            let message: SSEMessage =
                SSEMessage::from_data(&"data11".to_string(), &"TEX_LOG".to_string());
            // `send` fails only once the receiving half is gone (presumably because
            // the client disconnected and the response body was dropped). Stop
            // producing instead of panicking inside the spawned task.
            if tx.send(message).is_err() {
                break;
            }
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    });
    HttpResponse::Ok()
        .content_type("text/event-stream")
        .streaming(SseStream { receiver: Some(rx) })
}
这是 `SSEMessage` 的定义:
/// Message value sent through the channel that feeds the SSE response stream.
///
/// NOTE(review): the field names match the server-sent-events wire fields
/// (`event`, `data`, `id`, `retry`); how they are written to the response depends
/// on `SseStream`, which is not visible here — confirm.
#[derive(Deserialize, Serialize)]
pub struct SSEMessage {
/// Optional event name; presumably emitted as the `event:` field — TODO confirm.
pub event: Option<String>,
/// Message payload; the only field that is always present.
pub data: String,
/// Optional message identifier; presumably emitted as the `id:` field — TODO confirm.
pub id: Option<String>,
/// Optional retry value; presumably the reconnection delay in milliseconds — TODO confirm.
pub retry: Option<u32>,
}
经过一番深入研究,我发现 polyfill 使用 fetch 来发起 SSE 请求,Google Chrome 似乎没有把 polyfill 发出的请求视为 EventSource 请求,因此它不会显示在 DevTools 的 EventStream 列表中。更多信息:https://github.com/Yaffle/EventSource/issues/79 https://github.com/EventSource/eventsource/issues/94