324 lines
12 KiB
Python
Executable File
324 lines
12 KiB
Python
Executable File
#!/bin/env python3
|
|
import asyncio
|
|
from pysnmp.hlapi.asyncio.slim import Slim
|
|
from pysnmp.smi.rfc1902 import ObjectIdentity, ObjectType
|
|
import logging
|
|
import random
|
|
from paho.mqtt import client as mqtt_client
|
|
import json
|
|
from time import sleep
|
|
import yaml
|
|
import argparse
|
|
import sys
|
|
import os
|
|
import threading
|
|
import signal
|
|
import time
|
|
|
|
# Root logger: DEBUG and above, each record tagged with its level and the
# name of the thread that emitted it.
logging.basicConfig(
    level=logging.DEBUG,
    format='(%(levelname)s) [%(threadName)s] %(message)s',
)

# Process-wide shutdown flag shared by the signal handler and the threads.
shutdown_event = threading.Event()
|
|
|
|
|
|
class DeviceMonitorThread(threading.Thread):
    """Daemon thread that polls one SNMP device and relays the result over MQTT."""

    def __init__(self, device_name, device_config, mqtt_config, sleep_interval=2):
        """Store the device settings and build the per-device request.

        Args:
            device_name: Label used in the thread name, log lines and client id.
            device_config: Mapping providing "ip", "snmp_community" and "oids".
            mqtt_config: Broker settings; a copy is kept so the caller's dict
                is not modified when the client id is added.
            sleep_interval: Timeout (seconds) of the wait between two polls.
        """
        super().__init__(name=f"Device-{device_name}")
        self.device_name = device_name
        self.device_config = device_config
        self.sleep_interval = sleep_interval
        self.daemon = True  # does not keep the interpreter alive on exit

        # Private copy of the broker settings tagged with a per-device client id.
        own_mqtt = mqtt_config.copy()
        own_mqtt['client_id'] = f"snmp-mqtt-{device_name}-{random.randint(0, 1000)}"
        self.mqtt_config = own_mqtt

        # Request object handed to ha_create_config() and get_snmp().
        self.req = dict(
            device_name=device_name,
            ip=device_config["ip"],
            snmp_community=device_config["snmp_community"],
            oids=device_config["oids"],
        )

    def _poll_once(self, client, ha_config, config_topic, state_topic):
        """Publish the retained discovery config, then one fresh SNMP state.

        Any exception is logged and swallowed so the caller's loop keeps going.
        """
        try:
            # Home Assistant discovery payload (retained).
            publish(config_topic, client, ha_config, True, 0)
            logging.debug(f"[{self.device_name}] Published config to {config_topic}")

            # Current readings (not retained).
            state = asyncio.run(get_snmp(self.req))
            publish(state_topic, client, state, False, 0)
            logging.debug(f"[{self.device_name}] Published state to {state_topic}: {json.dumps(state)}")
        except Exception as e:
            logging.error(f"[{self.device_name}] Error in monitoring loop: {e}")

    def run(self):
        """Thread body: connect to MQTT, then poll until shutdown is requested."""
        logging.info(f"Starting monitoring thread for device: {self.device_name} ({self.device_config['ip']})")

        try:
            # Build the discovery payload first, then open the MQTT session.
            ha_config = ha_create_config(self.req)
            client = connect_mqtt(self.mqtt_config)
            client.loop_start()

            state_topic = ha_config['state_topic']
            config_topic = f"homeassistant/device/{ha_config['dev']['ids']}/config"

            logging.info(f"[{self.device_name}] MQTT client connected, starting monitoring loop")

            while True:
                if shutdown_event.is_set():
                    break
                self._poll_once(client, ha_config, config_topic, state_topic)
                # Doubles as the inter-poll delay and returns early on shutdown.
                shutdown_event.wait(timeout=self.sleep_interval)

            # Tear the MQTT session down once the loop has ended.
            client.loop_stop()
            client.disconnect()
            logging.info(f"[{self.device_name}] Monitoring thread stopped gracefully")
        except Exception as e:
            logging.error(f"[{self.device_name}] Fatal error in monitoring thread: {e}")

        logging.info(f"[{self.device_name}] Thread {self.name} finished")
|
|
|
|
|
|
def parse_arguments():
    """Read the command line and return the parsed namespace.

    The only option is the mandatory ``--config`` / ``-c`` path to the YAML
    configuration file; argparse terminates the process when it is missing.
    """
    cli = argparse.ArgumentParser(
        description='SNMP to MQTT bridge for Home Assistant',
    )
    config_option = dict(required=True, help='Path to YAML configuration file')
    cli.add_argument('--config', '-c', **config_option)
    namespace = cli.parse_args()
    return namespace
|
|
|
|
|
|
def load_config(config_path):
    """Load the YAML configuration file and validate its structure.

    Every validation failure is logged and ends the process with
    ``sys.exit(1)``; the function only returns for a usable configuration.

    Checks performed:
      * the file exists, is readable and parses as YAML;
      * the document is a mapping with 'mqtt' and 'devices' mappings;
      * 'mqtt' has 'broker', 'port', 'user' and 'password';
      * each device has 'ip', 'snmp_community' and 'oids';
      * each OID entry has 'name', 'oid', 'type', 'HA_device_class' and
        'HA_platform', with 'type' one of 'int', 'bool' or 'str'.

    Args:
        config_path: Path to the YAML configuration file.

    Returns:
        The parsed configuration dict. Each OID's 'type' string has been
        replaced in place by the matching Python type (int, bool or str).
    """
    if not os.path.exists(config_path):
        logging.error(f"Configuration file not found: {config_path}")
        sys.exit(1)

    try:
        with open(config_path, 'r', encoding='utf-8') as file:
            config = yaml.safe_load(file)
    except yaml.YAMLError as e:
        logging.error(f"Error parsing YAML configuration: {e}")
        sys.exit(1)
    except Exception as e:
        logging.error(f"Error reading configuration file: {e}")
        sys.exit(1)

    # An empty file parses to None and a list/scalar document is not a
    # mapping; reject both here instead of failing on the lookups below.
    if not isinstance(config, dict):
        logging.error("Configuration file is empty or is not a YAML mapping")
        sys.exit(1)

    # Validate required configuration sections
    for section in ('mqtt', 'devices'):
        if section not in config:
            logging.error(f"Missing '{section}' section in configuration")
            sys.exit(1)
        if not isinstance(config[section], dict):
            logging.error(f"Section '{section}' in configuration must be a mapping")
            sys.exit(1)

    # Validate MQTT configuration
    for field in ('broker', 'port', 'user', 'password'):
        if field not in config['mqtt']:
            logging.error(f"Missing required MQTT field: {field}")
            sys.exit(1)

    supported_types = {'int': int, 'bool': bool, 'str': str}
    required_device_fields = ('ip', 'snmp_community', 'oids')
    required_oid_fields = ('name', 'oid', 'type', 'HA_device_class', 'HA_platform')

    # Validate device configurations
    for device_name, device_config in config['devices'].items():
        if not isinstance(device_config, dict):
            logging.error(f"Configuration of device '{device_name}' must be a mapping")
            sys.exit(1)

        for field in required_device_fields:
            if field not in device_config:
                logging.error(f"Missing required field '{field}' in device '{device_name}'")
                sys.exit(1)

        if not isinstance(device_config['oids'], list):
            logging.error(f"Field 'oids' in device '{device_name}' must be a list")
            sys.exit(1)

        # Validate OID configurations
        for oid in device_config['oids']:
            if not isinstance(oid, dict):
                logging.error(f"Each OID entry in device '{device_name}' must be a mapping")
                sys.exit(1)

            for field in required_oid_fields:
                if field not in oid:
                    logging.error(f"Missing required OID field '{field}' in device '{device_name}'")
                    sys.exit(1)

            # Convert the type string to the actual Python type. Only strings
            # are looked up so an unhashable YAML value cannot raise here.
            type_name = oid['type']
            python_type = supported_types.get(type_name) if isinstance(type_name, str) else None
            if python_type is None:
                logging.error(f"Unsupported type '{oid['type']}' for OID '{oid['name']}' in device '{device_name}'")
                sys.exit(1)
            oid['type'] = python_type

    logging.info(f"Configuration loaded successfully from {config_path}")
    return config
|
|
|
|
|
|
def connect_mqtt(mqtt_config):
    """Create a paho MQTT client and start connecting it to the broker.

    Args:
        mqtt_config: Mapping with "user", "password", "broker" and "port".
            An optional "client_id" is used as the MQTT client identifier;
            when absent the empty string is passed, which is the library's
            own default.

    Returns:
        The client returned by ``mqtt_client.Client`` after ``connect()`` has
        been called. The network loop is not started here.
    """
    def on_connect(client, userdata, flags, rc):
        # rc == 0 is the only success code; anything else is a refusal.
        if rc == 0:
            logging.info("Connected to MQTT Broker!")
        else:
            # The original message lacked the f-prefix and printed "{rc}" literally.
            logging.error(f"Failed to connect, return code {rc}")

    # Pass the client id by keyword so the value prepared by the caller is
    # actually used instead of being silently ignored.
    client = mqtt_client.Client(client_id=mqtt_config.get("client_id", ""))
    client.username_pw_set(mqtt_config["user"], mqtt_config["password"])
    client.on_connect = on_connect
    client.connect(mqtt_config['broker'], mqtt_config['port'])
    return client
|
|
|
|
|
|
def publish(topic, client, data, retain, qos):
    """Serialize *data* to JSON and publish it on *topic* through *client*.

    Args:
        topic: Destination MQTT topic.
        client: Object exposing ``publish(topic=, payload=, qos=, retain=)``
            whose result yields a status code at index 0.
        data: JSON-serializable payload.
        retain: Truthy to ask the broker to retain the message.
        qos: MQTT quality-of-service level passed through unchanged.

    The outcome is only logged; nothing is returned.
    """
    payload = json.dumps(data)
    outcome = client.publish(topic=topic, payload=payload, qos=qos, retain=bool(retain))

    # Index 0 of the publish result is the status code; 0 means success.
    if outcome[0] != 0:
        logging.error(f"Failed to send message to topic {topic}")
        return
    logging.debug(f"Send `{payload}` to topic `{topic}`")
|
|
|
|
|
|
async def get_snmp(req):
    """Query every configured OID of one device and collect the values.

    Each OID is fetched with its own SNMP GET on port 161. A failed request
    or a value that cannot be converted to the configured type is logged and
    skipped, so the result may hold only a subset of the OIDs.

    Args:
        req: Mapping with "device_name", "ip", "snmp_community" and "oids".
            Each OID entry provides "oid", "name" and "type", where "type"
            is the callable used for conversion (int, bool or str).

    Returns:
        Dict mapping OID names to converted values. Values of bool-typed
        OIDs are reported as the strings "ON" / "OFF".
    """
    data = {}
    for oid in req["oids"]:
        with Slim(1) as slim:
            errorIndication, errorStatus, errorIndex, varBinds = await slim.get(
                req["snmp_community"],
                req["ip"],
                161,
                ObjectType(ObjectIdentity(oid["oid"])),
            )

        if errorIndication:
            logging.error(errorIndication)
            continue

        if errorStatus:
            # errorIndex is 1-based; fall back to "?" when it is not set.
            failed_at = varBinds[int(errorIndex) - 1][0] if errorIndex else "?"
            logging.error("{} at {}".format(errorStatus.prettyPrint(), failed_at))
            continue

        convert = oid["type"]
        for varBind in varBinds:
            raw = varBind[1]
            try:
                # Convert once; the result feeds both the log and the payload.
                value = convert(raw)
            except (TypeError, ValueError) as e:
                # One unconvertible value must not discard the other readings.
                type_name = getattr(convert, "__name__", convert)
                logging.error(
                    f"{req['device_name']} {oid['name']}: cannot convert {raw!r} to {type_name}: {e}"
                )
                continue

            logging.debug(f"{req['device_name']} {oid['name']} => {value}")
            if convert is bool:
                data[oid["name"]] = "ON" if value else "OFF"
            else:
                data[oid["name"]] = value

    logging.debug(f"JSON : {json.dumps(data)}")
    return data
|
|
|
|
|
|
def ha_create_config(req):
    """Build the Home Assistant MQTT discovery payload for one device.

    Args:
        req: Mapping with "device_name", "ip" and "oids". Each OID entry
            provides "name", "HA_platform" and "HA_device_class", plus an
            optional "HA_unit".

    Returns:
        Dict with the device description ("dev"), origin ("o"),
        "state_topic", "qos" and one component per OID under "cmps".
        Component keys and unique ids are "<device_name>_<ip>_<oid name>"
        with every "." replaced by "_".
    """
    base_id = f"{req['device_name']}_{req['ip']}"

    ha_config = {
        "dev": {
            "ids": base_id.replace(".", "_"),
            "name": req['device_name'],
        },
        "o": {"name": "snmp2mqtt"},
        "state_topic": f"SNMP/{req['device_name']}/state",
        "qos": 2,
    }

    cmps = {}
    for oid in req['oids']:
        component_id = f"{base_id}_{oid['name']}".replace(".", "_")
        component = {
            "p": oid['HA_platform'],
            "device_class": oid['HA_device_class'],
            "value_template": f"{{{{ value_json.{oid['name']}}}}}",
            "unique_id": component_id,
            "name": oid['name'],
        }
        # Add the unit to the component. The previous code replaced the whole
        # entry with a dict holding only the unit, losing every other field.
        if "HA_unit" in oid:
            component["unit_of_measurement"] = oid['HA_unit']
        cmps[component_id] = component

    ha_config["cmps"] = cmps
    logging.debug(f"config : {json.dumps(ha_config)}")
    return ha_config
|
|
|
|
|
|
def signal_handler(signum, frame):
    """Signal callback: log the signal number and set the shutdown flag.

    ``frame`` is part of the handler signature and is not used.
    """
    notice = f"Received signal {signum}, initiating graceful shutdown..."
    logging.info(notice)
    shutdown_event.set()
|
|
|
|
|
|
def process_devices(config):
    """Run one monitoring thread per configured device until shutdown.

    Installs the SIGINT/SIGTERM handlers, starts a DeviceMonitorThread for
    every entry of ``config['devices']``, then idles until the shutdown flag
    is set or no thread is alive. Each thread still alive afterwards is
    joined for at most 5 seconds.

    Args:
        config: Configuration dict with 'mqtt', 'devices' and an optional
            'sleep_interval' (defaults to 2).
    """
    broker_settings = config['mqtt'].copy()
    poll_delay = config.get('sleep_interval', 2)

    # Both termination signals go through the same graceful-shutdown handler.
    for handled in (signal.SIGINT, signal.SIGTERM):
        signal.signal(handled, signal_handler)

    workers = []

    try:
        logging.info(f"Starting monitoring for {len(config['devices'])} device(s)")

        # One worker per device, started as soon as it is created.
        for device_name, device_config in config['devices'].items():
            worker = DeviceMonitorThread(
                device_name=device_name,
                device_config=device_config,
                mqtt_config=broker_settings,
                sleep_interval=poll_delay,
            )
            workers.append(worker)
            worker.start()
            logging.info(f"Started thread for device: {device_name}")

        # Idle until shutdown is requested or every worker has exited.
        while not shutdown_event.is_set() and any(w.is_alive() for w in workers):
            time.sleep(0.5)

    except Exception as e:
        logging.error(f"Error in process_devices: {e}")
        shutdown_event.set()

    logging.info("Waiting for all monitoring threads to finish...")
    for worker in workers:
        if not worker.is_alive():
            continue
        worker.join(timeout=5.0)  # at most 5 seconds per thread
        if worker.is_alive():
            logging.warning(f"Thread {worker.name} did not stop gracefully")

    logging.info("All monitoring threads have finished")
|
|
|
|
|
|
def main():
    """Entry point: parse the CLI, load the configuration, run the bridge.

    A KeyboardInterrupt is logged as a user-requested shutdown; any other
    exception is logged and ends the process with exit status 1.
    """
    cli_args = parse_arguments()
    config = load_config(cli_args.config)

    logging.info("Starting snmp2mqtt bridge...")
    device_names = list(config['devices'].keys())
    logging.info(f"Configured devices: {device_names}")

    try:
        process_devices(config)
    except KeyboardInterrupt:
        logging.info("Shutdown requested by user")
    except Exception as e:
        logging.error(f"Unexpected error: {e}")
        sys.exit(1)
|
|
|
|
|
|
# Start the bridge only when this file is executed as a script.
if __name__ == "__main__":
    main()
|