forked from goodboy/tractor

Heh, add back `Actor._root_tn`, it has purpose..

Turns out I didn't read my own internals docs/comments; despite it not
being used previously, this adds the real use case: a root, per-actor
scope which ensures parent comms are the last conc-thing to be
cancelled.

Also, the impl changes here make the test from 6410e45 (or wtv
it's rebased to) pass, i.e. we can support crash handling in the root
actor despite the root-tn having been (self) cancelled.

Superficial adjustments,
- rename `Actor._service_n` -> `._service_tn` everywhere.
- add asserts to `._runtime.async_main()` which ensure that any
  `.trionics.maybe_open_nursery()` calls against optionally passed
  `._[root/service]_tn` are allocated-if-not-provided (the
  `._service_tn`-case being an i-guess-prep-for-the-future-anti-pattern
  Bp).
- obvi adjust all internal usage to match new naming.

Serious/real-use-case changes,
- add (back) an `Actor._root_tn` which sits a scope "above" the
  service-tn and is either,
  + assigned in `._runtime.async_main()` for sub-actors OR,
  + assigned in `._root.open_root_actor()` for the root actor.
  **THE primary reason** to keep this "upper" tn is that during
  a full-`Actor`-cancellation condition (more details below) we want to
  ensure that the IPC connection with a sub-actor's parent is **the last
  thing to be cancelled**; this is most simply implemented by ensuring
  that the `Actor._parent_chan: .ipc.Channel` is handled in an upper
  scope in `_rpc.process_messages()`-subtask-terms.
- for the root actor this `root_tn` is allocated in the `.open_root_actor()`
  body and assigned as such.
- extend `Actor.cancel_soon()` to be cohesive with this entire teardown
  "policy" (sketched in code right after this list) by scheduling a task
  in the `._root_tn` which,
  * waits for the `._service_tn` to complete and then,
  * cancels the `._root_tn.cancel_scope`,
  * includes "sclangy" console logging throughout.
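
To make the ordering concrete, here is a minimal, runnable sketch (plain
`trio`, illustrative names only, **not** the actual runtime code) of the
policy described above: RPC/service tasks live in an inner nursery that is
torn down first, while the parent-IPC task lives in the outer "root" nursery
and is only cancelled once the service scope has fully unwound,

```python
import trio


async def main():
    cancel_complete = trio.Event()  # ~ `Actor._cancel_complete`

    async with trio.open_nursery() as root_tn:          # ~ `Actor._root_tn`
        async with trio.open_nursery() as service_tn:   # ~ `Actor._service_tn`

            async def parent_chan_task():
                # stand-in for the `_rpc.process_messages()` loop over
                # `._parent_chan`; runs in the OUTER scope so it is the
                # last conc-thing to be cancelled.
                try:
                    await trio.sleep_forever()
                finally:
                    print('parent chan torn down (last)')

            async def rpc_task(i: int):
                try:
                    await trio.sleep_forever()
                finally:
                    print(f'rpc task {i} cancelled')

            async def cancel_services():
                # ~ `Actor.cancel()`: tear down all RPC/service tasks..
                service_tn.cancel_scope.cancel()
                cancel_complete.set()

            async def cancel_root_tn_after_services():
                # ~ the task `.cancel_soon()` now schedules in `._root_tn`:
                # wait for service teardown, then cancel the root scope
                # (and with it, finally, the parent channel).
                await cancel_complete.wait()
                root_tn.cancel_scope.cancel()

            root_tn.start_soon(parent_chan_task)
            root_tn.start_soon(cancel_root_tn_after_services)
            for i in range(3):
                service_tn.start_soon(rpc_task, i)
            service_tn.start_soon(cancel_services)


trio.run(main)
```

Running this should print the rpc-task teardowns before the parent-chan
teardown, which is exactly the guarantee the `._root_tn`/`._service_tn`
split exists to provide.
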
Tyler Goodlet 2025-08-19 19:24:20 -04:00
parent ee32bc433c
commit a9f06df3fb
5 changed files with 100 additions and 46 deletions


@@ -488,6 +488,7 @@ async def open_root_actor(
# -> see note on why shielding.
# maybe_raise_from_masking_exc(),
):
actor._root_tn = root_tn
# `_runtime.async_main()` creates an internal nursery
# and blocks here until any underlying actor(-process)
# tree has terminated thereby conducting so called


@@ -384,7 +384,7 @@ async def _errors_relayed_via_ipc(
# RPC task bookeeping.
# since RPC tasks are scheduled inside a flat
# `Actor._service_n`, we add "handles" to each such that
# `Actor._service_tn`, we add "handles" to each such that
# they can be individually ccancelled.
finally:
@@ -462,7 +462,7 @@ async def _invoke(
connected IPC channel.
This is the core "RPC" `trio.Task` scheduling machinery used to start every
remotely invoked function, normally in `Actor._service_n: Nursery`.
remotely invoked function, normally in `Actor._service_tn: Nursery`.
'''
__tracebackhide__: bool = hide_tb
@@ -936,7 +936,7 @@ async def process_messages(
Receive (multiplexed) per-`Channel` RPC requests as msgs from
remote processes; schedule target async funcs as local
`trio.Task`s inside the `Actor._service_n: Nursery`.
`trio.Task`s inside the `Actor._service_tn: Nursery`.
Depending on msg type, non-`cmd` (task spawning/starting)
request payloads (eg. `started`, `yield`, `return`, `error`)
@@ -961,7 +961,7 @@ async def process_messages(
'''
actor: Actor = _state.current_actor()
assert actor._service_n # runtime state sanity
assert actor._service_tn # runtime state sanity
# TODO: once `trio` get's an "obvious way" for req/resp we
# should use it?
@@ -1172,7 +1172,7 @@ async def process_messages(
start_status += '->( scheduling new task..\n'
log.runtime(start_status)
try:
ctx: Context = await actor._service_n.start(
ctx: Context = await actor._service_tn.start(
partial(
_invoke,
actor,
@@ -1312,7 +1312,7 @@ async def process_messages(
) as err:
if nursery_cancelled_before_task:
sn: Nursery = actor._service_n
sn: Nursery = actor._service_tn
assert sn and sn.cancel_scope.cancel_called # sanity
log.cancel(
f'Service nursery cancelled before it handled {funcname}'

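Aside: the `._service_tn.start()` call above relies on `trio`'s
`Nursery.start()` hand-off protocol, where the spawned task passes a value
back (in tractor's case the `Context`) via `task_status.started()` before
`.start()` returns. A tiny self-contained sketch, with illustrative stand-in
names rather than tractor internals:

```python
import trio


async def _invoke_like(
    funcname: str,
    *,
    task_status: trio.TaskStatus[str] = trio.TASK_STATUS_IGNORED,
):
    # hand a "context"-like value back to the scheduling caller..
    task_status.started(f'ctx-for-{funcname}')
    # ..then keep running as the long-lived RPC task.
    await trio.sleep(0.1)


async def main():
    async with trio.open_nursery() as service_tn:
        ctx: str = await service_tn.start(_invoke_like, 'some_func')
        print(ctx)  # -> 'ctx-for-some_func'


trio.run(main)
```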

@@ -76,6 +76,7 @@ from tractor.msg import (
)
from .trionics import (
collapse_eg,
maybe_open_nursery,
)
from .ipc import (
Channel,
@@ -173,9 +174,11 @@ class Actor:
msg_buffer_size: int = 2**6
# nursery placeholders filled in by `async_main()` after fork
_service_n: Nursery|None = None
# nursery placeholders filled in by `async_main()`,
# - after fork for subactors.
# - during boot for the root actor.
_root_tn: Nursery|None = None
_service_tn: Nursery|None = None
_ipc_server: _server.IPCServer|None = None
@property
@@ -1009,12 +1012,46 @@ class Actor:
the RPC service nursery.
'''
assert self._service_n
self._service_n.start_soon(
actor_repr: str = _pformat.nest_from_op(
input_op='>c(',
text=self.pformat(),
nest_indent=1,
)
log.cancel(
'Actor.cancel_soon()` was called!\n'
f'>> scheduling `Actor.cancel()`\n'
f'{actor_repr}'
)
assert self._service_tn
self._service_tn.start_soon(
self.cancel,
None, # self cancel all rpc tasks
)
# schedule a root-tn canceller task once the service tn
# is fully shutdown.
async def cancel_root_tn_after_services():
log.runtime(
'Waiting on service-tn to cancel..\n'
f'c>)\n'
f'|_{self._service_tn.cancel_scope!r}\n'
)
await self._cancel_complete.wait()
log.cancel(
f'`._service_tn` cancelled\n'
f'>c)\n'
f'|_{self._service_tn.cancel_scope!r}\n'
f'\n'
f'>> cancelling `._root_tn`\n'
f'c>(\n'
f' |_{self._root_tn.cancel_scope!r}\n'
)
self._root_tn.cancel_scope.cancel()
self._root_tn.start_soon(
cancel_root_tn_after_services
)
@property
def cancel_complete(self) -> bool:
return self._cancel_complete.is_set()
@@ -1119,8 +1156,8 @@ class Actor:
await ipc_server.wait_for_shutdown()
# cancel all rpc tasks permanently
if self._service_n:
self._service_n.cancel_scope.cancel()
if self._service_tn:
self._service_tn.cancel_scope.cancel()
log_meth(msg)
self._cancel_complete.set()
@@ -1257,7 +1294,7 @@ class Actor:
'''
Cancel all ongoing RPC tasks owned/spawned for a given
`parent_chan: Channel` or simply all tasks (inside
`._service_n`) when `parent_chan=None`.
`._service_tn`) when `parent_chan=None`.
'''
tasks: dict = self._rpc_tasks
@@ -1469,40 +1506,56 @@ async def async_main(
accept_addrs.append(addr.unwrap())
assert accept_addrs
# The "root" nursery ensures the channel with the immediate
# parent is kept alive as a resilient service until
# cancellation steps have (mostly) occurred in
ya_root_tn: bool = bool(actor._root_tn)
ya_service_tn: bool = bool(actor._service_tn)
# NOTE, a top-most "root" task-nursery ensures the channel
# with the immediate parent is kept alive as a resilient
# service until cancellation steps have (mostly) occurred in
# a deterministic way.
root_tn: trio.Nursery
async with (
collapse_eg(),
trio.open_nursery() as root_tn,
maybe_open_nursery(
nursery=actor._root_tn,
) as root_tn,
# trio.open_nursery() as root_tn,
):
# actor._root_n = root_tn
# assert actor._root_n
if ya_root_tn:
assert root_tn is actor._root_tn
else:
actor._root_tn = root_tn
ipc_server: _server.IPCServer
async with (
collapse_eg(),
trio.open_nursery() as service_nursery,
maybe_open_nursery(
nursery=actor._service_tn,
) as service_tn,
# trio.open_nursery() as service_tn,
_server.open_ipc_server(
parent_tn=service_nursery,
stream_handler_tn=service_nursery,
parent_tn=service_tn,
stream_handler_tn=service_tn,
) as ipc_server,
# ) as actor._ipc_server,
# ^TODO? prettier?
):
# This nursery is used to handle all inbound
# connections to us such that if the TCP server
# is killed, connections can continue to process
# in the background until this nursery is cancelled.
actor._service_n = service_nursery
if ya_service_tn:
assert service_tn is actor._service_tn
else:
# This nursery is used to handle all inbound
# connections to us such that if the TCP server
# is killed, connections can continue to process
# in the background until this nursery is cancelled.
actor._service_tn = service_tn
actor._ipc_server = ipc_server
assert (
actor._service_n
actor._service_tn
and (
actor._service_n
actor._service_tn
is
actor._ipc_server._parent_tn
is
@@ -1537,7 +1590,7 @@ async def async_main(
try:
eps: list = await ipc_server.listen_on(
accept_addrs=accept_addrs,
stream_handler_nursery=service_nursery,
stream_handler_nursery=service_tn,
)
log.runtime(
f'Booted IPC server\n'
@@ -1545,7 +1598,7 @@ async def async_main(
)
assert (
(eps[0].listen_tn)
is not service_nursery
is not service_tn
)
except OSError as oserr:
@@ -1707,7 +1760,7 @@ async def async_main(
# XXX TODO but hard XXX
# we can't actually do this bc the debugger uses the
# _service_n to spawn the lock task, BUT, in theory if we had
# _service_tn to spawn the lock task, BUT, in theory if we had
# the root nursery surround this finally block it might be
# actually possible to debug THIS machinery in the same way
# as user task code?

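For reference, the allocate-if-not-provided pattern asserted above boils
down to semantics like the following sketch; tractor's actual
`.trionics.maybe_open_nursery()` may take additional options, this is just
the core idea:

```python
from contextlib import asynccontextmanager

import trio


@asynccontextmanager
async def maybe_open_nursery(
    nursery: trio.Nursery|None = None,
):
    '''
    Re-yield the provided nursery if one was passed, otherwise
    allocate (and own) a fresh one for the caller's scope.

    '''
    if nursery is not None:
        yield nursery
    else:
        async with trio.open_nursery() as nursery:
            yield nursery
```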

@@ -481,12 +481,12 @@ async def _pause(
# we have to figure out how to avoid having the service nursery
# cancel on this task start? I *think* this works below:
# ```python
# actor._service_n.cancel_scope.shield = shield
# actor._service_tn.cancel_scope.shield = shield
# ```
# but not entirely sure if that's a sane way to implement it?
# NOTE currently we spawn the lock request task inside this
# subactor's global `Actor._service_n` so that the
# subactor's global `Actor._service_tn` so that the
# lifetime of the lock-request can outlive the current
# `._pause()` scope while the user steps through their
# application code and when they finally exit the
@@ -510,7 +510,7 @@ async def _pause(
f'|_{task}\n'
)
with trio.CancelScope(shield=shield):
req_ctx: Context = await actor._service_n.start(
req_ctx: Context = await actor._service_tn.start(
partial(
request_root_stdio_lock,
actor_uid=actor.uid,
@@ -544,7 +544,7 @@ async def _pause(
_repl_fail_report = None
# when the actor is mid-runtime cancellation the
# `Actor._service_n` might get closed before we can spawn
# `Actor._service_tn` might get closed before we can spawn
# the request task, so just ignore expected RTE.
elif (
isinstance(pause_err, RuntimeError)
@@ -989,7 +989,7 @@ def pause_from_sync(
# that output and assign the `repl` created above!
bg_task, _ = trio.from_thread.run(
afn=partial(
actor._service_n.start,
actor._service_tn.start,
partial(
_pause_from_bg_root_thread,
behalf_of_thread=thread,

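The pattern the comments above describe, handing the lock-request task off
to the longer-lived service nursery under a shield so it can outlive the
requesting scope, looks roughly like this sketch (illustrative names, not
the actual debug machinery):

```python
import trio


async def lock_request_task(
    done: trio.Event,
    task_status: trio.TaskStatus = trio.TASK_STATUS_IGNORED,
):
    # stand-in for the stdio-lock request; it lives in the service
    # nursery, not in the (possibly short-lived) caller's scope.
    task_status.started()
    await done.wait()


async def short_lived_caller(
    service_tn: trio.Nursery,
    done: trio.Event,
):
    # shield the hand-off so cancellation of *this* caller's scope
    # can't interrupt scheduling the longer-lived request task.
    with trio.CancelScope(shield=True):
        await service_tn.start(lock_request_task, done)
    # the caller may now exit; the request task keeps running.


async def main():
    done = trio.Event()
    async with trio.open_nursery() as service_tn:
        await short_lived_caller(service_tn, done)
        done.set()  # let the request task complete so the nursery can exit


trio.run(main)
```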

@@ -204,7 +204,7 @@ class _Cache:
a kept-alive-while-in-use async resource.
'''
service_n: Optional[trio.Nursery] = None
service_tn: Optional[trio.Nursery] = None
locks: dict[Hashable, trio.Lock] = {}
users: int = 0
values: dict[Any, Any] = {}
@@ -294,15 +294,15 @@ async def maybe_open_context(
f'task: {task}\n'
f'task_tn: {task_tn}\n'
)
service_n = tn
service_tn = tn
else:
service_n: trio.Nursery = current_actor()._service_n
service_tn: trio.Nursery = current_actor()._service_tn
# TODO: is there any way to allocate
# a 'stays-open-till-last-task-finshed nursery?
# service_n: trio.Nursery
# async with maybe_open_nursery(_Cache.service_n) as service_n:
# _Cache.service_n = service_n
# service_tn: trio.Nursery
# async with maybe_open_nursery(_Cache.service_tn) as service_tn:
# _Cache.service_tn = service_tn
cache_miss_ke: KeyError|None = None
maybe_taskc: trio.Cancelled|None = None
@@ -324,8 +324,8 @@ async def maybe_open_context(
mngr = acm_func(**kwargs)
resources = _Cache.resources
assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
resources[ctx_key] = (service_n, trio.Event())
yielded: Any = await service_n.start(
resources[ctx_key] = (service_tn, trio.Event())
yielded: Any = await service_tn.start(
_Cache.run_ctx,
mngr,
ctx_key,