Files
snmp2mqtt/snmp-discover.py

558 lines
19 KiB
Python

#!/usr/bin/env python3
"""
snmp-discover.py - Assistant de découverte SNMP semi-guidé pour snmp2mqtt.
Détecte automatiquement les OIDs disponibles sur un appareil MikroTik ou TP-Link Omada
et met à jour le fichier config.yaml.
"""
import asyncio
import yaml
import shutil
import os
import argparse
import sys
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple
from pysnmp.hlapi.asyncio import (
SnmpEngine,
CommunityData,
UdpTransportTarget,
ContextData,
ObjectIdentity,
ObjectType,
get_cmd,
walk_cmd,
)
# Catalogue des OIDs par type d'appareil
DEVICE_CATALOGS = {
"mikrotik": {
"manufacturer": "MikroTik",
"system_metrics": {
"cpu_usage": {
"oid": ".1.3.6.1.4.1.14988.1.1.3.14.0",
"name_template": "cpu_usage",
"type": "int",
"HA_device_class": "power_factor",
"HA_platform": "sensor",
"HA_unit": "%",
},
"temperature": {
"oid": ".1.3.6.1.4.1.14988.1.1.6.1.0",
"name_template": "temperature",
"type": "int",
"operation": "value / 1000",
"HA_device_class": "temperature",
"HA_platform": "sensor",
"HA_unit": "°C",
},
"memory_used": {
"oid": ".1.3.6.1.4.1.14988.1.1.4.2.0",
"name_template": "memory_used",
"type": "int",
"HA_device_class": "data_size",
"HA_platform": "sensor",
"HA_unit": "MB",
# Note: MikroTik memory OIDs can vary by version, assuming bytes here requires operation or custom handling.
# Standard HOST-RESOURCES-MIB is often used: hrMemorySize (.1.3.6.1.2.1.25.2.2.0)
# But MikroTik has proprietary ones. Let's adjust operation if needed.
# For now, we'll stick to a simple GET, user may need to tweak operation if values are weird.
"operation": "value / 1000000",
},
"memory_free": {
"oid": ".1.3.6.1.4.1.14988.1.1.4.3.0",
"name_template": "memory_free",
"type": "int",
"HA_device_class": "data_size",
"HA_platform": "sensor",
"HA_unit": "MB",
"operation": "value / 1000000",
},
},
"interfaces_root_oid": ".1.3.6.1.2.1.2.2.1.2",
},
"omada": {
"manufacturer": "TP-Link",
"system_metrics": {
"connected_clients": {
"oid": ".1.3.6.1.4.1.11863.10.1.3.0",
"name_template": "connected_clients",
"type": "int",
"HA_device_class": None,
"HA_platform": "sensor",
"HA_unit": "clients",
},
},
"interfaces_root_oid": ".1.3.6.1.2.1.2.2.1.2",
},
}
def parse_arguments():
parser = argparse.ArgumentParser(description='SNMP Discovery Tool for snmp2mqtt')
parser.add_argument('--config', '-c', default='config.yaml',
help='Path to YAML configuration file (default: config.yaml)')
return parser.parse_args()
async def snmp_get(ip, community, oid_str):
"""Récupère la valeur d'un OID spécifique via SNMP."""
snmpEngine = SnmpEngine()
try:
errorIndication, errorStatus, errorIndex, varBinds = await get_cmd(
snmpEngine,
CommunityData(community),
await UdpTransportTarget.create((ip, 161)),
ContextData(),
ObjectType(ObjectIdentity(oid_str))
)
snmpEngine.close_dispatcher()
if errorIndication:
return None, str(errorIndication)
elif errorStatus:
return None, f"{errorStatus.prettyPrint()}"
else:
for varBind in varBinds:
return str(varBind[1]), None
except Exception as e:
return None, str(e)
finally:
snmpEngine.close_dispatcher()
async def snmp_walk(ip, community, oid_prefix):
"""Effectue un SNMP WALK sur une branche OID."""
results = []
snmpEngine = SnmpEngine()
try:
async for errorIndication, errorStatus, errorIndex, varBinds in walk_cmd(
snmpEngine,
CommunityData(community),
await UdpTransportTarget.create((ip, 161)),
ContextData(),
ObjectType(ObjectIdentity(oid_prefix)),
lexicographicMode=True
):
if errorIndication or errorStatus:
break
for varBind in varBinds:
oid_str = str(varBind[0])
# Le test startswith doit gérer les formats avec/sans le point initial
# oid_prefix = ".1.3.6.1.2.1.2.2.1.2", oid_str = "1.3.6.1.2.1.2.2.1.2.1"
oid_str_normalized = oid_str.lstrip(".")
oid_prefix_normalized = oid_prefix.lstrip(".")
if oid_str_normalized.startswith(oid_prefix_normalized):
results.append((oid_str, str(varBind[1])))
except Exception as e:
print(f"Warning: SNMP walk error on {oid_prefix}: {e}")
finally:
snmpEngine.close_dispatcher()
return results
async def detect_device_type(ip, community):
"""Détecte le type d'équipement via sysDescr."""
# .1.3.6.1.2.1.1.1.0 = sysDescr
value, err = await snmp_get(ip, community, ".1.3.6.1.2.1.1.1.0")
if err:
return None, None, err
desc_lower = value.lower()
device_type = "unknown"
if "mikrotik" in desc_lower or "routeros" in desc_lower:
device_type = "mikrotik"
elif "eap" in desc_lower or "tp-link" in desc_lower or "omada" in desc_lower:
device_type = "omada"
return device_type, value, None
async def get_device_name(ip, community, fallback_type):
"""Récupère sysName pour nommer l'appareil, sinon utilise un fallback."""
# .1.3.6.1.2.1.1.5.0 = sysName
value, err = await snmp_get(ip, community, ".1.3.6.1.2.1.1.5.0")
if value and not err:
return value
# Fallback
prefix = fallback_type if fallback_type != "unknown" else "device"
sanitized_ip = ip.replace(".", "_")
# Check if name exists in config to avoid duplicates (simple check)
return f"{prefix}_{sanitized_ip}"
async def discover_interfaces(ip, community) -> List[Dict[str, Any]]:
"""Découvre les interfaces physiques via ifDescr."""
# ifDescr OID base
ifdescr_oid = ".1.3.6.1.2.1.2.2.1.2"
print(f" [+] Scanning interfaces...")
raw_ifs = await snmp_walk(ip, community, ifdescr_oid)
print(f" [DEBUG] Found {len(raw_ifs)} interfaces via walk")
interfaces = []
for oid_full, name in raw_ifs:
# Extraction de l'index (dernière partie de l'OID)
index = oid_full.split(".")[-1]
# Filtrage basique pour ne garder que les interfaces physiques
name_lower = name.lower()
if any(skip in name_lower for skip in ["loopback", "null", "dummy"]):
continue
# Détection des trucs un peu weird (parfois 'vlan', 'bridge' sont utiles, parfois non)
# On garde 'ether', 'sfp', 'wlan', 'eth', 'port', 'bridge', 'lan', 'wan'
if not any(k in name_lower for k in ["ether", "sfp", "wlan", "eth", "port", "bridge", "lan", "wan"]):
# Si on est sur un Omada ou switch, on pourrait avoir des noms génériques
# On reste permissif mais on exclut les trucs vides
if name.strip() == "":
continue
interface = {
"index": index,
"name": name,
"oids": []
}
# Construction des OID pour in, out, status
# in: .10.index, out: .16.index, status: .8.index
prefix = ".1.3.6.1.2.1.2.2.1"
interface["oids"].append({
"name": f"{name}_in",
"oid": f"{prefix}.10.{index}",
"type": "int",
"HA_device_class": "data_size",
"HA_platform": "sensor",
"HA_unit": "bit",
})
interface["oids"].append({
"name": f"{name}_out",
"oid": f"{prefix}.16.{index}",
"type": "int",
"HA_device_class": "data_size",
"HA_platform": "sensor",
"HA_unit": "bit",
})
interface["oids"].append({
"name": f"{name}_status",
"oid": f"{prefix}.8.{index}",
"type": "int", # On peut mettre bool ensuite si on veut map 1->ON
"HA_device_class": "connectivity",
"HA_platform": "binary_sensor",
})
interfaces.append(interface)
print(f" [DEBUG] Returning {len(interfaces)} interfaces")
return interfaces
async def discover_metrics(ip, community, device_type) -> List[Dict[str, Any]]:
"""Teste la disponibilité des OIDs systèmes spécifiques au fabricant."""
catalog = DEVICE_CATALOGS.get(device_type, {}).get("system_metrics", {})
available_metrics = []
if not catalog:
return available_metrics
print(f" [+] Scanning system metrics ({device_type})...")
for key, meta in catalog.items():
oid = meta.get("oid")
if oid:
val, err = await snmp_get(ip, community, oid)
if val and not err:
available_metrics.append({**meta, "key": key})
else:
print(f" - {key}: not available / error")
return available_metrics
def display_catalog(device_name, interfaces, system_metrics, device_type):
"""Affiche le menu interactif de sélection."""
print(f"\n" + "="*60)
print(f" Appareil détecté: {device_type.upper()}"
f"\n Nom: {device_name}")
print("="*60)
all_items = []
# 1. Interfaces
if interfaces:
print(f"\nInterfaces détectées ({len(interfaces)}):")
for idx, iface in enumerate(interfaces):
num = len(all_items) + 1
label = f" [{num}] {iface['name']} (In/Out/Status)"
print(label)
all_items.append({"type": "interface", "data": iface, "label": label})
# 2. Métriques système
if system_metrics:
prefix_map = {
"cpu": "CPU",
"temp": "Température",
"mem": "RAM"
}
print(f"\nMétriques Système:")
for idx, met in enumerate(system_metrics):
num = len(all_items) + 1
# Essayez de deviner un label propre
key_label = met['key'].replace('_', ' ').title()
unit = met.get('HA_unit', '')
op_label = ""
if met.get('operation'):
op_label = " (transformed)"
label = f" [{num}] {key_label} ({unit}){op_label}"
print(label)
all_items.append({"type": "metric", "data": met, "label": label})
if not all_items:
print("\nAucun OID intéressant trouvé.")
return [], []
print("\n" + "-"*40)
print("\nSélectionnez les métriques à surveiller :")
return all_items
def parse_selection(input_str: str, max_num: int) -> List[int]:
"""Parse la sélection utilisateur : '1,3,5-8', 'all', etc."""
indices = set()
if input_str.lower() == "all":
return list(range(1, max_num + 1))
elif input_str.lower() == "none":
return []
parts = input_str.split(",")
for part in parts:
part = part.strip()
if "-" in part:
try:
start, end = part.split("-", 1)
s = int(start)
e = int(end)
for n in range(s, e + 1):
if 1 <= n <= max_num:
indices.add(n)
except ValueError:
continue
else:
try:
n = int(part)
if 1 <= n <= max_num:
indices.add(n)
except ValueError:
continue
return sorted(list(indices))
def backup_config(config_path):
"""Crée une copie de backup du fichier config."""
# On suppose que le path est absolu ou relatif au cwd, on travaille avec le path tel quel
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
bak_path = f"{config_path}.bak.{timestamp}"
try:
# Créer un backup même si le fichier n'existe pas n'a pas de sens,
# mais s'il existe on le copie.
if os.path.exists(config_path):
shutil.copy2(config_path, bak_path)
print(f"[Backup] Sauvegarde créée: {bak_path}")
return True
else:
print(f"[Backup] Pas de fichier existant à sauvegarder, création du fichier.")
return True
except Exception as e:
print(f"[Erreur] Impossible de créer le backup: {e}")
return False
def build_device_oids(interface_data, metrics_data):
"""Construit la liste des objets OID pour le config."""
oids = []
# Ajout des interfaces (groupées)
for if_item in interface_data:
for oid in if_item["data"]["oids"]:
oids.append(oid)
# Ajout des métriques
for met_item in metrics_data:
oid_meta = met_item["data"]
clean_meta = {
"name": oid_meta.get("name_template", "metric"),
"oid": oid_meta["oid"],
"type": oid_meta["type"],
"HA_platform": oid_meta.get("HA_platform"),
}
if oid_meta.get("HA_device_class"):
clean_meta["HA_device_class"] = oid_meta["HA_device_class"]
if oid_meta.get("HA_unit"):
clean_meta["HA_unit"] = oid_meta["HA_unit"]
if "operation" in oid_meta:
clean_meta["operation"] = oid_meta["operation"]
oids.append(clean_meta)
return oids
def add_device_to_config(config_path, device_name, ip, community, device_oids):
"""Ajoute le device au fichier YAML."""
# Structure du nouveau device
new_device = {
"ip": ip,
"snmp_community": community,
"oids": device_oids
}
config = {}
if os.path.exists(config_path):
try:
with open(config_path, 'r') as f:
content = f.read()
if content.strip():
config = yaml.safe_load(content) or {}
except yaml.YAMLError:
# Si le fichier est corrompu, on ne veut pas perdre le backup fait avant
print(f"[Erreur] Le fichier {config_path} semble invalide. Veuillez vérifier le backup.")
return False
# Assurer la structure de base
if 'mqtt' not in config:
config['mqtt'] = {
'broker': 'IP or FQDN',
'port': 1883,
'user': 'admin',
'password': 'password'
}
if 'devices' not in config:
config['devices'] = {}
# Insertion
config['devices'][device_name] = new_device
# Sauvegarde
try:
with open(config_path, 'w') as f:
yaml.dump(config, f, default_flow_style=False, sort_keys=False, allow_unicode=True)
print(f"[-] Device '{device_name}' ajouté avec succès.")
return True
except Exception as e:
print(f"[Erreur] Echec de l'écriture du fichier config: {e}")
return False
async def process_device_loop(config_path):
"""Gère le loop d'ajout de devices."""
while True:
print("\n--- Découverte d'un nouvel appareil ---")
ip = input("Adresse IP de l'appareil : ").strip()
if not ip:
continue
community = input("Communauté SNMP (défaut: public) : ").strip() or "public"
# 1. Détection
print(f"[*] Connexion à {ip}...")
device_type, sysdescr, err = await detect_device_type(ip, community)
if err:
print(f"[!] Erreur SNMP: {err}")
retry = input("Réessayer ? (o/n) : ").lower()
if retry == 'y' or retry == 'o':
continue
else:
break
if device_type == "unknown":
print(f"[!] Type d'appareil inconnu. Description: {sysdescr}")
force_type = input("Forcer le type ? (mikrotik/omada/none) : ").lower()
if force_type in ["mikrotik", "omada"]:
device_type = force_type
else:
print("Annulé.")
break
print(f"[+] Appareil détecté: {device_type}")
# 2. Nom
device_name = await get_device_name(ip, community, device_type)
# Nettoyage du nom pour qu'il soit un key YAML safe
device_name = device_name.replace(" ", "_").lower()
# 3. Découverte Interfaces
interfaces = await discover_interfaces(ip, community)
# 4. Découverte Métriques
system_metrics = await discover_metrics(ip, community, device_type)
# 5. Display & Sélection
catalog_items = display_catalog(device_name, interfaces, system_metrics, device_type)
if not catalog_items:
choice = input("Continuer avec un autre appareil ? (o/n) : ")
if choice.lower() != 'o' and choice.lower() != 'y':
break
continue
max_idx = len(catalog_items)
selection_str = input(f"\nSélection (1-{max_idx}, ranges, 'all', 'none') : ")
selected_indices = parse_selection(selection_str, max_idx)
if not selected_indices:
print("Aucune sélection.")
choice = input("Continuer avec un autre appareil ? (o/n) : ")
if choice.lower() != 'o' and choice.lower() != 'y':
break
continue
# Filtrer les items sélectionnés
selected_interfaces = []
selected_metrics = []
for idx in selected_indices:
# Attention, indices start at 1
item = catalog_items[idx-1]
if item["type"] == "interface":
selected_interfaces.append(item)
else:
selected_metrics.append(item)
# Construction du device yaml
device_oids = build_device_oids(selected_interfaces, selected_metrics)
# 6. Backup & Save
print(f"\n--- Mise à jour de la configuration ---")
if backup_config(config_path):
success = add_device_to_config(config_path, device_name, ip, community, device_oids)
if success:
pass # Already printed success message
else:
print("[!] Veuillez restaurer manuellement depuis le backup si nécessaire.")
else:
print("[!] Annulé car backup impossible.")
# 7. Continuer ?
cont = input("\nAjouter un autre appareil ? (o/n) : ")
if cont.lower() != 'o' and cont.lower() != 'y':
break
def main():
args = parse_arguments()
asyncio.run(process_device_loop(args.config))
if __name__ == "__main__":
main()