Source code for datalad.distribution.drop

# emacs: -*- mode: python; py-indent-offset: 4; tab-width: 4; indent-tabs-mode: nil -*-
# ex: set sts=4 ts=4 sw=4 noet:
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
#
#   See COPYING file distributed along with the datalad package for the
#   copyright and license terms.
#
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
"""High-level interface for dropping dataset content

"""

__docformat__ = 'restructuredtext'

import logging

from os.path import (
    join as opj,
    isabs,
    normpath,
)
from datalad.utils import ensure_list
from datalad.support.param import Parameter
from datalad.support.constraints import EnsureStr, EnsureNone
from datalad.support.exceptions import (
    CommandError,
    InsufficientArgumentsError,
)
from datalad.distribution.dataset import (
    Dataset,
    EnsureDataset,
    datasetmethod,
    require_dataset,
)
from datalad.interface.base import (
    Interface,
    build_doc,
)
from datalad.interface.common_opts import (
    if_dirty_opt,
    recursion_flag,
    recursion_limit,
)
from datalad.interface.results import (
    get_status_dict,
    annexjson2result,
    success_status_map,
    results_from_annex_noinfo,
)
from datalad.interface.utils import (
    handle_dirty_dataset,
    eval_results,
)
from datalad.core.local.status import Status

lgr = logging.getLogger('datalad.distribution.drop')

dataset_argument = Parameter(
    args=("-d", "--dataset"),
    metavar="DATASET",
    doc="""specify the dataset to perform the operation on.
    If no dataset is given, an attempt is made to identify a dataset
    based on the `path` given""",
    constraints=EnsureDataset() | EnsureNone())


check_argument = Parameter(
    args=("--nocheck",),
    doc="""whether to perform checks to assure the configured minimum
    number (remote) source for data.[CMD:  Give this
    option to skip checks CMD]""",
    action="store_false",
    dest='check')


def _postproc_result(res, respath_by_status, ds, **kwargs):
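    """Convert a git-annex JSON record into a datalad result dict.

    The path of each processed record is also tallied in `respath_by_status`
    (keyed by success/failure), so that requested paths annex stayed silent
    about can be reported on afterwards. A git-annex suggestion to retry
    with --force is rewritten to point to datalad's --nocheck option instead.
    """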
    res = annexjson2result(
        # annex reports are always about files
        res, ds, type='file', **kwargs)
    success = success_status_map[res['status']]
    respath_by_status[success] = \
        respath_by_status.get(success, []) + [res['path']]
    if res["status"] == "error" and res["action"] == "drop":
        msg = res["message"]
        if isinstance(msg, str) and "Use --force to" in msg:
            # Avoid confusing datalad-drop callers with git-annex-drop's
            # suggestion to use --force.
            res["message"] = msg.replace("--force", "--nocheck")
    return res


def _drop_files(ds, paths, check, noannex_iserror=False, **kwargs):
    """Helper to drop content in datasets.

    Parameters
    ----------
    ds : Dataset
    paths : path or list(path)
      which content to drop
    check : bool
      whether to instruct annex to perform minimum copy availability
      checks
    noannex_iserror : bool
      whether calling this function on a plain Git repository (without an
      annex) yields an 'impossible' result (True) or a 'notneeded' result
      (False)
    **kwargs
      additional payload for the result dicts
    """
    # expensive, access only once
    ds_repo = ds.repo
    if 'action' not in kwargs:
        kwargs['action'] = 'drop'
    # always make sure that we pass a list;
    # the `normalize_paths` decorator would otherwise break all logic below
    paths = ensure_list(paths)
    if not hasattr(ds_repo, 'drop'):
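        # this repository has no drop() method, i.e. it is a plain Git
        # repository without an annex -- there is no annex'ed content to drop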
        for p in paths:
            r = get_status_dict(
                status='impossible' if noannex_iserror else 'notneeded',
                path=p if isabs(p) else normpath(opj(ds.path, p)),
                message="no annex'ed content",
                **kwargs)
            r['action'] = 'drop'
            yield r
        return

    cmd = ['drop']
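    # skipping the (remote) availability check maps onto git-annex's --force flag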
    if not check:
        cmd.append('--force')

    respath_by_status = {}
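    # bins the paths of all results by 'success'/'failure';
    # filled as a side effect of _postproc_result()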
    try:
        yield from (
            _postproc_result(res, respath_by_status, ds)
            for res in ds_repo._call_annex_records(cmd, files=paths)
        )
    except CommandError as e:
        # pick up the results captured so far and yield them
        # the error will be amongst them
        yield from (
            _postproc_result(res, respath_by_status, ds)
            for res in e.kwargs.get('stdout_json', [])
        )
    # report on things requested that annex was silent about
    for r in results_from_annex_noinfo(
            ds, paths, respath_by_status,
            dir_fail_msg='could not drop some content in %s %s',
            noinfo_dir_msg='nothing to drop from %s',
            noinfo_file_msg="no annex'ed content",
            **kwargs):
        r['action'] = 'drop'
        yield r


@build_doc
class Drop(Interface):
    """Drop file content from datasets

    This command takes any number of paths of files and/or directories. If
    a common (super)dataset is given explicitly, the given paths are
    interpreted relative to this dataset.

    Recursion into subdatasets needs to be explicitly enabled, while recursion
    into subdirectories within a dataset is done automatically. An optional
    recursion limit is applied relative to each given input path.

    By default, the availability of at least one remote copy is verified before
    file content is dropped. As these checks can slow the operation down
    (network latencies, etc.), they can be disabled.

    """
    _examples_ = [
        dict(text="Drop single file content",
             code_py="drop('path/to/file')",
             code_cmd="datalad drop <path/to/file>"),
        dict(text="Drop all file content in the current dataset",
             code_py="drop('.')",
             code_cmd="datalad drop"),
        dict(text="Drop all file content in a dataset and all its subdatasets",
             code_py="drop(dataset='.', recursive=True)",
             code_cmd="datalad drop -d <path/to/dataset> -r"),
        dict(text="Disable check to ensure the configured minimum number of "
                  "remote sources for dropped data",
             code_py="drop(path='path/to/content', check=False)",
             code_cmd="datalad drop <path/to/content> --nocheck"),
    ]

    _action = 'drop'

    _params_ = dict(
        dataset=dataset_argument,
        path=Parameter(
            args=("path",),
            metavar="PATH",
            doc="path/name of the component to be dropped",
            nargs="*",
            constraints=EnsureStr() | EnsureNone()),
        recursive=recursion_flag,
        recursion_limit=recursion_limit,
        check=check_argument,
        if_dirty=if_dirty_opt,
    )

    @staticmethod
    @datasetmethod(name=_action)
    @eval_results
    def __call__(
            path=None,
            dataset=None,
            recursive=False,
            recursion_limit=None,
            check=True,
            if_dirty='save-before'):

        if not dataset and not path:
            raise InsufficientArgumentsError(
                "insufficient information for `drop`: requires at least a path or dataset")
        refds_path = Interface.get_refds_path(dataset)
        res_kwargs = dict(action='drop', logger=lgr, refds=refds_path)
        # this try-except dance only maintains the previous behavior of `drop`,
        # which did not raise a ValueError, but yielded an error status
        try:
            ds = require_dataset(
                dataset, check_installed=True, purpose='drop content')
        except ValueError as e:
            yield dict(
                status='error',
                message=str(e),
                **res_kwargs,
            )
            return

        if dataset and not path:
            # act on the whole dataset if nothing else was specified
            path = refds_path
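        # sort the requested paths into the datasets that contain them;
        # each dataset's content is dropped in a single pass further down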
        content_by_ds = {}
        for st in Status.__call__(
                # do not use `ds` to preserve path semantics
                dataset=dataset,
                path=path,
                annex=None,
                untracked='no',
                recursive=recursive,
                recursion_limit=recursion_limit,
                eval_subdataset_state='no',
                report_filetype='raw',
                return_type='generator',
                result_renderer=None,
                # yield errors and let caller decide
                on_failure='ignore'):
            if st['status'] == 'error':
                # Downstream code can't do anything with these. Let the caller
                # decide their fate.
                yield st
                continue
            # subdataset entries: skip uninstalled ones, and record installed
            # ones as containing datasets in their own right
            if st.get('type') == 'dataset':
                if not Dataset(st['path']).is_installed():
                    continue
                parentds = st['path']
            else:
                parentds = st['parentds']
            cbd = content_by_ds.get(parentds, [])
            cbd.append(st['path'])
            content_by_ds[parentds] = cbd

        # iterate over all datasets, order doesn't matter
        for ds_path in content_by_ds:
            ds = Dataset(ds_path)
            # TODO generator
            # this should yield what it did
            handle_dirty_dataset(ds, mode=if_dirty)
            for r in _drop_files(
                    ds,
                    content_by_ds[ds_path],
                    check=check,
                    **res_kwargs):
                yield r
        # there is nothing to save at the end