feat: Ajoute snmp-discover.py pour la découverte semi-guidée des OIDs
This commit is contained in:
557
snmp-discover.py
Normal file
557
snmp-discover.py
Normal file
@@ -0,0 +1,557 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
snmp-discover.py - Assistant de découverte SNMP semi-guidé pour snmp2mqtt.
|
||||
Détecte automatiquement les OIDs disponibles sur un appareil MikroTik ou TP-Link Omada
|
||||
et met à jour le fichier config.yaml.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import yaml
|
||||
import shutil
|
||||
import os
|
||||
import argparse
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from typing import List, Dict, Any, Optional, Tuple
|
||||
|
||||
from pysnmp.hlapi.asyncio import (
|
||||
SnmpEngine,
|
||||
CommunityData,
|
||||
UdpTransportTarget,
|
||||
ContextData,
|
||||
ObjectIdentity,
|
||||
ObjectType,
|
||||
get_cmd,
|
||||
walk_cmd,
|
||||
)
|
||||
|
||||
# Catalogue des OIDs par type d'appareil
|
||||
DEVICE_CATALOGS = {
|
||||
"mikrotik": {
|
||||
"manufacturer": "MikroTik",
|
||||
"system_metrics": {
|
||||
"cpu_usage": {
|
||||
"oid": ".1.3.6.1.4.1.14988.1.1.3.14.0",
|
||||
"name_template": "cpu_usage",
|
||||
"type": "int",
|
||||
"HA_device_class": "power_factor",
|
||||
"HA_platform": "sensor",
|
||||
"HA_unit": "%",
|
||||
},
|
||||
"temperature": {
|
||||
"oid": ".1.3.6.1.4.1.14988.1.1.6.1.0",
|
||||
"name_template": "temperature",
|
||||
"type": "int",
|
||||
"operation": "value / 1000",
|
||||
"HA_device_class": "temperature",
|
||||
"HA_platform": "sensor",
|
||||
"HA_unit": "°C",
|
||||
},
|
||||
"memory_used": {
|
||||
"oid": ".1.3.6.1.4.1.14988.1.1.4.2.0",
|
||||
"name_template": "memory_used",
|
||||
"type": "int",
|
||||
"HA_device_class": "data_size",
|
||||
"HA_platform": "sensor",
|
||||
"HA_unit": "MB",
|
||||
# Note: MikroTik memory OIDs can vary by version, assuming bytes here requires operation or custom handling.
|
||||
# Standard HOST-RESOURCES-MIB is often used: hrMemorySize (.1.3.6.1.2.1.25.2.2.0)
|
||||
# But MikroTik has proprietary ones. Let's adjust operation if needed.
|
||||
# For now, we'll stick to a simple GET, user may need to tweak operation if values are weird.
|
||||
"operation": "value / 1000000",
|
||||
},
|
||||
"memory_free": {
|
||||
"oid": ".1.3.6.1.4.1.14988.1.1.4.3.0",
|
||||
"name_template": "memory_free",
|
||||
"type": "int",
|
||||
"HA_device_class": "data_size",
|
||||
"HA_platform": "sensor",
|
||||
"HA_unit": "MB",
|
||||
"operation": "value / 1000000",
|
||||
},
|
||||
},
|
||||
"interfaces_root_oid": ".1.3.6.1.2.1.2.2.1.2",
|
||||
},
|
||||
"omada": {
|
||||
"manufacturer": "TP-Link",
|
||||
"system_metrics": {
|
||||
"connected_clients": {
|
||||
"oid": ".1.3.6.1.4.1.11863.10.1.3.0",
|
||||
"name_template": "connected_clients",
|
||||
"type": "int",
|
||||
"HA_device_class": None,
|
||||
"HA_platform": "sensor",
|
||||
"HA_unit": "clients",
|
||||
},
|
||||
},
|
||||
"interfaces_root_oid": ".1.3.6.1.2.1.2.2.1.2",
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def parse_arguments():
|
||||
parser = argparse.ArgumentParser(description='SNMP Discovery Tool for snmp2mqtt')
|
||||
parser.add_argument('--config', '-c', default='config.yaml',
|
||||
help='Path to YAML configuration file (default: config.yaml)')
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
async def snmp_get(ip, community, oid_str):
|
||||
"""Récupère la valeur d'un OID spécifique via SNMP."""
|
||||
snmpEngine = SnmpEngine()
|
||||
try:
|
||||
errorIndication, errorStatus, errorIndex, varBinds = await get_cmd(
|
||||
snmpEngine,
|
||||
CommunityData(community),
|
||||
await UdpTransportTarget.create((ip, 161)),
|
||||
ContextData(),
|
||||
ObjectType(ObjectIdentity(oid_str))
|
||||
)
|
||||
|
||||
snmpEngine.close_dispatcher()
|
||||
|
||||
if errorIndication:
|
||||
return None, str(errorIndication)
|
||||
elif errorStatus:
|
||||
return None, f"{errorStatus.prettyPrint()}"
|
||||
else:
|
||||
for varBind in varBinds:
|
||||
return str(varBind[1]), None
|
||||
except Exception as e:
|
||||
return None, str(e)
|
||||
finally:
|
||||
snmpEngine.close_dispatcher()
|
||||
|
||||
|
||||
async def snmp_walk(ip, community, oid_prefix):
|
||||
"""Effectue un SNMP WALK sur une branche OID."""
|
||||
results = []
|
||||
snmpEngine = SnmpEngine()
|
||||
try:
|
||||
async for errorIndication, errorStatus, errorIndex, varBinds in walk_cmd(
|
||||
snmpEngine,
|
||||
CommunityData(community),
|
||||
await UdpTransportTarget.create((ip, 161)),
|
||||
ContextData(),
|
||||
ObjectType(ObjectIdentity(oid_prefix)),
|
||||
lexicographicMode=True
|
||||
):
|
||||
if errorIndication or errorStatus:
|
||||
break
|
||||
for varBind in varBinds:
|
||||
oid_str = str(varBind[0])
|
||||
# Le test startswith doit gérer les formats avec/sans le point initial
|
||||
# oid_prefix = ".1.3.6.1.2.1.2.2.1.2", oid_str = "1.3.6.1.2.1.2.2.1.2.1"
|
||||
oid_str_normalized = oid_str.lstrip(".")
|
||||
oid_prefix_normalized = oid_prefix.lstrip(".")
|
||||
if oid_str_normalized.startswith(oid_prefix_normalized):
|
||||
results.append((oid_str, str(varBind[1])))
|
||||
except Exception as e:
|
||||
print(f"Warning: SNMP walk error on {oid_prefix}: {e}")
|
||||
finally:
|
||||
snmpEngine.close_dispatcher()
|
||||
return results
|
||||
|
||||
|
||||
async def detect_device_type(ip, community):
|
||||
"""Détecte le type d'équipement via sysDescr."""
|
||||
# .1.3.6.1.2.1.1.1.0 = sysDescr
|
||||
value, err = await snmp_get(ip, community, ".1.3.6.1.2.1.1.1.0")
|
||||
|
||||
if err:
|
||||
return None, None, err
|
||||
|
||||
desc_lower = value.lower()
|
||||
|
||||
device_type = "unknown"
|
||||
if "mikrotik" in desc_lower or "routeros" in desc_lower:
|
||||
device_type = "mikrotik"
|
||||
elif "eap" in desc_lower or "tp-link" in desc_lower or "omada" in desc_lower:
|
||||
device_type = "omada"
|
||||
|
||||
return device_type, value, None
|
||||
|
||||
|
||||
async def get_device_name(ip, community, fallback_type):
|
||||
"""Récupère sysName pour nommer l'appareil, sinon utilise un fallback."""
|
||||
# .1.3.6.1.2.1.1.5.0 = sysName
|
||||
value, err = await snmp_get(ip, community, ".1.3.6.1.2.1.1.5.0")
|
||||
|
||||
if value and not err:
|
||||
return value
|
||||
|
||||
# Fallback
|
||||
prefix = fallback_type if fallback_type != "unknown" else "device"
|
||||
sanitized_ip = ip.replace(".", "_")
|
||||
# Check if name exists in config to avoid duplicates (simple check)
|
||||
return f"{prefix}_{sanitized_ip}"
|
||||
|
||||
|
||||
async def discover_interfaces(ip, community) -> List[Dict[str, Any]]:
|
||||
"""Découvre les interfaces physiques via ifDescr."""
|
||||
# ifDescr OID base
|
||||
ifdescr_oid = ".1.3.6.1.2.1.2.2.1.2"
|
||||
|
||||
print(f" [+] Scanning interfaces...")
|
||||
raw_ifs = await snmp_walk(ip, community, ifdescr_oid)
|
||||
|
||||
print(f" [DEBUG] Found {len(raw_ifs)} interfaces via walk")
|
||||
|
||||
interfaces = []
|
||||
for oid_full, name in raw_ifs:
|
||||
# Extraction de l'index (dernière partie de l'OID)
|
||||
index = oid_full.split(".")[-1]
|
||||
|
||||
# Filtrage basique pour ne garder que les interfaces physiques
|
||||
name_lower = name.lower()
|
||||
if any(skip in name_lower for skip in ["loopback", "null", "dummy"]):
|
||||
continue
|
||||
|
||||
# Détection des trucs un peu weird (parfois 'vlan', 'bridge' sont utiles, parfois non)
|
||||
# On garde 'ether', 'sfp', 'wlan', 'eth', 'port', 'bridge', 'lan', 'wan'
|
||||
if not any(k in name_lower for k in ["ether", "sfp", "wlan", "eth", "port", "bridge", "lan", "wan"]):
|
||||
# Si on est sur un Omada ou switch, on pourrait avoir des noms génériques
|
||||
# On reste permissif mais on exclut les trucs vides
|
||||
if name.strip() == "":
|
||||
continue
|
||||
|
||||
interface = {
|
||||
"index": index,
|
||||
"name": name,
|
||||
"oids": []
|
||||
}
|
||||
|
||||
# Construction des OID pour in, out, status
|
||||
# in: .10.index, out: .16.index, status: .8.index
|
||||
prefix = ".1.3.6.1.2.1.2.2.1"
|
||||
interface["oids"].append({
|
||||
"name": f"{name}_in",
|
||||
"oid": f"{prefix}.10.{index}",
|
||||
"type": "int",
|
||||
"HA_device_class": "data_size",
|
||||
"HA_platform": "sensor",
|
||||
"HA_unit": "bit",
|
||||
})
|
||||
interface["oids"].append({
|
||||
"name": f"{name}_out",
|
||||
"oid": f"{prefix}.16.{index}",
|
||||
"type": "int",
|
||||
"HA_device_class": "data_size",
|
||||
"HA_platform": "sensor",
|
||||
"HA_unit": "bit",
|
||||
})
|
||||
interface["oids"].append({
|
||||
"name": f"{name}_status",
|
||||
"oid": f"{prefix}.8.{index}",
|
||||
"type": "int", # On peut mettre bool ensuite si on veut map 1->ON
|
||||
"HA_device_class": "connectivity",
|
||||
"HA_platform": "binary_sensor",
|
||||
})
|
||||
|
||||
interfaces.append(interface)
|
||||
|
||||
print(f" [DEBUG] Returning {len(interfaces)} interfaces")
|
||||
return interfaces
|
||||
|
||||
|
||||
async def discover_metrics(ip, community, device_type) -> List[Dict[str, Any]]:
|
||||
"""Teste la disponibilité des OIDs systèmes spécifiques au fabricant."""
|
||||
catalog = DEVICE_CATALOGS.get(device_type, {}).get("system_metrics", {})
|
||||
available_metrics = []
|
||||
|
||||
if not catalog:
|
||||
return available_metrics
|
||||
|
||||
print(f" [+] Scanning system metrics ({device_type})...")
|
||||
|
||||
for key, meta in catalog.items():
|
||||
oid = meta.get("oid")
|
||||
if oid:
|
||||
val, err = await snmp_get(ip, community, oid)
|
||||
if val and not err:
|
||||
available_metrics.append({**meta, "key": key})
|
||||
else:
|
||||
print(f" - {key}: not available / error")
|
||||
|
||||
return available_metrics
|
||||
|
||||
|
||||
def display_catalog(device_name, interfaces, system_metrics, device_type):
|
||||
"""Affiche le menu interactif de sélection."""
|
||||
print(f"\n" + "="*60)
|
||||
print(f" Appareil détecté: {device_type.upper()}"
|
||||
f"\n Nom: {device_name}")
|
||||
print("="*60)
|
||||
|
||||
all_items = []
|
||||
|
||||
# 1. Interfaces
|
||||
if interfaces:
|
||||
print(f"\nInterfaces détectées ({len(interfaces)}):")
|
||||
for idx, iface in enumerate(interfaces):
|
||||
num = len(all_items) + 1
|
||||
label = f" [{num}] {iface['name']} (In/Out/Status)"
|
||||
print(label)
|
||||
all_items.append({"type": "interface", "data": iface, "label": label})
|
||||
|
||||
# 2. Métriques système
|
||||
if system_metrics:
|
||||
prefix_map = {
|
||||
"cpu": "CPU",
|
||||
"temp": "Température",
|
||||
"mem": "RAM"
|
||||
}
|
||||
print(f"\nMétriques Système:")
|
||||
for idx, met in enumerate(system_metrics):
|
||||
num = len(all_items) + 1
|
||||
# Essayez de deviner un label propre
|
||||
key_label = met['key'].replace('_', ' ').title()
|
||||
unit = met.get('HA_unit', '')
|
||||
op_label = ""
|
||||
if met.get('operation'):
|
||||
op_label = " (transformed)"
|
||||
label = f" [{num}] {key_label} ({unit}){op_label}"
|
||||
print(label)
|
||||
all_items.append({"type": "metric", "data": met, "label": label})
|
||||
|
||||
if not all_items:
|
||||
print("\nAucun OID intéressant trouvé.")
|
||||
return [], []
|
||||
|
||||
print("\n" + "-"*40)
|
||||
print("\nSélectionnez les métriques à surveiller :")
|
||||
return all_items
|
||||
|
||||
|
||||
def parse_selection(input_str: str, max_num: int) -> List[int]:
|
||||
"""Parse la sélection utilisateur : '1,3,5-8', 'all', etc."""
|
||||
indices = set()
|
||||
|
||||
if input_str.lower() == "all":
|
||||
return list(range(1, max_num + 1))
|
||||
elif input_str.lower() == "none":
|
||||
return []
|
||||
|
||||
parts = input_str.split(",")
|
||||
for part in parts:
|
||||
part = part.strip()
|
||||
if "-" in part:
|
||||
try:
|
||||
start, end = part.split("-", 1)
|
||||
s = int(start)
|
||||
e = int(end)
|
||||
for n in range(s, e + 1):
|
||||
if 1 <= n <= max_num:
|
||||
indices.add(n)
|
||||
except ValueError:
|
||||
continue
|
||||
else:
|
||||
try:
|
||||
n = int(part)
|
||||
if 1 <= n <= max_num:
|
||||
indices.add(n)
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
return sorted(list(indices))
|
||||
|
||||
|
||||
def backup_config(config_path):
|
||||
"""Crée une copie de backup du fichier config."""
|
||||
# On suppose que le path est absolu ou relatif au cwd, on travaille avec le path tel quel
|
||||
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
|
||||
bak_path = f"{config_path}.bak.{timestamp}"
|
||||
|
||||
try:
|
||||
# Créer un backup même si le fichier n'existe pas n'a pas de sens,
|
||||
# mais s'il existe on le copie.
|
||||
if os.path.exists(config_path):
|
||||
shutil.copy2(config_path, bak_path)
|
||||
print(f"[Backup] Sauvegarde créée: {bak_path}")
|
||||
return True
|
||||
else:
|
||||
print(f"[Backup] Pas de fichier existant à sauvegarder, création du fichier.")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"[Erreur] Impossible de créer le backup: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def build_device_oids(interface_data, metrics_data):
|
||||
"""Construit la liste des objets OID pour le config."""
|
||||
oids = []
|
||||
|
||||
# Ajout des interfaces (groupées)
|
||||
for if_item in interface_data:
|
||||
for oid in if_item["data"]["oids"]:
|
||||
oids.append(oid)
|
||||
|
||||
# Ajout des métriques
|
||||
for met_item in metrics_data:
|
||||
oid_meta = met_item["data"]
|
||||
clean_meta = {
|
||||
"name": oid_meta.get("name_template", "metric"),
|
||||
"oid": oid_meta["oid"],
|
||||
"type": oid_meta["type"],
|
||||
"HA_platform": oid_meta.get("HA_platform"),
|
||||
}
|
||||
if oid_meta.get("HA_device_class"):
|
||||
clean_meta["HA_device_class"] = oid_meta["HA_device_class"]
|
||||
if oid_meta.get("HA_unit"):
|
||||
clean_meta["HA_unit"] = oid_meta["HA_unit"]
|
||||
if "operation" in oid_meta:
|
||||
clean_meta["operation"] = oid_meta["operation"]
|
||||
oids.append(clean_meta)
|
||||
|
||||
return oids
|
||||
|
||||
|
||||
def add_device_to_config(config_path, device_name, ip, community, device_oids):
|
||||
"""Ajoute le device au fichier YAML."""
|
||||
# Structure du nouveau device
|
||||
new_device = {
|
||||
"ip": ip,
|
||||
"snmp_community": community,
|
||||
"oids": device_oids
|
||||
}
|
||||
|
||||
config = {}
|
||||
if os.path.exists(config_path):
|
||||
try:
|
||||
with open(config_path, 'r') as f:
|
||||
content = f.read()
|
||||
if content.strip():
|
||||
config = yaml.safe_load(content) or {}
|
||||
except yaml.YAMLError:
|
||||
# Si le fichier est corrompu, on ne veut pas perdre le backup fait avant
|
||||
print(f"[Erreur] Le fichier {config_path} semble invalide. Veuillez vérifier le backup.")
|
||||
return False
|
||||
|
||||
# Assurer la structure de base
|
||||
if 'mqtt' not in config:
|
||||
config['mqtt'] = {
|
||||
'broker': 'IP or FQDN',
|
||||
'port': 1883,
|
||||
'user': 'admin',
|
||||
'password': 'password'
|
||||
}
|
||||
|
||||
if 'devices' not in config:
|
||||
config['devices'] = {}
|
||||
|
||||
# Insertion
|
||||
config['devices'][device_name] = new_device
|
||||
|
||||
# Sauvegarde
|
||||
try:
|
||||
with open(config_path, 'w') as f:
|
||||
yaml.dump(config, f, default_flow_style=False, sort_keys=False, allow_unicode=True)
|
||||
print(f"[-] Device '{device_name}' ajouté avec succès.")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"[Erreur] Echec de l'écriture du fichier config: {e}")
|
||||
return False
|
||||
|
||||
|
||||
async def process_device_loop(config_path):
|
||||
"""Gère le loop d'ajout de devices."""
|
||||
while True:
|
||||
print("\n--- Découverte d'un nouvel appareil ---")
|
||||
ip = input("Adresse IP de l'appareil : ").strip()
|
||||
if not ip:
|
||||
continue
|
||||
|
||||
community = input("Communauté SNMP (défaut: public) : ").strip() or "public"
|
||||
|
||||
# 1. Détection
|
||||
print(f"[*] Connexion à {ip}...")
|
||||
device_type, sysdescr, err = await detect_device_type(ip, community)
|
||||
|
||||
if err:
|
||||
print(f"[!] Erreur SNMP: {err}")
|
||||
retry = input("Réessayer ? (o/n) : ").lower()
|
||||
if retry == 'y' or retry == 'o':
|
||||
continue
|
||||
else:
|
||||
break
|
||||
|
||||
if device_type == "unknown":
|
||||
print(f"[!] Type d'appareil inconnu. Description: {sysdescr}")
|
||||
force_type = input("Forcer le type ? (mikrotik/omada/none) : ").lower()
|
||||
if force_type in ["mikrotik", "omada"]:
|
||||
device_type = force_type
|
||||
else:
|
||||
print("Annulé.")
|
||||
break
|
||||
|
||||
print(f"[+] Appareil détecté: {device_type}")
|
||||
|
||||
# 2. Nom
|
||||
device_name = await get_device_name(ip, community, device_type)
|
||||
# Nettoyage du nom pour qu'il soit un key YAML safe
|
||||
device_name = device_name.replace(" ", "_").lower()
|
||||
|
||||
# 3. Découverte Interfaces
|
||||
interfaces = await discover_interfaces(ip, community)
|
||||
|
||||
# 4. Découverte Métriques
|
||||
system_metrics = await discover_metrics(ip, community, device_type)
|
||||
|
||||
# 5. Display & Sélection
|
||||
catalog_items = display_catalog(device_name, interfaces, system_metrics, device_type)
|
||||
if not catalog_items:
|
||||
choice = input("Continuer avec un autre appareil ? (o/n) : ")
|
||||
if choice.lower() != 'o' and choice.lower() != 'y':
|
||||
break
|
||||
continue
|
||||
|
||||
max_idx = len(catalog_items)
|
||||
selection_str = input(f"\nSélection (1-{max_idx}, ranges, 'all', 'none') : ")
|
||||
selected_indices = parse_selection(selection_str, max_idx)
|
||||
|
||||
if not selected_indices:
|
||||
print("Aucune sélection.")
|
||||
choice = input("Continuer avec un autre appareil ? (o/n) : ")
|
||||
if choice.lower() != 'o' and choice.lower() != 'y':
|
||||
break
|
||||
continue
|
||||
|
||||
# Filtrer les items sélectionnés
|
||||
selected_interfaces = []
|
||||
selected_metrics = []
|
||||
|
||||
for idx in selected_indices:
|
||||
# Attention, indices start at 1
|
||||
item = catalog_items[idx-1]
|
||||
if item["type"] == "interface":
|
||||
selected_interfaces.append(item)
|
||||
else:
|
||||
selected_metrics.append(item)
|
||||
|
||||
# Construction du device yaml
|
||||
device_oids = build_device_oids(selected_interfaces, selected_metrics)
|
||||
|
||||
# 6. Backup & Save
|
||||
print(f"\n--- Mise à jour de la configuration ---")
|
||||
if backup_config(config_path):
|
||||
success = add_device_to_config(config_path, device_name, ip, community, device_oids)
|
||||
if success:
|
||||
pass # Already printed success message
|
||||
else:
|
||||
print("[!] Veuillez restaurer manuellement depuis le backup si nécessaire.")
|
||||
else:
|
||||
print("[!] Annulé car backup impossible.")
|
||||
|
||||
# 7. Continuer ?
|
||||
cont = input("\nAjouter un autre appareil ? (o/n) : ")
|
||||
if cont.lower() != 'o' and cont.lower() != 'y':
|
||||
break
|
||||
|
||||
|
||||
def main():
|
||||
args = parse_arguments()
|
||||
asyncio.run(process_device_loop(args.config))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user