在一个线程中修改成员变量并在另一个线程中读取

问题描述 投票:0回答:1

我有两个成员变量。

class MessageQueues {
    ....
    char* m_InputBuffer;
    uint32_t m_InputBufferSize;
    ....
};

我在当前线程的函数中更新它们,甚至用调试器查看成员变量是否更新。但是在另一个函数中,在一个单独的线程中运行,成员变量包含垃圾。

所以,我有一个函数,它读取两个值并将它们写入两个成员变量并在完成时触发另一个线程。

ErrorCode MessageQueue::handleIncomingMessage(char receiveBuffer[], const uint32_t bufferSize) {
    ES_TRC3("started");
    ErrorCode errorCode = ES_SUCCESS;
    pthread_mutex_lock(&mutexMsgQueueIncoming);
    m_InputBuffer = receiveBuffer;
    m_InputBufferSize = bufferSize;
    ES_TRC3("triggering and unlocking");
    pthread_cond_signal(&msgQueueCondition);
    pthread_mutex_unlock(&mutexMsgQueueIncoming);
    ES_TRC3("triggered and unlocked");
    return errorCode;
}

然后在第二个成员函数中,它无限期地在一个单独的线程中运行并等待消息,

ErrorCode MessageQueue::runReceiver(void) {
    ES_TRC3("started");
    ErrorCode errorCode = ES_SUCCESS;
    while(true)
    {
        ES_TRC3("waiting for input messages");
        pthread_cond_wait(&msgQueueCondition, &mutexMsgQueueIncoming);
        pthread_mutex_lock(&mutexMsgQueueIncoming);
        ES_TRC3("will parse message");
        if (strlen(m_InputBuffer) > 0UL) {
            if ((errorCode = m_MsgProtocol->parseMessage(m_InputBuffer, m_InputBufferSize)) != ES_SUCCESS)
            {
                ES_TRC1("failed to parseMessage, error:%d", errorCode);
            }
        }
        ES_TRC3("message parsed");
        pthread_mutex_unlock(&mutexMsgQueueIncoming);
    }
    return errorCode;
}

使用调试器(以及日志消息),我可以看到变量的更新和触发以正确的顺序发生。所以我无法弄清楚为什么在第二个函数中,成员变量似乎不包含正确的值。我在某处读到了一些关于“易变”的东西。但不要真的认为,这是正确的答案。我在这里做错了吗?

更新:互斥体在类之外,但在同一个源文件中,如上所示在顶部初始化。

pthread_mutex_t mutexMsgQueueIncoming       = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t msgQueueCondition            = PTHREAD_COND_INITIALIZER;

实际上,这很奇怪。当我在没有调试器的情况下运行程序时,似乎无法在runReceiver()函数中锁定互斥锁。使用调试器,互斥锁被锁定,但成员变量包含垃圾。好的,我想我应该调查一下。

c++ pthreads
1个回答
1
投票

这里有几个错误:

  1. 必须在互斥锁被锁定时等待条件变量。 pthread_cond_wait文档是必读的。
  2. 条件变量spurious wake-ups必须由while循环处理。

修正:

class MessageQueues {
    ....
    char* m_InputBuffer;
    uint32_t m_InputBufferSize;
    uint32_t m_WriteGeneration = 0; // <--- a fix.
    uint32_t m_ReadGeneration = 0;  // <--- a fix.
    ....
};

ErrorCode MessageQueue::handleIncomingMessage(char receiveBuffer[], const uint32_t bufferSize) {
    ES_TRC3("started");
    ErrorCode errorCode = ES_SUCCESS;
    pthread_mutex_lock(&mutexMsgQueueIncoming);
    m_InputBuffer = receiveBuffer;
    m_InputBufferSize = bufferSize;
    ++m_WriteGeneration; // <--- a fix.
    ES_TRC3("triggering and unlocking");
    pthread_cond_signal(&msgQueueCondition);
    pthread_mutex_unlock(&mutexMsgQueueIncoming);
    ES_TRC3("triggered and unlocked");
    return errorCode;
}

ErrorCode MessageQueue::runReceiver(void) {
    ES_TRC3("started");
    ErrorCode errorCode = ES_SUCCESS;
    while(true)
    {
        ES_TRC3("waiting for input messages");
        pthread_mutex_lock(&mutexMsgQueueIncoming);  // <--- a fix.
        while(m_ReadGeneration == m_WriteGeneration) // <--- a fix.
            pthread_cond_wait(&msgQueueCondition, &mutexMsgQueueIncoming); // <--- a fix.
        m_ReadGeneration = m_WriteGeneration; // <--- a fix.
        ES_TRC3("will parse message");
        if (strlen(m_InputBuffer) > 0UL) {
            if ((errorCode = m_MsgProtocol->parseMessage(m_InputBuffer, m_InputBufferSize)) != ES_SUCCESS)
            {
                ES_TRC1("failed to parseMessage, error:%d", errorCode);
            }
        }
        ES_TRC3("message parsed");
        pthread_mutex_unlock(&mutexMsgQueueIncoming);
    }
    return errorCode;
}

请注意,如果在handleIncomingMessage解锁互斥锁时runReceiver被调用两次,则此代码将丢失该消息。您可能需要一个消息队列,而不是存储最后一条消息(即您有一个大小为1的队列,在溢出时丢弃旧元素)。

代码还需要检查pthread函数返回的错误代码。这是相当繁琐的,更好地使用C ++ 11 std::mutexstd::condition_variablestd::unique_lock为你做检查。在C ++ 98中,您可以使用boost等效项。

© www.soinside.com 2019 - 2024. All rights reserved.