# emacs: -*- mode: python; py-indent-offset: 4; tab-width: 4; indent-tabs-mode: nil -*-
# ex: set sts=4 ts=4 sw=4 et:
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
#
# See COPYING file distributed along with the datalad package for the
# copyright and license terms.
#
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
"""
Dump metadata of a dataset
"""
__docformat__ = 'restructuredtext'
import json
import logging
from pathlib import Path
from typing import (
    Any,
    Generator,
    Union,
    cast,
)
from uuid import UUID
from datalad.distribution.dataset import datasetmethod
from datalad.interface.base import (
Interface,
build_doc,
eval_results,
)
from datalad.support.constraints import (
EnsureNone,
EnsureStr,
)
from datalad.support.param import Parameter
from datalad.ui import ui
from dataladmetadatamodel.datasettree import datalad_root_record_name
from dataladmetadatamodel.mapper.reference import Reference
from dataladmetadatamodel.mappableobject import ensure_mapped
from dataladmetadatamodel.metadata import (
Metadata,
MetadataInstance,
)
from dataladmetadatamodel.metadatapath import MetadataPath
from dataladmetadatamodel.metadatarootrecord import MetadataRootRecord
from dataladmetadatamodel.mtreenode import MTreeNode
from dataladmetadatamodel.uuidset import UUIDSet
from dataladmetadatamodel.versionlist import TreeVersionList
from .metadatatypes import JSONType
from .metadatautils import get_metadata_objects
from .pathutils.metadataurlparser import (
    MetadataURLParser,
    TreeMetadataURL,
    UUIDMetadataURL,
)
from .pathutils.mtreesearch import MTreeSearch
default_mapper_family = "git"
lgr = logging.getLogger('datalad.metadata.dump')
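
# Predicates identifying the node types encountered in metadata trees:
# MetadataRootRecord nodes carry dataset-level metadata, Metadata nodes
# carry file-level metadata.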
def _dataset_report_matcher(node: Any) -> bool:
return isinstance(node, MetadataRootRecord)
def _file_report_matcher(node: Any) -> bool:
return isinstance(node, Metadata)
def _create_result_record(mapper: str,
metadata_store: Union[Path, str],
metadata_record: JSONType,
element_path: MetadataPath,
report_type: str):
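    """Assemble a meta-dump result record for a single metadata element."""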
# Display remote metadata stores properly
if isinstance(metadata_store, str):
if Reference.is_remote(metadata_store):
path = metadata_store + ":/" + str(element_path)
else:
path = (Path(metadata_store) / element_path).absolute()
else:
path = (metadata_store / element_path).absolute()
return {
"status": "ok",
"action": "meta_dump",
"backend": mapper,
"metadata_source": metadata_store,
"type": report_type,
"metadata": metadata_record,
"path": path,
}
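
# A result record assembled above has the following shape (all values
# illustrative):
#
#   {
#       "status": "ok",
#       "action": "meta_dump",
#       "backend": "git",
#       "metadata_source": "/tmp/some/dataset",
#       "type": "dataset",
#       "metadata": {...},
#       "path": "/tmp/some/dataset/subdir",
#   }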
def _get_common_properties(root_dataset_identifier: UUID,
root_dataset_version: str,
prefix_path: MetadataPath,
metadata_root_record: MetadataRootRecord,
dataset_path: MetadataPath) -> dict:
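    """Collect identification properties for the reported dataset.

    If a prefix path is given, it is reported as the dataset path;
    otherwise, for elements of a sub-dataset (non-empty dataset path),
    the root dataset id and version are reported in addition.
    """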
if prefix_path != MetadataPath(""):
root_info = {
"dataset_path": str(prefix_path)}
elif dataset_path != MetadataPath(""):
root_info = {
"root_dataset_id": str(root_dataset_identifier),
"root_dataset_version": root_dataset_version,
"dataset_path": str(dataset_path)}
else:
root_info = {}
return {
**root_info,
"dataset_id": str(metadata_root_record.dataset_identifier),
"dataset_version": metadata_root_record.dataset_version
}
def _get_instance_properties(extractor_name: str,
instance: MetadataInstance) -> dict:
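    """Flatten a metadata instance into agent, extractor, and content keys."""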
return {
"extraction_time": instance.time_stamp,
"agent_name": instance.author_name,
"agent_email": instance.author_email,
"extractor_name": extractor_name,
"extractor_version": instance.configuration.version,
"extraction_parameter": instance.configuration.parameter,
"extracted_metadata": instance.metadata_content
}
def show_dataset_metadata(mapper: str,
metadata_store: Path,
root_dataset_identifier: UUID,
root_dataset_version: str,
prefix_path: MetadataPath,
dataset_path: MetadataPath,
metadata_root_record: MetadataRootRecord
) -> Generator[dict, None, None]:
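    """Yield one result record per extractor run of the dataset-level metadata."""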
if metadata_root_record is None:
return
with ensure_mapped(metadata_root_record):
dataset_level_metadata = metadata_root_record.dataset_level_metadata.read_in()
if dataset_level_metadata is None:
lgr.warning(
f"no dataset level metadata for dataset "
f"uuid:{root_dataset_identifier}@{root_dataset_version}")
return
common_properties = _get_common_properties(
root_dataset_identifier,
root_dataset_version,
prefix_path,
metadata_root_record,
dataset_path)
dataset_level_metadata = cast(Metadata, dataset_level_metadata)
        for extractor_name, extractor_runs in dataset_level_metadata.extractor_runs():
for instance in extractor_runs:
instance_properties = _get_instance_properties(
extractor_name,
instance)
yield _create_result_record(
mapper=mapper,
metadata_store=metadata_store,
metadata_record={
"type": "dataset",
**common_properties,
**instance_properties
},
element_path=dataset_path,
report_type="dataset")
def show_file_tree_metadata(mapper: str,
metadata_store: Path,
root_dataset_identifier: UUID,
root_dataset_version: str,
prefix_path: MetadataPath,
dataset_path: MetadataPath,
metadata_root_record: MetadataRootRecord,
search_pattern: MetadataPath,
recursive: bool
) -> Generator[dict, None, None]:
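    """Yield result records for file-level metadata matching the search pattern.

    Walks the file tree of the given metadata root record and emits one
    record per extractor run of every matching file.
    """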
if metadata_root_record is None:
return
with ensure_mapped(metadata_root_record):
dataset_level_metadata = metadata_root_record.dataset_level_metadata
file_tree = metadata_root_record.file_tree
with ensure_mapped(dataset_level_metadata):
with ensure_mapped(file_tree):
# Do not try to search anything if the file tree is empty
if not file_tree or not file_tree.mtree.child_nodes:
return
# Determine matching file paths
tree_search = MTreeSearch(file_tree.mtree)
result_count = 0
for path, metadata, _ in tree_search.search_pattern(pattern=search_pattern,
recursive=recursive):
result_count += 1
                    # Skip empty datasets, and skip paths that describe a
                    # directory rather than metadata.
if metadata is None or isinstance(metadata, MTreeNode):
continue
metadata = cast(Metadata, metadata)
common_properties = _get_common_properties(
root_dataset_identifier,
root_dataset_version,
prefix_path,
metadata_root_record,
dataset_path)
with ensure_mapped(metadata):
                        for extractor_name, extractor_runs in metadata.extractor_runs():
for instance in extractor_runs:
instance_properties = _get_instance_properties(
extractor_name,
instance)
yield _create_result_record(
mapper=mapper,
metadata_store=metadata_store,
metadata_record={
"type": "file",
"path": str(path),
**common_properties,
**instance_properties
},
element_path=dataset_path / path,
report_type="dataset")
if result_count == 0:
lgr.warning(
f"pattern '{str(search_pattern)}' does not match any element "
f"in file-tree of dataset {metadata_root_record.dataset_identifier}"
f"@{metadata_root_record.dataset_version} (stored on "
f"{mapper}:{metadata_store})")
def dump_from_dataset_tree(mapper: str,
metadata_store: Path,
tree_version_list: TreeVersionList,
metadata_url: TreeMetadataURL,
recursive: bool) -> Generator[dict, None, None]:
""" Dump dataset tree elements that are referenced in path """
# Normalize path representation
if not metadata_url or metadata_url.dataset_path is None:
metadata_url = TreeMetadataURL(MetadataPath(""), MetadataPath(""))
    # Get the specified version; if none is given, take all versions.
requested_versions = ([metadata_url.version]
if metadata_url.version is not None
else list(tree_version_list.versions()))
for version in requested_versions:
try:
# Fetch dataset tree for the specified version
vpd_iterable = tree_version_list.get_dataset_trees(version)
except KeyError:
lgr.error(
f"could not locate metadata for version {version} in "
f"metadata_store {mapper}:{metadata_store}")
continue
for _, prefix_path, dataset_tree in vpd_iterable:
root_mrr = dataset_tree.get_metadata_root_record(MetadataPath(""))
if root_mrr is None:
lgr.debug(
f"no root dataset record found for version "
f"{version} in metadata store "
f"{metadata_store}, cannot determine root dataset id")
root_dataset_version = version
root_dataset_identifier = "<unknown>"
else:
with ensure_mapped(root_mrr):
root_dataset_version = root_mrr.dataset_version
root_dataset_identifier = root_mrr.dataset_identifier
# Create a tree search object to search for the specified datasets
tree_search = MTreeSearch(dataset_tree.mtree)
search_results = tree_search.search_pattern(
pattern=metadata_url.dataset_path,
recursive=recursive,
item_indicator=datalad_root_record_name)
result_count = 0
for path, node, _ in search_results:
result_count += 1
mrr = cast(
MetadataRootRecord,
node.get_child(datalad_root_record_name))
if mrr is None:
                    # The metadata root record may be None if no dataset
                    # was registered in the dataset tree at this level.
continue
yield from show_dataset_metadata(
mapper,
metadata_store,
root_dataset_identifier,
root_dataset_version,
prefix_path,
path,
mrr)
yield from show_file_tree_metadata(
mapper,
metadata_store,
root_dataset_identifier,
root_dataset_version,
prefix_path,
path,
mrr,
metadata_url.local_path,
recursive)
if result_count == 0:
lgr.error(
f"search pattern '{str(metadata_url.dataset_path)}' does not "
f"match any dataset in dataset-tree of dataset "
f"{root_dataset_identifier}@{root_dataset_version} (stored on "
f"{mapper}:{metadata_store})")
def dump_from_uuid_set(mapper: str,
metadata_store: Path,
uuid_set: UUIDSet,
path: UUIDMetadataURL,
recursive: bool) -> Generator[dict, None, None]:
""" Dump UUID-identified dataset elements that are referenced in path """
try:
version_list = uuid_set.get_version_list(path.uuid)
except KeyError:
lgr.error(
f"could not locate metadata for dataset with UUID {path.uuid} in "
f"metadata_store {mapper}:{metadata_store}")
return
    # Get the specified version; if none is given, take all versions.
    requested_dataset_versions = ([path.version]
                                  if path.version is not None
                                  else list(version_list.versions()))
    for dataset_version, prefix_path in version_list.versions_and_prefix_paths():
        if dataset_version not in requested_dataset_versions:
            continue
try:
_, dataset_path, metadata_root_record = \
version_list.get_versioned_element(dataset_version, prefix_path)
except KeyError:
lgr.error(
f"could not locate metadata for version {dataset_version} for "
f"dataset with UUID {path.uuid} in metadata_store "
f"{mapper}:{metadata_store}")
continue
metadata_root_record = cast(MetadataRootRecord, metadata_root_record)
# Show dataset-level metadata
yield from show_dataset_metadata(
mapper,
metadata_store,
path.uuid,
dataset_version,
prefix_path,
dataset_path,
metadata_root_record)
# Show file-level metadata
yield from show_file_tree_metadata(
mapper,
metadata_store,
path.uuid,
dataset_version,
prefix_path,
dataset_path,
metadata_root_record,
path.local_path,
recursive)
return
@build_doc
class Dump(Interface):
"""Dump a dataset's aggregated metadata for dataset and file metadata
Two types of metadata are supported:
1. metadata describing a dataset as a whole (dataset-global metadata), and
2. metadata for files in a dataset (content metadata).
The DATASET_FILE_PATH_PATTERN argument specifies dataset and file patterns
that are matched against the dataset and file information in the metadata.
There are two format, UUID-based and dataset-tree based. The formats are:
TREE: ["tree:"] [DATASET_PATH] ["@" VERSION-DIGITS] [":" [LOCAL_PATH]]
UUID: "uuid:" UUID-DIGITS ["@" VERSION-DIGITS] [":" [LOCAL_PATH]]
(The tree-format is the default format and does not require a prefix).
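
    For example (all values illustrative):

        tree:subdir/dataset-a@02a34b:src/main.c
        uuid:00000000-0000-0000-0000-000000000000@02a34b:src/main.c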
"""
# Use a custom renderer to emit a self-contained metadata record. The
# emitted record can be fed into meta-add for example.
result_renderer = 'tailored'
_examples_ = [
dict(
text='Dump the metadata of the file "dataset_description.json" in '
'the dataset "simon". (The queried dataset is determined '
'based on the current working directory)',
code_cmd="datalad meta-dump simon:dataset_description.json"),
dict(
text="Sometimes it is helpful to get metadata records formatted "
"in a more accessible form, here as pretty-printed JSON",
code_cmd="datalad -f json_pp meta-dump "
"simon:dataset_description.json"),
dict(
text="Same query as above, but specify that all datasets should "
"be queried for the given path",
code_cmd="datalad meta-dump :somedir/subdir/thisfile.dat"),
dict(
text="Dump any metadata record of any dataset known to the "
"queried dataset",
code_cmd="datalad meta-dump -r"),
dict(
text="Dump any metadata record of any dataset known to the "
"queried dataset and output pretty-printed JSON",
code_cmd="datalad -f json_pp meta-dump -r"),
dict(
text="Show metadata for all files ending in `.json` in the root "
"directories of all datasets",
code_cmd="datalad meta-dump *:*.json -r"),
dict(
text="Show metadata for all files ending in `.json` in all "
"datasets by not specifying a dataset at all. This will "
"start dumping at the top-level dataset.",
code_cmd="datalad meta-dump :*.json -r")
]
_params_ = dict(
dataset=Parameter(
args=("-d", "--dataset"),
metavar="DATASET",
doc="""Dataset for which metadata should be dumped. If no
directory name is provided, the current working directory is
used."""),
path=Parameter(
args=("path",),
metavar="DATASET_FILE_PATH_PATTERN",
doc="path to query metadata for",
constraints=EnsureStr() | EnsureNone(),
nargs="?"),
recursive=Parameter(
args=("-r", "--recursive",),
action="store_true",
doc="""If set, recursively report on any matching metadata based
on given paths or reference dataset. Note, setting this
option does not cause any recursion into potential
sub-datasets on the filesystem. It merely determines what
metadata is being reported from the given/discovered
reference dataset."""))
@staticmethod
@datasetmethod(name='meta_dump')
@eval_results
def __call__(
dataset=None,
path="",
recursive=False):
metadata_store_path, tree_version_list, uuid_set = get_metadata_objects(
dataset,
default_mapper_family)
parser = MetadataURLParser(path)
metadata_url = parser.parse()
if isinstance(metadata_url, TreeMetadataURL):
yield from dump_from_dataset_tree(
default_mapper_family,
metadata_store_path,
tree_version_list,
metadata_url,
recursive)
elif isinstance(metadata_url, UUIDMetadataURL):
yield from dump_from_uuid_set(
default_mapper_family,
metadata_store_path,
uuid_set,
metadata_url,
recursive)
return
@staticmethod
def custom_result_renderer(res, **_):
if res["status"] != "ok" or res.get("action", "") != 'meta_dump':
# logging complained about this already
return
ui.message(json.dumps(res["metadata"]))