我有一个问题,也许你可以帮助我。
特定线程中有一个amqp客户端,它与rabbitmq服务器通信。
在客户端开始使用它之前,我需要锁定(并检查它是否已经锁定)某些资源。 LockResource是各种功能的必要前提。
在第一个例子中,我断开连接的最后一个lamba,但是有一个奇怪的行为,do_stuff()有时被调用,有时候不...
在第二个例子中,所有权利,但它杀死多线程。
在第三个,为了避免内存泄漏,我需要更改AmqpClient以在出现错误时始终使用“幻数”发出resourceLocked并在调用do_stuff()之前检查它......这一点都不好......
也许有些事我做错了或者我误解了。如果你有更好的方法,我会接受它。
编辑16/09/2018
我用不精确的解释误导你:do_stuff()不是一种独特的方法,但是我会直接连接各种指令。写[...]而不是do_stuff()会更好。此外,实例中只有一个唯一的客户端。
我不能提前知道用户将首先执行什么。他可以将资源锁定到readResourceContent,deleteResource,writeResourceProperty,...,所以所有这一切都必须执行不同的指令。
好消息是,多亏了你的答案,我有一个使用bool QMetaObject::invokeMethod(QObject *context, Functor function, Qt::ConnectionType type = Qt::AutoConnection, FunctorReturnType *ret = nullptr)
的工作解决方案
一般声明
using Callback = std::function<void()>;
Q_DECLARE_METATYPE(Callback)
qRegisterMetaType<Callback>("Callback");
AmqpClient.cpp
void AmqpClient::lockResource(Identifier identifier, QObject *context, const Callback &func)
{
if(lockedResources_.contains(identifier))
{
QMetaObject::invokeMethod(context,
func,
Qt::QueuedConnection);
return;
}
QString queueName(QString::number(identifier) + ".lock");
QAmqpQueue *lockQueue = client_->createQueue(queueName);
connect(lockQueue, qOverload<QAMQP::Error>(&QAmqpQueue::error), this, [this](QAMQP::Error error) {
if(error == QAMQP::ResourceLockedError) {
emit errorMessage("The expected resource is already locked by another user.");
sender()->deleteLater();
}
});
connect(lockQueue, &QAmqpQueue::declared, this, [=]() {
QAmqpQueue *lockQueue = qobject_cast<QAmqpQueue*>(sender());
lockQueue->consume(QAmqpQueue::coExclusive);
lockedResources_[identifier] = lockQueue;
QMetaObject::invokeMethod(context,
func,
Qt::QueuedConnection);
});
lockQueue->declare(QAmqpQueue::Exclusive | QAmqpQueue::AutoDelete);
}
Controller.cpp
void Controller::readResourceContent(int row)
{
[...]
QMetaObject::invokeMethod(amqp_,
"lockResource",
Qt::AutoConnection,
Q_ARG(Identifier, identifier),
Q_ARG(QObject*, this),
Q_ARG(Callback, [&](){ [...] }));
[...]
}
1
// called not inside connect(...) because it may not to emit the signal
// (if resource is already locked)
disconnect(amqp_, &AmqpClient::resourceLocked, 0, 0);
connect(amqp_, &AmqpClient::resourceLocked, this, [&](){
do_stuff();
});
emit lockResource(identifier, QPrivateSignal());
2
// This is working like a charm, but I'm losing ui reactivity
QEventLoop loop;
connect(amqp_, &AmqpClient::resourceLocked, &loop, &QEventLoop::quit);
emit lockResource(identifier, QPrivateSignal());
loop.exec();
do_stuff();
3
// Using an intermediate object
class CallbackObject : public QObject
{
Q_OBJECT
std::function<void()> callback;
public:
CallbackObject(std::function<void()> callback) : QObject(), callback(callback) {}
public slots:
void execute() { callback(); deleteLater(); }
};
// Working but memory leak if signal is not emitted
// resource already locked for example
CallbackObject *helper = new CallbackObject([&](){
do_stuff() ;
});
connect(amqp_, &AmqpClient::resourceLocked, helper, &CallbackObject::execute);
emit lockResource(identifier, QPrivateSignal());
如果我理解正确,您正在尝试设置以下事件序列:
Client AMQPClient
| lockResource(id) |
|------------------------------>|
| |
| |--\
| | |
| resourceLocked | | try to acquire resource
|<------------------------------| |
/--| |<-/
do_stuff | | |
\->| |
| |
当resourceLocked
无法获取资源时,不发送消息AMQP
,因此在这种情况下也不会调用do_stuff
。
当您有多个客户端等待同时锁定其各自的资源时,使用实现1时未调用do_stuff
的问题是竞争条件。一个序列演示了您描述的问题(以及该方法的其他问题以及您的实现3)如下:
AmqpClient::resourceLocked
的所有现有连接,然后将自身连接到该信号。AmpqClient::resourceLocked
的所有现有连接,特别是客户端A刚建立的连接。然后将自身连接到该信号。AmpqClient
处理客户A的lockResource
请求。它获取了所请求的资源并发出resourceLocked
信号。do_stuff
,即使它是被获取的客户端A请求的资源。AmpqClient
处理客户B的lockResource
请求。同样,它成功获取资源并发出resourceLocked
信号。do_stuff
。上面的序列表明,客户端A的do_stuff
没有被调用,考虑到客户端B在其资源尚未被获取的情况下工作。
要解决此问题,您必须确保只调用刚刚处理了do_stuff
请求的客户端的lockResource
。信号 - 通过设计总是通知所有观察者 - 只是一种次优的方法,因为观察者需要检查信号是否意在通知他们或其他人。
第一个修复是修改lockResource
信号以发送发出信号的客户端的this
指针。这样,AmqpClient
可以使用该指针在获取资源时回调客户端。
然后,连接到信号的插槽的一个实现可能如下所示:
void AmqpClient::handleLockResourceRequest(int identifier,
QObject* requestingClient)
{
// try to acquire resource described by `identifier`
if (resource_acquired_successfully)
{
QMetaObject::invokeMethod(requestingClient, "do_stuff", Qt::AutoConnection);
}
}
请注意,为了使invokeMethod
工作,do_stuff
必须是一个槽或需要标记为Q_INVOKABLE
。
但我更进一步,只需通过do_stuff
调用invokeMethod
:据我所知,AmqpClient
是唯一连接到lockResource
的侦听器,并且是同一线程中的两个对象,您可能只是使用直接调用amqp_->tryToLockResource(identifier)
而不是emit lockResource(identifier)
。即使用该信号的唯一原因是呼叫通过事件循环。
您也可以使用invokeMethod
通过事件循环请求获取资源,而不是使用信号。这样做的好处是,您的客户端类不再暴露这样一个事实,即它需要获取资源来为该类的用户完成工作。
总而言之,生成的代码看起来像这样:
class ClientType : public QObject {
Q_OBJECT
public:
// ...
Q_INVOKABLE void do_stuff(); // definition as before
private:
void requestResource(); // was previously code block 1 in your question
private:
AmqpClient* amqp_;
// ...
};
inline void ClientType::requestResource()
{
auto identifier = ...; // create resource identifier
QMetaObject::invokeMethod(amqp_,
"requestResource",
Qt::AutoConnection,
Q_ARG(int, identifier),
Q_ARG(QObject*, this));
}
class AmqpClient : public QObject {
Q_OBJECT
public:
// ...
Q_INVOKABLE requestResource(int identifier, QObject* requestingClient);
};
inline void AmqpClient::requestResource(int identifier,
QObject* requestingClient)
{
// try to acquire resource described by `identifier`
if (resource_acquired_successfully)
{
QMetaObject::invokeMethod(requestingClient, "do_stuff", Qt::AutoConnection);
}
}
在上面的实现中,我假设资源标识符的类型为int
,但当然您可以使用在Qt元类型系统中注册的任何其他类型。类似地,在使用Qts元类型系统注册指针类型之后,您也可以使用QObject
(或抽象基类来避免引入循环依赖),而不是传递ClientType*
指针。