"""Report on the status of the worktree
The main functionality is provided by the :func:`iter_gitstatus` function.
"""
from __future__ import annotations
import logging
from pathlib import Path
from typing import (
Iterator,
Generator,
)
from datalad_next.consts import PRE_INIT_COMMIT_SHA
from datalad_next.runners import (
call_git_lines,
)
from datalad_next.repo_utils import (
get_worktree_head,
)
from .gitdiff import (
GitDiffItem,
GitDiffStatus,
GitContainerModificationType,
iter_gitdiff,
)
from .gitworktree import (
GitTreeItemType,
iter_gitworktree,
iter_submodules,
)
lgr = logging.getLogger('datalad.ext.next.iter_collections.gitstatus')
# map untracked modes given to iter_gitstatus() to those that need to be
# passed to iter_gitworktree() for getting untracked content only
_untracked_mode_map = {
'all': 'only',
'whole-dir': 'only-whole-dir',
'no-empty-dir': 'only-no-empty-dir',
}
[docs]
def iter_gitstatus(
path: Path,
*,
untracked: str | None = 'all',
recursive: str = 'repository',
eval_submodule_state: str = "full",
) -> Generator[GitDiffItem, None, None]:
"""
Recursion mode 'no'
This mode limits the reporting to immediate directory items of a given
path. This mode is not necessarily faster than a 'repository' recursion.
Its primary purpose is the ability to deliver a collapsed report in that
subdirectories are treated similar to submodules -- as containers that
maybe have modified or untracked content.
Parameters
----------
path: Path
Path of a directory in a Git repository to report on. This directory
need not be the root directory of the repository, but must be part of
the repository. If the directory is not the root directory of a
non-bare repository, the iterator is constrained to items underneath
that directory.
untracked: {'all', 'whole-dir', 'no-empty-dir'} or None, optional
If not ``None``, also reports on untracked work tree content.
``all`` reports on any untracked file; ``whole-dir`` yields a single
report for a directory that is entirely untracked, and not individual
untracked files in it; ``no-empty-dir`` skips any reports on
untracked empty directories. Also see ``eval_submodule_state`` for
how this parameter is applied in submodule recursion.
recursive: {'no', 'repository', 'submodules', 'monolithic'}, optional
Behavior for recursion into subtrees. By default (``repository``),
all trees within the repository underneath ``path``) are reported,
but no tree within submodules. With ``submodules``, recursion includes
any submodule that is present. If ``no``, only direct children
are reported on.
eval_submodule_state: {"no", "commit", "full"}, optional
If 'full' (default), the state of a submodule is evaluated by
considering all modifications, with the treatment of untracked files
determined by `untracked`. If 'commit', the modification check is
restricted to comparing the submodule's "HEAD" commit to the one
recorded in the superdataset. If 'no', the state of the subdataset is
not evaluated. When a git-annex repository in adjusted mode is detected,
the reference commit that the worktree is being compared to is the basis
of the adjusted branch (i.e., the corresponding branch).
Yields
------
:class:`GitDiffItem`
The ``name`` and ``prev_name`` attributes of an item are a ``str`` with
the corresponding (relative) path, as reported by Git
(in POSIX conventions).
.. note::
The implementation requires `git rev-parse --path-format=relative`
that was introduced with Git v2.31.
"""
path = Path(path)
head, corresponding_head = get_worktree_head(path)
if head is None:
# no commit at all -> compare to an empty repo.
head = PRE_INIT_COMMIT_SHA
# TODO it would make sense to always (or optionally) compare against any
# existing corresponding_head. This would make the status communicate
# anything that has not made it into the corresponding branch yet
common_args = dict(
head=head,
path=path,
untracked=untracked,
eval_submodule_state=eval_submodule_state,
)
if recursive == 'no':
yield from _yield_dir_items(**common_args)
return
elif recursive == 'repository':
yield from _yield_repo_items(**common_args)
# TODO what we really want is a status that is not against a per-repository
# HEAD, but against the commit that is recorded in the parent repository
# TODO we need a name for that
elif recursive in ('submodules', 'monolithic'):
yield from _yield_hierarchy_items(
recursion_mode=recursive,
**common_args,
)
else:
raise ValueError(f'unknown recursion type {recursive!r}')
#
# status generators for each mode
#
def _yield_dir_items(
*,
head: str | None,
path: Path,
untracked: str | None,
eval_submodule_state: str,
) -> Iterator[GitDiffItem]:
# potential container items in a directory that need content
# investigation
container_types = (
GitTreeItemType.directory,
GitTreeItemType.submodule,
)
if untracked is None:
# no need to look at anything other than the diff report
dir_items = {}
else:
# there is no recursion, avoid wasting cycles on listing individual
# files in subdirectories
untracked = 'whole-dir' if untracked == 'all' else untracked
# gather all directory items upfront, we subtract the ones reported
# modified later and lastly yield all untracked content from them
dir_items = {
item.name.as_posix(): item
for item in iter_gitworktree(
path,
untracked=untracked,
recursive='no',
)
}
# diff constrained to direct children
for item in iter_gitdiff(
path,
from_treeish=head,
# to the worktree
to_treeish=None,
recursive='no',
# TODO trim scope like in repo_items
eval_submodule_state=eval_submodule_state,
):
if item.status != GitDiffStatus.deletion \
and item.gittype in container_types:
if item.gittype == GitTreeItemType.submodule:
# issue standard submodule container report
_eval_submodule(path, item, eval_submodule_state)
else:
dir_path = path / item.path
# this is on a directory. if it appears here, it has
# modified content
if dir_path.exists():
item.add_modification_type(
GitContainerModificationType.modified_content)
if untracked is not None \
and _path_has_untracked(dir_path):
item.add_modification_type(
GitContainerModificationType.untracked_content)
else:
# this directory is gone entirely
item.status = GitDiffStatus.deletion
item.modification_types = None
# we dealt with this item completely
dir_items.pop(item.name, None)
if item.status:
yield item
if untracked is None:
return
# yield anything untracked, and inspect remaining containers
for dir_item in dir_items.values():
if dir_item.gitsha is None and dir_item.gittype is None:
# this is untracked
yield GitDiffItem(
# for homgeneity for report a str-path no matter what
name=str(dir_item.name),
status=GitDiffStatus.other,
)
elif dir_item.gittype in container_types:
# none of these containers has any modification other than
# possibly untracked content
item = GitDiffItem(
# for homgeneity for report a str-path no matter what
name=str(dir_item.name),
# this submodule has not been detected as modified
# per-commit, assign reported gitsha to pre and post
# state
gitsha=dir_item.gitsha,
prev_gitsha=dir_item.gitsha,
gittype=dir_item.gittype,
# TODO others?
)
if item.gittype == GitTreeItemType.submodule:
# issue standard submodule container report
_eval_submodule(path, item, eval_submodule_state)
else:
# this is on a directory. if it appears here, it has
# no modified content
testpath = path / dir_item.path
if _path_has_untracked(path / dir_item.path):
item.status = GitDiffStatus.modification
item.add_modification_type(
GitContainerModificationType.untracked_content)
if item.status:
yield item
def _yield_repo_items(
*,
head: str | None,
path: Path,
untracked: str | None,
eval_submodule_state: str,
) -> Generator[GitDiffItem, None, None]:
"""Report status items for a single/whole repsoitory"""
present_submodules = {
# stringify name for speedy comparison
# TODO double-check that comparisons are primarily with
# GitDiffItem.name which is str
str(item.name): item for item in iter_submodules(path)
}
# start with a repository-contrained diff against the worktree
for item in iter_gitdiff(
path,
from_treeish=head,
# to the worktree
to_treeish=None,
recursive='repository',
# we should be able to go cheaper with the submodule evaluation here.
# We need to redo some check for adjusted mode, and other cases anyways
eval_submodule_state='commit'
if eval_submodule_state == 'full' else eval_submodule_state,
):
# immediately investigate any submodules that are already
# reported modified by Git
if item.gittype == GitTreeItemType.submodule:
_eval_submodule(path, item, eval_submodule_state)
# we dealt with this submodule
present_submodules.pop(item.name, None)
if item.status:
yield item
# we are not generating a recursive report for submodules, hence
# we need to look at ALL submodules for untracked content
# `or {}` for the case where we got no submodules, which happens
# with `eval_submodule_state == 'no'`
for subm_name, subm_item in (present_submodules or {}).items():
# none of these submodules has any modification other than
# possibly untracked content
item = GitDiffItem(
# for homgeneity for report a str-path no matter what
name=str(subm_item.name),
# this submodule has not been detected as modified
# per-commit, assign reported gitsha to pre and post
# state
gitsha=subm_item.gitsha,
prev_gitsha=subm_item.gitsha,
gittype=subm_item.gittype,
# TODO others?
)
# TODO possibly trim eval_submodule_state
_eval_submodule(path, item, eval_submodule_state)
if item.status:
yield item
if untracked is None:
return
# lastly untracked files of this repo
for untracked_item in iter_gitworktree(
path=path,
untracked=_untracked_mode_map[untracked],
link_target=False,
fp=False,
recursive='repository',
):
yield GitDiffItem(
name=untracked_item.name.as_posix(),
status=GitDiffStatus.other,
gittype=untracked_item.gittype,
)
def _yield_hierarchy_items(
*,
head: str | None,
path: Path,
untracked: str | None,
recursion_mode: str,
eval_submodule_state: str,
) -> Generator[GitDiffItem, None, None]:
for item in _yield_repo_items(
head=head,
path=path,
untracked=untracked,
# TODO do we need to adjust the eval mode here for the diff recmodes?
eval_submodule_state=eval_submodule_state,
):
# there is nothing else to do for any non-submodule item
if item.gittype != GitTreeItemType.submodule:
yield item
continue
# we get to see any submodule item passing through here, and can simply
# call this function again for a subpath
# submodule recursion
# the .path of a GitTreeItem is always POSIX
sm_path = path / item.path
if recursion_mode == 'submodules':
# in this mode, we run the submodule status against it own
# worktree head
sm_head, _ = get_worktree_head(sm_path)
# because this need not cover all possible changes with respect
# to the parent repository, we yield an item on the submodule
# itself
yield item
elif recursion_mode == 'monolithic':
# in this mode we determine the change of the submodule with
# respect to the recorded state in the parent. This is either
# the current gitsha, or (if git detected a committed
# modification) the previous sha. This way, any further report
# on changes a comprehensive from the point of view of the parent
# repository, hence no submodule item is emitted
sm_head = item.gitsha or item.prev_gitsha
if item.modification_types is None \
or GitContainerModificationType.new_commits \
in item.modification_types:
# this is a submodule that is either entriely new (added),
# or it has new commits compared to
# its state in the parent dataset. We need to yield this
# item, even if nothing else is modified, because otherwise
# this (unsafed) changed would go unnoticed
# https://github.com/datalad/datalad-next/issues/645
yield item
for i in _yield_hierarchy_items(
head=sm_head,
path=sm_path,
untracked=untracked,
# TODO here we could implement handling for a recursion-depth limit
recursion_mode=recursion_mode,
eval_submodule_state=eval_submodule_state,
):
i.name = f'{item.name}/{i.name}'
yield i
#
# Helpers
#
def _path_has_untracked(path: Path) -> bool:
"""Recursively check for any untracked content (except empty dirs)"""
if not path.exists():
# cannot possibly have untracked
return False
for ut in iter_gitworktree(
path=path,
untracked='only-no-empty-dir',
link_target=False,
fp=False,
recursive='submodules',
):
# fast exit on the first detection
return True
# only after we saw everything we can say there is nothing
return False
def _get_submod_worktree_head(path: Path) -> tuple[bool, str | None, bool]:
"""Returns (submodule exists, SHA | None, adjusted)"""
try:
HEAD, corresponding_head = get_worktree_head(path)
except ValueError:
return False, None, False
adjusted = corresponding_head is not None
if adjusted:
# this is a git-annex adjusted branch. do the comparison against
# its basis. it is not meaningful to track the managed branch in
# a superdataset
HEAD = corresponding_head
res = call_git_lines(
['rev-parse', '--path-format=relative', '--show-toplevel', HEAD],
cwd=path,
)
assert len(res) == 2
if res[0].startswith('..'):
# this is not a report on a submodule at this location
return False, None, adjusted
else:
return True, res[1], adjusted
def _eval_submodule(
basepath: Path,
item: GitDiffItem,
eval_mode: str,
) -> None:
"""In-place amend GitDiffItem submodule item
It does nothing with ``eval_mode='no'``.
"""
if eval_mode == 'no':
return
item_path = basepath / item.path
# this is the cheapest test for the theoretical chance that a submodule
# is present at `item_path`. This is beneficial even when we would only
# run a single call to `git rev-parse`
# https://github.com/datalad/datalad-next/issues/606
if not (item_path / '.git').exists():
return
# get head commit, and whether a submodule is actually present,
# and/or in adjusted mode
subds_present, head_commit, adjusted = _get_submod_worktree_head(item_path)
if not subds_present:
return
if adjusted:
_eval_submodule_adjusted(item_path, item, head_commit, eval_mode)
else:
_eval_submodule_normal(item_path, item, head_commit, eval_mode)
def _eval_submodule_normal(
item_path: Path,
item: GitDiffItem,
head_commit: str | None,
eval_mode: str,
) -> None:
if eval_mode == 'full' and item.status is None or (
item.modification_types
and GitContainerModificationType.new_commits in item.modification_types
):
# if new commits have been detected, the diff-implementation is
# not able to report "modified content" at the same time, if it
# exists. This requires a dedicated inspection, which conincidentally
# is identical to the analysis of an adjusted mode submodule.
return _eval_submodule_adjusted(
item_path, item, head_commit, eval_mode)
if item.gitsha != head_commit:
item.status = GitDiffStatus.modification
item.add_modification_type(GitContainerModificationType.new_commits)
if eval_mode == 'commit':
return
# check for untracked content (recursively)
if _path_has_untracked(item_path):
item.status = GitDiffStatus.modification
item.add_modification_type(
GitContainerModificationType.untracked_content)
def _eval_submodule_adjusted(
item_path: Path,
item: GitDiffItem,
head_commit: str | None,
eval_mode: str,
) -> None:
# we cannot rely on the diff-report for a submodule in adjusted mode.
# git would make the comparison to the adjusted branch HEAD alone.
# this would almost always be invalid, because it is not meaningful to
# track a commit in an adjusted branch (it goes away).
#
# instead, we need to:
# - check for a change in the corresponding HEAD to the recorded commit
# in the parent repository, consider any change "new commits"
# - check for a diff of the worktree to corresponding HEAD, consider
# any such diff a "modified content"
# - and lastly check for untracked content
# start with "no modification"
item.status = None
item.modification_types = None
if item.prev_gitsha != head_commit:
item.status = GitDiffStatus.modification
item.add_modification_type(GitContainerModificationType.new_commits)
if eval_mode == 'commit':
return
if any(
i.status is not None
for i in iter_gitdiff(
item_path,
from_treeish=head_commit,
# worktree
to_treeish=None,
recursive='repository',
find_renames=None,
find_copies=None,
eval_submodule_state='commit',
)
):
item.status = GitDiffStatus.modification
item.add_modification_type(
GitContainerModificationType.modified_content)
# check for untracked content (recursively)
if _path_has_untracked(item_path):
item.status = GitDiffStatus.modification
item.add_modification_type(
GitContainerModificationType.untracked_content)