datalad_crawler.pipeline

Pipeline functionality.

A pipeline is represented by a simple list or tuple of nodes or other nested pipelines. Each pipeline node is a callable which receives a dictionary (commonly named data), does some processing, and yields (once or multiple times) a derived dictionary (commonly a shallow copy of the original dict). For a node to be parametrized, it should be implemented as a callable class (i.e., one defining __call__), which can receive its parameters in its constructor.
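
For illustration, a minimal sketch of such a parametrized node (the class name and the 'filename'/'suffix' handling are hypothetical, not part of this module):

    class append_suffix(object):
        """A node which appends a suffix to a (hypothetical) 'filename' item."""
        def __init__(self, suffix):
            self.suffix = suffix

        def __call__(self, data):
            data = data.copy()  # yield a shallow copy, not the original dict
            data['filename'] = data.get('filename', '') + self.suffix
            yield data          # a node may yield once or multiple times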

TODO: describe PIPELINE_OPTS and how to specify them for a given (sub-)pipeline.

The data dictionary is used primarily to carry the scraped/produced data, but besides that it will carry a few items which some nodes might use. All those item names start with the datalad_ prefix and are intended for 'in-place' modification or querying. The following items are planned to be provided by the pipeline runner:

datalad_settings

PipelineSettings object which could be used to provide configuration for the current run of the pipeline. E.g.:

  • dry: whether nodes should refrain from performing any changes that would be reflected on disk
  • skip_existing:

datalad_stats

ActivityStats/dict object to accumulate statistics on what has been done by the nodes so far

To some degree, an analogy can be drawn: data is to the pipeline as blood is to the circulatory system. Blood delivers various elements, which are picked up by those parts of the body that know what to do with them; in the same way, nodes can consume, augment, or add new items to the data and send it downstream. Since there is no strict typing or specification of what nodes may consume or produce (yet), no verification is done, and things can go utterly wrong. So nodes must be robust and provide informative logging.

exception datalad_crawler.pipeline.FinishPipeline[source]

Bases: exceptions.Exception

Exception used to signal that a given pipeline should be stopped
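
For example, a node can stop its (sub-)pipeline by raising this exception. A minimal sketch (the node name and its limit parameter are hypothetical):

    from datalad_crawler.pipeline import FinishPipeline

    class stop_after(object):
        """A node which stops the pipeline after a given number of data items."""
        def __init__(self, limit):
            self.limit = limit
            self.seen = 0

        def __call__(self, data):
            self.seen += 1
            if self.seen > self.limit:
                raise FinishPipeline  # signal the runner to stop this pipeline
            yield data.copy()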

datalad_crawler.pipeline.get_repo_pipeline_config_path(repo_path='.')[source]

Given a path within a repo, return path to the crawl.cfg

datalad_crawler.pipeline.get_repo_pipeline_script_path(repo_path='.')[source]

If there is a single pipeline present among ‘pipelines/’, return path to it

datalad_crawler.pipeline.initiate_pipeline_config(template, template_func=None, template_kwargs=None, path='.', commit=False)[source]

TODO Gergana ;)

datalad_crawler.pipeline.load_pipeline_from_config(path)[source]

Given a path to the pipeline configuration file, instantiate a pipeline

A typical example definition:

    [crawl:pipeline]
    pipeline = standard
    func = pipeline1
    _kwarg1 = 1

which would instantiate a pipeline from the standard.py module by calling standard.pipeline1 with _kwarg1='1'. This definition is identical to

    [crawl:pipeline]
    pipeline = standard?func=pipeline1&_kwarg1=1

so that, theoretically, we could specify basic pipelines entirely within a URL.
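
A minimal usage sketch, assuming the repository already carries such a configuration (only functions documented on this page are used):

    from datalad_crawler.pipeline import (
        get_repo_pipeline_config_path,
        load_pipeline_from_config,
        run_pipeline,
    )

    cfg_path = get_repo_pipeline_config_path('.')  # path to the repo's crawl.cfg
    pipeline = load_pipeline_from_config(cfg_path)
    results = run_pipeline(pipeline)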

datalad_crawler.pipeline.load_pipeline_from_module(module, func=None, args=None, kwargs=None, return_only=False)[source]

Load pipeline from a Python module

Parameters:
  • module (str) – Module name or filename of the module from which to load the pipeline
  • func (str, optional) – Function within the module to use. Default: pipeline
  • args (list or tuple, optional) – Positional arguments to provide to the function.
  • kwargs (dict, optional) – Keyword arguments to provide to the function.
  • return_only (bool, optional) – if True, only return the pipeline
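
A minimal sketch (the module filename, its pipeline function, and the keyword argument are hypothetical):

    from datalad_crawler.pipeline import load_pipeline_from_module, run_pipeline

    # 'my_pipelines.py' is assumed to define a pipeline(url=...) function
    pipeline = load_pipeline_from_module(
        'my_pipelines.py',
        func='pipeline',  # the default function name
        kwargs={'url': 'https://example.com'},
    )
    results = run_pipeline(pipeline)
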
datalad_crawler.pipeline.load_pipeline_from_template(name, func=None, args=None, kwargs=None, return_only=False)[source]

Given a name, load that pipeline from datalad_crawler.pipelines (and, later, from other locations)

Parameters:
  • name (str) – Name of the pipeline (the template) defining the filename, or the full path to it (TODO), example: openfmri
  • func (str) – Name of the function which provides the pipeline to run, example: superdataset_pipeline
  • args (list or tuple, optional) – Positional args for the pipeline, passed as *args into the pipeline call
  • kwargs (dict, optional) – Keyword args for the pipeline, passed as **kwargs into the pipeline call, example: {‘dataset’: ‘ds000001’}
  • return_only (bool, optional) – if True, only return the pipeline
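
A minimal sketch combining the examples given above:

    from datalad_crawler.pipeline import load_pipeline_from_template, run_pipeline

    # load the 'openfmri' template pipeline for dataset 'ds000001'
    pipeline = load_pipeline_from_template(
        'openfmri', kwargs={'dataset': 'ds000001'})
    results = run_pipeline(pipeline)
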
datalad_crawler.pipeline.reset_pipeline(pipeline)[source]

Given a pipeline, traverse its nodes and call .reset on them

Note: it doesn’t try to call reset if a node doesn’t have it

datalad_crawler.pipeline.run_pipeline(*args, **kwargs)[source]

Run pipeline and assemble results into a list

By default, the pipeline returns only its input (see PIPELINE_OPTS), so if the pipeline was given no options to return additional items, the output will be [{}].
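
A minimal sketch illustrating that default (the pass-through node is hypothetical):

    from datalad_crawler.pipeline import run_pipeline

    def noop(data):  # a trivial node which passes the data through unchanged
        yield data.copy()

    print(run_pipeline([noop]))  # expected, per the note above: [{}]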

datalad_crawler.pipeline.xrun_pipeline(pipeline, data=None, stats=None, reset=True)[source]

Yield results from the pipeline.

datalad_crawler.pipeline.xrun_pipeline_steps(pipeline, data, output='input')[source]

Actually run pipeline steps, feeding yielded results to the next node and yielding results back.

Recursive beast which runs a single node and then recurses to run the rest, possibly multiple times if the current node is a generator. It yields output from the node/nested pipelines, as directed by the output argument.
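
For illustration, a sketch of the kind of structure this handles: a generator node whose every yielded dict is fed through a nested sub-pipeline (all node names and data items are hypothetical):

    def fan_out(data):
        # a generator node: yields one derived dict per (hypothetical) item
        for i in range(3):
            d = data.copy()
            d['index'] = i
            yield d

    def tag(data):
        d = data.copy()
        d['tag'] = 'seen'
        yield d

    # each dict yielded by fan_out flows through the nested sub-pipeline [tag]
    pipeline = [fan_out, [tag]]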