#!/usr/bin/env python3
"""
snmp-discover.py - Semi-guided SNMP discovery assistant for snmp2mqtt.

Detects the OIDs available on a MikroTik or TP-Link Omada device and
updates the config.yaml file accordingly.
"""

import argparse
import asyncio
import os
import shutil
import sys
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple

# Third-party imports are guarded so that a missing dependency produces an
# actionable message from main() instead of a raw traceback at import time.
_IMPORT_ERROR: Optional[ImportError] = None
try:
    import yaml
    from pysnmp.hlapi.asyncio import (
        SnmpEngine,
        CommunityData,
        UdpTransportTarget,
        ContextData,
        ObjectIdentity,
        ObjectType,
        get_cmd,
        walk_cmd,
    )
except ImportError as exc:
    _IMPORT_ERROR = exc

# Catalog of OIDs per device type
DEVICE_CATALOGS = {
    "mikrotik": {
        "manufacturer": "MikroTik",
        "system_metrics": {
            "cpu_usage": {
                "oid": ".1.3.6.1.4.1.14988.1.1.3.14.0",
                "name_template": "cpu_usage",
                "type": "int",
                "HA_device_class": "power_factor",
                "HA_platform": "sensor",
                "HA_unit": "%",
            },
            "temperature": {
                "oid": ".1.3.6.1.4.1.14988.1.1.6.1.0",
                "name_template": "temperature",
                "type": "int",
                "operation": "value / 1000",
                "HA_device_class": "temperature",
                "HA_platform": "sensor",
                "HA_unit": "°C",
            },
            "memory_used": {
                "oid": ".1.3.6.1.4.1.14988.1.1.4.2.0",
                "name_template": "memory_used",
                "type": "int",
                "HA_device_class": "data_size",
                "HA_platform": "sensor",
                "HA_unit": "MB",
                # Note: MikroTik memory OIDs can vary by version, assuming bytes here requires operation or custom handling.
                # Standard HOST-RESOURCES-MIB is often used: hrMemorySize (.1.3.6.1.2.1.25.2.2.0)
                # But MikroTik has proprietary ones. Let's adjust operation if needed.
                # For now, we'll stick to a simple GET, user may need to tweak operation if values are weird.
                "operation": "value / 1000000",
            },
            "memory_free": {
                "oid": ".1.3.6.1.4.1.14988.1.1.4.3.0",
                "name_template": "memory_free",
                "type": "int",
                "HA_device_class": "data_size",
                "HA_platform": "sensor",
                "HA_unit": "MB",
                "operation": "value / 1000000",
            },
        },
        "interfaces_root_oid": ".1.3.6.1.2.1.2.2.1.2",
    },
    "omada": {
        "manufacturer": "TP-Link",
        "system_metrics": {
            "connected_clients": {
                "oid": ".1.3.6.1.4.1.11863.10.1.3.0",
                "name_template": "connected_clients",
                "type": "int",
                "HA_device_class": None,
                "HA_platform": "sensor",
                "HA_unit": "clients",
            },
        },
        "interfaces_root_oid": ".1.3.6.1.2.1.2.2.1.2",
    },
}

# Class names of the SNMPv2c "exception" values an agent returns in place of
# a real value when the requested OID does not exist. Matched by class name
# so no extra pysnmp import is required.
_SNMP_MISSING_TYPES = frozenset({"NoSuchObject", "NoSuchInstance", "EndOfMibView"})

# Answers accepted as "yes" at the interactive o/n prompts.
_YES_ANSWERS = frozenset({"o", "oui", "y", "yes"})


def parse_arguments():
    """Parse the command-line arguments (only ``--config`` / ``-c``)."""
    parser = argparse.ArgumentParser(description='SNMP Discovery Tool for snmp2mqtt')
    parser.add_argument('--config', '-c', default='config.yaml',
                        help='Path to YAML configuration file (default: config.yaml)')
    return parser.parse_args()


def _is_yes(answer: str) -> bool:
    """Return True when an interactive answer means "yes" (French or English)."""
    return answer.strip().lower() in _YES_ANSWERS


def _is_missing_value(value: Any) -> bool:
    """Return True when *value* is an SNMP "no such object/instance" marker."""
    return type(value).__name__ in _SNMP_MISSING_TYPES


def _oid_in_subtree(oid: str, prefix: str) -> bool:
    """Return True when *oid* equals *prefix* or lies below it.

    Leading dots are ignored on both sides. The comparison is done on a
    component boundary so that prefix ``1.2`` does not match ``1.20.1``.
    """
    oid_norm = oid.lstrip(".")
    prefix_norm = prefix.lstrip(".")
    return oid_norm == prefix_norm or oid_norm.startswith(prefix_norm + ".")


async def snmp_get(ip, community, oid_str) -> Tuple[Optional[str], Optional[str]]:
    """Fetch the value of a single OID via SNMP GET.

    Returns:
        A ``(value, error)`` tuple. Exactly one of the two is ``None``:
        ``(str, None)`` on success, ``(None, message)`` on any failure,
        including an agent answering "no such object/instance".
    """
    snmp_engine = SnmpEngine()
    try:
        error_indication, error_status, _error_index, var_binds = await get_cmd(
            snmp_engine,
            CommunityData(community),
            await UdpTransportTarget.create((ip, 161)),
            ContextData(),
            ObjectType(ObjectIdentity(oid_str)),
        )
        if error_indication:
            return None, str(error_indication)
        if error_status:
            return None, f"{error_status.prettyPrint()}"
        for var_bind in var_binds:
            value = var_bind[1]
            # SNMPv2c reports a missing OID as a special value rather than
            # an error status; treat it as "not available".
            if _is_missing_value(value):
                return None, str(value) or "OID not available"
            return str(value), None
        # Always return a 2-tuple so callers can unpack safely.
        return None, "Empty SNMP response"
    except Exception as e:
        return None, str(e)
    finally:
        # Single close point for the engine, on every path.
        snmp_engine.close_dispatcher()


async def snmp_walk(ip, community, oid_prefix) -> List[Tuple[str, str]]:
    """Perform an SNMP WALK on an OID branch.

    Returns:
        A list of ``(oid, value)`` string pairs located under *oid_prefix*.
        On error, a warning is printed and the results gathered so far are
        returned.
    """
    results = []
    snmp_engine = SnmpEngine()
    try:
        async for error_indication, error_status, _error_index, var_binds in walk_cmd(
            snmp_engine,
            CommunityData(community),
            await UdpTransportTarget.create((ip, 161)),
            ContextData(),
            ObjectType(ObjectIdentity(oid_prefix)),
            # Stop at the end of the requested subtree instead of walking
            # the rest of the agent's MIB.
            lexicographicMode=False,
        ):
            if error_indication or error_status:
                reason = error_indication or error_status.prettyPrint()
                print(f"Warning: SNMP walk error on {oid_prefix}: {reason}")
                break
            for var_bind in var_binds:
                oid_str = str(var_bind[0])
                # Handles OIDs with or without the leading dot, e.g.
                # oid_prefix = ".1.3.6.1.2.1.2.2.1.2",
                # oid_str = "1.3.6.1.2.1.2.2.1.2.1"
                if _oid_in_subtree(oid_str, oid_prefix):
                    results.append((oid_str, str(var_bind[1])))
    except Exception as e:
        print(f"Warning: SNMP walk error on {oid_prefix}: {e}")
    finally:
        snmp_engine.close_dispatcher()
    return results


async def detect_device_type(ip, community):
    """Detect the device type from sysDescr.

    Returns:
        ``(device_type, sysdescr, error)`` where *device_type* is
        ``"mikrotik"``, ``"omada"`` or ``"unknown"``. On SNMP failure the
        result is ``(None, None, error_message)``.
    """
    # .1.3.6.1.2.1.1.1.0 = sysDescr
    value, err = await snmp_get(ip, community, ".1.3.6.1.2.1.1.1.0")
    if err:
        return None, None, err

    desc_lower = (value or "").lower()
    device_type = "unknown"
    if "mikrotik" in desc_lower or "routeros" in desc_lower:
        device_type = "mikrotik"
    elif "eap" in desc_lower or "tp-link" in desc_lower or "omada" in desc_lower:
        device_type = "omada"
    return device_type, value, None


async def get_device_name(ip, community, fallback_type):
    """Return sysName to name the device, or a ``<type>_<ip>`` fallback."""
    # .1.3.6.1.2.1.1.5.0 = sysName
    value, err = await snmp_get(ip, community, ".1.3.6.1.2.1.1.5.0")
    if value and not err:
        return value

    # Fallback: device type (or "device") followed by the dotted IP with
    # dots replaced by underscores.
    prefix = fallback_type if fallback_type != "unknown" else "device"
    sanitized_ip = ip.replace(".", "_")
    return f"{prefix}_{sanitized_ip}"


def _build_interface_entry(index: str, name: str) -> Dict[str, Any]:
    """Build the catalog entry (in/out/status OIDs) for one interface."""
    # IF-MIB ifTable columns: .10 = ifInOctets, .16 = ifOutOctets,
    # .8 = ifOperStatus.
    prefix = ".1.3.6.1.2.1.2.2.1"
    return {
        "index": index,
        "name": name,
        "oids": [
            {
                "name": f"{name}_in",
                "oid": f"{prefix}.10.{index}",
                "type": "int",
                "HA_device_class": "data_size",
                "HA_platform": "sensor",
                # ifInOctets counts octets, i.e. bytes, not bits.
                "HA_unit": "B",
            },
            {
                "name": f"{name}_out",
                "oid": f"{prefix}.16.{index}",
                "type": "int",
                "HA_device_class": "data_size",
                "HA_platform": "sensor",
                # ifOutOctets counts octets, i.e. bytes, not bits.
                "HA_unit": "B",
            },
            {
                "name": f"{name}_status",
                "oid": f"{prefix}.8.{index}",
                "type": "int",  # could become bool later to map 1 -> ON
                "HA_device_class": "connectivity",
                "HA_platform": "binary_sensor",
            },
        ],
    }


async def discover_interfaces(ip, community) -> List[Dict[str, Any]]:
    """Discover the device interfaces via ifDescr.

    Loopback/null/dummy interfaces and interfaces with a blank name are
    skipped; everything else is kept.
    """
    # ifDescr OID base
    ifdescr_oid = ".1.3.6.1.2.1.2.2.1.2"

    print(" [+] Scanning interfaces...")
    raw_ifs = await snmp_walk(ip, community, ifdescr_oid)
    print(f" [DEBUG] Found {len(raw_ifs)} interfaces via walk")

    interfaces = []
    for oid_full, name in raw_ifs:
        # The interface index is the last component of the OID.
        index = oid_full.split(".")[-1]

        name_lower = name.lower()
        if any(skip in name_lower for skip in ("loopback", "null", "dummy")):
            continue

        # Deliberately permissive: generic names (Omada, switches) are kept,
        # only blank names are excluded.
        if not name.strip():
            continue

        interfaces.append(_build_interface_entry(index, name))

    print(f" [DEBUG] Returning {len(interfaces)} interfaces")
    return interfaces


async def discover_metrics(ip, community, device_type) -> List[Dict[str, Any]]:
    """Probe the vendor-specific system OIDs and return those that answer.

    Each returned dict is the catalog entry plus a ``"key"`` field holding
    the catalog key.
    """
    catalog = DEVICE_CATALOGS.get(device_type, {}).get("system_metrics", {})
    available_metrics = []

    if not catalog:
        return available_metrics

    print(f" [+] Scanning system metrics ({device_type})...")
    for key, meta in catalog.items():
        oid = meta.get("oid")
        if not oid:
            continue
        val, err = await snmp_get(ip, community, oid)
        if val and not err:
            available_metrics.append({**meta, "key": key})
        else:
            print(f" - {key}: not available / error")

    return available_metrics


def display_catalog(device_name, interfaces, system_metrics, device_type):
    """Print the interactive selection menu.

    Returns:
        The flat list of selectable items, in display order (item ``i`` is
        shown as number ``i + 1``). Each item is a dict with ``"type"``
        (``"interface"`` or ``"metric"``), ``"data"`` and ``"label"``.
        An empty list is returned when nothing was discovered.
    """
    print("\n" + "=" * 60)
    print(f" Appareil détecté: {device_type.upper()}"
          f"\n Nom: {device_name}")
    print("=" * 60)

    all_items = []

    # 1. Interfaces
    if interfaces:
        print(f"\nInterfaces détectées ({len(interfaces)}):")
        for iface in interfaces:
            num = len(all_items) + 1
            label = f" [{num}] {iface['name']} (In/Out/Status)"
            print(label)
            all_items.append({"type": "interface", "data": iface, "label": label})

    # 2. System metrics
    if system_metrics:
        print("\nMétriques Système:")
        for met in system_metrics:
            num = len(all_items) + 1
            # Derive a readable label from the catalog key.
            key_label = met['key'].replace('_', ' ').title()
            unit = met.get('HA_unit', '')
            op_label = " (transformed)" if met.get('operation') else ""
            label = f" [{num}] {key_label} ({unit}){op_label}"
            print(label)
            all_items.append({"type": "metric", "data": met, "label": label})

    if not all_items:
        print("\nAucun OID intéressant trouvé.")
        # Must be falsy: the caller tests ``if not catalog_items``.
        return []

    print("\n" + "-" * 40)
    print("\nSélectionnez les métriques à surveiller :")
    return all_items


def parse_selection(input_str: str, max_num: int) -> List[int]:
    """Parse a user selection such as ``'1,3,5-8'``, ``'all'`` or ``'none'``.

    Returns:
        The sorted list of distinct selected numbers within ``1..max_num``.
        Invalid or out-of-range parts are ignored.
    """
    text = input_str.strip().lower()
    if text == "all":
        return list(range(1, max_num + 1))
    if text == "none":
        return []

    indices = set()
    for part in text.split(","):
        part = part.strip()
        try:
            if "-" in part:
                start, end = part.split("-", 1)
                first, last = int(start), int(end)
                # Clamp before iterating so a huge range costs nothing.
                indices.update(range(max(first, 1), min(last, max_num) + 1))
            else:
                number = int(part)
                if 1 <= number <= max_num:
                    indices.add(number)
        except ValueError:
            continue

    return sorted(indices)


def backup_config(config_path):
    """Create a timestamped backup copy of the config file.

    Returns:
        True when the backup was created or when there is no existing file
        to back up; False when the copy failed.
    """
    # The path is used as given (absolute or relative to the cwd).
    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    bak_path = f"{config_path}.bak.{timestamp}"

    try:
        if os.path.exists(config_path):
            shutil.copy2(config_path, bak_path)
            print(f"[Backup] Sauvegarde créée: {bak_path}")
        else:
            # Nothing to back up: the config file will be created.
            print("[Backup] Pas de fichier existant à sauvegarder, création du fichier.")
        return True
    except Exception as e:
        print(f"[Erreur] Impossible de créer le backup: {e}")
        return False


def build_device_oids(interface_data, metrics_data):
    """Build the list of OID entries to store in the config.

    Args:
        interface_data: Selected catalog items of type "interface".
        metrics_data: Selected catalog items of type "metric".

    Returns:
        The interface OIDs (in/out/status per interface) followed by one
        cleaned-up entry per metric.
    """
    # Interfaces: flatten the grouped OIDs.
    oids = [oid for if_item in interface_data for oid in if_item["data"]["oids"]]

    # Metrics: keep only the fields relevant to the config file.
    for met_item in metrics_data:
        oid_meta = met_item["data"]
        clean_meta = {
            "name": oid_meta.get("name_template", "metric"),
            "oid": oid_meta["oid"],
            "type": oid_meta["type"],
            "HA_platform": oid_meta.get("HA_platform"),
        }
        if oid_meta.get("HA_device_class"):
            clean_meta["HA_device_class"] = oid_meta["HA_device_class"]
        if oid_meta.get("HA_unit"):
            clean_meta["HA_unit"] = oid_meta["HA_unit"]
        if "operation" in oid_meta:
            clean_meta["operation"] = oid_meta["operation"]
        oids.append(clean_meta)

    return oids


def add_device_to_config(config_path, device_name, ip, community, device_oids):
    """Add (or replace) a device in the YAML config file.

    Returns:
        True when the file was written, False when the existing file is
        invalid or the write failed.
    """
    invalid_msg = f"[Erreur] Le fichier {config_path} semble invalide. Veuillez vérifier le backup."

    # Structure of the new device
    new_device = {
        "ip": ip,
        "snmp_community": community,
        "oids": device_oids,
    }

    config = {}
    if os.path.exists(config_path):
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                content = f.read()
            if content.strip():
                config = yaml.safe_load(content) or {}
        except (yaml.YAMLError, UnicodeDecodeError):
            # Corrupted file: do not touch it, the backup made earlier stays valid.
            print(invalid_msg)
            return False

    # The root must be a mapping, otherwise keys cannot be added to it.
    if not isinstance(config, dict):
        print(invalid_msg)
        return False

    # Ensure the base structure
    if 'mqtt' not in config:
        config['mqtt'] = {
            'broker': 'IP or FQDN',
            'port': 1883,
            'user': 'admin',
            'password': 'password',
        }

    # "devices:" with no value loads as None; treat it as an empty mapping.
    if config.get('devices') is None:
        config['devices'] = {}
    elif not isinstance(config['devices'], dict):
        print(invalid_msg)
        return False

    if device_name in config['devices']:
        print(f"[!] Le device '{device_name}' existe déjà, il sera remplacé.")

    # Insertion
    config['devices'][device_name] = new_device

    # Save: serialize first so a serialization error cannot leave a
    # truncated config file behind.
    try:
        rendered = yaml.dump(config, default_flow_style=False,
                             sort_keys=False, allow_unicode=True)
        with open(config_path, 'w', encoding='utf-8') as f:
            f.write(rendered)
        print(f"[-] Device '{device_name}' ajouté avec succès.")
        return True
    except Exception as e:
        print(f"[Erreur] Echec de l'écriture du fichier config: {e}")
        return False


async def process_device_loop(config_path):
    """Run the interactive loop that discovers devices and adds them to the config."""
    while True:
        print("\n--- Découverte d'un nouvel appareil ---")
        ip = input("Adresse IP de l'appareil : ").strip()
        if not ip:
            continue

        community = input("Communauté SNMP (défaut: public) : ").strip() or "public"

        # 1. Detection
        print(f"[*] Connexion à {ip}...")
        device_type, sysdescr, err = await detect_device_type(ip, community)

        if err:
            print(f"[!] Erreur SNMP: {err}")
            if _is_yes(input("Réessayer ? (o/n) : ")):
                continue
            break

        if device_type == "unknown":
            print(f"[!] Type d'appareil inconnu. Description: {sysdescr}")
            force_type = input("Forcer le type ? (mikrotik/omada/none) : ").strip().lower()
            if force_type in ("mikrotik", "omada"):
                device_type = force_type
            else:
                print("Annulé.")
                break

        print(f"[+] Appareil détecté: {device_type}")

        # 2. Name, cleaned up to be usable as a YAML key
        device_name = await get_device_name(ip, community, device_type)
        device_name = device_name.replace(" ", "_").lower()

        # 3. Interface discovery
        interfaces = await discover_interfaces(ip, community)

        # 4. Metric discovery
        system_metrics = await discover_metrics(ip, community, device_type)

        # 5. Display & selection
        catalog_items = display_catalog(device_name, interfaces, system_metrics, device_type)

        if not catalog_items:
            if not _is_yes(input("Continuer avec un autre appareil ? (o/n) : ")):
                break
            continue

        max_idx = len(catalog_items)
        selection_str = input(f"\nSélection (1-{max_idx}, ranges, 'all', 'none') : ")
        selected_indices = parse_selection(selection_str, max_idx)

        if not selected_indices:
            print("Aucune sélection.")
            if not _is_yes(input("Continuer avec un autre appareil ? (o/n) : ")):
                break
            continue

        # Split the selected items by kind (menu numbers start at 1).
        selected_items = [catalog_items[idx - 1] for idx in selected_indices]
        selected_interfaces = [it for it in selected_items if it["type"] == "interface"]
        selected_metrics = [it for it in selected_items if it["type"] != "interface"]

        # Build the device's YAML OID list
        device_oids = build_device_oids(selected_interfaces, selected_metrics)

        # 6. Backup & save
        print("\n--- Mise à jour de la configuration ---")
        if backup_config(config_path):
            success = add_device_to_config(config_path, device_name, ip, community, device_oids)
            if not success:
                # On success the confirmation was already printed.
                print("[!] Veuillez restaurer manuellement depuis le backup si nécessaire.")
        else:
            print("[!] Annulé car backup impossible.")

        # 7. Continue?
        if not _is_yes(input("\nAjouter un autre appareil ? (o/n) : ")):
            break


def main():
    """Entry point: parse arguments, check dependencies, run the loop."""
    args = parse_arguments()

    if _IMPORT_ERROR is not None:
        print(f"[Erreur] Dépendance manquante ou incompatible: {_IMPORT_ERROR}\n"
              "         Installez-les avec: pip install pysnmp pyyaml",
              file=sys.stderr)
        sys.exit(1)

    try:
        asyncio.run(process_device_loop(args.config))
    except (KeyboardInterrupt, EOFError):
        # Ctrl-C / end of input at a prompt: leave without a traceback.
        print("\nInterrompu.")


if __name__ == "__main__":
    main()