我有一个 wxWidgets (3.2.1) 应用程序,它在 macOS (12.6.3) 上运行良好,但在 CentOS 8.2 上会随机挂起。挂起总是发生在应用程序打印以下消息之后:
Source ID xxxx was not found when attempting to remove it(尝试删除时未找到源 ID xxxx)
我已经尝试通过 gdb 运行该应用程序,但我无法找到上述消息的来源。
应用程序有一个框架类来处理所有与 GUI 相关的东西,以及一个单独的线程来处理所有与网络相关的东西。框架类有一个网络连接监听器。当客户端发送数据时,框架类会收到通知并将套接字放入队列中,网络线程会读取该队列,然后网络线程会处理传入的数据。同样,当框架类要写入客户端时,它会将数据放入写入队列,网络类将数据写入客户端。
每当网络线程完成接收数据包时,它会使用以下代码通知 GUI 线程
// Notify the GUI side that a packet has been received: build a wxEVT_THREAD
// event tagged ID_ProcClientCmd, attach the packet pointer as its payload and
// queue a heap-allocated clone of the event on `parent`.
wxThreadEvent event(wxEVT_THREAD, ID_ProcClientCmd);
event.SetPayload(pkt_info);
wxQueueEvent(parent, event.Clone());
在 GUI 线程中
// Frame class header (.h): mutexes guarding the network thread's two queues.
wxMutex* mutexReadQ;
wxMutex* mutexWriteQ;
// Frame class implementation (.cpp): create the queue mutexes.
mutexReadQ = new wxMutex;
mutexWriteQ = new wxMutex;
// Create the network thread, handing it the shared mutexes and run flag,
// then start it.
networkThread = new NetworkThread(this, mutexKeepRunning,mutexReadQ, mutexWriteQ, &keepRunning);
networkThread->Run();
// When the GUI thread is notified that a socket has incoming data, it queues
// the socket on the read queue while holding the read-queue mutex.
mutexReadQ->Lock();
networkThread->la_readQ.push(skt);
mutexReadQ->Unlock();
// When the GUI thread wants to write to a socket, it queues the packet on the
// write queue while holding the write-queue mutex.
mutexWriteQ->Lock();
networkThread->la_writeQ.push(pi);
mutexWriteQ->Unlock();
在网络线程中
// Network thread header (.h): pointers to the queue mutexes; they are assigned
// from constructor arguments below.
wxMutex* mutexReadQ;
wxMutex* mutexWriteQ;
// Network thread implementation (.cpp)
// Constructor: stores the pointers supplied by the caller.
// Stores the event sink, the shared mutexes and the shared run flag handed in
// by the caller; nothing is allocated or started here.
NetworkThread::NetworkThread(wxEvtHandler* p_parent, wxMutex* p_mutexKeepRunning, wxMutex* p_mutexReadQ, wxMutex* p_mutexWriteQ,
                             bool* p_keepRunning)
    : parent(p_parent),
      mutexKeepRunning(p_mutexKeepRunning),
      mutexReadQ(p_mutexReadQ),
      mutexWriteQ(p_mutexWriteQ),
      keepRunning(p_keepRunning)
{
}
void* NetworkThread::Entry()
{
wxSocketBase* readSkt;
PKT_INFO* pkt_info;
while (1)
{
// check if app wants this thread to terminate
mutexKeepRunning->Lock();
if (*keepRunning == false) {
mutexKeepRunning->Unlock();
return NULL;
}
mutexKeepRunning->Unlock();
readSkt = NULL;
pkt_info = NULL;
// process read requests
mutexReadQ->Lock();
if (!la_readQ.empty()) {
readSkt = la_readQ.front();
la_readQ.pop();
}
mutexReadQ->Unlock();
if (readSkt)
readFromSkt(readSkt);
// process write requests
mutexWriteQ->Lock();
if (!la_writeQ.empty()) {
pkt_info = la_writeQ.front();
la_writeQ.pop();
}
mutexWriteQ->Unlock();
if (pkt_info)
writeToSkt(pkt_info);
// nothing to do if both Qs are empty
if ((readSkt == NULL) && (pkt_info == NULL)) {
wxMilliSleep(THREAD_SLEEP_DUR);
continue;
}
} // while (1)
}
我有一个最小的应用程序,一旦运行就会重现上述问题。这个应用程序有两个部分。一个是 wx_widget 代码,另一个是使用套接字的 echo_client。这是 wxWidget 应用程序的代码
#include <wx/wx.h>
#include <wx/socket.h>
#include <queue>
// Pulls std names (e.g. queue) into the global namespace for this file.
using namespace std;
// Shorthand for raw byte buffers.
typedef unsigned char uchar;
// Smaller of two values. NOTE: macro arguments are evaluated more than once,
// so do not pass expressions with side effects.
#define MIN(_x, _y) (((_x) < (_y)) ? (_x) : (_y))
// Size in bytes of every packet exchanged with the client (1 MiB).
#define NET_PKT_SIZE (1024 * 1024)
// TCP port the server listens on; the echo client uses the same value.
#define NETWORK_PORT 54321
// Event identifiers used by the application, allocated above wxID_LAST.
enum AppIDs
{
ID_SktServer = wxID_LAST + 1, // events from the listening (server) socket
ID_SktClient,                 // events from accepted client sockets
ID_ProcClientCmd              // thread event: network thread finished receiving a packet
};
// One packet travelling between the GUI thread and the network thread,
// in either direction.
typedef struct
{
wxSocketBase* skt; // socket the packet was read from / is to be written to
uchar* buf;        // packet bytes; allocated with new[] by the producer
int length;        // number of bytes in buf
} PKT_INFO;
// Worker thread that performs the socket reads and writes on behalf of the
// GUI thread. Work arrives through two queues that the GUI thread fills;
// completed reads are reported back to `parent` as wxEVT_THREAD events.
class NetworkThread : public wxThread
{
public:
// Stores the given pointers; none of them is allocated or freed by this class
// in the code shown here.
NetworkThread(wxEvtHandler* p_parent, wxMutex* p_mutexKeepRunning, wxMutex* p_mutexReadQ, wxMutex* p_mutexWriteQ,
bool* p_keepRunning);
// Thread main loop; returns NULL once *keepRunning becomes false.
virtual void* Entry();
queue<wxSocketBase*> la_readQ; // sockets with pending input; access only while holding mutexReadQ
queue<PKT_INFO*> la_writeQ; // packets to send; access only while holding mutexWriteQ
wxEvtHandler* parent; // receiver of ID_ProcClientCmd thread events
wxMutex* mutexKeepRunning; // guards *keepRunning
wxMutex* mutexReadQ; // guards la_readQ
wxMutex* mutexWriteQ; // guards la_writeQ
bool* keepRunning; // shared run flag; the thread exits when it is false
private:
// Reads one NET_PKT_SIZE-byte packet from skt and posts it to parent.
void readFromSkt(wxSocketBase* skt);
// Writes pkt_info->buf to pkt_info->skt, then frees the packet.
void writeToSkt(PKT_INFO* pkt_info);
int pktSendCtr; // running packet number printed by writeToSkt, starts at 1
};
// Stores the event sink, the shared mutexes and the shared run flag handed in
// by the caller, and starts the sent-packet counter at 1.
NetworkThread::NetworkThread(wxEvtHandler* p_parent, wxMutex* p_mutexKeepRunning,
                             wxMutex* p_mutexReadQ, wxMutex* p_mutexWriteQ,
                             bool* p_keepRunning)
    : parent(p_parent),
      mutexKeepRunning(p_mutexKeepRunning),
      mutexReadQ(p_mutexReadQ),
      mutexWriteQ(p_mutexWriteQ),
      keepRunning(p_keepRunning),
      pktSendCtr(1)
{
}
void* NetworkThread::Entry()
{
wxSocketBase* readSkt;
PKT_INFO* pkt_info;
while (1)
{
// check if app wants this thread to terminate
mutexKeepRunning->Lock();
if (*keepRunning == false) {
mutexKeepRunning->Unlock();
return NULL;
}
mutexKeepRunning->Unlock();
readSkt = NULL;
pkt_info = NULL;
// process read requests
mutexReadQ->Lock();
if (!la_readQ.empty()) {
readSkt = la_readQ.front();
la_readQ.pop();
}
mutexReadQ->Unlock();
if (readSkt)
readFromSkt(readSkt);
// process write requests
mutexWriteQ->Lock();
if (!la_writeQ.empty()) {
pkt_info = la_writeQ.front();
la_writeQ.pop();
}
mutexWriteQ->Unlock();
if (pkt_info)
writeToSkt(pkt_info);
// nothing to do if both Qs are empty
if ((readSkt == NULL) && (pkt_info == NULL)) {
#ifdef USE_YIELD
Yield();
#else
wxMilliSleep(25);
#endif
continue;
}
} // while (1)
}
// Reads exactly NET_PKT_SIZE bytes from `skt` and hands them to the GUI side.
//
// On success a heap-allocated PKT_INFO (with a new[]-allocated buffer) is
// attached to an ID_ProcClientCmd thread event queued on `parent`; the event
// handler is then responsible for freeing both. On a read error the socket is
// closed and nothing is posted. Every other exit path frees the buffer here.
//
// NOTE(review): this runs on the network thread, while the socket was accepted
// and configured for notifications on the GUI thread. Confirm against the
// wxSocket threading documentation that sharing the socket like this is safe.
void NetworkThread::readFromSkt(wxSocketBase* skt)
{
    if (!skt)
        return;

    // Plain new[] reports failure by throwing, so there is no NULL to test for.
    unsigned char* buf = new unsigned char[NET_PKT_SIZE];

    // ensure we really have some data to read
    skt->Peek(buf, 1);
    if (skt->LastCount() == 0) {
        wxPrintf("readFromSkt: nothign to read\n");
        delete[] buf; // previously leaked on this path
        return;
    }

    // Keep reading until the whole packet has arrived.
    int bytesToRead = NET_PKT_SIZE;
    int index = 0;
    while (bytesToRead > 0)
    {
        skt->Read(&buf[index], bytesToRead);
        if (skt->Error()) {
            skt->Close();
            delete[] buf; // array form must match new[]
            wxPrintf("networkThread:readFromSkt: read error, closing conxn\n");
            return;
        }
        const int bytesRead = skt->LastReadCount();
        bytesToRead -= bytesRead;
        index += bytesRead;
    }

    // Without a receiver nobody would ever free the packet, so drop it here.
    if (parent == NULL) {
        delete[] buf;
        return;
    }

    // setup pkt info so we can dispatch it to GUI thread
    PKT_INFO* pkt_info = new PKT_INFO;
    pkt_info->skt = skt;
    pkt_info->buf = buf;
    pkt_info->length = NET_PKT_SIZE;

    // let main thread process payload
    wxThreadEvent event(wxEVT_THREAD, ID_ProcClientCmd);
    event.SetPayload(pkt_info);
    wxQueueEvent(parent, event.Clone());
}
// Sends pkt_info->length bytes from pkt_info->buf to pkt_info->skt in chunks
// of at most 1400 bytes, then frees the packet and its buffer.
//
// This function takes ownership of `pkt_info`: it is freed on every path,
// including when the packet is malformed. A wxSOCKET_WOULDBLOCK error is
// retried immediately; any other socket error closes the connection and
// abandons the rest of the packet. The "Sent pkt" line is printed either way.
void NetworkThread::writeToSkt(PKT_INFO* pkt_info)
{
    if (!pkt_info)
        return;

    wxSocketBase* skt = pkt_info->skt;
    uchar* buf = pkt_info->buf;
    int bytesToWrite = pkt_info->length;

    // Malformed packet: nothing to send, but it still has to be released.
    if (!skt || !buf || (bytesToWrite <= 0)) {
        delete[] buf;
        delete pkt_info;
        return;
    }

    int index = 0;
    while (1)
    {
        // write only 1400 bytes at a time so we can cross router boundaries
        skt->Write(&buf[index], MIN(bytesToWrite, 1400));
        if (skt->Error())
        {
            int se = skt->LastError();
            if (se == wxSOCKET_WOULDBLOCK) {
                continue;
            }
            skt->Close();
            wxPuts("NetworkThread::writeToSkt: write error, closing conxn");
            break;
        }
        const int writeCount = skt->LastWriteCount();
        bytesToWrite -= writeCount;
        if (bytesToWrite <= 0)
            break;
        index += writeCount;
    }

    delete[] buf; // array form must match the producer's new[]
    delete pkt_info;
    wxPrintf("Sent pkt %d\n", pktSendCtr++);
}
/////////////////////////////////////////////////
// Application object; its only job is to create and show the main frame.
class MyApp : public wxApp
{
public:
// Called by wxWidgets at startup; returning true lets the app keep running.
virtual bool OnInit();
};
// Registers MyApp as the application class for this program.
IMPLEMENT_APP(MyApp)
// Main frame. It shows a single "Start server" button; once clicked it owns
// the listening socket, the network thread and the mutexes shared with it,
// and it handles the socket and thread events.
class Button : public wxFrame
{
public:
Button(const wxString& title);
// NOTE(review): declared but no definition is visible in this listing.
void onProcessMsg(PKT_INFO* data);
private:
wxSocketServer* sktServer; // listening socket; created in OnBtnClicked
wxMutex* mutexKeepRunning; // guards keepRunning; created in OnBtnClicked
wxMutex* mutexReadQ; // guards networkThread->la_readQ; created in OnBtnClicked
wxMutex* mutexWriteQ; // guards networkThread->la_writeQ; created in OnBtnClicked
NetworkThread* networkThread; // worker doing the socket I/O; created in OnBtnClicked
wxButton* btnWidget; // the "Start server" button
bool keepRunning; // run flag shared with the network thread
int pktRecvdCtr; // running number printed for each received packet, starts at 1
// Starts the server socket and the network thread.
void OnBtnClicked(wxCommandEvent & event);
// Accepts a new client connection.
void OnServerSktEvent(wxSocketEvent &event);
// Queues a client socket for reading when input arrives; logs other events.
void OnClientSktEvent(wxSocketEvent &event);
// Consumes a received packet and queues a reply.
void OnProcessClientCmd(wxThreadEvent &event);
// Queues a NET_PKT_SIZE-byte packet for the network thread to send on skt.
void SendDataToNetworkThread(wxSocketBase* skt);
};
// Builds the 270x150 frame containing one "Start server" button, routes the
// button's click to OnBtnClicked and initialises the run flag and the
// received-packet counter.
Button::Button(const wxString& title)
    : wxFrame(NULL, wxID_ANY, title, wxDefaultPosition, wxSize(270, 150)),
      keepRunning(true),
      pktRecvdCtr(1)
{
    wxPanel* const container = new wxPanel(this, wxID_ANY);

    const wxPoint buttonPos(20, 20);
    btnWidget = new wxButton(container, wxID_EXIT, wxT("Start server"), buttonPos);

    // The button reuses wxID_EXIT as its id; clicks on it go to OnBtnClicked.
    Connect(wxID_EXIT, wxEVT_COMMAND_BUTTON_CLICKED,
            wxCommandEventHandler(Button::OnBtnClicked));

    btnWidget->SetFocus();
    Centre();
}
// Click handler for the "Start server" button.
//
// Disables the button, opens the listening socket on NETWORK_PORT, subscribes
// to its connection events, creates the mutexes shared with the network
// thread, binds the socket/thread event handlers and starts the thread.
//
// If the port cannot be opened, the half-constructed server socket is
// destroyed and the button is re-enabled so the user can try again.
// Previously the socket was leaked and the button stayed disabled.
void Button::OnBtnClicked(wxCommandEvent & WXUNUSED(event))
{
    btnWidget->Enable(false);

    wxIPV4address addr;
    addr.Service(NETWORK_PORT);
    sktServer = new wxSocketServer(addr, wxSOCKET_REUSEADDR | wxSOCKET_NOWAIT);
    if (!sktServer->IsOk()) {
        wxPrintf("Could not listen on port %d\n", NETWORK_PORT);
        // Destroy() is the wxSocket-sanctioned way to dispose of a socket.
        sktServer->Destroy();
        sktServer = NULL;
        btnWidget->Enable(true);
        return;
    }

    // setup event handler and subscribe to connection events
    sktServer->SetEventHandler(*this, ID_SktServer);
    sktServer->SetNotify(wxSOCKET_CONNECTION_FLAG);
    sktServer->Notify(true);

    // start thread to handle network traffic
    mutexKeepRunning = new wxMutex;
    mutexReadQ = new wxMutex;
    mutexWriteQ = new wxMutex;
    keepRunning = true;

    Bind(wxEVT_SOCKET, &Button::OnServerSktEvent, this, ID_SktServer);
    Bind(wxEVT_SOCKET, &Button::OnClientSktEvent, this, ID_SktClient);
    Bind(wxEVT_THREAD, &Button::OnProcessClientCmd, this, ID_ProcClientCmd);

    networkThread = new NetworkThread(this->GetEventHandler(),
                                      mutexKeepRunning, mutexReadQ, mutexWriteQ,
                                      &keepRunning);
    networkThread->Run();
}
// Handles events from the listening socket.
//
// Only wxSOCKET_CONNECTION is expected. The pending connection is accepted
// without blocking, and the new client socket is set up to deliver input and
// connection-lost notifications to this frame under ID_SktClient.
void Button::OnServerSktEvent(wxSocketEvent &event)
{
    if (event.GetSocketEvent() != wxSOCKET_CONNECTION) {
        wxPrintf("Got unknown socket event from sktServer\n");
        return;
    }

    // use non blocking accept to grab new connection
    wxSocketBase *skt = sktServer->Accept(false);
    if (!skt) {
        // A non-blocking Accept() yields NULL when no connection is pending
        // any more (e.g. the peer already went away).
        wxPrintf("OnServerSktEvent: no pending connection to accept\n");
        return;
    }
    wxPrintf("Client connected\n");

    skt->SetEventHandler(*this, ID_SktClient);
    skt->SetNotify(wxSOCKET_INPUT_FLAG | wxSOCKET_LOST_FLAG);
    skt->Notify(true);
}
// Handles events from an accepted client socket.
//
// The socket is switched to wxSOCKET_NOWAIT on every event. On input it is
// pushed onto the network thread's read queue (under mutexReadQ); output,
// lost-connection and unrecognised events are only logged.
void Button::OnClientSktEvent(wxSocketEvent &event)
{
    wxSocketBase* const client = event.GetSocket();
    client->SetFlags(wxSOCKET_NOWAIT); // make it non blocking

    const wxSocketNotify what = event.GetSocketEvent();
    if (what == wxSOCKET_INPUT)
    {
        // Hand the socket over to the network thread for reading.
        mutexReadQ->Lock();
        networkThread->la_readQ.push(client);
        mutexReadQ->Unlock();
    }
    else if (what == wxSOCKET_OUTPUT)
    {
        wxPrintf("OnClientSktEvent: got output data\n");
    }
    else if (what == wxSOCKET_LOST)
    {
        wxPrintf("OnClientSktEvent: client disconnected\n");
    }
    else
    {
        wxPrintf("OnClientSktEvent: Unknown event: %d\n", what);
    }
}
// Handles the ID_ProcClientCmd thread event posted by the network thread
// after it has received a full packet.
//
// Takes ownership of the PKT_INFO payload: logs the packet number, frees the
// packet and its buffer, then queues a reply packet for the same socket.
void Button::OnProcessClientCmd(wxThreadEvent &event)
{
    PKT_INFO* pi = event.GetPayload<PKT_INFO *>();
    if (!pi) {
        return;
    }

    // Remember the socket before the packet is freed.
    wxSocketBase* skt = pi->skt;
    wxPrintf("Recv pkt %d\n", pktRecvdCtr++);

    // delete pkt_info; the buffer came from new[], so use the array form
    delete[] pi->buf;
    delete pi;

    SendDataToNetworkThread(skt);
}
// Builds a NET_PKT_SIZE-byte packet filled with 0x55 addressed to `skt` and
// appends it to the network thread's write queue (under mutexWriteQ).
// The packet is released by the code that later takes it off the queue.
void Button::SendDataToNetworkThread(wxSocketBase* skt)
{
    PKT_INFO* outgoing = new PKT_INFO;
    if (outgoing == NULL) {
        wxPrintf("System out of memory\n");
        return;
    }

    outgoing->buf = new unsigned char[NET_PKT_SIZE];
    if (outgoing->buf == NULL) {
        delete outgoing;
        wxPrintf("System out of memory\n");
        return;
    }

    // Fill in the header fields and the payload pattern.
    outgoing->skt = skt;
    outgoing->length = NET_PKT_SIZE;
    memset(outgoing->buf, 0x55, NET_PKT_SIZE);

    // Publish the packet to the network thread.
    mutexWriteQ->Lock();
    networkThread->la_writeQ.push(outgoing);
    mutexWriteQ->Unlock();
}
bool MyApp::OnInit()
{
Button *btnapp = new Button(wxT("Button"));
btnapp->Show(true);
return true;
}
这是可以像这样编译的 echo_client 代码:gcc echo_client.c -o echo_client
您必须首先运行 wxWidget 应用程序,单击“启动服务器”按钮,然后使用运行 wxWidget 应用程序的系统的 IP 地址运行 echo 客户端
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <arpa/inet.h>
/* Size in bytes of every packet sent and received (1 MiB); must match the server. */
#define NET_PKT_SIZE (1024 * 1024)
/* TCP port of the server; must match the server. */
#define NETWORK_PORT 54321
/* File-scope packet buffers for the outgoing and incoming packet. */
char sendBuf[NET_PKT_SIZE];
char recvBuf[NET_PKT_SIZE];
/*
 * Echo client: connects to the server at the IPv4 address given in argv[1]
 * (port NETWORK_PORT) and then loops forever sending one NET_PKT_SIZE-byte
 * packet and receiving one NET_PKT_SIZE-byte packet in return.
 *
 * Returns 1 on any socket error or when the server closes the connection;
 * exits with status 1 when no address argument is given.
 */
int main(int argc, char** argv)
{
    struct sockaddr_in server_addr;
    unsigned char pattern = 0;
    int index = 0;
    int count;
    int bytesSent;
    int bytesRecvd;
    int recvCounter = 0;
    int sendCounter = 0;
    int sockfd;

    if (argc < 2) {
        printf("usage: echo_client <IP address>\n");
        exit(1);
    }

    /* Zero the whole structure so padding fields are not left uninitialised. */
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(NETWORK_PORT);
    /* inet_pton() reports malformed addresses, unlike inet_addr(). */
    if (inet_pton(AF_INET, argv[1], &server_addr.sin_addr) != 1) {
        printf("invalid IPv4 address: %s\n", argv[1]);
        return 1;
    }

    if ((sockfd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
        printf("error creating socket\n");
        return 1;
    }
    if (connect(sockfd, (struct sockaddr *) &server_addr, sizeof(server_addr)) < 0) {
        printf("couldn't connect to server\n");
        close(sockfd);
        return 1;
    }
    printf("connected to the server\n");

    while (1)
    {
        /* send a pkt: fill it with the next pattern byte and push it all out */
        memset(sendBuf, pattern++, NET_PKT_SIZE);
        count = NET_PKT_SIZE;
        index = 0;
        while (1)
        {
            bytesSent = send(sockfd, &sendBuf[index], count, 0);
            if (bytesSent < 0) {
                perror("error writing to server");
                close(sockfd);
                return 1;
            }
            if (bytesSent == 0) {
                perror("write error");
                close(sockfd);
                return 1;
            }
            index += bytesSent;
            count -= bytesSent;
            if (count <= 0)
                break;
        }
        printf("wrote %d packets\n", ++sendCounter);

        /* recv pkt: keep reading until a whole packet has arrived */
        count = NET_PKT_SIZE;
        index = 0;
        while (1)
        {
            bytesRecvd = recv(sockfd, &recvBuf[index], count, 0);
            if (bytesRecvd < 0) {
                perror("error reading from server");
                close(sockfd);
                return 1;
            }
            if (bytesRecvd == 0) {
                /* Orderly shutdown by the peer; without this check the loop
                 * would spin forever because count never decreases. */
                printf("server closed the connection\n");
                close(sockfd);
                return 1;
            }
            index += bytesRecvd;
            count -= bytesRecvd;
            if (count <= 0)
                break;
        }
        printf("recvd %d packets\n", ++recvCounter);
    }

    /* Not reached: the loop above only exits via return. */
    close(sockfd);
    return 0;
}