Files
snmp2mqtt/snmp2mqtt.py

414 lines
15 KiB
Python
Executable File

#!/bin/env python3
import asyncio
from pysnmp.hlapi.asyncio import (
get_cmd, CommunityData, UdpTransportTarget, ContextData,
SnmpEngine, ObjectIdentity, ObjectType
)
import logging
import random
from paho.mqtt import client as mqtt_client
import json
from time import sleep
import yaml
import argparse
import sys
import os
import threading
import signal
import time
logging.basicConfig(
format='(%(levelname)s) [%(threadName)s] %(message)s',
level=logging.DEBUG
)
# Global shutdown flag
shutdown_event = threading.Event()
class DeviceMonitorThread(threading.Thread):
"""Thread class for monitoring a single device"""
def __init__(self, device_name, device_config, mqtt_config, sleep_interval=2):
super().__init__(name=f"Device-{device_name}")
self.device_name = device_name
self.device_config = device_config
self.mqtt_config = mqtt_config.copy()
self.sleep_interval = sleep_interval
self.daemon = True # Dies when main thread dies
# Create unique client ID for this device
self.mqtt_config['client_id'] = f"snmp-mqtt-{device_name}-{random.randint(0, 1000)}"
# Create device request object
self.req = {
"device_name": device_name,
"ip": device_config["ip"],
"snmp_community": device_config["snmp_community"],
"oids": device_config["oids"]
}
def run(self):
"""Main thread execution"""
logging.info(f"Starting monitoring thread for device: {self.device_name} ({self.device_config['ip']})")
try:
# Setup MQTT connection
client = connect_mqtt(self.mqtt_config)
client.loop_start()
state_topic = f"SNMP/{self.device_name}/state"
availability_topic = f"SNMP/{self.device_name}/availability"
logging.info(f"[{self.device_name}] MQTT client connected")
# Publish Home Assistant autodiscovery configuration (only once on startup)
publish_ha_autodiscovery_config(client, self.req)
# Mark device as available
publish(availability_topic, client, "online", True, 1)
logging.info(f"[{self.device_name}] Starting monitoring loop")
# Main monitoring loop
while not shutdown_event.is_set():
try:
# Get SNMP data and publish state
state = asyncio.run(get_snmp(self.req))
publish(state_topic, client, state, False, 0)
logging.debug(f"[{self.device_name}] Published state to {state_topic}: {json.dumps(state)}")
# Update availability (heartbeat)
publish(availability_topic, client, "online", False, 1)
except Exception as e:
logging.error(f"[{self.device_name}] Error in monitoring loop: {e}")
# Mark as offline on error
publish(availability_topic, client, "offline", False, 1)
# Wait for next iteration or shutdown signal
shutdown_event.wait(timeout=self.sleep_interval)
# Cleanup - mark device as offline
publish(availability_topic, client, "offline", True, 1)
client.loop_stop()
client.disconnect()
logging.info(f"[{self.device_name}] Monitoring thread stopped gracefully")
except Exception as e:
logging.error(f"[{self.device_name}] Fatal error in monitoring thread: {e}")
# Try to mark as offline on fatal error
try:
publish(availability_topic, client, "offline", True, 1)
client.disconnect()
except:
pass
logging.info(f"[{self.device_name}] Thread {self.name} finished")
def parse_arguments():
"""Parse command line arguments"""
parser = argparse.ArgumentParser(description='SNMP to MQTT bridge for Home Assistant')
parser.add_argument('--config', '-c', required=True,
help='Path to YAML configuration file')
return parser.parse_args()
def load_config(config_path):
"""Load and validate YAML configuration file"""
if not os.path.exists(config_path):
logging.error(f"Configuration file not found: {config_path}")
sys.exit(1)
try:
with open(config_path, 'r') as file:
config = yaml.safe_load(file)
except yaml.YAMLError as e:
logging.error(f"Error parsing YAML configuration: {e}")
sys.exit(1)
except Exception as e:
logging.error(f"Error reading configuration file: {e}")
sys.exit(1)
# Validate required configuration sections
if 'mqtt' not in config:
logging.error("Missing 'mqtt' section in configuration")
sys.exit(1)
if 'devices' not in config:
logging.error("Missing 'devices' section in configuration")
sys.exit(1)
# Validate MQTT configuration
required_mqtt_fields = ['broker', 'port', 'user', 'password']
for field in required_mqtt_fields:
if field not in config['mqtt']:
logging.error(f"Missing required MQTT field: {field}")
sys.exit(1)
# Validate device configurations
for device_name, device_config in config['devices'].items():
required_device_fields = ['ip', 'snmp_community', 'oids']
for field in required_device_fields:
if field not in device_config:
logging.error(f"Missing required field '{field}' in device '{device_name}'")
sys.exit(1)
# Validate OID configurations
for oid in device_config['oids']:
required_oid_fields = ['name', 'oid', 'type', 'HA_device_class', 'HA_platform']
for field in required_oid_fields:
if field not in oid:
logging.error(f"Missing required OID field '{field}' in device '{device_name}'")
sys.exit(1)
# Convert type string to actual Python type
if oid['type'] == 'int':
oid['type'] = int
elif oid['type'] == 'bool':
oid['type'] = bool
elif oid['type'] == 'str':
oid['type'] = str
else:
logging.error(f"Unsupported type '{oid['type']}' for OID '{oid['name']}' in device '{device_name}'")
sys.exit(1)
logging.info(f"Configuration loaded successfully from {config_path}")
return config
def connect_mqtt(mqtt_config):
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code {rc}")
client = mqtt_client.Client()
client.username_pw_set(mqtt_config["user"], mqtt_config["password"])
client.on_connect = on_connect
client.connect(mqtt_config['broker'], mqtt_config['port'])
return client
def publish(topic, client, data, retain, qos):
if isinstance(data, str):
msg = data
else:
msg = json.dumps(data)
result = client.publish(topic=topic, payload=msg, qos=qos, retain=bool(retain))
status = result[0]
if status == 0:
logging.debug(f"Send `{msg}` to topic `{topic}`")
else:
logging.error(f"Failed to send message to topic {topic}")
async def get_snmp(req):
"""Asynchronously retrieve SNMP data from device using new pysnmp API"""
data = {}
# Create SNMP engine and transport target
snmpEngine = SnmpEngine()
authData = CommunityData(req["snmp_community"])
transportTarget = UdpTransportTarget((req["ip"], 161))
contextData = ContextData()
for oid in req["oids"]:
try:
# Perform async SNMP GET operation
errorIndication, errorStatus, errorIndex, varBinds = await get_cmd(
snmpEngine,
authData,
transportTarget,
contextData,
ObjectType(ObjectIdentity(oid["oid"]))
)
if errorIndication:
logging.error(f"{req['device_name']} SNMP error indication: {errorIndication}")
continue
elif errorStatus:
logging.error(
f"{req['device_name']} SNMP error status: {errorStatus.prettyPrint()} at {errorIndex and varBinds[int(errorIndex) - 1][0] or '?'}"
)
continue
else:
for varBind in varBinds:
logging.debug(f"{req['device_name']} {oid['name']} => {oid['type'](varBind[1])}")
if oid['type'] == bool:
if bool(varBind[1]):
data.update({oid["name"]: "ON"})
else:
data.update({oid["name"]: "OFF"})
else:
data.update({oid["name"]: oid["type"](varBind[1])})
except Exception as e:
logging.error(f"{req['device_name']} Exception getting OID {oid['oid']}: {e}")
continue
logging.debug(f"{req['device_name']} JSON : {json.dumps(data)}")
return data
def create_ha_device_info(req):
"""Create device information for Home Assistant MQTT Discovery"""
return {
"identifiers": [f"snmp2mqtt_{req['device_name']}_{req['ip']}".replace(".", "_")],
"name": req['device_name'],
"model": "SNMP Device",
"manufacturer": "Network Equipment",
"via_device": "snmp2mqtt"
}
def create_ha_sensor_config(req, oid):
"""Create Home Assistant MQTT Discovery configuration for a single sensor"""
device_info = create_ha_device_info(req)
sensor_id = f"{req['device_name']}_{req['ip']}_{oid['name']}".replace(".", "_")
config = {
"name": f"{req['device_name']} {oid['name']}",
"unique_id": sensor_id,
"state_topic": f"SNMP/{req['device_name']}/state",
"value_template": f"{{{{ value_json.{oid['name']} }}}}",
"device": device_info,
"origin": {
"name": "snmp2mqtt",
"sw_version": "1.0.0",
"support_url": "https://git.antoineve.me/AntoineVe/snmp2mqtt"
}
}
# Add device class if specified
if 'HA_device_class' in oid:
config['device_class'] = oid['HA_device_class']
# Add unit of measurement if specified
if 'HA_unit' in oid:
config['unit_of_measurement'] = oid['HA_unit']
# Add icon based on device class
icon_mapping = {
'data_size': 'mdi:network',
'connectivity': 'mdi:network-outline',
'power_factor': 'mdi:gauge',
'temperature': 'mdi:thermometer',
'signal_strength': 'mdi:signal'
}
if 'HA_device_class' in oid and oid['HA_device_class'] in icon_mapping:
config['icon'] = icon_mapping[oid['HA_device_class']]
# Add availability topic
config['availability'] = {
"topic": f"SNMP/{req['device_name']}/availability",
"payload_available": "online",
"payload_not_available": "offline"
}
return config
def get_ha_discovery_topic(req, oid):
"""Get the correct Home Assistant MQTT Discovery topic for a sensor"""
platform = oid['HA_platform'] # 'sensor' or 'binary_sensor'
node_id = req['device_name']
object_id = f"{req['device_name']}_{oid['name']}".replace(".", "_")
# Format: homeassistant/<platform>/<node_id>/<object_id>/config
return f"homeassistant/{platform}/{node_id}/{object_id}/config"
def publish_ha_autodiscovery_config(client, req):
"""Publish Home Assistant MQTT Discovery configuration for all sensors of a device"""
logging.info(f"[{req['device_name']}] Publishing Home Assistant autodiscovery configuration")
# Publish availability as online
availability_topic = f"SNMP/{req['device_name']}/availability"
publish(availability_topic, client, "online", True, 1)
# Publish discovery configuration for each OID/sensor
for oid in req['oids']:
config = create_ha_sensor_config(req, oid)
topic = get_ha_discovery_topic(req, oid)
# Publish with retain=True so HA discovers it after restarts
publish(topic, client, config, True, 1)
logging.info(f"[{req['device_name']}] Published discovery config for {oid['name']} to {topic}")
# Small delay to avoid overwhelming the broker
time.sleep(0.1)
def signal_handler(signum, frame):
"""Handle shutdown signals gracefully"""
logging.info(f"Received signal {signum}, initiating graceful shutdown...")
shutdown_event.set()
def process_devices(config):
"""Process multiple devices using threading"""
mqtt_config = config['mqtt'].copy()
sleep_interval = config.get('sleep_interval', 2)
# Setup signal handlers for graceful shutdown
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
device_threads = []
try:
logging.info(f"Starting monitoring for {len(config['devices'])} device(s)")
# Create and start a thread for each device
for device_name, device_config in config['devices'].items():
thread = DeviceMonitorThread(
device_name=device_name,
device_config=device_config,
mqtt_config=mqtt_config,
sleep_interval=sleep_interval
)
device_threads.append(thread)
thread.start()
logging.info(f"Started thread for device: {device_name}")
# Wait for all threads to complete or shutdown signal
while any(thread.is_alive() for thread in device_threads) and not shutdown_event.is_set():
time.sleep(0.5)
except Exception as e:
logging.error(f"Error in process_devices: {e}")
shutdown_event.set()
# Wait for all threads to finish
logging.info("Waiting for all monitoring threads to finish...")
for thread in device_threads:
if thread.is_alive():
thread.join(timeout=5.0) # Wait max 5 seconds per thread
if thread.is_alive():
logging.warning(f"Thread {thread.name} did not stop gracefully")
logging.info("All monitoring threads have finished")
def main():
"""Main entry point"""
args = parse_arguments()
config = load_config(args.config)
logging.info("Starting snmp2mqtt bridge...")
logging.info(f"Configured devices: {list(config['devices'].keys())}")
try:
process_devices(config)
except KeyboardInterrupt:
logging.info("Shutdown requested by user")
except Exception as e:
logging.error(f"Unexpected error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()