Spaces:
Runtime error
Runtime error
from __future__ import annotations | |
from abc import abstractmethod | |
from signal import Signals | |
from ._resources import AsyncResource | |
from ._streams import ByteReceiveStream, ByteSendStream | |
class Process(AsyncResource): | |
"""An asynchronous version of :class:`subprocess.Popen`.""" | |
async def wait(self) -> int: | |
""" | |
Wait until the process exits. | |
:return: the exit code of the process | |
""" | |
def terminate(self) -> None: | |
""" | |
Terminates the process, gracefully if possible. | |
On Windows, this calls ``TerminateProcess()``. | |
On POSIX systems, this sends ``SIGTERM`` to the process. | |
.. seealso:: :meth:`subprocess.Popen.terminate` | |
""" | |
def kill(self) -> None: | |
""" | |
Kills the process. | |
On Windows, this calls ``TerminateProcess()``. | |
On POSIX systems, this sends ``SIGKILL`` to the process. | |
.. seealso:: :meth:`subprocess.Popen.kill` | |
""" | |
def send_signal(self, signal: Signals) -> None: | |
""" | |
Send a signal to the subprocess. | |
.. seealso:: :meth:`subprocess.Popen.send_signal` | |
:param signal: the signal number (e.g. :data:`signal.SIGHUP`) | |
""" | |
def pid(self) -> int: | |
"""The process ID of the process.""" | |
def returncode(self) -> int | None: | |
""" | |
The return code of the process. If the process has not yet terminated, this will be | |
``None``. | |
""" | |
def stdin(self) -> ByteSendStream | None: | |
"""The stream for the standard input of the process.""" | |
def stdout(self) -> ByteReceiveStream | None: | |
"""The stream for the standard output of the process.""" | |
def stderr(self) -> ByteReceiveStream | None: | |
"""The stream for the standard error output of the process.""" | |