Add locks and pathlib

This commit is contained in:
László Károlyi 2023-09-21 20:36:31 +02:00
parent 40dab5fb46
commit 7ffed1f9b2
Signed by: karolyi
GPG Key ID: 2DCAF25E55735BFE
2 changed files with 193 additions and 0 deletions

66
src/ktools/locks.py Normal file
View File

@ -0,0 +1,66 @@
from fcntl import LOCK_EX, LOCK_UN, flock
from tempfile import gettempdir, gettempprefix
from .pathlib import Path
_KTOOLS_TEMPLOCK = Path(gettempdir(), f'{gettempprefix()}-ktools')
_KTOOLS_TEMPLOCK.touch(exist_ok=True)
class _KLock(object):
"""
A context-using file locker. HEADS UP: man 2 flock:
Locks are on files, not file descriptors. That is, file descriptors
duplicated through dup(2) or fork(2) do not result in multiple
instances of a lock, but rather multiple references to a single
lock. If a process holding a lock on a file forks and the child
explicitly unlocks the file, the parent will lose its lock.
Processes blocked awaiting a lock may be awakened by signals.
"""
path = _KTOOLS_TEMPLOCK
def __enter__(self):
'Enter the lock.'
self.file_descriptor = open(self.path, 'r')
flock(self.file_descriptor, LOCK_EX)
return self.file_descriptor
def __exit__(self, exc_type, exc_value, traceback):
'Exit the lock.'
flock(self.file_descriptor, LOCK_UN)
self.file_descriptor.close()
class TempLock(object):
"""
A lock that will create the lock file for itself, in a thread-safe
manner.
"""
_is_acquired = False
def __init__(self, path: Path):
self._path = path
def __enter__(self):
'Enter the lock.'
with _KLock():
self._path.touch(exist_ok=True)
self._file = self._path.open(mode='r')
flock(self._file, LOCK_EX)
self._is_acquired = True
def __exit__(self, exc_type, exc_value, traceback):
'Exit the lock.'
if not self._is_acquired:
return
flock(self._file, LOCK_UN)
self._file.close()
self._is_acquired = False
acquire = __enter__
def release(self):
'Release the lock.'
return self.__exit__(exc_type=None, exc_value=None, traceback=None)

127
src/ktools/pathlib.py Normal file
View File

@ -0,0 +1,127 @@
from __future__ import annotations
from logging import getLogger
from os import chown
from os import name as os_name
from os import umask
from pathlib import Path as PathBase
from pathlib import PosixPath as PosixPathBase
from pathlib import WindowsPath as WindowsPathBase
from platform import system
from threading import RLock
from typing import Iterable
_UMASK_LOCK = RLock()
_CODE_OSERROR_DIRECTORY_NOT_EMPTY = 66 if system() == 'FreeBSD' else 39
_LOGGER = getLogger(name=__name__)
class Path(PathBase):
'Extending the built-in `Path`.'
def __new__(cls, *args, **kwargs):
if cls is Path:
cls = WindowsPath if os_name == 'nt' else PosixPath
self = cls._from_parts(args)
if not self._flavour.is_supported:
raise NotImplementedError("cannot instantiate %r on your system"
% (cls.__name__,))
return self
def _ensure_parentdirs_inner(
self, relative_path: Path, mode: int | None = None,
uid: int | None = None, gid: int | None = None) -> Path:
'Do the work for `ensure_parentdirs` while threadlocked or not.'
new_path = self
for part in relative_path.parent.parts:
new_path = new_path.joinpath(part)
try:
# A race condition can occur here
new_path.mkdir()
except FileExistsError:
continue
if uid is None and gid is None:
continue
chown(
path=new_path, uid=-1 if uid is None else uid,
gid=-1 if gid is None else gid)
return new_path.joinpath(relative_path.name)
def ensure_parentdirs(
self, relative_path: Path | str | Iterable[str],
mode: int | None = None, uid: int | None = None,
gid: int | None = None) -> Path:
"""
Ensure the directories up until the last part (the filename) in
self, starting from `self`. If `mode`, `uid` and `gid` is
passed, the ownership and modes will be set on the
*newly created* directories. If you pass `mode`, `uid` and
`gid`, make sure you can set the appropriate umask for `mode`
and you can set the ownership for the passed `uid`/`gid`.
Return the ensured `self`+`relative_path` for when done.
"""
if type(relative_path) is str:
relative_path = Path(relative_path)
elif isinstance(relative_path, Iterable): # str is Iterable
relative_path = Path(*relative_path)
if relative_path.is_absolute():
raise ValueError(f'{relative_path!r} must not be absolute.')
if not self.is_dir():
raise ValueError(f'{self!r} must be a directory.')
if mode is None:
return self._ensure_parentdirs_inner(
relative_path=relative_path, mode=mode, uid=uid, gid=gid)
old_umask = None
try:
_UMASK_LOCK.acquire()
old_umask = umask(0o777 - mode)
return self._ensure_parentdirs_inner(
relative_path=relative_path, mode=mode, uid=uid, gid=gid)
finally:
if old_umask:
umask(old_umask)
_UMASK_LOCK.release()
def get_relative(self, to: Path) -> Path:
"""
Calculate and return a relative path between `self` and `to`
paths. Both paths must be absolute!
"""
if not self.is_absolute() or not to.is_absolute():
raise ValueError(f'{self!r} or {to!r} is not absolute.')
items_from = self.parts
items_to = to.parts
# Remove identical path prefix parts
while items_from[0] == items_to[0]:
items_from = items_from[1:]
items_to = items_to[1:]
return Path(*('..' for x in range(1, len(items_from))), *items_to)
def remove_up_to(self, parent: Path):
'Remove the paths in `self` until the passed `parent`.'
if not self.is_absolute() or not parent.is_absolute():
raise ValueError(f'{self!r} and {parent!r} must be absolute.')
self.relative_to(parent)
iter_self = self
while iter_self != parent:
if iter_self.is_dir():
try:
iter_self.rmdir()
_LOGGER.debug(msg=f'Removed {iter_self!r}')
except OSError as exc:
if exc.args[0] != _CODE_OSERROR_DIRECTORY_NOT_EMPTY:
raise
return
else:
iter_self.unlink()
_LOGGER.debug(msg=f'Removed {iter_self!r}')
iter_self = iter_self.parent
class PosixPath(Path, PosixPathBase):
'Extending `PosixPath`.'
class WindowsPath(Path, WindowsPathBase):
'Extending `WindowsPath`.'