# emacs: -*- mode: python; py-indent-offset: 4; tab-width: 4; indent-tabs-mode: nil -*-
# ex: set sts=4 ts=4 sw=4 noet:
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
#
# See COPYING file distributed along with the datalad package for the
# copyright and license terms.
#
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
"""
Wrapper for command and function calls, allowing for dry runs and output handling
"""
import time
import subprocess
import sys
import logging
import os
import atexit
import functools
import tempfile
from locale import getpreferredencoding
import asyncio
from collections import (
defaultdict,
namedtuple,
OrderedDict,
)
from .consts import GIT_SSH_COMMAND
from .dochelpers import (
borrowdoc,
exc_str,
)
from .support import path as op
from .support.exceptions import CommandError
from .support.protocol import (
NullProtocol,
ExecutionTimeProtocol,
ExecutionTimeExternalsProtocol,
)
from .utils import (
assure_bytes,
assure_unicode,
auto_repr,
ensure_unicode,
generate_file_chunks,
get_tempfile_kwargs,
split_cmdline,
unlink,
)
# Module-level logger for all command execution machinery
lgr = logging.getLogger('datalad.cmd')

# In python3 to split byte stream on newline, it must be bytes
linesep_bytes = os.linesep.encode()

# NOTE(review): not referenced anywhere in this module's visible code --
# presumably kept as a backup of the original process streams; confirm
# before removing
_TEMP_std = sys.stdout, sys.stderr

# To be used in the temp file name to distinguish the ones we create
# in Runner so we take care about their removal, in contrast to those
# which might be created outside and passed into Runner
_MAGICAL_OUTPUT_MARKER = "_runneroutput_"

# Common base class of all open file objects (used for isinstance checks
# on streams returned by _get_output_stream)
from io import IOBase as file_class
def _decide_to_log(v):
"""Hacky workaround for now so we could specify per each which to
log online and which to the log"""
if isinstance(v, bool) or callable(v):
return v
elif v in {'online'}:
return True
elif v in {'offline'}:
return False
else:
raise ValueError("can be bool, callable, 'online' or 'offline'")
def _get_output_stream(log_std, false_value):
"""Helper to prepare output stream for Popen and use file for 'offline'
Necessary to avoid lockdowns when both stdout and stderr are pipes
"""
if log_std:
if log_std == 'offline':
# we will open a temporary file
tf = tempfile.mktemp(
**get_tempfile_kwargs({}, prefix=_MAGICAL_OUTPUT_MARKER)
)
return open(tf, 'w') # XXX PY3 should be 'b' may be?
else:
return subprocess.PIPE
else:
return false_value
def _cleanup_output(stream, std):
    """Dispose of a stream prepared by `_get_output_stream`.

    Temporary files we created ourselves (recognized by the magical
    marker embedded in their name) get closed and removed; for the PIPE
    sentinel the corresponding subprocess stream is closed instead.
    """
    is_our_tempfile = (
        isinstance(stream, file_class)
        and _MAGICAL_OUTPUT_MARKER in getattr(stream, 'name', '')
    )
    if is_our_tempfile:
        if not stream.closed:
            stream.close()
        if op.exists(stream.name):
            unlink(stream.name)
    elif stream == subprocess.PIPE:
        std.close()
def run_gitcommand_on_file_list_chunks(func, cmd, files, *args, **kwargs):
    """Run a git command multiple times if `files` is too long

    Parameters
    ----------
    func : callable
        Typically a Runner.run variant. Assumed to return a 2-tuple with stdout
        and stderr as strings, or a result dict (WitlessRunner.run).
    cmd : list
        Base Git command argument list, to be amended with '--', followed
        by a file list chunk.
    files : list
        List of files.
    args, kwargs :
        Passed to `func`

    Returns
    -------
    str, str
        Concatenated stdout and stderr.

    Raises
    ------
    RuntimeError
        If a dict result reports a non-zero exit 'status', or contains an
        unrecognized non-empty field.
    """
    assert isinstance(cmd, list)
    if not files:
        file_chunks = [[]]
    else:
        file_chunks = generate_file_chunks(files, cmd)

    results = []
    for i, file_chunk in enumerate(file_chunks):
        if file_chunk:
            lgr.debug('Process file list chunk %i (length %i)',
                      i, len(file_chunk))
            results.append(func(cmd + ['--'] + file_chunk, *args, **kwargs))
        else:
            results.append(func(cmd, *args, **kwargs))

    # if it was a WitlessRunner.run -- we would get dicts.
    # If old Runner -- stdout, stderr strings
    if results:
        if isinstance(results[0], dict):
            result = defaultdict(str)
            for r in results:
                for k, v in r.items():
                    if k in ('stdout', 'stderr'):
                        result[k] += v
                    elif k == 'status':
                        # BUG FIX: previously this tested `k` (the key name
                        # 'status', which is always truthy) and formatted %d
                        # with it -- raising unconditionally and with a
                        # TypeError in message formatting.  Test the exit
                        # status value `v` instead.
                        if v:
                            # this wrapper expects commands to succeed or
                            # exception being thrown
                            raise RuntimeError(
                                "Running %s with WitlessRunner resulted in "
                                "exit %d but there were no exception"
                                % (cmd, v))
                    elif v:
                        raise RuntimeError(
                            "Have no clue what to do about %s=%r" % (k, v))
            return result['stdout'], result['stderr']
        else:
            return ''.join(r[0] for r in results), \
                   ''.join(r[1] for r in results)
async def run_async_cmd(loop, cmd, protocol, stdin, protocol_kwargs=None,
                        **kwargs):
    """Run a command in a subprocess managed by asyncio

    This implementation has been inspired by
    https://pymotw.com/3/asyncio/subprocesses.html

    Parameters
    ----------
    loop : asyncio.AbstractEventLoop
      asyncio event loop instance. Must support subprocesses on the
      target platform.
    cmd : list
      Command to be executed, passed to `subprocess_exec`.
    protocol : WitlessProtocol
      Protocol class to be instantiated for managing communication
      with the subprocess.
    stdin : file-like or None
      Passed to the subprocess as its standard input.
    protocol_kwargs : dict, optional
      Passed to the Protocol class constructor.
    kwargs : Pass to `subprocess_exec`, will typically be parameters
      supported by `subprocess.Popen`.

    Returns
    -------
    undefined
      The nature of the return value is determined by the given
      protocol class.
    """
    lgr.debug('Async run %s', cmd)

    if protocol_kwargs is None:
        protocol_kwargs = {}
    # future fulfilled by the protocol's process_exited() callback;
    # awaited below to know when the subprocess is done
    cmd_done = asyncio.Future(loop=loop)
    factory = functools.partial(protocol, cmd_done, **protocol_kwargs)
    proc = loop.subprocess_exec(
        factory,
        *cmd,
        stdin=stdin,
        # ask the protocol which streams to capture
        stdout=asyncio.subprocess.PIPE if protocol.proc_out else None,
        stderr=asyncio.subprocess.PIPE if protocol.proc_err else None,
        **kwargs
    )
    transport = None
    result = None
    try:
        lgr.debug('Launching process %s', cmd)
        transport, protocol = await proc
        lgr.debug('Waiting for process %i to complete', transport.get_pid())
        # The next wait is a workaround that avoids losing the output of
        # quickly exiting commands (https://bugs.python.org/issue41594).
        # NOTE(review): relies on the private
        # BaseSubprocessTransport._wait() -- may break on asyncio changes
        await asyncio.ensure_future(transport._wait())
        await cmd_done
        result = protocol._prepare_result()
    finally:
        # protect against a crash when launching the process:
        # only close the transport if it was actually created
        if transport:
            transport.close()
    return result
class WitlessProtocol(asyncio.SubprocessProtocol):
    """Subprocess communication protocol base class for `run_async_cmd`

    This class implements basic subprocess output handling. Derived classes
    like `StdOutCapture` should be used for subprocess communication that need
    to capture and return output. In particular, the `pipe_data_received()`
    method can be overwritten to implement "online" processing of process
    output.

    This class defines a default return value setup that causes
    `run_async_cmd()` to return a 2-tuple with the subprocess's exit code
    and a list with bytestrings of all captured output streams.
    """

    # indexed by asyncio file descriptor number (0=stdin, 1=stdout, 2=stderr)
    FD_NAMES = ['stdin', 'stdout', 'stderr']

    # subclasses set these to True to request capture of the respective stream
    proc_out = None
    proc_err = None

    def __init__(self, done_future):
        # future promise to be fulfilled when process exits
        self.done = done_future

        # capture output in bytearrays while the process is running
        Streams = namedtuple('Streams', ['out', 'err'])
        self.buffer = Streams(
            out=bytearray() if self.proc_out else None,
            err=bytearray() if self.proc_err else None,
        )
        self.pid = None
        super().__init__()

        # encoding used by _prepare_result() to decode captured bytes
        self.encoding = getpreferredencoding(do_setlocale=False)

        self._log_outputs = False
        if lgr.isEnabledFor(5):
            try:
                from . import cfg
                self._log_outputs = cfg.getbool('datalad.log', 'outputs', default=False)
            except ImportError:
                pass
            self._log = self._log_summary
        else:
            # avoid any per-read logging overhead when level 5 is disabled
            self._log = self._log_nolog

    def _log_nolog(self, *args):
        # no-op stand-in installed when level-5 logging is disabled
        pass

    def _log_summary(self, fd, data):
        fd_name = self.FD_NAMES[fd]
        lgr.log(5, 'Read %i bytes from %i[%s]%s',
                len(data), self.pid, fd_name, ':' if self._log_outputs else '')
        if self._log_outputs:
            log_data = assure_unicode(data)
            # The way we log is to stay consistent with Runner.
            # TODO: later we might just log in a single entry, without
            # fd_name prefix
            lgr.log(5, "%s| %s " % (fd_name, log_data))

    def connection_made(self, transport):
        # called by asyncio once the subprocess is spawned
        self.transport = transport
        self.pid = transport.get_pid()
        lgr.debug('Process %i started', self.pid)

    def pipe_data_received(self, fd, data):
        self._log(fd, data)
        # store received output if stream was to be captured;
        # fd is 1 (stdout) or 2 (stderr), buffer holds (out, err),
        # hence the fd - 1 indexing
        if self.buffer[fd - 1] is not None:
            self.buffer[fd - 1].extend(data)

    def _prepare_result(self):
        """Prepares the final result to be returned to the runner

        Note for derived classes overwriting this method:

        The result must be a dict with keys that do not unintentionally
        conflict with the API of CommandError, as the result dict is passed to
        this exception class as kwargs on error. The Runner will overwrite
        'cmd' and 'cwd' on error, if they are present in the result.
        """
        return_code = self.transport.get_returncode()
        lgr.debug(
            'Process %i exited with return code %i',
            self.pid, return_code)
        # give captured process output back to the runner as string(s);
        # uncaptured streams yield ''
        results = {
            name:
            bytes(byt).decode(self.encoding)
            if byt else ''
            for name, byt in zip(self.FD_NAMES[1:], self.buffer)
        }
        results['code'] = return_code
        return results

    def process_exited(self):
        # actually fulfill the future promise and let the execution finish
        self.done.set_result(True)
class NoCapture(WitlessProtocol):
    """Protocol variant that lets all subprocess output pass through

    Behaviorally identical to the `WitlessProtocol` base class; it exists
    only as a more descriptive name for that default behavior.
    """
    pass
class StdOutCapture(WitlessProtocol):
    """Protocol variant capturing and returning only a subprocess' stdout"""
    proc_out = True
class StdErrCapture(WitlessProtocol):
    """Protocol variant capturing and returning only a subprocess' stderr"""
    proc_err = True
class StdOutErrCapture(WitlessProtocol):
    """Protocol variant capturing and returning both stdout and stderr
    of a subprocess
    """
    proc_out = True
    proc_err = True
class KillOutput(WitlessProtocol):
    """Protocol variant that discards a subprocess' stdout/stderr entirely
    """
    # request both pipes so the output is consumed (and dropped) by us
    proc_out = True
    proc_err = True

    def pipe_data_received(self, fd, data):
        # swallow the data; optionally note the fact at the most verbose level
        if not lgr.isEnabledFor(5):
            return
        lgr.log(
            5,
            'Discarded %i bytes from %i[%s]',
            len(data), self.pid, self.FD_NAMES[fd])
class WitlessRunner(object):
    """Minimal Runner with support for online command output processing

    It aims to be as simple as possible, providing only essential
    functionality.
    """
    __slots__ = ['cwd', 'env']

    def __init__(self, cwd=None, env=None):
        """
        Parameters
        ----------
        cwd : path-like, optional
          If given, commands are executed with this path as PWD,
          the PWD of the parent process is used otherwise.
        env : dict, optional
          Environment to be used for command execution. If `cwd`
          was given, 'PWD' in the environment is set to its value.
          This must be a complete environment definition, no values
          from the current environment will be inherited.
        """
        self.env = env
        # stringify to support Path instances on PY35
        self.cwd = str(cwd) if cwd is not None else None

    def _get_adjusted_env(self, env=None, cwd=None, copy=True):
        """Return an adjusted copy of an execution environment

        Or return an unaltered copy of the environment, if no adjustments
        need to be made.
        """
        if copy:
            env = env.copy() if env else None
        if cwd and env is not None:
            # if CWD was provided, we must not make it conflict with
            # a potential PWD setting
            env['PWD'] = cwd
        return env

    def run(self, cmd, protocol=None, stdin=None, cwd=None, env=None, **kwargs):
        """Execute a command and communicate with it.

        Parameters
        ----------
        cmd : list
          Sequence of program arguments. Passing a single string means
          that it is simply the name of the program, no complex shell
          commands are supported.
        protocol : WitlessProtocol, optional
          Protocol class handling interaction with the running process
          (e.g. output capture). A number of pre-crafted classes are
          provided (e.g `KillOutput`, `NoCapture`, `GitProgress`).
        stdin : byte stream, optional
          File descriptor like, used as stdin for the process. Passed
          verbatim to subprocess.Popen().
        cwd : path-like, optional
          If given, commands are executed with this path as PWD,
          the PWD of the parent process is used otherwise. Overrides
          any `cwd` given to the constructor.
        env : dict, optional
          Environment to be used for command execution. If `cwd`
          was given, 'PWD' in the environment is set to its value.
          This must be a complete environment definition, no values
          from the current environment will be inherited. Overrides
          any `env` given to the constructor.
        kwargs :
          Passed to the Protocol class constructor.

        Returns
        -------
        dict
          At minimum there will be keys 'stdout', 'stderr' with
          unicode strings of the cumulative standard output and error
          of the process as values.

        Raises
        ------
        CommandError
          On execution failure (non-zero exit code) this exception is
          raised which provides the command (cmd), stdout, stderr,
          exit code (status), and a message identifying the failed
          command, as properties.
        FileNotFoundError
          When a given executable does not exist.
        """
        if protocol is None:
            # by default let all subprocess stream pass through
            protocol = NoCapture

        # per-call cwd/env take precedence over constructor-provided ones
        cwd = cwd or self.cwd
        env = self._get_adjusted_env(
            env or self.env,
            cwd=cwd,
        )

        # start a new event loop, which we will close again further down
        # if this is not done events like this will occur
        #   BlockingIOError: [Errno 11] Resource temporarily unavailable
        #   Exception ignored when trying to write to the signal wakeup fd:
        # It is unclear to me why it happens when reusing an event looped
        # that it stopped from time to time, but starting fresh and doing
        # a full termination seems to address the issue
        if sys.platform == "win32":
            # use special event loop that supports subprocesses on windows
            event_loop = asyncio.ProactorEventLoop()
        else:
            event_loop = asyncio.SelectorEventLoop()
        asyncio.set_event_loop(event_loop)
        try:
            # include the subprocess manager in the asyncio event loop
            results = event_loop.run_until_complete(
                run_async_cmd(
                    event_loop,
                    cmd,
                    protocol,
                    stdin,
                    protocol_kwargs=kwargs,
                    cwd=cwd,
                    env=env,
                )
            )
        finally:
            # be kind to callers and leave asyncio as we found it
            asyncio.set_event_loop(None)
            # terminate the event loop, cannot be undone, hence we start a
            # fresh one each time (see BlockingIOError notes above)
            event_loop.close()

        # log before any exception is raised
        lgr.log(8, "Finished running %r with status %s", cmd, results['code'])

        # make it such that we always blow if a protocol did not report
        # a return code at all (missing 'code' is treated as failure)
        if results.get('code', True) not in [0, None]:
            # the runner has a better idea, doc string warns Protocol
            # implementations not to return these
            results.pop('cmd', None)
            results.pop('cwd', None)
            raise CommandError(
                # whatever the results were, we carry them forward
                cmd=cmd,
                cwd=self.cwd,
                **results,
            )
        # denoise, must be zero at this point
        results.pop('code', None)
        return results
class Runner(object):
    """Provides a wrapper for calling functions and commands.

    An object of this class provides a methods that calls shell commands or
    python functions, allowing for protocolling the calls and output handling.

    Outputs (stdout and stderr) can be either logged or streamed to system's
    stdout/stderr during execution.
    This can be enabled or disabled for both of them independently.
    Additionally, a protocol object can be a used with the Runner. Such a
    protocol has to implement datalad.support.protocol.ProtocolInterface, is
    able to record calls and allows for dry runs.
    """

    # NOTE(review): 'commands' and 'dry' are declared here but never
    # assigned within this class' visible code -- confirm whether they
    # are still needed
    __slots__ = ['commands', 'dry', 'cwd', 'env', 'protocol',
                 '_log_opts']

    def __init__(self, cwd=None, env=None, protocol=None, log_outputs=None):
        """
        Parameters
        ----------
        cwd: string, optional
          Base current working directory for commands. Could be overridden
          per run call via cwd option
        env: dict, optional
          Custom environment to use for calls. Could be overridden per run
          call via env option
        protocol: ProtocolInterface
          Protocol object to write to.
        log_outputs : bool, optional
          Switch to instruct whether outputs should be logged or not. If not
          set (default), config 'datalad.log.outputs' would be consulted
        """
        self.cwd = cwd
        self.env = env
        if protocol is None:
            # TODO: config cmd.protocol = null
            protocol_str = os.environ.get('DATALAD_CMD_PROTOCOL', 'null')
            protocol = {
                'externals-time': ExecutionTimeExternalsProtocol,
                'time': ExecutionTimeProtocol,
                'null': NullProtocol
            }[protocol_str]()
            if protocol_str != 'null':
                # we need to dump it into a file at the end
                # TODO: config cmd.protocol_prefix = protocol
                filename = '%s-%s.log' % (
                    os.environ.get('DATALAD_CMD_PROTOCOL_PREFIX', 'protocol'),
                    id(self)
                )
                atexit.register(functools.partial(protocol.write_to_file, filename))
        self.protocol = protocol
        # Various options for logging
        self._log_opts = {}
        # we don't know yet whether we need to log every output or not
        if log_outputs is not None:
            self._log_opts['outputs'] = log_outputs

    def __call__(self, cmd, *args, **kwargs):
        """Convenience method

        This will call run() or call() depending on the kind of `cmd`.
        If `cmd` is a string it is interpreted as the to be executed command.
        Otherwise it is expected to be a callable.
        Any other argument is passed to the respective method.

        Parameters
        ----------
        cmd: str or callable
           command string to be executed via shell or callable to be called.
        `*args`:
        `**kwargs`:
           see Runner.run() and Runner.call() for available arguments.

        Raises
        ------
        TypeError
          if cmd is neither a string nor a callable.
        """
        if isinstance(cmd, str) or isinstance(cmd, list):
            return self.run(cmd, *args, **kwargs)
        elif callable(cmd):
            return self.call(cmd, *args, **kwargs)
        else:
            raise TypeError("Argument 'command' is neither a string, "
                            "nor a list nor a callable.")

    # deliberately not a method (no self): it is only ever looked up from
    # the _LOG_OPTS_ADAPTERS dict below and called as a plain function
    def _opt_env_adapter(v):
        """If value is a string, split by ,"""
        if v:
            if v.isdigit():
                log_env = bool(int(v))
            else:
                log_env = v.split(',')
            return log_env
        else:
            return False

    # maps a 'datalad.log.*' option name to an optional adapter that
    # post-processes the configured value (see _get_log_setting)
    _LOG_OPTS_ADAPTERS = OrderedDict([
        ('outputs', None),
        ('cwd', None),
        ('env', _opt_env_adapter),
        ('stdin', None),
    ])

    def _get_log_setting(self, opt, default=False):
        # consult (and cache) the 'datalad.log.<opt>' configuration value
        try:
            return self._log_opts[opt]
        except KeyError:
            try:
                from . import cfg
            except ImportError:
                # config machinery not available -- fall back to the default
                return default
            adapter = self._LOG_OPTS_ADAPTERS.get(opt, None)
            self._log_opts[opt] = \
                (cfg.getbool if not adapter else cfg.get_value)(
                    'datalad.log', opt, default=default)
            if adapter:
                self._log_opts[opt] = adapter(self._log_opts[opt])
            return self._log_opts[opt]

    @property
    def log_outputs(self):
        return self._get_log_setting('outputs')

    @property
    def log_cwd(self):
        return self._get_log_setting('cwd')

    @property
    def log_stdin(self):
        return self._get_log_setting('stdin')

    @property
    def log_env(self):
        return self._get_log_setting('env')

    # Two helpers to encapsulate formatting/output
    def _log_out(self, line):
        if line and self.log_outputs:
            self.log("stdout| " + line.rstrip('\n'))

    def _log_err(self, line, expected=False):
        if line and self.log_outputs:
            # expected stderr goes to the most verbose level 5,
            # unexpected to 11 (above DEBUG)
            self.log("stderr| " + line.rstrip('\n'),
                     level={True: 5,
                            False: 11}[expected])

    def _get_output_online(self, proc,
                           log_stdout, log_stderr,
                           outputstream, errstream,
                           expect_stderr=False, expect_fail=False):
        """
        If log_stdout or log_stderr are callables, they will be given a read
        line to be processed, and return processed result.  So if they need to
        'swallow' the line from being logged, should just return None

        Parameters
        ----------
        proc
        log_stdout: bool or callable or 'online' or 'offline'
        log_stderr: : bool or callable or 'online' or 'offline'
          If any of those 'offline', we would call proc.communicate at the
          end to grab possibly outstanding output from it
        expect_stderr
        expect_fail

        Returns
        -------

        """
        stdout, stderr = bytes(), bytes()

        log_stdout_ = _decide_to_log(log_stdout)
        log_stderr_ = _decide_to_log(log_stderr)
        log_stdout_is_callable = callable(log_stdout_)
        log_stderr_is_callable = callable(log_stderr_)

        # arguments to be passed into _process_one_line
        stdout_args = (
                'stdout',
                proc, log_stdout_, log_stdout_is_callable
        )
        stderr_args = (
                'stderr',
                proc, log_stderr_, log_stderr_is_callable,
                expect_stderr or expect_fail
        )

        while proc.poll() is None:
            # see for a possibly useful approach to processing output
            # in another thread http://codereview.stackexchange.com/a/17959
            # current problem is that if there is no output on stderr
            # it stalls
            # Monitor if anything was output and if nothing, sleep a bit
            stdout_, stderr_ = None, None
            if log_stdout_:
                stdout_ = self._process_one_line(*stdout_args)
                stdout += stdout_
            if log_stderr_:
                stderr_ = self._process_one_line(*stderr_args)
                stderr += stderr_
            if stdout_ is None and stderr_ is None:
                # no output was really produced, so sleep a tiny bit
                time.sleep(0.001)

        # Handle possible remaining output
        if log_stdout_ and log_stderr_:
            # If Popen was called with more than two pipes, calling
            # communicate() after we partially read the stream will return
            # empty output.
            stdout += self._process_remaining_output(
                outputstream, proc.stdout.read(), *stdout_args)
            stderr += self._process_remaining_output(
                errstream, proc.stderr.read(), *stderr_args)

        stdout_, stderr_ = proc.communicate()
        # ??? should we condition it on log_stdout in {'offline'} ???
        stdout += self._process_remaining_output(outputstream, stdout_, *stdout_args)
        stderr += self._process_remaining_output(errstream, stderr_, *stderr_args)

        return stdout, stderr

    def _process_remaining_output(self, stream, out_, *pargs):
        """Helper to process output which might have been obtained from popen or
        should be loaded from file"""
        out = bytes()
        if isinstance(stream, file_class) and \
                _MAGICAL_OUTPUT_MARKER in getattr(stream, 'name', ''):
            # 'offline' mode: output went into a temporary file we created
            assert out_ is None, "should have gone into a file"
            if not stream.closed:
                stream.close()
            with open(stream.name, 'rb') as f:
                for line in f:
                    out += self._process_one_line(*pargs, line=line)
        else:
            if out_:
                # resolving a once in a while failing test #2185
                if isinstance(out_, str):
                    out_ = out_.encode('utf-8')
                for line in out_.split(linesep_bytes):
                    out += self._process_one_line(
                        *pargs, line=line, suf=linesep_bytes)
        return out

    def _process_one_line(self, out_type, proc, log_, log_is_callable,
                          expected=False, line=None, suf=None):
        # `line` is read from the process when not provided by the caller
        if line is None:
            lgr.log(3, "Reading line from %s", out_type)
            line = {'stdout': proc.stdout, 'stderr': proc.stderr}[out_type].readline()
        else:
            lgr.log(3, "Processing provided line")
        if line and log_is_callable:
            # Let it be processed
            line = log_(assure_unicode(line))
            if line is not None:
                # we are working with binary type here
                line = assure_bytes(line)
        if line:
            if out_type == 'stdout':
                self._log_out(assure_unicode(line))
            elif out_type == 'stderr':
                self._log_err(line.decode('utf-8'),
                              expected)
            else:  # pragma: no cover
                raise RuntimeError("must not get here")
            return (line + suf) if suf else line
        # it was output already directly but for code to work, return ""
        return bytes()

    def run(self, cmd, log_stdout=True, log_stderr=True, log_online=False,
            expect_stderr=False, expect_fail=False,
            cwd=None, env=None, shell=None, stdin=None):
        """Runs the command `cmd` using shell.

        In case of dry-mode `cmd` is just added to `commands` and it is
        actually executed otherwise.
        Allows for separately logging stdout and stderr  or streaming it to
        system's stdout or stderr respectively.

        Note: Using a string as `cmd` and shell=True allows for piping,
              multiple commands, etc., but that implies split_cmdline() is not
              used. This is considered to be a security hazard.
              So be careful with input.

        Parameters
        ----------
        cmd : str, list
          String (or list) defining the command call.  No shell is used if cmd
          is specified as a list
        log_stdout: bool, optional
          If True, stdout is logged.  Goes to sys.stdout otherwise.
        log_stderr: bool, optional
          If True, stderr is logged.  Goes to sys.stderr otherwise.
        log_online: bool, optional
          Whether to log as output comes in.  Setting to True is preferable
          for running user-invoked actions to provide timely output
        expect_stderr: bool, optional
          Normally, having stderr output is a signal of a problem and thus it
          gets logged at level 11.  But some utilities, e.g. wget, use
          stderr for their progress output.  Whenever such output is expected,
          set it to True and output will be logged at level 5 unless
          exit status is non-0 (in non-online mode only, in online -- would
          log at 5)
        expect_fail: bool, optional
          Normally, if command exits with non-0 status, it is considered an
          error and logged at level 11 (above DEBUG). But if the call intended
          for checking routine, such messages are usually not needed, thus
          it will be logged at level 5.
        cwd : string, optional
          Directory under which run the command (passed to Popen)
        env : string, optional
          Custom environment to pass
        shell: bool, optional
          Run command in a shell.  If not specified, then it runs in a shell
          only if command is specified as a string (not a list)
        stdin: file descriptor
          input stream to connect to stdin of the process.

        Returns
        -------
        (stdout, stderr) - bytes!

        Raises
        ------
        CommandError
           if command's exitcode wasn't 0 or None. exitcode is passed to
           CommandError's `code`-field. Command's stdout and stderr are stored
           in CommandError's `stdout` and `stderr` fields respectively.
        """
        outputstream = _get_output_stream(log_stdout, sys.stdout)
        errstream = _get_output_stream(log_stderr, sys.stderr)

        popen_env = env or self.env
        popen_cwd = cwd or self.cwd

        if popen_cwd and popen_env and 'PWD' in popen_env:
            # we must have inherited PWD, but cwd was provided, so we must
            # adjust it
            popen_env = popen_env.copy()  # to avoid side-effects
            popen_env['PWD'] = popen_cwd

        # TODO: if outputstream is sys.stdout and that one is set to StringIO
        #       we have to "shim" it with something providing fileno().
        # This happens when we do not swallow outputs, while allowing
        # nosetest's StringIO to be provided as stdout, crashing the Popen
        # requiring fileno().  In out swallow_outputs, we just use temporary
        # files to overcome this problem.
        # For now necessary test code should be wrapped into swallow_outputs
        # cm to avoid the problem
        log_msgs = ["Running: %s"]
        log_args = [cmd]
        if self.log_cwd:
            log_msgs += ['cwd=%r']
            log_args += [popen_cwd]
        if self.log_stdin:
            log_msgs += ['stdin=%r']
            log_args += [stdin]
        log_env = self.log_env
        if log_env and popen_env:
            log_msgs += ["env=%r"]
            log_args.append(
                popen_env if log_env is True
                else {k: popen_env[k] for k in log_env if k in popen_env}
            )
        log_msg = '\n'.join(log_msgs)
        self.log(log_msg, *log_args)

        if self.protocol.do_execute_ext_commands:

            if shell is None:
                shell = isinstance(cmd, str)

            if self.protocol.records_ext_commands:
                prot_exc = None
                prot_id = self.protocol.start_section(
                    split_cmdline(cmd)
                    if isinstance(cmd, str)
                    else cmd)
            try:
                proc = subprocess.Popen(cmd,
                                        stdout=outputstream,
                                        stderr=errstream,
                                        shell=shell,
                                        cwd=popen_cwd,
                                        env=popen_env,
                                        stdin=stdin)

            except Exception as e:
                prot_exc = e
                lgr.log(11, "Failed to start %r%r: %s" %
                        (cmd, " under %r" % cwd if cwd else '', exc_str(e)))
                raise

            finally:
                if self.protocol.records_ext_commands:
                    self.protocol.end_section(prot_id, prot_exc)

            try:
                if log_online:
                    out = self._get_output_online(proc,
                                                  log_stdout, log_stderr,
                                                  outputstream, errstream,
                                                  expect_stderr=expect_stderr,
                                                  expect_fail=expect_fail)
                else:
                    out = proc.communicate()

                # Decoding was delayed to this point
                def decode_if_not_None(x):
                    return "" if x is None else bytes.decode(x)

                out = tuple(map(decode_if_not_None, out))

                status = proc.poll()

                # needs to be done after we know status
                if not log_online:
                    self._log_out(out[0])
                    if status not in [0, None]:
                        self._log_err(out[1], expected=expect_fail)
                    else:
                        # as directed
                        self._log_err(out[1], expected=expect_stderr)

                if status not in [0, None]:
                    exc = CommandError(
                        cmd=cmd,
                        code=status,
                        stdout=out[0],
                        stderr=out[1],
                        cwd=popen_cwd,
                    )
                    lgr.log(5 if expect_fail else 11, str(exc))
                    raise exc
                else:
                    self.log("Finished running %r with status %s" % (cmd, status),
                             level=8)

            except CommandError:
                # do not bother with reacting to "regular" CommandError
                # exceptions.  Somehow if we also terminate here for them
                # some processes elsewhere might stall:
                # see https://github.com/datalad/datalad/pull/3794
                raise
            except BaseException as exc:
                exc_info = sys.exc_info()
                # KeyboardInterrupt is subclass of BaseException
                lgr.debug("Terminating process for %s upon exception: %s",
                          cmd, exc_str(exc))
                try:
                    # there are still possible (although unlikely) cases when
                    # we fail to interrupt but we
                    # should not crash if we fail to terminate the process
                    proc.terminate()
                except BaseException as exc2:
                    lgr.warning("Failed to terminate process for %s: %s",
                                cmd, exc_str(exc2))
                raise exc_info[1]
            finally:
                # Those streams are for us to close if we asked for a PIPE
                # TODO -- assure closing the files
                _cleanup_output(outputstream, proc.stdout)
                _cleanup_output(errstream, proc.stderr)

        else:
            # dry run: only record the command, do not execute
            if self.protocol.records_ext_commands:
                self.protocol.add_section(split_cmdline(cmd)
                                          if isinstance(cmd, str)
                                          else cmd, None)
            out = ("DRY", "DRY")

        return out

    def call(self, f, *args, **kwargs):
        """Helper to unify collection of logging all "dry" actions.

        Calls `f` if `Runner`-object is not in dry-mode. Adds `f` along with
        its arguments to `commands` otherwise.

        Parameters
        ----------
        f: callable
        """
        if self.protocol.do_execute_callables:
            if self.protocol.records_callables:
                prot_exc = None
                prot_id = self.protocol.start_section(
                    [str(f), "args=%s" % str(args), "kwargs=%s" % str(kwargs)])

            try:
                return f(*args, **kwargs)
            except Exception as e:
                prot_exc = e
                raise
            finally:
                if self.protocol.records_callables:
                    self.protocol.end_section(prot_id, prot_exc)
        else:
            if self.protocol.records_callables:
                self.protocol.add_section(
                    [str(f), "args=%s" % str(args), "kwargs=%s" % str(kwargs)],
                    None)

    def log(self, msg, *args, **kwargs):
        """log helper

        Logs at level 5 by default and adds "Protocol:"-prefix in order to
        log the used protocol.
        """
        level = kwargs.pop('level', 5)
        if isinstance(self.protocol, NullProtocol):
            lgr.log(level, msg, *args, **kwargs)
        else:
            if args:
                msg = msg % args
            lgr.log(level, "{%s} %s" % (
                self.protocol.__class__.__name__, msg)
            )
class GitRunnerBase(object):
    """
    Mix-in class for Runners to be used to run git and git annex commands

    Overloads the runner class to check & update GIT_DIR and
    GIT_WORK_TREE environment variables set to the absolute path
    if is defined and is relative path
    """

    # class-level cache of the directory to prepend to PATH for git calls:
    # None means "not decided yet", '' means "use default git from PATH"
    _GIT_PATH = None

    @staticmethod
    def _check_git_path():
        """If using bundled git-annex, we would like to use bundled with it git

        Thus we will store _GIT_PATH a path to git in the same directory as
        annex if found.  If it is empty (but not None), we do nothing
        """
        if GitRunnerBase._GIT_PATH is None:
            # shutil.which replaces distutils.spawn.find_executable, which is
            # deprecated and removed along with distutils in Python 3.12
            from shutil import which
            # with all the nesting of config and this runner, cannot use our
            # cfg here, so will resort to dark magic of environment options
            if (os.environ.get('DATALAD_USE_DEFAULT_GIT', '0').lower()
                    in ('1', 'on', 'true', 'yes')):
                git_fpath = which("git")
                if git_fpath:
                    GitRunnerBase._GIT_PATH = ''
                    lgr.log(9, "Will use default git %s", git_fpath)
                    return  # we are done - there is a default git avail.
                # if not -- we will look for a bundled one
            GitRunnerBase._GIT_PATH = GitRunnerBase._get_bundled_path()
            lgr.log(9, "Will use git under %r (no adjustments to PATH if "
                       "empty string)", GitRunnerBase._GIT_PATH)
            assert(GitRunnerBase._GIT_PATH is not None)  # we made the decision!

    @staticmethod
    def _get_bundled_path():
        """Return the directory of a git bundled alongside git-annex, or ''"""
        from shutil import which
        annex_fpath = which("git-annex")
        if not annex_fpath:
            # not sure how to live further anyways! ;)
            alongside = False
        else:
            annex_path = op.dirname(op.realpath(annex_fpath))
            bundled_git_path = op.join(annex_path, 'git')
            # we only need to consider bundled git if it's actually different
            # from default. (see issue #5030)
            alongside = op.lexists(bundled_git_path) and \
                bundled_git_path != op.realpath(which('git'))
        return annex_path if alongside else ''

    @staticmethod
    def get_git_environ_adjusted(env=None):
        """
        Replaces GIT_DIR and GIT_WORK_TREE with absolute paths if relative path and defined
        """
        # if env set copy else get os environment
        git_env = env.copy() if env else os.environ.copy()
        if GitRunnerBase._GIT_PATH:
            # prepend the (possibly bundled) git location to PATH
            git_env['PATH'] = op.pathsep.join([GitRunnerBase._GIT_PATH, git_env['PATH']]) \
                if 'PATH' in git_env \
                else GitRunnerBase._GIT_PATH

        for varstring in ['GIT_DIR', 'GIT_WORK_TREE']:
            var = git_env.get(varstring)
            if var:                                   # if env variable set
                if not op.isabs(var):                 # and it's a relative path
                    git_env[varstring] = op.abspath(var)  # to absolute path
                    lgr.log(9, "Updated %s to %s", varstring, git_env[varstring])

        if 'GIT_SSH_COMMAND' not in git_env:
            git_env['GIT_SSH_COMMAND'] = GIT_SSH_COMMAND
            git_env['GIT_SSH_VARIANT'] = 'ssh'
        # We are parsing error messages and hints. For those to work more
        # reliably we are doomed to sacrifice i18n effort of git, and enforce
        # consistent language of the messages
        git_env['LC_MESSAGES'] = 'C'
        # But since LC_ALL takes precedence, over LC_MESSAGES, we cannot
        # "leak" that one inside, and are doomed to pop it
        git_env.pop('LC_ALL', None)
        return git_env
class GitRunner(Runner, GitRunnerBase):
    """A Runner for git and git-annex commands.

    See GitRunnerBase it mixes in for more details
    """

    @borrowdoc(Runner)
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._check_git_path()

    def run(self, cmd, env=None, *args, **kwargs):
        """Run `cmd` with a git-adjusted environment.

        Returns stdout and stderr decoded to unicode.
        """
        outputs = super().run(
            cmd,
            env=self.get_git_environ_adjusted(env),
            *args, **kwargs)
        # All communication here will be returned as unicode
        # TODO: do that instead within the super's run!
        return tuple(assure_unicode(o) for o in outputs)
class GitWitlessRunner(WitlessRunner, GitRunnerBase):
    """A WitlessRunner for git and git-annex commands.

    See GitRunnerBase it mixes in for more details
    """

    @borrowdoc(WitlessRunner)
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._check_git_path()

    def _get_adjusted_env(self, env=None, cwd=None, copy=True):
        # apply git-specific environment adjustments first (this always
        # hands back a fresh copy), then let the base class do its part
        adjusted = GitRunnerBase.get_git_environ_adjusted(env=env)
        # the env above is already a copy, so we have no choice, but we
        # can prevent another duplication
        return super()._get_adjusted_env(env=adjusted, cwd=cwd, copy=False)
def readline_rstripped(stdout):
    """Read a single line from file-like `stdout`, trailing whitespace removed."""
    line = stdout.readline()
    return line.rstrip()
class SafeDelCloseMixin(object):
    """A helper class to use where __del__ would call .close() which might
    fail if "too late in GC game"
    """

    def __del__(self):
        try:
            self.close()
        except TypeError:
            # During interpreter teardown, module globals may already have
            # been cleared (set to None); then the failure is an artifact
            # of shutdown and is safe to ignore.
            late_in_gc = os.fdopen is None or lgr.debug is None
            if not late_in_gc:
                # a genuine TypeError from close() -- propagate it
                raise
@auto_repr
class BatchedCommand(SafeDelCloseMixin):
    """Container for a process which would allow for persistent communication

    The process is started lazily on first use; each command is sent as one
    line on its stdin, and a reply is read from its stdout via
    `output_proc`.  stderr is captured into a temporary file.
    """

    def __init__(self, cmd, path=None, output_proc=None):
        """
        Parameters
        ----------
        cmd : str or list
          Command (and options) to keep running persistently
        path : str, optional
          Working directory for the process
        output_proc : callable, optional
          Called with the process' stdout to obtain the reply for each sent
          command; default reads a single rstripped line
        """
        if not isinstance(cmd, list):
            cmd = [cmd]
        self.cmd = cmd
        self.path = path
        self.output_proc = output_proc if output_proc else readline_rstripped
        self._process = None
        self._stderr_out = None          # fd of the temp file capturing stderr
        self._stderr_out_fname = None    # path of that temp file

    def _initialize(self):
        """Start the subprocess and the temp file collecting its stderr."""
        lgr.debug("Initiating a new process for %s", repr(self))
        lgr.log(5, "Command: %s", self.cmd)
        # according to the internet wisdom there is no easy way with subprocess
        # while avoid deadlocks etc. We would need to start a thread/subprocess
        # to timeout etc
        # kwargs = dict(bufsize=1, universal_newlines=True) if PY3 else {}
        self._stderr_out, self._stderr_out_fname = tempfile.mkstemp()
        self._process = subprocess.Popen(
            self.cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=self._stderr_out,
            env=GitRunnerBase.get_git_environ_adjusted(),
            cwd=self.path,
            bufsize=1,
            universal_newlines=True  # **kwargs
        )

    def _check_process(self, restart=False):
        """Check if the process was terminated and restart if restart

        Returns
        -------
        bool
          True if process was alive.
        str
          stderr if any recorded if was terminated
        """
        process = self._process
        ret = True
        ret_stderr = None
        # poll() returns None while the process is running.  Comparing to
        # None (rather than relying on truthiness) matters: a process that
        # exited cleanly has returncode 0, which is falsy, and would
        # otherwise go undetected as terminated.
        if process and process.poll() is not None:
            lgr.warning(
                "Process %s was terminated with returncode %s",
                process, process.returncode)
            ret_stderr = self.close(return_stderr=True)
            ret = False
        if self._process is None and restart:
            lgr.warning("Restarting the process due to previous failure")
            self._initialize()
        return ret, ret_stderr

    def __call__(self, cmds):
        """
        Parameters
        ----------
        cmds : str or tuple or list of (str or tuple)

        Returns
        -------
        str or list
          Output received from process.  list in case if cmds was a list
        """
        input_multiple = isinstance(cmds, list)
        if not input_multiple:
            cmds = [cmds]
        output = list(self.yield_(cmds))
        return output if input_multiple else output[0]

    def yield_(self, cmds):
        """Same as __call__, but requires `cmds` to be an iterable
        and yields results for each item."""
        for entry in cmds:
            if not isinstance(entry, str):
                # tuples are joined into a single space-separated line
                entry = ' '.join(entry)
            yield self.proc1(entry)

    def proc1(self, arg):
        """Same as __call__, but only takes a single command argument
        and returns a single result.
        """
        # TODO: add checks -- may be process died off and needs to be reinitiated
        if not self._process:
            self._initialize()
        entry = arg + '\n'
        lgr.log(5, "Sending %r to batched command %s", entry, self)
        # apparently communicate is just a one time show
        # stdout, stderr = self._process.communicate(entry)
        # according to the internet wisdom there is no easy way with subprocess
        self._check_process(restart=True)
        process = self._process  # _check_process might have restarted it
        process.stdin.write(entry)
        process.stdin.flush()
        lgr.log(5, "Done sending.")
        still_alive, stderr = self._check_process(restart=False)
        # TODO: we might want to handle still_alive, e.g. to allow for
        # a number of restarts/resends, but it should be per command
        # since for some we cannot just resend the same query. But if
        # it is just a "get"er - we could resend it few times
        # The default output_proc expects a single line output.
        # TODO: timeouts etc
        stdout = assure_unicode(self.output_proc(process.stdout)) \
            if not process.stdout.closed else None
        if stderr:
            lgr.warning("Received output in stderr: %r", stderr)
        lgr.log(5, "Received output: %r", stdout)
        return stdout

    def close(self, return_stderr=False):
        """Close communication and wait for process to terminate

        Returns
        -------
        str
          stderr output if return_stderr and stderr file was there.
          None otherwise
        """
        ret = None
        process = self._process
        if self._stderr_out:
            # close possibly still open fd
            lgr.debug("Closing stderr of %s", process)
            os.fdopen(self._stderr_out).close()
            self._stderr_out = None
        if process:
            lgr.debug(
                "Closing stdin of %s and waiting process to finish", process)
            process.stdin.close()
            process.stdout.close()
            process.wait()
            self._process = None
            lgr.debug("Process %s has finished", process)
        # It is hard to debug when something is going wrong. Hopefully logging
        # stderr, if generally asked, might be of help
        if lgr.isEnabledFor(5):
            from . import cfg
            log_stderr = cfg.getbool('datalad.log', 'outputs', default=False)
        else:
            log_stderr = False
        if self._stderr_out_fname and os.path.exists(self._stderr_out_fname):
            if return_stderr or log_stderr:
                with open(self._stderr_out_fname, 'r') as f:
                    stderr = f.read()
            if return_stderr:
                ret = stderr
            if log_stderr:
                stderr = ensure_unicode(stderr)
                stderr = stderr.splitlines()
                lgr.log(5, "stderr of %s had %d lines:", process.pid, len(stderr))
                for l in stderr:
                    lgr.log(5, "| " + l)
            # remove the file where we kept dumping stderr
            unlink(self._stderr_out_fname)
            self._stderr_out_fname = None
        return ret