Source code for datalad_next.shell.response_generators

from __future__ import annotations

import logging
from abc import (
    ABCMeta,
    abstractmethod,
)
from collections import deque
from collections.abc import Generator
from random import randint

from datasalad.iterable_subprocess.iterable_subprocess import OutputFrom
from datasalad.itertools import align_pattern


__all__ = [
    'FixedLengthResponseGenerator',
    'FixedLengthResponseGeneratorPosix',
    'FixedLengthResponseGeneratorPowerShell',
    'ShellCommandResponseGenerator',
    'VariableLengthResponseGenerator',
    'VariableLengthResponseGeneratorPosix',
    'VariableLengthResponseGeneratorPowerShell',
]


lgr = logging.getLogger('datalad.ext.next.shell.protocol')


[docs] class ShellCommandResponseGenerator(Generator, metaclass=ABCMeta): """An abstract class the specifies the minimal functionality of a response generator Subclasses of this class can be used to implement operation-specific, shell-specific or OS-specific details of the command execution and the command output parsing. The return code is available in the ``returncode``-attribute, the stderr-output is available in the ``stderr_deque``-attribute (a ``deque``-instance), of instances of this class. """ def __init__(self, stdout_gen: Generator, stderr_deque: deque) -> None: self.stdout_gen = stdout_gen self.stderr_deque = stderr_deque self.state: str | int = 'output' self.returncode_chunk = b'' self.returncode: int | None = None @staticmethod def _get_number_and_newline(chunk, iterable) -> tuple[int, bytes]: """Help that reads a trailing number and a newline from a chunk Parameters ---------- chunk : bytes An chunk of bytes that should contain the number and the newline. iterable : Iterable An iterable that will be used to extend ``chunk`` if no newline is found in ``chunk``. Returns ------- int A tuple that contains the number that was found in the chunk and the trailing portion of the chunk that was not parsed. """ while b'\n' not in chunk: lgr.log(5, 'completing number chunk') chunk += next(iterable) digits, trailing = chunk.split(b'\n', 1) return int(digits), trailing
[docs] @abstractmethod def send(self, _) -> bytes: """Deliver the next part of generated output Whenever the response generator is iterated over, this method is called and should deliver the next part of the command output or raise ``StopIteration`` if the command has finished. """ raise NotImplementedError
[docs] @abstractmethod def get_final_command(self, command: bytes) -> bytes: """Return a final command list that executes ``command`` This method should return a "final" command-pipeline that executes ``command`` and generates the output structure that the response generator expects. This structure will typically be parsed in the implementation of :meth:`send`. This method is usually only called by :meth:`ShellCommandExecutor.__call__`. """ raise NotImplementedError
[docs] def throw(self, typ, val=..., tb=...): # pragma: no cover return super().throw(typ, val, tb)
[docs] class VariableLengthResponseGenerator(ShellCommandResponseGenerator, metaclass=ABCMeta): """Response generator that handles outputs of unknown length This response generator is used to execute a command that will result in an output of unknown length, e.g. ``ls``. The final command list it creates will execute the command and print a random end-marker and the return code after the output of the command. The :meth:`send`-method of this class uses the end-marker to determine then end of the command output. """ def __init__(self, stdout: OutputFrom, ) -> None: self.end_marker = _create_end_marker() self.stream_marker = self.end_marker + b'\n' self.plain_stdout = stdout super().__init__( align_pattern(stdout, self.stream_marker), stdout.stderr_deque )
[docs] def send(self, _) -> bytes: if self.state == 'output': chunk = next(self.stdout_gen) if self.stream_marker in chunk: self.state = 'returncode' chunk, self.returncode_chunk = chunk.split(self.stream_marker) if chunk: return chunk else: return chunk if self.state == 'returncode': self.returncode, trailing = self._get_number_and_newline( self.returncode_chunk, self.plain_stdout, ) if trailing: lgr.warning( 'unexpected output after return code: %s', repr(trailing)) self.state = 'exhausted' if self.state == 'exhausted': self.state = 'output' raise StopIteration() raise RuntimeError(f'unknown state: {self.state}')
@property @abstractmethod def zero_command(self) -> bytes: """Return a command that functions as "zero command" """ raise NotImplementedError
[docs] class VariableLengthResponseGeneratorPosix(VariableLengthResponseGenerator): """A variable length response generator for POSIX shells""" def __init__(self, stdout): """ Parameters ---------- stdout : OutputFrom A generator that yields output from a shell. Usually the object that is returned by :func:`iter_proc`. """ super().__init__(stdout)
[docs] def get_final_command(self, command: bytes) -> bytes: """Return a command list that executes ``command`` and prints the end-marker The POSIX version for variable length response generators. This method is usually only called by :meth:`ShellCommandExecutor.__call__`. """ return ( command + b' ; x=$?; echo -e -n "' + self.end_marker + b'\\n"; echo $x\n' )
@property def zero_command(self) -> bytes: return b'test 0 -eq 0'
[docs] class VariableLengthResponseGeneratorPowerShell(VariableLengthResponseGenerator): """A variable length response generator for PowerShell shells""" def __init__(self, stdout): """ Parameters ---------- stdout : OutputFrom A generator that yields output from a shell. Usually the object that is returned by :func:`iter_proc`. """ super().__init__(stdout)
[docs] def get_final_command(self, command: bytes) -> bytes: """Return a command list that executes ``command`` and prints the end-marker The PowerShell version for variable length response generators. This method is usually only called by :meth:`ShellCommandExecutor.__call__`. """ # TODO: check whether `command` sets `$LASTEXITCODE` and assign that # to `$x`, iff set. return ( b'$x=0; try {' + command + b'} catch { $x=1 }\n' + b'Write-Host -NoNewline ' + self.end_marker + b'`n$x`n\n' )
@property def zero_command(self) -> bytes: return b'Write-Host hello'
[docs] class FixedLengthResponseGenerator(ShellCommandResponseGenerator, metaclass=ABCMeta): """Response generator for efficient handling of outputs of known length This response generator is used to execute commands that have an output of known length. The final command list it creates will execute the command and print the return code followed by a newline. The :meth:`send`-method of this response generator will read the specified number of bytes and a trailing return code. This is more performant than scanning the output for an end-marker. """ def __init__(self, stdout: OutputFrom, length: int, ) -> None: """ Parameters ---------- stdout : OutputFrom A generator that yields output from a shell. Usually the object that is returned by :func:`iter_proc`. length : int The length (in bytes) of the output that a command will generate. """ super().__init__(stdout, stdout.stderr_deque) self.length = length self.read = 0
[docs] def send(self, _) -> bytes: if self.state == 'output': chunk = next(self.stdout_gen) self.read += len(chunk) if self.read >= self.length: self.state = 'returncode' excess = self.read - self.length if excess > 0: chunk, self.returncode_chunk = chunk[:-excess], chunk[-excess:] else: self.returncode_chunk = b'' if chunk: return chunk else: return chunk if self.state == 'returncode': self.returncode, trailing = self._get_number_and_newline( self.returncode_chunk, self.stdout_gen, ) if trailing: lgr.warning( 'unexpected output after return code: %s', repr(trailing)) self.state = 'exhausted' if self.state == 'exhausted': self.state = 'output' raise StopIteration() raise RuntimeError(f'unknown state: {self.state}')
[docs] class FixedLengthResponseGeneratorPosix(FixedLengthResponseGenerator):
[docs] def get_final_command(self, command: bytes) -> bytes: """Return a final command list for a command with a fixed length output The POSIX version for fixed length response generators. This method is usually only called by :meth:`ShellCommandExecutor.__call__`. """ return command + b' ; echo $?\n'
[docs] class FixedLengthResponseGeneratorPowerShell(FixedLengthResponseGenerator):
[docs] def get_final_command(self, command: bytes) -> bytes: """Return a final command list for a command with a fixed length output The PowerShell version for fixed length response generators. This method is usually only called by :meth:`ShellCommandExecutor.__call__`. """ return ( b'$x=0; try {' + command + b'} catch { $x=1 }\n' + b'Write-Host -NoNewline $x`n\n' )
def _create_end_marker() -> bytes: """ Create a hopefully unique marker for the shell """ # The following line is marked with `nosec` because `randint` is only # used to diversify markers, not for cryptographic purposes. marker_id = f'{randint(1000000000, 9999999999)}'.encode() # nosec fixed_part = b'----datalad-end-marker-' return fixed_part + marker_id + fixed_part[::-1]