无法接收通过POSIX消息队列发送的结构

问题描述 投票:0回答:1

我有两个与消息队列通信的进程。每个进程都是其自己的单独文件。发件人似乎已成功将消息发送到消息队列中。不幸的是,接收进程阻塞并且没有观察到通过posix消息队列发送的消息。

侧面注:我成功地使用字符串实现了此目的(使用sscanf解析接收到的消息时,使用sprintf在发送方中构造消息)。因此,我可以确认MQ设置正确。

话虽如此,我跟随this post更改了我的字符串消息队列实现并适应了结构。

这里是结构和一些想要通过消息队列传递的信息。

typedef struct msg_buffer{
    long mtype;                
    struct msg_info{
        char* shm_name;
        size_t shm_size;
        char* path;
    }msg_info;
}msg_buffer;

在这两个过程中,消息队列都是打开的。

struct mq_attr attr;
attr.mq_flags = 0;
attr.mq_maxmsg = MQ_MAX_MSG;  // 10
attr.mq_msgsize = BUFSIZE;    // 1200
attr.mq_curmsgs = 0;

// sender and receiver both open mq like below
mq = mq_open(MQ_NAME, O_RDWR | O_CREAT, 0666, &attr);

发送者通过这种方式创建结构,并通过MQ发送它。

ssize_t handle_with_cache(gfcontext_t *ctx, const char *path, void* arg){
    // Note: char* path sent looks like "/to/some/path.txt";
    //       char* shm_name_buffer[20] = "shm_name_00";

    char* path_copy = malloc(strlen(path) + 1);
    char* shm_name_copy = malloc(strlen(shm_name_buffer) + 1);
    // TODO: fix memory leaks by these char* being copied
    strcpy(path_copy, path);
    strcpy(shm_name_copy, shm_name_buffer);

    struct msg_buffer message = {2, { path_copy, segment_size, shm_name_copy} };

    if(mq_send(mq,(const char*) &message, sizeof(struct msg_info), 0) < 0){
        fprintf(stderr, "Error mq_send: %s.\n", strerror(errno));
    }

    fprintf(stdout, "Send message length is %ld.\n", sizeof(message));
    fprintf(stdout, "path:%s\n", message.msg_info.path);
    fprintf(stdout, "shm_name:%s\n", message.msg_info.shm_name);
    fprintf(stdout, "shm_size:%ld\n", message.msg_info.shm_size);

    mq_getattr(mq, &setAttr);
    fprintf(stdout, "Msgs In Queue: %lu\n", setAttr.mq_curmsgs);
    fprintf(stdout, "Msg Size: %lu\n", setAttr.mq_msgsize);

    return bytes_transferred;
}

这是接收者,它将监听并等待队列中的消息并尝试解析信息:

void *threadCacheProcess(void *thread_id){
    int recMessageSize;
    msg_buffer message;

    while(1){
        // recv message sent from proxy and parse
        recMessageSize = mq_receive(mq, (char *) &message, sizeof(struct msg_info), NULL);

        // block until we get valid message
        if(recMessageSize != -1){
            fprintf(stdout, "Success!\n");
            fprintf(stdout, "T%d Name:%s\n",id, message.msg_info.shm_name);
            fprintf(stdout, "T%d Size:%lu\n",id, message.msg_info.shm_size);
            fprintf(stdout, "T%d path:%s\n",id, message.msg_info.path);
        }
    }
}

控制台输出:

发件人控制台:

path:/test/path/to/file.txt
shm_name:/shm_name_0
shm_size:8192
Msgs In Queue: 1
Msg Size: 1219

接收器控制台:

Thread Running...   // No "Success or print message" that was expected. Why?

编辑:我已经更新了代码,因此,如果遵循Beej的指南,并带有嵌套示例。不幸的是,接收问题仍然存在。

c struct posix message-queue mq
1个回答
0
投票

看看您的消息缓冲区的定义。您定义消息(msg_info)的内容以包含两个pointers,shm_name和path。那些指针指向发送者进程空间中的内存,并且是不是接收者地址空间的一部分

typedef struct msg_buffer{
    long mtype;                
    struct msg_info{
        char* shm_name;
        size_t shm_size;
        char* path;
    }msg_info;
}msg_buffer;

我的建议是阅读有关Sun-RPC和XDR的信息-您需要序列化发送方中的数据,并反序列化接收方中的数据。了解有关数据序列化或编组的信息。

如何解决?确定您的shm_name和路径的最大空间,然后重新定义您的消息以容纳它们。

#define MAX_SHM_NAME (32) //pick appropriate values...
#define MAX_PATH (128)
typedef struct msg_buffer{
    long mtype;                
    struct msg_info{
        size_t shm_size;
        char shm_name[MAX_SHM_NAME];
        char path[MAX_PATH];
    }msg_info;
}msg_buffer;

然后复制消息序列化功能中的shm_name和路径值,

ssize_t handle_with_cache(gfcontext_t *ctx, const char *path, void* arg){
// Note: char* path sent looks like "/to/some/path.txt";
//       char* shm_name_buffer[20] = "shm_name_00";

/* why malloc? malloc allocs memory in local process address space, the other process cannot see it...
char* path_copy = malloc(strlen(path) + 1);
char* shm_name_copy = malloc(strlen(shm_name_buffer) + 1);
// TODO: fix memory leaks by these char* being copied
strcpy(path_copy, path);
strcpy(shm_name_copy, shm_name_buffer);
*/

struct msg_buffer message;
message.mtype = 2;
message.msg_info.thing = segment_size;
//you could omit the null-terminators, since you have defined max length
strncpy(message.msg_info.shm_name, shm_name_buffer, MAX_SHM_NAME-1);
    message.msg_info.shm_name[MAX_SHM_NAME-1] = '\0';
strncpy(message.msg_info.path, path, MAX_PATH-1);
    message.msg_info.path[MAX_PATH-1] = '\0';

//wrong - gotta send whole message...
//if(mq_send(mq,(const char*) &message, sizeof(struct msg_info), 0) < 0)
if(mq_send(mq,(const char*) &message, sizeof(struct msg_buffer), 0) < 0){
    fprintf(stderr, "Error mq_send: %s.\n", strerror(errno));
}

fprintf(stdout, "Send message length is %ld.\n", sizeof(message));
fprintf(stdout, "path:%s\n", message.msg_info.path);
fprintf(stdout, "shm_name:%s\n", message.msg_info.shm_name);
fprintf(stdout, "shm_size:%ld\n", message.msg_info.shm_size);

mq_getattr(mq, &setAttr);
fprintf(stdout, "Msgs In Queue: %lu\n", setAttr.mq_curmsgs);
fprintf(stdout, "Msg Size: %lu\n", setAttr.mq_msgsize);

return bytes_transferred;
}

您的接收者必须以相同的方式提取数据。最好的方法是将数据提取到本地缓冲区中。

void *threadCacheProcess(void *thread_id){
ssize_t recMessageSize;
msg_buffer message;
unsigned int pri=0; //message priority

while(1){
    // recv message sent from proxy and parse
    recMessageSize = mq_receive(mq, (char *) &message, sizeof(struct msg_buffer), &pri);

    if( recMessageSize < 0 ) {
        int bad = errno;
        fprintf(stdout, "error: %d:\n", bad); perror(bad);
    }
    // block until we get valid message
    else {
        fprintf(stdout, "size %ld, priority %d\n", recMessageSize,pri);
        fprintf(stdout, "T%d Name:%s\n",id, message.msg_info.shm_name);
        fprintf(stdout, "T%d Size:%lu\n",id, message.msg_info.shm_size);
        fprintf(stdout, "T%d path:%s\n",id, message.msg_info.path);
    }
}
}
© www.soinside.com 2019 - 2024. All rights reserved.