# File-listing metadata (not code):
# 2023-07-09 18:52:12 +02:00
#
# 113 lines
# 4.0 KiB
# Python

from hashlib import sha256
from ipaddress import IPv4Address, IPv6Address, IPv6Network
from secrets import compare_digest
from typing import Annotated

from fastapi import APIRouter, Query, Depends, HTTPException, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from netaddr import IPAddress, IPNetwork

from ..logging import LOGGER
from ..settings import CONFIG, EmptyStr, FQDNSettings
# Router for all FQDN-scoped endpoints; every route is mounted below /fqdns
# and tagged 'FQDN'.
fqdn_router = APIRouter(prefix='/fqdns', tags=['FQDN'])

# HTTP Basic scheme; used as a dependency to obtain the request credentials.
security = HTTPBasic()
def get_fqdn_settings(fqdn: str) -> FQDNSettings:
    """Look up the configured settings for ``fqdn``.

    Args:
        fqdn: Name to look up in ``CONFIG.fqdns``.

    Returns:
        The settings object stored for ``fqdn``.

    Raises:
        HTTPException: 404 ("FQDN not found") when the lookup yields nothing
            (or a falsy value).
    """
    configured = CONFIG.fqdns.get(fqdn)
    if configured:
        return configured

    # Nothing configured for this name.
    # NOTE(review): WWW-Authenticate is usually paired with 401 responses;
    # confirm it is intended on this 404.
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="FQDN not found",
        headers={"WWW-Authenticate": "Basic"}
    )
def check_credentials_for_fqdn(
    fqdn_settings: Annotated[FQDNSettings, Depends(get_fqdn_settings)],
    credentials: Annotated[HTTPBasicCredentials, Depends(security)]
) -> str:
    """Validate HTTP Basic credentials against the users of an FQDN.

    The supplied password is hashed with SHA-256 and its hex digest is
    compared with the value stored for the user in
    ``fqdn_settings.dyndns_credentials``.

    Args:
        fqdn_settings: Settings of the requested FQDN, resolved by
            ``get_fqdn_settings``.
        credentials: Username and password provided by the ``HTTPBasic``
            security dependency.

    Returns:
        The authenticated username.

    Raises:
        HTTPException: 401 when the user is not configured for this FQDN or
            the password digest does not match. Both cases raise the same
            error, so the response does not reveal which check failed.
    """
    auth_error = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Incorrect username or password",
        headers={"WWW-Authenticate": "Basic"}
    )

    expected_password_sha256 = fqdn_settings.dyndns_credentials.get(credentials.username)
    if not expected_password_sha256:
        # user is not configured
        raise auth_error

    password_sha256 = sha256(credentials.password.encode("utf8")).hexdigest()

    # Compare in constant time: a plain ``!=`` stops at the first differing
    # character and leaks timing information. Both sides are encoded to bytes
    # because compare_digest() rejects non-ASCII ``str`` arguments.
    digests_match = compare_digest(
        expected_password_sha256.get_secret_value().encode("utf8"),
        password_sha256.encode("utf8"),
    )
    if not digests_match:
        # incorrect password
        raise auth_error

    return credentials.username
@fqdn_router.get('/{fqdn}/dyndns')
async def dyndns(
    username: Annotated[str, Depends(check_credentials_for_fqdn)],
    fqdn_settings: Annotated[FQDNSettings, Depends(get_fqdn_settings)],
    ip: Annotated[list[IPv4Address | IPv6Address | EmptyStr], Query()] = None,
    ipv6lanprefix: Annotated[IPv6Network, Query()] = None,
    dualstack: Annotated[bool, Query()] = None
):
    """Handle a DynDNS update request for a configured FQDN.

    Non-empty ``ip`` values are handed to the FQDN's vendor through
    ``ensure_fqdn``. When ``ipv6lanprefix`` is given and the FQDN has an
    ``ipv6_lan_prefix_iid_map``, every FQDN in that map is additionally
    updated with the address obtained by OR-ing the prefix's network address
    with the mapped interface identifier (IID).

    Args:
        username: Result of ``check_credentials_for_fqdn``. It is not read in
            the body; declaring the dependency is what enforces the
            credential check.
        fqdn_settings: Settings of the FQDN named in the path.
        ip: Addresses to set for the FQDN; empty-string items are ignored.
        ipv6lanprefix: IPv6 LAN prefix to combine with the configured IIDs.
        dualstack: Accepted as a query parameter but not used in the body.

    Returns:
        The string ``'OK'``.

    Raises:
        HTTPException: 400 when ``ipv6lanprefix`` is not IP version 6, plus
            anything ``get_fqdn_settings`` raises for an FQDN from the IID map.
    """
    # Senders may build the update URL by plain string replacement, which can
    # leave empty-string items behind; drop them.
    addresses = [entry for entry in ip if entry] if ip else ip

    if ipv6lanprefix and ipv6lanprefix.version != 6:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="ipv6lanprefix is not valid IPv6"
        )

    if addresses:
        joined_ips = ", ".join(str(address) for address in addresses)
        LOGGER.debug(f'DynDNS FQDN {fqdn_settings.fqdn} with IPs {joined_ips}')
        await CONFIG.vendors.get_vendor(fqdn_settings.vendor).ensure_fqdn(
            fqdn=fqdn_settings.fqdn,
            zone=fqdn_settings.zone,
            ip_addresses=addresses,
            ttl=fqdn_settings.record_ttl
        )

    # No prefix given, or no IID map configured: nothing more to update.
    if not ipv6lanprefix or not fqdn_settings.ipv6_lan_prefix_iid_map:
        return 'OK'

    # netaddr is used here because its address objects support bitwise
    # operations; the network address is OR-ed with each IID below.
    prefix_base = IPNetwork(str(ipv6lanprefix)).network
    for iid_fqdn, iid in fqdn_settings.ipv6_lan_prefix_iid_map.items():
        LOGGER.debug(f'DynDNS IID FQDN {iid_fqdn} {iid} with IPv6 LAN Prefix {ipv6lanprefix}')
        host_address = IPv6Address(str(prefix_base | IPAddress(str(iid))))
        target = get_fqdn_settings(iid_fqdn)
        await CONFIG.vendors.get_vendor(target.vendor).ensure_fqdn(
            fqdn=target.fqdn,
            zone=target.zone,
            ip_addresses=[host_address],
            ttl=target.record_ttl
        )

    # TODO: ensure the zone (if allowed, otherwise fail) for the main FQDN
    # and for each IID FQDN.
    return 'OK'