如何向现有客户端(任何通用消息)提供 Rails websocket?

问题描述 投票:0回答:0

在尝试了不同的方法以使其正常工作之后,我还没有设法理解 Rails 的神奇价值以提供 websocket 接口。已知客户端代码可以工作,因为它已经使用 NodeJS 和 FastAPI 接口进行了测试。据我了解,Rails 服务器读取/响应 WS 事件的正确方法是通过最后一个代码片段中实现的“send_msg”方法,但是应该如何调用该方法?似乎为了调用 send_msg 方法,我必须修改客户端代码以使用 Rails 提供的 JS 库(如here),这是不可能的。

正如标题所说,问题是如何创建一个简单的(但通用的?)WS 消息接收器/广播器?

websocket 接口应该如何工作

  • 在 /clock 有一个 ws: 端点
  • 客户端可以连接到/clock
  • 当客户端向 /clock 发送带有数据 'requestTime' 的 WS 消息时,API 会向所有连接的客户端广播服务器系统时间
  • 客户代码不可更改

客户端如何尝试连接和请求时间(NodeJS) (连接 X 个客户端,广播时间 Y 次)

import async from "async";
import fetch from "node-fetch";
import fs from "fs";
import ws from "ws";

// client that only counts the received messages
function wsFunction(requestCount) {
    return resolve => {
        let limit = requestCount;
        // construct all promises
        const client = new ws("ws://localhost:3000/clock");

        client.on('message', data => {
            if(--limit == 0) {
                client.close();
            }
        });

        client.on('close', () => {
            if(limit > 0) {
                console.log("Socket closed prematurely... ", limit);
            }
        });

        client.on('open', () => {
            resolve(client); // client connected
        });

        const close = () => {
            if(client.readyState !== ws.CLOSED && client.readyState !== ws.CLOSING) {
                client.close();
            }
        }
    }
}

/**
 * 
 * @param {*} limit 
 * @returns operation time for limit messages, or -1 if connection is cut
 */
function attemptClockFetches(clientCount, retrieveCount) {
    const clients = [];
    for(let i = 0; i < clientCount - 1; ++i) {
        clients.push(async () => new Promise(wsFunction(retrieveCount)));
    }

    // client that initiates the broadcast
    const promise = new Promise(async resolve => {
        const startTime = performance.now();
        const sockets = await async.parallel(clients); // connect all clients

        // create updater client
        const client = new ws("ws://localhost:3000/clock");

        // now update until limit is reached
        client.on('close', () => {
            if(retrieveCount > 0) {
                console.log("Parent socket closed prematurely...");
            }
        });

        client.on('message', () => {
            if(--retrieveCount > 0) {
                client.send("requestTime");
            } else {
                client.close();
                const endTime = performance.now();
                // close all sockets
                for(let s of sockets) {
                    s.close();
                }
                resolve(endTime - startTime);
            }
        });

        client.on('open', () => {
            client.send("requestTime");
        });
    });

    return promise;
}

async function doStressTest() {
    await attemptClockFetches(10, 10);
}

const i = setInterval(() => {
    // prevent node from killing process
}, 1000);

doStressTest().then(() => {
    clearInterval(i);
});

工作的 NodeJS WebSocket 响应器的片段,本质上这是需要在 Rails 中复制的内容

const wsServer = new ws.WebSocketServer({ server: server, path: "/clock" });

wsServer.on('connection', socket => {
    socket.on('error', err => {
        console.error(err);
    });

    socket.on('message', data => {
        if(data.toString() === "requestTime") {
            // broadcast time on requestTime event to all clients
            wsServer.clients.forEach(client => {
                if(client.readyState === ws.OPEN) {
                    client.send((new Date()).getMilliseconds());
                }
            });
        }
    });
});

我目前实现了什么 我已将其添加到 routes.rb,假设它将所有 WS 事件定向到路径 /clock,即 ClocksChannel

Rails.application.routes.draw do
  get '/users/:userId/cards', to: 'card#index'
  # get '/clock', to: 'card#clock' <- ADDING THIS MAKES RAILS RESPOND IN HTTP EVEN THOUGH USING WebSocket PROTOCOL

  mount ActionCable.server => '/clock'
end

实现这个频道,假设它订阅和取消订阅客户。至于调用send_msg,我不太清楚应该怎么调用

require "time"

class ClocksChannel < ApplicationCable::Channel
  def subscribed
    # stream_from "some_channel"
  end

  def unsubscribed
    # Any cleanup needed when channel is unsubscribed
  end

  def send_msg(data)
    if data == "requestTime"
          ActionCable.server.broadcast "requestTime", message: (Time.now.to_f * 1000).to_i
    end
  end
end

主卡_controller.rb的内容

class CardController < ApplicationController
    def index
        # do some index things, not part of WS
    end

    # def clock
    #     render "Hello World"
    # end
end

当服务器接收到具有给定设置的连接时,将给出以下输出:

Started GET "/clock" for 127.0.0.1 at 2023-03-09 18:44:37 +0200
Started GET "/clock/" [WebSocket] for 127.0.0.1 at 2023-03-09 18:44:37 +0200
Request origin not allowed:
Failed to upgrade to WebSocket (REQUEST_METHOD: GET, HTTP_CONNECTION: Upgrade, HTTP_UPGRADE: websocket)        
Finished "/clock/" [WebSocket] for 127.0.0.1 at 2023-03-09 18:44:37 +0200

客户得到这个回应

Error: Unexpected server response: 404
ruby-on-rails ruby websocket ruby-on-rails-5
© www.soinside.com 2019 - 2024. All rights reserved.