# emacs: -*- mode: python; py-indent-offset: 4; tab-width: 4; indent-tabs-mode: nil -*-
# ex: set sts=4 ts=4 sw=4 noet:
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
#
# See COPYING file distributed along with the datalad package for the
# copyright and license terms.
#
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
"""
"""
from functools import lru_cache
import datalad
from datalad.consts import (
DATASET_CONFIG_FILE,
DATALAD_DOTDIR,
)
from datalad.cmd import GitRunner
from datalad.dochelpers import exc_str
from distutils.version import LooseVersion
import re
import os
from os.path import (
join as opj,
exists,
getmtime,
abspath,
isabs,
)
from time import time
import logging
lgr = logging.getLogger('datalad.config')
cfg_kv_regex = re.compile(r'(^.*)\n(.*)$', flags=re.MULTILINE)
cfg_section_regex = re.compile(r'(.*)\.[^.]+')
cfg_sectionoption_regex = re.compile(r'(.*)\.([^.]+)')
_where_reload_doc = """
where : {'dataset', 'local', 'global', 'override'}, optional
Indicator which configuration file to modify. 'dataset' indicates the
persistent configuration in .datalad/config of a dataset; 'local'
the configuration of a dataset's Git repository in .git/config;
'global' refers to the general configuration that is not specific to
a single repository (usually in $USER/.gitconfig); 'override'
limits the modification to the ConfigManager instance, and the
assigned value overrides any setting from any other source.
reload : bool
Flag whether to reload the configuration from file(s) after
modification. This can be disable to make multiple sequential
modifications slightly more efficient.""".lstrip()
# we cannot import external_versions here, as the cfg comes before anything
# and we would have circular imports
[docs]@lru_cache()
def get_git_version(runner=None):
"""Return version of available git"""
runner = runner or GitRunner()
return runner.run('git version'.split())[0].split()[2]
def _where_reload(obj):
"""Helper decorator to simplify providing repetitive docstring"""
obj.__doc__ = obj.__doc__ % _where_reload_doc
return obj
def _parse_gitconfig_dump(dump, store, fileset, replace, cwd=None):
if replace:
# if we want to replace existing values in the store
# collect into a new dict and `update` the store at the
# end. This way we get the desired behavior of multi-value
# keys, but only for the current source
dct = {}
fileset = set()
else:
# if we don't want to replace value, perform the multi-value
# preserving addition on the existing store right away
dct = store
for line in dump.split('\0'):
if not line:
continue
if line.startswith('file:'):
# origin line
fname = line[5:]
if not isabs(fname):
fname = opj(cwd, fname) if cwd else abspath(fname)
fileset.add(fname)
continue
if line.startswith('command line:'):
# nothing we could handle
continue
kv_match = cfg_kv_regex.match(line)
if kv_match:
k, v = kv_match.groups()
else:
# could be just a key without = value, which git treats as True
# if asked for a bool
k, v = line, None
present_v = dct.get(k, None)
if present_v is None:
dct[k] = v
else:
if isinstance(present_v, tuple):
dct[k] = present_v + (v,)
else:
dct[k] = (present_v, v)
if replace:
store.update(dct)
return store, fileset
def _parse_env(store):
dct = {}
for k in os.environ:
if not k.startswith('DATALAD_'):
continue
dct[k.replace('__', '-').replace('_', '.').lower()] = os.environ[k]
store.update(dct)
return store
[docs]def anything2bool(val):
if hasattr(val, 'lower'):
val = val.lower()
if val in {"off", "no", "false", "0"} or not bool(val):
return False
elif val in {"on", "yes", "true", True} \
or (hasattr(val, 'isdigit') and val.isdigit() and int(val)) \
or isinstance(val, int) and val:
return True
else:
raise TypeError(
"Got value %s which could not be interpreted as a boolean"
% repr(val))
[docs]class ConfigManager(object):
"""Thin wrapper around `git-config` with support for a dataset configuration.
The general idea is to have an object that is primarily used to read/query
configuration option. Upon creation, current configuration is read via one
(or max two, in the case of the presence of dataset-specific configuration)
calls to `git config`. If this class is initialized with a Dataset
instance, it supports reading and writing configuration from
``.datalad/config`` inside a dataset too. This file is committed to Git and
hence useful to ship certain configuration items with a dataset.
The API aims to provide the most significant read-access API of a
dictionary, the Python ConfigParser, and GitPython's config parser
implementations.
This class is presently not capable of efficiently writing multiple
configurations items at once. Instead, each modification results in a
dedicated call to `git config`. This author thinks this is OK, as he
cannot think of a situation where a large number of items need to be
written during normal operation. If such need arises, various solutions are
possible (via GitPython, or an independent writer).
Each instance carries a public `overrides` attribute. This dictionary
contains variables that override any setting read from a file. The overrides
are persistent across reloads, and are not modified by any of the
manipulation methods, such as `set` or `unset`.
Any DATALAD_* environment variable is also presented as a configuration
item. Settings read from environment variables are not stored in any of the
configuration file, but are read dynamically from the environment at each
`reload()` call. Their values take precedence over any specification in
configuration files, and even overrides.
Parameters
----------
dataset : Dataset, optional
If provided, all `git config` calls are executed in this dataset's
directory. Moreover, any modifications are, by default, directed to
this dataset's configuration file (which will be created on demand)
dataset_only : bool
Legacy option, do not use.
overrides : dict, optional
Variable overrides, see general class documentation for details.
source : {'any', 'local', 'dataset', 'dataset-local'}, optional
Which sources of configuration setting to consider. If 'dataset',
configuration items are only read from a dataset's persistent
configuration file, if any is present (the one in ``.datalad/config``, not
``.git/config``); if 'local', any non-committed source is considered
(local and global configuration in Git config's terminology);
if 'dataset-local', persistent dataset configuration and local, but
not global or system configuration are considered; if 'any'
all possible sources of configuration are considered.
"""
_checked_git_identity = False
def __init__(self, dataset=None, dataset_only=False, overrides=None, source='any'):
if source not in ('any', 'local', 'dataset', 'dataset-local'):
raise ValueError(
'Unkown ConfigManager(source=) setting: {}'.format(source))
# legacy compat
if dataset_only:
if source != 'any':
raise ValueError('Refuse to combine legacy dataset_only flag, with '
'source setting')
source = 'dataset'
# store in a simple dict
# no subclassing, because we want to be largely read-only, and implement
# config writing separately
self._store = {}
self._cfgfiles = set()
self._cfgmtimes = None
self._dataset_path = None
self._dataset_cfgfname = None
self._repo_cfgfname = None
self._config_cmd = ['git', 'config']
# public dict to store variables that always override any setting
# read from a file
# `hasattr()` is needed because `datalad.cfg` is generated upon first module
# import, hence when this code runs first, there cannot be any config manager
# to inherit from
self.overrides = datalad.cfg.overrides.copy() if hasattr(datalad, 'cfg') else {}
if overrides is not None:
self.overrides.update(overrides)
if dataset is None:
if source in ('dataset', 'dataset-local'):
raise ValueError(
'ConfigManager configured to read dataset only, '
'but no dataset given')
# The caller didn't specify a repository. Unset the git directory
# when calling 'git config' to prevent a repository in the current
# working directory from leaking configuration into the output.
self._config_cmd = ['git', '--git-dir=', 'config']
else:
self._dataset_path = dataset.path
if source != 'local':
self._dataset_cfgfname = opj(self._dataset_path,
DATASET_CONFIG_FILE)
if source != 'dataset':
self._repo_cfgfname = opj(self._dataset_path, '.git', 'config')
self._src_mode = source
# Since configs could contain sensitive information, to prevent
# any "facilitated" leakage -- just disable logging of outputs for
# this runner
run_kwargs = dict(log_outputs=False)
if dataset is not None:
# make sure we run the git config calls in the dataset
# to pick up the right config files
run_kwargs['cwd'] = dataset.path
self._runner = GitRunner(**run_kwargs)
try:
self._gitconfig_has_showorgin = \
LooseVersion(get_git_version()) >= '2.8.0'
except Exception:
# no git something else broken, assume git is present anyway
# to not delay this, but assume it is old
self._gitconfig_has_showorgin = False
self.reload(force=True)
[docs] def reload(self, force=False):
"""Reload all configuration items from the configured sources
If `force` is False, all files configuration was previously read from
are checked for differences in the modification times. If no difference
is found for any file no reload is performed. This mechanism will not
detect newly created global configuration files, use `force` in this case.
"""
if not force and self._cfgmtimes:
# we aren't forcing and we have read files before
# check if any file we read from has changed
current_time = time()
curmtimes = {c: getmtime(c) for c in self._cfgfiles if exists(c)}
if all(curmtimes[c] == self._cfgmtimes.get(c) and
# protect against low-res mtimes (FAT32 has 2s, EXT3 has 1s!)
# if mtime age is less than worst resolution assume modified
(current_time - curmtimes[c]) > 2.0
for c in curmtimes):
# all the same, nothing to do, except for
# superimpose overrides, could have changed in the meantime
self._store.update(self.overrides)
# reread env, is quick
self._store = _parse_env(self._store)
return
self._store = {}
# 2-step strategy:
# - load datalad dataset config from dataset
# - load git config from all supported by git sources
# in doing so we always stay compatible with where Git gets its
# config from, but also allow to override persistent information
# from dataset locally or globally
run_args = ['-z', '-l']
if self._gitconfig_has_showorgin:
run_args.append('--show-origin')
if self._dataset_cfgfname:
if exists(self._dataset_cfgfname):
stdout, stderr = self._run(
run_args + ['--file', self._dataset_cfgfname],
log_stderr=True
)
# overwrite existing value, do not amend to get multi-line
# values
self._store, self._cfgfiles = _parse_gitconfig_dump(
stdout, self._store, self._cfgfiles, replace=False,
cwd=self._runner.cwd)
if self._src_mode == 'dataset':
# superimpose overrides, and stop early
self._store.update(self.overrides)
return
if self._src_mode == 'dataset-local':
run_args.append('--local')
stdout, stderr = self._run(run_args, log_stderr=True)
self._store, self._cfgfiles = _parse_gitconfig_dump(
stdout, self._store, self._cfgfiles, replace=True,
cwd=self._runner.cwd)
# always monitor the dataset cfg location, we know where it is in all cases
if self._dataset_cfgfname:
self._cfgfiles.add(self._dataset_cfgfname)
self._cfgfiles.add(self._repo_cfgfname)
self._cfgmtimes = {c: getmtime(c) for c in self._cfgfiles if exists(c)}
# superimpose overrides
self._store.update(self.overrides)
# override with environment variables
self._store = _parse_env(self._store)
if not ConfigManager._checked_git_identity:
for cfg, envs in (
('user.name', ('GIT_AUTHOR_NAME', 'GIT_COMMITTER_NAME')),
('user.email', ('GIT_AUTHOR_EMAIL', 'GIT_COMMITTER_EMAIL'))):
if cfg not in self._store \
and not any(e in os.environ for e in envs):
lgr.warning(
"It is highly recommended to configure Git before using "
"DataLad. Set both 'user.name' and 'user.email' "
"configuration variables."
)
ConfigManager._checked_git_identity = True
[docs] @_where_reload
def obtain(self, var, default=None, dialog_type=None, valtype=None,
store=False, where=None, reload=True, **kwargs):
"""
Convenience method to obtain settings interactively, if needed
A UI will be used to ask for user input in interactive sessions.
Questions to ask, and additional explanations can be passed directly
as arguments, or retrieved from a list of pre-configured items.
Additionally, this method allows for type conversion and storage
of obtained settings. Both aspects can also be pre-configured.
Parameters
----------
var : str
Variable name including any section like `git config` expects them,
e.g. 'core.editor'
default : any type
In interactive sessions and if `store` is True, this default value
will be presented to the user for confirmation (or modification).
In all other cases, this value will be silently assigned unless
there is an existing configuration setting.
dialog_type : {'question', 'yesno', None}
Which dialog type to use in interactive sessions. If `None`,
pre-configured UI options are used.
store : bool
Whether to store the obtained value (or default)
%s
`**kwargs`
Additional arguments for the UI function call, such as a question
`text`.
"""
# do local import, as this module is import prominently and the
# could theroetically import all kind of weired things for type
# conversion
from datalad.interface.common_cfg import definitions as cfg_defs
# fetch what we know about this variable
cdef = cfg_defs.get(var, {})
# type conversion setup
if valtype is None and 'type' in cdef:
valtype = cdef['type']
if valtype is None:
valtype = lambda x: x
# any default?
if default is None and 'default' in cdef:
default = cdef['default']
_value = None
if var in self:
# nothing needs to be obtained, it is all here already
_value = self[var]
elif store is False and default is not None:
# nothing will be stored, and we have a default -> no user confirmation
# we cannot use logging, because we want to use the config to confiugre
# the logging
#lgr.debug('using default {} for config setting {}'.format(default, var))
_value = default
if _value is not None:
# we got everything we need and can exit early
try:
return valtype(_value)
except Exception as e:
raise ValueError(
"value '{}' of existing configuration for '{}' cannot be "
"converted to the desired type '{}' ({})".format(
_value, var, valtype, exc_str(e)))
# now we need to try to obtain something from the user
from datalad.ui import ui
# configure UI
dialog_opts = kwargs
if dialog_type is None: # no override
# check for common knowledge on how to obtain a value
if 'ui' in cdef:
dialog_type = cdef['ui'][0]
# pull standard dialog settings
dialog_opts = cdef['ui'][1]
# update with input
dialog_opts.update(kwargs)
if (not ui.is_interactive or dialog_type is None) and default is None:
raise RuntimeError(
"cannot obtain value for configuration item '{}', "
"not preconfigured, no default, no UI available".format(var))
if not hasattr(ui, dialog_type):
raise ValueError("UI '{}' does not support dialog type '{}'".format(
ui, dialog_type))
# configure storage destination, if needed
if store:
if where is None and 'destination' in cdef:
where = cdef['destination']
if where is None:
raise ValueError(
"request to store configuration item '{}', but no "
"storage destination specified".format(var))
# obtain via UI
dialog = getattr(ui, dialog_type)
_value = dialog(default=default, **dialog_opts)
if _value is None:
# we got nothing
if default is None:
raise RuntimeError(
"could not obtain value for configuration item '{}', "
"not preconfigured, no default".format(var))
# XXX maybe we should return default here, even it was returned
# from the UI -- if that is even possible
# execute type conversion before storing to check that we got
# something that looks like what we want
try:
value = valtype(_value)
except Exception as e:
raise ValueError(
"cannot convert user input `{}` to desired type ({})".format(
_value, exc_str(e)))
# XXX we could consider "looping" until we have a value of proper
# type in case of a user typo...
if store:
# store value as it was before any conversion, needs to be str
# anyway
# needs string conversion nevertheless, because default could come
# in as something else
self.add(var, '{}'.format(_value), where=where, reload=reload)
return value
def __str__(self):
return "ConfigManager({}{})".format(
self._cfgfiles,
'+ overrides' if self.overrides else '',
)
#
# Compatibility with dict API
#
def __len__(self):
return len(self._store)
def __getitem__(self, key):
return self._store.__getitem__(key)
def __contains__(self, key):
return self._store.__contains__(key)
[docs] def keys(self):
"""Returns list of configuration item names"""
return self._store.keys()
# XXX should this be *args?
[docs] def get(self, key, default=None):
"""D.get(k[,d]) -> D[k] if k in D, else d. d defaults to None."""
return self._store.get(key, default)
#
# Compatibility with ConfigParser API
#
[docs] def sections(self):
"""Returns a list of the sections available"""
return list(set([cfg_section_regex.match(k).group(1) for k in self._store]))
[docs] def options(self, section):
"""Returns a list of options available in the specified section."""
opts = []
for k in self._store:
sec, opt = cfg_sectionoption_regex.match(k).groups()
if sec == section:
opts.append(opt)
return opts
[docs] def has_section(self, section):
"""Indicates whether a section is present in the configuration"""
for k in self._store:
if k.startswith(section):
return True
return False
[docs] def has_option(self, section, option):
"""If the given section exists, and contains the given option"""
for k in self._store:
sec, opt = cfg_sectionoption_regex.match(k).groups()
if sec == section and opt == option:
return True
return False
[docs] def getint(self, section, option):
"""A convenience method which coerces the option value to an integer"""
return int(self.get_value(section, option))
[docs] def getbool(self, section, option, default=None):
"""A convenience method which coerces the option value to a bool
Values "on", "yes", "true" and any int!=0 are considered True
Values which evaluate to bool False, "off", "no", "false" are considered
False
TypeError is raised for other values.
"""
val = self.get_value(section, option, default=default)
if val is None: # no value at all, git treats it as True
return True
return anything2bool(val)
[docs] def getfloat(self, section, option):
"""A convenience method which coerces the option value to a float"""
return float(self.get_value(section, option))
# this is a hybrid of ConfigParser and dict API
[docs] def items(self, section=None):
"""Return a list of (name, value) pairs for each option
Optionally limited to a given section.
"""
if section is None:
return self._store.items()
return [(k, v) for k, v in self._store.items()
if cfg_section_regex.match(k).group(1) == section]
#
# Compatibility with GitPython's ConfigParser
#
[docs] def get_value(self, section, option, default=None):
"""Like `get()`, but with an optional default value
If the default is not None, the given default value will be returned in
case the option did not exist. This behavior imitates GitPython's
config parser.
"""
try:
return self['.'.join((section, option))]
except KeyError as e:
# this strange dance is needed because gitpython does it this way
if default is not None:
return default
else:
raise e
#
# Modify configuration (proxy respective git-config call)
#
@_where_reload
def _run(self, args, where=None, reload=False, **kwargs):
"""Centralized helper to run "git config" calls
Parameters
----------
args : list
Arguments to pass for git config
%s
**kwargs
Keywords arguments for Runner's call
"""
if where:
args = self._get_location_args(where) + args
out = self._runner.run(self._config_cmd + args, **kwargs)
if reload:
self.reload()
return out
def _get_location_args(self, where, args=None):
if args is None:
args = []
cfg_labels = ('dataset', 'local', 'global', 'override')
if where not in cfg_labels:
raise ValueError(
"unknown configuration label '{}' (not in {})".format(
where, cfg_labels))
if where == 'dataset':
if not self._dataset_cfgfname:
raise ValueError(
'ConfigManager cannot store configuration to dataset, '
'none specified')
# create an empty config file if none exists, `git config` will
# fail otherwise
dscfg_dirname = opj(self._dataset_path, DATALAD_DOTDIR)
if not exists(dscfg_dirname):
os.makedirs(dscfg_dirname)
if not exists(self._dataset_cfgfname):
open(self._dataset_cfgfname, 'w').close()
args.extend(['--file', self._dataset_cfgfname])
elif where == 'global':
args.append('--global')
elif where == 'local':
args.append('--local')
return args
[docs] @_where_reload
def add(self, var, value, where='dataset', reload=True):
"""Add a configuration variable and value
Parameters
----------
var : str
Variable name including any section like `git config` expects them, e.g.
'core.editor'
value : str
Variable value
%s"""
if where == 'override':
from datalad.utils import assure_list
val = assure_list(self.overrides.pop(var, None))
val.append(value)
self.overrides[var] = val[0] if len(val) == 1 else val
if reload:
self.reload(force=True)
return
self._run(['--add', var, value], where=where, reload=reload, log_stderr=True)
[docs] @_where_reload
def set(self, var, value, where='dataset', reload=True, force=False):
"""Set a variable to a value.
In opposition to `add`, this replaces the value of `var` if there is
one already.
Parameters
----------
var : str
Variable name including any section like `git config` expects them, e.g.
'core.editor'
value : str
Variable value
force: bool
if set, replaces all occurrences of `var` by a single one with the
given `value`. Otherwise raise if multiple entries for `var` exist
already
%s"""
if where == 'override':
self.overrides[var] = value
if reload:
self.reload(force=True)
return
from datalad.support.gitrepo import to_options
self._run(to_options(replace_all=force) + [var, value],
where=where, reload=reload, log_stderr=True)
[docs] @_where_reload
def rename_section(self, old, new, where='dataset', reload=True):
"""Rename a configuration section
Parameters
----------
old : str
Name of the section to rename.
new : str
Name of the section to rename to.
%s"""
if where == 'override':
self.overrides = {
(new + k[len(old):]) if k.startswith(old + '.') else k: v
for k, v in self.overrides.items()
}
if reload:
self.reload(force=True)
return
self._run(['--rename-section', old, new], where=where, reload=reload)
[docs] @_where_reload
def remove_section(self, sec, where='dataset', reload=True):
"""Rename a configuration section
Parameters
----------
sec : str
Name of the section to remove.
%s"""
if where == 'override':
self.overrides = {
k: v
for k, v in self.overrides.items()
if not k.startswith(sec + '.')
}
if reload:
self.reload(force=True)
return
self._run(['--remove-section', sec], where=where, reload=reload)
[docs] @_where_reload
def unset(self, var, where='dataset', reload=True):
"""Remove all occurrences of a variable
Parameters
----------
var : str
Name of the variable to remove
%s"""
if where == 'override':
self.overrides.pop(var, None)
if reload:
self.reload(force=True)
return
# use unset all as it is simpler for now
self._run(['--unset-all', var], where=where, reload=reload)
[docs]def rewrite_url(cfg, url):
"""Any matching 'url.<base>.insteadOf' configuration is applied
Any URL that starts with such a configuration will be rewritten
to start, instead, with <base>. When more than one insteadOf
strings match a given URL, the longest match is used.
Parameters
----------
cfg : ConfigManager or dict
dict-like with configuration variable name/value-pairs.
url : str
URL to be rewritten, if matching configuration is found.
Returns
-------
str
Rewritten or unmodified URL.
"""
insteadof = {
# only leave the base url
k[4:-10]: v
for k, v in cfg.items()
if k.startswith('url.') and k.endswith('.insteadof')
}
# all config that applies
matches = {
key: v
for key, val in insteadof.items()
for v in (val if isinstance(val, tuple) else (val,))
if url.startswith(v)
}
# find longest match, like Git does
if matches:
rewrite_base, match = sorted(
matches.items(),
key=lambda x: len(x[1]),
reverse=True,
)[0]
if sum(match == v for v in matches.values()) > 1:
lgr.warning(
"Ignoring URL rewrite configuration for '%s', "
"multiple conflicting definitions exists: %s",
match,
['url.{}.insteadof'.format(k)
for k, v in matches.items()
if v == match]
)
else:
url = '{}{}'.format(rewrite_base, url[len(match):])
return url
# for convenience, bind to class too
ConfigManager.rewrite_url = rewrite_url