Source code for datalad_crawler.pipeline

# emacs: -*- mode: python; py-indent-offset: 4; tab-width: 4; indent-tabs-mode: nil -*-
# ex: set sts=4 ts=4 sw=4 noet:
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
#
#   See COPYING file distributed along with the datalad package for the
#   copyright and license terms.
#
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
"""Pipeline functionality.

A pipeline is represented by a simple list or tuple of nodes or other nested pipelines.
Each pipeline node is a callable which receives a dictionary (commonly named `data`),
does some processing, and yields (once or multiple times) a derived dictionary (commonly
a shallow copy of the original dict).  For a node to be parametrized it should be
implemented as a callable class (i.e. one defining __call__), which can obtain its
parameters in the constructor.
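
For illustration, a minimal sketch (these node names are hypothetical and not part of
any stock pipeline) of a function node and a parametrized node class, assembled into
a small pipeline::

    from datalad.utils import updated

    def add_prefix(data):
        # yield one derived dict per incoming dict
        yield updated(data, {'filename': 'pre_' + data['filename']})

    class AddPrefix(object):
        # parametrized node: configured in the constructor, invoked via __call__
        def __init__(self, prefix):
            self.prefix = prefix

        def __call__(self, data):
            yield updated(data, {'filename': self.prefix + data['filename']})

    pipeline = [add_prefix, AddPrefix('more_')]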

TODO:  describe   PIPELINE_OPTS  and how to specify them for a given (sub-)pipeline.

The `data` dictionary is used primarily to carry the scraped/produced data, but besides
that it carries a few items which some nodes might use.  All such item names start with
the `datalad_` prefix and are intended for 'in-place' modification or querying.
The following items are planned to be provided by the pipeline runner:

`datalad_settings`
   PipelineSettings object which could be used to provide configuration for the current
   run of the pipeline. E.g.:

   - dry:  whether nodes should refrain from performing any changes that would be reflected on disk
   - skip_existing:

`datalad_stats`
   ActivityStats/dict object to accumulate statistics on what has been done by the nodes
   so far
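
For example (an illustrative sketch; the node name is hypothetical), a node could
consult the accumulated statistics carried in `data` and pass everything downstream
unchanged::

    def report_stats(data):
        # ActivityStats can render itself as a one-line summary
        print("So far: %s" % data['datalad_stats'].as_str(mode='line'))
        yield data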

To some degree, an analogy can be drawn where `data` is the `blood` and the `pipeline`
is the `venous system`.  Blood delivers various elements which are picked up by the
parts of our body that know what to do with the corresponding elements.  In the same
way, nodes can consume, augment, or produce new items in the `data` and send it down
the stream.  Since there is no strict typing or specification of what nodes can consume
or produce (yet), no verification is done and things can go utterly wrong.  So nodes
must be robust and provide informative logging.
"""

__dev_doc__ = """
Somewhat similar loose/flexible approaches to pipelining in Python:

- https://github.com/freeman-lab/pipeit
"""

import sys
from glob import glob
from os.path import dirname, join as opj, isabs, exists, curdir, basename
from os import makedirs

from datalad_crawler.consts import CRAWLER_META_DIR, HANDLE_META_DIR, CRAWLER_META_CONFIG_PATH
from datalad_crawler.consts import CRAWLER_META_CONFIG_FILENAME
from datalad.utils import updated
from datalad.utils import get_dataset_root
from datalad.dochelpers import exc_str
from datalad.support.gitrepo import GitRepo
from datalad.support.network import parse_url_opts
from datalad.support.stats import ActivityStats
from datalad.support.exceptions import PipelineNotSpecifiedError
from datalad.support.configparserinc import SafeConfigParserWithIncludes

from logging import getLogger
lgr = getLogger('datalad.crawler.pipeline')

# name of the section in the config file which would define pipeline parameters
CRAWLER_PIPELINE_SECTION = 'crawl:pipeline'
CRAWLER_PIPELINE_SECTION_DEPRECATED = 'crawler'


class FinishPipeline(Exception):
    """Exception to use to signal that any given pipeline should be stopped
    """
    pass

# options which could augment behavior of the pipeline, could be specified
# only on top of it
PIPELINE_OPTS = dict(
    # nested_pipeline_inherits_opts=True,
    # would use or not values yielded by the nested pipeline
    output='input',  # last-output, outputs, input+outputs
    loop=False,  # either to feed results into itself (until None returned)
)

# which data types depict object being a pipeline
PIPELINE_TYPES = (list, tuple)
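
# An illustrative sketch (not used anywhere in this module): a pipeline definition
# may start with an options dict, which augments PIPELINE_OPTS for that
# (sub-)pipeline only.  The nodes here are trivial identity callables.
_example_pipeline = [
    {'output': 'outputs', 'loop': False},  # options for this (sub-)pipeline
    lambda data: iter([data]),             # a node yielding its input once
    [lambda data: iter([data])],           # a nested sub-pipeline
]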

def reset_pipeline(pipeline):
    """Given a pipeline, traverse its nodes and call .reset on them

    Note: it doesn't try to call reset if a node doesn't have it
    """
    if pipeline:
        for node in pipeline:
            if isinstance(node, PIPELINE_TYPES):
                reset_pipeline(node)
            elif hasattr(node, '__call__') and hasattr(node, 'reset'):
                lgr.log(2, "Resetting node %s" % node)
                node.reset()

def run_pipeline(*args, **kwargs):
    """Run pipeline and assemble results into a list

    By default, the pipeline returns only its input (see PIPELINE_OPTS),
    so if no options for the pipeline were given to return additional items,
    a `[{}]` will be provided as output
    """
    output = list(xrun_pipeline(*args, **kwargs))
    if output:
        if 'datalad_stats' in output[-1]:
            stats = output[-1]['datalad_stats'].get_total()
            stats_str = stats.as_str(mode='line')
        else:
            stats_str = 'no stats collected'
    else:
        stats_str = "no output"
    lgr.info("Finished running pipeline: %s" % stats_str)
    return output if output else None
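
# Illustrative usage sketch (the node below is hypothetical and not used by the
# module).  With the default output='input' (see PIPELINE_OPTS) run_pipeline
# returns only the input data; prepend {'output': 'outputs'} to collect node
# outputs instead.
def _example_count(data):
    # a trivial node which increments a counter in a derived copy of `data`
    yield updated(data, {'n': data.get('n', 0) + 1})

# run_pipeline([_example_count, _example_count], {'n': 0})
#   -> [{'n': 0, 'datalad_stats': ...}]      (just the input)
# run_pipeline([{'output': 'outputs'}, _example_count, _example_count], {'n': 0})
#   -> [{'n': 2, 'datalad_stats': ...}]      (outputs of the full pipeline)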

def _get_pipeline_opts(pipeline):
    """Return options and pipeline steps to be run given the pipeline "definition"

    The definition might have options as its first element
    """
    opts = PIPELINE_OPTS.copy()
    if isinstance(pipeline[0], dict):
        newopts, pipeline = (pipeline[0], pipeline[1:])
        opts = updated(opts, newopts)
    return opts, pipeline

def xrun_pipeline(pipeline, data=None, stats=None, reset=True):
    """Yield results from the pipeline.
    """
    id_pipeline = "Pipe #%s" % id(pipeline)

    def _log(msg, *args):
        """Helper for uniform debug messages"""
        lgr.log(5, "%s: " + msg, id_pipeline, *args)

    _log("%s", pipeline)
    if reset:
        _log("Resetting pipeline")
        reset_pipeline(pipeline)

    # just for paranoids and PEP8-disturbed, since theoretically every node
    # should not change the data, so having default {} should be sufficient
    data = data or {}
    if 'datalad_stats' in data:
        if stats is not None:
            raise ValueError(
                "We were provided stats to use, but data has already datalad_stats")
    else:
        data = updated(data, {'datalad_stats': stats or ActivityStats()})

    if not len(pipeline):
        return

    # options for this pipeline
    opts, pipeline = _get_pipeline_opts(pipeline)

    # verify that we know about all specified options
    unknown_opts = set(opts).difference(set(PIPELINE_OPTS))
    if unknown_opts:
        raise ValueError("Unknown pipeline options %s" % str(unknown_opts))

    data_to_process = [data]
    output = opts['output']
    if output not in ('input', 'last-output', 'outputs', 'input+outputs'):
        raise ValueError("Unknown output=%r" % output)

    if opts['loop'] and output == 'input':
        lgr.debug("Assigning output='last-output' for sub-pipeline since we want "
                  "to loop until pipeline returns anything")
        output_sub = 'last-output'
    else:
        output_sub = output

    log_level = lgr.getEffectiveLevel()
    data_out = None
    while data_to_process:
        _log("processing data. %d left to go", len(data_to_process))
        data_in = data_to_process.pop(0)
        try:
            for idata_out, data_out in enumerate(
                    xrun_pipeline_steps(pipeline, data_in, output=output_sub)):
                if log_level <= 3:
                    # provide details of what keys got changed
                    # TODO: unify with 2nd place where it was invoked
                    lgr.log(3, "O3: +%s, -%s, ch%s, ch?%s",
                            *_compare_dicts(data_in, data_out))
                _log("got new %dth output", idata_out)
                if opts['loop']:
                    _log("extending list of data to process due to loop option")
                    data_to_process.append(data_out)
                if 'outputs' in output:
                    _log("yielding output")
                    yield data_out
        except FinishPipeline as e:
            # TODO: decide what we would like to do -- skip that particular pipeline run
            # or all subsequent or may be go back and only skip that generated result
            _log("got a signal that pipeline is 'finished'")

    # TODO: this implementation is somewhat bad since all the output logic is
    # duplicated within xrun_pipeline_steps, but it is probably unavoidable because of
    # loop option
    if output == 'last-output':
        if data_out:
            _log("yielding last-output")
            yield data_out

    # Input should be yielded last since otherwise it might ruin the flow for typical
    # pipelines which do not expect anything beyond going step by step
    # We should yield input data even if it was empty
    if 'input' in output:
        _log("finally yielding input data as instructed")
        yield data

def xrun_pipeline_steps(pipeline, data, output='input'):
    """Actually run pipeline steps, feeding yielded results to the next node
    and yielding results back.

    Recursive beast which runs a single node and then recurses to run the rest,
    possibly multiple times if the current node is a generator.
    It yields output from the node/nested pipelines, as directed by the output
    argument.
    """
    if not len(pipeline):
        return

    node, pipeline_tail = pipeline[0], pipeline[1:]

    if isinstance(node, (list, tuple)):
        lgr.debug("Pipe: %s" % str(node))
        # we have got a step which is yet another entire pipeline
        pipeline_gen = xrun_pipeline(node, data, reset=False)
        if pipeline_gen:
            # should be similar to as running a node
            data_in_to_loop = pipeline_gen
        else:
            # pipeline can return None, and in such a case
            # just do not process further, since if it completed
            # normally, its input would have been provided back
            lgr.log(7, "Pipeline generator %s returned None", node)
            data_in_to_loop = []
        prev_stats = None  # we do not care to check if entire pipeline drops stats
                           # since it is done below at the node level
    else:
        # it is a "node" which should generate (or return) us an iterable to feed
        # its elements into the rest of the pipeline
        try:
            node_str = node._custom_str
        except AttributeError:
            node_str = str(node)
        lgr.debug("Node: %s", node_str)
        prev_stats = data.get('datalad_stats', None)  # so we could check if the node doesn't dump it
        data_in_to_loop = node(data)

    log_level = lgr.getEffectiveLevel()
    data_out = None
    if data_in_to_loop:
        for data_ in data_in_to_loop:
            if prev_stats is not None:
                new_stats = data_.get('datalad_stats', None)
                if new_stats is None or new_stats is not prev_stats:
                    lgr.debug("Node %s has changed stats to %s from %s. "
                              "Updating and using previous one",
                              node, prev_stats, new_stats)
                    if new_stats is not None:
                        prev_stats += new_stats
                    data_['datalad_stats'] = prev_stats
            if log_level <= 4:
                # provide details of what keys got changed
                stats_str = data_['datalad_stats'].as_str(mode='line') \
                    if 'datalad_stats' in data_ else ''
                lgr.log(4, "O1: +%s, -%s, ch%s, ch?%s %s",
                        *(_compare_dicts(data, data_) + (stats_str,)))
            if pipeline_tail:
                lgr.log(7, " pass %d keys into tail with %d elements",
                        len(data_), len(pipeline_tail))
                lgr.log(5, " passed keys: %s", data_.keys())
                for data_out in xrun_pipeline_steps(pipeline_tail, data_, output=output):
                    if log_level <= 3:
                        # provide details of what keys got changed
                        # TODO: difference from previous stats!
                        stats_str = data_['datalad_stats'].as_str(mode='line') \
                            if 'datalad_stats' in data_ else ''
                        lgr.log(3, "O2: +%s, -%s, ch%s, ch?%s %s",
                                *(_compare_dicts(data, data_out) + (stats_str,)))
                    if 'outputs' in output:
                        yield data_out
            else:
                data_out = data_
                if 'outputs' in output:
                    yield data_out
    elif pipeline_tail:
        lgr.warning("%s returned None, although there is still a tail in the pipeline"
                    % node)

    if output == 'last-output' and data_out:
        yield data_out

def _compare_dicts(d1, d2):
    """Given two dictionaries, return what keys were added, removed, changed
    or might be changed
    """
    added, removed, changed, maybe_changed = [], [], [], []
    all_keys = set(d1).union(set(d2))
    for k in all_keys:
        if k not in d1:
            added.append(k)
        elif k not in d2:
            removed.append(k)
        else:
            if d1[k] is d2[k]:
                continue
            else:
                try:
                    if d1[k] != d2[k]:
                        changed.append(k)
                except:  # MIH: TypeError?
                    maybe_changed.append(k)
    return added, changed, removed, maybe_changed

def initiate_pipeline_config(template, template_func=None, template_kwargs=None,
                             path=curdir, commit=False):
    """Initiate a crawler pipeline configuration (crawl.cfg) for `template` under `path`

    TODO Gergana ;)
    """
    lgr.debug("Creating crawler configuration for template %s under %s",
              template, path)
    crawl_config_dir = opj(path, CRAWLER_META_DIR)
    if not exists(crawl_config_dir):
        lgr.log(2, "Creating %s", crawl_config_dir)
        makedirs(crawl_config_dir)

    crawl_config_repo_path = opj(CRAWLER_META_DIR, CRAWLER_META_CONFIG_FILENAME)
    crawl_config = opj(crawl_config_dir, CRAWLER_META_CONFIG_FILENAME)
    cfg_ = SafeConfigParserWithIncludes()
    cfg_.add_section(CRAWLER_PIPELINE_SECTION)
    cfg_.set(CRAWLER_PIPELINE_SECTION, 'template', template)
    if template_func:
        cfg_.set(CRAWLER_PIPELINE_SECTION, 'func', template_func)
    for k, v in (template_kwargs or {}).items():
        cfg_.set(CRAWLER_PIPELINE_SECTION, "_" + k, str(v))
    with open(crawl_config, 'w') as f:
        cfg_.write(f)

    if commit:
        repo = GitRepo(path)
        repo.add(crawl_config_repo_path)
        if repo.dirty:
            repo.commit("Initialized crawling configuration to use template %s" % template,
                        _datalad_msg=True)
        else:
            lgr.debug("Repository is not dirty -- not committing")

    return crawl_config
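
# Illustrative usage sketch (hypothetical template and keyword values, mirroring
# the examples used elsewhere in this module):
#
#     initiate_pipeline_config('openfmri',
#                              template_kwargs={'dataset': 'ds000001'},
#                              path='.', commit=True)
#
# would write a [crawl:pipeline] section with template = openfmri and
# _dataset = ds000001 into .datalad/crawl/crawl.cfg and commit it.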

def load_pipeline_from_module(module, func=None, args=None, kwargs=None, return_only=False):
    """Load pipeline from a Python module

    Parameters
    ----------
    module: str
      Module name or filename of the module from which to load the pipeline
    func: str, optional
      Function within the module to use.  Default: `pipeline`
    args: list or tuple, optional
      Positional arguments to provide to the function.
    kwargs: dict, optional
      Keyword arguments to provide to the function.
    return_only: bool, optional
      If True, return the pipeline function itself instead of the result of calling it
    """
    func = func or 'pipeline'
    args = args or tuple()
    kwargs = kwargs or {}

    # mod = __import__('datalad_crawler.pipelines.%s' % module, fromlist=['datalad_crawler.pipelines'])
    dirname_ = dirname(module)
    assert(module.endswith('.py'))
    try:
        sys.path.insert(0, dirname_)
        modname = basename(module)[:-3]
        # to allow for relative imports within "stock" pipelines
        if dirname_ == opj(dirname(__file__), 'pipelines'):
            mod = __import__('datalad_crawler.pipelines.%s' % modname,
                             fromlist=['datalad_crawler.pipelines'])
        else:
            mod = __import__(modname, level=0)
        if return_only:
            return getattr(mod, func)
        return getattr(mod, func)(*args, **kwargs)
    except Exception as e:
        raise RuntimeError("Failed to import pipeline from %s: %s"
                           % (module, exc_str(e)))
    finally:
        if dirname_ in sys.path:
            path = sys.path.pop(0)
            if path != dirname_:
                lgr.warning("Popped %s when expected %s. Restoring!!!" % (path, dirname_))
                sys.path.insert(0, path)
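
# Illustrative usage sketch (hypothetical path and keyword arguments):
#
#     pipeline = load_pipeline_from_module(
#         '.datalad/crawl/pipelines/mypipeline.py',
#         func='pipeline', kwargs={'dataset': 'ds000001'})
#
# imports mypipeline.py and returns the result of calling its `pipeline`
# function; with return_only=True the function itself is returned uncalled.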

def _find_pipeline(name):
    """Given a name for a pipeline, looks for the pipeline under common locations
    """
    def candidates(name):
        if not name.endswith('.py'):
            name += '.py'
        # first -- current directory
        repo_path = get_dataset_root(curdir)
        if repo_path:
            yield opj(repo_path, CRAWLER_META_DIR, 'pipelines', name)
        # TODO: look under other .datalad locations as well
        # last -- within datalad code
        yield opj(dirname(__file__), 'pipelines', name)  # datalad's module shipped within it

    for candidate in candidates(name):
        if exists(candidate):
            lgr.debug("Found pipeline %s under %s", name, candidate)
            return candidate
        lgr.log(5, "No pipeline %s under %s", name, candidate)
    return None

def load_pipeline_from_template(name, func=None, args=None, kwargs=None, return_only=False):
    """Given a name, loads that pipeline from datalad_crawler.pipelines
    and later from other locations

    Parameters
    ----------
    name: str
      Name of the pipeline (the template) defining the filename, or the full path
      to it (TODO), example: openfmri
    func: str, optional
      Name of the function which produces the pipeline to run,
      example: superdataset_pipeline
    args: list or tuple, optional
      Positional args for the pipeline, passed as `*args` into the pipeline call
    kwargs: dict, optional
      Keyword args for the pipeline, passed as `**kwargs` into the pipeline call,
      example: {'dataset': 'ds000001'}
    return_only: bool, optional
      If True, return the pipeline function itself instead of the result of calling it
    """
    if isabs(name) or exists(name):
        raise NotImplementedError("Don't know how to import straight path %s yet" % name)

    # explicit isabs since it might not exist
    filename = name \
        if (isabs(name) or exists(name)) \
        else _find_pipeline(name)

    if filename:
        if not exists(filename):
            raise PipelineNotSpecifiedError("Pipeline file %s is N/A" % filename)
    else:
        raise PipelineNotSpecifiedError("could not find pipeline for %s" % name)

    return load_pipeline_from_module(filename, func=func, args=args, kwargs=kwargs,
                                     return_only=return_only)
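
# Illustrative usage sketch (reusing the example values from the docstring above):
#
#     pipeline = load_pipeline_from_template('openfmri', func='superdataset_pipeline')
#     pipeline = load_pipeline_from_template('openfmri', kwargs={'dataset': 'ds000001'})
#
# Either call resolves openfmri.py via _find_pipeline() -- first under the dataset's
# .datalad/crawl/pipelines/, then among the stock pipelines shipped with
# datalad_crawler -- and builds the pipeline from it.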

# TODO: we might need to find present .datalad/crawl in another branch if not
# present currently

def load_pipeline_from_config(path):
    """Given a path to the pipeline configuration file, instantiate a pipeline

    Typical example description

        [crawl:pipeline]
        template = standard
        func = pipeline1
        _kwarg1 = 1

    which would instantiate a pipeline from the standard.py module by calling
    `standard.pipeline1` with `_kwarg1='1'`.  This definition is identical to

        [crawl:pipeline]
        template = standard?func=pipeline1&_kwarg1=1

    so that theoretically we could specify basic pipelines completely within a URL
    """
    cfg_ = SafeConfigParserWithIncludes()
    cfg_.read([path])
    pipeline = None
    for sec in (CRAWLER_PIPELINE_SECTION, CRAWLER_PIPELINE_SECTION_DEPRECATED):
        if not cfg_.has_section(sec):
            continue
        if sec == CRAWLER_PIPELINE_SECTION_DEPRECATED:
            lgr.warning("Crawler section was renamed from %s to %s and format has changed,"
                        " please adjust",
                        CRAWLER_PIPELINE_SECTION_DEPRECATED, CRAWLER_PIPELINE_SECTION)
        opts = cfg_.options(sec)
        # must have template
        if 'template' not in opts:
            raise PipelineNotSpecifiedError(
                "%s lacks %r field within %s section" % (path, 'template', sec))
        template = cfg_.get(sec, 'template')
        # parse template spec
        template_name, url_opts = parse_url_opts(template)

        # so we will allow to specify options in the url and then also in the section definitions
        all_opts = updated(url_opts, {o: cfg_.get(sec, o) for o in opts})
        template_opts = {k: v for k, v in all_opts.items() if not k.startswith('_')}
        pipeline_opts = {k[1:]: v for k, v in all_opts.items() if k.startswith('_')}
        assert not set(template_opts).difference({'template', 'func'}), \
            "ATM we understand only 'func'"
        pipeline = load_pipeline_from_template(
            template_name,
            func=template_opts.get('func', None),
            kwargs=pipeline_opts)
        break

    if pipeline is None:
        raise IOError("Did not find section %r within %s"
                      % (CRAWLER_PIPELINE_SECTION, path))
    return pipeline

def get_repo_pipeline_config_path(repo_path=curdir):
    """Given a path within a repo, return path to the crawl.cfg"""
    if not exists(opj(repo_path, HANDLE_META_DIR)):
        # we need to figure out top path for the repo
        repo_path = get_dataset_root(repo_path)
        if not repo_path:
            return None
    return opj(repo_path, CRAWLER_META_CONFIG_PATH)

def get_repo_pipeline_script_path(repo_path=curdir):
    """If there is a single pipeline present among 'pipelines/', return path to it"""
    # TODO: somewhat adhoc etc -- may be improve with some dedicated name being
    # tracked or smth like that
    if not exists(opj(repo_path, HANDLE_META_DIR)):
        # we need to figure out top path for the repo
        repo_path = get_dataset_root(repo_path)
        if not repo_path:
            return None
    pipelines = glob(opj(repo_path, CRAWLER_META_DIR, 'pipelines', '*.py'))
    if len(pipelines) > 1 or not pipelines:
        return None
    return pipelines[0]