Spaces:
Runtime error
Runtime error
File size: 9,242 Bytes
4a51346 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 |
from __future__ import annotations
import os
import pickle
import subprocess
import sys
from collections import deque
from importlib.util import module_from_spec, spec_from_file_location
from typing import Callable, TypeVar, cast
from ._core._eventloop import current_time, get_asynclib, get_cancelled_exc_class
from ._core._exceptions import BrokenWorkerProcess
from ._core._subprocesses import open_process
from ._core._synchronization import CapacityLimiter
from ._core._tasks import CancelScope, fail_after
from .abc import ByteReceiveStream, ByteSendStream, Process
from .lowlevel import RunVar, checkpoint_if_cancelled
from .streams.buffered import BufferedByteReceiveStream
WORKER_MAX_IDLE_TIME = 300 # 5 minutes
T_Retval = TypeVar("T_Retval")
_process_pool_workers: RunVar[set[Process]] = RunVar("_process_pool_workers")
_process_pool_idle_workers: RunVar[deque[tuple[Process, float]]] = RunVar(
"_process_pool_idle_workers"
)
_default_process_limiter: RunVar[CapacityLimiter] = RunVar("_default_process_limiter")
async def run_sync(
func: Callable[..., T_Retval],
*args: object,
cancellable: bool = False,
limiter: CapacityLimiter | None = None,
) -> T_Retval:
"""
Call the given function with the given arguments in a worker process.
If the ``cancellable`` option is enabled and the task waiting for its completion is cancelled,
the worker process running it will be abruptly terminated using SIGKILL (or
``terminateProcess()`` on Windows).
:param func: a callable
:param args: positional arguments for the callable
:param cancellable: ``True`` to allow cancellation of the operation while it's running
:param limiter: capacity limiter to use to limit the total amount of processes running
(if omitted, the default limiter is used)
:return: an awaitable that yields the return value of the function.
"""
async def send_raw_command(pickled_cmd: bytes) -> object:
try:
await stdin.send(pickled_cmd)
response = await buffered.receive_until(b"\n", 50)
status, length = response.split(b" ")
if status not in (b"RETURN", b"EXCEPTION"):
raise RuntimeError(
f"Worker process returned unexpected response: {response!r}"
)
pickled_response = await buffered.receive_exactly(int(length))
except BaseException as exc:
workers.discard(process)
try:
process.kill()
with CancelScope(shield=True):
await process.aclose()
except ProcessLookupError:
pass
if isinstance(exc, get_cancelled_exc_class()):
raise
else:
raise BrokenWorkerProcess from exc
retval = pickle.loads(pickled_response)
if status == b"EXCEPTION":
assert isinstance(retval, BaseException)
raise retval
else:
return retval
# First pickle the request before trying to reserve a worker process
await checkpoint_if_cancelled()
request = pickle.dumps(("run", func, args), protocol=pickle.HIGHEST_PROTOCOL)
# If this is the first run in this event loop thread, set up the necessary variables
try:
workers = _process_pool_workers.get()
idle_workers = _process_pool_idle_workers.get()
except LookupError:
workers = set()
idle_workers = deque()
_process_pool_workers.set(workers)
_process_pool_idle_workers.set(idle_workers)
get_asynclib().setup_process_pool_exit_at_shutdown(workers)
async with (limiter or current_default_process_limiter()):
# Pop processes from the pool (starting from the most recently used) until we find one that
# hasn't exited yet
process: Process
while idle_workers:
process, idle_since = idle_workers.pop()
if process.returncode is None:
stdin = cast(ByteSendStream, process.stdin)
buffered = BufferedByteReceiveStream(
cast(ByteReceiveStream, process.stdout)
)
# Prune any other workers that have been idle for WORKER_MAX_IDLE_TIME seconds or
# longer
now = current_time()
killed_processes: list[Process] = []
while idle_workers:
if now - idle_workers[0][1] < WORKER_MAX_IDLE_TIME:
break
process, idle_since = idle_workers.popleft()
process.kill()
workers.remove(process)
killed_processes.append(process)
with CancelScope(shield=True):
for process in killed_processes:
await process.aclose()
break
workers.remove(process)
else:
command = [sys.executable, "-u", "-m", __name__]
process = await open_process(
command, stdin=subprocess.PIPE, stdout=subprocess.PIPE
)
try:
stdin = cast(ByteSendStream, process.stdin)
buffered = BufferedByteReceiveStream(
cast(ByteReceiveStream, process.stdout)
)
with fail_after(20):
message = await buffered.receive(6)
if message != b"READY\n":
raise BrokenWorkerProcess(
f"Worker process returned unexpected response: {message!r}"
)
main_module_path = getattr(sys.modules["__main__"], "__file__", None)
pickled = pickle.dumps(
("init", sys.path, main_module_path),
protocol=pickle.HIGHEST_PROTOCOL,
)
await send_raw_command(pickled)
except (BrokenWorkerProcess, get_cancelled_exc_class()):
raise
except BaseException as exc:
process.kill()
raise BrokenWorkerProcess(
"Error during worker process initialization"
) from exc
workers.add(process)
with CancelScope(shield=not cancellable):
try:
return cast(T_Retval, await send_raw_command(request))
finally:
if process in workers:
idle_workers.append((process, current_time()))
def current_default_process_limiter() -> CapacityLimiter:
"""
Return the capacity limiter that is used by default to limit the number of worker processes.
:return: a capacity limiter object
"""
try:
return _default_process_limiter.get()
except LookupError:
limiter = CapacityLimiter(os.cpu_count() or 2)
_default_process_limiter.set(limiter)
return limiter
def process_worker() -> None:
# Redirect standard streams to os.devnull so that user code won't interfere with the
# parent-worker communication
stdin = sys.stdin
stdout = sys.stdout
sys.stdin = open(os.devnull)
sys.stdout = open(os.devnull, "w")
stdout.buffer.write(b"READY\n")
while True:
retval = exception = None
try:
command, *args = pickle.load(stdin.buffer)
except EOFError:
return
except BaseException as exc:
exception = exc
else:
if command == "run":
func, args = args
try:
retval = func(*args)
except BaseException as exc:
exception = exc
elif command == "init":
main_module_path: str | None
sys.path, main_module_path = args
del sys.modules["__main__"]
if main_module_path:
# Load the parent's main module but as __mp_main__ instead of __main__
# (like multiprocessing does) to avoid infinite recursion
try:
spec = spec_from_file_location("__mp_main__", main_module_path)
if spec and spec.loader:
main = module_from_spec(spec)
spec.loader.exec_module(main)
sys.modules["__main__"] = main
except BaseException as exc:
exception = exc
try:
if exception is not None:
status = b"EXCEPTION"
pickled = pickle.dumps(exception, pickle.HIGHEST_PROTOCOL)
else:
status = b"RETURN"
pickled = pickle.dumps(retval, pickle.HIGHEST_PROTOCOL)
except BaseException as exc:
exception = exc
status = b"EXCEPTION"
pickled = pickle.dumps(exc, pickle.HIGHEST_PROTOCOL)
stdout.buffer.write(b"%s %d\n" % (status, len(pickled)))
stdout.buffer.write(pickled)
# Respect SIGTERM
if isinstance(exception, SystemExit):
raise exception
if __name__ == "__main__":
process_worker()
|