datasalad.iterable_subprocess.iterable_subprocess
datasalad.iterable_subprocess.iterable_subprocess(program: list[str], input_chunks: Iterable[bytes], chunk_size: int = 65536, cwd: PathLike | str | None = None, bufsize: int = -1, *, env: Mapping[str, str] | None = None, swap_stderr: bool = False)
Subprocess execution context manager with iterable IO
The arguments are:
program sets the executable and arguments of the subprocess to run.
input_chunks is any iterable to feed to the subprocess’s standard input. It can be an iterable with no items when no standard input is required.
chunk_size determines the minimum output size to be produced by the subprocess before an output chunk is yielded (set this low enough when, for example, reading progress information).
cwd is the path to serve as the working directory of the subprocess.
bufsize is passed to Popen() and affects the creation of the stdin/stdout/stderr pipe file objects used to communicate with the subprocess.
env is passed to Popen() as a complete alternative environment for the subprocess, instead of the parent process’s one.
swap_stderr can be set to True when the relevant output to be yielded is coming via standard error. If True, the roles of stdout and stderr are swapped with respect to reporting.
This context starts a thread that populates the subprocess’s standard input. It also starts a thread that reads the process’s standard error. Otherwise we risk a deadlock: there is no output because the process is waiting for more input.
This introduces its own complications and risks, which are hopefully mitigated by a well-defined start and stop mechanism that also avoids sending data to the process if it’s not running.
To start, i.e. on entry to the context from client code:
The process is started
The thread to read from standard error is started
The thread to populate input is started
When running:
The standard input thread iterates over the input, passing chunks to the process
While the standard error thread fetches the error output
And while this thread iterates over the process’s output from client code in the context
To stop, i.e. on exit of the context from client code:
This thread closes the process’s standard output
Wait for the standard input thread to exit
Wait for the standard error thread to exit
Wait for the process to exit
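The start, run, and stop order listed above can be sketched with the standard library alone. This is a minimal illustration, not the datasalad implementation: sketch_iterable_subprocess and its helper functions are hypothetical names, and the sketch simply swallows BrokenPipeError in the input thread rather than reporting it.

```python
import subprocess
import sys
import threading
from contextlib import contextmanager


@contextmanager
def sketch_iterable_subprocess(program, input_chunks, chunk_size=65536):
    """Hypothetical stand-in that mirrors the start/run/stop order above."""
    stderr_chunks = []  # collected but not reported in this sketch

    def feed_stdin(proc):
        # Pass each input chunk to the process, then signal end of input.
        try:
            for chunk in input_chunks:
                proc.stdin.write(chunk)
        except BrokenPipeError:
            pass  # simplification: the sketch ignores a vanished reader
        finally:
            try:
                proc.stdin.close()
            except BrokenPipeError:
                pass

    def drain_stderr(proc):
        # Keep reading so the process never blocks on a full stderr pipe.
        for chunk in iter(lambda: proc.stderr.read(chunk_size), b""):
            stderr_chunks.append(chunk)

    def iter_stdout(proc):
        # Runs in the client's thread while it iterates inside the context.
        for chunk in iter(lambda: proc.stdout.read(chunk_size), b""):
            yield chunk

    # Start: the process, then the stderr thread, then the stdin thread.
    proc = subprocess.Popen(
        program,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    t_err = threading.Thread(target=drain_stderr, args=(proc,))
    t_in = threading.Thread(target=feed_stdin, args=(proc,))
    t_err.start()
    t_in.start()
    try:
        yield iter_stdout(proc)
    finally:
        # Stop: close stdout, join the stdin thread, join the stderr
        # thread, and only then wait for the process to exit.
        proc.stdout.close()
        t_in.join()
        t_err.join()
        proc.wait()


# Child process that upper-cases whatever arrives on its standard input.
child = [
    sys.executable,
    "-c",
    "import sys; sys.stdout.buffer.write(sys.stdin.buffer.read().upper())",
]
with sketch_iterable_subprocess(child, [b"ab", b"cd"]) as output:
    result = b"".join(output)
print(result)  # b'ABCD'
```

Placing the teardown in a finally clause means the same stop order runs whether the client code finishes normally or raises inside the context.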
By using context managers internally, this also gives quite strong guarantees that the above order is enforced, to make sure the thread doesn’t send data to a process whose standard input is closed, so that we don’t get BrokenPipe errors.
Writing to the process can result in a BrokenPipeError. If this then results in a non-zero code from the process, the process’s standard error probably has useful information on the cause of this. However, the non-zero error code happens after BrokenPipeError, so propagating “what happens first” isn’t helpful in this case. So, we re-raise BrokenPipeError as _BrokenPipeError so we can catch it after the process ends, which then allows us to branch on its error code:
if it’s non-zero, raise a CommandError containing its standard error
if it’s zero, re-raise the original BrokenPipeError
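The branching described above can be illustrated with a small standard-library sketch. It is not the datasalad code: SketchCommandError and write_then_branch are hypothetical names, the exception is stored in a local variable instead of being re-raised as a private type, and POSIX pipe semantics are assumed (a write to a pipe whose reader has exited fails with BrokenPipeError).

```python
import subprocess
import sys


class SketchCommandError(Exception):
    """Hypothetical stand-in for the CommandError mentioned above."""


def write_then_branch(program, payload):
    proc = subprocess.Popen(
        program, stdin=subprocess.PIPE, stderr=subprocess.PIPE
    )
    deferred = None
    try:
        proc.stdin.write(payload)
        proc.stdin.flush()
    except BrokenPipeError as exc:
        # Do not propagate yet: the exit code is not known at this point.
        deferred = exc
    finally:
        try:
            proc.stdin.close()
        except BrokenPipeError as exc:
            if deferred is None:
                deferred = exc
    stderr = proc.stderr.read()
    proc.stderr.close()
    returncode = proc.wait()
    if deferred is not None:
        if returncode != 0:
            # Non-zero exit: standard error likely explains the failure.
            raise SketchCommandError(
                stderr.decode(errors="replace")
            ) from deferred
        # Zero exit: the broken pipe is the only information available.
        raise deferred
    return returncode


# Larger than a typical pipe buffer, so the write cannot complete
# unless the child actually reads its standard input.
big = b"x" * (1 << 22)

failure_message = None
failing = [
    sys.executable,
    "-c",
    "import sys; sys.stderr.write('bad input'); sys.exit(3)",
]
try:
    write_then_branch(failing, big)
except SketchCommandError as err:
    failure_message = str(err)

saw_broken_pipe = False
quiet = [sys.executable, "-c", "pass"]
try:
    write_then_branch(quiet, big)
except BrokenPipeError:
    saw_broken_pipe = True
```

Neither child reads its input, so both writes fail; only the exit code decides which exception the caller sees.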
>>> from datasalad.iterable_subprocess import iterable_subprocess
>>> # regular execution, no input iterable
>>> with iterable_subprocess(['printf', 'test'], []) as proc:
...     for chunk in proc:
...         print(chunk)
b'test'
>>> # feed subprocess stdin from an iterable
>>> with iterable_subprocess(['cat'], [b'test']) as proc:
...     for chunk in proc:
...         print(chunk)
b'test'
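The effect of chunk_size on how output is grouped can be approximated with the standard library. The sketch below assumes output is fetched with a blocking read(chunk_size) on a buffered pipe; that is an assumption about the mechanism, not something this page states, and read_in_chunks is a hypothetical helper.

```python
import subprocess
import sys


def read_in_chunks(program, chunk_size):
    """Yield the stdout of `program` in pieces of at most `chunk_size` bytes."""
    with subprocess.Popen(program, stdout=subprocess.PIPE) as proc:
        # A buffered read(n) blocks until n bytes are available or the
        # stream ends, so every chunk except possibly the last is full.
        for chunk in iter(lambda: proc.stdout.read(chunk_size), b""):
            yield chunk


# Child process that writes ten bytes to its standard output.
child = [
    sys.executable,
    "-c",
    "import sys; sys.stdout.buffer.write(b'0123456789')",
]
chunks = list(read_in_chunks(child, chunk_size=4))
print(chunks)  # [b'0123', b'4567', b'89']
```

Under this assumption, a large chunk_size delays the first yielded chunk until that much output exists, which is why a small value suits progress reporting.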