# daneupdate/src/daneupdate/utils/tlsa/updater.py
#
# 61 lines
# 2.2 KiB
# Python

from daneupdate.utils.cert import DaneCert
from daneupdate.utils.tlsa.record_type import TlsaRecordType
from dns.name import from_text as name_from_text
from dns.rdatatype import TLSA
from dns.rdtypes.ANY.TLSA import TLSA as RdTypeTlsa
from dns.resolver import Resolver
from ..config import Configuration
from .liveupdate_config import HostnameConfig
def _get_records_from_ns(
        fqdn: str, resolver: Resolver) -> set[TlsaRecordType]:
    """
    Query `resolver` for the TLSA records published at `fqdn`.

    `fqdn` is handed to the resolver as the query name as-is, so it
    has to be the complete TLSA owner name including the port and
    protocol labels (for example `_25._tcp.mail.example.org.`).

    Returns the answer's records, each converted with
    `TlsaRecordType.from_rdtype`, collected into a set. Exceptions
    raised by `resolver.resolve` are not caught here and propagate to
    the caller.
    """
    # Build the query name from the argument. A fixed host name used to
    # be queried here, which silently ignored `fqdn`.
    qname = name_from_text(text=fqdn)
    answer = resolver.resolve(qname=qname, rdtype=TLSA)

    records: set[TlsaRecordType] = set()
    # The explicit annotation narrows the generic rdata type yielded by
    # the answer to the TLSA rdata class expected by `from_rdtype`.
    item: RdTypeTlsa
    for item in answer:
        records.add(TlsaRecordType.from_rdtype(item=item))
    return records
class TlsaUpdater:
    """
    Bring TLSA records in line with a deployed and/or upcoming
    certificate.
    """

    def __init__(
            self, config: Configuration, cert: DaneCert,
            certconfig_path: str
    ):
        """
        Keep `config` and `cert` and select the adopted configuration
        entry stored under `certconfig_path`.

        The lookup in `config.runtime.adopted` is not guarded; a failing
        lookup propagates to the caller.
        """
        adopted = config.runtime.adopted[certconfig_path]
        self._adopted_config = adopted
        self._config = config
        self._cert = cert

    def _get_hostname_configs(self) -> list[HostnameConfig]:
        """
        Return the `HostnameConfig` objects to process.

        If the adopted configuration has hosts, they are taken from
        `HostnameConfig.from_adoptedhosts`. Otherwise one config is
        built with `HostnameConfig.from_fqdn` for every name returned
        by the certificate's `names()`.
        """
        if not self._adopted_config.hosts:
            # Nothing configured explicitly: derive one config per
            # certificate name.
            derived = []
            for name in self._cert.names():
                derived.append(
                    HostnameConfig.from_fqdn(
                        fqdn=name, config=self._config))
            return derived

        return HostnameConfig.from_adoptedhosts(
            adopted=self._adopted_config,
            config=self._config,
            cert=self._cert,
        )

    def process(self):
        """
        Compare the required TLSA records with those each server
        currently returns.

        Only acts when the configured default method is 'dns-update';
        for any other method nothing is done. Differences are detected
        but not yet acted upon (see the TODO below).
        """
        is_dns_update = self._config.defaults.method == 'dns-update'
        if not is_dns_update:
            return

        for host_cfg in self._get_hostname_configs():
            required_by_name = host_cfg.get_tlsa_records(cert=self._cert)
            for fqdn, wanted in required_by_name.items():
                for server in host_cfg.servers:
                    current = server.resolve(fqdn=fqdn)
                    if current != wanted:
                        # TODO: send the update. Earlier sketch:
                        #   update = serverinfo.get_zoneupdate(
                        #       zone=hostname_config.zone)
                        #   update.replace
                        pass