Simplify cmdline parameter passing to TlsaUpdater

This commit is contained in:
László Károlyi 2024-04-08 15:55:13 +02:00
parent 5b36f355da
commit 2a284f7d80
Signed by: karolyi
GPG Key ID: 2DCAF25E55735BFE
6 changed files with 27 additions and 28 deletions

View File

@ -75,8 +75,8 @@ class AdoptHandler(HandlerBase):
self._config.runtime.adopted[conf_file] = adopted
if not self._args.skip_tlsa_update:
tlsa_updater = TlsaUpdater(
cmdline_args=self._args, config=self._config, cert=cert,
certconfig_path=conf_file)
verbose=self._args.verbose, dry_run=self._args.dry_run,
config=self._config, cert=cert, certconfig_path=conf_file)
tlsa_updater.process()
self._config.runtime.save(path=self._config.paths.runtimeyaml)
notify(

View File

@ -223,8 +223,8 @@ class RenewHandler(HandlerBase):
config = deepcopy(x=self._certbot_config)
config.certname = cert.lineagename
tlsa_updater = TlsaUpdater(
cmdline_args=self._args, config=self._config, cert=cert,
certconfig_path=certconfig_path)
verbose=self._args.verbose, dry_run=self._args.dry_run,
config=self._config, cert=cert, certconfig_path=certconfig_path)
if not should_renew(config=config, lineage=cert):
tlsa_updater.process()
return []

View File

@ -41,7 +41,7 @@ class UpdateTlsaHandler(HandlerBase):
adopted_certs = self._get_adopted_certs()
for conf_file, cert in adopted_certs.items():
updater = TlsaUpdater(
cmdline_args=self._args, config=self._config, cert=cert,
certconfig_path=conf_file)
verbose=self._args.verbose, dry_run=self._args.verbose,
config=self._config, cert=cert, certconfig_path=conf_file)
updater.process()
return 0

View File

@ -53,12 +53,14 @@ def _setup_subparser_adopt(subparsers: _SPA):
name='adopt',
help='Adopt a certificate deployed by certbot',
description='Adopt a certificate deployed by certbot')
adopt_parser.add_argument(
'-v', '--verbose', action='store_true',
help='Be more verbose about the adoption process')
adopt_parser.add_argument(
'-s', '--skip-tlsa-update', action='store_true',
help='Skip setting up TLSA record for the adopted certificate.')
adopt_parser.add_argument(
'-v', '--verbose', action='store_true',
help='Be more verbose about the adoption process')
'--dry-run', action='store_true', help='Emulate TLSA update.')
adopt_parser.add_argument(
action='extend', nargs='+',
help='One or more certificate name (hostname)', metavar='hostname',

View File

@ -118,8 +118,8 @@ class _DnsServerInfo(object):
def _log_results(
self, fqdn: _FqdnHostname, rdset_added: Rdataset,
rdset_removed: Rdataset, cmdline_args: Namespace) -> None:
if not cmdline_args.verbose:
rdset_removed: Rdataset, verbose: bool) -> None:
if not verbose:
notify(msg=(
f'Nameserver {self.str_ip}, {fqdn}: {len(rdset_added.items)} '
f'record(s) added, {len(rdset_removed.items)} record(s) '
@ -137,8 +137,7 @@ class _DnsServerInfo(object):
def _get_zoneupdate(
self, fqdn: _FqdnHostname, records_existing: set[TlsaRecordType],
records_required: set[TlsaRecordType], ttl: int,
cmdline_args: Namespace
records_required: set[TlsaRecordType], ttl: int, verbose: bool
) -> UpdateMessage:
"""
Return an `UpdateMessage` for updating this FQDN. The assumption
@ -163,17 +162,17 @@ class _DnsServerInfo(object):
update.add(fqdn.as_dnsname, rdset_added)
self._log_results(
fqdn=fqdn, rdset_added=rdset_added, rdset_removed=rdset_removed,
cmdline_args=cmdline_args)
verbose=verbose)
return update
def _are_records_same(
self, fqdn: _FqdnHostname, records_existing: set[TlsaRecordType],
records_required: set[TlsaRecordType], cmdline_args: Namespace
records_required: set[TlsaRecordType], verbose: bool
) -> bool:
'Return trueness of similarity while logging.'
if records_existing != records_required:
return False
if cmdline_args.verbose:
if verbose:
notify(msg=f'--- {fqdn}: Records are the same, skipping update.')
return True
notify(msg=(
@ -182,24 +181,23 @@ class _DnsServerInfo(object):
return True
def update(
self, cmdline_args: Namespace, hostname_records: _TlsaRecordsDict,
self, hostname_records: _TlsaRecordsDict, verbose: bool, dry_run: bool,
ttl: int
) -> None:
'Compare necessary and existing records, update when necessary.'
if cmdline_args.verbose:
if verbose:
notify(msg=f'\n---- Nameserver {self.ip}')
for fqdn, records_required in hostname_records.items():
records_existing = self._resolve(fqdn=fqdn)
is_same = self._are_records_same(
fqdn=fqdn, records_existing=records_existing,
records_required=records_required, cmdline_args=cmdline_args)
records_required=records_required, verbose=verbose)
if is_same:
continue
update = self._get_zoneupdate(
fqdn=fqdn, records_existing=records_existing,
records_required=records_required, ttl=ttl,
cmdline_args=cmdline_args)
if cmdline_args.dry_run:
records_required=records_required, ttl=ttl, verbose=verbose)
if dry_run:
continue
response = tcp(q=update, where=self.str_ip)
if not response.errors:

View File

@ -1,5 +1,3 @@
from argparse import Namespace
from daneupdate.utils.cert import DaneCert
from ..config import Configuration
@ -10,10 +8,11 @@ class TlsaUpdater(object):
'Updating TLSA records for an deployed and/or upcoming certificate.'
def __init__(
self, cmdline_args: Namespace, config: Configuration, cert: DaneCert,
certconfig_path: str
self, verbose: bool, dry_run: bool, config: Configuration,
cert: DaneCert, certconfig_path: str
):
self._cmdline_args = cmdline_args
self._verbose = verbose
self._dry_run = dry_run
self._adopted_config = config.runtime.adopted[certconfig_path]
self._config = config
self._cert = cert
@ -43,9 +42,9 @@ class TlsaUpdater(object):
hostname_config.get_tlsa_records(cert=self._cert)
for server in hostname_config.servers:
server.update(
cmdline_args=self._cmdline_args,
hostname_records=hostname_records,
ttl=hostname_config.ttl)
ttl=hostname_config.ttl, verbose=self._verbose,
dry_run=self._dry_run)
return
raise NotImplementedError(
f'DNS update method {self._config.defaults.method!r} not ' +