Files
snmp2mqtt/snmp2mqtt.py
Antoine Van Elstraete 7888999ec3 feat: Ajoute la transformation de valeur par opération mathématique
Cette modification introduit la possibilité d'appliquer une opération mathématique simple (addition, soustraction, multiplication, division) sur les valeurs SNMP récupérées.

L'utilisateur peut désormais définir une clé 'operation' dans la configuration d'un OID (par exemple, 'value / 1000') pour normaliser les données avant leur envoi à MQTT. La documentation dans `config.yaml` a été mise à jour avec un exemple.
2025-11-06 17:05:20 +01:00

467 lines
17 KiB
Python
Executable File

#!/bin/env python3
import asyncio
from pysnmp.hlapi.asyncio import (
get_cmd, CommunityData, UdpTransportTarget, ContextData,
SnmpEngine, ObjectIdentity, ObjectType
)
import logging
import random
from paho.mqtt import client as mqtt_client
import json
from time import sleep
import yaml
import argparse
import sys
import os
import threading
import signal
import time
# Root logger setup: each record shows its level and the name of the thread
# that emitted it, which keeps the per-device worker output distinguishable.
logging.basicConfig(
    level=logging.INFO,
    format='(%(levelname)s) [%(threadName)s] %(message)s',
)

# Process-wide shutdown flag, shared by the main thread and the device threads.
shutdown_event = threading.Event()
class DeviceMonitorThread(threading.Thread):
    """Worker thread that polls one SNMP device and mirrors its state to MQTT.

    Each instance owns its own MQTT client. After connecting it publishes the
    Home Assistant discovery configuration once, then repeatedly reads the
    device over SNMP and publishes the result until ``shutdown_event`` is set.
    """

    def __init__(self, device_name, device_config, mqtt_config, sleep_interval=2):
        """Store the device settings and prepare the SNMP request description.

        Args:
            device_name: Name of the device; used in topics and log messages.
            device_config: Mapping with at least ``ip``, ``snmp_community``
                and ``oids``.
            mqtt_config: MQTT settings; copied, never modified in place.
            sleep_interval: Seconds to wait between two polling rounds.
        """
        super().__init__(name=f"Device-{device_name}")
        self.device_name = device_name
        self.device_config = device_config
        # Work on a copy so the client_id added below stays private to this
        # thread instead of leaking into the caller's dictionary.
        self.mqtt_config = mqtt_config.copy()
        self.sleep_interval = sleep_interval
        self.daemon = True  # Dies when main thread dies

        # Create unique client ID for this device
        self.mqtt_config['client_id'] = f"snmp-mqtt-{device_name}-{random.randint(0, 1000)}"

        # Create device request object
        self.req = {
            "device_name": device_name,
            "ip": device_config["ip"],
            "snmp_community": device_config["snmp_community"],
            "oids": device_config["oids"]
        }

    def run(self):
        """Connect to MQTT, then poll the device until shutdown is requested.

        No exception escapes this method: errors inside one polling round mark
        the device offline and the loop continues; any other error is logged
        as fatal and ends the thread after a best-effort cleanup.
        """
        logging.info(f"Starting monitoring thread for device: {self.device_name} ({self.device_config['ip']})")

        # Derived before any I/O so the cleanup path can always rely on them,
        # even when the MQTT connection itself fails.
        state_topic = f"SNMP/{self.device_name}/state"
        availability_topic = f"SNMP/{self.device_name}/availability"
        client = None
        stopped_cleanly = False

        try:
            # Setup MQTT connection
            client = connect_mqtt(self.mqtt_config)
            client.loop_start()
            logging.info(f"[{self.device_name}] MQTT client connected")

            # Publish Home Assistant autodiscovery configuration (only once on startup)
            publish_ha_autodiscovery_config(client, self.req)

            # Mark device as available
            publish(availability_topic, client, "online", True, 1)
            logging.info(f"[{self.device_name}] Starting monitoring loop")

            # Main monitoring loop
            while not shutdown_event.is_set():
                try:
                    # Get SNMP data and publish state
                    state = asyncio.run(get_snmp(self.req))
                    publish(state_topic, client, state, False, 0)
                    logging.debug(f"[{self.device_name}] Published state to {state_topic}: {json.dumps(state)}")

                    # Update availability (heartbeat)
                    publish(availability_topic, client, "online", False, 1)
                except Exception as e:
                    logging.error(f"[{self.device_name}] Error in monitoring loop: {e}")
                    # Mark as offline on error
                    publish(availability_topic, client, "offline", False, 1)

                # Wait for next iteration or shutdown signal
                shutdown_event.wait(timeout=self.sleep_interval)

            stopped_cleanly = True
        except Exception as e:
            logging.error(f"[{self.device_name}] Fatal error in monitoring thread: {e}")
        finally:
            # Only tear down a client that was actually created; a failed
            # connect_mqtt() leaves nothing to release.
            if client is not None:
                self._release_client(client, availability_topic)

        if stopped_cleanly:
            logging.info(f"[{self.device_name}] Monitoring thread stopped gracefully")
        logging.info(f"[{self.device_name}] Thread {self.name} finished")

    def _release_client(self, client, availability_topic):
        """Best-effort teardown: announce offline, stop the loop, disconnect."""
        steps = (
            ("publish offline status",
             lambda: publish(availability_topic, client, "offline", True, 1)),
            ("stop network loop", client.loop_stop),
            ("disconnect", client.disconnect),
        )
        for label, step in steps:
            # Each step is isolated so one failure does not skip the others,
            # and failures are logged instead of being silently swallowed.
            try:
                step()
            except Exception as e:
                logging.warning(f"[{self.device_name}] Cleanup step failed ({label}): {e}")
def parse_arguments():
    """Read the command line and return the parsed argparse namespace.

    The only option is the mandatory ``--config`` / ``-c`` path to the YAML
    configuration file.
    """
    cli = argparse.ArgumentParser(
        description='SNMP to MQTT bridge for Home Assistant'
    )
    config_option = {
        'required': True,
        'help': 'Path to YAML configuration file',
    }
    cli.add_argument('--config', '-c', **config_option)
    namespace = cli.parse_args()
    return namespace
def load_config(config_path):
    """Load, validate and normalize the YAML configuration file.

    Every problem (missing file, unreadable or malformed YAML, missing section
    or field, wrong structure, unsupported OID type) is logged and terminates
    the process with exit status 1.

    On success, the ``type`` of each OID entry is replaced in place by the
    matching Python type (``int``, ``bool`` or ``str``).

    Args:
        config_path: Path to the YAML configuration file.

    Returns:
        The parsed configuration dictionary.
    """
    def fail(message):
        # All configuration errors are fatal: report and stop the process.
        logging.error(message)
        sys.exit(1)

    if not os.path.exists(config_path):
        fail(f"Configuration file not found: {config_path}")

    try:
        # Explicit encoding so parsing does not depend on the platform locale.
        with open(config_path, 'r', encoding='utf-8') as file:
            config = yaml.safe_load(file)
    except yaml.YAMLError as e:
        fail(f"Error parsing YAML configuration: {e}")
    except Exception as e:
        fail(f"Error reading configuration file: {e}")

    # safe_load() returns None for an empty file and may return a list or a
    # scalar; the membership tests below only make sense on a mapping.
    if not isinstance(config, dict):
        fail(f"Configuration file is empty or is not a YAML mapping: {config_path}")

    # Validate required configuration sections
    if 'mqtt' not in config:
        fail("Missing 'mqtt' section in configuration")
    if 'devices' not in config:
        fail("Missing 'devices' section in configuration")

    # Validate MQTT configuration
    mqtt_section = config['mqtt']
    if not isinstance(mqtt_section, dict):
        fail("The 'mqtt' section must be a mapping")
    for field in ('broker', 'port', 'user', 'password'):
        if field not in mqtt_section:
            fail(f"Missing required MQTT field: {field}")

    # Validate device configurations
    devices = config['devices']
    if not isinstance(devices, dict):
        fail("The 'devices' section must be a mapping of device names to settings")

    supported_types = {'int': int, 'bool': bool, 'str': str}
    required_device_fields = ('ip', 'snmp_community', 'oids')
    required_oid_fields = ('name', 'oid', 'type', 'HA_device_class', 'HA_platform')

    for device_name, device_config in devices.items():
        if not isinstance(device_config, dict):
            fail(f"Configuration of device '{device_name}' must be a mapping")
        for field in required_device_fields:
            if field not in device_config:
                fail(f"Missing required field '{field}' in device '{device_name}'")

        oids = device_config['oids']
        if not isinstance(oids, list):
            fail(f"Field 'oids' of device '{device_name}' must be a list")

        # Validate OID configurations
        for oid in oids:
            if not isinstance(oid, dict):
                fail(f"Each OID entry of device '{device_name}' must be a mapping")
            for field in required_oid_fields:
                if field not in oid:
                    fail(f"Missing required OID field '{field}' in device '{device_name}'")

            # Convert type string to actual Python type. The isinstance guard
            # keeps an unhashable value (e.g. a list) from raising in the lookup.
            declared = oid['type']
            python_type = supported_types.get(declared) if isinstance(declared, str) else None
            if python_type is None:
                fail(f"Unsupported type '{declared}' for OID '{oid['name']}' in device '{device_name}'")
            oid['type'] = python_type

    logging.info(f"Configuration loaded successfully from {config_path}")
    return config
def connect_mqtt(mqtt_config):
    """Create a paho MQTT client and start connecting it to the broker.

    Args:
        mqtt_config: Mapping with ``broker``, ``port``, ``user`` and
            ``password``. An optional ``client_id`` is used as the MQTT client
            identifier; when absent, an empty identifier is passed.

    Returns:
        The client, after ``connect()`` has been called. The network loop is
        not started here; the caller is expected to do that.
    """
    def on_connect(client, userdata, connect_flags, reason_code, properties):
        if reason_code == 0:
            logging.info("Connected to MQTT Broker!")
        else:
            logging.error(f"Failed to connect to MQTT Broker, reason code: {reason_code}")

    # Use the new callback API version 2, and hand the configured client_id to
    # the client so it is actually used for the session.
    client = mqtt_client.Client(
        callback_api_version=mqtt_client.CallbackAPIVersion.VERSION2,
        client_id=mqtt_config.get("client_id", ""),
    )
    client.username_pw_set(mqtt_config["user"], mqtt_config["password"])
    client.on_connect = on_connect
    # YAML may deliver the port as a quoted string; the client needs an int.
    client.connect(mqtt_config['broker'], int(mqtt_config['port']))
    return client
def publish(topic, client, data, retain, qos):
    """Send ``data`` to ``topic`` through ``client`` and log the outcome.

    Strings are sent unchanged; any other value is serialized to JSON first.
    ``retain`` is coerced to a boolean. Nothing is returned; a non-zero status
    in the publish result is reported with an error log entry.
    """
    payload = data if isinstance(data, str) else json.dumps(data)
    outcome = client.publish(topic=topic, payload=payload, qos=qos, retain=bool(retain))

    # First element of the publish result is the status code; 0 means success.
    if outcome[0] != 0:
        logging.error(f"Failed to send message to topic {topic}")
        return
    logging.debug(f"Send `{payload}` to topic `{topic}`")
def apply_operation(value, operation_str):
    """Apply a simple arithmetic operation to a value.

    ``operation_str`` has the form ``<operand> <operator> <operand>`` where
    the operator is one of ``+ - * /`` and at least one operand contains the
    placeholder ``value``, e.g. ``"value / 1000"`` or ``"100 - value"``.
    Whitespace around the operator is optional (``"value/1000"``).

    Args:
        value: The value to transform; its ``str()`` form replaces the
            placeholder and must parse as a float.
        operation_str: The operation description.

    Returns:
        The result as a float, or ``value`` unchanged (with a log entry) when
        the operation is invalid, cannot be parsed, uses an unsupported
        operator, or would divide by zero.
    """
    import re  # function-scope import: the module does not import re itself

    # A non-string (e.g. a bare number in the YAML) would make the membership
    # test below raise TypeError instead of being reported.
    if not isinstance(operation_str, str):
        logging.error(f"Invalid operation string: expected text, got {operation_str!r}")
        return value

    if 'value' not in operation_str:
        logging.error(f"Invalid operation string: 'value' placeholder missing in '{operation_str}'")
        return value

    parts = operation_str.split()
    if len(parts) != 3:
        # Fallback for compact spellings such as "value/1000" or "value*-1".
        number = r'[+-]?(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][+-]?\d+)?'
        operand_pattern = rf'(?:[+-]?value|{number})'
        compact = re.fullmatch(
            rf'\s*({operand_pattern})\s*([-+*/])\s*({operand_pattern})\s*',
            operation_str,
        )
        if compact is None:
            logging.error(f"Invalid operation format: '{operation_str}'. Expected 'value <operator> <operand>'")
            return value
        parts = compact.groups()

    left, operator, right = parts
    substituted = str(value)
    try:
        val = float(left.replace('value', substituted))
        operand = float(right.replace('value', substituted))
    except ValueError as e:
        logging.error(f"Could not parse operation string: '{operation_str}'. Error: {e}")
        return value

    operations = {
        '+': lambda a, b: a + b,
        '-': lambda a, b: a - b,
        '*': lambda a, b: a * b,
        '/': lambda a, b: a / b,
    }
    if operator not in operations:
        logging.error(f"Unsupported operator: '{operator}' in '{operation_str}'")
        return value

    if operator == '/' and operand == 0:
        logging.warning(f"Attempted division by zero in operation: {operation_str}")
        return value

    return operations[operator](val, operand)
async def get_snmp(req):
    """Asynchronously read every configured OID of one device via SNMP GET.

    Args:
        req: Mapping with ``device_name``, ``ip``, ``snmp_community`` and
            ``oids``. Each OID entry provides ``name``, ``oid``, a callable
            ``type`` used to cast the raw value, and optionally ``operation``.

    Returns:
        A dict mapping OID names to their values. Entries whose ``type`` is
        ``bool`` are reported as ``"ON"`` / ``"OFF"``. OIDs that fail (SNMP
        error or exception) are logged and left out of the result.

    Raises:
        Whatever ``UdpTransportTarget.create`` raises; errors while fetching
        an individual OID are caught and logged instead.
    """
    data = {}

    # A fresh engine is created for every call, so it is closed in the
    # ``finally`` below to avoid leaving one open dispatcher per polling round.
    snmpEngine = SnmpEngine()
    try:
        authData = CommunityData(req["snmp_community"])
        transportTarget = await UdpTransportTarget.create((req["ip"], 161))
        contextData = ContextData()

        for oid in req["oids"]:
            try:
                # Perform async SNMP GET operation
                errorIndication, errorStatus, errorIndex, varBinds = await get_cmd(
                    snmpEngine,
                    authData,
                    transportTarget,
                    contextData,
                    ObjectType(ObjectIdentity(oid["oid"]))
                )

                if errorIndication:
                    logging.error(f"{req['device_name']} SNMP error indication: {errorIndication}")
                    continue

                if errorStatus:
                    # errorIndex is 1-based; fall back to '?' when it is unset.
                    failed_at = (varBinds[int(errorIndex) - 1][0] if errorIndex else None) or '?'
                    logging.error(
                        f"{req['device_name']} SNMP error status: {errorStatus.prettyPrint()} at {failed_at}"
                    )
                    continue

                for varBind in varBinds:
                    # Cast to the right type (once; reused by the debug log)
                    value = oid['type'](varBind[1])
                    logging.debug(f"{req['device_name']} {oid['name']} => {value}")

                    # Apply operation if defined
                    if 'operation' in oid:
                        value = apply_operation(value, oid['operation'])

                    if oid['type'] == bool:
                        data[oid["name"]] = "ON" if value else "OFF"
                    else:
                        data[oid["name"]] = value
            except Exception as e:
                logging.error(f"{req['device_name']} Exception getting OID {oid['oid']}: {e}")
                continue
    finally:
        # Best-effort cleanup: a failure here must not discard collected data.
        try:
            snmpEngine.close_dispatcher()
        except Exception as e:
            logging.debug(f"{req['device_name']} Could not close SNMP dispatcher: {e}")

    logging.debug(f"{req['device_name']} JSON : {json.dumps(data)}")
    return data
def create_ha_device_info(req):
    """Build the ``device`` section of a Home Assistant MQTT discovery payload.

    The single identifier combines the device name and IP address from
    ``req``, with every dot replaced by an underscore.
    """
    raw_identifier = f"snmp2mqtt_{req['device_name']}_{req['ip']}"
    device = dict(
        identifiers=[raw_identifier.replace(".", "_")],
        name=req['device_name'],
        model="SNMP Device",
        manufacturer="Network Equipment",
        via_device="snmp2mqtt",
    )
    return device
def create_ha_sensor_config(req, oid):
    """Create the Home Assistant MQTT discovery payload for a single sensor.

    Args:
        req: Device request mapping providing ``device_name`` and ``ip``.
        oid: OID entry providing ``name`` and optionally ``HA_device_class``
            and ``HA_unit``.

    Returns:
        A dict ready to be serialized to JSON and published on the sensor's
        discovery topic.
    """
    device_info = create_ha_device_info(req)
    sensor_id = f"{req['device_name']}_{req['ip']}_{oid['name']}".replace(".", "_")

    # Bracket lookup with a JSON-quoted key: attribute syntax
    # (value_json.<name>) stops working as soon as the name contains a space,
    # a dash or starts with a digit.
    quoted_key = json.dumps(oid['name'], ensure_ascii=False)
    value_template = "{{ value_json[" + quoted_key + "] }}"

    config = {
        "name": f"{req['device_name']} {oid['name']}",
        "unique_id": sensor_id,
        "state_topic": f"SNMP/{req['device_name']}/state",
        "value_template": value_template,
        "device": device_info,
        "origin": {
            "name": "snmp2mqtt",
            "sw_version": "1.0.0",
            "support_url": "https://git.antoineve.me/AntoineVe/snmp2mqtt"
        }
    }

    # Add device class if specified
    if 'HA_device_class' in oid:
        config['device_class'] = oid['HA_device_class']
        # Add state_class for total_increasing counters like data size
        if oid['HA_device_class'] == 'data_size':
            config['state_class'] = 'total_increasing'

    # Add unit of measurement if specified
    if 'HA_unit' in oid:
        config['unit_of_measurement'] = oid['HA_unit']

    # Add icon based on device class
    icon_mapping = {
        'data_size': 'mdi:network',
        'connectivity': 'mdi:network-outline',
        'power_factor': 'mdi:gauge',
        'temperature': 'mdi:thermometer',
        'signal_strength': 'mdi:signal'
    }
    if 'HA_device_class' in oid and oid['HA_device_class'] in icon_mapping:
        config['icon'] = icon_mapping[oid['HA_device_class']]

    # Add availability topic
    config['availability'] = {
        "topic": f"SNMP/{req['device_name']}/availability",
        "payload_available": "online",
        "payload_not_available": "offline"
    }
    return config
def get_ha_discovery_topic(req, oid):
    """Return the Home Assistant MQTT discovery topic for one sensor.

    The topic has the form
    ``homeassistant/<platform>/<node_id>/<object_id>/config``. ``node_id`` is
    derived from the device name and ``object_id`` from the device name plus
    the OID name. In both, every character outside ``[a-zA-Z0-9_-]`` is
    replaced by ``_`` so that names with spaces, dots or slashes still give a
    topic Home Assistant accepts. Names that are already valid are unchanged.

    Args:
        req: Device request mapping providing ``device_name``.
        oid: OID entry providing ``name`` and ``HA_platform``.
    """
    import re  # function-scope import: the module does not import re itself

    def sanitize(segment):
        # Keep only the characters allowed in discovery topic segments.
        return re.sub(r'[^a-zA-Z0-9_-]', '_', str(segment))

    platform = oid['HA_platform']  # 'sensor' or 'binary_sensor'
    node_id = sanitize(req['device_name'])
    object_id = sanitize(f"{req['device_name']}_{oid['name']}")

    # Format: homeassistant/<platform>/<node_id>/<object_id>/config
    return f"homeassistant/{platform}/{node_id}/{object_id}/config"
def publish_ha_autodiscovery_config(client, req):
    """Announce every sensor of one device to Home Assistant via MQTT discovery.

    First publishes a retained "online" availability message, then one
    retained discovery payload per OID in ``req['oids']``, pausing briefly
    after each of them.
    """
    device = req['device_name']
    logging.info(f"[{device}] Publishing Home Assistant autodiscovery configuration")

    # Publish availability as online
    publish(f"SNMP/{device}/availability", client, "online", True, 1)

    # One discovery message per OID/sensor
    for sensor in req['oids']:
        discovery_payload = create_ha_sensor_config(req, sensor)
        discovery_topic = get_ha_discovery_topic(req, sensor)

        # Retained so Home Assistant rediscovers the sensor after restarts
        publish(discovery_topic, client, discovery_payload, True, 1)
        logging.info(f"[{device}] Published discovery config for {sensor['name']} to {discovery_topic}")

        # Small delay to avoid overwhelming the broker
        time.sleep(0.1)
def signal_handler(signum, frame):
    """Signal callback: log the received signal and raise the shutdown flag.

    ``frame`` is accepted because the signal API passes it, but it is unused.
    """
    notice = f"Received signal {signum}, initiating graceful shutdown..."
    logging.info(notice)
    shutdown_event.set()
def process_devices(config):
    """Run one monitoring thread per configured device until shutdown.

    Installs the SIGINT/SIGTERM handlers, starts a ``DeviceMonitorThread`` for
    every entry of ``config['devices']``, then waits until all threads have
    ended or the shutdown flag is set. Finally gives each thread that is still
    alive up to five seconds to finish.
    """
    broker_settings = config['mqtt'].copy()
    poll_interval = config.get('sleep_interval', 2)

    # Setup signal handlers for graceful shutdown
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, signal_handler)

    workers = []
    try:
        logging.info(f"Starting monitoring for {len(config['devices'])} device(s)")

        # Create and start a thread for each device
        for device_name, device_config in config['devices'].items():
            worker = DeviceMonitorThread(
                device_name=device_name,
                device_config=device_config,
                mqtt_config=broker_settings,
                sleep_interval=poll_interval
            )
            workers.append(worker)
            worker.start()
            logging.info(f"Started thread for device: {device_name}")

        # Idle until every worker has ended or a shutdown was requested
        while any(worker.is_alive() for worker in workers):
            if shutdown_event.is_set():
                break
            time.sleep(0.5)
    except Exception as e:
        logging.error(f"Error in process_devices: {e}")
        shutdown_event.set()

    # Wait for all threads to finish
    logging.info("Waiting for all monitoring threads to finish...")
    for worker in workers:
        if not worker.is_alive():
            continue
        worker.join(timeout=5.0)  # Wait max 5 seconds per thread
        if worker.is_alive():
            logging.warning(f"Thread {worker.name} did not stop gracefully")
    logging.info("All monitoring threads have finished")
def main():
    """Program entry point: parse arguments, load the config, run the bridge.

    An unexpected exception from the device processing is logged and ends the
    process with exit status 1; a KeyboardInterrupt is only logged.
    """
    cli_args = parse_arguments()
    settings = load_config(cli_args.config)

    logging.info("Starting snmp2mqtt bridge...")
    device_names = list(settings['devices'].keys())
    logging.info(f"Configured devices: {device_names}")

    try:
        process_devices(settings)
    except KeyboardInterrupt:
        logging.info("Shutdown requested by user")
    except Exception as e:
        logging.error(f"Unexpected error: {e}")
        sys.exit(1)
# Start the bridge only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()