"""Report on the content of a Git-annex repository worktree
The main functionality is provided by the :func:`iter_annexworktree()`
function.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from more_itertools import intersperse
from pathlib import (
Path,
PurePath,
)
from typing import (
Dict,
Type,
Union,
Any,
Generator,
)
from datasalad.itertools import (
itemize,
load_json,
route_in,
route_out,
StoreOnly,
)
from datalad_next.consts import on_windows
from datalad_next.repo_utils import has_initialized_annex
from datalad_next.runners import iter_git_subproc
from .gitworktree import (
GitWorktreeItem,
GitWorktreeFileSystemItem,
iter_gitworktree,
)
from .utils import FileSystemItemType
lgr = logging.getLogger('datalad.ext.next.iter_collections.annexworktree')
@dataclass
class AnnexWorktreeItem(GitWorktreeItem):
    """``GitWorktreeItem`` extended with git-annex related properties

    All annex properties default to ``None``.
    """
    # annex key of the item
    annexkey: str | None = None
    # byte size of the annex key
    annexsize: int | None = None
    # annex object path, relative to the item
    annexobjpath: PurePath | None = None

    @classmethod
    def from_gitworktreeitem(
        cls,
        item: GitWorktreeItem,
    ) -> "AnnexWorktreeItem":
        """Create an instance from the attributes of a ``GitWorktreeItem``

        Every instance attribute of ``item`` is passed as a keyword argument
        to the constructor. The annex properties keep their defaults unless
        ``item`` carries attributes of the same name.
        """
        git_properties = vars(item)
        return cls(**git_properties)
@dataclass
class AnnexWorktreeFileSystemItem(GitWorktreeFileSystemItem):
    """``GitWorktreeFileSystemItem`` extended with git-annex properties

    All annex properties default to ``None``.
    """
    # annex key of the item
    annexkey: str | None = None
    # byte size of the annex key
    annexsize: int | None = None
    # annex object path, relative to the item
    annexobjpath: PurePath | None = None
# TODO this iterator should get a filter mechanism to limit it to a single
# directory (non-recursive). This will be needed for gooey.
# Unlike iter_gitworktree(), we pay a larger, dedicated per-item cost here.
# Given that the switch to iterative processing has also been made for
# iter_gitworktree(), we should provide the same filtering for that one
# too!
def iter_annexworktree(
    path: Path,
    *,
    untracked: str | None = 'all',
    link_target: bool = False,
    fp: bool = False,
    recursive: str = 'repository',
) -> Generator[AnnexWorktreeItem | AnnexWorktreeFileSystemItem, None, None]:
    """Companion to ``iter_gitworktree()`` for git-annex repositories

    This iterator wraps
    :func:`~datalad_next.iter_collections.gitworktree.iter_gitworktree`.
    For each item, it determines whether it is an annexed file. If so,
    it amends the yielded item with information on the respective
    annex key, the byte size of the key, and its (would-be) location
    in the repository's annex.

    The basic semantics of all arguments are identical to
    :func:`~datalad_next.iter_collections.gitworktree.iter_gitworktree`.
    Importantly, with ``fp=True``, an annex object is opened directly,
    if available. If not available, no attempt is made to open the associated
    symlink or pointer file.

    With ``link_target`` and ``fp`` disabled items of
    type :class:`AnnexWorktreeItem` are yielded, otherwise
    :class:`AnnexWorktreeFileSystemItem` instances are yielded. In both cases,
    ``annexkey``, ``annexsize``, and ``annexobjpath`` properties are provided.

    .. note::
       Although ``annexobjpath`` is always set for annexed content, that does
       not imply that an object at this path actually exists. The latter will
       only be the case if the annexed content is present in the work tree,
       typically as a result of a `datalad get`- or `git annex get`-call.

    Parameters
    ----------
    path: Path
      Path of a directory in a git-annex repository to report on. This
      directory need not be the root directory of the repository, but
      must be part of the repository's work tree.
    untracked: {'all', 'whole-dir', 'no-empty-dir'} or None, optional
      If not ``None``, also reports on untracked work tree content.
      ``all`` reports on any untracked file; ``whole-dir`` yields a single
      report for a directory that is entirely untracked, and not individual
      untracked files in it; ``no-empty-dir`` skips any reports on
      untracked empty directories.
    link_target: bool, optional
      If ``True``, information matching a
      :class:`~datalad_next.iter_collections.utils.FileSystemItem`
      will be included for each yielded item, and the targets of
      any symlinks will be reported, too.
    fp: bool, optional
      If ``True``, information matching a
      :class:`~datalad_next.iter_collections.utils.FileSystemItem`
      will be included for each yielded item, but without a
      link target detection, unless ``link_target`` is given.
      Moreover, each file-type item includes a file-like object
      to access the file's content. This file handle will be closed
      automatically when the next item is yielded.
    recursive: {'repository', 'no'}, optional
      Pass on to
      :func:`~datalad_next.iter_collections.gitworktree.iter_gitworktree`,
      thereby determining which items this iterator will yield.

    Yields
    ------
    :class:`AnnexWorktreeItem` or :class:`AnnexWorktreeFileSystemItem`
      The ``name`` attribute of an item is a ``PurePath`` instance with
      the corresponding (relative) path, in platform conventions.
    """
    # file system inspection and file opening are deliberately disabled
    # here; both are done further down, once the annex status of an item
    # is known
    glsf = iter_gitworktree(
        path,
        untracked=untracked,
        link_target=False,
        fp=False,
        recursive=recursive,
    )

    if not has_initialized_annex(path):
        # this is not an annex repo.
        # we just yield the items from the gitworktree iterator.
        # we funnel them through the standard result item prep
        # function for type equality.
        # when a recursive-mode other than 'repository' will be
        # implemented, this implementation needs to be double-checked
        # to avoid decision making on submodules just based on
        # the nature of the toplevel repo.
        # NOTE(review): `fp` is not honored on this code path, no file
        # is opened here -- confirm that this is intentional
        for item in glsf:
            yield _get_worktree_item(
                path, get_fs_info=link_target, git_item=item)
        return

    git_fileinfo_store: list[Any] = list()
    # this is a technical helper that will just store a bunch of `None`s
    # for aligning item-results between git-ls-files and git-annex-find
    _annex_git_align: list[Any] = list()

    with \
            iter_git_subproc(
                # we get the annex key for any filename
                # (or empty if not annexed)
                ['annex', 'find', '--anything', '--format=${key}\\n',
                 '--batch'],
                # intersperse items with newlines to trigger a batch run
                # this avoids string operations to append newlines to items
                input=intersperse(
                    b'\n',
                    # use `GitWorktree*`-elements yielded by
                    # `iter_gitworktree` to create an `AnnexWorktreeItem` or
                    # `AnnexWorktreeFileSystemItem` object, which is stored
                    # in `git_fileinfo_store`. Yield a string representation
                    # of the path contained in the `GitWorktree*`-element
                    # yielded by `iter_gitworktree`
                    route_out(
                        glsf,
                        git_fileinfo_store,
                        lambda git_worktree_item: (
                            str(git_worktree_item.name).encode(),
                            git_worktree_item
                        )
                    )
                ),
                cwd=path,
            ) as gaf, \
            iter_git_subproc(
                # get the key properties JSON-lines style
                ['annex', 'examinekey', '--json', '--batch'],
                # use only non-empty keys as input to `git annex examinekey`.
                input=intersperse(
                    # Add line ending to submit the key to batch processing
                    # in `git annex examinekey`.
                    b'\n',
                    route_out(
                        itemize(
                            gaf,
                            # although we declare a specific key output
                            # format for the git-annex find call, versions of
                            # git-annex <10.20231129 on Windows will
                            # terminate lines with '\r\n' instead of '\n'.
                            # We therefore use `None` as separator, which
                            # enables `itemize()` to use either separator,
                            # i.e. '\r\n' or '\n'.
                            sep=None if on_windows else b'\n',
                        ),
                        # we need this route-out solely for the purpose
                        # of maintaining a 1:1 relationship of items
                        # reported by git-ls-files and git-annex-find
                        # (merged again in the `route-in` that gives
                        # `results` below). The "store" here does not
                        # actually store anything other than `None`s
                        # (because the `key` --which is consumed by
                        # `git annex examinekey`-- is also present in the
                        # output of `git annex examinekey`).
                        _annex_git_align,
                        # do not process empty key lines. Non-empty key
                        # lines are processed, but nothing needs to be
                        # stored because the processing result includes
                        # the key itself.
                        lambda key: (key if key else StoreOnly, None)
                    )
                ),
                cwd=path,
            ) as gek:

        results = route_in(
            # the following `route_in` yields processed keys for annexed
            # files and `StoreOnly` for non-annexed files. Its
            # cardinality is the same as the cardinality of
            # `iter_gitworktree`, i.e. it produces data for each element
            # yielded by `iter_gitworktree`.
            route_in(
                load_json(itemize(gek, sep=None)),
                _annex_git_align,
                # `processed_data` is either `StoreOnly` or detailed
                # annex key information. we just return `processed_data`
                # as result, because `_join_annex_info` knows how to
                # incorporate it into an `AnnexWorktree*`-object.
                lambda processed_data, _: processed_data
            ),
            git_fileinfo_store,
            _join_annex_info,
        )

        # at this point, each item in `results` is a dict with a `git_item`
        # key that holds a `GitWorktreeItem` instance, plus additional annex
        # related keys added by _join_annex_info() for annexed files
        if not fp:
            # life is simpler here, we do not need to open any files in the
            # annex, hence all processing can be based on the information
            # collected so far
            for res in results:
                yield _get_worktree_item(path, get_fs_info=link_target, **res)
            return

        # if we get here, this is about file pointers...
        # for any annexed file we need to open, we need to locate it in
        # the annex. we get `annexobjpath` in the results. this is
        # relative to `path`. We could not use the `link_target`, because
        # we might be in a managed branch without link.
        path = Path(path)
        for res in results:
            try:
                item = _get_worktree_item(path, get_fs_info=True, **res)
            except FileNotFoundError:
                # there is nothing to open, yield non FS item
                item = _get_worktree_item(path, get_fs_info=False, **res)
                yield item
                continue

            # determine which file we would open
            fp_src = None
            if item.annexobjpath is not None:
                # this is an annexed file
                fp_src = item.annexobjpath
            elif item.annexkey is None and item.type in (
                    FileSystemItemType.file, FileSystemItemType.symlink):
                # regular file (untracked or tracked), or regular symlink
                fp_src = item.name

            if fp_src is None:
                # nothing to open
                yield item
                continue

            fp_src_fullpath = path / fp_src
            # only open what is (or resolves through a symlink to) a
            # regular file. a mere existence test would also pass for a
            # plain symlink pointing to a directory or another non-file
            # target, and opening that would fail or block
            if not fp_src_fullpath.is_file():
                # nothing there to open
                yield item
                continue

            # the file handle is closed when the generator is resumed,
            # i.e., when the next item is requested
            with fp_src_fullpath.open('rb') as active_fp:
                item.fp = active_fp
                yield item
def _get_worktree_item(
    base_path: Path,
    get_fs_info: bool,
    git_item: GitWorktreeItem,
    annexkey: str | None = None,
    annexsize: int | None = None,
    annexobjpath: PurePath | None = None,
) -> AnnexWorktreeFileSystemItem | AnnexWorktreeItem:
    """Internal helper to get an item from ``_join_annex_info()`` output

    The assumption is that minimal investigations have been done
    until this helper is called. In particular, no file system inspects
    have been performed.

    Depending on whether a user requested file system information to be
    contained in the items (``get_fs_info``), either
    ``AnnexWorktreeFileSystemItem`` or ``AnnexWorktreeItem`` is returned.

    The main workhorse of this function is
    ``AnnexWorktreeFileSystemItem.from_path()``. Besides calling it,
    information is only taken from arguments and injected into the item
    instances.

    Parameters
    ----------
    base_path: Path
      Directory that ``git_item.name`` is joined to for the file system
      inspection.
    get_fs_info: bool
      If ``True``, inspect the file system and return an
      ``AnnexWorktreeFileSystemItem``; otherwise build an
      ``AnnexWorktreeItem`` from ``git_item`` alone.
    git_item: GitWorktreeItem
      Item providing ``name``, ``gitsha``, and ``gittype``.
    annexkey: str or None, optional
    annexsize: int or None, optional
    annexobjpath: PurePath or None, optional
      Annex properties that are assigned to the returned item as-is.

    Returns
    -------
    AnnexWorktreeFileSystemItem or AnnexWorktreeItem

    Notes
    -----
    With ``get_fs_info=True`` any exception raised by ``from_path()`` is
    passed on; the caller in this module handles ``FileNotFoundError``.
    """
    if get_fs_info:
        # no file system inspection was done previously, so do it now.
        # link targets are always requested here.
        item = AnnexWorktreeFileSystemItem.from_path(
            base_path / git_item.name,
            link_target=True,
        )
    else:
        item = AnnexWorktreeItem.from_gitworktreeitem(git_item)
    # amend the AnnexWorktree* object with the available git info
    item.gitsha = git_item.gitsha
    item.gittype = git_item.gittype
    # amend the AnnexWorktree* object with the available annex info
    item.annexkey = annexkey
    item.annexsize = annexsize
    item.annexobjpath = annexobjpath
    return item
def _join_annex_info(
    processed_data: Union[Type[StoreOnly], Dict[str, str]],
    stored_data: GitWorktreeItem,
) -> dict:
    """Internal helper to join results from pipeline stages

    All that is happening here is that information from git and git-annex
    inquiries gets merged into a single result dict.

    Parameters
    ----------
    processed_data: StoreOnly or dict
      Either the ``StoreOnly`` marker (non-annexed item), or a record with
      the keys ``key``, ``bytesize``, and ``objectpath``.
    stored_data: GitWorktreeItem
      The git item the annex information belongs to.

    Returns
    -------
    dict
      Always contains ``git_item``. For annexed items it additionally
      contains ``annexkey``, ``annexsize``, and ``annexobjpath``.
      ``annexsize`` is ``None`` when no numeric byte size is reported.
    """
    joined: dict = {'git_item': stored_data}
    if processed_data is StoreOnly:
        # this is a non-annexed item, nothing to join
        return joined

    # here processed data is a dict with properties from annex examinekey.
    # a key need not carry size information, in which case the reported
    # size is not a number (presumably the string 'unknown' -- verify
    # against git-annex). Report such sizes as `None` instead of crashing.
    try:
        annexsize = int(processed_data.get('bytesize'))
    except (TypeError, ValueError):
        annexsize = None

    joined.update(
        annexkey=processed_data['key'],
        annexsize=annexsize,
        annexobjpath=PurePath(str(processed_data['objectpath'])),
    )
    return joined