Network health monitoring script with MQTT reporting for Home Assistant. - Ping, HTTP, and SNMP checkers - MQTT Discovery for automatic entity creation - Configurable check intervals Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
54 lines
1.6 KiB
Python
54 lines
1.6 KiB
Python
import subprocess
|
|
import time
|
|
import platform
|
|
|
|
from .base import BaseChecker, CheckResult
|
|
|
|
|
|
class PingChecker(BaseChecker):
    """Reachability check that shells out to the system ``ping`` binary.

    Values read from ``self.config``:

    * ``host`` (required): hostname or address handed to ``ping``.
    * ``count`` (default ``1``): number of echo requests to send.
    * ``timeout`` (default ``5``): seconds to wait for each reply.
    """

    @staticmethod
    def _ping_command(host, count, timeout):
        """Build the ``ping`` argument list for the current operating system.

        The flag that limits the per-reply wait differs between platforms,
        both in spelling and in unit, so ``timeout`` (seconds) is converted
        as needed.
        """
        system = platform.system().lower()
        if system == "windows":
            # Windows: -n is the echo count, -w the wait in whole milliseconds.
            # round() keeps the value an integer even for a float timeout.
            wait_ms = str(round(timeout * 1000))
            return ["ping", "-n", str(count), "-w", wait_ms, host]
        if system in ("darwin", "freebsd"):
            # BSD-derived ping (macOS, FreeBSD) reads -W in milliseconds,
            # unlike Linux iputils where the same flag is in seconds.
            wait_ms = str(round(timeout * 1000))
            return ["ping", "-c", str(count), "-W", wait_ms, host]
        # Linux and anything else: -W is the wait in seconds.
        return ["ping", "-c", str(count), "-W", str(timeout), host]

    def check(self) -> CheckResult:
        """Run ``ping`` against the configured host and report the outcome.

        Returns:
            A ``CheckResult`` with ``success=True`` when ``ping`` exits with
            status 0. In that case ``response_time`` is the wall-clock
            duration of the whole ``ping`` process in milliseconds (process
            start-up included), not the network round-trip time. Every
            failure path returns ``success=False`` with ``response_time=None``
            and a message naming the cause.

        Raises:
            Whatever ``self.config["host"]`` raises when the key is missing
            (``KeyError`` if ``config`` is a plain dict). All errors raised
            while running the command are converted into a failed result.
        """
        host = self.config["host"]
        count = self.config.get("count", 1)
        timeout = self.config.get("timeout", 5)

        cmd = self._ping_command(host, count, timeout)

        try:
            # Hard limit for the whole process. Keeps the 5 s of slack used
            # for a single echo and adds one reply wait plus roughly one
            # second of pacing for every additional echo request, so a
            # multi-echo run is not killed before ping can finish.
            extra_echoes = max(int(count) - 1, 0)
            deadline = timeout + 5 + extra_echoes * (timeout + 1)

            # perf_counter is monotonic, so system clock adjustments cannot
            # distort (or make negative) the measured duration.
            started = time.perf_counter()
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=deadline,
            )
            response_time = (time.perf_counter() - started) * 1000  # ms
        except subprocess.TimeoutExpired:
            return CheckResult(
                success=False,
                message="Ping timeout",
                response_time=None,
            )
        except Exception as e:
            # Deliberate catch-all: a checker reports failures (for example a
            # missing ping binary) as a result instead of raising.
            return CheckResult(
                success=False,
                message=f"Ping error: {e}",
                response_time=None,
            )

        if result.returncode == 0:
            return CheckResult(
                success=True,
                message="Host is reachable",
                response_time=response_time,
            )
        return CheckResult(
            success=False,
            message="Host is unreachable",
            response_time=None,
        )
|