我有两个与消息队列通信的进程。每个进程都是其自己的单独文件。发件人似乎已成功将消息发送到消息队列中。不幸的是,接收进程阻塞并且没有观察到通过posix消息队列发送的消息。
侧面注:我成功地使用字符串实现了此目的(使用sscanf解析接收到的消息时,使用sprintf在发送方中构造消息)。因此,我可以确认MQ设置正确。
话虽如此,我跟随this post更改了我的字符串消息队列实现并适应了结构。
这里是结构和一些想要通过消息队列传递的信息。
typedef struct msg_buffer{
long mtype;
struct msg_info{
char* shm_name;
size_t shm_size;
char* path;
}msg_info;
}msg_buffer;
在这两个过程中,消息队列都是打开的。
struct mq_attr attr;
attr.mq_flags = 0;
attr.mq_maxmsg = MQ_MAX_MSG; // 10
attr.mq_msgsize = BUFSIZE; // 1200
attr.mq_curmsgs = 0;
// sender and receiver both open mq like below
mq = mq_open(MQ_NAME, O_RDWR | O_CREAT, 0666, &attr);
发送者通过这种方式创建结构,并通过MQ发送它。
ssize_t handle_with_cache(gfcontext_t *ctx, const char *path, void* arg){
// Note: char* path sent looks like "/to/some/path.txt";
// char* shm_name_buffer[20] = "shm_name_00";
char* path_copy = malloc(strlen(path) + 1);
char* shm_name_copy = malloc(strlen(shm_name_buffer) + 1);
// TODO: fix memory leaks by these char* being copied
strcpy(path_copy, path);
strcpy(shm_name_copy, shm_name_buffer);
struct msg_buffer message = {2, { path_copy, segment_size, shm_name_copy} };
if(mq_send(mq,(const char*) &message, sizeof(struct msg_info), 0) < 0){
fprintf(stderr, "Error mq_send: %s.\n", strerror(errno));
}
fprintf(stdout, "Send message length is %ld.\n", sizeof(message));
fprintf(stdout, "path:%s\n", message.msg_info.path);
fprintf(stdout, "shm_name:%s\n", message.msg_info.shm_name);
fprintf(stdout, "shm_size:%ld\n", message.msg_info.shm_size);
mq_getattr(mq, &setAttr);
fprintf(stdout, "Msgs In Queue: %lu\n", setAttr.mq_curmsgs);
fprintf(stdout, "Msg Size: %lu\n", setAttr.mq_msgsize);
return bytes_transferred;
}
这是接收者,它将监听并等待队列中的消息并尝试解析信息:
void *threadCacheProcess(void *thread_id){
int recMessageSize;
msg_buffer message;
while(1){
// recv message sent from proxy and parse
recMessageSize = mq_receive(mq, (char *) &message, sizeof(struct msg_info), NULL);
// block until we get valid message
if(recMessageSize != -1){
fprintf(stdout, "Success!\n");
fprintf(stdout, "T%d Name:%s\n",id, message.msg_info.shm_name);
fprintf(stdout, "T%d Size:%lu\n",id, message.msg_info.shm_size);
fprintf(stdout, "T%d path:%s\n",id, message.msg_info.path);
}
}
}
控制台输出:
发件人控制台:
path:/test/path/to/file.txt
shm_name:/shm_name_0
shm_size:8192
Msgs In Queue: 1
Msg Size: 1219
接收器控制台:
Thread Running... // No "Success or print message" that was expected. Why?
编辑:我已经更新了代码,因此,如果遵循Beej的指南,并带有嵌套示例。不幸的是,接收问题仍然存在。
看看您的消息缓冲区的定义。您定义消息(msg_info)的内容以包含两个pointers,shm_name和path。那些指针指向发送者进程空间中的内存,并且是不是接收者地址空间的一部分。
typedef struct msg_buffer{
long mtype;
struct msg_info{
char* shm_name;
size_t shm_size;
char* path;
}msg_info;
}msg_buffer;
我的建议是阅读有关Sun-RPC和XDR的信息-您需要序列化发送方中的数据,并反序列化接收方中的数据。了解有关数据序列化或编组的信息。
如何解决?确定您的shm_name和路径的最大空间,然后重新定义您的消息以容纳它们。
#define MAX_SHM_NAME (32) //pick appropriate values...
#define MAX_PATH (128)
typedef struct msg_buffer{
long mtype;
struct msg_info{
size_t shm_size;
char shm_name[MAX_SHM_NAME];
char path[MAX_PATH];
}msg_info;
}msg_buffer;
然后复制消息序列化功能中的shm_name和路径值,
ssize_t handle_with_cache(gfcontext_t *ctx, const char *path, void* arg){
// Note: char* path sent looks like "/to/some/path.txt";
// char* shm_name_buffer[20] = "shm_name_00";
/* why malloc? malloc allocs memory in local process address space, the other process cannot see it...
char* path_copy = malloc(strlen(path) + 1);
char* shm_name_copy = malloc(strlen(shm_name_buffer) + 1);
// TODO: fix memory leaks by these char* being copied
strcpy(path_copy, path);
strcpy(shm_name_copy, shm_name_buffer);
*/
struct msg_buffer message;
message.mtype = 2;
message.msg_info.thing = segment_size;
//you could omit the null-terminators, since you have defined max length
strncpy(message.msg_info.shm_name, shm_name_buffer, MAX_SHM_NAME-1);
message.msg_info.shm_name[MAX_SHM_NAME-1] = '\0';
strncpy(message.msg_info.path, path, MAX_PATH-1);
message.msg_info.path[MAX_PATH-1] = '\0';
//wrong - gotta send whole message...
//if(mq_send(mq,(const char*) &message, sizeof(struct msg_info), 0) < 0)
if(mq_send(mq,(const char*) &message, sizeof(struct msg_buffer), 0) < 0){
fprintf(stderr, "Error mq_send: %s.\n", strerror(errno));
}
fprintf(stdout, "Send message length is %ld.\n", sizeof(message));
fprintf(stdout, "path:%s\n", message.msg_info.path);
fprintf(stdout, "shm_name:%s\n", message.msg_info.shm_name);
fprintf(stdout, "shm_size:%ld\n", message.msg_info.shm_size);
mq_getattr(mq, &setAttr);
fprintf(stdout, "Msgs In Queue: %lu\n", setAttr.mq_curmsgs);
fprintf(stdout, "Msg Size: %lu\n", setAttr.mq_msgsize);
return bytes_transferred;
}
您的接收者必须以相同的方式提取数据。最好的方法是将数据提取到本地缓冲区中。
void *threadCacheProcess(void *thread_id){
ssize_t recMessageSize;
msg_buffer message;
unsigned int pri=0; //message priority
while(1){
// recv message sent from proxy and parse
recMessageSize = mq_receive(mq, (char *) &message, sizeof(struct msg_buffer), &pri);
if( recMessageSize < 0 ) {
int bad = errno;
fprintf(stdout, "error: %d:\n", bad); perror(bad);
}
// block until we get valid message
else {
fprintf(stdout, "size %ld, priority %d\n", recMessageSize,pri);
fprintf(stdout, "T%d Name:%s\n",id, message.msg_info.shm_name);
fprintf(stdout, "T%d Size:%lu\n",id, message.msg_info.shm_size);
fprintf(stdout, "T%d path:%s\n",id, message.msg_info.path);
}
}
}