Files
snmp2mqtt/snmp2mqtt.py

261 lines
9.2 KiB
Python
Executable File

#!/bin/env python3
import asyncio
from pysnmp.hlapi.asyncio.slim import Slim
from pysnmp.smi.rfc1902 import ObjectIdentity, ObjectType
import logging
import random
from paho.mqtt import client as mqtt_client
import json
from time import sleep
import yaml
import argparse
import sys
import os
# Configure the root logger once at import time: emit everything from DEBUG
# upwards using a compact "(LEVEL) message" layout.
logging.basicConfig(level=logging.DEBUG, format='(%(levelname)s) %(message)s')
def parse_arguments():
    """Read the command line and return the parsed options.

    The only option is the mandatory ``--config`` / ``-c`` path to the YAML
    configuration file; it is exposed as ``.config`` on the returned
    namespace.
    """
    cli = argparse.ArgumentParser(
        description='SNMP to MQTT bridge for Home Assistant'
    )
    cli.add_argument(
        '--config',
        '-c',
        required=True,
        help='Path to YAML configuration file',
    )
    options = cli.parse_args()
    return options
def _config_error(message):
    """Log a configuration problem and terminate the process with status 1."""
    logging.error(message)
    sys.exit(1)


def load_config(config_path):
    """Load and validate the YAML configuration file.

    The document must be a mapping containing:

    * ``mqtt``: a mapping with ``broker``, ``port``, ``user`` and ``password``;
    * ``devices``: a mapping of device name to a mapping with ``ip``,
      ``snmp_community`` and ``oids`` (a list).  Every OID entry needs
      ``name``, ``oid``, ``type``, ``HA_device_class`` and ``HA_platform``.

    As a side effect the ``type`` string of every OID entry (``int``,
    ``bool`` or ``str``) is replaced in place by the matching Python type.

    Args:
        config_path: Path to the YAML configuration file.

    Returns:
        The parsed (and type-converted) configuration dictionary.

    Any problem (missing file, unreadable or malformed YAML, missing or
    malformed section/field, unsupported type) is logged and ends the
    process through ``sys.exit(1)``.
    """
    if not os.path.exists(config_path):
        _config_error(f"Configuration file not found: {config_path}")

    try:
        with open(config_path, 'r') as file:
            config = yaml.safe_load(file)
    except yaml.YAMLError as e:
        _config_error(f"Error parsing YAML configuration: {e}")
    except Exception as e:
        _config_error(f"Error reading configuration file: {e}")

    # An empty file is parsed as None and a bare scalar/list is not a mapping;
    # both would otherwise crash the membership tests below.
    if not isinstance(config, dict):
        _config_error("Configuration file must contain a YAML mapping at the top level")

    # Validate required configuration sections
    for section in ('mqtt', 'devices'):
        if section not in config:
            _config_error(f"Missing '{section}' section in configuration")
    for section in ('mqtt', 'devices'):
        if not isinstance(config[section], dict):
            _config_error(f"Section '{section}' must be a mapping")

    # Validate MQTT configuration
    for field in ('broker', 'port', 'user', 'password'):
        if field not in config['mqtt']:
            _config_error(f"Missing required MQTT field: {field}")

    supported_types = {'int': int, 'bool': bool, 'str': str}

    # Validate device configurations
    for device_name, device_config in config['devices'].items():
        if not isinstance(device_config, dict):
            _config_error(f"Configuration of device '{device_name}' must be a mapping")
        for field in ('ip', 'snmp_community', 'oids'):
            if field not in device_config:
                _config_error(f"Missing required field '{field}' in device '{device_name}'")
        if not isinstance(device_config['oids'], list):
            _config_error(f"Field 'oids' must be a list in device '{device_name}'")

        # Validate OID configurations
        for oid in device_config['oids']:
            if not isinstance(oid, dict):
                _config_error(f"Every OID entry must be a mapping in device '{device_name}'")
            for field in ('name', 'oid', 'type', 'HA_device_class', 'HA_platform'):
                if field not in oid:
                    _config_error(f"Missing required OID field '{field}' in device '{device_name}'")

            # Convert type string to actual Python type
            type_name = oid['type']
            if not isinstance(type_name, str) or type_name not in supported_types:
                _config_error(
                    f"Unsupported type '{type_name}' for OID '{oid['name']}' in device '{device_name}'"
                )
            oid['type'] = supported_types[type_name]

    logging.info(f"Configuration loaded successfully from {config_path}")
    return config
def connect_mqtt(mqtt_config):
    """Create an MQTT client and connect it to the configured broker.

    Args:
        mqtt_config: Mapping with ``broker``, ``port``, ``user`` and
            ``password``.  An optional ``client_id`` entry is used as the
            MQTT client identifier; when absent an empty id is passed, which
            is the library default.

    Returns:
        The paho client after ``connect()`` has been called.  The network
        loop is NOT started here; the caller is expected to do that.
    """
    def on_connect(client, userdata, flags, rc):
        # Connection callback: rc == 0 is treated as success, anything else
        # is reported together with the return code.
        if rc == 0:
            logging.info("Connected to MQTT Broker!")
        else:
            logging.error(f"Failed to connect, return code {rc}")

    # client_id is passed by keyword so the call does not depend on the
    # positional layout of the Client constructor.
    client = mqtt_client.Client(client_id=mqtt_config.get("client_id", ""))
    client.username_pw_set(mqtt_config["user"], mqtt_config["password"])
    client.on_connect = on_connect
    client.connect(mqtt_config['broker'], mqtt_config['port'])
    return client
def publish(topic, client, data, retain, qos):
    """Serialise ``data`` as JSON and publish it on ``topic``.

    ``retain`` is coerced to ``bool`` before being handed to the client.
    The first element of the value returned by ``client.publish`` is taken
    as the status code: zero is logged at debug level as a successful send,
    any other value is logged as an error.  Nothing is returned.
    """
    payload = json.dumps(data)
    outcome = client.publish(
        topic=topic,
        payload=payload,
        qos=qos,
        retain=bool(retain),
    )
    if outcome[0] != 0:
        logging.error(f"Failed to send message to topic {topic}")
        return
    logging.debug(f"Send `{payload}` to topic `{topic}`")
async def get_snmp(req):
    """Poll every configured OID of one device and collect the values.

    Args:
        req: Mapping with ``device_name``, ``ip``, ``snmp_community`` and
            ``oids``.  Each OID entry provides ``name``, ``oid`` and ``type``
            (a callable such as ``int``, ``bool`` or ``str`` used to convert
            the raw SNMP value).

    Returns:
        A dict mapping OID ``name`` to its converted value.  Values of type
        ``bool`` are reported as the strings ``"ON"`` / ``"OFF"``.  OIDs
        whose request failed or whose value could not be converted are
        logged and left out, so the result may be partial or empty.
    """
    data = {}
    for oid in req["oids"]:
        # One GET request per OID, sent to UDP port 161 of the device.
        with Slim(1) as slim:
            errorIndication, errorStatus, errorIndex, varBinds = await slim.get(
                req["snmp_community"],
                req["ip"],
                161,
                ObjectType(ObjectIdentity(oid["oid"])),
            )
            if errorIndication:
                logging.error(errorIndication)
                continue
            if errorStatus:
                # errorIndex is 1-based; fall back to "?" when it is unset.
                failed_at = varBinds[int(errorIndex) - 1][0] if errorIndex else "?"
                logging.error("{} at {}".format(errorStatus.prettyPrint(), failed_at))
                continue
            for varBind in varBinds:
                # Convert once; a value that does not fit the configured type
                # must not abort the whole poll and lose the other OIDs.
                try:
                    value = oid["type"](varBind[1])
                except (TypeError, ValueError) as e:
                    logging.error(
                        f"{req['device_name']} {oid['name']}: cannot convert "
                        f"{varBind[1]!r} to {oid['type'].__name__}: {e}"
                    )
                    continue
                logging.debug(f"{req['device_name']} {oid['name']} => {value}")
                if oid["type"] == bool:
                    data[oid["name"]] = "ON" if value else "OFF"
                else:
                    data[oid["name"]] = value
    logging.debug(f"JSON : {json.dumps(data)}")
    return data
def ha_create_config(req):
    """Build the Home Assistant discovery payload for one device.

    Args:
        req: Mapping with ``device_name``, ``ip`` and ``oids``.  Each OID
            entry provides ``name``, ``HA_platform`` and ``HA_device_class``
            and may provide ``HA_unit``.

    Returns:
        A dict with the keys ``dev`` (device ids/name), ``o`` (origin),
        ``state_topic`` (``SNMP/<device_name>/state``), ``qos`` and ``cmps``.
        ``cmps`` holds one component per OID, keyed by
        ``<device_name>_<ip>_<oid name>`` with every ``.`` replaced by ``_``.
        ``unit_of_measurement`` is added to a component only when the OID
        entry defines ``HA_unit``.
    """
    device_id = f"{req['device_name']}_{req['ip']}".replace(".", "_")

    cmps = {}
    for oid in req['oids']:
        component_id = f"{req['device_name']}_{req['ip']}_{oid['name']}".replace(".", "_")
        component = {
            "p": oid['HA_platform'],
            "device_class": oid['HA_device_class'],
            "value_template": f"{{{{ value_json.{oid['name']}}}}}",
            "unique_id": component_id,
            "name": oid['name'],
        }
        # Add the unit to the existing component; replacing the whole entry
        # would drop the platform, device class, template and ids.
        if "HA_unit" in oid:
            component["unit_of_measurement"] = oid['HA_unit']
        cmps[component_id] = component

    ha_config = {
        "dev": {"ids": device_id, "name": req['device_name']},
        "o": {"name": "snmp2mqtt"},
        "state_topic": f"SNMP/{req['device_name']}/state",
        "qos": 2,
        "cmps": cmps,
    }
    logging.debug(f"config : {json.dumps(ha_config)}")
    return ha_config
def send_to_mqtt(device_name, device_config, mqtt_config, sleep_interval=2):
    """Poll one device over SNMP forever and publish the results to MQTT.

    On every cycle the Home Assistant discovery config is (re)published with
    the retain flag on ``homeassistant/device/<ids>/config``, then the device
    is polled and the resulting state is published on the state topic.
    Errors in either step are logged and the loop carries on.

    Args:
        device_name: Name of the device, used in topics and log messages.
        device_config: Mapping with ``ip``, ``snmp_community`` and ``oids``.
        mqtt_config: MQTT settings handed to ``connect_mqtt``.
        sleep_interval: Seconds to wait between two cycles.

    This function only returns by way of an exception (for example
    ``KeyboardInterrupt``); the MQTT client is disconnected and its network
    loop stopped before the exception propagates.
    """
    # Create device request object
    req = {
        "device_name": device_name,
        "ip": device_config["ip"],
        "snmp_community": device_config["snmp_community"],
        "oids": device_config["oids"],
    }
    config = ha_create_config(req)
    client = connect_mqtt(mqtt_config)
    client.loop_start()
    config_topic = f"homeassistant/device/{config['dev']['ids']}/config"
    state_topic = config['state_topic']
    try:
        while True:
            try:
                publish(config_topic, client, config, True, 0)
                logging.info(f"{config_topic} -> {config}")
            except Exception as e:
                logging.error(f"Error publishing config for {device_name}: {e}")
            try:
                state = asyncio.run(get_snmp(req))
                if state:
                    publish(state_topic, client, state, False, 0)
                    logging.info(f"{state_topic} -> {state}")
                else:
                    # Every request failed: an empty JSON object carries no
                    # values for the configured templates, so skip it.
                    logging.warning(f"No SNMP data received for {device_name}, state not published")
            except Exception as e:
                logging.error(f"Error getting SNMP data for {device_name}: {e}")
            sleep(sleep_interval)
    finally:
        # Release the connection and the background network loop started
        # above, whatever interrupted the polling loop.
        client.disconnect()
        client.loop_stop()
def process_devices(config):
    """Start monitoring for the devices listed in the configuration.

    Only the first device of ``config['devices']`` is monitored; when more
    than one device is configured a warning is logged for the others,
    because multi-device support needs a threading implementation.

    Args:
        config: Configuration mapping with ``mqtt`` and ``devices`` sections
            and an optional ``sleep_interval`` (default 2).

    Raises:
        ValueError: If the ``devices`` section is empty.
    """
    devices = config['devices']
    if not devices:
        raise ValueError("No devices configured")

    mqtt_config = config['mqtt'].copy()
    mqtt_config['client_id'] = f"snmp-mqtt-{random.randint(0, 1000)}"

    # Get sleep interval from config or use default
    sleep_interval = config.get('sleep_interval', 2)

    device_name, device_config = next(iter(devices.items()))
    if len(devices) == 1:
        logging.info(f"Starting monitoring for single device: {device_name}")
    else:
        # Multiple devices mode - would need threading/multiprocessing.
        # For now, process the first device and warn about the others.
        logging.warning(f"Multiple devices detected ({len(devices)}), but only processing the first one")
        logging.warning("Multi-device support will require threading implementation")
        logging.info(f"Starting monitoring for device: {device_name}")

    send_to_mqtt(device_name, device_config, mqtt_config, sleep_interval)
def main():
    """Program entry point: read the options, load the config, run the bridge.

    A ``KeyboardInterrupt`` while the bridge runs is logged as a
    user-requested shutdown.  Any other exception is logged as an unexpected
    error and the process exits with status 1.
    """
    options = parse_arguments()
    settings = load_config(options.config)

    logging.info("Starting snmp2mqtt bridge...")
    device_names = list(settings['devices'].keys())
    logging.info(f"Configured devices: {device_names}")

    try:
        process_devices(settings)
    except KeyboardInterrupt:
        logging.info("Shutdown requested by user")
    except Exception as exc:
        logging.error(f"Unexpected error: {exc}")
        sys.exit(1)


if __name__ == "__main__":
    main()