Source code for datalad_next.annexremotes.archivist

"""git-annex special remote *archivist* for obtaining files from archives
"""

from __future__ import annotations

# General TODO for future improvements
#
# `datalad.archivist.archive-cache-mode=<name>`
#   Choice of archive (access) caching behavior. ``<name>`` can be any of
#
#   ``persistent-whole``
#     This causes an archive to be downloaded completely on first access to any
#     archive member. A regular ``annex get`` is performed and an archive is
#     placed at its standard location in the local annex. Any archive member
#     will be extracted from this local copy.
#
# Some ideas on optional additional cache modes related to dropping as much as
# possible after the special remote is done. However, these modes also come
# with potential issues re parallel access (what if another remote process
# is still using a particular archive?). Think about that when there is a
# real need.
#
#  ``keep-downloads``
#    No caching will be performed per se. However, when archive member access
#    happens to require a full archive download, a downloaded archive will
#    not be removed after member extraction. In such cases, this mode will
#    behave like ``persistent-whole``.
#
#  ``none``
#    This is behaving like ``keep-downloads``, but any downloaded archive
#    will be dropped again after extraction is complete.

from collections.abc import Iterable
from dataclasses import dataclass

from pathlib import Path
from shutil import copyfileobj
from typing import (
    Dict,
    Generator,
    List,
    Tuple,
)

from datalad_next.archive_operations import ArchiveOperations

# we intentionally limit ourselves to the most basic interface
# and even that we only need to get a `ConfigManager` instance.
# If that class supported a plain path argument, we could
# avoid it entirely
from datalad_next.datasets import LegacyAnnexRepo

from datalad_next.exceptions import CommandError
from datalad_next.types import (
    AnnexKey,
    ArchivistLocator,
    ArchiveType,
)

from . import (
    RemoteError,
    SpecialRemote,
    UnsupportedRequest,
    super_main
)


class ArchivistRemote(SpecialRemote):
    """git-annex special remote *archivist* for obtaining files from archives

    Successor of the `datalad-archive` special remote. It claims and acts on
    particular archive locator "URLs", registered for individual annex keys
    (see :class:`datalad_next.types.archivist.ArchivistLocator`). These
    locators identify another annex key that represents an archive (e.g., a
    tarball or a zip file) that contains the respective annex key as a member.
    This special remote triggers the extraction of such members from any
    candidate archive when retrieval of a key is requested.

    This special remote cannot store or remove content.

    The desired usage is to register a locator "URL" for any relevant key via
    ``git annex addurl|registerurl`` or ``datalad addurls``.

    Configuration
    -------------

    The behavior of this special remote can be tuned via a number of
    configuration settings.

    `datalad.archivist.legacy-mode=yes|[no]`
      If enabled, all special remote operations fall back onto the legacy
      ``datalad-archives`` special remote implementation. This mode is only
      provided for backward-compatibility. This legacy implementation
      unconditionally downloads archive files completely, and keeps an
      internal cache of the fully extracted archive around. The implied 200%
      (or more) storage cost overhead for obtaining a complete dataset can be
      prohibitive for datasets tracking large amounts of data (in archive
      files).

    Implementation details
    ----------------------

    *CHECKPRESENT*

    When performing a non-download test for the (continued) presence of an
    annex key (as triggered via ``git annex fsck --fast`` or ``git annex
    checkpresentkey``), the underlying archive containing a key will NOT be
    inspected. Instead, only the continued availability of the annex key for
    the containing archive will be tested. In other words: this
    implementation trusts the archive member annotation to be correct/valid,
    and it also trusts the archive content to be unchanged. The latter will
    generally be the case, but may not be with URL-style keys.

    Not implementing such a trust-approach *would* have a number of
    consequences. Depending on where the archive is located (local/remote)
    and what format it is (fsspec-inspectable or not), we would need to
    download it completely in order to verify a matching archive member.
    Moreover, an archive might also reference another archive as a source,
    leading to a multiplication of transfer demands.
    """
    def __init__(self, annex):
        super().__init__(annex)
        # central archive handler cache, initialized on-prepare
        self._ahandlers = None
        # a potential instance of the legacy datalad-archives implementation
        self._legacy_special_remote = None

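    # A minimal usage sketch (illustrative only, not part of the module;
    # keys and paths are placeholders). A locator "URL" of the general form
    #
    #   dl+archive:<archive-annex-key>#path=<member-path>&size=<bytes>
    #
    # is registered for the key of a file, e.g.
    #
    #   git annex registerurl <member-annex-key> \
    #       'dl+archive:<archive-annex-key>#path=dir/file.dat&size=42'
    #
    # after which this special remote can serve `git annex get` requests for
    # that file. See ArchivistLocator for the authoritative locator syntax.
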
    def __getattribute__(self, name: str):
        """Redirect top-level API calls to legacy implementation, if needed"""
        lsr = SpecialRemote.__getattribute__(self, '_legacy_special_remote')
        if lsr is None or name not in (
                'initremote',
                'prepare',
                'claimurl',
                'checkurl',
                'checkpresent',
                'remove',
                'whereis',
                'transfer_retrieve',
                'stop',
        ):
            # we are not in legacy mode or this is no top-level API call
            return SpecialRemote.__getattribute__(self, name)

        return getattr(lsr, name)

    def initremote(self):
        """This method does nothing, because the special remote requires no
        particular setup.
        """
        pass

    def prepare(self):
        """Prepare the special remote for requests by git-annex

        If the special remote is instructed to run in "legacy mode", all
        subsequent operations will be processed by the ``datalad-archives``
        special remote implementation!
        """
        # we have to do this here, because the base class `.repo` will only
        # give us a `LeanAnnexRepo`.
        # TODO it is unclear to MIH what is actually needed API-wise of the
        # legacy interface. Needs research.
        self._repo = LegacyAnnexRepo(self.annex.getgitdir())
        # are we in legacy mode?
        # let remote-specific setting take priority (there could be
        # multiple archivist-type remotes configured), and use unspecific
        # switch as a default, with a general default of NO
        if self.get_remote_gitcfg(
                'archivist', 'legacy-mode', default='no').lower() == 'yes':
            # ATTENTION DEBUGGERS!
            # If we get here, we will bypass all of the archivist
            # implementation! Check __getattribute__() -- pretty much no
            # other code in this file will run!!!
            # __getattribute__ will relay all top-level operations
            # to an instance of the legacy implementation
            from datalad.customremotes.archives import ArchiveAnnexCustomRemote
            lsr = ArchiveAnnexCustomRemote(self.annex)
            lsr.prepare()
            # we can skip everything else, it won't be triggered anymore
            self._legacy_special_remote = lsr
            return

        # central archive key handler coordination
        self._ahandlers = _ArchiveHandlers(
            self.repo,
            # TODO
            #cache_mode=self._getcfg(
            #    'archive-cache-mode',
            #    default='').lower(),
        )

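    # Note (illustrative, not part of the module): the legacy mode consulted
    # above is a regular configuration item documented in the class
    # docstring, e.g. enabled repository-wide with
    #
    #   git config datalad.archivist.legacy-mode yes
    #
    # A remote-specific setting, when present, takes priority over this
    # repository-wide default, as described in prepare() above.
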
    def claimurl(self, url: str) -> bool:
        """Returns True for :class:`~datalad_next.types.archivist.ArchivistLocator`-style URLs

        Only a lexical check is performed. Any other URL will result in
        ``False`` being returned.
        """
        try:
            ArchivistLocator.from_str(url)
            return True
        except Exception:
            return False

    def checkurl(self, url: str) -> bool:
        """Parses :class:`~datalad_next.types.archivist.ArchivistLocator`-style URLs

        Returns ``True`` for any syntactically correct URL with all required
        properties.

        The implementation is identical to ``claimurl()``.
        """
        try:
            ArchivistLocator.from_str(url)
        except Exception as e:
            self.message(f'Invalid URL {url!r}: {e}', type='debug')
            return False

        # we should be able to work with this.
        # do not actually test whether the archive is around or whether
        # the path actually points to a member in the archive,
        # leave to transfer_retrieve
        # Do not give detailed info to git-annex for now
        # https://github.com/Lykos153/AnnexRemote/issues/60
        #if member_props.get('size'):
        #    return dict(
        #        filename=member_props['path'].name,
        #        size=member_props['size'],
        #    )
        #else:
        #    return dict(filename=member_props['path'].name)
        return True

    def checkpresent(self, key: str) -> bool:
        """Verifies continued availability of the archive referenced by the key

        No content verification of the archive, or of the particular archive
        member, is performed. See "Implementation details" of this class for
        a rationale.

        Returns
        -------
        bool
            True if the referenced archive key is present on any remote.
            False if not.
        """
        # the idea here is that: as long as the archive declared to contain
        # the key is still accessible, we declare CHECKPRESENT.
        # In other words: we trust the archive member annotation to be
        # correct/valid.
        # Not trusting it would have severe consequences. Depending on
        # where the archive is located (local/remote) and what format it
        # is (fsspec-inspectable), we might need to download it completely
        # in order to verify a matching archive member. Moreover, an archive
        # might also reference another archive as a source, leading to a
        # multiplication of transfer demands.

        # get all associated archive keys, turn into a set because any key
        # might map to multiple archive keys, and we only need to check them
        # once
        akeys = set(
            str(ArchivistLocator.from_str(url).akey)
            for url in self._get_key_dlarchive_urls(key)
        )
        # As with transfer_retrieve, blindly checking akeys in arbitrary
        # order is stupid. We should again sort by (local) availability.
        # If we have an archive locally we can check faster, and we could
        # check more precisely (actually look into it).
        # We only need to find one archive with a hit; if we search cleverly
        # we can exit earlier.
        # So let's do a two-pass approach: first check local availability
        # for any archive key, and only if that does not find us an archive
        # go for the remotes
        if any(_get_key_contentpath(self.repo, akey) for akey in akeys):
            # any one is good enough
            # TODO here we could actually look into the archive and
            # verify member presence at relatively little cost
            return True

        for akey in akeys:
            # we leave all checking logic to git-annex
            try:
                # if it exits clean, the key is still present at at least one
                # remote
                self.repo.call_annex(['checkpresentkey', akey])
                return True
            except CommandError:
                self.message(
                    f'Archive key candidate {akey} for key {key} '
                    'not present in any known remote or here',
                    type='debug')
        # when we end up here, we have tried all known archive keys and
        # found none to be present in any known location
        return False

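    # Illustrative trigger of the presence test above (assuming a special
    # remote named "archivist" is enabled in the repository):
    #
    #   git annex fsck --fast --from archivist
    #
    # This runs the CHECKPRESENT query without downloading any content.
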
    def transfer_retrieve(self, key: str, localfilename: str):
        """Retrieve an archive member from a (remote) archive

        All registered locators for a requested key will be sorted by
        availability and size of the referenced archives. For each archive
        the most suitable handler will be initialized, and extraction of the
        identified member will be attempted. If that fails, the next handler
        is tried until all candidate handlers are exhausted.

        Depending on the archive availability and type, archives may need to
        be retrieved from remote sources.
        """
        # rely on from_locators() to bring the candidate archives
        # in some intelligent order to try one after the other.
        # break ASAP to prevent unnecessary processing
        msgs = []
        try:
            for handler, locs in self._ahandlers.from_locators([
                    ArchivistLocator.from_str(url)
                    for url in self._get_key_dlarchive_urls(key)]):
                with Path(localfilename).open('wb') as dst_fp:
                    for loc in locs:
                        try:
                            with handler.open(loc.member) as fp:
                                # TODO progress reporting
                                # but what progress? the extraction
                                # may just be one part, there could also
                                # be file retrieval
                                copyfileobj(fp, dst_fp)
                                return
                        except Exception as e:
                            msg = f'Failed to extract {key!r} from ' \
                                  f'{handler} ({loc.member}): {e}'
                            self.message(msg, type='debug')
                            msgs.append(msg)
        except Exception as e:
            raise RemoteError(f'Could not obtain {key!r}') from e
        raise RemoteError(f'Could not obtain {key!r} from any archive')

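    # Illustrative trigger of the retrieval above: a plain
    #
    #   git annex get <file>
    #
    # issues a TRANSFER RETRIEVE request to this remote for any key that has
    # a dl+archive: locator registered, provided git-annex selects this
    # remote as the source.
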
    def transfer_store(self, key: str, filename: str):
        """Raises ``UnsupportedRequest``. This operation is not supported."""
        raise UnsupportedRequest('This remote cannot store content')

    def remove(self, key: str):
        """Raises ``UnsupportedRequest``. This operation is not supported."""
        raise UnsupportedRequest('This remote cannot remove content')

    #
    # Helpers
    #
    def _get_key_dlarchive_urls(self, key):
        return self.annex.geturls(key, prefix='dl+archive:')

def main():
    """CLI entry point installed as ``git-annex-remote-archivist``"""
    super_main(
        cls=ArchivistRemote,
        remote_name='archivist',
        description=\
        "access to annex keys stored within other archive-type annex keys ",
    )

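# Illustrative setup (not part of the module): with the entry point above
# installed as `git-annex-remote-archivist`, the special remote would
# typically be added to a repository along the lines of
#
#   git annex initremote archivist type=external externaltype=archivist \
#       encryption=none
#
# where the remote name and the encryption choice are up to the user.
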
#
# Internal helpers
#

@dataclass
class _ArchiveInfo:
    """Representation of an archive used internally by ``_ArchiveHandlers``"""
    local_path: Path | None
    handler: ArchiveOperations | None = None
    type: ArchiveType | None = None


class _ArchiveHandlers:
    """Wraps an annex repo to provide access to keys given by ArchivistLocator(s)

    The main functionality is provided by ``from_locators()``.
    """
    # TODO make archive access caching behavior configurable from the outside
    def __init__(self, repo):
        # mapping of archive keys to an info dict
        self._db: Dict[AnnexKey, _ArchiveInfo] = {}
        # for running git-annex queries against the repo
        self._repo = repo

    def from_locators(
            self,
            locs: List[ArchivistLocator],
    ) -> Generator[Tuple[ArchiveOperations, Iterable[ArchivistLocator]],
                   None, None]:
        """Produce archive handlers for the given locators

        Yield them one-by-one in a maximally intelligent order for efficient
        retrieval (i.e., handlers for archives that are already available
        locally come first).

        Each handler is yielded fully prepared, i.e. if necessary an archive
        is retrieved before the handler is yielded. Therefore a consumer
        should not fully consume the returned generator when an operation can
        be completed before all handlers are exhausted.

        Parameters
        ----------
        locs: List[ArchivistLocator]
          Any number of locators that must all refer to the same annex key
          (key, not archive annex key!).

        Yields
        ------
        ArchiveOperations, Iterable[ArchivistLocator]
          The referenced archive annex keys are de-duplicated and sorted by
          (local) availability and size. For each archive key a suitable
          ``ArchiveOperations`` handler is yielded together with the locators
          matching the respective archive.
        """
        # determine all candidate source archive keys
        akeys = set(loc.akey for loc in locs)

        # determine which of the known handlers point to a local archive,
        # yield those
        for akey, kh in {
                akey: self._db[akey]
                for akey in akeys
                if akey in self._db and self._db[akey].handler
        }.items():
            # local_path will be None now, if not around
            if kh.local_path:
                # we found one with a local archive.
                # yield handler and all matching locators
                yield kh.handler, [loc for loc in locs if loc.akey == akey]
                # if we get here, this did not work, do not try again
                akeys.remove(akey)

        # of the handlers we do not yet know, which ones have local data,
        # yield those
        for akey in [k for k in akeys if k not in self._db]:
            ainfo = self._get_archive_info(akey, locs)
            # cache for later
            self._db[akey] = ainfo
            if not ainfo.local_path:
                # do not try a local handler, but keep the akey itself in the
                # race, we might need to try "remote" access later on
                continue
            handler = self._get_local_handler(ainfo)
            # store for later
            ainfo.handler = handler
            # yield handler and all matching locators
            yield handler, [loc for loc in locs if loc.akey == akey]
            # if we get here, this did not work, do not try again
            akeys.remove(akey)

        # of the handlers we do know, but do not have local data,
        # possibly obtain the archive, yield those
        #
        # this is the same as the first loop, but this time all local
        # paths are checked, and some akeys might already have been
        # removed
        for akey, kh in {
                akey: self._db[akey]
                for akey in akeys
                if akey in self._db and self._db[akey].handler
        }.items():
            yield kh.handler, [loc for loc in locs if loc.akey == akey]
            # if we get here, this did not work, do not try again
            akeys.remove(akey)

        # all that is left is to create "remote" handlers and yield them.
        # collect any exceptions to report them at the end, if needed
        exc = []
        # but this time sort the keys to start with the smallest ones
        # (just in case a download is involved)
        for akey in sorted(akeys, key=lambda x: x.size):
            # at this point we must have an existing _ArchiveInfo record
            # for this akey
            ainfo = self._db[akey]
            # but we do not have a handler yet
            assert ainfo.handler is None
            try:
                handler = self._get_remote_handler(akey, ainfo)
            except Exception as e:
                exc.append(e)
                continue
            # if this worked, store the handler for later
            ainfo.handler = handler
            yield handler, [loc for loc in locs if loc.akey == akey]

        # if we get here we can stop -- everything was tried. If there were
        # exceptions, make sure to report them
        if exc:
            # TODO better error
            e = RuntimeError(
                'Exhausted all candidate archive handlers '
                f'(previous failures {exc})')
            e.errors = exc
            raise e

    def _get_archive_info(
            self,
            akey: AnnexKey,
            locs: Iterable[ArchivistLocator],
    ) -> _ArchiveInfo:
        # figure out if the archive is local first
        local_path = _get_key_contentpath(self._repo, str(akey))

        # get all reported archive types
        akey_atypes = set(
            loc.atype for loc in locs
            if loc.akey == akey and loc.atype
        )
        # if we have (consistent) information, pick the type; if not,
        # set to None/ignore and wait for type detection by the handler
        akey_atype = None if len(akey_atypes) != 1 else akey_atypes.pop()

        ainfo = _ArchiveInfo(
            local_path=local_path,
            type=akey_atype,
        )
        # cache for later
        self._db[akey] = ainfo
        return ainfo

    def _get_local_handler(self, ainfo: _ArchiveInfo) -> ArchiveOperations:
        if not ainfo.type:
            # TODO we could still do mime-type detection. We have the
            # archive file present locally.
            # check datalad-core how it is done in archive support
            raise NotImplementedError

        if ainfo.type == ArchiveType.tar:
            from datalad_next.archive_operations import TarArchiveOperations
            return TarArchiveOperations(
                ainfo.local_path,
                cfg=self._repo.config,
            )
        else:
            raise NotImplementedError

    def _get_remote_handler(
            self,
            akey: AnnexKey,
            ainfo: _ArchiveInfo,
    ) -> ArchiveOperations:
        # right now we have no remote handlers available
        # TODO: use akey to ask the repo for URLs from which the key
        # would be available and select a remote handler to work
        # with that URL

        # instead we retrieve the archive
        res = self._repo.get(str(akey), key=True)
        # if the akey was already around, `res` could be an empty list.
        # however, under these circumstances we should not have ended
        # up here. assert to alert on logic error in that case
        assert isinstance(res, dict)
        if res.pop('success', None) is not True:
            # TODO better error
            raise RuntimeError(f'Failed to download archive key: {res!r}')

        # now we have the akey locally
        ainfo.local_path = _get_key_contentpath(self._repo, str(akey))
        return self._get_local_handler(ainfo)


def _get_key_contentpath(repo: LegacyAnnexRepo, key: str):
    """Return ``Path`` to a locally present annex key, or ``None``

    ``None`` is returned when there is no such key present locally.
    """
    try:
        # if it exits clean, there will be a content location
        # and the content can be found at the location
        loc = next(repo.call_annex_items_(['contentlocation', key]))
        # convert to path. git-annex will report a path relative to the
        # dotgit-dir
        # TODO platform-native?
        loc = repo.dot_git / Path(loc)
    except CommandError:
        loc = None
    return loc

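# Illustrative command line equivalent of _get_key_contentpath() above
# (with a placeholder key):
#
#   git annex contentlocation <annex-key>
#
# prints a path relative to the .git directory when the key's content is
# present locally, and exits non-zero otherwise (the CommandError case).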