From 859cffe9a0c8ecf615a643970737b927711c7e00 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Ga=C3=9Fner?= Date: Sun, 9 Jul 2023 18:52:12 +0200 Subject: [PATCH] Initial development commit --- .gitignore | 6 + README.md | 120 +++++++++++++++++++ dyndnshelper/__init__.py | 0 dyndnshelper/exceptions.py | 6 + dyndnshelper/logging.py | 31 +++++ dyndnshelper/main.py | 26 +++++ dyndnshelper/routers/__init__.py | 5 + dyndnshelper/routers/fqdns.py | 112 ++++++++++++++++++ dyndnshelper/settings.py | 192 ++++++++++++++++++++++++++++++ dyndnshelper/vendors/__init__.py | 21 ++++ dyndnshelper/vendors/_base.py | 195 +++++++++++++++++++++++++++++++ dyndnshelper/vendors/hetzner.py | 186 +++++++++++++++++++++++++++++ pyproject.toml | 32 +++++ 13 files changed, 932 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 dyndnshelper/__init__.py create mode 100644 dyndnshelper/exceptions.py create mode 100644 dyndnshelper/logging.py create mode 100644 dyndnshelper/main.py create mode 100644 dyndnshelper/routers/__init__.py create mode 100644 dyndnshelper/routers/fqdns.py create mode 100644 dyndnshelper/settings.py create mode 100644 dyndnshelper/vendors/__init__.py create mode 100644 dyndnshelper/vendors/_base.py create mode 100644 dyndnshelper/vendors/hetzner.py create mode 100644 pyproject.toml diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..540b613 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +build +dist +venv +*.egg-info +*.pyc +__pycache__ \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..6365cc3 --- /dev/null +++ b/README.md @@ -0,0 +1,120 @@ +DynDNS Helper +============= + +Self-hosted python web application based on [FastAPI](https://github.com/tiangolo/fastapi) to update DNS records via API. + +Vendor backends are easily extensible and include currently: +- [Hetzner DNS API](https://dns.hetzner.com/api-docs/) + +Current use-cases for this application: +- Updating DNS records based on data received via DynDNS compatible requests from e.g. routers such as AVM Fritzbox + - Support for IPv4, IPv6 and IPv6 LAN Prefix + - IPv4 will result in updated A record + - IPv6 will result in updated AAAA record + - IPv6 LAN Prefix will update all AAAA records defined in an config mapping of FQDN to IPv6 Interface IDs + +Limitations: +- No support for DNS round robin or the like +- No updating a single FQDN across multiple vendors +- No cleanup of DNS records +- No updating data other than values of DNS records + +# TODO +- docs +- tests + +# Router DynDNS Examples +## AVM Fritzbox +Fritzbox DynDNS Settings: +``` +Update-URL: https://192.168.178.10/fqdns//dyndns?ip=&ip=&ipv6lanprefix=&dualstack= +Domainname: test1.domain.tld +Username: testuser +Password: testpassword +``` + +App config: +```json +{ + "fqdns": { + "test1.domain.tld": { + "vendor": "...", + "dyndns_credentials": { + "testuser": "9f735e0df9a1ddc702bf0a1a7b83033f9f7153a00c29de82cedadc9957289b05" + } + } + } +} +``` + +# IPv6 LAN Prefix & Interface ID (IID) +Providers will assign an publicly routed IPv6 Prefix for a client. Normally a `/48` or `/64`. + +Depending on the clients router configuration, devices behind it will use this prefix to assign themselves a public IPv6. + +These devices append an Interface ID (or: host part) to the prefix. How this IID is generated depends on the device configuration. + +It may be generated by using its MAC Address, it may be generated randomly for privacy reasons. + + + +This helper supports updating DNS records for IIDs when the IPv6 LAN Prefix changes: + +If a DynDNS API call contains an `ipv6lanprefix`, the FQDN settings of the given `fqdn` will be used to find other FQDNs to update, e.g.: +```json +{ + "fqdns": { + "test1.domain.tld": { + "vendor": "...", + "dyndns_credentials": { + "testuser": "9f735e0df9a1ddc702bf0a1a7b83033f9f7153a00c29de82cedadc9957289b05" + }, + "ipv6_lan_prefix_iid_map": { + "test2.domain.tld": "::2a0:00ff:111c:1234" + } + }, + "test2.domain.tld": { + "vendor": "..." + } + } +} +``` + +If for this configuration a DynDNS API call for `test1.domain.tld` is received, that contains `ipv6lanprefix`, an AAAA DNS record for `test2.domain.tld` is ensured. + +For the above example: An `ipv6lanprefix` of `2001:db8:85a3::/48` will result in: +``` +test2.domaintld AAAA 2001:db8:85a3::2a0:00ff:111c:1234 +``` + +## Caveats +- Larger IIDs than a prefix allows will be shortened + - e.g.: An `ipv6lanprefix` of `2001:db8:85a3:1234::/64` with an IID `::9876:2a0:00ff:111c:1234` will result in an IPv6 address `2001:db8:85a3:1234:2a0:00ff:111c:1234` + +# Vendors +All vendor config is defined under a key `vendors` and the lowercase name of the vendor, e.g.: +```json +{ + "vendors": { + "hetzner": { + "enabled": true, + ... + } + } +} +``` + +The following common options are available for every vendor: +| Option | Default | Required | Description | Example | +|:-------|:--------|:---------|:------------|:--------| +| enabled | false | false | if this vendor is enabled | true | +| create_zone_if_missing | false | false | if a zone is created for a FQDN if its missing - otherwise raises an error | true | +| default_zone_ttl | 86400 | false | default TTL to set for created zones | 7200 | +| default_record_ttl | 600 | false | default TTL to set for created records - if not defined in individual FQDN settings | 300 | + +## Hetzner +Additional options: +| Option | Default | Required | Description | Example | +|:-------|:--------|:---------|:------------|:--------| +| api_url | 'https://dns.hetzner.com/api/v1' | false | Hetzner DNS API URL | | +| api_token | | true | default TTL to set for created records - if not defined in individual FQDN settings | 'secretapitoken' | diff --git a/dyndnshelper/__init__.py b/dyndnshelper/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dyndnshelper/exceptions.py b/dyndnshelper/exceptions.py new file mode 100644 index 0000000..49afb54 --- /dev/null +++ b/dyndnshelper/exceptions.py @@ -0,0 +1,6 @@ +class VendorError(Exception): + ... + + +class ZoneMissingError(Exception): + ... diff --git a/dyndnshelper/logging.py b/dyndnshelper/logging.py new file mode 100644 index 0000000..47f1371 --- /dev/null +++ b/dyndnshelper/logging.py @@ -0,0 +1,31 @@ +import logging + + +LOGGER = logging.getLogger(__package__) + + +def set_logging_config(app_log_level: int, root_log_level: int): + """ + Sets Application logging defaults + Differentiates between Application Log Level and root Log Level + """ + logging.basicConfig( + level=root_log_level, + format='%(asctime)s %(levelname)-8s %(message)s ', + datefmt='%Y-%m-%dT%H:%M:%SZ', + force=True + ) + + # configure individual logger for application + LOGGER.setLevel(app_log_level) + LOGGER.propagate = False + + console_handler = logging.StreamHandler() + console_handler.setLevel(app_log_level) + console_handler.setFormatter( + logging.Formatter( + fmt='%(asctime)s %(levelname)-8s %(message)s ', + datefmt='%Y-%m-%dT%H:%M:%SZ' + ) + ) + LOGGER.addHandler(console_handler) diff --git a/dyndnshelper/main.py b/dyndnshelper/main.py new file mode 100644 index 0000000..dc54344 --- /dev/null +++ b/dyndnshelper/main.py @@ -0,0 +1,26 @@ +import uvicorn + +from fastapi import FastAPI +from importlib.metadata import metadata +from .routers import fqdn_router +from .settings import CONFIG + +pkg_metadata = metadata(__package__) + +app = FastAPI( + title=pkg_metadata.get('name'), + summary=pkg_metadata.get('summary'), + description=pkg_metadata.get('description'), + version=pkg_metadata.get('version'), +) +app.include_router(fqdn_router) + + +def run(): + config = uvicorn.Config( + app=app, + port=CONFIG.server.port, + log_level=CONFIG.server.app_log_level + ) + server = uvicorn.Server(config) + server.run() diff --git a/dyndnshelper/routers/__init__.py b/dyndnshelper/routers/__init__.py new file mode 100644 index 0000000..498bea5 --- /dev/null +++ b/dyndnshelper/routers/__init__.py @@ -0,0 +1,5 @@ +from .fqdns import fqdn_router + +__all__ = [ + 'fqdn_router' +] diff --git a/dyndnshelper/routers/fqdns.py b/dyndnshelper/routers/fqdns.py new file mode 100644 index 0000000..379a6fe --- /dev/null +++ b/dyndnshelper/routers/fqdns.py @@ -0,0 +1,112 @@ +from fastapi import APIRouter, Query, Depends, HTTPException, status +from fastapi.security import HTTPBasic, HTTPBasicCredentials +from ipaddress import IPv4Address, IPv6Address, IPv6Network +from hashlib import sha256 +from netaddr import IPAddress, IPNetwork +from typing import Annotated +from ..logging import LOGGER +from ..settings import CONFIG, EmptyStr, FQDNSettings + + +fqdn_router = APIRouter( + prefix='/fqdns', + tags=['FQDN'] +) + + +security = HTTPBasic() + + +def get_fqdn_settings(fqdn: str) -> FQDNSettings: + fqdn_settings = CONFIG.fqdns.get(fqdn) + if not fqdn_settings: + # fqdn is not configured + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="FQDN not found", + headers={"WWW-Authenticate": "Basic"} + ) + return fqdn_settings + + +def check_credentials_for_fqdn( + fqdn_settings: Annotated[FQDNSettings, Depends(get_fqdn_settings)], + credentials: Annotated[HTTPBasicCredentials, Depends(security)] +) -> str: + auth_error = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Incorrect username or password", + headers={"WWW-Authenticate": "Basic"} + ) + + expected_password_sha256 = fqdn_settings.dyndns_credentials.get(credentials.username) + if not expected_password_sha256: + # user is not configured + raise auth_error + + password_sha256 = sha256(credentials.password.encode("utf8")).hexdigest() + if expected_password_sha256.get_secret_value() != password_sha256: + # incorrect password + raise auth_error + + return credentials.username + + +@fqdn_router.get('/{fqdn}/dyndns') +async def dyndns( + username: Annotated[str, Depends(check_credentials_for_fqdn)], + fqdn_settings: Annotated[FQDNSettings, Depends(get_fqdn_settings)], + ip: Annotated[list[IPv4Address | IPv6Address | EmptyStr], Query()] = None, + ipv6lanprefix: Annotated[IPv6Network, Query()] = None, + dualstack: Annotated[bool, Query()] = None +): + if ip: + # remove empty string items + # as senders may work with simple str replace on update urls - there may be empty strings + ip = [i for i in ip if i] + + if ipv6lanprefix and ipv6lanprefix.version != 6: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="ipv6lanprefix is not valid IPv6" + ) + + if ip: + LOGGER.debug(f'DynDNS FQDN {fqdn_settings.fqdn} with IPs {", ".join(map(str, ip))}') + vendor = CONFIG.vendors.get_vendor(fqdn_settings.vendor) + await vendor.ensure_fqdn( + fqdn=fqdn_settings.fqdn, + zone=fqdn_settings.zone, + ip_addresses=ip, + ttl=fqdn_settings.record_ttl + ) + + if ipv6lanprefix and fqdn_settings.ipv6_lan_prefix_iid_map: + # we'll do a little stunt via using netaddr as it already provides bitwise operations + # and we have to bitwise OR network address with iid address + netaddr_ipv6lanprefix_address = IPNetwork(str(ipv6lanprefix)).network + for iid_fqdn, iid in fqdn_settings.ipv6_lan_prefix_iid_map.items(): + LOGGER.debug(f'DynDNS IID FQDN {iid_fqdn} {iid} with IPv6 LAN Prefix {ipv6lanprefix}') + netaddr_iid_address = IPAddress(str(iid)) + combined_address = IPv6Address(str(netaddr_ipv6lanprefix_address | netaddr_iid_address)) + + iid_fqdn_settings = get_fqdn_settings(iid_fqdn) + vendor = CONFIG.vendors.get_vendor(iid_fqdn_settings.vendor) + await vendor.ensure_fqdn( + fqdn=iid_fqdn_settings.fqdn, + zone=iid_fqdn_settings.zone, + ip_addresses=[combined_address], + ttl=iid_fqdn_settings.record_ttl + ) + + # TODO + # get vendor + # ensure zone, if allowed, otherwise fail + # ensure records for all IPs (A, AAAA) + # if ipv6lanprefix is provided: + # for each fqdn with iid + # get fqdn_settings + # get vendor + # ensure zone, if allowed, otherwise fail + # ensure AAAA record for iid + return 'OK' diff --git a/dyndnshelper/settings.py b/dyndnshelper/settings.py new file mode 100644 index 0000000..a65a195 --- /dev/null +++ b/dyndnshelper/settings.py @@ -0,0 +1,192 @@ +import json +import logging +import os + +from ipaddress import IPv4Address, IPv6Address +from pathlib import Path +from pydantic import BaseModel, ConfigDict, FieldValidationInfo, SecretStr, Field, field_validator +from pydantic_settings import BaseSettings, PydanticBaseSettingsSource +from typing import Annotated, Type +from .logging import set_logging_config +from .vendors import Vendor, VendorNames, VendorByName, HetznerSettings + + +FqdnStr = Annotated[str, Field(pattern=r'^(([a-z0-9][a-z0-9\-]+[a-z0-9])|[a-z0-9]+\.)+([a-z]+|xn\-\-[a-z0-9]+)\.?$', strict=True)] +EmptyStr = Annotated[str, Field(max_length=0)] + + +class FQDNSettings(BaseModel): + model_config = ConfigDict(arbitrary_types_allowed=True) + + fqdn: FqdnStr = None + """ + Is set automatically in validation + """ + vendor: VendorNames + zone: FqdnStr = None + """ + Zone to add record to - defaults to root zone + """ + record_ttl: int | None = None + """ + Overrides default vendor ttl, if set + """ + dyndns_credentials: dict[str, SecretStr] = Field(default_factory=dict) + """ + A FQDN can have multiple credentials that are allowed to update the records + + Each credential is a mapping of an unique username & a SHA256 password hash + + Example: + { + "user1": "01ba4719c80b6fe911b091a7c05124b64eeece964e09c058ef8f9805daca546b" + } + """ + ipv6_lan_prefix_iid_map: dict[FqdnStr, IPv4Address | IPv6Address] = Field(default_factory=dict) + """ + In addition to the regular logic for updating a FQDN, an IPv6 LAN Prefix can be used to update other FQDNs, even across vendors + + Example: + { + "host1.test.domain.tld": "::1", + "serviceX.anotherdomain.tld": "::22" + } + Where "host1..." is of vendor X and "serviceX..." is of vendor Y + + With a new prefix of "2001:db8:85a3::/48" the following AAAA records will be set: + Vendor X host1.test.domain.tld 2001:db8:85a3::1 + Vendor Y serviceX.anotherdomain.tld 2001:db8:85a3::22 + + NOTE: If prefix is smaller than IID requires, parts of IID will be omitted. E.g.: + Prefix 2001:db8:85a3::/48 and IID ::1111:2222:3333:4444:5555:6666 results in 2001:db8:85a3:2222:3333:4444:5555:6666 + """ + @field_validator('dyndns_credentials') + def validate_dyndns_credentials(cls, v, info: FieldValidationInfo): + if isinstance(v, dict): + for username, password_sha256 in v.items(): + if isinstance(password_sha256, SecretStr) and len(password_sha256.get_secret_value()) == 64: + try: + int(password_sha256.get_secret_value(), 16) + continue + except ValueError: + pass + raise ValueError(f'`{info.field_name}` {username} password value is not valid SHA256') + return v + + def ensure_fallback_root_zone(self): + """ + ensures at least root zone is set, for example: + + test.domain.tld -> domain.tld + host.test.domain.tld -> domain.tld + ... + + raises ValueError if no domain is extractable - e.g. if an fqdn is invalid + """ + if self.zone: + return + parts = self.fqdn.rsplit('.', maxsplit=2) + domain_tld_parts = parts[-2:] + if len(domain_tld_parts) == 1: + raise ValueError(f'Unable to extract domain with tld from FQDN {self.fqdn}') + self.zone = '.'.join(domain_tld_parts) + + +class Server(BaseModel): + port: int = 8000 + app_log_level: int = logging.INFO + root_log_level: int = logging.WARNING + + @field_validator('app_log_level', 'root_log_level', mode='before') + def validate_log_level(cls, v, info: FieldValidationInfo): + log_level_map = logging.getLevelNamesMapping() + if isinstance(v, str): + log_level = log_level_map.get(v.upper()) + if log_level is not None: + return log_level + elif isinstance(v, int) and v in log_level_map.values(): + return v + + raise ValueError(f'`{info.field_name}` {v} is not a valid log level') + + +class Vendors(BaseModel): + hetzner: HetznerSettings = Field(default_factory=HetznerSettings) + + def get_vendor(self, name: VendorNames) -> Vendor: + vendor_settings = getattr(self, name, None) + if not vendor_settings: + raise ValueError(f'No vendor settings found for name `{name}`') + vendor_cls = VendorByName.get(name) + if not vendor_cls: + raise ValueError(f'No vendor found for name `{name}`') + return vendor_cls(settings=vendor_settings) + + +class JsonConfigSettingsSource(PydanticBaseSettingsSource): + def __call__(self) -> dict: + config = dict() + for config_file in self.settings_cls.config_files: + try: + data = json.loads(config_file.read_text('utf-8')) + if isinstance(data, dict): + config.update(data) + except Exception as e: + logging.warning(f'Unable to read config file {config_file}: {e}') + return config + + def get_field_value(self, field, field_name): + # abstractmethod but seems to be not used... yet? + pass + + +class Settings(BaseSettings): + fqdns: dict[FqdnStr, FQDNSettings] = Field(default_factory=dict) + server: Server = Field(default_factory=Server) + vendors: Vendors = Field(default_factory=Vendors) + + @classmethod + def settings_customise_sources( + cls, + settings_cls: Type[BaseSettings], + init_settings: PydanticBaseSettingsSource, + env_settings: PydanticBaseSettingsSource, + dotenv_settings: PydanticBaseSettingsSource, + file_secret_settings: PydanticBaseSettingsSource, + ): + return ( + init_settings, + JsonConfigSettingsSource(settings_cls) + ) + + @classmethod + @property + def config_files(cls) -> list[str]: + config_files = list() + config_root = Path(f'/etc/{__package__}') + if config_root.exists(): + # read all json files recursively + for root, dirs, files in os.walk(config_root): + for filename in files: + if filename.endswith('.json'): + config_files.append( + config_root.joinpath(filename) + ) + return config_files + + @field_validator('fqdns', mode='after') + def validate_fqdns(cls, v, info: FieldValidationInfo): + if not isinstance(v, dict): + raise ValueError(f'{info.field_name} should be a valid dict but is: {type(v)}') + + for fqdn, fqdn_settings in v.items(): + fqdn_settings.fqdn = fqdn + fqdn_settings.ensure_fallback_root_zone() + return v + + +CONFIG = Settings() +set_logging_config( + app_log_level=CONFIG.server.app_log_level, + root_log_level=CONFIG.server.root_log_level +) diff --git a/dyndnshelper/vendors/__init__.py b/dyndnshelper/vendors/__init__.py new file mode 100644 index 0000000..87d7a35 --- /dev/null +++ b/dyndnshelper/vendors/__init__.py @@ -0,0 +1,21 @@ +from enum import StrEnum +from ._base import VendorSettings, Vendor +from .hetzner import HetznerSettings, Hetzner + + +class VendorNames(StrEnum): + HETZNER = HetznerSettings.vendor_name + + +VendorByName: dict[VendorNames, VendorSettings] = { + VendorNames.HETZNER: Hetzner +} + + +__all__ = [ + 'HetznerSettings', + 'Vendor', + 'VendorNames', + 'VendorByName', + 'VendorSettings' +] diff --git a/dyndnshelper/vendors/_base.py b/dyndnshelper/vendors/_base.py new file mode 100644 index 0000000..cb721af --- /dev/null +++ b/dyndnshelper/vendors/_base.py @@ -0,0 +1,195 @@ +import httpx + +from abc import ABC, abstractmethod +from ipaddress import IPv4Address, IPv6Address +from pydantic import BaseModel, ConfigDict, Field, model_validator +from typing import ClassVar, Union, get_origin, get_args +from ..logging import LOGGER +from ..exceptions import VendorError + + +class VendorSettings(BaseModel): + vendor_name: ClassVar[str] = None + enabled: bool = Field(False, description="if this vendor is enabled and its settings are required") + create_zone_if_missing: bool = False + default_zone_ttl: int = 86400 + """ + used for creating zones - is not applied to existing zones + """ + default_record_ttl: int = 600 + """ + used for creating records - is not applied to existing records + + NOTE: may be overridden by individual FQDN settings + """ + + @model_validator(mode='after') + def validate_required_fields_if_enabled(cls, data): + """ + All fields not allowing `None` via type annotation will be required, if `enabled` is `True` + + This validator enforces it after individual field validation + """ + if not data.enabled: + # disabled - no need to validate + return data + + for name, info in cls.model_fields.items(): + if getattr(data, name, None) is not None: + # value is set + continue + if get_origin(info.annotation) is Union: + # maybe None is allowed? + if type(None) in get_args(info.annotation): + # None is allowed + continue + raise ValueError(f'{cls.vendor_name} setting `{name}` is required') + return data + + +class Vendor(BaseModel, ABC): + model_config = ConfigDict(arbitrary_types_allowed=True) + + settings: VendorSettings | None = None + httpx_client: httpx.AsyncClient | None = None + httpx_auth: httpx.Auth | None = None + """ + Can be used to supply authentication data such as auth headers + + See https://www.python-httpx.org/advanced/#customizing-authentication + """ + + @abstractmethod + async def ensure_fqdn(self, fqdn: str, zone: str, ip_addresses: list[IPv4Address | IPv6Address], ttl: int = None): + raise NotImplementedError() + + def create_httpx_client(self): + """ + Create a new httpx.AsyncClient to be used via e.g. `async with self.httpx_client:` + """ + self.httpx_client = httpx.AsyncClient( + auth=self.httpx_auth + ) + + @classmethod + def get_record_type_for_ip(cls, ip_address: IPv4Address | IPv6Address): + if ip_address.version == 4: + return 'A' + elif ip_address.version == 6: + return 'AAAA' + else: + raise NotImplementedError(f'Unsupported IP version: {ip_address.version}') + + @property + def logger(self): + return LOGGER + + async def _api_request(self, coroutine, raise_for_status: bool = True) -> httpx.Response: + response = await coroutine + if raise_for_status and response.status_code >= 400: + raise VendorError(f'Received HTTP {response.status_code} from Vendor API: {response.text}') + return response + + async def api_get( + self, + url: str, + query_parameters: dict = None, + headers: dict = None, + timeout: int = 30, + raise_for_status: bool = True + ) -> httpx.Response: + return await self._api_request( + coroutine=self.httpx_client.get( + auth=self.httpx_auth, + url=url, + params=query_parameters, + headers=headers, + timeout=timeout + ), + raise_for_status=raise_for_status + ) + + async def api_post( + self, + url: str, + query_parameters: dict = None, + headers: dict = None, + data=None, + json: dict = None, + timeout: int = 30, + raise_for_status: bool = True + ) -> httpx.Response: + return await self._api_request( + coroutine=self.httpx_client.post( + url=url, + params=query_parameters, + headers=headers, + data=data, + json=json, + timeout=timeout + ), + raise_for_status=raise_for_status + ) + + async def api_put( + self, + url: str, + query_parameters: dict = None, + headers: dict = None, + data=None, + json: dict = None, + timeout: int = 30, + raise_for_status: bool = True + ) -> httpx.Response: + return await self._api_request( + coroutine=self.httpx_client.put( + url=url, + params=query_parameters, + headers=headers, + data=data, + json=json, + timeout=timeout + ), + raise_for_status=raise_for_status + ) + + async def api_patch( + self, + url: str, + query_parameters: dict = None, + headers: dict = None, + data=None, + json: dict = None, + timeout: int = 30, + raise_for_status: bool = True + ) -> httpx.Response: + return await self._api_request( + coroutine=self.httpx_client.patch( + url=url, + params=query_parameters, + headers=headers, + data=data, + json=json, + timeout=timeout + ), + raise_for_status=raise_for_status + ) + + async def api_delete( + self, + url: str, + query_parameters: dict = None, + headers: dict = None, + timeout: int = 30, + raise_for_status: bool = True + ) -> httpx.Response: + return await self._api_request( + coroutine=self.httpx_client.delete( + auth=self.httpx_auth, + url=url, + params=query_parameters, + headers=headers, + timeout=timeout + ), + raise_for_status=raise_for_status + ) diff --git a/dyndnshelper/vendors/hetzner.py b/dyndnshelper/vendors/hetzner.py new file mode 100644 index 0000000..91bffcd --- /dev/null +++ b/dyndnshelper/vendors/hetzner.py @@ -0,0 +1,186 @@ +from httpx import Auth +from ipaddress import IPv4Address, IPv6Address +from pydantic import BaseModel, ConfigDict, SecretStr, Field, model_validator +from ._base import VendorSettings, Vendor +from ..exceptions import VendorError, ZoneMissingError + + +class HetznerSettings(VendorSettings): + vendor_name = 'hetzner' + api_url: str = Field('https://dns.hetzner.com/api/v1') + api_token: SecretStr | None = None + + +class HetznerAuth(Auth): + def __init__(self, api_token: str): + self.api_token = api_token + + def auth_flow(self, request): + request.headers['Auth-API-Token'] = self.api_token + yield request + + +class HetznerZone(BaseModel): + model_config = ConfigDict(extra='allow') + + id: str + name: str + ttl: int + + @property + def update_dict(self): + return self.model_dump( + mode='json', + include=('name', 'ttl') + ) + + +class HetznerRecord(BaseModel): + model_config = ConfigDict(extra='allow') + + id: str + zone_id: str + type: str + name: str + value: str + ttl: int + + @property + def update_dict(self): + return self.model_dump( + mode='json', + include=('zone_id', 'type', 'name', 'value', 'ttl') + ) + + +class Hetzner(Vendor): + settings: HetznerSettings + + @model_validator(mode='after') + def setup_httpx_auth(cls, data: Vendor): + if data.httpx_auth is None: + data.httpx_auth = HetznerAuth( + api_token=data.settings.api_token.get_secret_value() + ) + return data + + async def get_zone(self, name: str) -> HetznerZone | None: + self.logger.debug(f'Fetching zone {name}') + response = await self.api_get( + url=f'{self.settings.api_url}/zones', + query_parameters={ + 'name': name + } + ) + zones = response.json().get('zones') + if not isinstance(zones, list): + raise VendorError(f'Hetzner API did not return zones, but: {type(zones)} {zones}') + elif len(zones) == 1: + return HetznerZone(**zones[0]) + + async def create_zone(self, name: str, ttl: int = None) -> HetznerZone: + self.logger.info(f'Creating zone {name}') + response = await self.api_post( + url=f'{self.settings.api_url}/zones', + json={ + 'name': name, + 'ttl': ttl or self.settings.default_zone_ttl + } + ) + zone = response.json().get('zone') + if not isinstance(zone, dict): + raise VendorError(f'Hetzner API did not return zone on creation, but: {type(zone)} {zone}') + + return HetznerZone(**zone) + + async def get_records(self, zone_id: str, name: str = None) -> list[HetznerRecord]: + """ + get records of a specific zone + + zone_id -- id of the zone as per API + + name -- record name to filter for (local filtering, API does not provide a search function) + """ + self.logger.debug(f'Fetching records of zone {zone_id}') + response = await self.api_get( + url=f'{self.settings.api_url}/records', + query_parameters={ + 'zone_id': zone_id + } + ) + records = response.json().get('records') + if not isinstance(records, list): + raise VendorError(f'Hetzner API did not return records, but: {type(records)} {records}') + + parsed_records: list[HetznerRecord] = list() + for record in records: + if name and not record.get('name') == name: + continue + parsed_records.append(HetznerRecord(**record)) + + return parsed_records + + async def create_record(self, zone_id: str, name: str, record_type: str, value: str, ttl: int = None) -> HetznerRecord: + self.logger.info(f'Creating {record_type} record {name} in zone {zone_id}: {value}') + response = await self.api_post( + url=f'{self.settings.api_url}/records', + json={ + 'zone_id': zone_id, + 'name': name, + 'type': record_type, + 'ttl': ttl or self.settings.default_record_ttl, + 'value': value + } + ) + record = response.json().get('record') + if not isinstance(record, dict): + raise VendorError(f'Hetzner API did not return record on creation, but: {type(record)} {record}') + + return HetznerRecord(**record) + + async def update_record(self, record: HetznerRecord) -> HetznerRecord: + self.logger.info(f'Updating {record.id} / {record.type} record {record.name} in zone {record.zone_id}: {record.value}') + response = await self.api_put( + url=f'{self.settings.api_url}/records/{record.id}', + json=record.update_dict + ) + record = response.json().get('record') + if not isinstance(record, dict): + raise VendorError(f'Hetzner API did not return record on creation, but: {type(record)} {record}') + + return HetznerRecord(**record) + + async def ensure_fqdn(self, fqdn: str, zone: str, ip_addresses: list[IPv4Address | IPv6Address], ttl: int = None): + self.create_httpx_client() + async with self.httpx_client: + hetzner_zone = await self.get_zone(name=zone) + if not hetzner_zone: + if not self.settings.create_zone_if_missing: + raise ZoneMissingError(f'Hetzner Zone {zone} is missing') + hetzner_zone = await self.create_zone(name=zone) + + fqdn_record_name = fqdn.replace(f'.{hetzner_zone.name}', '') + hetzner_records = await self.get_records( + zone_id=hetzner_zone.id, + name=fqdn_record_name + ) + for ip_address in ip_addresses: + record_type = self.get_record_type_for_ip(ip_address) + matching_records = list(filter(lambda r: r.type == record_type, hetzner_records)) + if len(matching_records) == 0: + await self.create_record( + zone_id=hetzner_zone.id, + name=fqdn_record_name, + record_type=record_type, + value=str(ip_address.compressed), + ttl=ttl + ) + elif len(matching_records) == 1: + if matching_records[0].value == str(ip_address.compressed): + continue + matching_records[0].value = str(ip_address) + await self.update_record( + record=matching_records[0] + ) + else: + raise NotImplementedError('Unable to handle multiple records with the same name, such as round robin') diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..4c81dda --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,32 @@ +[build-system] +requires = ["setuptools", "setuptools-scm"] +build-backend = "setuptools.build_meta" + +[tool.setuptools] +packages = [ + "dyndnshelper", + "dyndnshelper.routers", + "dyndnshelper.vendors" +] + +[tool.setuptools-scm] + +[project] +name = "dyndnshelper" +description = "Self-hosted dyndns service with support for different vendors" +readme = "README.md" +requires-python = ">=3.10" +keywords = ["dyndns", "dns"] +license = {text = "BSD 3-Clause License"} +dependencies = [ + "httpx", + "fastapi>=0.100.0", + "netaddr", + "pydantic>=2.0.0", + "pydantic-settings", + "uvicorn[standard]" +] +dynamic = ["version"] + +[project.scripts] +dyndnshelper = "dyndnshelper.main:run"