Source code for datalad_tabby.io.load

"""Utilities for loading a `tabby` record from disk"""

from __future__ import annotations

import csv
import json
from pathlib import Path
from typing import (
    Dict,
    List,
)

from .load_utils import (
    _assign_context,
    _compact_obj,
    _build_import_trace,
    _get_index_after_last_nonempty,
    _get_tabby_prefix_from_sheet_fpath,
    _manyrow2obj,
    _sanitize_override_key,
)


[docs] def load_tabby( src: Path, *, single: bool = True, jsonld: bool = True, recursive: bool = True, cpaths: List | None = None, ) -> Dict | List: """Load a tabby (TSV) record as structured (JSON(-LD)) data The record is identified by the table/sheet file path ``src``. This need not be the root 'dataset' sheet, but can be any component of the full record. The ``single`` flag determines whether the record is interpreted as a single entity (i.e., JSON object), or many entities (i.e., JSON array of (homogeneous) objects). Depending on the ``single`` flag, either a ``dict`` or a ``list`` is returned. Other tabby tables/sheets are loaded when ``@tabby-single|many-`` import statements are discovered. The corresponding data structures then replace the import statement at its location. Setting the ``recursive`` flag to ``False`` disables table import, which will result in only the record available at the ``src`` path being loaded. With the ``jsonld`` flag, a declared or default JSON-LD context is loaded and inserted into the record. """ ldr = _TabbyLoader( jsonld=jsonld, recursive=recursive, cpaths=cpaths, ) return ldr(src=src, single=single)
class _TabbyLoader: def __init__( self, jsonld: bool = True, recursive: bool = True, cpaths: List[Path] | None = None, ): std_convention_path = Path(__file__).parent / 'conventions' if cpaths is None: cpaths = [std_convention_path] else: cpaths.append(std_convention_path) self._cpaths = cpaths self._jsonld = jsonld self._recursive = recursive def __call__(self, src: Path, *, single: bool = True): return (self._load_single if single else self._load_many)( src=src, trace=[], ) def _load_single( self, *, src: Path, trace: List, ) -> Dict: jfpath = self._get_corresponding_jsondata_fpath(src) obj = json.load(jfpath.open()) if jfpath.exists() else {} if obj and not src.exists(): # early exit, there is no tabular data return self._postproc_obj( obj, src=src, trace=trace, ) with src.open(newline='') as tsvfile: reader = csv.reader(tsvfile, delimiter='\t') # row_id is useful for error reporting for row_id, row in enumerate(reader): # row is a list of field, with only as many items # as this particular row has columns if not len(row) or not row[0] or row[0].startswith('#'): # skip empty rows, rows with no key, or rows with # a comment key continue key = row[0] val = row[1:] # cut `val` short and remove trailing empty items val = val[:_get_index_after_last_nonempty(val)] if not val: # skip properties with no value(s) continue # we do not amend values for keys! # another row for an already existing key overwrites # we support "sequence" values via multi-column values # supporting two ways just adds unnecessary complexity obj[key] = val return self._postproc_obj(obj, src=src, trace=trace) def _load_many( self, *, src: Path, trace: List, ) -> List[Dict]: obj_tmpl = {} array = list() jfpath = self._get_corresponding_jsondata_fpath(src) if jfpath.exists(): jdata = json.load(jfpath.open()) if isinstance(jdata, dict): obj_tmpl = jdata elif isinstance(jdata, list): array.extend( self._postproc_obj(obj, src=src, trace=trace) for obj in jdata ) if array and not src.exists(): # early exit, there is no tabular data return array # the table field/column names have purposefully _nothing_ # to do with any possibly loaded JSON data fieldnames = None with src.open(newline='') as tsvfile: # we cannot use DictReader -- we need to support identically named # columns reader = csv.reader(tsvfile, delimiter='\t') # row_id is useful for error reporting for row_id, row in enumerate(reader): # row is a list of field, with only as many items # as this particular row has columns if not len(row) \ or row[0].startswith('#') \ or all(v is None for v in row): # skip empty rows, rows with no key, or rows with # a comment key continue if fieldnames is None: # the first non-ignored row defines the property names/keys # cut `val` short and remove trailing empty items fieldnames = row[:_get_index_after_last_nonempty(row)] continue obj = obj_tmpl.copy() obj.update(_manyrow2obj(row, fieldnames)) obj = self._postproc_obj(obj, src=src, trace=trace) # simplify single-item lists to a plain value array.append(obj) return array def _resolve_value( self, v: str, src_fpath: Path, trace: List, ): if not self._recursive: return v if not isinstance(v, str): return v if v.startswith('@tabby-single-'): loader = self._load_single sheet = v[14:] elif v.startswith('@tabby-optional-single-'): loader = self._load_single sheet = v[23:] elif v.startswith('@tabby-many-'): loader = self._load_many sheet = v[12:] elif v.startswith('@tabby-optional-many-'): loader = self._load_many sheet = v[21:] else: # strange, but not enough reason to fail return v src = self._get_corresponding_sheet_fpath(src_fpath, sheet) trace = _build_import_trace(src, trace) try: loaded = loader(src=src, trace=trace) except FileNotFoundError: if v.startswith('@tabby-optional-'): return {} else: raise return loaded def _postproc_obj( self, obj: Dict, src: Path, trace: List, ): # look for @tabby-... imports in values, and act on them obj = { key: [ self._resolve_value(v, src, trace=trace) for v in (val if isinstance(val, list) else [val]) ] for key, val in obj.items() } # apply any overrides obj.update(self._build_overrides(src, obj, self._cpaths)) obj = _compact_obj(obj) if not self._jsonld: # early exit return obj # with jsonld==True, looks for a context ctx = self._get_corresponding_context(src) if ctx: _assign_context(obj, ctx) return obj def _get_corresponding_jsondata_fpath(self, fpath: Path) -> Path: return self._cvnfb(fpath.parent / f'{fpath.stem}.json') def _get_record_context_fpath(self, fpath: Path) -> Path: prefix = _get_tabby_prefix_from_sheet_fpath(fpath) if prefix: return self._cvnfb(fpath.parent / f'{prefix}.ctx.jsonld') else: return fpath.parent / 'ctx.jsonld' def _get_corresponding_context_fpath(self, fpath: Path) -> Path: return self._cvnfb(fpath.parent / f'{fpath.stem}.ctx.jsonld') def _get_corresponding_context(self, src: Path): rec_ctx_fpath = self._cvnfb(self._get_record_context_fpath(src)) sheet_ctx_fpath = self._cvnfb( self._get_corresponding_context_fpath(src), ) ctx = {} for ctx_fpath in (rec_ctx_fpath, sheet_ctx_fpath): if ctx_fpath.exists(): custom_ctx = json.load(ctx_fpath.open()) # TODO report when redefinitions occur ctx.update(custom_ctx) return ctx def _get_corresponding_override_fpath(self, fpath: Path) -> Path: return self._cvnfb(fpath.parent / f'{fpath.stem}.override.json') def _build_overrides(self, src: Path, obj: Dict, cpaths): # sanitize key names in object sanitized_obj = { _sanitize_override_key(k): v for k, v in obj.items() } overrides = {} ofpath = self._cvnfb(self._get_corresponding_override_fpath(src)) if not ofpath.exists(): # we have no overrides return overrides orspec = json.load(ofpath.open()) for k in orspec: spec = orspec[k] ov = [] for s in (spec if isinstance(spec, list) else [spec]): # interpolate str spec, anything else can pass # through as-is if not isinstance(s, str): ov.append(s) continue try: o = s.format(**sanitized_obj) except KeyError: # we do not have what this override spec need, skip it # TODO log this continue ov.append(o) overrides[k] = ov return overrides # TODO rename `sheet` to `tsvsheet` to clarify def _get_corresponding_sheet_fpath( self, fpath: Path, sheet_name: str, ) -> Path: prefix = _get_tabby_prefix_from_sheet_fpath(fpath) if prefix: ret = fpath.parent / f'{prefix}_{sheet_name}.tsv' else: ret = fpath.parent / f'{sheet_name}.tsv' return self._cvnfb(ret) def _cvnfb(self, fpath: Path) -> Path: """Get convention-based fallback file path, if needed""" if fpath.exists(): # this file exists, no need to search for alternatives return fpath prefix = _get_tabby_prefix_from_sheet_fpath(fpath) # strip any prefix and extensions from file name sheet = fpath.name[len(prefix) + 1:] if prefix else fpath.name sheet = sheet.split('.', maxsplit=1)[0] # determine class declaration, if there is any sheet_comp = sheet.split('@', maxsplit=1) if len(sheet_comp) == 1: # no class declared, return input return fpath sname, scls = sheet_comp for cp in self._cpaths: cand = cp / scls / \ f"{prefix}" \ f"{'_' if prefix else ''}" \ f"{sname}{fpath.name[len(sheet):]}" if cand.exists(): # stop at the first existing alternative return cand # there was no alternative, go with original return fpath