Source code for datalad.support.archives

# emacs: -*- mode: python; py-indent-offset: 4; tab-width: 4; indent-tabs-mode: nil -*-
# ex: set sts=4 ts=4 sw=4 et:
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
#
#   See COPYING file distributed along with the datalad package for the
#   copyright and license terms.
#
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
"""Various handlers/functionality for different types of files (e.g. for archives)

"""

import hashlib
import logging
import os
import random
import string
import tempfile

from datalad import cfg
from datalad.config import anything2bool
from datalad.consts import ARCHIVES_TEMP_DIR
from datalad.support.external_versions import external_versions
from datalad.support.locking import lock_if_check_fails
from datalad.support.path import (
    abspath,
    exists,
    isabs,
    isdir,
)
from datalad.support.path import join as opj
from datalad.support.path import (
    normpath,
    pardir,
    relpath,
)
from datalad.support.path import sep as opsep
from datalad.utils import (
    Path,
    any_re_search,
    ensure_bytes,
    ensure_unicode,
    get_tempfile_kwargs,
    on_windows,
    rmtemp,
    rmtree,
    unlink,
)

# fall back on patool, if requested, or 7z is not found
if (cfg.obtain('datalad.runtime.use-patool', default=False,
               valtype=anything2bool)
        or not external_versions['cmd:7z']):
    from datalad.support.archive_utils_patool import compress_files
    from datalad.support.archive_utils_patool import \
        decompress_file as \
        _decompress_file  # other code expects this to be here
else:
    from datalad.support.archive_utils_7z import compress_files
    from datalad.support.archive_utils_7z import \
        decompress_file as \
        _decompress_file  # other code expects this to be here

lgr = logging.getLogger('datalad.support.archives')


[docs] def decompress_file(archive, dir_, leading_directories='strip'): """Decompress `archive` into a directory `dir_` Parameters ---------- archive: str dir_: str leading_directories: {'strip', None} If `strip`, and archive contains a single leading directory under which all content is stored, all the content will be moved one directory up and that leading directory will be removed. """ if not exists(dir_): lgr.debug("Creating directory %s to extract archive into", dir_) os.makedirs(dir_) _decompress_file(archive, dir_) if leading_directories == 'strip': _, dirs, files = next(os.walk(dir_)) if not len(files) and len(dirs) == 1: # move all the content under dirs[0] up 1 level widow_dir = opj(dir_, dirs[0]) lgr.debug("Moving content within %s upstairs", widow_dir) subdir, subdirs_, files_ = next(os.walk(opj(dir_, dirs[0]))) for f in subdirs_ + files_: os.rename(opj(subdir, f), opj(dir_, f)) # NFS might hold it victim so use rmtree so it tries a few times rmtree(widow_dir) elif leading_directories is None: pass # really do nothing else: raise NotImplementedError("Not supported %s" % leading_directories)
def _get_cached_filename(archive): """A helper to generate a filename which has original filename and additional suffix which wouldn't collide across files with the same name from different locations """ #return "%s_%s" % (basename(archive), hashlib.md5(archive).hexdigest()[:5]) # per se there is no reason to maintain any long original name here. archive_cached = hashlib.md5(ensure_bytes(str(Path(archive).resolve()))).hexdigest()[:10] lgr.debug("Cached directory for archive %s is %s", archive, archive_cached) return archive_cached def _get_random_id(size=6, chars=string.ascii_uppercase + string.digits): """Return a random ID composed from digits and uppercase letters upper-case so we are tolerant to unlikely collisions on dummy FSs """ return ''.join(random.choice(chars) for _ in range(size))
[docs] class ArchivesCache(object): """Cache to maintain extracted archives Parameters ---------- toppath : str Top directory under .git/ of which temp directory would be created. If not provided -- random tempdir is used persistent : bool, optional Passed over into generated ExtractedArchives """ # TODO: make caching persistent across sessions/runs, with cleanup # IDEA: extract under .git/annex/tmp so later on annex unused could clean it # all up def __init__(self, toppath=None, persistent=False): self._toppath = toppath if toppath: path = opj(toppath, ARCHIVES_TEMP_DIR) if not persistent: tempsuffix = "-" + _get_random_id() lgr.debug("For non-persistent archives using %s suffix for path %s", tempsuffix, path) path += tempsuffix # TODO: begging for a race condition if not exists(path): lgr.debug("Initiating clean cache for the archives under %s", path) try: self._made_path = True os.makedirs(path) lgr.debug("Cache initialized") except Exception: lgr.error("Failed to initialize cached under %s", path) raise else: lgr.debug( "Not initiating existing cache for the archives under %s", path) self._made_path = False else: if persistent: raise ValueError( "%s cannot be persistent, because no toppath was provided" % self) path = tempfile.mkdtemp(**get_tempfile_kwargs()) self._made_path = True self._path = path self.persistent = persistent # TODO? ensure that it is absent or we should allow for it to persist a bit? #if exists(path): # self._clean_cache() self._archives = {} # TODO: begging for a race condition if not exists(path): lgr.debug("Initiating clean cache for the archives under %s", self.path) try: self._made_path = True os.makedirs(path) lgr.debug("Cache initialized") except Exception as e: lgr.error("Failed to initialize cached under %s", path) raise else: lgr.debug("Not initiating existing cache for the archives under %s", self.path) self._made_path = False @property def path(self): return self._path
[docs] def clean(self, force=False): for aname, a in list(self._archives.items()): a.clean(force=force) del self._archives[aname] # Probably we should not rely on _made_path and not bother if persistent removing it # if ((not self.persistent) or force) and self._made_path: # lgr.debug("Removing the entire archives cache under %s", self.path) # rmtemp(self.path) if (not self.persistent) or force: lgr.debug("Removing the entire archives cache under %s", self.path) rmtemp(self.path)
def _get_normalized_archive_path(self, archive): """Return full path to archive So we have consistent operation from different subdirs, while referencing archives from the topdir TODO: why do we need it??? """ if not isabs(archive) and self._toppath: out = normpath(opj(self._toppath, archive)) if relpath(out, self._toppath).startswith(pardir): raise RuntimeError("%s points outside of the topdir %s" % (archive, self._toppath)) if isdir(out): raise RuntimeError("got a directory here... bleh") return out return archive
[docs] def get_archive(self, archive): archive = self._get_normalized_archive_path(archive) if archive not in self._archives: self._archives[archive] = \ ExtractedArchive(archive, opj(self.path, _get_cached_filename(archive)), persistent=self.persistent) return self._archives[archive]
def __getitem__(self, archive): return self.get_archive(archive) def __delitem__(self, archive): archive = self._get_normalized_archive_path(archive) self._archives[archive].clean() del self._archives[archive] def __del__(self): try: # we can at least try if not self.persistent: self.clean() except: # MIH: IOError? pass
[docs] class ExtractedArchive(object): """Container for the extracted archive """ # suffix to use for a stamp so we could guarantee that extracted archive is STAMP_SUFFIX = '.stamp' def __init__(self, archive, path=None, persistent=False): self._archive = archive # TODO: bad location for extracted archive -- use tempfile if not path: path = tempfile.mktemp(**get_tempfile_kwargs(prefix=_get_cached_filename(archive))) if exists(path) and not persistent: raise RuntimeError("Directory %s already exists whenever it should not " "persist" % path) self._persistent = persistent self._path = path def __repr__(self): return "%s(%r, path=%r)" % (self.__class__.__name__, self._archive, self.path)
[docs] def clean(self, force=False): # would interfere with tests # if os.environ.get('DATALAD_TESTS_TEMP_KEEP'): # lgr.info("As instructed, not cleaning up the cache under %s" # % self._path) # return for path, name in [ (self._path, 'cache'), (self.stamp_path, 'stamp file') ]: if exists(path): if (not self._persistent) or force: lgr.debug("Cleaning up the %s for %s under %s", name, self._archive, path) # TODO: we must be careful here -- to not modify permissions of files # only of directories (rmtree if isdir(path) else unlink)(path)
@property def path(self): """Given an archive -- return full path to it within cache (extracted) """ return self._path @property def stamp_path(self): return self._path + self.STAMP_SUFFIX @property def is_extracted(self): return exists(self.path) and exists(self.stamp_path) \ and os.stat(self.stamp_path).st_mtime >= os.stat(self.path).st_mtime
[docs] def assure_extracted(self): """Return path to the extracted `archive`. Extract archive if necessary """ path = self.path with lock_if_check_fails( check=(lambda s: s.is_extracted, (self,)), lock_path=path, operation="extract" ) as (check, lock): if lock: assert not check self._extract_archive(path) return path
def _extract_archive(self, path): # we need to extract the archive # TODO: extract to _tmp and then move in a single command so we # don't end up picking up broken pieces lgr.debug("Extracting %s under %s", self._archive, path) if exists(path): lgr.debug( "Previous extracted (but probably not fully) cached archive " "found. Removing %s", path) rmtree(path) os.makedirs(path) assert (exists(path)) # remove old stamp if exists(self.stamp_path): rmtree(self.stamp_path) decompress_file(self._archive, path, leading_directories=None) # TODO: must optional since we might to use this content, move it # into the tree etc # lgr.debug("Adjusting permissions to R/O for the extracted content") # rotree(path) assert (exists(path)) # create a stamp with open(self.stamp_path, 'wb') as f: f.write(ensure_bytes(self._archive)) # assert that stamp mtime is not older than archive's directory assert (self.is_extracted) # TODO: remove? #def has_file_ready(self, afile): # lgr.debug(u"Checking file {afile} from archive {archive}".format(**locals())) # return exists(self.get_extracted_filename(afile))
[docs] def get_extracted_filename(self, afile): """Return full path to the `afile` within extracted `archive` It does not actually extract any archive """ return opj(self.path, afile)
[docs] def get_extracted_files(self): """Generator to provide filenames which are available under extracted archive """ path = self.assure_extracted() path_len = len(path) + (len(os.sep) if not path.endswith(os.sep) else 0) for root, dirs, files in os.walk(path): # TEMP for name in files: yield ensure_unicode(opj(root, name)[path_len:])
[docs] def get_leading_directory(self, depth=None, consider=None, exclude=None): """Return leading directory of the content within archive Parameters ---------- depth: int or None, optional Maximal depth of leading directories to consider. If None - no upper limit consider : list of str, optional Regular expressions for file/directory names to be considered (before exclude). Applied to the entire relative path to the file as in the archive exclude: list of str, optional Regular expressions for file/directory names to be excluded from consideration. Applied to the entire relative path to the file as in the archive Returns ------- str or None: If there is no single leading directory -- None returned """ leading = None # returns only files, so no need to check if a dir or not for fpath in self.get_extracted_files(): if consider and not any_re_search(consider, fpath): continue if exclude and any_re_search(exclude, fpath): continue lpath = fpath.split(opsep) dpath = lpath[:-1] # directory path components if leading is None: leading = dpath if depth is None else dpath[:depth] else: if dpath[:len(leading)] != leading: # find smallest common path leading_ = [] # TODO: there might be more efficient pythonic way for d1, d2 in zip(leading, dpath): if d1 != d2: break leading_.append(d1) leading = leading_ if not len(leading): # no common leading - ready to exit return None return leading if leading is None else opj(*leading)
[docs] def get_extracted_file(self, afile): lgr.debug("Requested file %s from archive %s", afile, self._archive) # TODO: That could be a good place to provide "compatibility" layer if # filenames within archive are too obscure for local file system. # We could somehow adjust them while extracting and here channel back # "fixed" up names since they are only to point to the load self.assure_extracted() path = self.get_extracted_filename(afile) # TODO: make robust lgr.log(2, "Verifying that %s exists", abspath(path)) assert exists(path), "%s must exist" % path return path
def __del__(self): try: if self._persistent: self.clean() except Exception as e: # MIH: IOError? pass