Source code for datalad.customremotes.base

# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
#   See COPYING file distributed along with the datalad package for the
#   copyright and license terms.
### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
"""Base classes to custom git-annex remotes (e.g. extraction from archives)"""

from __future__ import absolute_import

__docformat__ = 'restructuredtext'

import inspect
import errno
import os
import sys

from import exists, join as opj, dirname, lexists

from urllib.parse import urlparse

import logging
lgr = logging.getLogger('datalad.customremotes')
lgr.log(5, "Importing datalad.customremotes.main")

from ..ui import ui
from import ProtocolInterface
from import DictCache
from ..cmdline.helpers import get_repo_instance
from ..dochelpers import exc_str
from datalad.utils import (



from datalad.ui.progressbars import ProgressBarBase

class AnnexRemoteQuit(Exception):
    """Signal to terminate the special remote's communication loop.

    Raised by :meth:`AnnexCustomRemote.stop` and caught in
    :meth:`AnnexCustomRemote.main` to exit cleanly.
    """
    pass
def get_function_nargs(f):
    """Return the number of positional arguments accepted by callable `f`.

    Unwraps decorated functions by following the ``.wrapped`` attribute
    convention used in this code base.

    Parameters
    ----------
    f : callable
      Typically an (unbound) method whose first argument is ``self``.

    Returns
    -------
    int
      -1 if `f` accepts a variable number of arguments (``*args``);
      otherwise the count of declared arguments excluding ``self``.
    """
    while hasattr(f, 'wrapped'):
        f = f.wrapped
    # getargspec() was removed in Python 3.11; getfullargspec() is the
    # supported equivalent (.varkw replaces the old .keywords field)
    argspec = inspect.getfullargspec(f)
    assert not argspec.varkw, \
        "ATM we have none defined with keywords, so disabling having them"
    if argspec.varargs:
        # Variable number of arguments
        return -1
    else:
        assert argspec.args, "ATM no static methods"
        assert argspec.args[0] == "self"
        return len(argspec.args) - 1
class AnnexExchangeProtocol(ProtocolInterface):
    """A little helper to protocol interactions of custom remote with annex
    """

    # Shell-script preamble written to the generated protocol file.  The
    # send/recv lines appended later turn the file into a replayable
    # transcript of the annex <-> remote exchange.
    # NOTE(review): exact whitespace inside this literal was reconstructed
    # from a collapsed source dump -- confirm against upstream.
    HEADER = r"""#!/bin/bash

set -e

# Gets a VALUE response and stores it in $RET
report () {
    echo "$@" >&2
}

recv () {
    read resp
    #resp=${resp%\n}
    target="$@"
    if [ "$resp" != "$target" ]; then
        report "! exp $target"
        report " got $resp"
    else
        report "+ got $resp"
    fi
}

send () {
    echo "$@"
    report "sent $@"
}

"""

    def __init__(self, repopath, custom_remote_name=None):
        """
        Parameters
        ----------
        repopath : str
          Path to the repository whose annex interactions are recorded.
        custom_remote_name : str, optional
          Suffix for the generated script name; a trailing ':' is stripped.
        """
        super(AnnexExchangeProtocol, self).__init__()
        self.repopath = repopath
        # resolve once, repeated resolution is slow and depending on
        # file system operations
        # unclear why logging needs it at all
        self.realrepopath = Path(repopath).resolve()
        self.custom_remote_name = custom_remote_name
        self._file = None        # path of protocol script, set by initiate()
        self._initiated = False  # guards against repeated initialization

    def initiate(self):
        """Create (or reuse) the protocol script under .git/bin.

        If a previous protocol file exists, its recorded send/recv lines are
        commented out so a fresh session can be appended; otherwise a new
        executable script starting with HEADER is created.  Idempotent per
        instance.
        """
        if self._initiated:
            return
        self._initiated = True
        d = opj(self.repopath, '.git', 'bin')
        if not exists(d):
            os.makedirs(d)
        suf = '-' + self.custom_remote_name.rstrip(':') \
            if self.custom_remote_name else ''
        self._file = _file = opj(d, 'git-annex-remote-datalad' + suf)

        if exists(_file):
            lgr.debug("Commenting out previous entries")
            # comment out all the past entries
            with open(_file, 'rb') as f:
                entries = list(map(assure_unicode, f.readlines()))
            # skip the HEADER portion; only protocol lines get commented
            for i in range(len(self.HEADER.split(os.linesep)), len(entries)):
                e = entries[i]
                if e.startswith('recv ') or e.startswith('send '):
                    entries[i] = '#' + e
            with open(_file, 'wb') as f:
                f.write(u''.join(entries).encode('utf-8'))
            return  # nothing else to be done

        lgr.debug("Initiating protocoling."
                  "cd %s; vim %s"
                  % (self.realrepopath,
                     _file[len(self.repopath) + 1:]))
        with open(_file, 'a') as f:
            f.write(self.HEADER)
        os.chmod(_file, 0o755)

    def write_section(self, cmd):
        """Append a '### <cmd>' separator announcing a new command section."""
        self.initiate()
        with open(self._file, 'a') as f:
            f.write('%s### %s%s' % (os.linesep, cmd, os.linesep))
        lgr.debug("New section in the protocol: "
                  "cd %s; PATH=%s:$PATH %s"
                  % (self.realrepopath, dirname(self._file), cmd))

    def write_entries(self, entries):
        """Append multiple protocol lines at once (trailing newline added)."""
        self.initiate()
        with open(self._file, 'a') as f:
            f.write(os.linesep.join(entries + ['']))

    def __iadd__(self, entry):
        # `protocol += "send ..."` appends a single line to the script
        self.initiate()
        with open(self._file, 'a') as f:
            f.write(entry + os.linesep)
        return self

    def start_section(self, cmd):
        """Record the start of a command section; return its index."""
        # NOTE(review): self._sections is not initialized in this class --
        # presumably by the ProtocolInterface base; confirm there.
        self._sections.append({'command': cmd})
        self.write_section(cmd)
        return len(self._sections) - 1

    def end_section(self, id_, exception):
        # raise exception in case of invalid id_ for consistency:
        self._sections.__getitem__(id_)

    def add_section(self, cmd, exception):
        """Record a complete section in one go."""
        self.start_section(cmd)

    @property
    def records_callables(self):
        return False

    @property
    def records_ext_commands(self):
        return True

    @property
    def do_execute_ext_commands(self):
        return True

    @property
    def do_execute_callables(self):
        return True
class AnnexCustomRemote(object):
    """Base class to provide custom special remotes for git-annex

    Implements git-annex special custom remotes protocol described at
    http://git-annex.branchable.com/design/external_special_remote_protocol/
    """

    # Must be defined in subclasses.  There is no classlevel properties, so
    # leaving as this for now
    CUSTOM_REMOTE_NAME = None  # if None -- no additional custom remote name
    SUPPORTED_SCHEMES = ()

    COST = DEFAULT_COST
    AVAILABILITY = DEFAULT_AVAILABILITY

    def __init__(self, path=None, cost=None, fin=None, fout=None):  # , availability=DEFAULT_AVAILABILITY):
        """
        Parameters
        ----------
        path : string, optional
            Path to the repository for which this custom remote is serving.
            Usually this class is instantiated by a script which runs already
            within that directory, so the default is to point to current
            directory, i.e. '.'
        cost : int, optional
            Cost to report to annex; defaults to the class-level COST.
        fin:
        fout:
            input/output streams.  If not specified, stdin, stdout used
        """
        # TODO: probably we shouldn't have runner here but rather delegate
        # to AnnexRepo's functionality
        # NOTE(review): module path of AnnexRepo was stripped in the source
        # dump; reconstructed as datalad's support package -- confirm.
        from ..support.annexrepo import AnnexRepo
        from ..cmd import GitRunner

        self.runner = GitRunner()

        # Custom remotes correspond to annex via stdin/stdout
        self.fin = fin or sys.stdin
        self.fout = fout or sys.stdout

        self.repo = get_repo_instance(class_=AnnexRepo) \
            if not path \
            else AnnexRepo(path, create=False, init=False)
        self.path = self.repo.path

        self._progress = 0  # transmission to be reported back if available
        if cost is None:
            cost = self.COST
        self.cost = cost
        #self.availability = availability.upper()
        assert(self.AVAILABILITY.upper() in ("LOCAL", "GLOBAL"))

        # To signal whether we are in the loop and e.g. could correspond
        # to annex
        self._in_the_loop = False
        # Optional transcript of the exchange, enabled via env variable
        self._protocol = \
            AnnexExchangeProtocol(self.path, self.CUSTOM_REMOTE_NAME) \
            if os.environ.get('DATALAD_TESTS_PROTOCOLREMOTE') else None

        self._contentlocations = DictCache(size_limit=100)  # TODO: config ?

        # instruct annex backend UI to use this remote
        if ui.backend == 'annex':
            ui.set_specialremote(self)

        # Delay introspection until the first instance gets born
        # could in principle be done once in the metaclass I guess
        self.__class__._introspect_req_signatures()

    @classmethod
    def _introspect_req_signatures(cls):
        """
        Check req_ methods to figure out expected number of arguments

        Result is cached on the class as `_req_nargs`, a mapping from
        request name (sans the 'req_' prefix) to argument count.
        """
        if hasattr(cls, '_req_nargs'):
            # We have already figured it out for this class
            return
        cls._req_nargs = {
            m[4:]: get_function_nargs(getattr(cls, m))
            for m in dir(cls)
            if m.startswith('req_')
        }

    @classmethod
    def _get_custom_scheme(cls, prefix):
        """Helper to generate custom datalad URL prefixes
        """
        # prefix which will be used in all URLs supported by this custom
        # remote.  RFC 1738 dictates the "URL Schemes" standard;
        # 2.1.2 suggests that we do use // since all of our URLs will define
        # some hierarchical structure.  But actually since we might encode
        # additional information (such as size) into the URL, it will not be
        # strictly conforming it.  Thus we will not use //
        return "%s+%s" % (URI_PREFIX, prefix)  # if .PREFIX else '')

    # Helpers functionality

    def get_contentlocation(self, key, absolute=False, verify_exists=True):
        """Return (relative to top or absolute) path to the file containing the key

        This is a wrapper around AnnexRepo.get_contentlocation which provides
        caching of the result (we are asking the location for the same archive
        key often)
        """
        if key not in self._contentlocations:
            fpath = self.repo.get_contentlocation(key, batch=True)
            if fpath:  # shouldn't store empty ones
                self._contentlocations[key] = fpath
        else:
            fpath = self._contentlocations[key]
            # but verify that it exists
            if verify_exists and not lexists(opj(self.path, fpath)):
                # prune from cache
                del self._contentlocations[key]
                fpath = ''

        if absolute and fpath:
            return opj(self.path, fpath)
        else:
            return fpath

    #
    # Communication with git-annex
    #

    def send(self, *args):
        """Send a message to git-annex

        Parameters
        ----------
        `*args`: list of strings
           arguments to be joined by a space and passed to git-annex
        """
        msg = " ".join(map(str, args))
        # Sanitize since there must be no new lines
        msg = msg.replace(os.linesep, r'\n')
        if not self._in_the_loop:
            lgr.debug("We are not yet in the loop, thus should not send to annex"
                      " anything.  Got: %s" % msg.encode())
            return
        try:
            self.heavydebug("Sending %r" % msg)
            self.fout.write(msg + "\n")  # .encode())
            self.fout.flush()
            if self._protocol is not None:
                self._protocol += "send %s" % msg
        except IOError as exc:
            lgr.debug("Failed to send due to %s" % str(exc))
            if exc.errno == errno.EPIPE:
                # annex went away -- shut down quietly
                self.stop()
            else:
                raise exc

    def send_unsupported(self, msg=None):
        """Send UNSUPPORTED-REQUEST to annex and log optional message in our log
        """
        if msg:
            lgr.debug(msg)
        self.send("UNSUPPORTED-REQUEST")

    def read(self, req=None, n=1):
        """Read a message from git-annex

        Parameters
        ----------
        req : string, optional
           Expected request - first msg of the response
        n : int
           Number of response elements after first msg

        Returns
        -------
        list of str or None
           The line split into at most n+1 pieces, or None if `req` was
           given and the first piece did not match it.
        """
        # TODO: should we strip or should we not? verify how annex would deal
        # with filenames starting/ending with spaces - encoded?
        # Split right away
        l = self.fin.readline().rstrip(os.linesep)
        if self._protocol is not None:
            self._protocol += "recv %s" % l
        msg = l.split(None, n)
        if req and ((not msg) or (req != msg[0])):
            # verify correct response was given
            self.send_unsupported(
                "Expected %r, got a line %r.  Ignoring" % (req, l)
            )
            return None
        self.heavydebug("Received %r" % (msg,))
        return msg

    # TODO: see if we could adjust the "originating" file:line, because
    # otherwise they are all reported from this module etc

    def heavydebug(self, msg, *args, **kwargs):
        # level 4 -- even more verbose than DEBUG (10)
        lgr.log(4, msg, *args, **kwargs)

    # Since protocol allows for some messaging back, let's duplicate to lgr

    def debug(self, msg):
        lgr.debug(msg)
        self.send("DEBUG", msg)

    def error(self, msg, annex_err="ERROR"):
        lgr.error(msg)
        self.send(annex_err, msg)

    def info(self, msg):
        self.send('INFO', msg)

    def progress(self, bytes):
        # NOTE(review): self._progress is never assigned here, so this guard
        # only suppresses reports equal to the initial 0 -- looks like a
        # latent bug; confirm intended behavior before changing.
        bytes = int(bytes)
        if self._progress != bytes:
            self.send("PROGRESS", bytes)

    def main(self):
        """Interface to the command line tool"""
        try:
            self._in_the_loop = True
            self._loop()
        except AnnexRemoteQuit:
            pass  # no harm
        except KeyboardInterrupt:
            self.stop("Interrupted by user")
        except Exception as e:
            self.stop(str(e))
        finally:
            self._in_the_loop = False

    def stop(self, msg=None):
        lgr.debug("Stopping communications of %s%s" %
                  (self, ": %s" % msg if msg else ""))
        raise AnnexRemoteQuit(msg)

    def _loop(self):
        """The main loop
        """
        # announce the protocol version we speak first
        self.send("VERSION", SUPPORTED_PROTOCOL)

        while True:
            # NOTE(review): the call below was stripped in the source dump;
            # reconstructed as self.read(n=1) -- confirm against upstream
            l = self.read(n=1)

            if l is not None and not l:
                # empty line: exit
                self.stop()
                return

            req, req_load = l[0], l[1:]
            method = getattr(self, "req_%s" % req, None)
            if not method:
                self.send_unsupported(
                    "We have no support for %s request, part of %s response"
                    % (req, l)
                )
                continue

            req_nargs = self._req_nargs[req]
            if req_load and req_nargs > 1:
                assert len(req_load) == 1, "Could be only one due to n=1"
                # but now we need to slice it according to the respective req
                # We assume that at least it shouldn't start with a space
                # since str.split would get rid of it as well, and then we
                # should have used re.split(" ", ...)
                req_load = req_load[0].split(None, req_nargs - 1)

            try:
                method(*req_load)
            except Exception as e:
                self.error("Problem processing %r with parameters %r: %r"
                           % (req, req_load, exc_str(e)))
                from traceback import format_exc
                lgr.error("Caught exception detail: %s" % format_exc())

    def req_INITREMOTE(self, *args):
        """Initialize this remote. Provides high level abstraction.

        Specific implementation should go to _initialize
        """
        try:
            self._initremote(*args)
        except Exception as e:
            self.error("Failed to initialize %s due to %s" % (self, e),
                       "INITREMOTE-FAILURE")
        else:
            self.send("INITREMOTE-SUCCESS")

    def req_PREPARE(self, *args):
        """Prepare "to deliver". Provides high level abstraction

        Specific implementation should go to _prepare
        """
        try:
            self._prepare(*args)
        except Exception as e:
            self.error("Failed to prepare %s due to %s" % (self, e),
                       "PREPARE-FAILURE")
        else:
            self.send("PREPARE-SUCCESS")

        self.debug("Encodings: filesystem %s, default %s"
                   % (sys.getfilesystemencoding(),
                      sys.getdefaultencoding()))

    def req_EXPORTSUPPORTED(self):
        # export is supported iff the subclass defines req_EXPORT
        self.send(
            'EXPORTSUPPORTED-SUCCESS'
            if hasattr(self, 'req_EXPORT')
            else 'EXPORTSUPPORTED-FAILURE'
        )

    ## define in subclass if EXPORT is supported
    # def req_EXPORT(self, name):
    #     pass

    def req_GETCOST(self):
        self.send("COST", self.cost)

    def req_GETAVAILABILITY(self):
        self.send("AVAILABILITY", self.AVAILABILITY.upper())

    def req_CLAIMURL(self, url):
        # claim only URLs whose scheme this remote declared support for
        scheme = urlparse(url).scheme
        if scheme in self.SUPPORTED_SCHEMES:
            self.debug("Claiming url %r" % url)
            self.send("CLAIMURL-SUCCESS")
        else:
            self.debug("Not claiming url %s" % url)
            self.send("CLAIMURL-FAILURE")

    # TODO: we should unify what to be overriden and some will provide CHECKURL

    def req_TRANSFER(self, cmd, key, file):
        if cmd in ("RETRIEVE",):
            lgr.debug("%s key %s into/from %s" % (cmd, key, file))  # was INFO level
            try:
                self._transfer(cmd, key, file)
            except Exception as exc:
                self.send("TRANSFER-FAILURE %s %s %s"
                          % (cmd, key, exc_str(exc)))
            # NOTE(review): no TRANSFER-SUCCESS is sent here -- presumably
            # the subclass's _transfer reports it; confirm.
        else:
            self.send_unsupported(
                "Received unsupported by our TRANSFER command %s" % cmd
            )

    # Specific implementations to be provided in derived classes when necessary

    def req_CHECKURL(self, url):
        """
        The remote replies with one of CHECKURL-FAILURE, CHECKURL-CONTENTS, or
        CHECKURL-MULTI.

        CHECKURL-CONTENTS Size|UNKNOWN Filename
            Indicates that the requested url has been verified to exist. The
            Size is the size in bytes, or use "UNKNOWN" if the size could not
            be determined.  The Filename can be empty (in which case a default
            is used), or can specify a filename that is suggested to be used
            for this url.

        CHECKURL-MULTI Url Size|UNKNOWN Filename ...
            Indicates that the requested url has been verified to exist, and
            contains multiple files, which can each be accessed using their
            own url.  Note that since a list is returned, neither the Url nor
            the Filename can contain spaces.

        CHECKURL-FAILURE
            Indicates that the requested url could not be accessed.
        """
        self.send_unsupported()

    def req_CHECKPRESENT(self, key):
        """
        CHECKPRESENT-SUCCESS Key
            Indicates that a key has been positively verified to be present
            in the remote.
        CHECKPRESENT-FAILURE Key
            Indicates that a key has been positively verified to not be
            present in the remote.
        CHECKPRESENT-UNKNOWN Key ErrorMsg
            Indicates that it is not currently possible to verify if the key
            is present in the remote.  (Perhaps the remote cannot be
            contacted.)
        """
        raise NotImplementedError()

    def req_REMOVE(self, key):
        """
        REMOVE-SUCCESS Key
            Indicates the key has been removed from the remote.  May be
            returned if the remote didn't have the key at the point removal
            was requested.
        REMOVE-FAILURE Key ErrorMsg
            Indicates that the key was unable to be removed from the remote.
        """
        raise NotImplementedError()

    def req_WHEREIS(self, key):
        """Added in 5.20150812-17-g6bc46e3

        provide any information about ways to access the content of a key
        stored in it, such as eg, public urls.  This will be displayed to the
        user by eg, git annex whereis.  The remote replies with
        WHEREIS-SUCCESS or WHEREIS-FAILURE.

        Note that users expect git annex whereis to run fast, without eg,
        network access.

        This is not needed when SETURIPRESENT is used, since such uris are
        automatically displayed by git annex whereis.

        WHEREIS-SUCCESS String
            Indicates a location of a key.  Typically an url, the string can
            be anything that it makes sense to display to the user about
            content stored in the special remote.
        WHEREIS-FAILURE
            Indicates that no location is known for a key.
        """
        raise NotImplementedError()

    def _transfer(self, cmd, key, file):
        # to be implemented by subclasses that support transfers
        raise NotImplementedError()

    def _initremote(self, *args):
        """Custom initialization of the special custom remote."""
        pass

    def _prepare(self, *args):
        """Prepare special custom remote.  To be overridden"""
        pass

    # some requests we can send out

    def get_DIRHASH(self, key, full=False):
        """Gets a two level hash associated with a Key.

        Parameters
        ----------
        full: bool, optional
          If True, would spit out full DIRHASH path, i.e. with a KEY/ directory

        Something like "abc/def".  This is always the same for any given Key,
        so can be used for eg, creating hash directory structures to store
        Keys in.
        """
        self.send("DIRHASH", key)
        # NOTE(review): the read call was stripped in the source dump;
        # reconstructed as self.read("VALUE", 1) -- confirm against upstream
        val = self.read("VALUE", 1)[1]
        if full:
            return opj(self.path, val, key)
        else:
            return val

    def get_URLS(self, key):
        """Gets URL(s) associated with a Key.
        """
        urls = []
        for scheme in self.SUPPORTED_SCHEMES:
            scheme_ = scheme + ":"
            self.send("GETURLS", key, scheme_)
            while True:
                # annex replies with one VALUE per url, terminated by an
                # empty VALUE
                url = self.read("VALUE", 1)
                if not url or len(url) <= 1:
                    # so there were no URL output, we must be done
                    break
                url = url[1:]
                if url:
                    assert(len(url) == 1)
                    urls.append(url[0])
                else:
                    break

        self.heavydebug("Got %d URL(s) for key %s: %s", len(urls), key, urls)
        #if not urls:
        #    raise ValueError("Did not get any URLs for %s which we support" % key)
        return urls

    def _get_key_path(self, key):
        """Return path to the KEY file
        """
        # TODO: should actually be implemented by AnnexRepo
        #       Command is available in annex >= 20140410
        (out, err) = \
            self.runner(['git', 'annex', 'contentlocation', key],
                        cwd=self.path)
        # TODO: it would exit with non-0 if key is not present locally.
        #   we need to catch and throw our exception
        return opj(self.path, out.rstrip(os.linesep))
def generate_uuids():
    """Generate UUIDs for our remotes.

    Even though quick, for consistency pre-generated and recorded in
    datalad's constants module.

    Returns
    -------
    dict
      Mapping from remote name to its deterministic uuid5 string.
    """
    import uuid
    # NOTE(review): the namespace URL literal was lost in the source dump
    # ('' % remote would raise TypeError); reconstructed from datalad's
    # upstream convention -- confirm it matches the recorded constants.
    return {
        remote: str(uuid.uuid5(
            uuid.NAMESPACE_URL,
            'http://datalad.org/specialremotes/%s' % remote))
        for remote in {'datalad', 'datalad-archives'}
    }
def init_datalad_remote(repo, remote, encryption=None, autoenable=False,
                        opts=None):
    """Initialize datalad special remote

    Parameters
    ----------
    repo : AnnexRepo
      Repository in which to initialize the special remote.
    remote : str
      Name of the special remote; must be a key of
      DATALAD_SPECIAL_REMOTES_UUIDS (ATM 'datalad' or 'datalad-archives').
    encryption : str, optional
      Value for the annex 'encryption=' option (stringified, lowercased).
    autoenable : bool, optional
      Whether annex should auto-enable this remote on clone.
    opts : list of str, optional
      Additional options to pass to `git annex initremote`.
      (Was a mutable default `[]` -- replaced with None to avoid the
      shared-mutable-default pitfall; behavior is unchanged.)

    Returns
    -------
    Whatever `repo.init_remote` returns.
    """
    from datalad.consts import DATALAD_SPECIAL_REMOTES_UUIDS
    lgr.info("Initiating special remote %s", remote)
    remote_opts = [
        'encryption=%s' % str(encryption).lower(),
        'type=external',
        'autoenable=%s' % str(bool(autoenable)).lower(),
        'externaltype=%s' % remote
    ]
    # use unique uuid for our remotes
    # This should help with merges of disconnected repos etc
    # ATM only datalad/datalad-archives is expected,
    # so on purpose getitem
    remote_opts.append('uuid=%s' % DATALAD_SPECIAL_REMOTES_UUIDS[remote])
    return repo.init_remote(remote, remote_opts + (opts or []))
lgr.log(5, "Done importing datalad.customremotes.main")