Observable 会自我递归,但我找不到原因

问题描述 投票:0回答:1

我是

rxjs
库的新手,仍在学习它,我正在尝试使用
Observable
在 Node.js 中实现 TCP 服务器,并使用我在公司看到的包含函数的模式它接受一个
Observable
作为输入并返回另一个输出
Observable
,该输出将被发送回客户端,所以我想出了这个代码

import net from "net";
import * as rx from "rxjs";

type MaybePromise<T> = T | PromiseLike<T>;
type ConnectionHandler = (address: net.AddressInfo, input$: rx.Observable<Buffer>) => MaybePromise<rx.Observable<Buffer>>;

const createServer = (port: number, handler: ConnectionHandler, opts?: net.ServerOpts) => {
  const server = net.createServer(opts);

  new rx.Observable<net.Socket>(subscriber => {
    server
      .on("connection", socket => subscriber.next(socket))
      .on("error", err => subscriber.error(err));
  }).subscribe(async socket =>
    (await handler(
      socket.address() as net.SocketAddress,
      new rx.Observable<Buffer>(subscriber => {
        socket
          .on("end", () => socket.destroySoon())
          .on("error", err => subscriber.error(err))
          .on("data", request => subscriber.next(request))
          .on("close", () => subscriber.complete());
      }),
    )).subscribe({
      next: response => {
        console.log(response.toString());
        socket.emit("data", response);
      },
      error: () => socket.destroySoon(),
    })
  );

  server.listen(port);
};

createServer(3000, async (address, input$) => {
  console.log(`New client: ${address.address}`);

  const login = await rx.firstValueFrom(input$);

  if (login.toString() != "login\n")
    return rx.throwError(() => new Error("login failure"));

  return input$.pipe(
    rx.map(data => {
      return Buffer.from(`ACK: ${data.toString()}`, "utf-8");
    })
  );
});

所以基本上在这个例子中我想等待客户端的

login\n
请求,然后对于每个请求,我想将其回显给客户端,在其前面添加
ACK: 
,但是在创建输出时可观察到它在同一次按摩中添加了数千次
ACK
,所以看起来输出每次都会作为输入返回到函数,但我不知道为什么,我没有更改输入
Observable
,我只是返回一个新的,将被处理发送给客户,你能帮我指出错误来自哪里吗?

node.js typescript rxjs tcpserver
1个回答
0
投票

你们都有:

socket
//[...]
.on("data", request => subscriber.next(request))

并且:

// [...]
.subscribe({
      next: response => {
        // [...]
        socket.emit("data", response);
      },
})

我猜这就是导致无限循环的原因。

您需要将

socket.emit("data", response)
替换为
socket.write(response)

查看

Socket#write
的文档。

© www.soinside.com 2019 - 2024. All rights reserved.