Don't `.aclose` `trio` processes until the very end
Trio will kill subprocesses via `Process.__aexit__()` using a `finally:` block (which, yes, will get triggered on cancellation) so we avoid that until true process "tear down" since subactors do many things during graceful shutdown (such as de-registering from the name discovery system). Oddly this only seems to be an issue during cancellation of infinite stream consumption. Resolves #141ensure_deregister
parent
ae9016c06a
commit
4f92cfe74f
|
@ -157,7 +157,6 @@ async def cancel_on_completion(
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def spawn_subactor(
|
async def spawn_subactor(
|
||||||
subactor: 'Actor',
|
subactor: 'Actor',
|
||||||
accept_addr: Tuple[str, int],
|
|
||||||
parent_addr: Tuple[str, int],
|
parent_addr: Tuple[str, int],
|
||||||
):
|
):
|
||||||
|
|
||||||
|
@ -167,8 +166,11 @@ async def spawn_subactor(
|
||||||
# Hardcode this (instead of using ``_child.__name__`` to avoid a
|
# Hardcode this (instead of using ``_child.__name__`` to avoid a
|
||||||
# double import warning: https://stackoverflow.com/a/45070583
|
# double import warning: https://stackoverflow.com/a/45070583
|
||||||
"tractor._child",
|
"tractor._child",
|
||||||
|
# This is merely an identifier for debugging purposes when
|
||||||
|
# viewing the process tree from the OS
|
||||||
"--uid",
|
"--uid",
|
||||||
str(subactor.uid),
|
str(subactor.uid),
|
||||||
|
# Address the child must connect to on startup
|
||||||
"--parent_addr",
|
"--parent_addr",
|
||||||
str(parent_addr)
|
str(parent_addr)
|
||||||
]
|
]
|
||||||
|
@ -179,8 +181,14 @@ async def spawn_subactor(
|
||||||
subactor.loglevel
|
subactor.loglevel
|
||||||
]
|
]
|
||||||
|
|
||||||
async with await trio.open_process(spawn_cmd) as proc:
|
proc = await trio.open_process(spawn_cmd)
|
||||||
yield proc
|
yield proc
|
||||||
|
|
||||||
|
# XXX: do this **after** cancellation/tearfown
|
||||||
|
# to avoid killing the process too early
|
||||||
|
# since trio does this internally on ``__aexit__()``
|
||||||
|
async with proc:
|
||||||
|
log.debug(f"Terminating {proc}")
|
||||||
|
|
||||||
|
|
||||||
async def new_proc(
|
async def new_proc(
|
||||||
|
@ -206,7 +214,6 @@ async def new_proc(
|
||||||
if use_trio_run_in_process or _spawn_method == 'trio':
|
if use_trio_run_in_process or _spawn_method == 'trio':
|
||||||
async with spawn_subactor(
|
async with spawn_subactor(
|
||||||
subactor,
|
subactor,
|
||||||
bind_addr,
|
|
||||||
parent_addr,
|
parent_addr,
|
||||||
) as proc:
|
) as proc:
|
||||||
log.info(f"Started {proc}")
|
log.info(f"Started {proc}")
|
||||||
|
|
Loading…
Reference in New Issue