两对TCP服务器/客户端共享相同的代码,但是只有一个客户端接收消息

问题描述 投票:1回答:1

我有两对简单的TCP服务器/客户端(即每个服务器对应一个客户端),均在Windows上运行:

  • 服务器在进程(应用程序)中运行。
  • 客户端在另一个进程中运行。
  • 服务器不断向其配对的客户端发送心跳(字符串)。
  • 第一对服务器/客户端在各自的线程中运行其主循环。
  • 一旦第一对服务器/客户端通过第一次心跳完成握手,第二对服务器/客户端就会在各自的线程中启动其主循环。
  • 对于该测试,它们在具有不同端口的同一台计算机上运行:2345和2346。

现在是我的问题

  • 第一个客户端收到其服务器的心跳。
  • 第二个客户端却**没有**收到心跳,尽管第二个服务器发送心跳时没有报错。
这是服务器代码:

// hello_dualchannel_server.cpp : This file contains the 'main' function. #include "pch.h" #include <iostream> #include <thread> #include <winsock2.h> #include <ws2tcpip.h> #include <spdlog/spdlog.h> #define SPDLOG_WCHAR_TO_UTF8_SUPPORT #ifdef _DEBUG #if !defined(SPDLOG_ACTIVE_LEVEL) #define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_TRACE #endif // #if !defined(SPDLOG_ACTIVE_LEVEL) #define SPDLOG_DEBUG_ON #define SPDLOG_TRACE_ON #define _trace SPDLOG_TRACE #endif // #ifdef _DEBUG using namespace spdlog; SOCKET g_sockFirst = 0; SOCKET g_sockClientFirst = 0; std::thread g_threadFirst; uint32_t g_timeLatestHeartBeatFirst = 0; SOCKET g_sockSecond = 0; SOCKET g_sockClientSecond = 0; std::thread g_threadSecond; uint32_t g_timeLatestHeartBeatSecond = 0; void SetupLogger() { #ifdef _DEBUG spdlog::set_level(spdlog::level::trace); spdlog::set_pattern("[%H:%M:%S%z][%^%L%$][%t:%s:%#] %v"); #else spdlog::set_level(spdlog::level::info); spdlog::set_pattern("[%H:%M:%S][%^%L%$][%t] %v"); #endif // #ifdef _DEBUG } int InitWinSock() { WORD wVersionRequested; WSADATA wsaData; /* Use the MAKEWORD(lowbyte, highbyte) macro declared in Windef.h */ wVersionRequested = MAKEWORD(2, 2); int err = WSAStartup(wVersionRequested, &wsaData); if (err != 0) { /* Tell the user that we could not find a usable */ /* Winsock DLL. */ printf("WSAStartup failed with error: %d\n", err); return 1; } /* Confirm that the WinSock DLL supports 2.2.*/ /* Note that if the DLL supports versions greater */ /* than 2.2 in addition to 2.2, it will still return */ /* 2.2 in wVersion since that is the version we */ /* requested. */ if (LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2) { /* Tell the user that we could not find a usable */ /* WinSock DLL. 
*/ printf("Could not find a usable version of Winsock.dll\n"); WSACleanup(); return 1; } else printf("The Winsock 2.2 dll was found okay\n"); return 0; } bool Init(int host_port, SOCKET* p_sockServer, SOCKET*p_sockClient) { int err = 0; int* p_int = 0; std::string host_name("127.0.0.1"); struct sockaddr_in my_addr; int addr_size = 0; sockaddr_in sadr_client; if (!*p_sockServer) { *p_sockServer = socket(AF_INET, SOCK_STREAM, 0); if (*p_sockServer == -1) { char log[MAX_PATH]; strerror_s(log, MAX_PATH, errno); error("Server Error initializing socket: {}", log); goto FINISH; } p_int = (int*)malloc(sizeof(int)); *p_int = 1; if ((setsockopt(*p_sockServer, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1) || (setsockopt(*p_sockServer, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1)) { char log[MAX_PATH]; strerror_s(log, MAX_PATH, errno); error("Server Error setting options: {}", log); free(p_int); goto FINISH; } free(p_int); info("Server socket is set up."); my_addr.sin_family = AF_INET; my_addr.sin_port = htons(host_port); memset(&(my_addr.sin_zero), 0, 8); my_addr.sin_addr.s_addr = INADDR_ANY; if (bind(*p_sockServer, (sockaddr*)&my_addr, sizeof(my_addr)) == -1) { char log[MAX_PATH]; strerror_s(log, MAX_PATH, errno); error("Server Error binding to socket, make sure nothing else is listening on this port: {}", log); goto FINISH; } if (listen(*p_sockServer, 10) == -1) { char log[MAX_PATH]; strerror_s(log, MAX_PATH, errno); error("Server Error listening: {}", log); goto FINISH; } info("SUCCESS: Server socket listening ..."); } info("Server accepting connection ..."); addr_size = sizeof(sockaddr_in); char sAddress[MAX_PATH]; *p_sockClient = accept(*p_sockServer, (sockaddr*)&sadr_client, &addr_size); if (*p_sockClient == INVALID_SOCKET) { char log[MAX_PATH]; strerror_s(log, MAX_PATH, errno); error("Server error accepting client connection: {}", log); // DO NOT close sockets here. 
return false; } inet_ntop(sadr_client.sin_family, &sadr_client.sin_addr, sAddress, MAX_PATH); g_timeLatestHeartBeatFirst = GetCurrentTime(); info("SUCCESS: Server accepted client connection."); return true; FINISH: closesocket(*p_sockServer); return false; } bool IsConnected(uint32_t timeLatestHeartBeat) { // CAUTION: denser than client for sure catch const unsigned long ConnTimeoutMs = 300; auto cur = GetCurrentTime(); auto latest = timeLatestHeartBeat; return cur - latest < ConnTimeoutMs; } bool StayInTouch(const char* name, SOCKET* pSockClient, uint32_t* pTimeLatestHeartBeat) { if (IsConnected(*pTimeLatestHeartBeat)) return true; char heartBeat[] = "biku"; int nBytesSent = 0; int flags = 0; int res = send(*pSockClient, heartBeat, sizeof(heartBeat), flags); if (res == SOCKET_ERROR) { char log[MAX_PATH]; strerror_s(log, MAX_PATH, errno); error("{}: Server failed to send heartbeat: {}, Windows error: {}", name, log, GetLastError()); return false; } else if (res == 0) { char log[MAX_PATH]; strerror_s(log, MAX_PATH, errno); error("{}: Server sent zerobyte heartbeat: {}", name, log); return false; } debug("{}: Heartbeat sent: {}", name, heartBeat); *pTimeLatestHeartBeat = GetCurrentTime(); return true; } void Close(SOCKET* pSock) { closesocket(*pSock); *pSock = 0; } bool Connect() { if (g_threadFirst.joinable()) { warn("FirstTunnel already running. Skipped."); return true; } g_threadFirst = std::thread([&]() { bool isConnected = false; while (true) { while (!isConnected) { isConnected = Init(2345, &g_sockFirst, &g_sockClientFirst); } isConnected = StayInTouch("FirstTunnel", &g_sockClientFirst, &g_timeLatestHeartBeatFirst); if (!isConnected) { // We don't close as client. 
// We keep connecting error("About to reconnect ..."); Sleep(1000); continue; } if (!g_threadSecond.joinable()) { g_threadSecond = std::thread([&]() { while (true) { while (!isConnected) { isConnected = Init(2346, &g_sockSecond, &g_sockClientSecond); } isConnected = StayInTouch("SecondTunnel", &g_sockClientSecond, &g_timeLatestHeartBeatSecond); if (!isConnected) { // We don't close as client. // We keep connecting error("About to reconnect ..."); Sleep(1000); continue; } } info("SecondTunnel quitting..."); Close(&g_sockSecond); }); } } info("FirstTunnel quitting..."); Close(&g_sockFirst); }); while (true) { //info("main thread ..."); Sleep(3000); } return g_threadFirst.joinable() ? true : false; } int main() { SetupLogger(); info("Hello World!\n"); if (InitWinSock()) { critical("Failed to initialize Window socket. Aborted."); } Connect(); if (g_threadSecond.joinable()) { g_threadSecond.join(); } if (g_threadFirst.joinable()) { g_threadFirst.join(); } WSACleanup(); info("Bye!"); }

这是客户端代码:

// hello_dualchannel_client.cpp : This file contains the 'main' function. // #include "pch.h" #include <iostream> #include <thread> #include <winsock2.h> #include <ws2tcpip.h> #include <spdlog/spdlog.h> #define SPDLOG_WCHAR_TO_UTF8_SUPPORT #ifdef _DEBUG #if !defined(SPDLOG_ACTIVE_LEVEL) #define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_TRACE #endif // #if !defined(SPDLOG_ACTIVE_LEVEL) #define SPDLOG_DEBUG_ON #define SPDLOG_TRACE_ON #define _trace SPDLOG_TRACE #endif // #ifdef _DEBUG using namespace spdlog; SOCKET g_sockFirst = 0; std::thread g_threadFirst; uint32_t g_timeLatestHeartBeatFirst; SOCKET g_sockSecond = 0; std::thread g_threadSecond; uint32_t g_timeLatestHeartBeatSecond; void SetupLogger() { #ifdef _DEBUG spdlog::set_level(spdlog::level::trace); spdlog::set_pattern("[%H:%M:%S%z][%^%L%$][%t:%s:%#] %v"); #else spdlog::set_level(spdlog::level::info); spdlog::set_pattern("[%H:%M:%S][%^%L%$][%t] %v"); #endif // #ifdef _DEBUG } int InitWinSock() { WORD wVersionRequested; WSADATA wsaData; /* Use the MAKEWORD(lowbyte, highbyte) macro declared in Windef.h */ wVersionRequested = MAKEWORD(2, 2); int err = WSAStartup(wVersionRequested, &wsaData); if (err != 0) { /* Tell the user that we could not find a usable */ /* Winsock DLL. */ printf("WSAStartup failed with error: %d\n", err); return 1; } /* Confirm that the WinSock DLL supports 2.2.*/ /* Note that if the DLL supports versions greater */ /* than 2.2 in addition to 2.2, it will still return */ /* 2.2 in wVersion since that is the version we */ /* requested. */ if (LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2) { /* Tell the user that we could not find a usable */ /* WinSock DLL. 
*/ printf("Could not find a usable version of Winsock.dll\n"); WSACleanup(); return 1; } else printf("The Winsock 2.2 dll was found okay\n"); return 0; } bool Init(int host_port, SOCKET* p_sock) { int err = 0; int* p_int = 0; std::string host_name("127.0.0.1"); struct sockaddr_in my_addr; char handshake[] = "hello"; //int nBytesSent; if (!*p_sock) { *p_sock = socket(AF_INET, SOCK_STREAM, 0); if (*p_sock == -1) { char log[MAX_PATH]; strerror_s(log, MAX_PATH, errno); error("Client Error initializing socket {}", log); goto FINISH; } p_int = (int*)malloc(sizeof(int)); *p_int = 1; if ((setsockopt(*p_sock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1) || (setsockopt(*p_sock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1)) { char log[MAX_PATH]; strerror_s(log, MAX_PATH, errno); error("Client Error setting options {}", log); free(p_int); goto FINISH; } free(p_int); info("SUCCESS: Client socket is set up."); } my_addr.sin_family = AF_INET; my_addr.sin_port = htons(host_port); memset(&(my_addr.sin_zero), 0, 8); inet_pton(my_addr.sin_family, host_name.c_str(), &my_addr.sin_addr); if (connect(*p_sock, (struct sockaddr*)&my_addr, sizeof(my_addr)) == SOCKET_ERROR) { char log[MAX_PATH]; strerror_s(log, MAX_PATH, errno); error("Client Error connecting socket {}", log); Sleep(1000); goto FINISH; } /*nBytesSent = send(g_sockFirst, handshake, sizeof(handshake), 0); if (nBytesSent <= 0) { char log[MAX_PATH]; strerror_s(log, MAX_PATH, errno); error("Client error sending handshake: {}", log); goto FINISH; }*/ g_timeLatestHeartBeatFirst = GetCurrentTime(); info("SUCCESS: Client connected to server."); return true; FINISH: closesocket(*p_sock); *p_sock = 0; return false; } bool IsConnected(uint32_t timeLatestHeartBeat) { const unsigned long ConnTimeoutMs = 3000; auto cur = GetCurrentTime(); auto latest = timeLatestHeartBeat; //if (cur - latest > ConnTimeoutMs) //{ // debug("cur: {}, late: {}", cur, latest); //} return cur - latest < ConnTimeoutMs; } bool 
StayInTouch(const char* name, SOCKET* pSock, uint32_t* pTimeLatestHeartBeat) { // Client checks inbox right away and measure timeout later. char heartBeat[MAX_PATH] = { 0 }; // CAUTION: min 100ms required for receiving heartbeat. //const uint32_t TimeoutMS = 100; int flags = 0; int nBytesRecved = recv(*pSock, heartBeat, sizeof(heartBeat), flags); bool gotHeartbeat = nBytesRecved > 0; if (gotHeartbeat) { debug("{}: Heartbeat received: {}", name, heartBeat); *pTimeLatestHeartBeat = GetCurrentTime(); } return IsConnected(*pTimeLatestHeartBeat); } void Close(SOCKET* pSock) { closesocket(*pSock); *pSock = 0; } bool Connect() { if (g_threadFirst.joinable()) { warn("FirstTunnel already running. Skipped."); return true; } g_threadFirst = std::thread([&]() { bool isConnected = false; while (true) { while (!isConnected) { isConnected = Init(2345, &g_sockFirst); } isConnected = StayInTouch("FirstTunnel", &g_sockFirst, &g_timeLatestHeartBeatFirst); if (!isConnected) { // We don't close as client. // We keep connecting Close(&g_sockFirst); error("About to reconnect ..."); continue; } if (!g_threadSecond.joinable()) { g_threadSecond = std::thread([&]() { while (true) { while (!isConnected) { isConnected = Init(2346, &g_sockSecond); } isConnected = StayInTouch("SecondTunnel", &g_sockSecond, &g_timeLatestHeartBeatSecond); if (!isConnected) { // We don't close as client. // We keep connecting error("About to reconnect ..."); Sleep(1000); continue; } } info("SecondTunnel quitting..."); Close(&g_sockSecond); }); } } info("FirstTunnel quitting."); Close(&g_sockFirst); }); while (true) { //info("main thread ..."); Sleep(3000); } return g_threadFirst.joinable() ? true : false; } int main() { SetupLogger(); info("Hello World!\n"); if (InitWinSock()) { critical("Failed to initialize Window socket. Aborted."); } Connect(); if (g_threadSecond.joinable()) { g_threadSecond.join(); } if (g_threadFirst.joinable()) { g_threadFirst.join(); } WSACleanup(); info("Bye!"); }

主要的连接逻辑在函数Connect()中。如果您能提示我哪里出了问题,我将非常感激。

要按原样运行代码,您需要一个依赖项spdlog

vcpkg install spdlog:x64-Windows

您还可以使用自己的代码替换所有基于spdlog的日志记录代码。

更新

观察#1

我进入代码并确认

    所有循环都在运行。这样就产生了所有线程。
  • 由于joinable()保护,没有产生多余的线程。
  • 唯一的失败点是第二个客户端的recv调用。
  • 所以结论

      无防火墙
  • 无冗余线程
  • 根据设计,这两个线程都在客户端和服务器端运行。
  • 观察#2

    运行服务器和客户端程序并让问题发生时,我尝试过

      保持两个程序都打开。
  • 像这样运行netcat(Nmap自带的ncat),连接2346端口:C:\Apps\Nmap\ncat.exe 127.0.0.1 2346
  • 这实际上有助于解决问题。它告诉我存在连接问题,而我绝对可以进入客户端代码并看到连接仍然存在。

    观察#3

      我仅在客户端程序的第二个连接代码中保留断点并运行该程序后,我无法进入第一个连接代码。请注意,它们位于单独的线程中。

  • 在两个连接中都保留断点,并在它们之间快速过渡,我可以使步进继续进行而不会陷入一个线程中。
  • 这是我注意到客户端开始接收它应该收到的消息。
  • 因此,我怀疑那里存在线程问题。如果这是问题,那么如何解决呢?

    我有两对简单的TCP服务器/客户端,即每台服务器一个客户端,在Windows上运行:服务器在进程(应用程序)中运行。客户端在另一个过程中运行。服务器不断发送心跳信号(a ...

    c++ multithreading sockets networking
    1个回答
    0
    投票
    我自己发现了问题。就我而言,这是一个愚蠢的错误:
    © www.soinside.com 2019 - 2024. All rights reserved.