我是 OPC UA 和 Python 的新手,但通过 asyncua 示例,我创建了实际项目所需的示例。现在我需要为服务器和客户端添加安全性,目前使用用户名和密码就可以了。
这是我的功能代码,没有安全性。如果有人知道我需要使用什么功能来创建两个用户,一个具有管理员权限,另一个没有管理员权限,请告诉我。
import asyncio
from asyncua import Server, ua
from asyncua.common.methods import uamethod
from asyncua.common.structures104 import new_struct, new_struct_field
@uamethod
def func(parent, value):
return value * 2
async def main():
server = Server()
await server.init()
server.set_endpoint("opc.tcp://localhost:4840/freeopcua/server/")
uri = "http://examples.freeopcua.github.io"
idx = await server.register_namespace(uri)
myobj = await server.nodes.objects.add_object(idx, "MyObject")
myvar = await myobj.add_variable(idx, "MyVariable", 0.0)
await server.nodes.objects.add_method(
ua.NodeId("ServerMethod", idx),
ua.QualifiedName("ServerMethod", idx),
func,
[ua.VariantType.Int64],
[ua.VariantType.Int64],
)
struct = await new_struct(server, idx, "MyStruct", [
new_struct_field("FirstValue", ua.VariantType.Float, 0.0),
new_struct_field("SecondValue", ua.VariantType.Float, 0.0),
new_struct_field("ThirdValue", ua.VariantType.Float, 0.0),
new_struct_field("FourthValue", ua.VariantType.Float, 0.0),
new_struct_field("FifthValue", ua.VariantType.Float, 0.0),
])
custom_objs = await server.load_data_type_definitions()
mystruct = await myobj.add_variable(idx, "my_struct", ua.Variant(ua.MyStruct(), ua.VariantType.ExtensionObject))
await mystruct.set_writable()
await myvar.set_writable()
print("Starting server!")
async with server:
while True:
await asyncio.sleep(0.5)
n_struct = await mystruct.get_value()
var = await myvar.read_value()
print ("\n%f\n%f\n%f\n%f\n%f\n%f" % (var, n_struct.FirstValue, n_struct.SecondValue, n_struct.ThirdValue, n_struct.FourthValue, n_struct.FifthValue))
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
if loop and loop.is_running():
print('Async event loop already running. Adding coroutine to the event loop.')
tsk = loop.create_task(main())
tsk.add_done_callback(
lambda t: print(f'Task done with result={t.result()} << return val of main()'))
else:
print('Starting new event loop')
result = asyncio.run(main(), debug=True)
我尝试使用 Asyncua 的加密示例,但无法使其工作。所以,我尝试从Asyncua服务器读取主代码的功能并自己做一些事情,但我只得到了错误。
谢谢你。
你必须创建一个实现 get_user 的类
class CustomUserManager:
def get_user(self, iserver, username=None, password=None, certificate=None):
if username == "admin":
if password == 'secret_admin_pw':
return User(role=UserRole.Admin)
elif username == "user":
if password == 'secret_pw':
return User(role=UserRole.User)
return None
要在代码中使用管理器:
user_manager = CustomUserManager()
server = Server(user_manager=cert_user_manager)
await server.init()
server.set_endpoint("opc.tcp://localhost:4840/freeopcua/server/")
好的,所以出于某种原因,如果我使用:
async with Client(url=url) as client:
要作为客户端连接到网址,连接会自动启动,并且我无法设置用户名,所以我所做的是:
client = Client(url=url)
client.set_user('admin')
client.set_password('secret_admin_pw')
await client.connect()
现在可以了。
你能展示一下你使用的代码吗,因为我也遇到了同样的问题。唯一的区别是我不在 python 上运行服务器,而是在 PLCnext PLC 上运行服务器。但通过 UAexpert 我可以使用用户名和密码登录。