Fix SNMP checker for pysnmp 7.x API
Migrate from deprecated hlapi to v1arch.asyncio with Slim wrapper. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -1,13 +1,10 @@
|
||||
import asyncio
|
||||
import time
|
||||
|
||||
from pysnmp.hlapi import (
|
||||
SnmpEngine,
|
||||
CommunityData,
|
||||
UdpTransportTarget,
|
||||
ContextData,
|
||||
ObjectType,
|
||||
from pysnmp.hlapi.v1arch.asyncio import (
|
||||
Slim,
|
||||
ObjectIdentity,
|
||||
getCmd,
|
||||
ObjectType,
|
||||
)
|
||||
|
||||
from .base import BaseChecker, CheckResult
|
||||
@@ -15,6 +12,9 @@ from .base import BaseChecker, CheckResult
|
||||
|
||||
class SnmpChecker(BaseChecker):
|
||||
def check(self) -> CheckResult:
|
||||
return asyncio.run(self._async_check())
|
||||
|
||||
async def _async_check(self) -> CheckResult:
|
||||
host = self.config["host"]
|
||||
port = self.config.get("port", 161)
|
||||
community = self.config.get("community", "public")
|
||||
@@ -23,15 +23,16 @@ class SnmpChecker(BaseChecker):
|
||||
|
||||
start = time.time()
|
||||
try:
|
||||
iterator = getCmd(
|
||||
SnmpEngine(),
|
||||
CommunityData(community),
|
||||
UdpTransportTarget((host, port), timeout=timeout_val, retries=1),
|
||||
ContextData(),
|
||||
ObjectType(ObjectIdentity(oid))
|
||||
with Slim() as slim:
|
||||
error_indication, error_status, error_index, var_binds = await slim.get(
|
||||
community,
|
||||
host,
|
||||
port,
|
||||
ObjectType(ObjectIdentity(oid)),
|
||||
timeout=timeout_val,
|
||||
retries=1
|
||||
)
|
||||
|
||||
error_indication, error_status, error_index, var_binds = next(iterator)
|
||||
response_time = (time.time() - start) * 1000 # ms
|
||||
|
||||
if error_indication:
|
||||
|
||||
Reference in New Issue
Block a user