#!/usr/bin/env python3
"""SNMP to MQTT bridge with Home Assistant MQTT discovery.

One monitoring thread is started per configured device. Each thread polls the
device's OIDs over SNMP, publishes the values as a JSON state message and
maintains an availability topic. Discovery configuration for Home Assistant is
published once when the thread starts.
"""
import argparse
import asyncio
import json
import logging
import os
import random
import signal
import sys
import threading
import time
from time import sleep

import yaml
from paho.mqtt import client as mqtt_client
from pysnmp.hlapi.asyncio.slim import Slim
from pysnmp.smi.rfc1902 import ObjectIdentity, ObjectType

logging.basicConfig(
    format='(%(levelname)s) [%(threadName)s] %(message)s',
    level=logging.DEBUG
)

# Global shutdown flag, set by the signal handler and observed by all threads
shutdown_event = threading.Event()

# Configuration keys that must be present in each section
_REQUIRED_MQTT_FIELDS = ('broker', 'port', 'user', 'password')
_REQUIRED_DEVICE_FIELDS = ('ip', 'snmp_community', 'oids')
_REQUIRED_OID_FIELDS = ('name', 'oid', 'type', 'HA_device_class', 'HA_platform')

# Type names accepted in the configuration and the Python type they map to
_OID_TYPES = {'int': int, 'bool': bool, 'str': str}


class DeviceMonitorThread(threading.Thread):
    """Thread class for monitoring a single device"""

    def __init__(self, device_name, device_config, mqtt_config, sleep_interval=2):
        """Prepare the monitoring thread for one device.

        Args:
            device_name: Name of the device; used in topics and log messages.
            device_config: Mapping with the keys "ip", "snmp_community" and "oids".
            mqtt_config: MQTT settings; copied so the per-device client ID
                does not leak into the caller's mapping.
            sleep_interval: Seconds to wait between two polls.
        """
        super().__init__(name=f"Device-{device_name}")
        self.device_name = device_name
        self.device_config = device_config
        self.mqtt_config = mqtt_config.copy()
        self.sleep_interval = sleep_interval
        self.daemon = True  # Dies when main thread dies

        # Create unique client ID for this device
        self.mqtt_config['client_id'] = f"snmp-mqtt-{device_name}-{random.randint(0, 1000)}"

        # Create device request object
        self.req = {
            "device_name": device_name,
            "ip": device_config["ip"],
            "snmp_community": device_config["snmp_community"],
            "oids": device_config["oids"]
        }

    def run(self):
        """Main thread execution"""
        logging.info(f"Starting monitoring thread for device: {self.device_name} ({self.device_config['ip']})")

        # Bound before the try block so the error path below can always use them
        state_topic = f"SNMP/{self.device_name}/state"
        availability_topic = f"SNMP/{self.device_name}/availability"
        client = None

        try:
            # Setup MQTT connection
            client = connect_mqtt(self.mqtt_config)
            client.loop_start()
            logging.info(f"[{self.device_name}] MQTT client connected")

            # Publish Home Assistant autodiscovery configuration (only once on startup)
            publish_ha_autodiscovery_config(client, self.req)

            # Mark device as available
            publish(availability_topic, client, "online", True, 1)

            logging.info(f"[{self.device_name}] Starting monitoring loop")

            # Main monitoring loop
            while not shutdown_event.is_set():
                self._poll_once(client, state_topic, availability_topic)
                # Wait for next iteration or shutdown signal
                shutdown_event.wait(timeout=self.sleep_interval)
        except Exception as e:
            logging.error(f"[{self.device_name}] Fatal error in monitoring thread: {e}")
            # Try to mark as offline on fatal error
            self._close_client(client, availability_topic)
        else:
            # Cleanup - mark device as offline
            self._close_client(client, availability_topic)
            logging.info(f"[{self.device_name}] Monitoring thread stopped gracefully")

        logging.info(f"[{self.device_name}] Thread {self.name} finished")

    def _poll_once(self, client, state_topic, availability_topic):
        """Poll the device once and publish its state and availability."""
        try:
            # Get SNMP data and publish state
            state = asyncio.run(get_snmp(self.req))
            publish(state_topic, client, state, False, 0)
            logging.debug(f"[{self.device_name}] Published state to {state_topic}: {json.dumps(state)}")

            # Update availability (heartbeat)
            publish(availability_topic, client, "online", False, 1)
        except Exception as e:
            logging.error(f"[{self.device_name}] Error in monitoring loop: {e}")
            # Mark as offline on error
            publish(availability_topic, client, "offline", False, 1)

    def _close_client(self, client, availability_topic):
        """Best-effort: mark the device offline and shut the MQTT client down."""
        if client is None:
            # connect_mqtt() failed, so there is nothing to clean up
            return
        try:
            publish(availability_topic, client, "offline", True, 1)
            # Disconnect while the network loop is still running so the queued
            # "offline" message is written out before the loop thread stops.
            client.disconnect()
            client.loop_stop()
        except Exception as e:
            logging.warning(f"[{self.device_name}] Error while closing MQTT client: {e}")


def parse_arguments():
    """Parse command line arguments"""
    parser = argparse.ArgumentParser(description='SNMP to MQTT bridge for Home Assistant')
    parser.add_argument('--config', '-c', required=True,
                        help='Path to YAML configuration file')
    return parser.parse_args()


def _validate_config(config):
    """Check the parsed configuration and convert OID type names in place.

    Raises:
        ValueError: If a section or field is missing or has the wrong shape.
    """
    if not isinstance(config, dict):
        # yaml.safe_load() returns None for an empty file
        raise ValueError("Configuration must be a mapping with 'mqtt' and 'devices' sections")

    # Validate required configuration sections
    if 'mqtt' not in config:
        raise ValueError("Missing 'mqtt' section in configuration")
    if 'devices' not in config:
        raise ValueError("Missing 'devices' section in configuration")

    # Validate MQTT configuration
    mqtt_section = config['mqtt']
    if not isinstance(mqtt_section, dict):
        raise ValueError("'mqtt' section must be a mapping")
    for field in _REQUIRED_MQTT_FIELDS:
        if field not in mqtt_section:
            raise ValueError(f"Missing required MQTT field: {field}")

    # Validate device configurations
    devices = config['devices']
    if not isinstance(devices, dict):
        raise ValueError("'devices' section must be a mapping of device names to settings")
    for device_name, device_config in devices.items():
        if not isinstance(device_config, dict):
            raise ValueError(f"Configuration of device '{device_name}' must be a mapping")
        for field in _REQUIRED_DEVICE_FIELDS:
            if field not in device_config:
                raise ValueError(f"Missing required field '{field}' in device '{device_name}'")

        # Validate OID configurations
        oids = device_config['oids']
        if not isinstance(oids, list):
            raise ValueError(f"'oids' of device '{device_name}' must be a list")
        for oid in oids:
            if not isinstance(oid, dict):
                raise ValueError(f"Each OID entry of device '{device_name}' must be a mapping")
            for field in _REQUIRED_OID_FIELDS:
                if field not in oid:
                    raise ValueError(f"Missing required OID field '{field}' in device '{device_name}'")

            # Convert type string to actual Python type
            type_name = oid['type']
            python_type = _OID_TYPES.get(type_name) if isinstance(type_name, str) else None
            if python_type is None:
                raise ValueError(
                    f"Unsupported type '{type_name}' for OID '{oid['name']}' in device '{device_name}'"
                )
            oid['type'] = python_type


def load_config(config_path):
    """Load and validate YAML configuration file

    Logs an error and exits the process with status 1 when the file is
    missing, unreadable, not valid YAML, or fails validation.

    Returns:
        The configuration mapping, with every OID's 'type' replaced by the
        corresponding Python type (int, bool or str).
    """
    if not os.path.exists(config_path):
        logging.error(f"Configuration file not found: {config_path}")
        sys.exit(1)

    try:
        with open(config_path, 'r', encoding='utf-8') as file:
            config = yaml.safe_load(file)
    except yaml.YAMLError as e:
        logging.error(f"Error parsing YAML configuration: {e}")
        sys.exit(1)
    except Exception as e:
        logging.error(f"Error reading configuration file: {e}")
        sys.exit(1)

    try:
        _validate_config(config)
    except ValueError as e:
        logging.error(str(e))
        sys.exit(1)

    logging.info(f"Configuration loaded successfully from {config_path}")
    return config


def connect_mqtt(mqtt_config):
    """Create an MQTT client and connect it to the configured broker.

    Args:
        mqtt_config: Mapping with "broker", "port", "user" and "password";
            an optional "client_id" is used as the MQTT client identifier.

    Returns:
        The connected client. The caller is responsible for starting the
        network loop.
    """
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            logging.info("Connected to MQTT Broker!")
        else:
            logging.error(f"Failed to connect, return code {rc}")

    # An empty client ID lets the library/broker pick one, as before
    client = mqtt_client.Client(client_id=mqtt_config.get("client_id", ""))
    client.username_pw_set(mqtt_config["user"], mqtt_config["password"])
    client.on_connect = on_connect
    client.connect(mqtt_config['broker'], mqtt_config['port'])
    return client


def publish(topic, client, data, retain, qos):
    """Publish data to an MQTT topic.

    Strings are sent unchanged; any other value is serialised to JSON first.
    The outcome is only logged, nothing is returned or raised for a failed
    publish.
    """
    msg = data if isinstance(data, str) else json.dumps(data)
    result = client.publish(topic=topic, payload=msg, qos=qos, retain=bool(retain))
    if result.rc == mqtt_client.MQTT_ERR_SUCCESS:
        logging.debug(f"Send `{msg}` to topic `{topic}`")
    else:
        logging.error(f"Failed to send message to topic {topic}")


async def get_snmp(req):
    """Query every configured OID of a device and collect the values.

    Args:
        req: Device request mapping with "device_name", "ip",
            "snmp_community" and "oids".

    Returns:
        Dict mapping OID names to converted values. OIDs configured as bool
        are reported as "ON"/"OFF". OIDs whose request failed or whose value
        could not be converted are logged and left out.
    """
    data = {}
    # One SNMP engine is shared by all requests of this poll
    with Slim(1) as slim:
        for oid in req["oids"]:
            errorIndication, errorStatus, errorIndex, varBinds = await slim.get(
                req["snmp_community"],
                req["ip"],
                161,
                ObjectType(ObjectIdentity(oid["oid"])),
            )

            if errorIndication:
                logging.error(errorIndication)
                continue
            if errorStatus:
                failed_oid = varBinds[int(errorIndex) - 1][0] if errorIndex else "?"
                logging.error("{} at {}".format(errorStatus.prettyPrint(), failed_oid))
                continue

            for varBind in varBinds:
                try:
                    value = oid["type"](varBind[1])
                except (TypeError, ValueError) as e:
                    # Treat a bad value like an SNMP error: skip this OID only
                    logging.error(f"{req['device_name']} {oid['name']}: cannot convert value: {e}")
                    continue
                logging.debug(f"{req['device_name']} {oid['name']} => {value}")
                if oid["type"] == bool:
                    data[oid["name"]] = "ON" if value else "OFF"
                else:
                    data[oid["name"]] = value

    logging.debug(f"JSON : {json.dumps(data)}")
    return data


def create_ha_device_info(req):
    """Create device information for Home Assistant MQTT Discovery"""
    return {
        "identifiers": [f"snmp2mqtt_{req['device_name']}_{req['ip']}".replace(".", "_")],
        "name": req['device_name'],
        "model": "SNMP Device",
        "manufacturer": "Network Equipment",
        "via_device": "snmp2mqtt"
    }


def create_ha_sensor_config(req, oid):
    """Create Home Assistant MQTT Discovery configuration for a single sensor"""
    device_info = create_ha_device_info(req)
    sensor_id = f"{req['device_name']}_{req['ip']}_{oid['name']}".replace(".", "_")

    config = {
        "name": f"{req['device_name']} {oid['name']}",
        "unique_id": sensor_id,
        "state_topic": f"SNMP/{req['device_name']}/state",
        "value_template": f"{{{{ value_json.{oid['name']} }}}}",
        "device": device_info,
        "origin": {
            "name": "snmp2mqtt",
            "sw_version": "1.0.0",
            "support_url": "https://git.antoineve.me/AntoineVe/snmp2mqtt"
        }
    }

    # Add device class if specified
    if 'HA_device_class' in oid:
        config['device_class'] = oid['HA_device_class']

    # Add unit of measurement if specified
    if 'HA_unit' in oid:
        config['unit_of_measurement'] = oid['HA_unit']

    # Add icon based on device class
    icon_mapping = {
        'data_size': 'mdi:network',
        'connectivity': 'mdi:network-outline',
        'power_factor': 'mdi:gauge',
        'temperature': 'mdi:thermometer',
        'signal_strength': 'mdi:signal'
    }
    icon = icon_mapping.get(oid.get('HA_device_class'))
    if icon is not None:
        config['icon'] = icon

    # Add availability topic
    config['availability'] = {
        "topic": f"SNMP/{req['device_name']}/availability",
        "payload_available": "online",
        "payload_not_available": "offline"
    }

    return config


def get_ha_discovery_topic(req, oid):
    """Get the correct Home Assistant MQTT Discovery topic for a sensor"""
    platform = oid['HA_platform']  # 'sensor' or 'binary_sensor'
    node_id = req['device_name']
    object_id = f"{req['device_name']}_{oid['name']}".replace(".", "_")

    # Format: homeassistant/<platform>/<node_id>/<object_id>/config
    return f"homeassistant/{platform}/{node_id}/{object_id}/config"


def publish_ha_autodiscovery_config(client, req):
    """Publish Home Assistant MQTT Discovery configuration for all sensors of a device"""
    logging.info(f"[{req['device_name']}] Publishing Home Assistant autodiscovery configuration")

    # Publish availability as online
    availability_topic = f"SNMP/{req['device_name']}/availability"
    publish(availability_topic, client, "online", True, 1)

    # Publish discovery configuration for each OID/sensor
    for oid in req['oids']:
        config = create_ha_sensor_config(req, oid)
        topic = get_ha_discovery_topic(req, oid)

        # Publish with retain=True so HA discovers it after restarts
        publish(topic, client, config, True, 1)
        logging.info(f"[{req['device_name']}] Published discovery config for {oid['name']} to {topic}")

        # Small delay to avoid overwhelming the broker
        time.sleep(0.1)


def signal_handler(signum, frame):
    """Handle shutdown signals gracefully"""
    logging.info(f"Received signal {signum}, initiating graceful shutdown...")
    shutdown_event.set()


def process_devices(config):
    """Process multiple devices using threading

    Starts one DeviceMonitorThread per configured device, then blocks until
    every thread has ended or a shutdown signal (SIGINT/SIGTERM) arrives.
    """
    mqtt_config = config['mqtt'].copy()
    sleep_interval = config.get('sleep_interval', 2)

    # Setup signal handlers for graceful shutdown
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    device_threads = []

    try:
        logging.info(f"Starting monitoring for {len(config['devices'])} device(s)")

        # Create and start a thread for each device
        for device_name, device_config in config['devices'].items():
            thread = DeviceMonitorThread(
                device_name=device_name,
                device_config=device_config,
                mqtt_config=mqtt_config,
                sleep_interval=sleep_interval
            )
            device_threads.append(thread)
            thread.start()
            logging.info(f"Started thread for device: {device_name}")

        # Wait for all threads to complete or shutdown signal
        while any(thread.is_alive() for thread in device_threads):
            # wait() returns True as soon as the shutdown flag is set
            if shutdown_event.wait(timeout=0.5):
                break
    except Exception as e:
        logging.error(f"Error in process_devices: {e}")
        shutdown_event.set()

    # Wait for all threads to finish
    logging.info("Waiting for all monitoring threads to finish...")
    for thread in device_threads:
        if thread.is_alive():
            thread.join(timeout=5.0)  # Wait max 5 seconds per thread
            if thread.is_alive():
                logging.warning(f"Thread {thread.name} did not stop gracefully")

    logging.info("All monitoring threads have finished")


def main():
    """Main entry point"""
    args = parse_arguments()
    config = load_config(args.config)

    logging.info("Starting snmp2mqtt bridge...")
    logging.info(f"Configured devices: {list(config['devices'].keys())}")

    try:
        process_devices(config)
    except KeyboardInterrupt:
        logging.info("Shutdown requested by user")
    except Exception as e:
        logging.error(f"Unexpected error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()