forked from goodboy/tractor
1
0
Fork 0

Use `trio.Process.__aexit__()` and pass the actor uid

Using the context manager interface does some extra teardown beyond simply
calling `.wait()`. Pass the subactor's "uid" on the exec line for
debugging purposes when monitoring the process tree from the OS.
Hard code the child script module path to avoid a double import warning.
drop_cloudpickle
Tyler Goodlet 2020-07-22 01:43:15 -04:00
parent a215df8dfc
commit aa620fe61d
1 changed files with 15 additions and 12 deletions

View File

@ -157,24 +157,26 @@ async def cancel_on_completion(
@asynccontextmanager @asynccontextmanager
async def run_in_process(async_fn, *args, **kwargs): async def run_in_process(subactor, async_fn, *args, **kwargs):
encoded_job = cloudpickle.dumps(partial(async_fn, *args, **kwargs)) encoded_job = cloudpickle.dumps(partial(async_fn, *args, **kwargs))
p = await trio.open_process(
async with await trio.open_process(
[ [
sys.executable, sys.executable,
"-m", "-m",
_child.__name__ # Hardcode this (instead of using ``_child.__name__`` to avoid a
# double import warning: https://stackoverflow.com/a/45070583
"tractor._child",
# This is merely an identifier for debugging purposes when
# viewing the process tree from the OS
str(subactor.uid),
], ],
stdin=subprocess.PIPE stdin=subprocess.PIPE,
) ) as proc:
# send over func to call # send func object to call in child
await p.stdin.send_all(encoded_job) await proc.stdin.send_all(encoded_job)
yield proc
yield p
# wait for termination
await p.wait()
async def new_proc( async def new_proc(
@ -200,6 +202,7 @@ async def new_proc(
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
if use_trio_run_in_process or _spawn_method == 'trio': if use_trio_run_in_process or _spawn_method == 'trio':
async with run_in_process( async with run_in_process(
subactor,
_trio_main, _trio_main,
subactor, subactor,
bind_addr, bind_addr,