# Source code for datalad_metalad.filter

# emacs: -*- mode: python; py-indent-offset: 4; tab-width: 4; indent-tabs-mode: nil -*-
# ex: set sts=4 ts=4 sw=4 et:
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
#
#   See COPYING file distributed along with the datalad package for the
#   copyright and license terms.
#
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
"""
Run a metadata filter on a set of metadata elements
"""
import json
import logging
from pathlib import Path
from typing import (
    Dict,
    Iterable,
    List,
    Optional,
    Tuple,
    Type,
    Union,
)

from datalad.distribution.dataset import Dataset
from datalad.distribution.dataset import (
    datasetmethod,
    EnsureDataset,
)
from datalad.interface.base import (
    Interface,
    build_doc,
    eval_results,
)
from datalad.support.constraints import (
    EnsureNone,
    EnsureStr,
)
from datalad.support.param import Parameter
from datalad.ui import ui

from .dump import (
    dump_from_dataset_tree,
    dump_from_uuid_set,
)
from .filters.base import MetadataFilterBase
from .metadatatypes.metadata import (
    MetadataRecord,
    MetadataResult,
    META_FILTER,
)
from .metadatatypes.result import (
    OK,
)
from .metadatautils import get_metadata_objects
from .pathutils.metadataurlparser import (
    MetadataURL,
    TreeMetadataURL,
    UUIDMetadataURL,
    parse_metadata_url,
)


# Markup format used by the docstrings in this module.
__docformat__ = "restructuredtext"


# Name of the storage backend; it is passed as ``backend``/``mapper`` to the
# metadata-store helpers used by ``create_iterator`` below.
default_backend = "git"

# Module-level logger for the meta-filter command.
lgr = logging.getLogger("datalad.metadata.filter")


def create_metadata_object(metadata_dict: dict) -> MetadataRecord:
    """Build a ``MetadataRecord`` from its JSON (dictionary) representation.

    :param metadata_dict:
           the dictionary that is handed to ``MetadataRecord.from_json``
    :return:
           whatever ``MetadataRecord.from_json`` creates from the dictionary
    """
    record = MetadataRecord.from_json(metadata_dict)
    return record


def create_iterator(dataset: Union[str, Path],
                    metadata_url: MetadataURL,
                    recursive: bool) -> Iterable:
    """Yield one metadata object for every entry that matches a metadata URL.

    This is a generator function: nothing is read from the metadata store
    until the returned iterator is consumed for the first time.

    :param dataset:
           the dataset whose metadata store is queried; passed on unchanged
           to ``get_metadata_objects``
    :param metadata_url:
           a parsed metadata URL, either a ``UUIDMetadataURL`` or a
           ``TreeMetadataURL``
    :param recursive:
           passed on unchanged to the dump function that is selected for
           ``metadata_url``
    :return:
           an iterator over the objects that ``create_metadata_object``
           builds from the ``"metadata"`` entry of every dumped element
    :raises ValueError:
           if ``metadata_url`` is neither a ``UUIDMetadataURL`` nor a
           ``TreeMetadataURL`` (raised on first iteration)
    """
    store_path, tree_versions, uuids = get_metadata_objects(
        dataset=dataset,
        backend=default_backend)

    # Select the dump generator that fits the URL flavour; the records of
    # both flavours are post-processed in the same way below.
    if isinstance(metadata_url, UUIDMetadataURL):
        entries = dump_from_uuid_set(
            mapper=default_backend,
            metadata_store=store_path,
            uuid_set=uuids,
            path=metadata_url,
            recursive=recursive)
    elif isinstance(metadata_url, TreeMetadataURL):
        entries = dump_from_dataset_tree(
            mapper=default_backend,
            metadata_store=store_path,
            tree_version_list=tree_versions,
            metadata_url=metadata_url,
            recursive=recursive)
    else:
        raise ValueError(
            f"unsupported metadata url type: {type(metadata_url).__name__}")

    for entry in entries:
        yield create_metadata_object(entry["metadata"])


@build_doc
class Filter(Interface):
    """Run a metadata filter on a set of metadata elements.

    Take a number of metadata elements and run a filter on it.

    The result of the filter operation will be written to stdout and can, for
    example, be passed to meta-add.

    The filter can be configured by passing additional arguments. Arguments of
    the form 'key=value' are passed to the filter as keyword arguments, all
    other arguments are passed to the filter as positional arguments. The
    filter arguments have to be separated by '++' from the metadata
    coordinates.
    """

    result_renderer = "tailored"

    _examples_ = [
        dict(
            text="""Use the provided 'metalad_demofilter' to build a
            'histogram' of keys and their content in the metadata of the
            dataset 'root-dataset', iterating over the sub-datasets 'sub-a' and
            'sub-b'.""",
            code_cmd="""datalad meta-filter metalad_demofilter -d root-dataset
            sub-a sub-b"""
        ),
        dict(
            text="""Apply 'metalad_demofilter' to all directories/sub-datasets
            of the dataset in the current working directory that start with
            'subject'.""",
            code_cmd="""datalad meta-filter metalad_demofilter subject*"""
        ),
    ]

    _params_ = dict(
        dataset=Parameter(
            args=("-d", "--dataset"),
            doc="""Git repository that contains datalad metadata. If no
                   repository path is given, the metadata store is determined
                   by the current work directory. All metadata URLs (see below)
                   are relative to this dataset.""",
            constraints=EnsureDataset() | EnsureNone()),
        filtername=Parameter(
            args=("filtername",),
            metavar="FILTER_NAME",
            doc="Name of the filter that should be executed.",
            constraints=EnsureStr()),
        metadataurls=Parameter(
            args=("metadataurls",),
            metavar="METADATA_URL",
            nargs="+",
            doc="""Metadata URL(s). A list of at least one metadata URL.
                   The filter will receive a list of iterables, that contains
                   one iterable for each metadata URL. The iterables will yield
                   all metadata-entries that match the respective metadata URL.
                   """,
            constraints=EnsureStr()),
        # TODO: this parameter is specified here in order to print out a
        #  proper help message. It will never be filled by the argument parser
        #  because "metadataurls" has an arbitrary number of arguments, that
        #  means: "metadataurls" will eat up all "filterargs".
        filterargs=Parameter(
            args=("filterargs",),
            metavar="FILTER_ARGUMENTS",
            doc="""Filter arguments given as string arguments to the
                   filter. Filter arguments have to be separated from the
                   list of metadata coordinates by '++'.""",
            nargs="*",
            constraints=EnsureStr() | EnsureNone()),
        recursive=Parameter(
            args=("-r", "--recursive",),
            action="store_true",
            doc="""If set, the metadata URL iterables will yield all metadata
                   recursively from the matching metadata URLs."""))

    @staticmethod
    @datasetmethod(name="meta_filter")
    @eval_results
    def __call__(
            filtername: str,
            metadataurls: List[Union[str, MetadataURL]],
            dataset: Optional[Union[Dataset, str]] = ".",
            filterargs: Optional[List[str]] = None,
            recursive: bool = False) -> Iterable:

        # Runs the filter `filtername` on one metadata iterable per metadata
        # URL and yields one JSON-style result record per metadata record
        # that the filter emits.

        # `None` is accepted by the parameter constraint and by the type
        # annotation, but `Path(None)` below would raise a TypeError. Treat it
        # like the default ".", i.e. the current working directory.
        if dataset is None:
            dataset = "."

        filter_name = filtername

        # "metadataurls" is declared with nargs="+" and therefore also
        # receives the filter arguments; a literal '++' element marks the end
        # of the URLs and the start of the filter arguments.
        if '++' in metadataurls:
            separator_index = metadataurls.index('++')
            metadata_urls = metadataurls[:separator_index]
            filter_args = metadataurls[separator_index + 1:]
        else:
            metadata_urls = metadataurls
            filter_args = filterargs or []

        if not metadata_urls:
            raise ValueError("At least one metadata URL is required")

        # URLs may be given as strings or as already parsed MetadataURL
        # instances; only strings are parsed.
        metadata_urls = [
            url if isinstance(url, MetadataURL) else parse_metadata_url(url)
            for url in metadata_urls
        ]

        # One lazy iterator per metadata URL; create_iterator is a generator
        # function, so nothing is read before the filter starts iterating.
        metadata_iterables = [
            create_iterator(dataset, metadata_url, recursive)
            for metadata_url in metadata_urls
        ]

        # Absolute dataset path that is reported in every result record.
        path = (
            dataset.pathobj
            if isinstance(dataset, Dataset)
            else Path(dataset)
        )
        if not path.is_absolute():
            path = Path.cwd() / path

        for metadata_record in run_filter(filter_name=filter_name,
                                          filter_args=filter_args,
                                          metadata_iterables=metadata_iterables):

            yield MetadataResult(
                status=OK,
                path=path,
                action=META_FILTER,
                metadata_type=metadata_record.type,
                metadata_record=metadata_record,
                metadata_source=path,
                # report the backend that was used to read the metadata
                backend=default_backend).as_json_obj()

    @staticmethod
    def custom_result_renderer(res, **kwargs):
        """Write a successful ``meta_filter`` result to the UI as JSON.

        Results whose status is not ``"ok"`` or whose action is not
        ``'meta_filter'`` are skipped. For all other results the
        ``"metadata_record"`` entry (if present) is emitted as one JSON
        message, with ``path``, ``dataset_path`` and ``dataset_id`` converted
        to strings, followed by the ``"context"`` entry (if present) as a
        second JSON message.

        :param res: the result dictionary that should be rendered
        :param kwargs: ignored
        """
        if res["status"] != "ok" or res.get("action", "") != 'meta_filter':
            # logging complained about this already
            return

        metadata_record = res.get("metadata_record", None)
        if metadata_record is not None:
            # "path" and "dataset_path" are optional; they are converted to
            # strings only if they exist in the record.
            path = (
                {"path": str(metadata_record["path"])}
                if "path" in metadata_record
                else {}
            )

            dataset_path = (
                {"dataset_path": str(metadata_record["dataset_path"])}
                if "dataset_path" in metadata_record
                else {}
            )

            # Later entries override the identically named entries of the
            # record, i.e. the string versions end up in the output.
            ui.message(json.dumps({
                **metadata_record,
                **path,
                **dataset_path,
                "dataset_id": str(metadata_record["dataset_id"])
            }))

        context = res.get("context")
        if context is not None:
            ui.message(json.dumps(context))


def run_filter(filter_name: str,
               filter_args: Optional[List],
               metadata_iterables: List[Iterable]
               ) -> Iterable[MetadataRecord]:
    """Instantiate the named filter and yield everything it produces.

    The filter class is looked up with ``get_filter_class`` and instantiated
    with ``filter_name`` as its only argument. ``filter_args`` is divided
    into positional and keyword arguments by ``split_arguments``; both are
    forwarded, after ``metadata_iterables``, to the ``filter`` method of the
    instance.

    :param filter_name: name under which the filter is registered
    :param filter_args: the arguments that are handed to ``split_arguments``
    :param metadata_iterables:
           the iterables that the filter receives as its first argument
    :return: an iterator over the elements yielded by the filter
    """
    filter_class = get_filter_class(filter_name)
    instance = filter_class(filter_name)

    positional, keyword = split_arguments(filter_args, filter_class, instance)
    produced = instance.filter(
        metadata_iterables,
        *(positional or []),
        **(keyword or {}))

    yield from produced


def get_filter_class(filter_name: str) -> Type[MetadataFilterBase]:
    """Load the filter that is registered under ``filter_name``.

    All entry points named ``filter_name`` in the group
    ``datalad.metadata.filters`` are collected. The last one is loaded and
    returned; for every other one a warning is logged that names the
    distribution whose filter is used and the distribution whose filter is
    ignored.

    :param filter_name: entry point name of the requested filter
    :return: the object that is loaded from the selected entry point
    :raises ValueError: if no entry point with the given name exists
    """
    from pkg_resources import iter_entry_points

    candidates = list(
        iter_entry_points("datalad.metadata.filters", filter_name))

    if not candidates:
        message = "Requested metadata filter '{}' not available".format(
            filter_name)
        raise ValueError(message)

    # The last entry point is used, all preceding ones are ignored.
    *shadowed, selected = candidates

    lgr.debug(
        "Using metadata filter %s from distribution %s",
        filter_name,
        selected.dist.project_name)

    # Tell the user which distributions provide a filter that is not used.
    for shadowed_entry_point in shadowed:
        lgr.warning(
            "MetadataRecord filter %s from distribution %s overrides "
            "metadata filter from distribution %s",
            filter_name,
            selected.dist.project_name,
            shadowed_entry_point.dist.project_name)

    return selected.load()


def split_arguments(args: Optional[List[str]],
                    filter_class: Type,
                    filter_instance: object
                    ) -> Tuple[List, Dict]:
    """
    Split arguments into positional arguments and keyword arguments.

    Every argument that contains "=" is split at its first "=" into a key and
    a value and becomes a keyword argument; every other argument becomes a
    positional argument. The relative order of positional arguments is
    preserved. If a key appears more than once, its last value wins.

    TODO: Splitting is currently based on the presence of "=" in the argument.
          It should instead be based on a specification of arguments in the
          class or in the instance.

    NOTE(review): each keyword value is a single-element list that contains
          the value string, e.g. "a=b" yields {"a": ["b"]}. This looks
          unintended, but it is kept because existing filters might rely on
          it -- confirm before changing.

    :param args: a list of arguments; ``None`` is treated like an empty list
    :param filter_class:
           the class of the filter that should receive the arguments
           (currently unused)
    :param filter_instance:
           the instance of the filter that should receive the arguments
           (currently unused)
    :return:
           a tuple consisting of a list of positional arguments and a dictionary
           of keyword arguments
    """
    positional: List[str] = []
    keyword: Dict[str, List[str]] = {}

    # `args or []` makes `None` (allowed by callers that declare the filter
    # arguments as optional) behave like "no arguments".
    for argument in args or []:
        key, separator, value = argument.partition("=")
        if separator:
            keyword[key] = [value]
        else:
            positional.append(argument)

    return positional, keyword