如何通过用户订阅的 SignalR 传输市场数据

问题描述 投票:0回答:1

I want to stream market data similar to https://min-api.cryptocompare.com/documentation/websockets?key=Channels&cat=Trade&api_key=c1f6792f289d2672ac209b6c2ec2b552b0c1082b3ed27ad4bef19edde0684d07 for a TradingView chart.

我们可以订阅多个流:

  • 贸易:
    0~{exchange}~{base}~{quote}
  • 代码:
    2~{exchange}~{base}~{quote}
  • 24~{exchange or CCCAGG}~{base}~{quote}~{period}
  • 等等

如何使用 SignalR 实现相同的效果?它目前的做法有点相同(我的意思是它有效),但我觉得我做得不对。

public class MarketData : Hub
{
    public async Task SendMessage(object message)
    {
        // Here you handle the received message
        var receivedData = JsonConvert.DeserializeObject<dynamic>(message.ToString());

        if (receivedData.action == "SubAdd")
        {
            // Start streaming data.
            // This is a simple example, replace with your own logic.
            while (true)
            {
                var randomData = GenerateRandomData(); // This method generates random data
                await Clients.Caller.SendAsync("ReceiveMessage", randomData);
                await Task.Delay(1000); // for example, send data every second
            }
        }
    }

    private string GenerateRandomData()
    {
        // Replace this with whatever logic you have to generate or fetch the data
        return "Random Data: " + new Random().Next(1000).ToString();
    }
}

我制作了以下代码片段,可以很好地用于测试目的。

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
    <title>SignalR Client Test</title>

    <!-- Include the SignalR JavaScript client library -->
    <script src="https://cdn.jsdelivr.net/npm/@microsoft/signalr@latest/dist/browser/signalr.min.js"></script>
</head>
<body>
<h1>SignalR Client Test</h1>

<button id="connectBtn">Connect</button>
<button id="disconnectBtn" disabled>Disconnect</button>
<h2>Received Messages:</h2>
<ul id="messagesList"></ul>

<h2>Send a Custom Message:</h2>
<textarea id="customMessage" rows="4" cols="50">{"action":"SubAdd","subs":["0~Binance~BTC~USDT"]}</textarea><br>
<button id="sendCustomBtn">Send Custom Message</button>

<script>
    let connection = new signalR.HubConnectionBuilder()
        .withUrl("https://localhost:5001/hub")
        .configureLogging(signalR.LogLevel.Information)
        .build();

    const connectBtn = document.getElementById("connectBtn");
    const disconnectBtn = document.getElementById("disconnectBtn");
    const messagesList = document.getElementById("messagesList");
    const sendCustomBtn = document.getElementById("sendCustomBtn");

    connectBtn.addEventListener("click", function () {
        connection
            .start()
            .then(function () {
                console.log("connected");
                connectBtn.disabled = true;
                disconnectBtn.disabled = false;
            })
            .catch((err) => console.error(err.toString()));
    });

    disconnectBtn.addEventListener("click", function () {
        connection.stop().then(function () {
            connectBtn.disabled = false;
            disconnectBtn.disabled = true;
        });
    });

    sendCustomBtn.addEventListener("click", function() {
        const customMessageText = document.getElementById("customMessage").value;
        let customMessage;
        try {
            customMessage = JSON.parse(customMessageText);
            connection.invoke("SendMessage", customMessage).catch(err => console.error(err.toString()));
        } catch (error) {
            console.error("Invalid JSON message:", error);
            alert("Please enter a valid JSON message.");
        }
    });

    connection.on("ReceiveMessage", function (message) {
        let li = document.createElement("li");
        li.textContent = message;
        messagesList.appendChild(li);
    });
</script>
</body>
</html>

我还认为 Microsoft Orleansgrain 在这种情况下会是完美的,因为如果有多个用户尝试订阅同一个交易所/符号,我们可以重用该grain。并且负载将均匀地分布在节点上。

c# asp.net-core signalr tradingview-api orleans
1个回答
0
投票

您可以使用计时器定期发送数据更新,而不是无限循环。这种方法可确保定期发送数据,而不会阻塞线程。

我已经为您创建了一个示例,您可以查看。

我的测试结果

我的市场数据.cshtml

@{
    ViewData["Title"] = "MarketData";
}
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
    <title>SignalR Client Test</title>
    <script src="~/js/signalr/dist/browser/signalr.min.js"></script>
</head>
<body>
    <h1>SignalR MarketData</h1>

    <button id="connectBtn">Connect</button>
    <button id="disconnectBtn" disabled>Disconnect</button>
    <button id="subscribeBtn" disabled>Subscribe</button>
    <button id="unsubscribeBtn" disabled>Unsubscribe</button>
    <h2>Received Messages:</h2>
    <ul id="messagesList"></ul>

    <h2>Subscribe to a Stream:</h2>
    <input type="text" id="streamInput" placeholder="Enter stream ID (e.g., 'Binance~BTC~USDT')" />
    <button id="subscribeToStreamBtn">Subscribe to Stream</button>

    <h2>Unsubscribe from Stream:</h2>
    <button id="unsubscribeFromStreamBtn">Unsubscribe from Stream</button>

    <script>
        let connection = new signalR.HubConnectionBuilder()
            .withUrl("https://localhost:44310/mainhub")
            .configureLogging(signalR.LogLevel.Information)
            .build();

        const connectBtn = document.getElementById("connectBtn");
        const disconnectBtn = document.getElementById("disconnectBtn");
        const subscribeBtn = document.getElementById("subscribeBtn");
        const unsubscribeBtn = document.getElementById("unsubscribeBtn");
        const messagesList = document.getElementById("messagesList");
        const streamInput = document.getElementById("streamInput");
        const subscribeToStreamBtn = document.getElementById("subscribeToStreamBtn");
        const unsubscribeFromStreamBtn = document.getElementById("unsubscribeFromStreamBtn");

        connectBtn.addEventListener("click", function () {
            connection
                .start()
                .then(function () {
                    console.log("connected");
                    connectBtn.disabled = true;
                    disconnectBtn.disabled = false;
                    subscribeBtn.disabled = false;
                })
                .catch((err) => console.error(err.toString()));
        });

        disconnectBtn.addEventListener("click", function () {
            connection.stop().then(function () {
                connectBtn.disabled = false;
                disconnectBtn.disabled = true;
                subscribeBtn.disabled = true;
                unsubscribeBtn.disabled = true;
            });
        });

        subscribeBtn.addEventListener("click", function () {
            connection.invoke("SubscribeToStream", streamInput.value)
                .then(function () {
                    subscribeBtn.disabled = true;
                    unsubscribeBtn.disabled = false;
                })
                .catch(err => console.error(err.toString()));
        });

        unsubscribeBtn.addEventListener("click", function () {
            connection.invoke("UnsubscribeFromStream")
                .then(function () {
                    subscribeBtn.disabled = false;
                    unsubscribeBtn.disabled = true;
                })
                .catch(err => console.error(err.toString()));
        });

        subscribeToStreamBtn.addEventListener("click", function () {
            const streamId = streamInput.value;
            connection.invoke("SubscribeToStream", streamId)
                .then(function () {
                    console.log("Subscribed to stream:", streamId);
                    subscribeToStreamBtn.disabled = true;
                    unsubscribeFromStreamBtn.disabled = false;
                })
                .catch(err => console.error(err.toString()));
        });

        unsubscribeFromStreamBtn.addEventListener("click", function () {
            connection.invoke("UnsubscribeFromStream")
                .then(function () {
                    console.log("Unsubscribed from the stream");
                    subscribeToStreamBtn.disabled = false;
                    unsubscribeFromStreamBtn.disabled = true;
                })
                .catch(err => console.error(err.toString()));
        });

        connection.on("ReceiveStreamData", function (data) {
            let li = document.createElement("li");
            li.textContent = data;
            messagesList.appendChild(li);
        });
    </script>
</body>
</html>

我的中心课程

using Microsoft.AspNetCore.SignalR;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace SignalRMiddleawre.Hubs
{
    public partial class MainHub : Hub
    {
        private readonly Dictionary<string, Timer> _subscriptionTimers = new Dictionary<string, Timer>();
        private readonly object _lock = new object();

        public override Task OnDisconnectedAsync(Exception exception)
        {
            // Cleanup resources when a user disconnects
            string userId = Context.ConnectionId;
            if (_subscriptionTimers.TryGetValue(userId, out var timer))
            {
                timer.Dispose();
                _subscriptionTimers.Remove(userId);
            }

            return base.OnDisconnectedAsync(exception);
        }

        public async Task SubscribeToStream(string streamId)
        {
            // Start streaming data for the specified streamId
            string userId = Context.ConnectionId;

            // Stop any existing timer for this user
            lock (_lock)
            {
                if (_subscriptionTimers.TryGetValue(userId, out var existingTimer))
                {
                    existingTimer.Dispose();
                    _subscriptionTimers.Remove(userId);
                }
            }

            var newTimer = new Timer(async _ =>
            {
                try
                {
                    // Replace this with logic to fetch actual data from a reliable source
                    var data = await FetchMarketData(streamId);

                    // Send the data update to the user
                    await Clients.Caller.SendAsync("ReceiveStreamData", data);
                }
                catch (Exception ex)
                {
                    // Handle errors gracefully
                    Console.WriteLine("Error fetching market data: " + ex.Message);
                }
            }, null, TimeSpan.Zero, TimeSpan.FromSeconds(5)); // Adjust the interval as needed

            lock (_lock)
            {
                _subscriptionTimers[userId] = newTimer;
            }
        }

        public async Task UnsubscribeFromStream()
        {
            // Stop the subscription for the current user
            string userId = Context.ConnectionId;
            if (_subscriptionTimers.TryGetValue(userId, out var timer))
            {
                timer.Dispose();
                _subscriptionTimers.Remove(userId);
            }

            await Task.CompletedTask;
        }

        private async Task<string> FetchMarketData(string streamId)
        {
            // Replace this with logic to fetch actual market data
            // from a reliable source based on the provided streamId
            await Task.Delay(100); // Simulate data fetch
            return "Market Data for " + streamId + ": " + new Random().Next(1000).ToString();
        }


    }
}
© www.soinside.com 2019 - 2024. All rights reserved.