Initial commit: LAN Checker

Network health monitoring script with MQTT reporting for Home Assistant.
- Ping, HTTP, and SNMP checkers
- MQTT Discovery for automatic entity creation
- Configurable check intervals

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-26 16:22:55 +01:00
commit 02b14979bc
11 changed files with 536 additions and 0 deletions

4
.gitignore vendored Normal file
View File

@@ -0,0 +1,4 @@
config.yaml
__pycache__/
*.pyc
.venv/

39
CLAUDE.md Normal file
View File

@@ -0,0 +1,39 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
LAN Checker is a Python script that monitors network services and devices, publishing status via MQTT to Home Assistant using MQTT Discovery.
## Commands
```bash
# Install dependencies
pip install -r requirements.txt
# Setup config
cp config.yaml.example config.yaml
# Run
python lan_checker.py
```
## Architecture
- `lan_checker.py` - Main script: config loading, MQTT client, check scheduler
- `checkers/` - Modular check implementations
- `base.py` - `BaseChecker` abstract class and `CheckResult` dataclass
- `ping.py`, `http.py`, `snmp.py` - Concrete checker implementations
- `config.yaml.example` - Configuration template (copy to `config.yaml`)
## Adding a New Checker
1. Create `checkers/newtype.py` inheriting from `BaseChecker`
2. Implement `check()` method returning `CheckResult`
3. Register in `checkers/__init__.py` CHECKERS dict
## MQTT Topics
- Discovery: `homeassistant/binary_sensor/lan_checker_{id}/config`
- State: `lan_checker/{id}/state` (JSON with state, message, response_time, last_check)

21
LICENSE Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2026 Antoine
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

9
checkers/__init__.py Normal file
View File

@@ -0,0 +1,9 @@
from .ping import PingChecker
from .http import HttpChecker
from .snmp import SnmpChecker
CHECKERS = {
"ping": PingChecker,
"http": HttpChecker,
"snmp": SnmpChecker,
}

21
checkers/base.py Normal file
View File

@@ -0,0 +1,21 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any
@dataclass
class CheckResult:
success: bool
message: str
response_time: float | None = None
details: dict[str, Any] | None = None
class BaseChecker(ABC):
def __init__(self, name: str, config: dict):
self.name = name
self.config = config
@abstractmethod
def check(self) -> CheckResult:
pass

53
checkers/http.py Normal file
View File

@@ -0,0 +1,53 @@
import time
import requests
from .base import BaseChecker, CheckResult
class HttpChecker(BaseChecker):
def check(self) -> CheckResult:
url = self.config["url"]
method = self.config.get("method", "GET").upper()
timeout = self.config.get("timeout", 10)
expected_status = self.config.get("expected_status", 200)
verify_ssl = self.config.get("verify_ssl", True)
headers = self.config.get("headers", {})
start = time.time()
try:
response = requests.request(
method=method,
url=url,
timeout=timeout,
verify=verify_ssl,
headers=headers
)
response_time = (time.time() - start) * 1000 # ms
if response.status_code == expected_status:
return CheckResult(
success=True,
message=f"HTTP {response.status_code}",
response_time=response_time,
details={"status_code": response.status_code}
)
else:
return CheckResult(
success=False,
message=f"Unexpected status: {response.status_code} (expected {expected_status})",
response_time=response_time,
details={"status_code": response.status_code}
)
except requests.Timeout:
return CheckResult(
success=False,
message="HTTP timeout",
response_time=None
)
except requests.RequestException as e:
return CheckResult(
success=False,
message=f"HTTP error: {e}",
response_time=None
)

53
checkers/ping.py Normal file
View File

@@ -0,0 +1,53 @@
import subprocess
import time
import platform
from .base import BaseChecker, CheckResult
class PingChecker(BaseChecker):
def check(self) -> CheckResult:
host = self.config["host"]
count = self.config.get("count", 1)
timeout = self.config.get("timeout", 5)
# Adapt ping command for OS
if platform.system().lower() == "windows":
cmd = ["ping", "-n", str(count), "-w", str(timeout * 1000), host]
else:
cmd = ["ping", "-c", str(count), "-W", str(timeout), host]
start = time.time()
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=timeout + 5
)
response_time = (time.time() - start) * 1000 # ms
if result.returncode == 0:
return CheckResult(
success=True,
message="Host is reachable",
response_time=response_time
)
else:
return CheckResult(
success=False,
message="Host is unreachable",
response_time=None
)
except subprocess.TimeoutExpired:
return CheckResult(
success=False,
message="Ping timeout",
response_time=None
)
except Exception as e:
return CheckResult(
success=False,
message=f"Ping error: {e}",
response_time=None
)

62
checkers/snmp.py Normal file
View File

@@ -0,0 +1,62 @@
import time
from pysnmp.hlapi import (
SnmpEngine,
CommunityData,
UdpTransportTarget,
ContextData,
ObjectType,
ObjectIdentity,
getCmd,
)
from .base import BaseChecker, CheckResult
class SnmpChecker(BaseChecker):
def check(self) -> CheckResult:
host = self.config["host"]
port = self.config.get("port", 161)
community = self.config.get("community", "public")
oid = self.config.get("oid", "1.3.6.1.2.1.1.1.0") # sysDescr
timeout_val = self.config.get("timeout", 5)
start = time.time()
try:
iterator = getCmd(
SnmpEngine(),
CommunityData(community),
UdpTransportTarget((host, port), timeout=timeout_val, retries=1),
ContextData(),
ObjectType(ObjectIdentity(oid))
)
error_indication, error_status, error_index, var_binds = next(iterator)
response_time = (time.time() - start) * 1000 # ms
if error_indication:
return CheckResult(
success=False,
message=f"SNMP error: {error_indication}",
response_time=None
)
elif error_status:
return CheckResult(
success=False,
message=f"SNMP error: {error_status.prettyPrint()}",
response_time=None
)
else:
values = {str(oid): str(val) for oid, val in var_binds}
return CheckResult(
success=True,
message="SNMP response OK",
response_time=response_time,
details=values
)
except Exception as e:
return CheckResult(
success=False,
message=f"SNMP error: {e}",
response_time=None
)

52
config.yaml.example Normal file
View File

@@ -0,0 +1,52 @@
# LAN Checker Configuration
mqtt:
host: "192.168.1.100"
port: 1883
username: ""
password: ""
client_id: "lan_checker"
# Default interval between checks in seconds
default_interval: 60
checks:
# Ping checks
- id: router
name: "Router"
type: ping
host: "192.168.1.1"
interval: 30
timeout: 5
- id: nas
name: "NAS Synology"
type: ping
host: "192.168.1.50"
interval: 60
# HTTP checks
- id: homeassistant
name: "Home Assistant"
type: http
url: "http://192.168.1.100:8123"
expected_status: 200
timeout: 10
interval: 60
- id: plex
name: "Plex Server"
type: http
url: "http://192.168.1.50:32400/web"
expected_status: 200
verify_ssl: false
interval: 120
# SNMP checks
- id: switch
name: "Switch Managed"
type: snmp
host: "192.168.1.2"
community: "public"
oid: "1.3.6.1.2.1.1.1.0" # sysDescr
interval: 120

218
lan_checker.py Normal file
View File

@@ -0,0 +1,218 @@
#!/usr/bin/env python3
"""
LAN Checker - Network health monitoring with MQTT reporting for Home Assistant.
"""
import argparse
import json
import logging
import signal
import sys
import time
from pathlib import Path
import yaml
import paho.mqtt.client as mqtt
from checkers import CHECKERS
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
class LanChecker:
def __init__(self, config_path: str):
self.config = self._load_config(config_path)
self.mqtt_client: mqtt.Client | None = None
self.running = False
self.checks = []
def _load_config(self, config_path: str) -> dict:
path = Path(config_path)
if not path.exists():
raise FileNotFoundError(f"Configuration file not found: {config_path}")
with open(path, "r") as f:
return yaml.safe_load(f)
def _setup_mqtt(self):
mqtt_config = self.config["mqtt"]
self.mqtt_client = mqtt.Client(
mqtt.CallbackAPIVersion.VERSION2,
client_id=mqtt_config.get("client_id", "lan_checker")
)
if mqtt_config.get("username"):
self.mqtt_client.username_pw_set(
mqtt_config["username"],
mqtt_config.get("password", "")
)
self.mqtt_client.on_connect = self._on_mqtt_connect
self.mqtt_client.on_disconnect = self._on_mqtt_disconnect
self.mqtt_client.connect(
mqtt_config["host"],
mqtt_config.get("port", 1883),
keepalive=60
)
self.mqtt_client.loop_start()
def _on_mqtt_connect(self, client, userdata, flags, reason_code, properties):
if reason_code == 0:
logger.info("Connected to MQTT broker")
self._publish_discovery()
else:
logger.error(f"MQTT connection failed: {reason_code}")
def _on_mqtt_disconnect(self, client, userdata, flags, reason_code, properties):
logger.warning(f"Disconnected from MQTT broker: {reason_code}")
def _publish_discovery(self):
"""Publish MQTT Discovery messages for Home Assistant."""
for check in self.config["checks"]:
device_id = check["id"]
device_name = check["name"]
# Binary sensor for online/offline status
status_config = {
"name": f"{device_name} Status",
"unique_id": f"lan_checker_{device_id}_status",
"state_topic": f"lan_checker/{device_id}/state",
"value_template": "{{ value_json.state }}",
"payload_on": "online",
"payload_off": "offline",
"device_class": "connectivity",
"device": {
"identifiers": [f"lan_checker_{device_id}"],
"name": device_name,
"manufacturer": "LAN Checker",
},
}
self.mqtt_client.publish(
f"homeassistant/binary_sensor/lan_checker_{device_id}/config",
json.dumps(status_config),
retain=True
)
# Sensor for response time
latency_config = {
"name": f"{device_name} Latency",
"unique_id": f"lan_checker_{device_id}_latency",
"state_topic": f"lan_checker/{device_id}/state",
"value_template": "{{ value_json.response_time | default('unavailable') }}",
"unit_of_measurement": "ms",
"device_class": "duration",
"state_class": "measurement",
"device": {
"identifiers": [f"lan_checker_{device_id}"],
"name": device_name,
"manufacturer": "LAN Checker",
},
}
self.mqtt_client.publish(
f"homeassistant/sensor/lan_checker_{device_id}_latency/config",
json.dumps(latency_config),
retain=True
)
logger.info(f"Published discovery for: {device_name}")
def _setup_checks(self):
"""Initialize check instances from configuration."""
for check_config in self.config["checks"]:
check_type = check_config["type"]
if check_type not in CHECKERS:
logger.error(f"Unknown check type: {check_type}")
continue
checker_class = CHECKERS[check_type]
checker = checker_class(check_config["name"], check_config)
self.checks.append({
"id": check_config["id"],
"name": check_config["name"],
"checker": checker,
"interval": check_config.get("interval", self.config.get("default_interval", 60)),
"last_check": 0,
})
def _run_check(self, check: dict):
"""Execute a single check and publish results."""
result = check["checker"].check()
state = "online" if result.success else "offline"
payload = {
"state": state,
"message": result.message,
"response_time": round(result.response_time, 2) if result.response_time else None,
"last_check": time.strftime("%Y-%m-%dT%H:%M:%S"),
}
if result.details:
payload["details"] = result.details
topic = f"lan_checker/{check['id']}/state"
self.mqtt_client.publish(topic, json.dumps(payload), retain=True)
log_level = logging.INFO if result.success else logging.WARNING
logger.log(log_level, f"{check['name']}: {state} - {result.message}")
def run(self):
"""Main loop."""
self._setup_mqtt()
self._setup_checks()
self.running = True
logger.info(f"Starting LAN Checker with {len(self.checks)} checks")
while self.running:
current_time = time.time()
for check in self.checks:
if current_time - check["last_check"] >= check["interval"]:
self._run_check(check)
check["last_check"] = current_time
time.sleep(1)
def stop(self):
"""Stop the checker gracefully."""
logger.info("Stopping LAN Checker...")
self.running = False
if self.mqtt_client:
self.mqtt_client.loop_stop()
self.mqtt_client.disconnect()
def main():
parser = argparse.ArgumentParser(description="LAN Checker - Network health monitoring")
parser.add_argument(
"-c", "--config",
default="config.yaml",
help="Path to configuration file (default: config.yaml)"
)
args = parser.parse_args()
checker = LanChecker(args.config)
def signal_handler(sig, frame):
checker.stop()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
try:
checker.run()
except Exception as e:
logger.error(f"Fatal error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()

4
requirements.txt Normal file
View File

@@ -0,0 +1,4 @@
paho-mqtt>=2.0.0
PyYAML>=6.0
pysnmp>=4.4.12
requests>=2.31.0