
# emacs: -*- mode: python; py-indent-offset: 4; tab-width: 4; indent-tabs-mode: nil -*-
# ex: set sts=4 ts=4 sw=4 et:
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
#
#   See COPYING file distributed along with the datalad package for the
#   copyright and license terms.
#
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
"""MetadataRecord extractor for Datalad's own core storage"""

# TODO dataset metadata
# - known annex UUIDs
# - avoid anything that is specific to a local clone
#   (repo mode, etc.) limit to description of dataset(-network)

from .base import MetadataExtractor
from .. import (
    default_context,
    get_file_id,
    get_agent_id,
)
from datalad.utils import (
    Path,
)
from six import (
    iteritems,
    string_types,
)

import logging
lgr = logging.getLogger('datalad.metadata.extractors.metalad_core')
from datalad.log import log_progress
from datalad.support.constraints import EnsureBool
import datalad.support.network as dsn
from datalad.dochelpers import exc_str

import os.path as op


class DataladCoreExtractor(MetadataExtractor):
    # reporting unique file sizes has no relevant use case that I can think of
    # identifiers are included explicitly
    _unique_exclude = {'@id', 'contentbytesize', }

    def __call__(self, dataset, refcommit, process_type, status):
        # shortcut
        ds = dataset
        log_progress(
            lgr.info,
            'extractordataladcore',
            'Start core metadata extraction from %s', str(ds),
            total=len(status) + 1,
            label='Core metadata extraction',
            unit=' Files',
        )
        total_content_bytesize = 0
        if process_type in ('all', 'content'):
            for res in self._get_contentmeta(ds, status):
                total_content_bytesize += res['metadata'].get(
                    'contentbytesize', 0)
                log_progress(
                    lgr.info,
                    'extractordataladcore',
                    'Extracted core metadata from %s', res['path'],
                    update=1,
                    increment=True)
                yield dict(
                    res,
                    action="meta_extract",
                    type='file',
                    status='ok',
                )
        if process_type in ('all', 'dataset'):
            log_progress(
                lgr.info,
                'extractordataladcore',
                'Extracted core metadata from %s', ds.path,
                update=1,
                increment=True)
            dsmeta = list(self._yield_dsmeta(
                ds, status, refcommit, process_type,
                total_content_bytesize))
            yield dict(
                metadata={
                    '@context': default_context,
                    '@graph': dsmeta,
                },
                type='dataset',
                status='ok',
            )
        log_progress(
            lgr.info,
            'extractordataladcore',
            'Finished core metadata extraction from %s', str(ds),
        )

    def _yield_dsmeta(self, ds, status, refcommit, process_type,
                      total_content_bytesize):
        commitinfo = _get_commit_info(ds, refcommit, status)
        contributor_ids = []
        for contributor in commitinfo.pop('contributors', []):
            contributor_id = get_agent_id(*contributor[:2])
            yield {
                '@id': contributor_id,
                # we cannot distinguish real people from machine-committers
                '@type': 'agent',
                'name': contributor[0],
                'email': contributor[1],
            }
            contributor_ids.append(contributor_id)
        meta = {
            # the uniquest ID for this metadata record is the refcommit SHA
            '@id': refcommit,
            # the dataset UUID is the main identifier
            'identifier': ds.id,
            '@type': 'Dataset',
        }
        meta.update(commitinfo)
        if contributor_ids:
            c = [{'@id': i} for i in contributor_ids]
            meta['hasContributor'] = c[0] if len(c) == 1 else c
        parts = [{
            # schema.org doesn't have anything good for a symlink, as it could
            # be anything
            '@type': 'Thing'
            if part['type'] == 'symlink'
            else 'DigitalDocument',
            # relative path within dataset, always POSIX
            # TODO find a more specific term for "local path relative to root"
            'name': Path(part['path']).relative_to(ds.pathobj).as_posix(),
            '@id': get_file_id(part),
        } for part in status if part['type'] != 'dataset']
        for subds in [s for s in status if s['type'] == 'dataset']:
            subdsinfo = {
                # reference by subdataset commit
                '@id': 'datalad:{}'.format(subds['gitshasum']),
                '@type': 'Dataset',
                'name': Path(subds['path']).relative_to(ds.pathobj).as_posix(),
            }
            subdsid = ds.subdatasets(
                contains=subds['path'],
                return_type='item-or-list',
                result_renderer="disabled").get('gitmodule_datalad-id', None)
            if subdsid:
                subdsinfo['identifier'] = 'datalad:{}'.format(subdsid)
            parts.append(subdsinfo)
        if parts:
            meta['hasPart'] = parts
        if ds.config.obtain(
                'datalad.metadata.datalad-core.report-remotes',
                True, valtype=EnsureBool()):
            remote_names = ds.repo.get_remotes()
            distributions = []
            known_uuids = {}
            # start with configured Git remotes
            for r in remote_names:
                info = {
                    'name': r,
                    # not very informative
                    #'description': 'DataLad dataset sibling',
                }
                url = ds.config.get('remote.{}.url'.format(r), None)
                # best effort to recode whatever is configured into a URL
                if url is not None:
                    url = ri2url(dsn.RI(url))
                if url:
                    info['url'] = url
                # do we have information on the annex ID?
                annex_uuid = ds.config.get(
                    'remote.{}.annex-uuid'.format(r), None)
                if annex_uuid is not None:
                    info['@id'] = 'datalad:{}'.format(annex_uuid)
                    known_uuids[annex_uuid] = info
                if 'url' in info or '@id' in info:
                    # only record if we have any identifying information,
                    # otherwise it is pointless cruft
                    distributions.append(info)
            # now look for annex info
            if hasattr(ds.repo, 'repo_info'):
                info = ds.repo.repo_info(fast=True)
                for cat in ('trusted repositories',
                            'semitrusted repositories',
                            'untrusted repositories'):
                    for r in info[cat]:
                        if r['here'] or r['uuid'] in (
                                '00000000-0000-0000-0000-000000000001',
                                '00000000-0000-0000-0000-000000000002'):
                            # ignore local and universally available remotes
                            continue
                        # avoid duplicates, but record all sources, even
                        # if no URLs are around
                        if r['uuid'] not in known_uuids:
                            distributions.append({'@id': r['uuid']})
            if distributions:
                meta['distribution'] = sorted(
                    distributions,
                    key=lambda x: x.get('@id', x.get('url', None)),
                )
        if total_content_bytesize:
            meta['contentbytesize'] = total_content_bytesize
        yield meta

    def _get_contentmeta(self, ds, status):
        """Get ALL metadata for all dataset content.

        Returns
        -------
        generator((location, metadata_dict))
        """
        # cache whereis info of tarball/zip/archives, they tend to be used
        # more than once, so caching can save a chunk of runtime
        arxiv_whereis = {}

        # start batched 'annex whereis' and query for availability info;
        # there is no need to make sure a batched command is terminated
        # properly, the harness in meta_extract will do this
        wic = whereis_file if hasattr(ds.repo, 'repo_info') \
            else lambda x, y: dict(status='error')
        for rec in status:
            recorded_archive_keys = set()
            if rec['type'] == 'dataset':
                # subdatasets have been dealt with in the dataset metadata
                continue
            md = self._describe_file(rec)
            wi = wic(ds.repo, rec['path'])
            if wi['status'] != 'ok':
                yield dict(
                    path=rec['path'],
                    metadata=md,
                )
                continue
            urls = _get_urls_from_whereis(wi)
            # urls where the actual file content can be obtained directly
            dist = sorted([url for url in urls if url.startswith('http')])
            if dist:
                md['distribution'] = dict(url=dist)
            ispart = []
            for arxiv_url in [url for url in urls
                              if url.startswith('dl+archive:')
                              and '#' in url]:
                key = _get_archive_key(arxiv_url)
                if not key or key in recorded_archive_keys:
                    # nothing we can work with, or all done
                    continue
                arxiv_urls = arxiv_whereis.get(key, None)
                if arxiv_urls is None:
                    try:
                        arxiv_urls = _get_urls_from_whereis(
                            ds.repo.whereis(key, key=True, output='full'))
                    except Exception as e:
                        lgr.debug(
                            'whereis query failed for key %s: %s',
                            key, exc_str(e))
                        arxiv_urls = []
                    arxiv_whereis[key] = arxiv_urls
                if arxiv_urls:
                    ispart.append({
                        '@id': key,
                        'distribution': {
                            'url': sorted(arxiv_urls),
                        },
                    })
                    recorded_archive_keys.add(key)
            if ispart:
                md['isPartOf'] = sorted(
                    ispart,
                    key=lambda x: x['@id'],
                )
            yield dict(
                path=rec['path'],
                metadata=md,
            )

    def _describe_file(self, rec):
        info = {
            '@id': get_file_id(rec),
            # schema.org doesn't have a useful term, only contentSize
            # and fileSize which seem to be geared towards human consumption,
            # not numerical accuracy
            # TODO define the term
            'contentbytesize': rec.get('bytesize', 0)
            if 'bytesize' in rec or rec['type'] == 'symlink'
            else op.getsize(rec['path']),
            # TODO the following are optional enhancements that should come
            # with individual ON/OFF switches
            # TODO run `git log` to find earliest and latest commit to
            # determine 'dateModified' and 'dateCreated'
            # TODO determine per-file 'contributor' from git log
        }
        return info
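
    # Illustrative only: a per-file record yielded by _get_contentmeta() for
    # an annexed file that is also available from a web remote could look
    # roughly like this (all concrete values are made up):
    #
    #   {'path': '/tmp/ds/file.dat',
    #    'metadata': {'@id': '<annex-key-based id>',
    #                 'contentbytesize': 1234,
    #                 'distribution': {'url': ['http://example.com/file.dat']}}}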

    def get_state(self, dataset):
        ds = dataset
        return {
            # increment when output format changes
            'version': 1,
            'unique_exclude': list(self._unique_exclude),
            'remotes': ds.config.obtain(
                'datalad.metadata.datalad-core.report-remotes',
                True, valtype=EnsureBool()),
            'contributors': ds.config.obtain(
                'datalad.metadata.datalad-core.report-contributors',
                True, valtype=EnsureBool()),
            'modification-dates': ds.config.obtain(
                'datalad.metadata.datalad-core.report-modification-dates',
                True, valtype=EnsureBool()),
        }
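

# Illustrative only: a dataset-level result yielded by
# DataladCoreExtractor.__call__() wraps the agent and dataset nodes in a
# single JSON-LD graph; all concrete values below are made up:
#
#   {'type': 'dataset',
#    'status': 'ok',
#    'metadata': {
#        '@context': default_context,
#        '@graph': [
#            {'@id': '<agent-id>', '@type': 'agent',
#             'name': 'Jane Doe', 'email': 'jane@example.com'},
#            {'@id': '<refcommit-sha>', 'identifier': '<dataset-uuid>',
#             '@type': 'Dataset', 'version': '0-42-gabc1234',
#             'hasContributor': {'@id': '<agent-id>'},
#             'hasPart': [...], 'distribution': [...]}]}}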


def _get_urls_from_whereis(
        wi,
        prefixes=('http', 'dl+archive:')):
    """Extract a list of URLs starting with any of the given prefixes from
    "whereis" output
    """
    return [
        url
        for remote, rprops in iteritems(
            wi.get('remotes', {}) if 'status' in wi else wi)
        for url in rprops.get('urls', [])
        if any(url.startswith(pref) for pref in prefixes)
    ]


def _get_archive_key(whereis):
    """Try to decode the various flavors of whereis info for archives"""
    if whereis.startswith(u'dl+archive:'):
        whereis = whereis[11:]
    if u'tar#path' in whereis or 'zip#path' in whereis:
        return whereis.split('#')[0]
    elif u'.zip/' in whereis:
        # a key will not have a slash
        return whereis.split('/')[0]


def _get_commit_info(ds, refcommit, status):
    """Get info about all commits, up to (and including) the refcommit"""
    # - get all the commit info with git log --pretty='%aN%x00%aI%x00%H'
    #   - use all first-level paths other than .datalad and .git for the query
    # - from this we can determine all modification timestamps and the
    #   refcommit
    # - do a subsequent git log query for the determined refcommit to determine
    #   a version by counting all commits since inception up to the refcommit
    #   - we cannot use the first query, because it will be constrained by the
    #     present paths that may not have existed previously at all

    # grab the history until the refcommit
    commits = [
        line.split('\0')
        for line in ds.repo.call_git_items_(
            # name, email, timestamp, shasum
            ['log', '--pretty=format:%aN%x00%aE%x00%aI%x00%H', refcommit]
        )
    ]
    # version, always anchored on the first commit (tags could move and
    # make the integer commit count ambiguous, and substantially complicate
    # version comparisons)
    version = '0-{}-g{}'.format(
        len(commits),
        # abbreviated shasum (like git-describe)
        ds.repo.get_hexsha(commits[0][3], short=True),
    )
    meta = {
        'version': version,
    }
    if ds.config.obtain(
            'datalad.metadata.datalad-core.report-contributors',
            True, valtype=EnsureBool()):
        meta.update(
            contributors=sorted(set(tuple(c[:2]) for c in commits)))
    if ds.config.obtain(
            'datalad.metadata.datalad-core.report-modification-dates',
            True, valtype=EnsureBool()):
        meta.update(
            dateCreated=commits[-1][2],
            dateModified=commits[0][2],
        )
    return meta
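

# Illustrative only: for a repository with 42 commits reachable from a
# refcommit whose abbreviated SHA is 'abc1234', _get_commit_info() reports
# version '0-42-gabc1234' (made-up values; the format mirrors git-describe
# output in the absence of a tag).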


# TODO RF to be merged with datalad.support.network
def ri2url(ri):
    f = ri.fields
    if isinstance(ri, dsn.URL):
        return ri.as_str()
    elif isinstance(ri, dsn.SSHRI):
        # construct a URL that Git would understand
        return 'ssh://{}{}{}{}{}{}'.format(
            f['username'],
            '@' if f['username'] else '',
            f['hostname'],
            ':' if f['port'] else '',
            f['port'],
            f['path']
            if op.isabs(f['path'])
            else '/{}'.format(f['path'])
            if f['path'].startswith('~')
            else '/~/{}'.format(f['path']),
        )
    elif isinstance(ri, dsn.PathRI):
        # this has no chance of being resolved outside this machine,
        # not worth reporting
        return None
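

# Illustrative only: assuming dsn.RI('user@example.com:some/repo') parses to
# an SSHRI, ri2url() would render it as 'ssh://user@example.com/~/some/repo';
# URL-type RIs pass through via as_str(), and PathRIs yield None.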


# The following function pair should be part of AnnexRepo, but a PR was
# rejected, because there is already an old whereis() -- but with an
# overcomplicated API and no batch-mode support -- going solo...
def whereis_file(self, path):
    """Same as `whereis_file_()`, but for a single path and return-dict"""
    #return list(self.whereis_file_([path]))[0]
    return list(whereis_file_(self, [path]))[0]


def whereis_file_(self, paths):
    """
    Parameters
    ----------
    paths : iterable
      Paths of files to query for, either absolute paths matching the
      repository root (self.path), or paths relative to the root of the
      repository.

    Yields
    ------
    dict
      A response dictionary for each query path with the following keys:
      'path' with the queried path in the same form it was provided;
      'status' {ok|error} indicating whether git annex was queried
      successfully for a path;
      'key' with the annex key for the file;
      'remotes' with a dictionary of remotes that have a copy of the
      respective file (annex UUIDs are keys, and values are dictionaries
      with the keys 'description', 'here', and 'urls' (list) that contain
      the values of the respective 'git annex whereis' response).
    """
    if isinstance(paths, string_types):
        raise ValueError('whereis_file(paths): paths must be '
                         'iterable, not a string type')
    cmd = self._batched.get('whereis', json=True, path=self.path)
    for path in paths:
        r = cmd(path)
        # give path back in the same shape as it came in
        res = dict(path=path)
        if not r:
            yield dict(res, status='error')
            continue
        yield dict(
            res,
            status='ok' if r.get('success', False) else 'error',
            key=r['key'],
            remotes={
                remote['uuid']: {
                    x: remote.get(x, None)
                    for x in ('description', 'here', 'urls')}
                for remote in r['whereis']},
        )
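

# Minimal usage sketch, assuming `repo` is an AnnexRepo instance (with its
# batched-command harness in `repo._batched`) and 'data.dat' is an annexed
# file in it; names are illustrative only:
#
#   wi = whereis_file(repo, 'data.dat')
#   if wi['status'] == 'ok':
#       urls = _get_urls_from_whereis(wi)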