我是
rxjs
库的新手,仍在学习它,我正在尝试使用 Observable
在 Node.js 中实现 TCP 服务器,并使用我在公司看到的包含函数的模式它接受一个 Observable
作为输入并返回另一个输出 Observable
,该输出将被发送回客户端,所以我想出了这个代码
import net from "net";
import * as rx from "rxjs";
type MaybePromise<T> = T | PromiseLike<T>;
type ConnectionHandler = (address: net.AddressInfo, input$: rx.Observable<Buffer>) => MaybePromise<rx.Observable<Buffer>>;
const createServer = (port: number, handler: ConnectionHandler, opts?: net.ServerOpts) => {
const server = net.createServer(opts);
new rx.Observable<net.Socket>(subscriber => {
server
.on("connection", socket => subscriber.next(socket))
.on("error", err => subscriber.error(err));
}).subscribe(async socket =>
(await handler(
socket.address() as net.SocketAddress,
new rx.Observable<Buffer>(subscriber => {
socket
.on("end", () => socket.destroySoon())
.on("error", err => subscriber.error(err))
.on("data", request => subscriber.next(request))
.on("close", () => subscriber.complete());
}),
)).subscribe({
next: response => {
console.log(response.toString());
socket.emit("data", response);
},
error: () => socket.destroySoon(),
})
);
server.listen(port);
};
createServer(3000, async (address, input$) => {
console.log(`New client: ${address.address}`);
const login = await rx.firstValueFrom(input$);
if (login.toString() != "login\n")
return rx.throwError(() => new Error("login failure"));
return input$.pipe(
rx.map(data => {
return Buffer.from(`ACK: ${data.toString()}`, "utf-8");
})
);
});
所以基本上在这个例子中我想等待客户端的
login\n
请求,然后对于每个请求,我想将其回显给客户端,在其前面添加 ACK:
,但是在创建输出时可观察到它在同一次按摩中添加了数千次ACK
,所以看起来输出每次都会作为输入返回到函数,但我不知道为什么,我没有更改输入Observable
,我只是返回一个新的,将被处理发送给客户,你能帮我指出错误来自哪里吗?
你们都有:
socket
//[...]
.on("data", request => subscriber.next(request))
并且:
// [...]
.subscribe({
next: response => {
// [...]
socket.emit("data", response);
},
})
我猜这就是导致无限循环的原因。
您需要将
socket.emit("data", response)
替换为 socket.write(response)
。
Socket#write
的文档。