目前我有一个程序,它创建了 2 个 TCP 客户端,分别连接到同一台服务器上的不同端口(这一点不是我能选择的,也无法更改)。但是,客户端在读取传入消息时似乎有问题;而在侦听器(服务器)一侧,数据的接收和发送都正常。
下面是实例化每个客户端(通过依赖注入,即 DI)并从中读取数据的代码:
/// <summary>
/// Holds a single <see cref="TcpClient"/> connection and polls it for incoming text,
/// sending a periodic keepalive packet while the connection is idle.
/// </summary>
/// <remarks>
/// NOTE(review): <c>_receivebuffersize</c>, <c>_sendbuffersize</c>, <c>receivebuffer</c>,
/// <c>_lastKeepaliveSynchTime</c>, <c>_tcpKeepalivePeriod</c> and <c>_logMan</c> are used below
/// but are not declared in this excerpt; they are assumed to be declared in an omitted part
/// of the class - confirm.
/// </remarks>
public class TcpClientHolder : ITcpClientHolder
{
    TcpClient _tcpClient;
    NetworkStream _networkStream;

    /// <summary>
    /// Connects to the given endpoint and configures the socket options.
    /// </summary>
    /// <param name="ipAddress">Host name or IP address of the server.</param>
    /// <param name="port">TCP port of the server.</param>
    public void Open(string ipAddress, int port)
    {
        _tcpClient = new TcpClient();
        // Use the arguments supplied by the caller; the previous code ignored them and
        // connected to fields that are not set anywhere in this class.
        _tcpClient.Connect(ipAddress, port);
        _networkStream = _tcpClient.GetStream();
        _tcpClient.ReceiveBufferSize = _receivebuffersize;
        _tcpClient.SendBufferSize = _sendbuffersize;
        _tcpClient.NoDelay = true;
        // Linger enabled with a zero timeout. A further "LingerState.LingerTime = 0"
        // assignment was dropped: the getter returns a copy, so mutating it changes nothing.
        _tcpClient.LingerState = new LingerOption(true, 0);
    }

    /// <summary>
    /// Returns whatever text is currently buffered on the connection, or an empty string
    /// when nothing has arrived. When idle, a keepalive packet is sent once the keepalive
    /// period has elapsed.
    /// </summary>
    /// <returns>
    /// The decoded bytes that were available (not necessarily one complete line), or
    /// <c>""</c> when no data was available.
    /// </returns>
    string ReadLine()
    {
        string line = "";
        if (_networkStream.DataAvailable)
        {
            // Never ask for more bytes than the buffer can hold; Read throws otherwise.
            int count = Math.Min(_tcpClient.Available, receivebuffer.Length);
            int size = _networkStream.Read(receivebuffer, 0, count);
            line = Encoding.UTF8.GetString(receivebuffer, 0, size);
        }
        else if ((DateTime.Now - _lastKeepaliveSynchTime).TotalMilliseconds > _tcpKeepalivePeriod)
        {
            // Send an empty packet periodically so a broken socket is detected sooner.
            _logMan.WriteAll($"Sending Keepalive message or Empty Packet to server ");
            byte[] data = Encoding.UTF8.GetBytes("\r\n");
            _networkStream.Write(data, 0, data.Length);
            _networkStream.Flush();
            _lastKeepaliveSynchTime = DateTime.Now;
        }
        // The original method fell off the end without returning a value.
        return line;
    }
}
我想问两个问题。首先,我的这种设置本身是否会导致客户端读取数据时出现问题?如果不会,那么代码中是否存在导致读取失败的错误?
附加信息:我在同一个程序中创建了两个不同的客户端。它们连接到同一个服务器/程序,只是使用不同的端口。我还编写了一个模拟器,它在同一个程序/服务器上带有两个侦听器,可以很好地处理这两个客户端连接:我可以分别接收每个客户端发来的数据,也可以分别向它们发送数据。但是,问题出在客户端:当我想读取数据时,却得不到任何可用数据。没有报错,只是没有可读取的数据。
我不确定你的实现具体哪里有问题,不过可以参考下面我的做法:将读/写逻辑分离到一个新线程中。
/// <summary>
/// TCP client that runs its own connection loop on a dedicated thread. Received data is
/// placed in an inbound queue (drained with <see cref="ReadLine"/>) and outgoing messages
/// are taken from an outbound queue (filled with <see cref="Write"/>). When the connection
/// drops, a reconnect is attempted once per second.
/// </summary>
public class TcpClientBase : ITcpClientBase
{
    ILogMan logMan;
    ConcurrentQueue<string> receivedQueue;
    ConcurrentQueue<string> sendQueue;
    TcpClient client;

    /// <summary>
    /// Creates the queues and starts the connection thread.
    /// </summary>
    /// <param name="address">Host name or IP address of the server.</param>
    /// <param name="port">TCP port of the server.</param>
    /// <param name="logMan">Logger used to report connection errors.</param>
    public TcpClientBase(string address, int port, ILogMan logMan)
    {
        this.logMan = logMan;
        this.receivedQueue = new ConcurrentQueue<string>();
        this.sendQueue = new ConcurrentQueue<string>();
        Thread thread = new Thread(() => HandleConnection(address, port));
        // The loop never terminates on its own; as a background thread it no longer
        // prevents the process from exiting.
        thread.IsBackground = true;
        thread.Start();
    }

    /// <summary>
    /// Handle any connection to the device: connect, pump data in both directions until
    /// the socket drops or fails, release the socket, wait one second and start over.
    /// </summary>
    /// <param name="ip">Ip address</param>
    /// <param name="port">Port</param>
    void HandleConnection(string ip, int port)
    {
        while (true)
        {
            if (CreateClient(ip, port, out client))
            {
                try
                {
                    NetworkStream stream = client.GetStream();
                    while (client != null && SocketConnected(client.Client))
                    {
                        int available = client.Client.Available;
                        if (available > 0)
                        {
                            byte[] buffer = new byte[available];
                            // Read may return fewer bytes than requested; decode only
                            // what was actually received.
                            int read = stream.Read(buffer, 0, buffer.Length);
                            if (read > 0)
                            {
                                receivedQueue.Enqueue(Encoding.ASCII.GetString(buffer, 0, read));
                            }
                        }
                        if (sendQueue.TryDequeue(out string? toSend))
                        {
                            client.Client.Send(Encoding.ASCII.GetBytes(toSend));
                        }
                    }
                }
                catch (System.Exception ex)
                {
                    logMan.WriteError($"Tcp Client encountered and error and disconnected... {ex.Message}");
                }
                finally
                {
                    // Release the socket before reconnecting; previously every reconnect
                    // left the old TcpClient open.
                    TcpClient finished = client;
                    client = null;
                    if (finished != null)
                    {
                        finished.Close();
                    }
                    IsConnected = false;
                }
            }
            // Attempt reconnect in 1 seconds
            Thread.Sleep(1000);
        }
    }

    /// <summary>
    /// Check if socket is connected. The socket is treated as disconnected when it polls
    /// as readable while no bytes are available. Updates <see cref="IsConnected"/>.
    /// </summary>
    /// <param name="s">Socket to probe.</param>
    /// <returns><c>true</c> when the socket is still considered connected.</returns>
    bool SocketConnected(Socket s)
    {
        // Poll takes microseconds, so this waits at most 1 ms; that wait is also what
        // keeps the pump loop in HandleConnection from spinning at full speed.
        bool readable = s.Poll(1000, SelectMode.SelectRead);
        bool nothingToRead = (s.Available == 0);
        bool connected = !(readable && nothingToRead);
        IsConnected = connected;
        return connected;
    }

    /// <summary>
    /// Initialize the client and connect it. Updates <see cref="IsConnected"/>.
    /// </summary>
    /// <param name="ip">Ip address</param>
    /// <param name="port">Port</param>
    /// <param name="client">The connected client, or <c>null</c> when connecting failed.</param>
    /// <returns><c>true</c> when the connection was established.</returns>
    bool CreateClient(string ip, int port, out TcpClient client)
    {
        TcpClient candidate = null;
        try
        {
            candidate = new TcpClient();
            candidate.Connect(ip, port);
            client = candidate;
            IsConnected = true;
        }
        catch
        {
            logMan.WriteError($"Error reconnecting client for server [{ip}] on port [{port}]");
            // Do not leak the socket of a client that failed to connect.
            if (candidate != null)
            {
                candidate.Close();
            }
            client = null;
            IsConnected = false;
            return false;
        }
        return true;
    }

    /// <summary>
    /// Check if there is connection
    /// </summary>
    public bool IsConnected { get; set; }

    /// <summary>
    /// Read the next entry in the received queue. Each entry is one chunk of received
    /// data, which is not necessarily a complete line.
    /// </summary>
    /// <returns>returns the next entry in the queue, or an empty string when the queue is empty</returns>
    public string ReadLine()
    {
        if (receivedQueue.TryDequeue(out string result))
        {
            return result;
        }
        return string.Empty;
    }

    /// <summary>
    /// Queue a message to be sent by the connection thread.
    /// </summary>
    /// <param name="message">message</param>
    public void Write(string message) =>
        sendQueue.Enqueue(message);
}
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Windows.Forms;
namespace WindowsFormsApp1
{
/// <summary>
/// Wraps one <see cref="TcpClient"/> connection together with a worker-thread loop
/// (<see cref="Run"/>) that stays alive until <see cref="KillThread"/> or
/// <see cref="Dispose()"/> is called.
/// </summary>
/// <remarks>
/// The thread-control state is per instance. It used to be <c>static</c>, which made all
/// workers share one stop flag, one mutex, one start event and one thread reference, so
/// stopping or disposing one worker affected every other worker.
/// Errors in the connection methods are reported with <see cref="MessageBox"/> and are
/// not rethrown.
/// </remarks>
public class TcpClientWorker : IDisposable
{
    // Stop flag for the Run loop; access it through EndLoop, which takes the mutex.
    protected bool _endLoop = false;
    protected Mutex _mutexEndLoop = new Mutex();
    // Signalled by Run once the worker thread has recorded itself in _objThread.
    protected AutoResetEvent _autoEvent = new AutoResetEvent(false);
    protected Thread _objThread = null;
    private TcpClient _tcpClient;
    private NetworkStream _networkStream;
    private bool _disposed;

    public TcpClientWorker()
    {
    }

    ~TcpClientWorker()
    {
        Dispose(false);
    }

    /// <summary>
    /// Stops the worker thread, closes the connection and releases the wait handles.
    /// Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Gets or sets the flag that tells the <see cref="Run"/> loop to stop.
    /// Access is serialized through the instance mutex.
    /// </summary>
    public bool EndLoop
    {
        get
        {
            _mutexEndLoop.WaitOne();
            try
            {
                return _endLoop;
            }
            finally
            {
                _mutexEndLoop.ReleaseMutex();
            }
        }
        set
        {
            _mutexEndLoop.WaitOne();
            try
            {
                _endLoop = value;
            }
            finally
            {
                _mutexEndLoop.ReleaseMutex();
            }
        }
    }

    /// <summary>
    /// Worker loop. This call BLOCKS until <see cref="EndLoop"/> becomes <c>true</c>, so it
    /// must be executed on its own thread (for example <c>new Thread(worker.Run).Start()</c>),
    /// never on a UI thread.
    /// </summary>
    public void Run()
    {
        EndLoop = false;
        _objThread = Thread.CurrentThread;
        _autoEvent.Set();
        while (EndLoop == false)
        {
            try
            {
                Thread.Sleep(100);
            }
            catch (Exception)
            {
                // Leave the loop if the sleep fails (e.g. the thread is interrupted).
                break;
            }
        }
    }

    /// <summary>
    /// Asks the <see cref="Run"/> loop to stop and waits for its thread to finish.
    /// Never throws.
    /// </summary>
    public void KillThread()
    {
        try
        {
            Thread worker = _objThread;
            if (worker != null && worker.IsAlive == false)
            {
                return;
            }
            EndLoop = true;
            // Joining the current thread would block forever, so skip it in that case.
            if (worker != null && worker != Thread.CurrentThread)
            {
                worker.Join();
            }
        }
        catch (Exception)
        {
            // Best effort: stopping the worker must not throw.
        }
        finally
        {
            _objThread = null;
        }
    }

    /// <summary>
    /// Blocks until <see cref="Run"/> has started on its thread.
    /// </summary>
    public void WaitForThreadToStart()
    {
        _autoEvent.WaitOne();
    }

    /// <summary>
    /// Releases the resources held by this worker.
    /// </summary>
    /// <param name="bFreeAll">
    /// <c>true</c> when called from <see cref="Dispose()"/>; <c>false</c> when called from
    /// the finalizer, in which case nothing is touched.
    /// </param>
    protected virtual void Dispose(bool bFreeAll)
    {
        if (_disposed)
        {
            return;
        }
        _disposed = true;
        if (!bFreeAll)
        {
            // Finalizer path: the other managed objects may already have been finalized,
            // and joining a thread here could stall the finalizer thread.
            return;
        }
        KillThread();
        try
        {
            CloseConnection();
        }
        catch (Exception)
        {
            // Dispose must not throw.
        }
        _mutexEndLoop.Close();
        _autoEvent.Close();
    }

    /// <summary>
    /// Closes the stream and the client and clears both fields, so that a later
    /// <see cref="Connect"/> starts from a fresh <see cref="TcpClient"/>.
    /// </summary>
    private void CloseConnection()
    {
        NetworkStream stream = _networkStream;
        TcpClient client = _tcpClient;
        _networkStream = null;
        _tcpClient = null;
        try
        {
            if (stream != null)
            {
                stream.Close();
            }
        }
        finally
        {
            if (client != null)
            {
                client.Close();
            }
        }
    }

    /// <summary>
    /// Connects to the given endpoint and configures the socket options.
    /// Failures are shown in a message box.
    /// </summary>
    /// <param name="ipAddress">Host name or IP address of the server.</param>
    /// <param name="port">TCP port of the server.</param>
    public void Connect(string ipAddress, int port)
    {
        try
        {
            if (_tcpClient == null)
            {
                _tcpClient = new TcpClient();
            }
            _tcpClient.Connect(ipAddress, port);
            // NOTE(review): int.MaxValue requests the largest possible socket buffers;
            // confirm this is intended rather than a concrete size.
            _tcpClient.ReceiveBufferSize = int.MaxValue;
            _tcpClient.SendBufferSize = int.MaxValue;
            _tcpClient.NoDelay = true;
            // Linger enabled with a zero timeout. A further "LingerState.LingerTime = 0"
            // assignment was dropped: the getter returns a copy, so mutating it changes nothing.
            _tcpClient.LingerState = new LingerOption(true, 0);
            _networkStream = _tcpClient.GetStream();
        }
        catch (Exception ex)
        {
            MessageBox.Show(ex.Message);
            // Drop a client that never got connected so the next attempt starts clean.
            // A client that is still connected is left alone.
            if (_tcpClient != null && !_tcpClient.Connected)
            {
                try
                {
                    CloseConnection();
                }
                catch (Exception)
                {
                    // The failure has already been reported above.
                }
            }
        }
    }

    /// <summary>
    /// Closes the connection if there is one. Failures are shown in a message box.
    /// </summary>
    public void Disconnect()
    {
        try
        {
            // Always close both objects and clear the fields. Previously the client stayed
            // open when it was not connected, and the fields kept pointing at disposed objects.
            CloseConnection();
        }
        catch (Exception ex)
        {
            MessageBox.Show(ex.Message);
        }
    }

    /// <summary>
    /// Reports the connection state as of the last socket operation.
    /// </summary>
    /// <returns><c>true</c> when a client exists and reports itself as connected.</returns>
    public bool IsConnected()
    {
        try
        {
            if (_tcpClient == null)
            {
                return false;
            }
            return _tcpClient.Connected;
        }
        catch (Exception ex)
        {
            MessageBox.Show(ex.Message);
        }
        return false;
    }

    /// <summary>
    /// Sends a message as ASCII text. Does nothing when there is no connection or the
    /// message is null or empty. Failures are shown in a message box.
    /// </summary>
    /// <param name="message">Text to send.</param>
    public void Write(String message)
    {
        try
        {
            if (_tcpClient == null || _networkStream == null)
            {
                return;
            }
            if (String.IsNullOrEmpty(message))
            {
                return;
            }
            if (_tcpClient.Connected)
            {
                byte[] payload = Encoding.ASCII.GetBytes(message);
                _networkStream.Write(payload, 0, payload.Length);
            }
        }
        catch (Exception ex)
        {
            MessageBox.Show(ex.Message);
        }
    }

    /// <summary>
    /// Returns the data that has already arrived, decoded as ASCII. This call does not
    /// wait: if nothing has arrived yet (for example immediately after <see cref="Write"/>)
    /// it returns an empty string, so callers that expect a reply need to poll.
    /// </summary>
    /// <returns>The received text, or <c>""</c> when there is no connection or no data.</returns>
    public String Read()
    {
        try
        {
            if (_tcpClient == null || _tcpClient.Client == null || _networkStream == null)
            {
                return "";
            }
            if (_tcpClient.Connected)
            {
                int available = _tcpClient.Client.Available;
                if (available > 0)
                {
                    byte[] buffer = new byte[available];
                    int length = _networkStream.Read(buffer, 0, buffer.Length);
                    return Encoding.ASCII.GetString(buffer, 0, length);
                }
            }
        }
        catch (Exception ex)
        {
            MessageBox.Show(ex.Message);
        }
        return "";
    }
}
}
调用上述代码的示例如下:
using System;
using System.Diagnostics;
using System.Threading;
using System.Windows.Forms;
namespace WindowsFormsApp1
{
/// <summary>
/// Demo form: on load it talks to the local server on ports 1001 and 1002, one after the
/// other, each time through its own <see cref="TcpClientWorker"/>.
/// </summary>
public partial class Form1 : Form
{
    private TcpClientWorker _tcpClientWorker1;
    private TcpClientWorker _tcpClientWorker2;

    public Form1()
    {
        InitializeComponent();
    }

    /// <summary>
    /// Runs one send/receive session per port.
    /// </summary>
    private void Form1_Load(object sender, EventArgs e)
    {
        // connect to server via port 1001
        _tcpClientWorker1 = RunSession("127.0.0.1", 1001, "this message sent to 1001");
        // connect to server via port 1002
        _tcpClientWorker2 = RunSession("127.0.0.1", 1002, "this message sent to 1002");
    }

    /// <summary>
    /// Creates a worker, starts its loop on a dedicated thread, sends one message, prints
    /// whatever has been received so far, then disconnects and stops the thread.
    /// </summary>
    /// <param name="host">Server address.</param>
    /// <param name="port">Server port.</param>
    /// <param name="message">Message to send after connecting.</param>
    /// <returns>The worker that was used for the session.</returns>
    private static TcpClientWorker RunSession(string host, int port, string message)
    {
        TcpClientWorker worker = new TcpClientWorker();
        // Run() loops until EndLoop is set, so it must not be called directly here:
        // on the UI thread it never returns and nothing below would execute.
        Thread loopThread = new Thread(worker.Run);
        loopThread.IsBackground = true;
        loopThread.Start();
        worker.WaitForThreadToStart();
        try
        {
            worker.Connect(host, port);
            // send message to server
            worker.Write(message);
            // receive message from server; Read() does not wait, so this prints an
            // empty string when the reply has not arrived yet
            Debug.Print(worker.Read());
            // disconnect from server
            worker.Disconnect();
        }
        finally
        {
            worker.KillThread();
        }
        return worker;
    }
}
}