fix: switching to histong official api wrapper for fixing 403 result

This commit is contained in:
2026-04-19 23:34:03 +02:00
parent 1fce266bd4
commit 42a8f0fe98
2 changed files with 174 additions and 139 deletions

View File

@@ -1,13 +1,15 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
Hostinger DNS Sync - Watches Docker/Traefik events and auto-manages DNS records Hostinger DNS Sync — watches Docker/Traefik events and auto-manages DNS records
using the official hostinger_api Python SDK.
""" """
import os import os
import time
import logging
import re import re
import logging
import requests import requests
import hostinger_api
from hostinger_api.rest import ApiException
import docker import docker
from typing import Optional from typing import Optional
@@ -18,32 +20,38 @@ logging.basicConfig(
) )
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
# ── Config from env ──────────────────────────────────────────────────────────── # ── Config ─────────────────────────────────────────────────────────────────────
HOSTINGER_API_KEY = os.environ["HOSTINGER_API_KEY"] HOSTINGER_API_KEY = os.environ["HOSTINGER_API_KEY"]
DOMAIN = os.environ["DOMAIN"] # e.g. example.com DOMAIN = os.environ["DOMAIN"] # e.g. example.com
PUBLIC_IP = os.environ.get("PUBLIC_IP", "") # override; auto-detected if empty PUBLIC_IP = os.environ.get("PUBLIC_IP", "") # auto-detected if empty
RECORD_TYPE = os.environ.get("RECORD_TYPE", "A") RECORD_TYPE = os.environ.get("RECORD_TYPE", "A")
TTL = int(os.environ.get("TTL", "3600")) TTL = int(os.environ.get("TTL", "3600"))
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "30")) # seconds, for label polling DELETE_ORPHANS = os.environ.get("DELETE_ORPHANS", "false").lower() == "true"
TRAEFIK_LABEL = os.environ.get("TRAEFIK_LABEL", "traefik.http.routers.*.rule")
DRY_RUN = os.environ.get("DRY_RUN", "false").lower() == "true" DRY_RUN = os.environ.get("DRY_RUN", "false").lower() == "true"
HOSTINGER_BASE = "https://api.hostinger.com/v1" # ── SDK client setup ───────────────────────────────────────────────────────────
HEADERS = { configuration = hostinger_api.Configuration(
"Authorization": f"Bearer {HOSTINGER_API_KEY}", access_token=HOSTINGER_API_KEY
"Content-Type": "application/json", )
}
# ── Helpers ────────────────────────────────────────────────────────────────────
def dns_client() -> hostinger_api.DNSZoneApi:
client = hostinger_api.ApiClient(configuration)
return hostinger_api.DNSZoneApi(client)
# ── Public IP detection ────────────────────────────────────────────────────────
def get_public_ip() -> str: def get_public_ip() -> str:
"""Auto-detect public IP if not set."""
if PUBLIC_IP: if PUBLIC_IP:
return PUBLIC_IP return PUBLIC_IP
for url in ["https://api.ipify.org", "https://checkip.amazonaws.com", "https://ifconfig.me"]: for url in [
"https://api.ipify.org",
"https://checkip.amazonaws.com",
"https://ifconfig.me",
]:
try: try:
r = requests.get(url, timeout=5) ip = requests.get(url, timeout=5).text.strip()
ip = r.text.strip()
log.info(f"Auto-detected public IP: {ip}") log.info(f"Auto-detected public IP: {ip}")
return ip return ip
except Exception: except Exception:
@@ -51,16 +59,15 @@ def get_public_ip() -> str:
raise RuntimeError("Could not auto-detect public IP. Set PUBLIC_IP env var.") raise RuntimeError("Could not auto-detect public IP. Set PUBLIC_IP env var.")
# ── Traefik label parsing ──────────────────────────────────────────────────────
def extract_hosts_from_rule(rule: str) -> list[str]: def extract_hosts_from_rule(rule: str) -> list[str]:
""" """Parse Host(`foo.example.com`) from a Traefik router rule."""
Parse Traefik router rule and extract hostnames. return re.findall(r"Host\(`([^`]+)`\)", rule)
Handles: Host(`foo.example.com`) and Host(`a.x.com`, `b.x.com`)
"""
return re.findall(r'Host\(`([^`]+)`\)', rule)
def get_subdomain(host: str) -> Optional[str]: def get_subdomain(host: str) -> Optional[str]:
"""Return subdomain part if host belongs to DOMAIN, else None.""" """Return the subdomain name for this domain, or None if unrelated."""
if host == DOMAIN: if host == DOMAIN:
return "@" return "@"
if host.endswith(f".{DOMAIN}"): if host.endswith(f".{DOMAIN}"):
@@ -68,147 +75,174 @@ def get_subdomain(host: str) -> Optional[str]:
return None return None
# ── Hostinger API ────────────────────────────────────────────────────────────── def get_desired_subdomains(docker_client: docker.DockerClient) -> dict[str, str]:
"""Scan running containers → return {subdomain_name: full_hostname}."""
def list_dns_records() -> list[dict]: result = {}
url = f"{HOSTINGER_BASE}/dns/zone/{DOMAIN}/records" for container in docker_client.containers.list():
r = requests.get(url, headers=HEADERS, timeout=10)
r.raise_for_status()
return r.json().get("data", [])
def find_record(records: list[dict], name: str) -> Optional[dict]:
for rec in records:
if rec.get("name") == name and rec.get("type") == RECORD_TYPE:
return rec
return None
def create_record(name: str, ip: str):
if DRY_RUN:
log.info(f"[DRY RUN] Would CREATE {RECORD_TYPE} {name}.{DOMAIN} -> {ip}")
return
url = f"{HOSTINGER_BASE}/dns/zone/{DOMAIN}/records"
payload = {"type": RECORD_TYPE, "name": name, "content": ip, "ttl": TTL}
r = requests.post(url, headers=HEADERS, json=payload, timeout=10)
r.raise_for_status()
log.info(f"✅ CREATED {RECORD_TYPE} {name}.{DOMAIN} -> {ip}")
def update_record(record_id: int, name: str, ip: str):
if DRY_RUN:
log.info(f"[DRY RUN] Would UPDATE {RECORD_TYPE} {name}.{DOMAIN} -> {ip} (id={record_id})")
return
url = f"{HOSTINGER_BASE}/dns/zone/{DOMAIN}/records/{record_id}"
payload = {"type": RECORD_TYPE, "name": name, "content": ip, "ttl": TTL}
r = requests.put(url, headers=HEADERS, json=payload, timeout=10)
r.raise_for_status()
log.info(f"✅ UPDATED {RECORD_TYPE} {name}.{DOMAIN} -> {ip}")
def delete_record(record_id: int, name: str):
if DRY_RUN:
log.info(f"[DRY RUN] Would DELETE {RECORD_TYPE} {name}.{DOMAIN} (id={record_id})")
return
url = f"{HOSTINGER_BASE}/dns/zone/{DOMAIN}/records/{record_id}"
r = requests.delete(url, headers=HEADERS, timeout=10)
r.raise_for_status()
log.info(f"🗑️ DELETED {RECORD_TYPE} {name}.{DOMAIN}")
def upsert_record(name: str, ip: str, existing_records: list[dict]):
rec = find_record(existing_records, name)
if rec:
if rec.get("content") == ip:
log.debug(f"⏭️ {name}.{DOMAIN} already points to {ip}, skipping")
else:
update_record(rec["id"], name, ip)
else:
create_record(name, ip)
# ── Docker watcher ─────────────────────────────────────────────────────────────
def get_traefik_hosts_from_containers(client: docker.DockerClient) -> set[str]:
"""Scan all running containers for Traefik host rules."""
hosts = set()
for container in client.containers.list():
labels = container.labels or {} labels = container.labels or {}
for key, value in labels.items(): for key, value in labels.items():
if re.match(r"traefik\.http\.routers\.[^.]+\.rule", key): if re.match(r"traefik\.http\.routers\.[^.]+\.rule", key):
for host in extract_hosts_from_rule(value): for host in extract_hosts_from_rule(value):
hosts.add(host)
return hosts
def sync(client: docker.DockerClient, ip: str):
"""Main sync: compare desired hosts with current DNS, upsert/remove as needed."""
desired_hosts = get_traefik_hosts_from_containers(client)
desired_subdomains = {}
for host in desired_hosts:
sub = get_subdomain(host) sub = get_subdomain(host)
if sub: if sub:
desired_subdomains[sub] = host result[sub] = host
else: else:
log.debug(f"Skipping {host} — not under {DOMAIN}") log.debug(f"Skipping {host} — not under {DOMAIN}")
return result
if not desired_subdomains:
log.info("No matching Traefik hosts found for this domain.") # ── DNS sync logic ─────────────────────────────────────────────────────────────
def fetch_current_records() -> list[dict]:
"""Return all DNS records for the domain as plain dicts."""
api = dns_client()
try:
records = api.get_dns_records_v1(DOMAIN)
# SDK returns a list of DNSV1ZoneRecordResource objects
return [r.to_dict() for r in records]
except ApiException as e:
log.error(f"Failed to fetch DNS records: {e}")
raise
def build_updated_zone(
current_records: list[dict],
desired: dict[str, str],
ip: str,
) -> list[dict]:
"""
Merge current zone with desired state.
The Hostinger API uses a full-zone PUT — we must include ALL records we want
to keep, not just the ones we're changing.
"""
# Index existing target-type records by name
existing_by_name = {
r["name"]: r
for r in current_records
if r.get("type") == RECORD_TYPE
}
# Records of other types we must always preserve
other_records = [r for r in current_records if r.get("type") != RECORD_TYPE]
updated_a_records = []
# Upsert desired records
for name in desired:
if name in existing_by_name:
if existing_by_name[name].get("content") == ip:
log.debug(f"{name}.{DOMAIN}{ip} (no change)")
else:
log.info(f"✏️ UPDATE {RECORD_TYPE} {name}.{DOMAIN}{ip}")
else:
log.info(f" CREATE {RECORD_TYPE} {name}.{DOMAIN}{ip}")
updated_a_records.append({"type": RECORD_TYPE, "name": name, "content": ip, "ttl": TTL})
# Handle records not in desired set
for name, rec in existing_by_name.items():
if name not in desired:
if DELETE_ORPHANS:
log.info(f"🗑️ DELETE orphan {RECORD_TYPE} {name}.{DOMAIN}")
# Omitting it from the list removes it on PUT
else:
log.debug(f"⏭ Keeping unmanaged record {name}.{DOMAIN}")
updated_a_records.append(rec)
return other_records + updated_a_records
def apply_zone(full_zone: list[dict]):
"""Send the full updated zone to Hostinger via PUT (full-zone replacement)."""
api = dns_client()
# Group records by (type, name) as the SDK expects
# DNSV1ZoneUpdateRequest.zone is a list of "name groups"
from collections import defaultdict
grouped: dict[tuple, list] = defaultdict(list)
for rec in full_zone:
key = (rec["type"], rec["name"])
grouped[key].append(rec)
zone_entries = []
for (rtype, rname), recs in grouped.items():
sdk_records = [
hostinger_api.DNSV1ZoneUpdateRequestZoneInnerRecordsInner(
content=r["content"],
ttl=r.get("ttl", TTL),
)
for r in recs
]
zone_entries.append(
hostinger_api.DNSV1ZoneUpdateRequestZoneInner(
type=rtype,
name=rname,
records=sdk_records,
)
)
update_request = hostinger_api.DNSV1ZoneUpdateRequest(zone=zone_entries)
if DRY_RUN:
log.info(f"[DRY RUN] Would PUT {len(zone_entries)} record group(s) to {DOMAIN}")
return return
log.info(f"Desired subdomains: {list(desired_subdomains.keys())}") try:
api.update_dns_records_v1(DOMAIN, update_request)
existing_records = list_dns_records() log.info(f"✅ Zone updated — {len(zone_entries)} record group(s)")
except ApiException as e:
# Upsert desired log.error(f"Failed to update DNS zone: {e}")
for name in desired_subdomains: raise
upsert_record(name, ip, existing_records)
# Optionally delete records no longer in use (opt-in via env)
if os.environ.get("DELETE_ORPHANS", "false").lower() == "true":
managed = set(desired_subdomains.keys())
for rec in existing_records:
if rec.get("type") == RECORD_TYPE and rec.get("name") not in managed:
log.info(f"Orphan record found: {rec['name']}.{DOMAIN}")
delete_record(rec["id"], rec["name"])
# ── Event-driven loop ────────────────────────────────────────────────────────── def sync(docker_client: docker.DockerClient, ip: str):
desired = get_desired_subdomains(docker_client)
def watch_events(client: docker.DockerClient, ip: str): if not desired:
"""React to Docker start/stop/die events immediately.""" log.info("No Traefik hosts found for this domain — nothing to sync.")
return
log.info(f"Desired subdomains: {list(desired.keys())}")
current_records = fetch_current_records()
new_zone = build_updated_zone(current_records, desired, ip)
apply_zone(new_zone)
# ── Docker event loop ──────────────────────────────────────────────────────────
def watch_events(docker_client: docker.DockerClient, ip: str):
log.info("👂 Listening for Docker events...") log.info("👂 Listening for Docker events...")
for event in client.events(decode=True, filters={"type": "container"}): for event in docker_client.events(decode=True, filters={"type": "container"}):
status = event.get("status", "") status = event.get("status", "")
if status in ("start", "die", "stop", "destroy"): if status in ("start", "die", "stop", "destroy"):
container_name = event.get("Actor", {}).get("Attributes", {}).get("name", "?") name = event.get("Actor", {}).get("Attributes", {}).get("name", "?")
log.info(f"⚡ Event: {status} on {container_name} — triggering DNS sync") log.info(f"⚡ Event [{status}] on container '{name}' — syncing DNS...")
try: try:
sync(client, ip) sync(docker_client, ip)
except Exception as e: except Exception as e:
log.error(f"Sync failed: {e}") log.error(f"Sync error: {e}")
# ── Entrypoint ──────────────────────────────────────────────────────────────── # ── Entry point ────────────────────────────────────────────────────────────────
def main(): def main():
log.info(f"🚀 Hostinger DNS Sync starting — domain: {DOMAIN}, TTL: {TTL}s") log.info(
if DRY_RUN: f"🚀 Hostinger DNS Sync starting — "
log.warning("⚠️ DRY RUN mode — no changes will be made") f"domain={DOMAIN} ttl={TTL}s "
f"delete_orphans={DELETE_ORPHANS} dry_run={DRY_RUN}"
)
ip = get_public_ip() ip = get_public_ip()
client = docker.from_env() docker_client = docker.from_env()
# Initial sync on startup log.info("🔄 Initial sync on startup...")
log.info("🔄 Initial sync...")
try: try:
sync(client, ip) sync(docker_client, ip)
except Exception as e: except Exception as e:
log.error(f"Initial sync failed: {e}") log.error(f"Initial sync failed: {e}")
# Watch Docker events (blocking)
try: try:
watch_events(client, ip) watch_events(docker_client, ip)
except KeyboardInterrupt: except KeyboardInterrupt:
log.info("Shutting down.") log.info("Shutting down.")

View File

@@ -1,2 +1,3 @@
hostinger_api
docker==7.1.0 docker==7.1.0
requests==2.32.3 requests==2.32.3