Checks DNS server availability via ping then performs configurable DNS query (record_type: A, AAAA, TXT, MX, etc.). Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
69 lines
2.2 KiB
Python
69 lines
2.2 KiB
Python
import time
|
|
|
|
import dns.resolver
|
|
|
|
from .base import BaseChecker, CheckResult
|
|
from .ping import PingChecker
|
|
|
|
|
|
class DnsChecker(BaseChecker):
|
|
def check(self) -> CheckResult:
|
|
host = self.config["host"]
|
|
query_name = self.config["query"]
|
|
query_type = self.config.get("record_type", "A")
|
|
timeout = self.config.get("timeout", 5)
|
|
|
|
# First check if host is reachable via ping
|
|
ping_checker = PingChecker(self.name, {"host": host, "timeout": timeout})
|
|
ping_result = ping_checker.check()
|
|
|
|
if not ping_result.success:
|
|
return CheckResult(
|
|
success=False,
|
|
message=f"DNS server unreachable: {ping_result.message}",
|
|
response_time=None
|
|
)
|
|
|
|
# Now perform DNS query
|
|
resolver = dns.resolver.Resolver()
|
|
resolver.nameservers = [host]
|
|
resolver.timeout = timeout
|
|
resolver.lifetime = timeout
|
|
|
|
start = time.time()
|
|
try:
|
|
answers = resolver.resolve(query_name, query_type)
|
|
response_time = (time.time() - start) * 1000 # ms
|
|
|
|
records = [str(rdata) for rdata in answers]
|
|
return CheckResult(
|
|
success=True,
|
|
message=f"DNS {query_type} query OK ({len(records)} record(s))",
|
|
response_time=response_time,
|
|
details={"records": records, "query": query_name, "type": query_type}
|
|
)
|
|
except dns.resolver.NXDOMAIN:
|
|
return CheckResult(
|
|
success=False,
|
|
message=f"DNS query failed: {query_name} does not exist",
|
|
response_time=None
|
|
)
|
|
except dns.resolver.NoAnswer:
|
|
return CheckResult(
|
|
success=False,
|
|
message=f"DNS query failed: no {query_type} record for {query_name}",
|
|
response_time=None
|
|
)
|
|
except dns.resolver.Timeout:
|
|
return CheckResult(
|
|
success=False,
|
|
message="DNS query timeout",
|
|
response_time=None
|
|
)
|
|
except Exception as e:
|
|
return CheckResult(
|
|
success=False,
|
|
message=f"DNS error: {e}",
|
|
response_time=None
|
|
)
|