From c350ab220393a406a536b7c28b50143214bd507f Mon Sep 17 00:00:00 2001 From: Antoine Van Elstraete Date: Tue, 9 Jun 2026 16:45:08 +0200 Subject: [PATCH] =?UTF-8?q?feat:=20Ajoute=20snmp-discover.py=20pour=20la?= =?UTF-8?q?=20d=C3=A9couverte=20semi-guid=C3=A9e=20des=20OIDs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- snmp-discover.py | 557 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 557 insertions(+) create mode 100644 snmp-discover.py diff --git a/snmp-discover.py b/snmp-discover.py new file mode 100644 index 0000000..3fa6f2b --- /dev/null +++ b/snmp-discover.py @@ -0,0 +1,557 @@ +#!/usr/bin/env python3 +""" +snmp-discover.py - Assistant de découverte SNMP semi-guidé pour snmp2mqtt. +Détecte automatiquement les OIDs disponibles sur un appareil MikroTik ou TP-Link Omada +et met à jour le fichier config.yaml. +""" + +import asyncio +import yaml +import shutil +import os +import argparse +import sys +from datetime import datetime +from typing import List, Dict, Any, Optional, Tuple + +from pysnmp.hlapi.asyncio import ( + SnmpEngine, + CommunityData, + UdpTransportTarget, + ContextData, + ObjectIdentity, + ObjectType, + get_cmd, + walk_cmd, +) + +# Catalogue des OIDs par type d'appareil +DEVICE_CATALOGS = { + "mikrotik": { + "manufacturer": "MikroTik", + "system_metrics": { + "cpu_usage": { + "oid": ".1.3.6.1.4.1.14988.1.1.3.14.0", + "name_template": "cpu_usage", + "type": "int", + "HA_device_class": "power_factor", + "HA_platform": "sensor", + "HA_unit": "%", + }, + "temperature": { + "oid": ".1.3.6.1.4.1.14988.1.1.6.1.0", + "name_template": "temperature", + "type": "int", + "operation": "value / 1000", + "HA_device_class": "temperature", + "HA_platform": "sensor", + "HA_unit": "°C", + }, + "memory_used": { + "oid": ".1.3.6.1.4.1.14988.1.1.4.2.0", + "name_template": "memory_used", + "type": "int", + "HA_device_class": "data_size", + "HA_platform": "sensor", + "HA_unit": "MB", + # Note: MikroTik memory OIDs can vary by version, assuming bytes here requires operation or custom handling. + # Standard HOST-RESOURCES-MIB is often used: hrMemorySize (.1.3.6.1.2.1.25.2.2.0) + # But MikroTik has proprietary ones. Let's adjust operation if needed. + # For now, we'll stick to a simple GET, user may need to tweak operation if values are weird. + "operation": "value / 1000000", + }, + "memory_free": { + "oid": ".1.3.6.1.4.1.14988.1.1.4.3.0", + "name_template": "memory_free", + "type": "int", + "HA_device_class": "data_size", + "HA_platform": "sensor", + "HA_unit": "MB", + "operation": "value / 1000000", + }, + }, + "interfaces_root_oid": ".1.3.6.1.2.1.2.2.1.2", + }, + "omada": { + "manufacturer": "TP-Link", + "system_metrics": { + "connected_clients": { + "oid": ".1.3.6.1.4.1.11863.10.1.3.0", + "name_template": "connected_clients", + "type": "int", + "HA_device_class": None, + "HA_platform": "sensor", + "HA_unit": "clients", + }, + }, + "interfaces_root_oid": ".1.3.6.1.2.1.2.2.1.2", + }, +} + + +def parse_arguments(): + parser = argparse.ArgumentParser(description='SNMP Discovery Tool for snmp2mqtt') + parser.add_argument('--config', '-c', default='config.yaml', + help='Path to YAML configuration file (default: config.yaml)') + return parser.parse_args() + + +async def snmp_get(ip, community, oid_str): + """Récupère la valeur d'un OID spécifique via SNMP.""" + snmpEngine = SnmpEngine() + try: + errorIndication, errorStatus, errorIndex, varBinds = await get_cmd( + snmpEngine, + CommunityData(community), + await UdpTransportTarget.create((ip, 161)), + ContextData(), + ObjectType(ObjectIdentity(oid_str)) + ) + + snmpEngine.close_dispatcher() + + if errorIndication: + return None, str(errorIndication) + elif errorStatus: + return None, f"{errorStatus.prettyPrint()}" + else: + for varBind in varBinds: + return str(varBind[1]), None + except Exception as e: + return None, str(e) + finally: + snmpEngine.close_dispatcher() + + +async def snmp_walk(ip, community, oid_prefix): + """Effectue un SNMP WALK sur une branche OID.""" + results = [] + snmpEngine = SnmpEngine() + try: + async for errorIndication, errorStatus, errorIndex, varBinds in walk_cmd( + snmpEngine, + CommunityData(community), + await UdpTransportTarget.create((ip, 161)), + ContextData(), + ObjectType(ObjectIdentity(oid_prefix)), + lexicographicMode=True + ): + if errorIndication or errorStatus: + break + for varBind in varBinds: + oid_str = str(varBind[0]) + # Le test startswith doit gérer les formats avec/sans le point initial + # oid_prefix = ".1.3.6.1.2.1.2.2.1.2", oid_str = "1.3.6.1.2.1.2.2.1.2.1" + oid_str_normalized = oid_str.lstrip(".") + oid_prefix_normalized = oid_prefix.lstrip(".") + if oid_str_normalized.startswith(oid_prefix_normalized): + results.append((oid_str, str(varBind[1]))) + except Exception as e: + print(f"Warning: SNMP walk error on {oid_prefix}: {e}") + finally: + snmpEngine.close_dispatcher() + return results + + +async def detect_device_type(ip, community): + """Détecte le type d'équipement via sysDescr.""" + # .1.3.6.1.2.1.1.1.0 = sysDescr + value, err = await snmp_get(ip, community, ".1.3.6.1.2.1.1.1.0") + + if err: + return None, None, err + + desc_lower = value.lower() + + device_type = "unknown" + if "mikrotik" in desc_lower or "routeros" in desc_lower: + device_type = "mikrotik" + elif "eap" in desc_lower or "tp-link" in desc_lower or "omada" in desc_lower: + device_type = "omada" + + return device_type, value, None + + +async def get_device_name(ip, community, fallback_type): + """Récupère sysName pour nommer l'appareil, sinon utilise un fallback.""" + # .1.3.6.1.2.1.1.5.0 = sysName + value, err = await snmp_get(ip, community, ".1.3.6.1.2.1.1.5.0") + + if value and not err: + return value + + # Fallback + prefix = fallback_type if fallback_type != "unknown" else "device" + sanitized_ip = ip.replace(".", "_") + # Check if name exists in config to avoid duplicates (simple check) + return f"{prefix}_{sanitized_ip}" + + +async def discover_interfaces(ip, community) -> List[Dict[str, Any]]: + """Découvre les interfaces physiques via ifDescr.""" + # ifDescr OID base + ifdescr_oid = ".1.3.6.1.2.1.2.2.1.2" + + print(f" [+] Scanning interfaces...") + raw_ifs = await snmp_walk(ip, community, ifdescr_oid) + + print(f" [DEBUG] Found {len(raw_ifs)} interfaces via walk") + + interfaces = [] + for oid_full, name in raw_ifs: + # Extraction de l'index (dernière partie de l'OID) + index = oid_full.split(".")[-1] + + # Filtrage basique pour ne garder que les interfaces physiques + name_lower = name.lower() + if any(skip in name_lower for skip in ["loopback", "null", "dummy"]): + continue + + # Détection des trucs un peu weird (parfois 'vlan', 'bridge' sont utiles, parfois non) + # On garde 'ether', 'sfp', 'wlan', 'eth', 'port', 'bridge', 'lan', 'wan' + if not any(k in name_lower for k in ["ether", "sfp", "wlan", "eth", "port", "bridge", "lan", "wan"]): + # Si on est sur un Omada ou switch, on pourrait avoir des noms génériques + # On reste permissif mais on exclut les trucs vides + if name.strip() == "": + continue + + interface = { + "index": index, + "name": name, + "oids": [] + } + + # Construction des OID pour in, out, status + # in: .10.index, out: .16.index, status: .8.index + prefix = ".1.3.6.1.2.1.2.2.1" + interface["oids"].append({ + "name": f"{name}_in", + "oid": f"{prefix}.10.{index}", + "type": "int", + "HA_device_class": "data_size", + "HA_platform": "sensor", + "HA_unit": "bit", + }) + interface["oids"].append({ + "name": f"{name}_out", + "oid": f"{prefix}.16.{index}", + "type": "int", + "HA_device_class": "data_size", + "HA_platform": "sensor", + "HA_unit": "bit", + }) + interface["oids"].append({ + "name": f"{name}_status", + "oid": f"{prefix}.8.{index}", + "type": "int", # On peut mettre bool ensuite si on veut map 1->ON + "HA_device_class": "connectivity", + "HA_platform": "binary_sensor", + }) + + interfaces.append(interface) + + print(f" [DEBUG] Returning {len(interfaces)} interfaces") + return interfaces + + +async def discover_metrics(ip, community, device_type) -> List[Dict[str, Any]]: + """Teste la disponibilité des OIDs systèmes spécifiques au fabricant.""" + catalog = DEVICE_CATALOGS.get(device_type, {}).get("system_metrics", {}) + available_metrics = [] + + if not catalog: + return available_metrics + + print(f" [+] Scanning system metrics ({device_type})...") + + for key, meta in catalog.items(): + oid = meta.get("oid") + if oid: + val, err = await snmp_get(ip, community, oid) + if val and not err: + available_metrics.append({**meta, "key": key}) + else: + print(f" - {key}: not available / error") + + return available_metrics + + +def display_catalog(device_name, interfaces, system_metrics, device_type): + """Affiche le menu interactif de sélection.""" + print(f"\n" + "="*60) + print(f" Appareil détecté: {device_type.upper()}" + f"\n Nom: {device_name}") + print("="*60) + + all_items = [] + + # 1. Interfaces + if interfaces: + print(f"\nInterfaces détectées ({len(interfaces)}):") + for idx, iface in enumerate(interfaces): + num = len(all_items) + 1 + label = f" [{num}] {iface['name']} (In/Out/Status)" + print(label) + all_items.append({"type": "interface", "data": iface, "label": label}) + + # 2. Métriques système + if system_metrics: + prefix_map = { + "cpu": "CPU", + "temp": "Température", + "mem": "RAM" + } + print(f"\nMétriques Système:") + for idx, met in enumerate(system_metrics): + num = len(all_items) + 1 + # Essayez de deviner un label propre + key_label = met['key'].replace('_', ' ').title() + unit = met.get('HA_unit', '') + op_label = "" + if met.get('operation'): + op_label = " (transformed)" + label = f" [{num}] {key_label} ({unit}){op_label}" + print(label) + all_items.append({"type": "metric", "data": met, "label": label}) + + if not all_items: + print("\nAucun OID intéressant trouvé.") + return [], [] + + print("\n" + "-"*40) + print("\nSélectionnez les métriques à surveiller :") + return all_items + + +def parse_selection(input_str: str, max_num: int) -> List[int]: + """Parse la sélection utilisateur : '1,3,5-8', 'all', etc.""" + indices = set() + + if input_str.lower() == "all": + return list(range(1, max_num + 1)) + elif input_str.lower() == "none": + return [] + + parts = input_str.split(",") + for part in parts: + part = part.strip() + if "-" in part: + try: + start, end = part.split("-", 1) + s = int(start) + e = int(end) + for n in range(s, e + 1): + if 1 <= n <= max_num: + indices.add(n) + except ValueError: + continue + else: + try: + n = int(part) + if 1 <= n <= max_num: + indices.add(n) + except ValueError: + continue + + return sorted(list(indices)) + + +def backup_config(config_path): + """Crée une copie de backup du fichier config.""" + # On suppose que le path est absolu ou relatif au cwd, on travaille avec le path tel quel + timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") + bak_path = f"{config_path}.bak.{timestamp}" + + try: + # Créer un backup même si le fichier n'existe pas n'a pas de sens, + # mais s'il existe on le copie. + if os.path.exists(config_path): + shutil.copy2(config_path, bak_path) + print(f"[Backup] Sauvegarde créée: {bak_path}") + return True + else: + print(f"[Backup] Pas de fichier existant à sauvegarder, création du fichier.") + return True + except Exception as e: + print(f"[Erreur] Impossible de créer le backup: {e}") + return False + + +def build_device_oids(interface_data, metrics_data): + """Construit la liste des objets OID pour le config.""" + oids = [] + + # Ajout des interfaces (groupées) + for if_item in interface_data: + for oid in if_item["data"]["oids"]: + oids.append(oid) + + # Ajout des métriques + for met_item in metrics_data: + oid_meta = met_item["data"] + clean_meta = { + "name": oid_meta.get("name_template", "metric"), + "oid": oid_meta["oid"], + "type": oid_meta["type"], + "HA_platform": oid_meta.get("HA_platform"), + } + if oid_meta.get("HA_device_class"): + clean_meta["HA_device_class"] = oid_meta["HA_device_class"] + if oid_meta.get("HA_unit"): + clean_meta["HA_unit"] = oid_meta["HA_unit"] + if "operation" in oid_meta: + clean_meta["operation"] = oid_meta["operation"] + oids.append(clean_meta) + + return oids + + +def add_device_to_config(config_path, device_name, ip, community, device_oids): + """Ajoute le device au fichier YAML.""" + # Structure du nouveau device + new_device = { + "ip": ip, + "snmp_community": community, + "oids": device_oids + } + + config = {} + if os.path.exists(config_path): + try: + with open(config_path, 'r') as f: + content = f.read() + if content.strip(): + config = yaml.safe_load(content) or {} + except yaml.YAMLError: + # Si le fichier est corrompu, on ne veut pas perdre le backup fait avant + print(f"[Erreur] Le fichier {config_path} semble invalide. Veuillez vérifier le backup.") + return False + + # Assurer la structure de base + if 'mqtt' not in config: + config['mqtt'] = { + 'broker': 'IP or FQDN', + 'port': 1883, + 'user': 'admin', + 'password': 'password' + } + + if 'devices' not in config: + config['devices'] = {} + + # Insertion + config['devices'][device_name] = new_device + + # Sauvegarde + try: + with open(config_path, 'w') as f: + yaml.dump(config, f, default_flow_style=False, sort_keys=False, allow_unicode=True) + print(f"[-] Device '{device_name}' ajouté avec succès.") + return True + except Exception as e: + print(f"[Erreur] Echec de l'écriture du fichier config: {e}") + return False + + +async def process_device_loop(config_path): + """Gère le loop d'ajout de devices.""" + while True: + print("\n--- Découverte d'un nouvel appareil ---") + ip = input("Adresse IP de l'appareil : ").strip() + if not ip: + continue + + community = input("Communauté SNMP (défaut: public) : ").strip() or "public" + + # 1. Détection + print(f"[*] Connexion à {ip}...") + device_type, sysdescr, err = await detect_device_type(ip, community) + + if err: + print(f"[!] Erreur SNMP: {err}") + retry = input("Réessayer ? (o/n) : ").lower() + if retry == 'y' or retry == 'o': + continue + else: + break + + if device_type == "unknown": + print(f"[!] Type d'appareil inconnu. Description: {sysdescr}") + force_type = input("Forcer le type ? (mikrotik/omada/none) : ").lower() + if force_type in ["mikrotik", "omada"]: + device_type = force_type + else: + print("Annulé.") + break + + print(f"[+] Appareil détecté: {device_type}") + + # 2. Nom + device_name = await get_device_name(ip, community, device_type) + # Nettoyage du nom pour qu'il soit un key YAML safe + device_name = device_name.replace(" ", "_").lower() + + # 3. Découverte Interfaces + interfaces = await discover_interfaces(ip, community) + + # 4. Découverte Métriques + system_metrics = await discover_metrics(ip, community, device_type) + + # 5. Display & Sélection + catalog_items = display_catalog(device_name, interfaces, system_metrics, device_type) + if not catalog_items: + choice = input("Continuer avec un autre appareil ? (o/n) : ") + if choice.lower() != 'o' and choice.lower() != 'y': + break + continue + + max_idx = len(catalog_items) + selection_str = input(f"\nSélection (1-{max_idx}, ranges, 'all', 'none') : ") + selected_indices = parse_selection(selection_str, max_idx) + + if not selected_indices: + print("Aucune sélection.") + choice = input("Continuer avec un autre appareil ? (o/n) : ") + if choice.lower() != 'o' and choice.lower() != 'y': + break + continue + + # Filtrer les items sélectionnés + selected_interfaces = [] + selected_metrics = [] + + for idx in selected_indices: + # Attention, indices start at 1 + item = catalog_items[idx-1] + if item["type"] == "interface": + selected_interfaces.append(item) + else: + selected_metrics.append(item) + + # Construction du device yaml + device_oids = build_device_oids(selected_interfaces, selected_metrics) + + # 6. Backup & Save + print(f"\n--- Mise à jour de la configuration ---") + if backup_config(config_path): + success = add_device_to_config(config_path, device_name, ip, community, device_oids) + if success: + pass # Already printed success message + else: + print("[!] Veuillez restaurer manuellement depuis le backup si nécessaire.") + else: + print("[!] Annulé car backup impossible.") + + # 7. Continuer ? + cont = input("\nAjouter un autre appareil ? (o/n) : ") + if cont.lower() != 'o' and cont.lower() != 'y': + break + + +def main(): + args = parse_arguments() + asyncio.run(process_device_loop(args.config)) + + +if __name__ == "__main__": + main()