Source code for datalad_metalad.extract

# emacs: -*- mode: python; py-indent-offset: 4; tab-width: 4; indent-tabs-mode: nil -*-
# ex: set sts=4 ts=4 sw=4 et:
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
#
#   See COPYING file distributed along with the datalad package for the
#   copyright and license terms.
#
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
"""
Run a dataset-level metadata extractor on a dataset
or run a file-level metadata extractor on a file
"""
import json
import logging
import time
from os import curdir
from pathlib import (
    Path,
    PurePath,
)
from typing import (
    Dict,
    Iterable,
    List,
    Optional,
    Tuple,
    Type,
    Union,
)
from uuid import UUID

from dataclasses import dataclass

from datalad.distribution.dataset import Dataset
from datalad.distribution.dataset import (
    datasetmethod,
    EnsureDataset,
)
from datalad.interface.base import (
    Interface,
    build_doc,
    eval_results,
)
from datalad.support.annexrepo import AnnexRepo
from datalad.ui import ui

from .extractors.base import (
    DataOutputCategory,
    DatasetMetadataExtractor,
    FileInfo,
    FileMetadataExtractor,
    MetadataExtractor,
    MetadataExtractorBase,
)

from datalad_deprecated.metadata.extractors.base import BaseMetadataExtractor

from datalad.support.constraints import (
    EnsureNone,
    EnsureStr,
)
from datalad.support.param import Parameter

from dataladmetadatamodel.metadatapath import MetadataPath

from .exceptions import ExtractorNotFoundError
from .utils import (
    args_to_dict,
    check_dataset,
)


__docformat__ = "restructuredtext"

default_mapper_family = "git"

lgr = logging.getLogger("datalad.metadata.extract")


@dataclass
class ExtractionArguments:
    source_dataset: Dataset
    source_dataset_id: UUID
    source_dataset_version: str
    local_source_object_path: Path
    extractor_class: Union[Type[MetadataExtractor],
                           Type[FileMetadataExtractor]]
    extractor_type: Optional[str]
    extractor_name: str
    extraction_parameter: Dict[str, str]
    file_tree_path: Optional[MetadataPath]
    agent_name: str
    agent_email: str


@build_doc
class Extract(Interface):
    """Run a metadata extractor on a dataset or file.

    This command distinguishes between dataset-level extraction and
    file-level extraction.

    If no "path" argument is given, the command
    assumes that a given extractor is a dataset-level extractor and
    executes it on the dataset that is given by the current working
    directory or by the "-d" argument.

    If a path is given, the command assumes that the path identifies a
    file and that the given extractor is a file-level extractor, which
    will then be executed on the specified file. If the file level
    extractor requests the content of a file that is not present, the
    command might "get" the file content to make it locally available.
    Path must not refer to a sub-dataset. Path must not be a directory.

    .. note::

        If you want to insert sub-dataset metadata into the super-dataset's
        metadata, you currently have to do the following: first, extract
        the sub-dataset's metadata with a dataset-level extractor; second,
        add the extracted metadata, together with the sub-dataset
        information (i.e. "dataset_path", "root_dataset_id",
        "root_dataset_version"), to the metadata of the super-dataset.
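
        A rough sketch of this procedure on the command line (names in
        angle brackets are placeholders; it assumes that "meta-add"
        accepts the amended metadata record from a file)::

            datalad meta-extract -d <subdataset> <extractor_name> > record.json
            # amend record.json with "dataset_path", "root_dataset_id",
            # and "root_dataset_version"
            datalad meta-add -d <superdataset> record.json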

    The extractor configuration can be parameterized with key-value pairs
    given as additional arguments. Each key-value pair consists of two
    arguments, first the key, followed by the value. If dataset-level
    extraction should be performed and you want to provide extractor
    arguments, you have to specify '--force-dataset-level' to ensure
    dataset-level extraction, i.e. to prevent the key of the first
    extractor argument from being interpreted as the path for a
    file-level extraction.

    The command can also take legacy datalad-metalad extractors and
    will execute them in either "content" or "dataset" mode, depending
    on whether file-level or dataset-level extraction is requested.
    """

    result_renderer = "tailored"

    _examples_ = [
        dict(
            text='Use the metalad_example_file-extractor to extract metadata '
                 'from the file "subdir/data_file_1.txt". The dataset is '
                 'given by the current working directory',
            code_cmd="datalad meta-extract metalad_example_file "
                     "subdir/data_file_1.txt"
        ),
        dict(
            text='Use the metalad_example_file-extractor to extract metadata '
                 'from the file "subdir/data_file_1.txt" in the dataset '
                 '/home/datasets/ds0001',
            code_cmd="datalad meta-extract -d /home/datasets/ds0001 "
                     "metalad_example_file subdir/data_file_1.txt"
        ),
        dict(
            text='Use the metalad_example_dataset-extractor to extract '
                 'dataset-level metadata from the dataset given by the '
                 'current working directory',
            code_cmd="datalad meta-extract metalad_example_dataset"
        ),
        dict(
            text='Use the metalad_example_dataset-extractor to extract '
                 'dataset-level metadata from the dataset in '
                 '/home/datasets/ds0001',
            code_cmd="datalad meta-extract -d /home/datasets/ds0001 "
                     "metalad_example_dataset"
        )]

    _params_ = dict(
        extractorname=Parameter(
            args=("extractorname",),
            metavar="EXTRACTOR_NAME",
            doc="Name of a metadata extractor to be executed."),
        path=Parameter(
            args=("path",),
            metavar="FILE",
            nargs="?",
            doc="""Path of a file or dataset to extract metadata
            from. The path should be relative to the root of the dataset.
            If this argument is provided, we assume a file
            extractor is requested, if the path is not given, or
            if it identifies the root of a dataset, i.e. "", we
            assume a dataset level metadata extractor is
            specified.
            You might provide an absolute file path, but it has to contain
            the dataset path as prefix.""",
            constraints=EnsureStr() | EnsureNone()),
        dataset=Parameter(
            args=("-d", "--dataset"),
            doc="""Dataset to extract metadata from. If no dataset
            is given, the dataset is determined by the current work
            directory.""",
            constraints=EnsureDataset() | EnsureNone()),
        context=Parameter(
            args=("-c", "--context"),
            doc="""Context, a JSON-serialized dictionary that provides
            constant data which has been gathered before, so meta-extract
            will not have re-gather this data. Keys and values are strings.
            meta-extract will look for the following key: 'dataset_version'.""",
            constraints=EnsureStr() | EnsureNone()),
        get_context=Parameter(
            args=("--get-context",),
            action="store_true",
            doc="""Show the context that meta-extract determines with the
            given parameters and exit. The context can be used in subsequent
            calls to meta-extract with identical parameter, except from
            --get-context, to speed up the execution of meta-extract."""),
        force_dataset_level=Parameter(
            args=("--force-dataset-level",),
            action="store_true",
            doc="""Treat the first positional argument after the extractor
            name as an extractor argument instead of a file path and
            perform dataset-level extraction."""),
        extractorargs=Parameter(
            args=("extractorargs",),
            metavar="EXTRACTOR_ARGUMENTS",
            doc="""Extractor arguments given as string arguments to the
            extractor. The extractor arguments are interpreted as key-value
            pairs. The first argument is the name of the key, the next argument
            is the value for that key, and so on. Consequently, there should be
            an even number of extractor arguments.
            
            If dataset level extraction should be performed and you want to
            provide extractor arguments. you have tp specify
            '--force-dataset-level' to ensure dataset-level extraction. i.e. to
            prevent interpretation of the key of the first extractor argument
            as path for a file-level extraction.""",
            nargs="*",
            constraints=EnsureStr() | EnsureNone()))

    @staticmethod
    @datasetmethod(name="meta_extract")
    @eval_results
    def __call__(
            extractorname: str,
            path: Optional[str] = None,
            dataset: Optional[Union[Dataset, str]] = None,
            context: Optional[Union[str, Dict[str, str]]] = None,
            get_context: bool = False,
            force_dataset_level: bool = False,
            extractorargs: Optional[List[str]] = None):
        
        # Get basic arguments
        extractor_name = extractorname
        extractor_args = ([path] + extractorargs
                          if force_dataset_level
                          else extractorargs)
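        # Illustrative example (hypothetical names): the call
        #   datalad meta-extract --force-dataset-level some_extractor key1 value1
        # arrives here with path == "key1" and extractorargs == ["value1"];
        # prepending the path restores ["key1", "value1"], which
        # args_to_dict() below turns into {"key1": "value1"}.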
        path = None if force_dataset_level else path
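        # A context, if given, is expected to be the JSON object emitted by a
        # previous "--get-context" call, e.g. (hypothetical value):
        #   {"dataset_version": "<hexsha of the dataset>"}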
        context = (
            {}
            if context is None
            else (
                json.loads(context)
                if isinstance(context, str)
                else context))

        source_dataset = check_dataset(dataset or curdir, "extract metadata")
        source_dataset_version = context.get("dataset_version", None)
        if source_dataset_version is None:
            source_dataset_version = source_dataset.repo.get_hexsha()

        if get_context is True:
            yield dict(
                status="ok",
                action="meta_extract",
                path=source_dataset.path,
                logger=lgr,
                context=dict(
                    dataset_version=source_dataset_version
                )
            )
            return

        # Create a relative Path-instance, if path to a file is given. We have
        # to be careful not to resolve the path, because that could resolve the
        # git-annex link.
        path_object = None
        if path is not None:
            path_object = Path(path)
            if path_object.is_absolute():
                relative_path = None
                for dataset_path in (source_dataset.pathobj,
                                     source_dataset.pathobj.resolve()):
                    try:
                        relative_path = path_object.relative_to(dataset_path)
                        break
                    except ValueError:
                        pass
                if relative_path is None:
                    raise ValueError(
                        f"The provided path {path} is not contained in the "
                        f"dataset given by {source_dataset.pathobj}"
                    )
                path_object = relative_path

        _, file_tree_path = get_path_info(source_dataset, path_object, None)

        extractor_class = get_extractor_class(extractor_name)

        extraction_arguments = ExtractionArguments(
            source_dataset=source_dataset,
            source_dataset_id=UUID(source_dataset.id),
            source_dataset_version=source_dataset_version,
            local_source_object_path=(
                    source_dataset.pathobj / file_tree_path).absolute(),
            extractor_class=extractor_class,
            extractor_type=None,
            extractor_name=extractor_name,
            extraction_parameter=args_to_dict(extractor_args),
            file_tree_path=file_tree_path,
            agent_name=source_dataset.config.get("user.name"),
            agent_email=source_dataset.config.get("user.email"))

        # If a path is given, we assume file-level metadata extraction is
        # requested, and the extractor class should be a subclass of
        # FileMetadataExtractor (or a legacy extractor).
        # If path is not given, we assume that a dataset-level extraction is
        # requested and the extractor class is a subclass of
        # DatasetMetadataExtractor (or a legacy extractor class).
        if path:
            extraction_arguments.extractor_type = 'file'
            # Check whether the path points to a sub_dataset.
            ensure_path_validity(source_dataset, file_tree_path)
        else:
            extraction_arguments.extractor_type = 'dataset'
        
        yield from do_extraction(ep=extraction_arguments)
        return

    @staticmethod
    def custom_result_renderer(res, **kwargs):
        if res["status"] != "ok" or res.get("action", "") != 'meta_extract':
            # logging complained about this already
            return

        metadata_record = res.get("metadata_record", None)
        if metadata_record is not None:
            path = (
                {"path": str(metadata_record["path"])}
                if "path" in metadata_record
                else {}
            )

            dataset_path = (
                {"dataset_path": str(metadata_record["dataset_path"])}
                if "dataset_path" in metadata_record
                else {}
            )

            ui.message(json.dumps({
                **metadata_record,
                **path,
                **dataset_path,
                "dataset_id": str(metadata_record["dataset_id"])
            }))

        context = res.get("context")
        if context is not None:
            ui.message(json.dumps(context))


def do_extraction(ep: ExtractionArguments):
    extractor_type = ep.extractor_type    

    # Legacy extraction
    legacy_extractor_map = {
        'file': legacy_extract_file,
        'dataset': legacy_extract_dataset,
    }
    if not issubclass(ep.extractor_class, MetadataExtractorBase):
        lgr.debug(
            "performing legacy %s-level metadata "
            "extraction (%s) for %s at %s",
            extractor_type,
            ep.extractor_name,
            extractor_type,
            ep.source_dataset.path / ep.file_tree_path
            if extractor_type == 'file' else ep.source_dataset.path)

        yield from legacy_extractor_map[extractor_type](ep)
        return
    
    # Latest generation extraction
    extractor_class_map = {
        'file': FileMetadataExtractor,
        'dataset': DatasetMetadataExtractor,
    }
    if not issubclass(ep.extractor_class, extractor_class_map[extractor_type]):
        msg = (
            f"A {extractor_type}-level metadata-extraction was attempted"
            + (" since no path argument was given"
               if extractor_type == 'dataset'
               else "")
            + f", but the specified extractor ({ep.extractor_name}) "
              f"is not a {extractor_type}-level extractor"
        )
        raise ValueError(msg)
    
    lgr.debug(
        "performing %s-level metadata "
        "extraction (%s) for %s at %s",
        extractor_type,
        ep.extractor_name,
        extractor_type,
        ep.source_dataset.path / ep.file_tree_path
        if extractor_type == 'file' else ep.source_dataset.path)
    
    if extractor_type == 'file':
        file_info = get_file_info(ep.source_dataset, ep.file_tree_path)
        extractor = ep.extractor_class(
            ep.source_dataset,
            ep.source_dataset_version,
            file_info,
            ep.extraction_parameter)
        ensure_content_availability(extractor, file_info)
    else:
        extractor = ep.extractor_class(
            ep.source_dataset,
            ep.source_dataset_version,
            ep.extraction_parameter)

    yield from perform_metadata_extraction(ep, extractor)


def perform_metadata_extraction(
        ep: ExtractionArguments,
        extractor: Union[DatasetMetadataExtractor, FileMetadataExtractor]
    ):
    
    # Get output category; only IMMEDIATE is supported
    output_category = extractor.get_data_output_category()
    if output_category != DataOutputCategory.IMMEDIATE:
        raise NotImplementedError(
            f"Output category {output_category} not supported")
    
    # Prepare result record
    result_template = {
        "action": "meta_extract",
        "path": ep.local_source_object_path
    }
    
    # Get required content
    res = extractor.get_required_content()
    if isinstance(res, bool):
        if res is False:
            yield {
                "status": "impossible",
                **result_template
            }
            return
    else:
        failure_count = 0
        for r in res:
            if r["status"] in ("error", "impossible"):
                failure_count += 1
                yield r
        if failure_count > 0:
            return
    
    # Run extraction and update result
    result = extractor.extract(None)
    result.datalad_result_dict.update(result_template)
    if result.extraction_success:
        result.datalad_result_dict["metadata_record"] = dict(
            type="dataset",
            dataset_id=ep.source_dataset_id,
            dataset_version=ep.source_dataset_version,
            extractor_name=ep.extractor_name,
            extractor_version=extractor.get_version(),
            extraction_parameter=ep.extraction_parameter,
            extraction_time=time.time(),
            agent_name=ep.agent_name,
            agent_email=ep.agent_email,
            extracted_metadata=result.immediate_data)
        if issubclass(ep.extractor_class, FileMetadataExtractor):
            result.datalad_result_dict["metadata_record"].update(
                dict(
                    type="file",
                    path=ep.file_tree_path,
                )
            )
    
    yield result.datalad_result_dict


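# Extractors are discovered via the "datalad.metadata.extractors" entry point
# group. A package registers one roughly like this in its setup.cfg
# (illustrative names):
#
#   [options.entry_points]
#   datalad.metadata.extractors =
#       my_extractor = my_package.extractor:MyExtractor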
def get_extractor_class(extractor_name: str
                        ) -> Union[Type[DatasetMetadataExtractor],
                                   Type[FileMetadataExtractor]]:
    """Get an extractor class from its name."""
    from pkg_resources import iter_entry_points

    # The extractor class names of the old datalad-contained extractors were
    # changed when the extractors were moved to datalad_metalad.
    # Therefore, we have to use the extractors in
    # `datalad_metalad.extractors.legacy` instead of any old extractor code
    # from datalad core.
    entry_points = [
        entry_point
        for entry_point in iter_entry_points(
            "datalad.metadata.extractors",
            extractor_name
        )
        if entry_point.dist.project_name != "datalad"
    ]

    if not entry_points:
        entry_points = [
            entry_point
            for entry_point in iter_entry_points(
                "datalad.metadata.extractors",
                extractor_name
            )
            if entry_point.dist.project_name == "datalad"
        ]

    if not entry_points:
        raise ExtractorNotFoundError(
            "Requested metadata extractor '{}' not available".format(
                extractor_name))

    entry_point, ignored_entry_points = entry_points[-1], entry_points[:-1]
    lgr.debug(
        "Using metadata extractor %s from distribution %s",
        extractor_name,
        entry_point.dist.project_name)

    # Inform about overridden entry points
    for ignored_entry_point in ignored_entry_points:
        lgr.warning(
            "MetadataRecord extractor %s from distribution %s overrides "
            "metadata extractor from distribution %s",
            extractor_name,
            entry_point.dist.project_name,
            ignored_entry_point.dist.project_name)

    return entry_point.load()


def get_file_info(dataset: Dataset,
                  file_path: MetadataPath) -> FileInfo:
    """
    Get information about the file in the dataset or
    None, if the file is not part of the dataset.
    """

    # Convert the metadata file-path into a system file path
    path = Path(file_path)
    try:
        relative_path = path.relative_to(dataset.pathobj)
    except ValueError:
        relative_path = path

    path = dataset.pathobj / relative_path

    path_status = (
            list(dataset.status(path, result_renderer="disabled")) or [None])[0]

    if path_status is None:
        raise FileNotFoundError(
            "no dataset status for dataset: {} file: {}".format(
                dataset.path, path))

    if path_status["state"] == "untracked":
        raise ValueError("file not tracked: {}".format(path))

    path_relative_to_dataset = PurePath(
        path_status["path"]).relative_to(dataset.pathobj)

    # noinspection PyUnresolvedReferences
    return FileInfo(
        type="file",     # TODO: what about the situation where path_status["type"] == "symlink"?
        git_sha_sum=path_status["gitshasum"],
        byte_size=path_status.get("bytesize", 0),
        state=path_status["state"],
        path=path_status["path"],   # Absolute path, used by extractors
        intra_dataset_path=str(
            MetadataPath(*path_relative_to_dataset.parts)))


def get_path_info(dataset: Dataset,
                  element_path: Optional[Path],
                  into_dataset_path: Optional[Path] = None
                  ) -> Tuple[MetadataPath, MetadataPath]:
    """
    Determine the dataset tree path and the file tree path.

    If the path is absolute, we can determine the containing dataset
    and the metadatasets around it. If the path is not an element of
    a locally known dataset, we signal an error.

    If the path is relative, we convert it to an absolute path
    by appending it to the dataset or current directory and perform
    the above check.
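
    Example (illustrative paths): for a dataset at "/data/ds" and an
    element_path of "subdir/file.txt", the result is
    (MetadataPath(""), MetadataPath("subdir/file.txt")); if, in addition,
    into_dataset_path is "/data", i.e. a super-dataset containing the
    dataset, the first element becomes MetadataPath("ds").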
    """
    full_dataset_path = Path(dataset.path).resolve()
    if into_dataset_path is None:
        dataset_tree_path = MetadataPath("")
    else:
        full_into_dataset_path = into_dataset_path.resolve()
        dataset_tree_path = MetadataPath(
            full_dataset_path.relative_to(full_into_dataset_path))

    if element_path is None:
        return dataset_tree_path, MetadataPath("")

    if element_path.is_absolute():
        full_file_path = element_path
    else:
        full_file_path = full_dataset_path / element_path

    file_tree_path = full_file_path.relative_to(full_dataset_path)

    return dataset_tree_path, MetadataPath(file_tree_path)


def ensure_path_validity(dataset: Dataset, file_tree_path: MetadataPath):
    # TODO: there is most likely a better way to do this in datalad,
    # but I want to ensure, that we do not enumerate all sub-datasets
    # in order to perform this check on a known path.

    full_path = dataset.pathobj / file_tree_path
    if full_path.is_dir():
        raise ValueError(f"FILE must not point to a directory ({full_path})")


def ensure_content_availability(extractor: FileMetadataExtractor,
                                file_info: FileInfo):

    if extractor.is_content_required():
        for result in extractor.dataset.get(path={file_info.path},
                                            get_data=True,
                                            return_type="generator",
                                            result_renderer="disabled"):
            if result.get("status", "") == "error":
                lgr.error(
                    "cannot make content of {} available in dataset {}".format(
                        file_info.path, extractor.dataset))
                return
        lgr.debug(
            "requested content {}:{} available".format(
                extractor.dataset.path, file_info.intra_dataset_path))


def ensure_legacy_path_availability(ep: ExtractionArguments, path: str):
    for result in ep.source_dataset.get(path=path,
                                        get_data=True,
                                        return_type="generator",
                                        result_renderer="disabled"):

        if result.get("status", "") == "error":
            lgr.error(
                "cannot make content of {} available "
                "in dataset {}".format(
                    path, ep.source_dataset))
            return

    lgr.debug(
        "requested content {}:{} available".format(
            ep.source_dataset.path, path))


def ensure_legacy_content_availability(ep: ExtractionArguments,
                                       extractor: MetadataExtractor,
                                       operation: str,
                                       status: List[dict]):

    try:
        for required_element in extractor.get_required_content(
                    ep.source_dataset,
                    operation,
                    status):

            ensure_legacy_path_availability(ep, required_element.path)
    except AttributeError:
        pass


def legacy_extract_dataset(ea: ExtractionArguments) -> Iterable[dict]:

    if issubclass(ea.extractor_class, MetadataExtractor):

        status = ea.source_dataset.subdatasets(result_renderer="disabled")
        extractor = ea.extractor_class()
        ensure_legacy_content_availability(ea, extractor, "dataset", status)

        for result in extractor(ea.source_dataset,
                                ea.source_dataset_version,
                                "dataset",
                                status):

            yield {
                "status": result["status"],
                "action": "meta_extract",
                "path": ea.source_dataset.path,
                "type": "dataset",
                **(
                    {"message": result["message"]}
                    if "message" in result
                    else {}
                ),
                ** (
                    {
                        "metadata_record": dict(
                            type="dataset",
                            dataset_id=ea.source_dataset_id,
                            dataset_version=ea.source_dataset_version,
                            extractor_name=ea.extractor_name,
                            extractor_version=str(
                                extractor.get_state(ea.source_dataset).get(
                                    "version", "---")),
                            extraction_parameter=ea.extraction_parameter,
                            extraction_time=time.time(),
                            agent_name=ea.agent_name,
                            agent_email=ea.agent_email,
                            extracted_metadata=result["metadata"])
                    }
                    if result["status"] == "ok"
                    else {})
            }

    elif issubclass(ea.extractor_class, BaseMetadataExtractor):

        # Datalad legacy extractor
        ds_path = str(ea.source_dataset.pathobj)
        if ea.extractor_class.NEEDS_CONTENT:
            ensure_legacy_path_availability(ea, ds_path)

        extractor = ea.extractor_class(ea.source_dataset, [ds_path])
        dataset_result, _ = extractor.get_metadata(True, False)

        yield dict(
            action="meta_extract",
            status="ok",
            type="dataset",
            metadata_record=dict(
                type="dataset",
                dataset_id=ea.source_dataset_id,
                dataset_version=ea.source_dataset_version,
                extractor_name=ea.extractor_name,
                extractor_version="un-versioned",
                extraction_parameter=ea.extraction_parameter,
                extraction_time=time.time(),
                agent_name=ea.agent_name,
                agent_email=ea.agent_email,
                extracted_metadata=dataset_result))

    else:
        raise ValueError(
            f"unknown extractor class: {ea.extractor_class.__name__}")


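# Gather a git-annex aware status for the given paths: the plain repository
# status is augmented with annex information for HEAD and the worktree, and
# local content availability is marked on each record.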
def annex_status(annex_repo, paths=None):
    info = annex_repo.get_content_annexinfo(
        paths=paths,
        eval_availability=False,
        init=annex_repo.get_content_annexinfo(
            paths=paths,
            ref="HEAD",
            eval_availability=False,
            init=annex_repo.status(
                paths=paths,
                untracked="no",
                eval_submodule_state="full")
        )
    )
    annex_repo._mark_content_availability(info)
    return info


def legacy_get_file_info(dataset: Dataset,
                         path: Path
                         ) -> Dict:

    status = None
    if isinstance(dataset.repo, AnnexRepo):
        if dataset.pathobj != dataset.repo.pathobj:
            # The dataset path might include a symlink, this requires us to
            # convert the path to be based on dataset.repo.pathobj
            path = dataset.repo.pathobj.resolve() / path.relative_to(dataset.pathobj)
        status = annex_status(dataset.repo, [path])
        if status and status[path].get("status") == "error":
            raise ValueError(
                f"error getting status for file: {path}: "
                f"{status[path].get('error_message', '')}")
    if not status:
        status = dataset.repo.status([path], untracked="no")
    if not status or path not in status:
        raise ValueError(f"untracked file: {path}")
    return {
        "path": str(path),
        **(status[path])
    }


def legacy_extract_file(ea: ExtractionArguments) -> Iterable[dict]:

    if issubclass(ea.extractor_class, MetadataExtractor):

        # Call metalad legacy extractor with a single status record.
        file_path = ea.source_dataset.pathobj / ea.file_tree_path
        # Instantiate the extractor and determine the file status:
        extractor = ea.extractor_class()
        status = legacy_get_file_info(ea.source_dataset, file_path)
        ensure_legacy_content_availability(ea, extractor, "content", [status])

        for result in extractor(ea.source_dataset,
                                ea.source_dataset_version,
                                "content",
                                [status]):

            if result["status"] == "ok":
                yield dict(
                    action="meta_extract",
                    status="ok",
                    type="file",
                    path=str(file_path.absolute()),
                    metadata_record=dict(
                        type="file",
                        dataset_id=ea.source_dataset_id,
                        dataset_version=ea.source_dataset_version,
                        path=ea.file_tree_path,
                        extractor_name=ea.extractor_name,
                        extractor_version=str(
                            extractor.get_state(ea.source_dataset).get(
                                "version", "---")),
                        extraction_parameter=ea.extraction_parameter,
                        extraction_time=time.time(),
                        agent_name=ea.agent_name,
                        agent_email=ea.agent_email,
                        extracted_metadata=result["metadata"]))
            else:
                yield dict(
                    action="meta_extract",
                    status=result["status"],
                    type="file",
                    path=str(file_path.absolute()),
                    message=result["message"])

    elif issubclass(ea.extractor_class, BaseMetadataExtractor):

        # Datalad legacy extractor
        path = str(ea.source_dataset.pathobj / ea.file_tree_path)
        if ea.extractor_class.NEEDS_CONTENT:
            ensure_legacy_path_availability(ea, path)

        extractor = ea.extractor_class(ea.source_dataset, [str(ea.file_tree_path)])
        _, file_result = extractor.get_metadata(False, True)

        for extracted_path, metadata in file_result:
            yield dict(
                action="meta_extract",
                status="ok",
                type="file",
                path=path,
                metadata_record=dict(
                    type="file",
                    dataset_id=ea.source_dataset_id,
                    dataset_version=ea.source_dataset_version,
                    path=MetadataPath(extracted_path),
                    extractor_name=ea.extractor_name,
                    extractor_version="un-versioned",
                    extraction_parameter=ea.extraction_parameter,
                    extraction_time=time.time(),
                    agent_name=ea.agent_name,
                    agent_email=ea.agent_email,
                    extracted_metadata=metadata
                ))

    else:
        raise ValueError(
            f"unknown extractor class: {ea.extractor_class.__name__}")