#!/usr/bin/env python3
"""SNMP to MQTT bridge for Home Assistant.

Reads a YAML configuration describing an MQTT broker and a set of devices,
then starts one thread per device. Each thread polls the device's configured
OIDs over SNMP and publishes both a Home Assistant discovery config and the
current state as JSON to MQTT, until a shutdown signal is received.
"""
import argparse
import asyncio
import json
import logging
import os
import random
import signal
import sys
import threading
import time
from time import sleep

import yaml
from paho.mqtt import client as mqtt_client
from pysnmp.hlapi.asyncio.slim import Slim
from pysnmp.smi.rfc1902 import ObjectIdentity, ObjectType

logging.basicConfig(
    format='(%(levelname)s) [%(threadName)s] %(message)s',
    level=logging.DEBUG
)

# Global shutdown flag
shutdown_event = threading.Event()

# Mapping from the type names accepted in the YAML file to the Python
# callables used to convert SNMP values.
_OID_TYPES = {'int': int, 'bool': bool, 'str': str}


class DeviceMonitorThread(threading.Thread):
    """Thread class for monitoring a single device"""

    def __init__(self, device_name, device_config, mqtt_config, sleep_interval=2):
        """Prepare the monitoring thread for one device.

        Args:
            device_name: Name of the device (key in the 'devices' section).
            device_config: Mapping with at least 'ip', 'snmp_community', 'oids'.
            mqtt_config: MQTT settings; copied so the caller's dict is untouched.
            sleep_interval: Seconds to wait between two polling iterations.
        """
        super().__init__(name=f"Device-{device_name}")
        self.device_name = device_name
        self.device_config = device_config
        self.mqtt_config = mqtt_config.copy()
        self.sleep_interval = sleep_interval
        self.daemon = True  # Dies when main thread dies

        # Create unique client ID for this device
        self.mqtt_config['client_id'] = f"snmp-mqtt-{device_name}-{random.randint(0, 1000)}"

        # Create device request object
        self.req = {
            "device_name": device_name,
            "ip": device_config["ip"],
            "snmp_community": device_config["snmp_community"],
            "oids": device_config["oids"]
        }

    def _poll_once(self, client, ha_config, config_topic, state_topic):
        """Publish the HA config, poll the device once and publish its state.

        Errors are logged and swallowed so that one failed iteration does not
        stop the monitoring loop.
        """
        try:
            # Publish Home Assistant configuration
            publish(config_topic, client, ha_config, True, 0)
            logging.debug(f"[{self.device_name}] Published config to {config_topic}")

            # Get SNMP data and publish state
            state = asyncio.run(get_snmp(self.req))
            publish(state_topic, client, state, False, 0)
            logging.debug(f"[{self.device_name}] Published state to {state_topic}: {json.dumps(state)}")
        except Exception as e:
            logging.error(f"[{self.device_name}] Error in monitoring loop: {e}")

    def run(self):
        """Main thread execution"""
        logging.info(f"Starting monitoring thread for device: {self.device_name} ({self.device_config['ip']})")

        client = None
        graceful = False
        try:
            # Setup MQTT connection and Home Assistant config
            ha_config = ha_create_config(self.req)
            client = connect_mqtt(self.mqtt_config)
            client.loop_start()

            config_topic = f"homeassistant/device/{ha_config['dev']['ids']}/config"
            state_topic = ha_config['state_topic']

            logging.info(f"[{self.device_name}] MQTT client connected, starting monitoring loop")

            while not shutdown_event.is_set():
                self._poll_once(client, ha_config, config_topic, state_topic)
                # Wait for next iteration or shutdown signal
                shutdown_event.wait(timeout=self.sleep_interval)

            graceful = True
        except Exception as e:
            logging.error(f"[{self.device_name}] Fatal error in monitoring thread: {e}")
        finally:
            # Cleanup runs even after a fatal error so the MQTT network loop
            # started by loop_start() is not left behind.
            if client is not None:
                try:
                    client.loop_stop()
                    client.disconnect()
                except Exception as e:
                    graceful = False
                    logging.error(f"[{self.device_name}] Error while closing MQTT client: {e}")

        if graceful:
            logging.info(f"[{self.device_name}] Monitoring thread stopped gracefully")
        logging.info(f"[{self.device_name}] Thread {self.name} finished")


def parse_arguments():
    """Parse command line arguments"""
    parser = argparse.ArgumentParser(description='SNMP to MQTT bridge for Home Assistant')
    parser.add_argument('--config', '-c', required=True,
                        help='Path to YAML configuration file')
    return parser.parse_args()


def _config_error(message):
    """Log a configuration error and terminate the process with exit code 1."""
    logging.error(message)
    sys.exit(1)


def load_config(config_path):
    """Load and validate YAML configuration file

    Each OID's 'type' string ('int', 'bool' or 'str') is replaced in place by
    the corresponding Python type. Any validation failure is logged and the
    process exits with status 1.

    Args:
        config_path: Path to the YAML configuration file.

    Returns:
        The parsed configuration mapping.
    """
    if not os.path.exists(config_path):
        _config_error(f"Configuration file not found: {config_path}")

    try:
        with open(config_path, 'r') as file:
            config = yaml.safe_load(file)
    except yaml.YAMLError as e:
        _config_error(f"Error parsing YAML configuration: {e}")
    except Exception as e:
        _config_error(f"Error reading configuration file: {e}")

    # An empty file parses to None and a scalar/list document is not usable
    # either; reject both before doing membership tests on the result.
    if not isinstance(config, dict):
        _config_error("Configuration file must contain a YAML mapping at the top level")

    # Validate required configuration sections
    if 'mqtt' not in config:
        _config_error("Missing 'mqtt' section in configuration")

    if 'devices' not in config:
        _config_error("Missing 'devices' section in configuration")

    if not isinstance(config['mqtt'], dict):
        _config_error("'mqtt' section must be a mapping")

    if not isinstance(config['devices'], dict):
        _config_error("'devices' section must be a mapping of device names to settings")

    # Validate MQTT configuration
    for field in ('broker', 'port', 'user', 'password'):
        if field not in config['mqtt']:
            _config_error(f"Missing required MQTT field: {field}")

    # Validate device configurations
    for device_name, device_config in config['devices'].items():
        if not isinstance(device_config, dict):
            _config_error(f"Configuration of device '{device_name}' must be a mapping")

        for field in ('ip', 'snmp_community', 'oids'):
            if field not in device_config:
                _config_error(f"Missing required field '{field}' in device '{device_name}'")

        if not isinstance(device_config['oids'], list):
            _config_error(f"Field 'oids' of device '{device_name}' must be a list")

        # Validate OID configurations
        for oid in device_config['oids']:
            if not isinstance(oid, dict):
                _config_error(f"Each OID entry of device '{device_name}' must be a mapping")

            for field in ('name', 'oid', 'type', 'HA_device_class', 'HA_platform'):
                if field not in oid:
                    _config_error(f"Missing required OID field '{field}' in device '{device_name}'")

            # Convert type string to actual Python type. An entry shared
            # between devices through a YAML anchor/alias is the same dict
            # object and may already have been converted on an earlier pass.
            declared = oid['type']
            if isinstance(declared, str) and declared in _OID_TYPES:
                oid['type'] = _OID_TYPES[declared]
            elif declared not in _OID_TYPES.values():
                _config_error(
                    f"Unsupported type '{declared}' for OID '{oid['name']}' in device '{device_name}'")

    logging.info(f"Configuration loaded successfully from {config_path}")
    return config


def connect_mqtt(mqtt_config):
    """Create an MQTT client and connect it to the configured broker.

    Args:
        mqtt_config: Mapping with 'user', 'password', 'broker' and 'port';
            an optional 'client_id' is used as the MQTT client identifier.

    Returns:
        The connected paho MQTT client (network loop not started).
    """
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            logging.info("Connected to MQTT Broker!")
        else:
            logging.error(f"Failed to connect, return code {rc}")

    client_id = mqtt_config.get("client_id")
    if client_id:
        client = mqtt_client.Client(client_id=client_id)
    else:
        client = mqtt_client.Client()
    client.username_pw_set(mqtt_config["user"], mqtt_config["password"])
    client.on_connect = on_connect
    client.connect(mqtt_config['broker'], mqtt_config['port'])
    return client


def publish(topic, client, data, retain, qos):
    """Serialize ``data`` as JSON and publish it to ``topic``.

    Args:
        topic: MQTT topic to publish to.
        client: MQTT client used for publishing.
        data: JSON-serializable payload.
        retain: Truthy to publish as a retained message.
        qos: MQTT quality-of-service level for the publish call.
    """
    msg = json.dumps(data)
    result = client.publish(topic=topic, payload=msg, qos=qos, retain=bool(retain))
    status = result[0]
    if status == 0:
        logging.debug(f"Send `{msg}` to topic `{topic}`")
    else:
        logging.error(f"Failed to send message to topic {topic}")


async def get_snmp(req):
    """Query every configured OID of a device and collect the values.

    Args:
        req: Device request mapping with 'device_name', 'ip',
            'snmp_community' and 'oids' (each OID carrying 'name', 'oid' and
            a callable 'type').

    Returns:
        Dict mapping OID names to converted values; bool-typed OIDs are
        reported as "ON"/"OFF". OIDs whose request failed are omitted.
    """
    data = {}
    for oid in req["oids"]:
        with Slim(1) as slim:
            errorIndication, errorStatus, errorIndex, varBinds = await slim.get(
                req["snmp_community"],
                req["ip"],
                161,
                ObjectType(ObjectIdentity(oid["oid"])),
            )

            if errorIndication:
                logging.error(errorIndication)
                continue

            if errorStatus:
                failed_at = varBinds[int(errorIndex) - 1][0] if errorIndex else "?"
                logging.error("{} at {}".format(errorStatus.prettyPrint(), failed_at))
                continue

            for varBind in varBinds:
                value = oid["type"](varBind[1])
                logging.debug(f"{req['device_name']} {oid['name']} => {value}")
                if oid["type"] is bool:
                    value = "ON" if value else "OFF"
                data[oid["name"]] = value

    logging.debug(f"JSON : {json.dumps(data)}")
    return data


def ha_create_config(req):
    """Build the Home Assistant device discovery payload for one device.

    Args:
        req: Device request mapping with 'device_name', 'ip' and 'oids'.

    Returns:
        Dict with the device ('dev'), origin ('o'), 'state_topic', 'qos' and
        one component per OID under 'cmps'.
    """
    device_key = f"{req['device_name']}_{req['ip']}".replace(".", "_")
    ha_config = {
        "dev": {
            "ids": device_key,
            "name": req['device_name'],
        },
        "o": {
            "name": "snmp2mqtt"
        },
        "state_topic": f"SNMP/{req['device_name']}/state",
        "qos": 2,
    }

    cmps = {}
    for oid in req['oids']:
        component_id = f"{req['device_name']}_{req['ip']}_{oid['name']}".replace(".", "_")
        component = {
            "p": oid['HA_platform'],
            "device_class": oid['HA_device_class'],
            "value_template": f"{{{{ value_json.{oid['name']}}}}}",
            "unique_id": component_id,
            "name": oid['name']
        }
        # Add the unit to the existing component instead of replacing the
        # whole entry, which would drop every other field.
        if "HA_unit" in oid:
            component["unit_of_measurement"] = oid['HA_unit']
        cmps[component_id] = component

    ha_config["cmps"] = cmps
    logging.debug(f"config : {json.dumps(ha_config)}")
    return ha_config


def signal_handler(signum, frame):
    """Handle shutdown signals gracefully"""
    logging.info(f"Received signal {signum}, initiating graceful shutdown...")
    shutdown_event.set()


def process_devices(config):
    """Process multiple devices using threading

    Starts one DeviceMonitorThread per configured device, waits until all
    threads have ended or a shutdown is requested, then joins the threads.

    Args:
        config: Validated configuration mapping with 'mqtt' and 'devices';
            an optional 'sleep_interval' sets the polling period (default 2).
    """
    mqtt_config = config['mqtt'].copy()
    sleep_interval = config.get('sleep_interval', 2)

    # Setup signal handlers for graceful shutdown
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    device_threads = []

    try:
        logging.info(f"Starting monitoring for {len(config['devices'])} device(s)")

        # Create and start a thread for each device
        for device_name, device_config in config['devices'].items():
            thread = DeviceMonitorThread(
                device_name=device_name,
                device_config=device_config,
                mqtt_config=mqtt_config,
                sleep_interval=sleep_interval
            )
            device_threads.append(thread)
            thread.start()
            logging.info(f"Started thread for device: {device_name}")

        # Wait for all threads to complete or shutdown signal. Waiting on the
        # event (rather than sleeping) returns as soon as shutdown is set.
        while any(thread.is_alive() for thread in device_threads):
            if shutdown_event.wait(timeout=0.5):
                break

    except Exception as e:
        logging.error(f"Error in process_devices: {e}")
        shutdown_event.set()

    # Wait for all threads to finish
    logging.info("Waiting for all monitoring threads to finish...")
    for thread in device_threads:
        if thread.is_alive():
            thread.join(timeout=5.0)  # Wait max 5 seconds per thread
            if thread.is_alive():
                logging.warning(f"Thread {thread.name} did not stop gracefully")

    logging.info("All monitoring threads have finished")


def main():
    """Main entry point"""
    args = parse_arguments()
    config = load_config(args.config)

    logging.info("Starting snmp2mqtt bridge...")
    logging.info(f"Configured devices: {list(config['devices'].keys())}")

    try:
        process_devices(config)
    except KeyboardInterrupt:
        logging.info("Shutdown requested by user")
    except Exception as e:
        logging.error(f"Unexpected error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()