I want to stream market data similar to https://min-api.cryptocompare.com/documentation/websockets?key=Channels&cat=Trade&api_key=c1f6792f289d2672ac209b6c2ec2b552b0c1082b3ed27ad4bef19edde0684d07 for a TradingView chart.
我们可以订阅多个流:
0~{exchange}~{base}~{quote}
2~{exchange}~{base}~{quote}
24~{exchange or CCCAGG}~{base}~{quote}~{period}
如何使用 SignalR 实现相同的效果?它目前的做法有点相同(我的意思是它有效),但我觉得我做得不对。
public class MarketData : Hub
{
public async Task SendMessage(object message)
{
// Here you handle the received message
var receivedData = JsonConvert.DeserializeObject<dynamic>(message.ToString());
if (receivedData.action == "SubAdd")
{
// Start streaming data.
// This is a simple example, replace with your own logic.
while (true)
{
var randomData = GenerateRandomData(); // This method generates random data
await Clients.Caller.SendAsync("ReceiveMessage", randomData);
await Task.Delay(1000); // for example, send data every second
}
}
}
private string GenerateRandomData()
{
// Replace this with whatever logic you have to generate or fetch the data
return "Random Data: " + new Random().Next(1000).ToString();
}
}
我制作了以下代码片段,可以很好地用于测试目的。
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>SignalR Client Test</title>
<!-- Include the SignalR JavaScript client library -->
<script src="https://cdn.jsdelivr.net/npm/@microsoft/signalr@latest/dist/browser/signalr.min.js"></script>
</head>
<body>
<h1>SignalR Client Test</h1>
<button id="connectBtn">Connect</button>
<button id="disconnectBtn" disabled>Disconnect</button>
<h2>Received Messages:</h2>
<ul id="messagesList"></ul>
<h2>Send a Custom Message:</h2>
<textarea id="customMessage" rows="4" cols="50">{"action":"SubAdd","subs":["0~Binance~BTC~USDT"]}</textarea><br>
<button id="sendCustomBtn">Send Custom Message</button>
<script>
let connection = new signalR.HubConnectionBuilder()
.withUrl("https://localhost:5001/hub")
.configureLogging(signalR.LogLevel.Information)
.build();
const connectBtn = document.getElementById("connectBtn");
const disconnectBtn = document.getElementById("disconnectBtn");
const messagesList = document.getElementById("messagesList");
const sendCustomBtn = document.getElementById("sendCustomBtn");
connectBtn.addEventListener("click", function () {
connection
.start()
.then(function () {
console.log("connected");
connectBtn.disabled = true;
disconnectBtn.disabled = false;
})
.catch((err) => console.error(err.toString()));
});
disconnectBtn.addEventListener("click", function () {
connection.stop().then(function () {
connectBtn.disabled = false;
disconnectBtn.disabled = true;
});
});
sendCustomBtn.addEventListener("click", function() {
const customMessageText = document.getElementById("customMessage").value;
let customMessage;
try {
customMessage = JSON.parse(customMessageText);
connection.invoke("SendMessage", customMessage).catch(err => console.error(err.toString()));
} catch (error) {
console.error("Invalid JSON message:", error);
alert("Please enter a valid JSON message.");
}
});
connection.on("ReceiveMessage", function (message) {
let li = document.createElement("li");
li.textContent = message;
messagesList.appendChild(li);
});
</script>
</body>
</html>
我还认为 Microsoft Orleansgrain 在这种情况下会是完美的,因为如果有多个用户尝试订阅同一个交易所/符号,我们可以重用该grain。并且负载将均匀地分布在节点上。
您可以使用计时器定期发送数据更新,而不是无限循环。这种方法可确保定期发送数据,而不会阻塞线程。
我已经为您创建了一个示例,您可以查看。
我的测试结果
我的市场数据.cshtml
@{
ViewData["Title"] = "MarketData";
}
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>SignalR Client Test</title>
<script src="~/js/signalr/dist/browser/signalr.min.js"></script>
</head>
<body>
<h1>SignalR MarketData</h1>
<button id="connectBtn">Connect</button>
<button id="disconnectBtn" disabled>Disconnect</button>
<button id="subscribeBtn" disabled>Subscribe</button>
<button id="unsubscribeBtn" disabled>Unsubscribe</button>
<h2>Received Messages:</h2>
<ul id="messagesList"></ul>
<h2>Subscribe to a Stream:</h2>
<input type="text" id="streamInput" placeholder="Enter stream ID (e.g., 'Binance~BTC~USDT')" />
<button id="subscribeToStreamBtn">Subscribe to Stream</button>
<h2>Unsubscribe from Stream:</h2>
<button id="unsubscribeFromStreamBtn">Unsubscribe from Stream</button>
<script>
let connection = new signalR.HubConnectionBuilder()
.withUrl("https://localhost:44310/mainhub")
.configureLogging(signalR.LogLevel.Information)
.build();
const connectBtn = document.getElementById("connectBtn");
const disconnectBtn = document.getElementById("disconnectBtn");
const subscribeBtn = document.getElementById("subscribeBtn");
const unsubscribeBtn = document.getElementById("unsubscribeBtn");
const messagesList = document.getElementById("messagesList");
const streamInput = document.getElementById("streamInput");
const subscribeToStreamBtn = document.getElementById("subscribeToStreamBtn");
const unsubscribeFromStreamBtn = document.getElementById("unsubscribeFromStreamBtn");
connectBtn.addEventListener("click", function () {
connection
.start()
.then(function () {
console.log("connected");
connectBtn.disabled = true;
disconnectBtn.disabled = false;
subscribeBtn.disabled = false;
})
.catch((err) => console.error(err.toString()));
});
disconnectBtn.addEventListener("click", function () {
connection.stop().then(function () {
connectBtn.disabled = false;
disconnectBtn.disabled = true;
subscribeBtn.disabled = true;
unsubscribeBtn.disabled = true;
});
});
subscribeBtn.addEventListener("click", function () {
connection.invoke("SubscribeToStream", streamInput.value)
.then(function () {
subscribeBtn.disabled = true;
unsubscribeBtn.disabled = false;
})
.catch(err => console.error(err.toString()));
});
unsubscribeBtn.addEventListener("click", function () {
connection.invoke("UnsubscribeFromStream")
.then(function () {
subscribeBtn.disabled = false;
unsubscribeBtn.disabled = true;
})
.catch(err => console.error(err.toString()));
});
subscribeToStreamBtn.addEventListener("click", function () {
const streamId = streamInput.value;
connection.invoke("SubscribeToStream", streamId)
.then(function () {
console.log("Subscribed to stream:", streamId);
subscribeToStreamBtn.disabled = true;
unsubscribeFromStreamBtn.disabled = false;
})
.catch(err => console.error(err.toString()));
});
unsubscribeFromStreamBtn.addEventListener("click", function () {
connection.invoke("UnsubscribeFromStream")
.then(function () {
console.log("Unsubscribed from the stream");
subscribeToStreamBtn.disabled = false;
unsubscribeFromStreamBtn.disabled = true;
})
.catch(err => console.error(err.toString()));
});
connection.on("ReceiveStreamData", function (data) {
let li = document.createElement("li");
li.textContent = data;
messagesList.appendChild(li);
});
</script>
</body>
</html>
我的中心课程
using Microsoft.AspNetCore.SignalR;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace SignalRMiddleawre.Hubs
{
public partial class MainHub : Hub
{
private readonly Dictionary<string, Timer> _subscriptionTimers = new Dictionary<string, Timer>();
private readonly object _lock = new object();
public override Task OnDisconnectedAsync(Exception exception)
{
// Cleanup resources when a user disconnects
string userId = Context.ConnectionId;
if (_subscriptionTimers.TryGetValue(userId, out var timer))
{
timer.Dispose();
_subscriptionTimers.Remove(userId);
}
return base.OnDisconnectedAsync(exception);
}
public async Task SubscribeToStream(string streamId)
{
// Start streaming data for the specified streamId
string userId = Context.ConnectionId;
// Stop any existing timer for this user
lock (_lock)
{
if (_subscriptionTimers.TryGetValue(userId, out var existingTimer))
{
existingTimer.Dispose();
_subscriptionTimers.Remove(userId);
}
}
var newTimer = new Timer(async _ =>
{
try
{
// Replace this with logic to fetch actual data from a reliable source
var data = await FetchMarketData(streamId);
// Send the data update to the user
await Clients.Caller.SendAsync("ReceiveStreamData", data);
}
catch (Exception ex)
{
// Handle errors gracefully
Console.WriteLine("Error fetching market data: " + ex.Message);
}
}, null, TimeSpan.Zero, TimeSpan.FromSeconds(5)); // Adjust the interval as needed
lock (_lock)
{
_subscriptionTimers[userId] = newTimer;
}
}
public async Task UnsubscribeFromStream()
{
// Stop the subscription for the current user
string userId = Context.ConnectionId;
if (_subscriptionTimers.TryGetValue(userId, out var timer))
{
timer.Dispose();
_subscriptionTimers.Remove(userId);
}
await Task.CompletedTask;
}
private async Task<string> FetchMarketData(string streamId)
{
// Replace this with logic to fetch actual market data
// from a reliable source based on the provided streamId
await Task.Delay(100); // Simulate data fetch
return "Market Data for " + streamId + ": " + new Random().Next(1000).ToString();
}
}
}