重新连接websocket rxjs

问题描述 投票:0回答:2

我试图让我的 websocket 代码自动尝试重新连接(无限期地)直到成功。通过每 x 秒发送一条“ping”消息,我可以检测管道何时损坏,并调用

closeObserver

但是,我不确定如何启动重新连接序列。


const notificationConnectionEpic: Epic<ActionTypes, any, RootState> = (
  action$,
  state$
) =>
  action$.pipe(
    filter(isActionOf(actions.connectNotificationPipeline.request)),
    switchMap(async action => {
      const resp = await requireValidToken(action$, state$, params =>
        AdminHubs.getHubNotificationsToken({
          ...params,
          id: action.payload.hubId
        })
      );

      return resp.pipe(
        switchMap(v => {
          if (isAction(v)) {
            return of(v);
          }
          if (!v.ok) {
            return of(
              actions.connectNotificationPipeline.failure({
                hubId: action.payload.hubId,
                error: v.error
              })
            );
          }

          const webSocketOpen$ = new Subject();
          const webSocketClose$ = new Subject();
          const webSocket$ = webSocket<AdminHubs.HubNotification>({
            url: v.value,
            openObserver: webSocketOpen$,
            closeObserver: webSocketClose$
          });

          const message$ = webSocket$.pipe(
            map(message => actions.receiveNotification({ message })),
            takeUntil(action$.ofType(HubActionConsts.NOTIFICATION_PIPE_CLOSED))
          );

          const ping$ = interval(1000).pipe(
            map(_ => webSocket$.next("ping" as any)),
            ignoreElements()
          );

          const open$ = webSocketOpen$.pipe(
            take(1),
            map(_ =>
              actions.connectNotificationPipeline.success({
                hubId: action.payload.hubId
              })
            )
          );

          const close$ = webSocketClose$.pipe(
            // called when a network drop happens. handle reconnect?
          ); // also happens on net error
          return merge(message$, open$, ping$, close$);
        })
      );
    }),
    mergeMap(v => v)
  );
websocket rxjs redux-observable
2个回答
1
投票

当 WebSocket 连接关闭时,只需再次发送

actions.connectNotificationPipeline.request
即可。这将重新运行此代码并创建一个新的 WebSocket 连接。


0
投票

过去几周我一直在努力解决这个问题,并决定创建一个名为 SuperSocket 的包——以防它对任何人有帮助!它应该可以替代您正在使用的 WebSocket 观察者实现。

SuperSocket 位于现有 WebSocket 实现之上,除其他功能外,还确保它能够重新连接,直到成功为止。当然,您可以设置最大重试次数以避免无限循环和不必要的 CPU 负载:)

© www.soinside.com 2019 - 2024. All rights reserved.