Trying to fix renewals

This commit is contained in:
László Károlyi 2024-04-09 18:16:15 +02:00
parent 35ee67b0ae
commit ccca6d845e
Signed by: karolyi
GPG Key ID: 2DCAF25E55735BFE
5 changed files with 44 additions and 51 deletions

View File

@ -1,12 +1,9 @@
from copy import deepcopy
from logging import getLogger
from typing import Final
from certbot._internal.cert_manager import _describe_certs
from certbot._internal.renewal import reconstitute
from certbot._internal.storage import renewal_file_for_certname
from certbot.display.util import notify
from daneupdate.utils.cert import DaneCert
from ..utils.config import AdoptedCertConfig
from ..utils.tlsa.updater import TlsaUpdater
@ -20,13 +17,12 @@ class ListAdoptablesHandler(HandlerBase):
def _process(self) -> int:
'Show the adoptable certificate paths.'
result = self._get_parsed_certs()
adoptable_certs = [
result.parsed_certs[x] for x in result.parsed_certs
if x not in result.already_adopted]
self.all_certs.parsed[x] for x in self.all_certs.parsed
if x not in self.all_certs.already_adopted]
_describe_certs(
config=self._certbot_config, parsed_certs=adoptable_certs,
parse_failures=result.parse_failures)
parse_failures=self.all_certs.parse_failures)
return 0
@ -48,27 +44,20 @@ class AdoptHandler(HandlerBase):
configs_to_adopt = set(
renewal_file_for_certname(config=self._certbot_config, certname=x)
for x in self._args.hostnames)
already_adopted = \
self._get_adopted_configpaths(available=configs_to_adopt)
if already_adopted:
str_already_adopted = '\n'.join(f'- {x}' for x in already_adopted)
if self.all_certs.already_adopted:
str_already_adopted = \
'\n'.join(f'- {x}' for x in self.all_certs.already_adopted)
notify(
msg='The following configurations have already been ' +
f'adopted, not adopting:\n{str_already_adopted}')
return [x for x in configs_to_adopt if x not in already_adopted]
return [
x for x in configs_to_adopt
if x not in self.all_certs.already_adopted]
def _process(self) -> int:
'Internal processing, after certbot setup.'
for conf_file in self._get_adoptable_configs():
item_config = deepcopy(x=self._certbot_config)
renewablecert = reconstitute(
config=item_config, full_path=conf_file)
if not renewablecert:
notify(
msg=f'Error adopting {conf_file}, please check the logs ' +
f'in {self._certbot_config.logs_dir}')
continue
cert = DaneCert.wrap_renewablecert(cert=renewablecert)
cert = self.all_certs.parsed[conf_file]
notify(msg=f'Adopting {conf_file}')
# Add with defaults
adopted = AdoptedCertConfig.get_empty(configpath=conf_file)

View File

@ -2,6 +2,7 @@ from __future__ import annotations
from abc import ABCMeta, abstractmethod
from argparse import Namespace
from functools import cached_property
from logging import getLogger
from traceback import format_exc
from types import MappingProxyType
@ -25,7 +26,7 @@ class _ParsedCertsResult(NamedTuple):
'Result of a `_get_parsed_certs()` call.'
# A `MappingProxyType` is basically a read-only `dict`
# config path -> DaneCert
parsed_certs: MappingProxyType[str, DaneCert]
parsed: MappingProxyType[str, DaneCert]
# lineage name -> config path
lineages: MappingProxyType[str, str]
parse_failures: tuple[str, ...]
@ -54,7 +55,8 @@ class HandlerBase(object, metaclass=ABCMeta):
'Return the already adopted paths in the passed list of paths.'
return frozenset(self._config.runtime.adopted).intersection(available)
def _get_parsed_certs(self) -> _ParsedCertsResult:
@cached_property
def all_certs(self) -> _ParsedCertsResult:
"""
Return a structured tuple of:
- list of certbot certificates from configs created by certbot
@ -84,7 +86,7 @@ class HandlerBase(object, metaclass=ABCMeta):
already_adopted = self._get_adopted_configpaths(
available=set(parsed_certs))
return _ParsedCertsResult(
parsed_certs=MappingProxyType(parsed_certs),
parsed=MappingProxyType(parsed_certs),
lineages=MappingProxyType(lineages),
parse_failures=parse_failures, already_adopted=already_adopted)

View File

@ -57,14 +57,14 @@ class _CertbotLogicEmulator(object):
def _installer(self) -> Optional[Installer]:
installer, self._authenticator = choose_configurator_plugins(
config=self._certbot_config, plugins=self._plugins,
verb='renew')
verb='certonly')
return installer
@cached_property
def _authenticator(self) -> Optional[Authenticator]:
self._installer, authenticator = choose_configurator_plugins(
config=self._certbot_config, plugins=self._plugins,
verb='renew')
verb='certonly')
return authenticator
@cached_property
@ -225,7 +225,7 @@ class RenewHandler(HandlerBase):
# In case time_to_reload < 0, we need to update TLSAs
tlsa_updater.process()
return domains
config = deepcopy(x=self._certbot_config)
config = cert.cli_config
config.certname = cert.lineagename
if not should_renew(config=config, lineage=cert):
tlsa_updater.process()
@ -239,18 +239,16 @@ class RenewHandler(HandlerBase):
def _renew_one_nonadopted(
self, cert: DaneCert) -> tuple[list[str], list[str]]:
'Renew a non-adopted certificate by running certbot\'s logic.'
config = deepcopy(x=self._certbot_config)
config.certname = cert.lineagename
return handle_renewal_request(config=config)
cert.cli_config.certname = cert.lineagename
return handle_renewal_request(config=cert.cli_config)
def _start_renewing(
self, to_renew: dict[str, DaneCert],
already_adopted: frozenset[str]) -> int:
self, to_renew: dict[str, DaneCert]) -> int:
'Start renewing the chosen certificates.'
renewed_domains = list[str]()
failed_domains = list[str]()
for certconfig_path, cert in to_renew.items():
if certconfig_path in already_adopted:
if certconfig_path in self.all_certs.already_adopted:
renewed_domains.extend(self._renew_one_adopted(
certconfig_path=certconfig_path, cert=cert))
continue
@ -263,11 +261,10 @@ class RenewHandler(HandlerBase):
def _process(self) -> int:
'Renew certificates one by one.'
result = self._get_parsed_certs()
if self._args.hostnames:
# Renew specific hostnames
hostname_set = set[str](self._args.hostnames)
nonexistent_lineages = hostname_set - set(result.lineages)
nonexistent_lineages = hostname_set - set(self.all_certs.lineages)
if nonexistent_lineages:
domains = summarize_domain_list(
domains=sorted(nonexistent_lineages))
@ -276,11 +273,11 @@ class RenewHandler(HandlerBase):
f'not known by certbot: {domains}')
return 1
to_renew = {
result.lineages[x]: result.parsed_certs[result.lineages[x]]
self.all_certs.lineages[x]:
self.all_certs.parsed[self.all_certs.lineages[x]]
for x in self._args.hostnames
}
else:
# Renew ALL the hostnames
to_renew = result.parsed_certs.copy()
return self._start_renewing(
to_renew=to_renew, already_adopted=result.already_adopted)
to_renew = self.all_certs.parsed.copy()
return self._start_renewing(to_renew=to_renew)

View File

@ -16,25 +16,24 @@ class UpdateTlsaHandler(HandlerBase):
def _get_adopted_certs(self) -> dict[str, DaneCert]:
'Return the `DaneCert`s to update TLSA records on.'
certparse_result = self._get_parsed_certs()
result = dict[str, DaneCert]()
if not self._args.hostnames:
for hostname, conf_file in certparse_result.lineages.items():
if conf_file not in certparse_result.already_adopted:
for hostname, conf_file in self.all_certs.lineages.items():
if conf_file not in self.all_certs.already_adopted:
continue
result[conf_file] = certparse_result.parsed_certs[conf_file]
result[conf_file] = self.all_certs.parsed[conf_file]
return result
for hostname in self._args.hostnames:
conf_file = certparse_result.lineages.get(hostname)
conf_file = self.all_certs.lineages.get(hostname)
if not conf_file:
raise KeyError(
f'Hostname (lineage) {hostname!r} is not unknown by ' +
'certbot.')
if conf_file not in certparse_result.already_adopted:
if conf_file not in self.all_certs.already_adopted:
raise KeyError(
f'Hostname (lineage) {hostname!r} is not yet adopted. ' +
'Please use the adopt command first.')
result[conf_file] = certparse_result.parsed_certs[conf_file]
result[conf_file] = self.all_certs.parsed[conf_file]
return result
def _process(self) -> int:

View File

@ -1,17 +1,20 @@
from __future__ import annotations
from copy import deepcopy
from datetime import datetime, timezone
from functools import cached_property, lru_cache
from hashlib import sha256, sha512
from logging import getLogger
from typing import Literal, Mapping, Optional
from certbot._internal.renewal import reconstitute
from certbot._internal.storage import (
ALL_FOUR, BASE_PRIVKEY_MODE, RenewableCert, config_with_defaults,
update_configuration)
from certbot.compat.filesystem import (
compute_private_key_mode, copy_ownership_and_apply_mode)
from certbot.configuration import NamespaceConfig
from certbot.crypto_util import verify_renewable_cert
from certbot.util import Key
from cryptography.hazmat.primitives._serialization import (
Encoding, PublicFormat)
@ -71,18 +74,21 @@ class DaneCert(RenewableCert):
@staticmethod
def wrap_renewablecert(cert: RenewableCert) -> DaneCert:
'Return a `DaneCert` from a `RenewableCert`.'
verify_renewable_cert(renewable_cert=cert)
result = DaneCert()
result.__dict__ = cert.__dict__
return result
@staticmethod
def from_configdata(
config_filename: str, cli_config: NamespaceConfig,
update_symlinks: bool = False) -> DaneCert:
config_filename: str, cli_config: NamespaceConfig) -> DaneCert:
'Return a `DaneCert` from configuration data.'
cert = RenewableCert(
config_filename=config_filename, cli_config=cli_config,
update_symlinks=update_symlinks)
cert = reconstitute(
config=deepcopy(x=cli_config), full_path=config_filename)
if not cert:
raise RuntimeError(
f'Error loading {config_filename}, please check the logs in ' +
f'{cli_config.logs_dir}')
return DaneCert.wrap_renewablecert(cert=cert)
@cached_property