datasalad.iterable_subprocess.iterable_subprocess

datasalad.iterable_subprocess.iterable_subprocess(program: list[str], input_chunks: Iterable[bytes], chunk_size: int = 65536, cwd: PathLike | str | None = None, bufsize: int = -1, *, env: Mapping[str, str] | None = None, swap_stderr: bool = False)

Subprocess execution context manager with iterable IO

The program argument sets the executable and arguments of the subprocess to run. input_chunks is any iterable of bytes to feed to the subprocess’s standard input; it can be an empty iterable when no standard input is required. chunk_size determines the minimum output size to be produced by the subprocess before an output chunk is yielded (set this low enough when, for example, reading progress information). cwd is the path to serve as the working directory of the subprocess. bufsize is passed to Popen() and affects the creation of the stdin/stdout/stderr pipe file objects used to communicate with the subprocess. env is passed to Popen() as a complete alternative environment for the subprocess, replacing the parent process’s environment rather than extending it. swap_stderr can be set to True when the relevant output to be yielded arrives via standard error; if True, the roles of stdout and stderr are swapped with respect to reporting.
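Because env acts as a complete replacement, adding a single variable usually means merging it with os.environ first. The following is a minimal sketch of that merge using subprocess.run(), which forwards env to Popen(). The variable name SKETCH_VAR is made up for illustration, and the assumption is that the same mapping could be passed as env to iterable_subprocess.

```python
import os
import subprocess
import sys

# Child that reports whether SKETCH_VAR (a hypothetical variable) is visible.
child = [
    sys.executable,
    '-c',
    "import os; print(os.environ.get('SKETCH_VAR', 'unset'))",
]

# Merge with the parent environment; passing only {'SKETCH_VAR': 'set'}
# would drop PATH and everything else the child may rely on.
merged_env = {**os.environ, 'SKETCH_VAR': 'set'}

result = subprocess.run(child, env=merged_env, capture_output=True, check=True)
reported = result.stdout.strip()
```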

This context starts a thread that populates the subprocess’s standard input. It also starts a thread that reads the process’s standard error. Otherwise we risk a deadlock: there is no output because the process is waiting for more input.

This introduces its own complications and risks, which are hopefully mitigated by a well-defined start and stop mechanism that also avoids sending data to the process if it’s not running.

To start, i.e. on entry to the context from client code:

  • The process is started

  • The thread to read from standard error is started

  • The thread to populate input is started

When running:

  • The standard input thread iterates over the input, passing chunks to the process

  • Meanwhile, the standard error thread fetches the error output

  • Meanwhile, this thread iterates over the process’s output from client code in the context

To stop, i.e. on exit of the context from client code:

  • This thread closes the process’s standard output

  • This thread waits for the standard input thread to exit

  • This thread waits for the standard error thread to exit

  • This thread waits for the process to exit

By using context managers internally, this also gives quite strong guarantees that the above order is enforced, so that the thread doesn’t send data to a process whose standard input is closed and we avoid BrokenPipeError exceptions.
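The start, run, and stop phases described above can be sketched with the standard library alone. This is a simplified illustration under stated assumptions, not datasalad's actual implementation: sketch_iterable_subprocess and its inner helpers are hypothetical names, standard error is only collected rather than reported, and BrokenPipeError is swallowed for brevity.

```python
import subprocess
import sys
import threading
from contextlib import contextmanager


@contextmanager
def sketch_iterable_subprocess(program, input_chunks, chunk_size=65536):
    """Hypothetical stand-in; not datasalad's actual implementation."""
    # Start: launch the process, then the stderr reader, then the stdin feeder.
    proc = subprocess.Popen(
        program,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    stderr_chunks = []

    def read_stderr():
        # Drain stderr so the process never blocks on a full stderr pipe.
        for chunk in iter(lambda: proc.stderr.read(chunk_size), b''):
            stderr_chunks.append(chunk)

    def feed_stdin():
        # Pass input chunks to the process, then signal end of input.
        try:
            for chunk in input_chunks:
                proc.stdin.write(chunk)
        except BrokenPipeError:
            pass  # simplification: the real error handling is more involved
        finally:
            try:
                proc.stdin.close()
            except BrokenPipeError:
                pass

    def read_stdout():
        # Runs in the calling thread while client code iterates.
        yield from iter(lambda: proc.stdout.read(chunk_size), b'')

    stderr_thread = threading.Thread(target=read_stderr)
    stdin_thread = threading.Thread(target=feed_stdin)
    stderr_thread.start()
    stdin_thread.start()
    try:
        yield read_stdout()
    finally:
        # Stop: close stdout, then wait for both threads and the process.
        proc.stdout.close()
        stdin_thread.join()
        stderr_thread.join()
        proc.stderr.close()
        proc.wait()


# Usage: a Python child that upper-cases whatever arrives on stdin.
upper = [
    sys.executable,
    '-c',
    'import sys; sys.stdout.write(sys.stdin.read().upper())',
]
with sketch_iterable_subprocess(upper, [b'ab', b'cd']) as chunks:
    collected = b''.join(chunks)
```

Placing the shutdown steps in a finally clause is what ties the stop order to leaving the with block, whether the client code finishes normally or raises.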

Writing to the process can result in a BrokenPipeError. If this then results in a non-zero exit code from the process, the process’s standard error probably has useful information on the cause. However, the non-zero exit code only becomes known after the BrokenPipeError, so propagating “what happens first” isn’t helpful in this case. So, we re-raise BrokenPipeError as _BrokenPipeError so we can catch it after the process ends, which then allows us to branch on its exit code:

  • if it’s non-zero, raise a CommandError containing its standard error

  • if it’s zero, re-raise the original BrokenPipeError
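The branching above can be illustrated with a small standard-library sketch. It assumes a POSIX platform, where writing to a closed pipe raises BrokenPipeError. SketchCommandError and feed_and_wait are hypothetical names standing in for CommandError and the library's internal handling. To keep the example short it writes without a feeder thread, so it is only safe for children that produce little output.

```python
import subprocess
import sys


class SketchCommandError(Exception):
    """Hypothetical stand-in for CommandError, carrying code and stderr."""

    def __init__(self, returncode, stderr):
        super().__init__(f'command failed with exit code {returncode}')
        self.returncode = returncode
        self.stderr = stderr


def feed_and_wait(program, input_chunks):
    """Hypothetical helper: write input, defer BrokenPipeError until exit."""
    proc = subprocess.Popen(
        program,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    deferred = None
    try:
        for chunk in input_chunks:
            proc.stdin.write(chunk)
    except BrokenPipeError as error:
        # Do not propagate yet: the exit code is not known at this point.
        deferred = error
    # communicate() closes stdin, collects both outputs, and waits for exit.
    stdout, stderr = proc.communicate()
    if deferred is not None:
        if proc.returncode != 0:
            # Non-zero exit: standard error is the more useful report.
            raise SketchCommandError(proc.returncode, stderr) from deferred
        # Zero exit: the broken pipe itself is the only failure to report.
        raise deferred
    return stdout


# Usage: the child exits with code 3 without ever reading its stdin, so the
# parent's writes eventually fail with BrokenPipeError.
failing = [
    sys.executable,
    '-c',
    "import sys; sys.stderr.write('bad input'); sys.exit(3)",
]
oversized_input = (b'x' * 65536 for _ in range(64))
caught = None
try:
    feed_and_wait(failing, oversized_input)
except SketchCommandError as error:
    caught = error
```

Chaining with `from deferred` keeps the original BrokenPipeError available as `__cause__` for debugging, while the exception that reaches the caller carries the exit code and standard error.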

>>> # regular execution, no input iterable
>>> with iterable_subprocess(['printf', 'test'], []) as proc:
...     for chunk in proc:
...         print(chunk)
b'test'
>>> # feed subprocess stdin from an iterable
>>> with iterable_subprocess(['cat'], [b'test']) as proc:
...     for chunk in proc:
...         print(chunk)
b'test'