我可以接受来自本地网络上连接设备的 UDP 消息。我需要将该数据发送到连接的 websocket 客户端。当收到新数据时,需要将其发送到 websocket 客户端进行实时更新。我在新线程中监听 UDP 数据。收到数据后,我不知道如何将该数据传递给 websocket 客户端。所有客户端都将收到相同的数据。在任何给定时间连接的 Websocket 客户端都不会超过 2 或 3 个。 UDP 数据在本地网络上发送到指定端口。目标是让 UDP 数据实时显示在网页上。
即使没有连接websocket客户端,也需要接收UDP数据。 UDP 数据在收到时也会放入数据库中。
class MainClass
{
static void Main(string[] args)
{
Console.WriteLine("Receiver v1.0.0");
ExternalVariables Ev = new ExternalVariables();
Ev.Get();
UdpReceiver r = new UdpReceiver();
r.Port = Ev.ReceivingPort;
Thread ControllerThread = new Thread(r.ReceiveMessages);
ControllerThread.Start();
TcpListener Server = new TcpListener(IPAddress.Parse(Ev.LocalHost), Ev.WebsocketPort);
Server.Start();
while (true)
{
TcpClient Client = Server.AcceptTcpClient();
Console.WriteLine("Websocket Connected...");
//Thread WebsocketThread = new Thread(WebsocketClientHandler);
//WebsocketThread.Start(Client);
WebsocketClientHandler(Client);
}
}
public static void WebsocketClientHandler(object c)
{
byte[] Bytes;
TcpClient Client = (TcpClient)c;
NetworkStream Stream = Client.GetStream();
bool connected = true;
try
{
Bytes = new byte[Client.Available];
Stream.Read(Bytes, 0, Bytes.Length);
string s = Encoding.UTF8.GetString(Bytes);
if (Regex.IsMatch(s, "^GET", RegexOptions.IgnoreCase))
{
Console.WriteLine("=====Handshaking from client=====\n{0}", s);
// 1. Obtain the value of the "Sec-WebSocket-Key" request header without any leading or trailing whitespace
// 2. Concatenate it with "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" (a special GUID specified by RFC 6455)
// 3. Compute SHA-1 and Base64 hash of the new value
// 4. Write the hash back as the value of "Sec-WebSocket-Accept" response header in an HTTP response
string swk = Regex.Match(s, "Sec-WebSocket-Key: (.*)").Groups[1].Value.Trim();
string swka = swk + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
byte[] swkaSha1 = System.Security.Cryptography.SHA1.Create().ComputeHash(Encoding.UTF8.GetBytes(swka));
string swkaSha1Base64 = Convert.ToBase64String(swkaSha1);
// HTTP/1.1 defines the sequence CR LF as the end-of-line marker
byte[] response = Encoding.UTF8.GetBytes(
"HTTP/1.1 101 Switching Protocols\r\n" +
"Connection: Upgrade\r\n" +
"Upgrade: websocket\r\n" +
"Sec-WebSocket-Accept: " + swkaSha1Base64 + "\r\n\r\n");
Stream.Write(response, 0, response.Length);
}
//Not sure How to send to connected websocket clients.
while (NewData)
{
//Send to webclient...
}
}
catch (Exception e)
{
connected = false;
Console.WriteLine(e);
Console.WriteLine(e.StackTrace);
}
}
}
public class UdpReceiver
{
public bool MessageReceived = false;
public int Port { get; set; }
public UdpReceiver()
{
Port = 0;
}
public void ReceiveMessages()
{
IPEndPoint e = new IPEndPoint(IPAddress.Any, Port);
UdpClient u = new UdpClient(e);
while (true)
{
MessageReceived = false;
byte[] Data = u.Receive(ref e);
MessageReceived = true;
//Task.Run(() => ProcessData(Data, c)); //New task to process the data in case another controller is sending data
ProcessData(Data);
}
}
public void ProcessData(byte[] Data)
{
ReceivedData ReceivedData = new ReceivedData();
ReceivedData.Data = Data;
ReceivedData.Process();
//Process() puts the data in a database and is supposed to send to websocket clients.
}
}
Websocket 测试客户端......
<%@ Page Language="C#" Inherits="WebsocketsExampleWeb.Default" %>
<!doctype html>
<html lang="en">
<style>
textarea {
vertical-align: bottom;
}
#output {
overflow: auto;
}
#output > p {
overflow-wrap: break-word;
}
#output span {
color: blue;
}
#output span.error {
color: red;
}
</style>
<body>
<h2>WebSocket</h2>
<textarea cols="60" rows="6"></textarea>
<button>send</button>
<div id="output"></div>
</body>
<script>
function connect() {
var ws = new WebSocket('ws://127.0.0.1:8077');
ws.onopen = function() {
writeToScreen("CONNECTED");
};
ws.onmessage = function(e) {
writeToScreen(`<span>RESPONSE: ${e.data}</span>`);
ws.send("Client Response");
console.log('Message:', e.data);
};
ws.onclose = function(e) {
console.log('Socket is closed. Reconnect will be attempted in 1 second.', e.reason);
setTimeout(function() {
connect();
}, 1000);
};
ws.onerror = function(err) {
console.error('Socket encountered error: ', err.message, 'Closing socket');
ws.close();
};
}
function writeToScreen(message) {
output.insertAdjacentHTML("afterbegin", `<p>${message}</p>`);
}
connect();
</script>
</html>
首先,不要尝试自己重新实现 WebSocket 协议。它非常复杂,并且 C# 已经有了一个非常好的
WebSocket
实现。
async
。HttpListener
代替 TcpListener
,并将每个接收到的上下文升级为 Web 套接字。class MainClass
{
private static readonly CancellationTokenSource cts = new CancellationTokenSource();
private static readonly List<HttpListenerWebSocketContext> _sockets = new();
static async Task Main(string[] args)
{
Console.WriteLine("Receiver v1.0.0");
ExternalVariables Ev = new ExternalVariables();
Ev.Get();
UdpReceiver r = new UdpReceiver(_sockets, cts.Token);
r.Port = Ev.ReceivingPort;
var Server = new HttpListener();
Server.Prefixes.Add("http://+:" + Ev.WebsocketPort);
Server.Start();
Task.Run(r.ReceiveMessages);
// need a way of shutting everything down
Console.CancelKeyPress += (sender, eventArgs) =>
{
cts.Cancel();
eventArgs.Cancel = true;
};
try
{
while (true)
{
var context = await Server.GetContextAsync(cts.Token);
Console.WriteLine("Websocket Connected...");
// hand off the client to another task so we can continue listening
Task.Run(() => WebsocketClientHandler(context));
}
}
catch (OperationCanceledException) {}
}
public static async Task WebsocketClientHandler(HttpListenerContext context)
{
try
{
var socket = await context.AcceptWebSocketAsync(null);
lock (_sockets)
{
_sockets.Add(socket);
}
}
catch
{
context.Response.Dispose();
}
}
}
public class UdpReceiver
{
public int Port { get; set; }
private List<HttpListenerWebSocketContext> _contexts;
private CancellationToken _token;
public UdpReceiver(List<HttpListenerWebSocketContext> contexts, CancellationToken token)
{
Port = 0;
_contexts = contexts;
_token = token;
}
public async Task ReceiveMessages()
{
IPEndPoint e = new IPEndPoint(IPAddress.Any, Port);
using var u = new UdpClient(e);
try
{
while (true)
{
var response = await u.ReceiveAsync(_token);
ProcessData(response.Buffer);
}
}
catch (OperationCanceledException) {}
}
public void ProcessData(byte[] Data)
{
ReceivedData ReceivedData = new ReceivedData();
ReceivedData.Data = Data;
ReceivedData.Process();
// broadcast to all sockets
List<HttpListenerWebSocketContext> list;
lock (_contexts)
{
list = _contexts.ToList();
}
foreach (var context in list)
{
await SendToSocket(context, Data)
}
}
public static async Task SendToSocket(HttpListenerContext context, byte[] bytes)
{
try
{
await context.WebSocket.SendAsync(bytes, WebSocketMessageType.Binary, false, _token);
}
catch (OperationCanceledException) {}
catch (Exception e)
{
connected = false;
Console.WriteLine(e);
Console.WriteLine(e.StackTrace);
lock(_sockets)
{
_contexts.Remove(context); // remove failed context
context.WebSocket.Dispose();
}
}
}
}