#!/usr/bin/env python3
"""
Hostinger DNS Sync — watches Docker/Traefik events and auto-manages DNS records
using the official hostinger_api Python SDK.

The Hostinger API returns records in this shape:
    [{ "name": "sub", "type": "A", "ttl": 3600, "records": [{"content": "1.2.3.4"}] }]

For the PUT update, each zone entry has the same shape; the request as a whole
carries an "overwrite" flag.
"""

import ipaddress
import json
import logging
import os
import re
from collections import defaultdict
from pathlib import Path
from typing import Optional

import docker
import hostinger_api
import requests
from hostinger_api.rest import ApiException

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger(__name__)

# ── Config ─────────────────────────────────────────────────────────────────────
HOSTINGER_API_KEY = os.environ["HOSTINGER_API_KEY"]
DOMAIN = os.environ["DOMAIN"]
PUBLIC_IP = os.environ.get("PUBLIC_IP", "")
RECORD_TYPE = os.environ.get("RECORD_TYPE", "A")
TTL = int(os.environ.get("TTL", "3600"))
DELETE_ORPHANS = os.environ.get("DELETE_ORPHANS", "false").lower() == "true"
DRY_RUN = os.environ.get("DRY_RUN", "false").lower() == "true"

# Only subdomains that this tool has ever CREATED are eligible for deletion.
STATE_FILE = Path(os.environ.get("STATE_FILE", "/data/managed_records.json"))

# Services queried (in order) when PUBLIC_IP is not set.
_IP_ECHO_SERVICES = (
    "https://api.ipify.org",
    "https://checkip.amazonaws.com",
    "https://ifconfig.me",
)

# Exactly "traefik.http.routers.<router>.rule" — used with fullmatch so that
# sibling labels sharing the prefix (e.g. "...rulesyntax") are not scanned.
_ROUTER_RULE_LABEL_RE = re.compile(r"traefik\.http\.routers\.[^.]+\.rule")
# The argument list of one Host(...) matcher call.
_HOST_CALL_RE = re.compile(r"\bHost\(([^)]*)\)")
# One backtick- or double-quoted argument inside a Host(...) call.
_HOST_ARG_RE = re.compile(r"([`\"])([^`\"]+)\1")

# ── SDK client ─────────────────────────────────────────────────────────────────
configuration = hostinger_api.Configuration(access_token=HOSTINGER_API_KEY)


def dns_client() -> hostinger_api.DNSZoneApi:
    """Return a new DNS zone API handle built from the module configuration."""
    return hostinger_api.DNSZoneApi(hostinger_api.ApiClient(configuration))


# ── State persistence ──────────────────────────────────────────────────────────
def load_managed() -> set[str]:
    """
    Load the set of record names recorded in STATE_FILE.

    A missing file is the normal first-run case and yields an empty set
    silently. An unreadable or malformed file also yields an empty set (the
    safe direction: nothing becomes eligible for deletion) but is logged so the
    problem is visible.
    """
    try:
        names = json.loads(STATE_FILE.read_text())
    except FileNotFoundError:
        return set()
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        log.warning(f"Could not read managed state from {STATE_FILE}: {exc} — treating as empty")
        return set()
    if not isinstance(names, list):
        log.warning(f"Unexpected content in {STATE_FILE} (expected a JSON list) — treating as empty")
        return set()
    return {n for n in names if isinstance(n, str)}


def save_managed(names: set[str]) -> None:
    """Write *names* to STATE_FILE as a sorted JSON list, creating parent dirs."""
    STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
    STATE_FILE.write_text(json.dumps(sorted(names)))


# ── Public IP ──────────────────────────────────────────────────────────────────
def get_public_ip() -> str:
    """
    Return the IP address that managed records should point to.

    Uses PUBLIC_IP when set; otherwise asks each echo service in turn and
    returns the first answer that is an HTTP success and parses as an IP
    address.

    Raises:
        RuntimeError: if no service returned a usable address.
    """
    if PUBLIC_IP:
        return PUBLIC_IP
    for url in _IP_ECHO_SERVICES:
        try:
            response = requests.get(url, timeout=5)
            response.raise_for_status()
            candidate = response.text.strip()
            # Reject error pages / captive-portal HTML masquerading as an answer.
            ipaddress.ip_address(candidate)
        except (requests.RequestException, ValueError) as exc:
            log.debug(f"Public IP lookup via {url} failed: {exc}")
            continue
        log.info(f"Auto-detected public IP: {candidate}")
        return candidate
    raise RuntimeError("Could not auto-detect public IP. Set PUBLIC_IP env var.")


# ── Docker / Traefik scanning ──────────────────────────────────────────────────
def extract_hosts_from_rule(rule: str) -> list[str]:
    """
    Return every hostname named by a Host(...) matcher in a Traefik rule.

    Handles single-host calls (Host(`a.example.com`)), multi-host calls
    (Host(`a.example.com`, `b.example.com`)) and double-quoted arguments.
    Hosts are returned in order of appearance; duplicates are kept.
    """
    hosts: list[str] = []
    for call in _HOST_CALL_RE.finditer(rule):
        hosts.extend(arg.group(2) for arg in _HOST_ARG_RE.finditer(call.group(1)))
    return hosts


def get_subdomain(host: str) -> Optional[str]:
    """
    Map a full hostname to its record name within DOMAIN.

    Returns "@" for the apex, the left-hand label(s) for a subdomain, and
    None when *host* is not under DOMAIN.
    """
    if host == DOMAIN:
        return "@"
    if host.endswith(f".{DOMAIN}"):
        return host[: -(len(DOMAIN) + 1)]
    return None


def get_desired_subdomains(docker_client: docker.DockerClient) -> dict[str, str]:
    """
    Scan ALL containers (running + stopped) for Traefik router rules.
    Returns {subdomain_name: full_hostname}.
    Scanning stopped containers avoids deleting records for briefly-restarting services.
    """
    result: dict[str, str] = {}
    for container in docker_client.containers.list(all=True):
        labels = container.labels or {}
        for key, value in labels.items():
            if not _ROUTER_RULE_LABEL_RE.fullmatch(key):
                continue
            for host in extract_hosts_from_rule(value):
                sub = get_subdomain(host)
                if sub:
                    result[sub] = host
                else:
                    log.debug(f"Skipping {host} — not under {DOMAIN}")
    return result


# ── Hostinger DNS ──────────────────────────────────────────────────────────────
def fetch_current_records() -> list[dict]:
    """
    Returns records as plain dicts. The SDK structure per record is:
    { "name": "sub", "type": "A", "ttl": 3600, "records": [{"content": "1.2.3.4"}] }

    Raises:
        ApiException: re-raised after logging when the API call fails.
    """
    try:
        raw = dns_client().get_dns_records_v1(DOMAIN)
        result = []
        for r in raw:
            d = r.to_dict()
            # Log full structure on first record to help diagnose schema issues
            if not result:
                log.debug(f"SDK record sample: {d}")
            result.append(d)
        return result
    except ApiException as e:
        log.error(f"Failed to fetch DNS records: {e}")
        raise


def get_record_ip(rec: dict) -> Optional[str]:
    """
    Safely extract the IP from a record dict regardless of SDK nesting.
    Handles both flat {"content": "..."} and nested {"records": [{"content": "..."}]}.
    Only the first nested value is inspected.
    """
    # Nested form: {"records": [{"content": "1.2.3.4"}]}
    if "records" in rec and isinstance(rec["records"], list) and rec["records"]:
        first = rec["records"][0]
        if isinstance(first, dict):
            return first.get("content")
        # SDK may return objects instead of dicts
        if hasattr(first, "content"):
            return first.content
    # Flat form: {"content": "1.2.3.4"}
    return rec.get("content")


def build_zone_entry(name: str, ip: str) -> hostinger_api.DNSV1ZoneUpdateRequestZoneInner:
    """
    Build the update entry for one managed record: RECORD_TYPE *name* → *ip*.

    TTL is set on the zone entry (the record group), matching the record shape
    described in the module docstring, rather than on the per-value object.
    """
    return hostinger_api.DNSV1ZoneUpdateRequestZoneInner(
        type=RECORD_TYPE,
        name=name,
        ttl=TTL,
        records=[
            hostinger_api.DNSV1ZoneUpdateRequestZoneInnerRecordsInner(content=ip),
        ],
    )


def rebuild_zone_entry_from_existing(rec: dict) -> hostinger_api.DNSV1ZoneUpdateRequestZoneInner:
    """
    Re-encode an existing record (any type) back into an SDK update object.

    The TTL is taken from the record group, then from the first nested value
    that carries one, and finally falls back to the configured TTL.
    """
    sub_records = []
    nested_ttls = []
    raw_recs = rec.get("records", [])
    if raw_recs:
        for r in raw_recs:
            if isinstance(r, dict):
                content = r.get("content", "")
                nested_ttls.append(r.get("ttl"))
            elif hasattr(r, "content"):
                # SDK may hand back model objects instead of dicts
                content = r.content
                nested_ttls.append(getattr(r, "ttl", None))
            else:
                continue
            sub_records.append(
                hostinger_api.DNSV1ZoneUpdateRequestZoneInnerRecordsInner(content=content)
            )
    else:
        # Flat record (shouldn't happen with real SDK but be safe)
        content = rec.get("content", "")
        if content:
            sub_records.append(
                hostinger_api.DNSV1ZoneUpdateRequestZoneInnerRecordsInner(content=content)
            )

    ttl = rec.get("ttl")
    if ttl is None:
        ttl = next((t for t in nested_ttls if t is not None), TTL)

    return hostinger_api.DNSV1ZoneUpdateRequestZoneInner(
        type=rec["type"],
        name=rec["name"],
        ttl=ttl,
        records=sub_records,
    )


def sync(docker_client: docker.DockerClient, ip: str) -> None:
    """
    Reconcile the zone's RECORD_TYPE records with the hosts Traefik routes.

    Creates or updates one record per desired subdomain, optionally drops
    orphans this tool created (DELETE_ORPHANS), re-submits every other record
    unchanged, and persists the managed-name set after a successful update.
    API failures while applying the update are logged and swallowed; failures
    while fetching propagate to the caller.
    """
    desired = get_desired_subdomains(docker_client)
    if not desired:
        # Deliberately a no-op (no orphan cleanup either) when nothing is found.
        log.info("No Traefik hosts found for this domain — nothing to sync.")
        return

    log.info(f"Desired subdomains: {sorted(desired.keys())}")

    managed = load_managed()
    current_records = fetch_current_records()

    # Index existing records of our type by name
    existing_ours = {
        r["name"]: r for r in current_records if r.get("type") == RECORD_TYPE
    }
    # All other record types — always preserved verbatim
    other_records = [r for r in current_records if r.get("type") != RECORD_TYPE]

    zone_entries = []
    new_managed = set(managed)

    # ── Upsert desired records ──────────────────────────────────────────────
    for name in desired:
        if name in existing_ours:
            current_ip = get_record_ip(existing_ours[name])
            if current_ip == ip:
                log.debug(f"⏭ {name}.{DOMAIN} → {ip} (unchanged)")
            else:
                log.info(f"✏️ UPDATE {RECORD_TYPE} {name}.{DOMAIN}: {current_ip} → {ip}")
        else:
            log.info(f"➕ CREATE {RECORD_TYPE} {name}.{DOMAIN} → {ip}")
            # Only records this tool creates become eligible for later
            # deletion; pre-existing (manually created) records are updated
            # but never adopted into the managed set.
            new_managed.add(name)
        zone_entries.append(build_zone_entry(name, ip))

    # ── Handle existing records NOT in desired ──────────────────────────────
    for name, rec in existing_ours.items():
        if name in desired:
            continue
        if DELETE_ORPHANS and name in managed:
            # Safe: this tool created it and it's no longer needed
            log.info(f"🗑️ DELETE orphan {RECORD_TYPE} {name}.{DOMAIN}")
            new_managed.discard(name)
            # Omitted from zone_entries → removed by PUT
            # NOTE(review): this assumes the PUT replaces the whole zone. If
            # the endpoint only overwrites the name/type pairs it is sent, the
            # orphan survives and an explicit delete call is needed — confirm
            # against the Hostinger API documentation.
        else:
            if DELETE_ORPHANS and name not in managed:
                log.warning(
                    f"⚠️ KEEPING {name}.{DOMAIN} — not in managed set "
                    f"(was created manually). "
                    f"To allow deletion, add it to {STATE_FILE}."
                )
            else:
                log.debug(f"⏭ Keeping {name}.{DOMAIN}")
            zone_entries.append(rebuild_zone_entry_from_existing(rec))

    # ── Always re-include all other record types ────────────────────────────
    for rec in other_records:
        zone_entries.append(rebuild_zone_entry_from_existing(rec))

    # ── Apply ───────────────────────────────────────────────────────────────
    update_request = hostinger_api.DNSV1ZoneUpdateRequest(zone=zone_entries)

    if DRY_RUN:
        # State is intentionally not persisted: nothing was really created.
        log.info(f"[DRY RUN] Would PUT {len(zone_entries)} record group(s) to {DOMAIN}")
        return

    try:
        dns_client().update_dns_records_v1(DOMAIN, update_request)
        log.info(f"✅ Zone updated — {len(zone_entries)} record group(s)")
    except ApiException as e:
        log.error(f"Failed to apply zone update: {e}")
        return

    if new_managed != managed:
        save_managed(new_managed)
        log.debug(f"Managed state saved: {sorted(new_managed)}")


# ── Docker event loop ──────────────────────────────────────────────────────────
def watch_events(docker_client: docker.DockerClient, ip: str) -> None:
    """
    Block on the Docker event stream and re-sync on container lifecycle events.

    A failing sync is logged and does not stop the loop. The function returns
    only if the event stream itself ends.
    """
    log.info("👂 Listening for Docker events...")
    for event in docker_client.events(decode=True, filters={"type": "container"}):
        status = event.get("status", "")
        if status in ("start", "die", "stop", "destroy"):
            name = event.get("Actor", {}).get("Attributes", {}).get("name", "?")
            log.info(f"⚡ Event [{status}] on '{name}' — syncing DNS...")
            try:
                sync(docker_client, ip)
            except Exception as e:
                # Top-level boundary: keep the watcher alive whatever went wrong.
                log.error(f"Sync error: {e}", exc_info=True)


# ── Entry point ────────────────────────────────────────────────────────────────
def main() -> None:
    """Resolve the public IP, run one initial sync, then watch Docker events."""
    log.info(
        f"🚀 Hostinger DNS Sync — domain={DOMAIN} ttl={TTL}s "
        f"delete_orphans={DELETE_ORPHANS} dry_run={DRY_RUN}"
    )
    if DELETE_ORPHANS:
        log.info(
            f"🛡 Orphan deletion ON — only records in {STATE_FILE} "
            f"(created by this tool) are ever deleted."
        )

    # Resolved once at startup; the same address is used for every later sync.
    ip = get_public_ip()
    docker_client = docker.from_env()

    log.info("🔄 Initial sync...")
    try:
        sync(docker_client, ip)
    except Exception as e:
        # A failed first sync must not prevent the event watcher from starting.
        log.error(f"Initial sync failed: {e}", exc_info=True)

    try:
        watch_events(docker_client, ip)
    except KeyboardInterrupt:
        log.info("Shutting down.")


if __name__ == "__main__":
    main()