# Source code for datalad_next.shell.shell

"""
-- autoclass:: ShellCommandExecutor
   :special-members: __call__


"""
from __future__ import annotations

import logging
from contextlib import contextmanager
from dataclasses import dataclass
from datasalad.iterable_subprocess.iterable_subprocess import OutputFrom
from datasalad.runners.iter_subproc import iter_subproc

from queue import Queue
from typing import (
    Generator,
    Iterable,
)

from .response_generators import (
    ShellCommandResponseGenerator,
    VariableLengthResponseGenerator,
    VariableLengthResponseGeneratorPosix,
)

from datalad_next.consts import COPY_BUFSIZE
from datalad_next.exceptions import CommandError


__all__ = [
    'shell',
    'ExecutionResult',
    'ShellCommandExecutor',
]


lgr = logging.getLogger('datalad.ext.next.shell')


@dataclass
class ExecutionResult:
    """Result of a command that was executed in a shell

    Instances carry the collected ``stdout`` content, the collected
    ``stderr`` content, and the return code of a command.
    """
    # content that the command wrote to stdout
    stdout: bytes
    # stderr content that was collected for the command
    stderr: bytes
    # return code of the command; ``None`` is possible and is treated like
    # a failure by `to_exception` because it is not equal to ``0``
    returncode: int | None

    @staticmethod
    def _command_text(command: bytes | str | list[str]) -> str:
        """Return a printable representation of ``command``

        Bytes are decoded with ``errors='replace'``. Commands may have been
        encoded with an arbitrary encoding, and a strict decode would raise
        ``UnicodeDecodeError`` and thereby hide the error that should be
        reported. Everything else is converted with :func:`str`.
        """
        if isinstance(command, bytes):
            return command.decode(errors='replace')
        return str(command)

    def to_exception(self,
                     command: bytes | str | list[str],
                     message: str = ''
                     ) -> None:
        """Raise a :class:`CommandError` if the return code is not zero

        Parameters
        ----------
        command : bytes | str | list[str]
            The command that led to this result. It is only used to describe
            the failed command in the raised exception.
        message : str, optional, default: ''
            Message that is passed as ``msg`` to the raised exception.

        Raises
        ------
        :class:`CommandError`
            If ``self.returncode`` is not ``0``. The exception carries the
            command, ``message``, the return code, and the ``stdout``- and
            ``stderr``-content of this result.
        """
        if self.returncode == 0:
            return
        raise CommandError(
            cmd=self._command_text(command),
            msg=message,
            returncode=self.returncode,
            stdout=self.stdout,
            stderr=self.stderr,
        )


@contextmanager
def shell(shell_cmd: list[str],
          *,
          credential: str | None = None,
          chunk_size: int = COPY_BUFSIZE,
          zero_command_rg_class: type[VariableLengthResponseGenerator] = VariableLengthResponseGeneratorPosix,
          ) -> Generator[ShellCommandExecutor, None, None]:
    """Context manager that provides an interactive connection to a shell

    This context manager uses the provided argument ``shell_cmd`` to start a
    shell-subprocess. Usually the commands provided in ``shell_cmd`` will
    start a client for a remote shell, e.g. ``ssh``.

    :func:`shell` returns an instance of :class:`ShellCommandExecutor` in the
    ``as``-variable. This instance can be used to interact with the shell. That
    means, it can be used to execute commands in the shell, receive the data
    that the commands write to their ``stdout`` and ``stderr``, and retrieve
    the return code of the executed commands. All commands that are executed
    via the returned instance of :class:`ShellCommandExecutor` are executed in
    the same shell instance.

    Parameters
    ----------
    shell_cmd : list[str]
        The command to execute the shell. It should be a list of strings that
        is given to :func:`iter_subproc` as `args`-parameter. For example:
        ``['ssh', '-p', '2222', 'localhost']``.
    credential : str | None, optional, default: None
        Currently not used by this function.
    chunk_size : int, optional
        The size of the chunks that are read from the shell's ``stdout`` and
        ``stderr``. This also defines the size of stored ``stderr``-content.
    zero_command_rg_class : type[VariableLengthResponseGenerator], optional, default: 'VariableLengthResponseGeneratorPosix'
        Shell uses an instance of the specified response generator class to
        execute the *zero command* ("zero command" is the command used to skip
        the login messages of the shell). This class will also be used as the
        default response generator for all further commands executed in the
        :class:`ShellCommandExecutor`-instance that is returned by
        :func:`shell`. Currently, the following concrete subclasses of
        :class:`VariableLengthResponseGenerator` exist:

            - :class:`VariableLengthResponseGeneratorPosix`: compatible with
              POSIX-compliant shells, e.g. ``sh`` or ``bash``.

            - :class:`VariableLengthResponseGeneratorPowerShell`: compatible
              with PowerShell.

    Yields
    ------
    :class:`ShellCommandExecutor`

    Examples
    --------

    **Example 1:** a simple example that invokes a single command, prints its
    output and its return code::

        >>> from datalad_next.shell import shell
        >>> with shell(['ssh', 'localhost']) as ssh:
        ...     result = ssh(b'ls -l /etc/passwd')
        ...     print(result.stdout)
        ...     print(result.returncode)
        ...
        b'-rw-r--r-- 1 root root 2773 Nov 14 10:05 /etc/passwd\\n'
        0

    **Example 2:** this example invokes two commands, the second of which exits
    with a non-zero return code. The error output is retrieved from
    ``result.stderr``, which contains all ``stderr`` data that was written
    since the last command was executed::

        >>> from datalad_next.shell import shell
        >>> with shell(['ssh', 'localhost']) as ssh:
        ...     print(ssh(b'head -1 /etc/passwd').stdout)
        ...     result = ssh(b'ls /no-such-file')
        ...     print(result.stdout)
        ...     print(result.returncode)
        ...     print(result.stderr)
        ...
        b'root:x:0:0:root:/root:/bin/bash\\n'
        b''
        2
        b"Pseudo-terminal will not be allocated because stdin is not a terminal.\\r\\nls: cannot access '/no-such-file': No such file or directory\\n"

    **Example 3:** demonstrates how to use the
    ``check``-parameter to raise a :class:`CommandError`-exception if the
    return code of the command is
    not zero. This delegates error handling to the calling code and helps to
    keep the code clean::

        >>> from datalad_next.shell import shell
        >>> with shell(['ssh', 'localhost']) as ssh:
        ...     print(ssh(b'ls /no-such-file', check=True).stdout)
        ...
        Traceback (most recent call last):
          File "<stdin>", line 2, in <module>
          File "/home/cristian/Develop/datalad-next/datalad_next/shell/shell.py", line 279, in __call__
            return create_result(
          File "/home/cristian/Develop/datalad-next/datalad_next/shell/shell.py", line 349, in create_result
            result.to_exception(command, error_message)
          File "/home/cristian/Develop/datalad-next/datalad_next/shell/shell.py", line 52, in to_exception
            raise CommandError(
        datalad.runner.exception.CommandError: CommandError: 'ls /no-such-file' failed with exitcode 2 [err: 'cannot access '/no-such-file': No such file or directory']

    **Example 4:** an example for manual checking of the return code::

        >>> from datalad_next.shell import shell
        >>> def file_exists(file_name):
        ...     with shell(['ssh', 'localhost']) as ssh:
        ...         result = ssh(f'ls {file_name}')
        ...         return result.returncode == 0
        ...
        >>> print(file_exists('/etc/passwd'))
        True
        >>> print(file_exists('/no-such-file'))
        False

    **Example 5:** an example for result content checking::

        >>> from datalad_next.shell import shell
        >>> with shell(['ssh', 'localhost']) as ssh:
        ...     result = ssh(f'grep root /etc/passwd', check=True).stdout
        ...     if len(result.splitlines()) != 1:
        ...         raise ValueError('Expected exactly one line')

    **Example 6:** how to work with generator-based results.
    For long running commands a generator-based result fetching
    can be used. To use generator-based output the command has to be executed
    with the method
    :meth:`ShellCommandExecutor.start`. This method returns a generator that
    provides command output as soon as it is available::

        >>> import time
        >>> from datalad_next.shell import shell
        >>> with shell(['ssh', 'localhost']) as ssh:
        ...     result_generator = ssh.start(b'c=0; while [ $c -lt 6 ]; do head -2 /etc/passwd; sleep 2; c=$(( $c + 1 )); done')
        ...     for result in result_generator:
        ...         print(time.time(), result)
        ...     assert result_generator.returncode == 0
        1713358098.82588 b'root:x:0:0:root:/root:/bin/bash\\nsystemd-timesync:x:497:497:systemd Time Synchronization:/:/usr/sbin/nologin\\n'
        1713358100.8315682 b'root:x:0:0:root:/root:/bin/bash\\nsystemd-timesync:x:497:497:systemd Time Synchronization:/:/usr/sbin/nologin\\n'
        1713358102.8402972 b'root:x:0:0:root:/root:/bin/bash\\nsystemd-timesync:x:497:497:systemd Time Synchronization:/:/usr/sbin/nologin\\n'
        1713358104.8490314 b'root:x:0:0:root:/root:/bin/bash\\nsystemd-timesync:x:497:497:systemd Time Synchronization:/:/usr/sbin/nologin\\n'
        1713358106.8577306 b'root:x:0:0:root:/root:/bin/bash\\nsystemd-timesync:x:497:497:systemd Time Synchronization:/:/usr/sbin/nologin\\n'
        1713358108.866439 b'root:x:0:0:root:/root:/bin/bash\\nsystemd-timesync:x:497:497:systemd Time Synchronization:/:/usr/sbin/nologin\\n'

    (The exact output of the above example might differ, depending on the
    length of the first two entries in the ``/etc/passwd``-file.)

    **Example 7:** how to use the ``stdin``-parameter to feed data to a command
    that is executed in the persistent shell.
    The methods :meth:`ShellCommandExecutor.__call__` and
    :meth:`ShellCommandExecutor.start` allow to pass an iterable in the
    ``stdin``-argument. The content of this iterable will be sent to ``stdin``
    of the executed command::

        >>> from datalad_next.shell import shell
        >>> with shell(['ssh', 'localhost']) as ssh:
        ...     result = ssh(b'head -c 4', stdin=(b'ab', b'c', b'd'))
        ...     print(result.stdout)
        b'abcd'

    **Example 8:** how to work with commands that consume ``stdin`` completely.
    In the previous example, the command
    ``head -c 4`` was used to consume data from ``stdin``. This command
    terminates after
    reading exactly 4 bytes from ``stdin``. If ``cat`` was used
    instead of ``head -c 4``, the command would have
    continued to run until its ``stdin`` was closed. The ``stdin`` of the
    command that is executed in the persistent shell can be closed by calling
    :meth:`ssh.close`. But, in order to be able to call :meth:`ssh.close`,
    any process that consumes ``stdin`` completely should be executed by
    calling the :meth:`ssh.start`-method.
    The reason for this is that :meth:`ssh.start` will return immediately which
    allows to call the :meth:`ssh.close`-method, as shown in the following
    code (:meth:`ssh.__call__` would have waited for ``cat`` to terminate, but
    because :meth:`ssh.close` is not called, ``cat`` would never terminate)::

        >>> from datalad_next.shell import shell
        >>> with shell(['ssh', 'localhost']) as ssh:
        ...     result_generator = ssh.start(b'cat', stdin=(b'12', b'34', b'56'))
        ...     ssh.close()
        ...     print(tuple(result_generator))
        (b'123456',)

    Note that
    the ``ssh``-object cannot be used for further command execution after
    :meth:`ssh.close` was called. Further command execution requires to spin up
    a new persistent shell-object. To prevent this overhead, it is advised to
    limit the number of bytes that a shell-command consumes, either by their
    number, e.g. by using ``head -c``, or by some other means, e.g.
    by interpreting the content or using a command like ``timeout``.

    **Example 9:** upload a file to the persistent shell. The command
    ``head -c`` can be used to implement the upload of a file to a remote
    shell.
    The basic idea
    is to determine the number of bytes that will be uploaded and create a
    command in the remote shell that will consume exactly this amount of bytes.
    The following code implements this idea (without file-name escaping and
    error handling)::

        >>> import os
        >>> import time
        >>> from datalad_next.shell import shell
        >>> def upload(ssh, file_name, remote_file_name):
        ...     size = os.stat(file_name).st_size
        ...     f = open(file_name, 'rb')
        ...     return ssh(f'head -c {size} > {remote_file_name}', stdin=iter(f.read, b''))
        ...
        >>> with shell(['ssh', 'localhost']) as ssh:
        ...     upload(ssh, '/etc/passwd', '/tmp/uploaded-1')

    Note: in this example, ``f`` is not explicitly closed, it is only
    closed when the program exits. The reason for
    this is that the shell uses threads internally for stdin-feeding, and there
    is no simple way to determine whether the thread that reads ``f`` has yet
    read an EOF and exited. If ``f`` is closed before the thread exits, and the
    thread tries to read from ``f``, a ``ValueError`` will be raised (the
    function :func:`datalad_next.shell.posix.upload` contains a solution
    for this problem that has slightly more code. For the sake of simplicity,
    this solution was not implemented in the example above).

    **Example 10:** download a file. This example
    uses a fixed-length response generator
    to download a file from a remote shell. The basic idea is to determine the
    number of bytes that will be downloaded and create a fixed-length response
    generator that reads exactly this number of bytes. The fixed length response
    generator is then passed to :meth:`ssh.start` in the keyword-argument
    ``response_generator``. This instructs :meth:`ssh.start` to use the response
    generator to interpret the output of this command invocation (the example
    code has no file-name escaping or error handling)::

        >>> from datalad_next.shell import shell
        >>> from datalad_next.shell.response_generators import FixedLengthResponseGeneratorPosix
        >>> def download(ssh, remote_file_name, local_file_name):
        ...     size = ssh(f'stat -c %s {remote_file_name}').stdout
        ...     with open(local_file_name, 'wb') as f:
        ...         response_generator = FixedLengthResponseGeneratorPosix(ssh.stdout, int(size))
        ...         results = ssh.start(f'cat {remote_file_name}', response_generator=response_generator)
        ...         for chunk in results:
        ...             f.write(chunk)
        ...
        >>> with shell(['ssh', 'localhost']) as ssh:
        ...     download(ssh, '/etc/passwd', '/tmp/downloaded-1')
        ...

    Note that :meth:`ssh.start` is used to start the download. This allows to
    process downloaded data as soon as it is available.

    **Example 11:**
    This example implements interaction with a *Python* interpreter (which
    can be local or remote). Interaction in the context of this example means,
    executing a
    line of python code, returning the result, i.e. the output on ``stdout``,
    and detect whether an exception was raised or not. To this end
    a Python-specific variable-length response generator is created by
    subclassing the
    generic class :class:`VariableLengthResponseGenerator`. The new response
    generator implements the method :meth:`get_final_command`, which takes a
    python statement and returns a ``try``-``except``-block that executes the
    python statement, prints the end-marker and a return code (which is ``0`` if
    the statement was executed successfully, and ``1`` if an exception was
    raised)::

        >>> from datalad_next.shell import shell
        >>> from datalad_next.shell.response_generators import VariableLengthResponseGenerator
        >>> class PythonResponseGenerator(VariableLengthResponseGenerator):
        ...     def get_final_command(self, command: bytes) -> bytes:
        ...         return f'''try:
        ...     {command.decode()}
        ...     print('{self.end_marker.decode()}')
        ...     print(0)
        ... except:
        ...     print('{self.end_marker.decode()}')
        ...     print(1)
        ... '''.encode()
        ...     @property
        ...     def zero_command(self) -> bytes:
        ...         return b'True'
        ...
        >>> with shell(['python', '-u', '-i']) as py:
        ...     print(py('1 + 1'))
        ...     print(py('1 / 0'))
        ...
        ExecutionResult(stdout=b'2\\n', stderr=b'>>> ... ... ... ... ... ... ... ... ', returncode=0)
        ExecutionResult(stdout=b'', stderr=b'... ... ... ... ... ... ... ... Traceback (most recent call last):\\n  File "<stdin>", line 2, in <module>\\nZeroDivisionError: division by zero', returncode=1)

    The python response generator could be extended to deliver exception
    information in an extended ``ExecutionResult``. This can be achieved by
    *pickling* (see the ``pickle``-module) a caught exception to a byte-string,
    printing this byte-string after the return-code line, and printing another
    end-marker. The :meth:`send`-method of the response generator must then
    be overwritten to unpickle the exception information and store it in an
    extended ``ExecutionResult`` (or raise it in the shell-context, if that is
    preferred).

    **Example 12:** this example shows how to use the shell context handler
    in situations where a ``with``-statement is not suitable, e.g. if a shell
    object should be used in multiple, independently called functions. In this
    case the context manager
    can be manually entered and exited. The following code generates a global
    ``ShellCommandExecutor``-instance in the ``ssh``-variable::

        >>> from datalad_next.shell import shell
        >>> context_manager = shell(['ssh', 'localhost'])
        >>> ssh = context_manager.__enter__()
        >>> print(ssh(b'ls /etc/passwd').stdout)
        b'/etc/passwd\\n'
        >>> context_manager.__exit__(None, None, None)
        False

    """

    def train(queue: Queue):
        """Use a queue to allow chaining of iterables at different times"""
        # `None` is the sentinel that ends the input stream
        for iterable in iter(queue.get, None):
            yield from iterable

    # Check the response generator class before the shell process is started.
    # A failing check inside the subprocess context, but outside the
    # `try`-`finally` below, would skip the clean-up that ends the input
    # stream of the shell.
    assert issubclass(zero_command_rg_class, VariableLengthResponseGenerator)

    subprocess_inputs: Queue = Queue()
    with iter_subproc(shell_cmd,
                      inputs=train(subprocess_inputs),
                      chunk_size=chunk_size,
                      bufsize=0) as shell_output:
        try:
            cmd_executor = ShellCommandExecutor(
                subprocess_inputs,
                shell_output,
                shell_cmd,
                zero_command_rg_class
            )
            # Skip login messages by executing the zero command
            cmd_executor.command_zero(zero_command_rg_class(shell_output))
            # Return the now ready connection
            yield cmd_executor
        finally:
            # Ensure that the shell is terminated if an exception is raised by
            # code that uses `shell`. This is necessary because
            # the `terminate`-call that is invoked when leaving the
            # `iterable_subprocess`-context will not end the shell-process. It
            # will only terminate if its stdin is closed, or if it is killed.
            subprocess_inputs.put(None)


class ShellCommandExecutor:
    """Execute a command in a shell and return a generator that yields output

    Instances of :class:`ShellCommandExecutor` allow to execute commands that
    are provided as byte-strings via its :meth:`__call__`-method.

    To execute the command and collect its output, return code, and
    stderr-output, :class:`ShellCommandExecutor` uses instances of subclasses
    of :class:`ShellCommandResponseGenerator`, e.g.
    :class:`VariableLengthResponseGeneratorPosix`.
    """
    def __init__(self,
                 process_inputs: Queue,
                 stdout: OutputFrom,
                 shell_cmd: list[str],
                 default_rg_class: type[VariableLengthResponseGenerator],
                 ) -> None:
        """Store the connection details of a running shell

        Parameters
        ----------
        process_inputs : Queue
            Queue to which everything that should be sent to the shell is
            added. Each entry is an iterable of byte chunks, ``None`` ends
            the input (see :meth:`close`).
        stdout : OutputFrom
            Output of the shell process. It is handed to response generators
            and provides the attribute ``stderr_deque``.
        shell_cmd : list[str]
            The command that was used to start the shell. It is only used
            in :meth:`__repr__`.
        default_rg_class : type[VariableLengthResponseGenerator]
            Response generator class that is instantiated if no response
            generator is passed to :meth:`start` or :meth:`__call__`.
        """
        self.process_inputs = process_inputs
        self.stdout = stdout
        self.shell_cmd = shell_cmd
        self.default_rg_class = default_rg_class

    def __call__(self,
                 command: bytes | str,
                 *,
                 stdin: Iterable[bytes] | bytes | None = None,
                 response_generator: ShellCommandResponseGenerator | None = None,
                 encoding: str = 'utf-8',
                 check: bool = False
                 ) -> ExecutionResult:
        """Execute a command in the connected shell and return the result

        This method executes the given command in the connected shell. It
        assembles all output on stdout, all output on stderr that was written
        during the execution of the command, and the return code of the
        command. (The response generator defines when the command output is
        considered complete. Usually that is done by checking for a random
        end-of-output marker.)

        Parameters
        ----------
        command : bytes | str
            The command to execute. If the command is given as a string, it
            will be encoded to bytes using the encoding given in `encoding`.
        stdin : Iterable[bytes] | bytes | None, optional, default: None
            If given, the bytes are sent to stdin of the command. A single
            ``bytes``-object is sent as one chunk.

            Note: If the command reads its ``stdin`` until EOF, you have to
            use :meth:`self.close` to close ``stdin`` of the command.
            Otherwise, the command will usually not terminate. Once
            :meth:`self.close` is called, no more commands can be executed
            with this :class:`ShellCommandExecutor`-instance. If you want to
            execute further commands in the same
            :class:`ShellCommandExecutor`-instance, you must ensure that
            commands consume a fixed amount of input, for example, by using
            `head -c <byte-count> | <command>`.
        response_generator : ShellCommandResponseGenerator | None, optional, default: None
            If given, the response generator (usually an instance of a
            subclass of ``ShellCommandResponseGenerator``), that is used to
            generate the command line and to parse the output of the command.
            This can be used to implement, for example, fixed length output
            processing.
        encoding : str, optional, default: 'utf-8'
            The encoding that is used to encode the command if it is given as
            a string. Note: the encoding should match the decoding that is
            used in the connected shell.
        check : bool, optional, default: False
            If True, a :class:`CommandError`-exception is raised if the return
            code of the command is not zero.

        Returns
        -------
        :class:`ExecutionResult`
            An instance of :class:`ExecutionResult` that contains the
            ``stdout``-output, the ``stderr``-output, and the return code of
            the command.

        Raises
        ------
        :class:`CommandError`
            If the return code of the command is not zero and `check` is True.
        """
        response_generator = self.start(
            command,
            stdin=stdin,
            response_generator=response_generator,
            encoding=encoding,
        )
        # Exhaust the generator first: only afterwards is the command output
        # complete and the return code available.
        stdout = b''.join(response_generator)
        # Fetch the collected stderr content and reset the collection, so
        # that it is not reported again for a later command.
        stderr = b''.join(self.stdout.stderr_deque)
        self.stdout.stderr_deque.clear()
        return create_result(
            response_generator,
            command,
            stdout,
            stderr,
            check=check
        )

    def start(self,
              command: bytes | str,
              *,
              stdin: Iterable[bytes] | bytes | None = None,
              response_generator: ShellCommandResponseGenerator | None = None,
              encoding: str = 'utf-8',
              ) -> ShellCommandResponseGenerator:
        """Execute a command in the connected shell

        Execute a command in the connected shell and return a generator that
        provides the content written to stdout of the command. After the
        generator is exhausted, the return code of the command is available
        in the ``returncode``-attribute of the generator.

        Parameters
        ----------
        command : bytes | str
            The command to execute. If the command is given as a string, it
            will be encoded to bytes using the encoding given in `encoding`.
        stdin : Iterable[bytes] | bytes | None, optional, default: None
            If given, the bytes are sent to stdin of the command. A single
            ``bytes``-object is sent as one chunk.

            Note: If the command reads its ``stdin`` until EOF, you have to
            use :meth:`self.close` to close ``stdin`` of the command.
            Otherwise, the command will usually not terminate. Once
            :meth:`self.close` is called, no more commands can be executed
            with this :class:`ShellCommandExecutor`-instance. If you want to
            execute further commands in the same
            :class:`ShellCommandExecutor`-instance, you must ensure that
            commands consume a fixed amount of input, for example, by using
            `head -c <byte-count> | <command>`.
        response_generator : ShellCommandResponseGenerator | None, optional, default: None
            If given, the response generator (usually an instance of a
            subclass of ``ShellCommandResponseGenerator``), that is used to
            generate the command line and to parse the output of the command.
            This can be used to implement, for example, fixed length output
            processing.
        encoding : str, optional, default: 'utf-8'
            The encoding that is used to encode the command if it is given as
            a string. Note: the encoding should match the decoding that is
            used in the connected shell.

        Returns
        -------
        :class:`ShellCommandResponseGenerator`
            A generator that yields the output of ``stdout`` of the command.
            The generator is exhausted when all output is read. After that,
            the return code of the command execution is available in the
            ``returncode``-attribute of the generator, and the stderr-output
            is available in ``self.stdout.stderr_deque``. If a response
            generator was passed in via the ``response_generator``-parameter,
            the same instance will be returned.
        """
        if response_generator is None:
            response_generator = self.default_rg_class(self.stdout)

        if isinstance(command, str):
            command = command.encode(encoding)
        final_command = response_generator.get_final_command(command)

        # Every queue entry is an iterable of byte chunks, therefore the
        # command is wrapped in a list.
        self.process_inputs.put([final_command])
        if stdin is not None:
            if isinstance(stdin, (bytes, bytearray)):
                # Iterating a bytes-object would yield integers instead of
                # byte chunks, send it as a single chunk.
                stdin = [stdin]
            self.process_inputs.put(stdin)
        return response_generator

    def __repr__(self):
        return f'{self.__class__.__name__}({self.shell_cmd!r})'

    def close(self):
        """stop input to the shell

        This method closes stdin of the shell. This will in turn terminate
        the shell, no further commands can be executed in the shell.
        """
        # `None` marks the end of the input queue
        self.process_inputs.put(None)

    def command_zero(self,
                     response_generator: VariableLengthResponseGenerator
                     ) -> None:
        """Execute the zero command

        This method is only used by :func:`shell` to skip any login messages

        Parameters
        ----------
        response_generator : VariableLengthResponseGenerator
            Provides the zero command in its ``zero_command``-attribute and
            is used to process the output of the zero command.

        Raises
        ------
        :class:`CommandError`
            If the zero command exits with a non-zero return code (the zero
            command is executed with ``check=True``).
        """
        result_zero = self(
            response_generator.zero_command,
            response_generator=response_generator,
            check=True,
        )
        lgr.debug('skipped login message: %s', result_zero.stdout)
def create_result(response_generator: ShellCommandResponseGenerator,
                  command: bytes | str | list[str],
                  stdout: bytes,
                  stderr: bytes,
                  error_message: str = '',
                  check: bool = False) -> ExecutionResult:
    """Assemble an :class:`ExecutionResult` and optionally check it

    Parameters
    ----------
    response_generator : ShellCommandResponseGenerator
        The response generator of the command. Its ``returncode``-attribute
        provides the return code of the result.
    command : bytes | str | list[str]
        The command that was executed. It is only used for error reporting.
    stdout : bytes
        The content that is stored as ``stdout`` of the result.
    stderr : bytes
        The content that is stored as ``stderr`` of the result.
    error_message : str, optional, default: ''
        Message that is handed to :meth:`ExecutionResult.to_exception`.
    check : bool, optional, default: False
        If it is ``True``, :meth:`ExecutionResult.to_exception` is called on
        the result before it is returned.

    Returns
    -------
    :class:`ExecutionResult`
    """
    outcome = ExecutionResult(stdout, stderr, response_generator.returncode)
    # only the literal `True` enables checking
    if check is not True:
        return outcome
    outcome.to_exception(command, error_message)
    return outcome