创建自定义 MiB(使用自定义 OID)以从 Python 脚本接收数据

问题描述 投票:0回答:1

我对这种类型的领域相当陌生,想询问建议,以及是否有任何资源适合涉足完成这项任务(我已经研究了一周,目前正陷入这个困境)部分)

我的任务是抓取数据并对其进行处理,一旦准备好将其保存到数据库中,似乎很容易。但我遇到麻烦的另一项任务是将我必须的数据也发送到 SNMP,因为将有另一个应用程序通过 SNMP 等待该数据

我搜索了互联网、书籍和教程,发现我必须将其发送到SNMP服务中的OID,我设法发送数据并查看传输到SNMP中OID的数据

但是,我只设法将其发送到一个已经存在的 OID,即 SysContact 的 OID (.1.3.6.1.2.1.1.4.0),所以我已经具备了发送和传输数据的基本知识。

我的问题以及让我陷入困境 1 周的问题是如何在 SNMP 中创建我自己的文件夹和自己的变量。 (我还设法了解,我需要创建自己的 MiB 才能拥有自己的自定义 MiB,但我设法获得的资源太复杂,我无法理解如何创建一个 MiB 的一两件事,另外其中一些已经过时了,可能是因为我对这个系统配置还很陌生)

已经成功在 Linux 和 Windows 中创建 SNMP 服务器(但它是在 .sh 脚本中,我只了解使用了 pass 协议的部分)

我更像是一个Python程序员

python windows snmp pysnmp mib
1个回答
0
投票

如果您只想要一些简单的示例,那么现在 PySNMP 单元测试用例涵盖了这一点,

  • 首先您需要在代理端注册自定义对象,例如this
        # --- create custom Managed Object Instance ---
        mibBuilder = snmpContext.getMibInstrum().getMibBuilder()

        MibScalar, MibScalarInstance = mibBuilder.importSymbols(
            "SNMPv2-SMI", "MibScalar", "MibScalarInstance"
        )

        class MyStaticMibScalarInstance(MibScalarInstance):
            # noinspection PyUnusedLocal,PyUnusedLocal
            def getValue(self, name, idx):
                time.sleep(2)  # Add a 2-second sleep
                return self.getSyntax().clone(f"Test agent")

            def setValue(self, name, idx, value):
                # Here you can handle the SET operation.
                # `value` is the new value for the scalar instance.
                # You should store this value somewhere, and return it in the `getValue` method.
                # For this example, let's just print it.
                print(f"SET operation received. New value: {value}")
                return self.getSyntax().clone(value)

        mibBuilder.exportSymbols(
            "__MY_MIB",
            MibScalar((1, 3, 6, 1, 4, 1, 60069, 9, 1), v2c.OctetString()),
            MyStaticMibScalarInstance(
                (1, 3, 6, 1, 4, 1, 60069, 9, 1), (0,), v2c.OctetString()
            ),
        )

        # --- end of Managed Object Instance initialization ----
  • 然后您可以在管理器端使用对象,例如this
        snmpEngine = SnmpEngine()

        async def run_get():
            errorIndication, errorStatus, errorIndex, varBinds = await getCmd(
                snmpEngine,
                CommunityData("public", mpModel=0),
                UdpTransportTarget(("localhost", AGENT_PORT), timeout=1, retries=0),
                ContextData(),
                ObjectType(ObjectIdentity("1.3.6.1.4.1.60069.9.1.0")),
            )
            for varBind in varBinds:
                print([str(varBind[0]), varBind[1]])

        asyncio.run(run_get())
        snmpEngine.transportDispatcher.closeDispatcher()
© www.soinside.com 2019 - 2024. All rights reserved.