这是我在https://github.com/open62541/open62541/issues/3275
你好,
我试图了解PubSub的工作方式。我的理解是,发布者将更改发布到OPC UA中间件,订阅发布者的订户将收到OPC UA中间件的通知。我看一下这个例子:https://github.com/open62541/open62541/blob/v1.0/examples/pubsub/tutorial_pubsub_publish.c我们可以看到,在发布者方面,我们需要:
addPubSubConnection(server, transportProfile, networkAddressUrl);
addPublishedDataSet(server);
addDataSetField(server);
addWriterGroup(server);
addDataSetWriter(server);
发布的数据是在addDataSetField内部创建的,在这种情况下是服务器时间,然后服务器时间将在多播网络上连续发布。就我而言,我只想更新单个变量,当满足某些条件时说uint32变量。为此,我如下更改addDataSetField,然后在另一个线程中调用Update()以更新变量。
UA_NodeId published_dataSet_id_, dataSet_field_id_;
static void
addDataSetField(UA_Server *server) {
/* Add a field to the previous created PublishedDataSet */
//UA_NodeId dataSetFieldIdent;
UA_DataSetFieldConfig dataSetFieldConfig;
memset(&dataSetFieldConfig, 0, sizeof(UA_DataSetFieldConfig));
dataSetFieldConfig.dataSetFieldType = UA_PUBSUB_DATASETFIELD_VARIABLE;
dataSetFieldConfig.field.variable.fieldNameAlias = UA_STRING("Server localtime");
dataSetFieldConfig.field.variable.promotedField = UA_FALSE;
dataSetFieldConfig.field.variable.publishParameters.publishedVariable =
//UA_NODEID_NUMERIC(0, UA_NS0ID_SERVER_SERVERSTATUS_CURRENTTIME);
UA_NODEID_NUMERIC(0, UA_NS0ID_UINT32);
dataSetFieldConfig.field.variable.publishParameters.attributeId = UA_ATTRIBUTEID_VALUE;
UA_Server_addDataSetField(server, published_dataSet_id_,
&dataSetFieldConfig, &dataSet_field_id_);
}
void Update()
{
UA_Int32 myInteger = 42;
UA_Variant value;
UA_Variant_setScalar(&value, &myInteger, &UA_TYPES[UA_TYPES_INT32]);
UA_Server_writeValue(server, dataSet_field_id_, value );
}
我知道
[2019-11-14 14:28:51.467 (UTC+0700)] info/session Connection 0 | SecureChannel 0 | Session g=00000001-0000-0000-0000-000000000000 | WriteRequest returned status code BadNodeClassInvalid
更新变量的正确方法是什么?
我在open62541问题中提供了解决方案https://github.com/open62541/open62541/issues/3275#issuecomment-554454083
================================================ ======================我在回购中的其他一些示例中环顾四周,发现解决方案如下:1.我需要添加一个变量节点
//node id
UA_NodeId gCounterNodePublisher = {1, UA_NODEIDTYPE_NUMERIC, {1234}};
addVariableNode( UA_Server * server )
{
UA_UInt64 publishValue = 0xCAFEDEADCAFEBEAF; //0xCAFE
UA_VariableAttributes publisherAttr = UA_VariableAttributes_default;
UA_Variant_setScalar(&publisherAttr.value, &publishValue, &UA_TYPES[UA_TYPES_UINT64]);
publisherAttr.displayName = UA_LOCALIZEDTEXT("en-US", "Publisher Counter");
publisherAttr.accessLevel = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_WRITE;
UA_Server_addVariableNode(server, gCounterNodePublisher,
UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES),
UA_QUALIFIEDNAME(1, "Publisher Counter"),
UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE),
publisherAttr, NULL, NULL);
}
在addDataSetField中,我们需要使用相同的节点ID配置数据集
addDataSetField( UA_Server * server )
{
// Add a field to the previous created PublishedDataSet
//UA_NodeId dataSetFieldIdent;
UA_DataSetFieldConfig dataSetFieldConfig;
memset(&dataSetFieldConfig, 0, sizeof(UA_DataSetFieldConfig));
dataSetFieldConfig.dataSetFieldType = UA_PUBSUB_DATASETFIELD_VARIABLE;
dataSetFieldConfig.field.variable.fieldNameAlias = UA_STRING("Server localtime");
dataSetFieldConfig.field.variable.promotedField = UA_FALSE;
dataSetFieldConfig.field.variable.publishParameters.publishedVariable =
//UA_NODEID_NUMERIC(0, UA_NS0ID_SERVER_SERVERSTATUS_CURRENTTIME);
//UA_NODEID_NUMERIC(0, UA_NS0ID_INT32);
gCounterNodePublisher;
dataSetFieldConfig.field.variable.publishParameters.attributeId = UA_ATTRIBUTEID_VALUE;
UA_Server_addDataSetField(server, published_dataSet_id_,
&dataSetFieldConfig, &dataSet_field_id_);
}
//server init and configuration
addVariableNode(server);
addPubSubConnection(server, transportProfile, networkAddressUrl);
addPublishedDataSet(server);
addDataSetField(server);
addWriterGroup(server);
addDataSetWriter(server);
现在,我可以将变量设为已发布数据。3.要更新该值,我的解决方案是我们需要在写入/读取该变量时添加回调函数。 (可能我们有另一种方法)
writeVariableNode( UA_Server * server )
{
//Write a different integer value
static UA_UInt64 myInteger = 0xCAFEDEADCAFEBEAF;
UA_Variant myVar;
UA_Variant_init(&myVar);
myInteger++;
UA_Variant_setScalar(&myVar, &myInteger, &UA_TYPES[UA_TYPES_UINT64]);
UA_Server_writeValue(server, gCounterNodePublisher, myVar);
// Set the status code of the value to an error code. The function
//UA_Server_write provides access to the raw service. The above
//UA_Server_writeValue is syntactic sugar for writing a specific node
//attribute with the write service.
UA_WriteValue wv;
UA_WriteValue_init(&wv);
wv.nodeId = gCounterNodePublisher;
wv.attributeId = UA_ATTRIBUTEID_VALUE;
wv.value.status = UA_STATUSCODE_BADNOTCONNECTED;
wv.value.hasStatus = true;
UA_Server_write(server, &wv);
//Reset the variable to a good statuscode with a value
wv.value.hasStatus = false;
wv.value.value = myVar;
wv.value.hasValue = true;
UA_Server_write(server, &wv);
}
void onReadImpl(UA_Server *server,
const UA_NodeId *sessionId, void *sessionContext,
const UA_NodeId *nodeid, void *nodeContext,
const UA_NumericRange *range, const UA_DataValue *data) {
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
"OPCUARuntime::onReadImpl");
// Write a different integer value
static UA_UInt64 myInteger = 0xCAFEDEADCAFEBEAF;
UA_Variant myVar;
UA_Variant_init(&myVar);
myInteger++;
UA_Variant_setScalar(&myVar, &myInteger, &UA_TYPES[UA_TYPES_UINT64]);
UA_Server_writeValue(server, gCounterNodePublisher, myVar);
/// Set the status code of the value to an error code. The function
// UA_Server_write provides access to the raw service. The above
// UA_Server_writeValue is syntactic sugar for writing a specific node
// attribute with the write service.
UA_WriteValue wv;
UA_WriteValue_init(&wv);
wv.nodeId = gCounterNodePublisher;
wv.attributeId = UA_ATTRIBUTEID_VALUE;
wv.value.status = UA_STATUSCODE_BADNOTCONNECTED;
wv.value.hasStatus = true;
UA_Server_write(server, &wv);
//Reset the variable to a good statuscode with a value
wv.value.hasStatus = false;
wv.value.value = myVar;
wv.value.hasValue = true;
UA_Server_write(server, &wv);
}
void onWriteImpl(UA_Server *server,
const UA_NodeId *sessionId, void *sessionContext,
const UA_NodeId *nodeid, void *nodeContext,
const UA_NumericRange *range, const UA_DataValue *data) {
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
"OPCUARuntime::onWriteImpl");
}
addValueCallbackToVariableNode( UA_Server *server )
{
UA_ValueCallback callback ;
callback.onRead = onReadImpl;
callback.onWrite = onWriteImpl;
UA_Server_setVariableNode_valueCallback(server, gCounterNodePublisher, callback);
}
主函数中的调用顺序必须是这样的:
//variable node need to be added before pubsub connection is establish
addVariableNode( server_ );
//Register callback on read/write published data
addValueCallbackToVariableNode( server_ );
addPubSubConnection( server_, &transportProfile, &networkAddressUrl);
addPublishedDataSet( server_ );
addDataSetField( server_ );
addWriterGroup( server_ );
addDataSetWriter( server_ );
//addVariableNode( server_ );
//shutdown is taken place inside run once running is false
UA_StatusCode retval = UA_Server_run(server_, &running_);