Tweak `Actor` cancel method signatures
Besides improving a bunch more log msg contents similarly to before, this changes the cancel method signatures slightly with different arg names.

for `.cancel()`:
- instead of `requesting_uid: str` take in a `req_chan: Channel` since we can always just read its `.uid: tuple` for logging; further, we can then offer the `chan=None` case indicating a "self cancel" (since there's no "requesting channel").
- the semantics of "requesting" here better indicate that the requester is an IPC peer and further (eventually) will allow permission checking against given peers for cancellation requests.
- when `chan==None` we also define a meth-internal `requester_type: str` differently for logging content :)
- add much more detailed `.cancel()` content around the requester, its type, and any debugger-related locking steps.

for `._cancel_task()`:
- change the `chan` arg to `parent_chan: Channel` since "parent" correctly indicates that the channel is the parent of the locally spawned rpc task to cancel; in fact no other chan should be able to cancel tasks parented/spawned by other channels obvi!
- also add more extensive meth-internal `.cancel()` logging with a #TODO around showing only the "relevant/latest" `Context` state vars in such logging content.

for `.cancel_rpc_tasks()`:
- shorten `requesting_uid` -> `req_uid`.
- add `parent_chan: Channel` to be similar as above in `._cancel_task()` (since it's internally delegated to anyway); it replaces the prior `only_chan` and is used to filter to only tasks spawned by this channel (thus as their "parent") as before.
- instead of `if tasks:` to enter, invert and `return` early on `if not tasks`, for less indentation B)
- add a WIP str-repr format (for `.cancel()` emissions) to show a multi-address (maddr) + task func (via the new `Context._nsf`) and report all cancel task targets with it as a "tree"; include a #TODO to finalize and implement some utils for all this!

To match, ensure we adjust the `process_messages()` self/`Actor` cancel handling blocks to provide the new `kwargs` (now with `dict`-merge syntax) to `._invoke()`.
branch: modden_spawn_from_client_req
parent ce1bcf6d36
commit 5a09ccf459
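Before the diff, a minimal runnable sketch of the new call convention the message describes: a peer-requested cancel passes the requesting `Channel`, a self cancel passes `None`. The `Channel`/`Actor` classes here are stand-in stubs, not the real `tractor` types.

import trio


class Channel:
    # stub standing in for tractor's IPC `Channel` type
    def __init__(self, uid: tuple[str, str]):
        self.uid = uid


class Actor:
    # stub with only the bits needed to show the new convention
    uid: tuple[str, str] = ('my-actor', 'deadbeef')

    async def cancel(self, req_chan: Channel | None) -> bool:
        # `None` means a "self cancel": there is no requesting channel.
        requester_type: str = 'peer' if req_chan else 'self'
        requesting_uid = req_chan.uid if req_chan else self.uid
        print(
            f'`Actor.cancel()` request from {requester_type}:\n'
            f'<= {requesting_uid}\n'
        )
        return True


async def main():
    actor = Actor()
    # peer-requested cancel: pass the requesting peer's channel
    await actor.cancel(req_chan=Channel(uid=('peer-actor', 'beefcafe')))
    # self cancel: no requesting channel exists
    await actor.cancel(req_chan=None)


trio.run(main)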
@@ -322,7 +322,7 @@ async def _invoke(
             else f'result: {ctx._result}'
         )
         log.cancel(
-            f'IPC context terminated with final {res_str}\n'
+            f'IPC context terminated with final {res_str}\n\n'
            f'|_{pformat(ctx)}\n'
         )
@@ -1022,14 +1022,14 @@ class Actor:
                 and poll() is None
             ):
                 log.cancel(
-                    f'Peer actor IPC broke but proc is alive?\n'
-                    f'uid: {uid}\n'
-                    f'|_{proc}\n'
+                    f'Peer IPC broke but subproc is alive?\n\n'
+
+                    f'<=x @{chan.raddr}\n'
+                    f'   |_{proc}\n'
                 )

         # ``Channel`` teardown and closure sequence
-        # drop ref to channel so it can be gc-ed and disconnected
+        # Drop ref to channel so it can be gc-ed and disconnected
         log.runtime(
             f'Disconnected IPC channel:\n'
             f'uid: {chan.uid}\n'
@@ -1177,8 +1177,12 @@ class Actor:
             ctx: Context = self._contexts[(uid, cid)]
         except KeyError:
             log.warning(
-                f'Ignoring msg from [no-longer/un]known context {uid}:'
-                f'\n{msg}')
+                'Ignoring invalid IPC ctx msg!\n\n'
+                f'<= sender: {uid}\n'
+                f'=> cid: {cid}\n\n'
+
+                f'{msg}\n'
+            )
             return

         return await ctx._deliver_msg(msg)
@ -1381,9 +1385,12 @@ class Actor:
|
|||
|
||||
except OSError: # failed to connect
|
||||
log.warning(
|
||||
f"Failed to connect to parent @ {parent_addr},"
|
||||
" closing server")
|
||||
await self.cancel(requesting_uid=self.uid)
|
||||
f'Failed to connect to parent!?\n\n'
|
||||
'Closing IPC [TCP] transport server to\n'
|
||||
f'{parent_addr}\n'
|
||||
f'|_{self}\n\n'
|
||||
)
|
||||
await self.cancel(chan=None) # self cancel
|
||||
raise
|
||||
|
||||
async def _serve_forever(
|
||||
|
@@ -1451,29 +1458,53 @@ class Actor:
             assert self._service_n
             self._service_n.start_soon(
                 self.cancel,
-                self.uid,
+                None,  # self cancel all rpc tasks
             )

     async def cancel(
         self,
-        requesting_uid: tuple[str, str],
+
+        # chan whose lifetime limits the lifetime of its remotely
+        # requested and locally spawned RPC tasks - similar to the
+        # supervision semantics of a nursery wherein the actual
+        # implementation does start all such tasks in
+        # a sub-nursery.
+        req_chan: Channel|None,

     ) -> bool:
         '''
-        Cancel this actor's runtime.
+        Cancel this actor's runtime, eventually resulting in
+        the exit its containing process.

-        The "deterministic" teardown sequence in order is:
+        The ideal "deterministic" teardown sequence in order is:
             - cancel all ongoing rpc tasks by cancel scope
             - cancel the channel server to prevent new inbound
               connections
             - cancel the "service" nursery reponsible for
               spawning new rpc tasks
             - return control the parent channel message loop

         '''
-        log.cancel(
-            f'{self.uid} requested to cancel by:\n'
-            f'{requesting_uid}'
-        )
+        (
+            requesting_uid,
+            requester_type,
+            req_chan,
+
+        ) = (
+            req_chan.uid,
+            'peer',
+            req_chan,
+
+        ) if req_chan else (
+
+            # a self cancel of ALL rpc tasks
+            self.uid,
+            'self',
+            self
+        )
+        msg: str = (
+            f'`Actor.cancel()` request from {requester_type}:\n'
+            f'<= {requesting_uid}\n'
+        )

         # TODO: what happens here when we self-cancel tho?
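A tiny standalone demo of the conditional tuple-unpack above, which derives the requester info from the (possibly absent) requesting channel; all values here are made up for illustration.

self_uid = ('my-actor', 'b1ff')
req_chan = None  # simulate a "self cancel": no requesting channel

(
    requesting_uid,
    requester_type,
) = (
    (req_chan.uid, 'peer')
    if req_chan
    else (self_uid, 'self')  # a self cancel of ALL rpc tasks
)

msg: str = (
    f'`Actor.cancel()` request from {requester_type}:\n'
    f'<= {requesting_uid}\n'
)
assert requester_type == 'self'
print(msg)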
@@ -1487,12 +1518,16 @@ class Actor:
             # with the root actor in this tree
             dbcs = _debug.Lock._debugger_request_cs
             if dbcs is not None:
-                log.cancel("Cancelling active debugger request")
+                msg += (
+                    '>> Cancelling active debugger request..\n'
+                    f'|_{_debug.Lock}\n'
+                )
                 dbcs.cancel()

-            # kill all ongoing tasks
+            # self-cancel **all** ongoing RPC tasks
             await self.cancel_rpc_tasks(
-                requesting_uid=requesting_uid,
+                req_uid=requesting_uid,
+                parent_chan=None,
             )

             # stop channel server
@@ -1501,13 +1536,14 @@ class Actor:
             await self._server_down.wait()
         else:
             log.warning(
-                f'{self.uid} was likely cancelled before it started')
+                'Transport[TCP] server was cancelled start?'
+            )

         # cancel all rpc tasks permanently
         if self._service_n:
             self._service_n.cancel_scope.cancel()

-        log.cancel(f"{self.uid} called `Actor.cancel()`")
+        log.cancel(msg)
         self._cancel_complete.set()
         return True
@@ -1522,7 +1558,7 @@ class Actor:
     async def _cancel_task(
         self,
         cid: str,
-        chan: Channel,
+        parent_chan: Channel,
         requesting_uid: tuple[str, str] | None = None,

     ) -> bool:
@@ -1534,13 +1570,25 @@ class Actor:
        in the signature (for now).

        '''
-        # right now this is only implicitly called by
+        ctx: Context
+        func: Callable
+        is_complete: trio.Event
+
+        # NOTE: right now this is only implicitly called by
        # streaming IPC but it should be called
        # to cancel any remotely spawned task
        try:
            # this ctx based lookup ensures the requested task to
-            # be cancelled was indeed spawned by a request from this channel
-            ctx, func, is_complete = self._rpc_tasks[(chan, cid)]
+            # be cancelled was indeed spawned by a request from
+            # this channel
+            (
+                ctx,
+                func,
+                is_complete,
+            ) = self._rpc_tasks[(
+                parent_chan,
+                cid,
+            )]
            scope: CancelScope = ctx._scope

        except KeyError:
@@ -1551,17 +1599,28 @@ class Actor:
            # - callee errors prior to cancel req.
            log.cancel(
                'Cancel request invalid, RPC task already completed?\n'
-                f'<= canceller: {requesting_uid}\n'
-                f'  |_{chan}\n\n'
-
-                f'=> ctx id: {cid}\n'
+                f'<= canceller: {requesting_uid}\n\n'
+                f'=>{parent_chan}\n'
+                f'  |_ctx-id: {cid}\n'
            )
            return True

        log.cancel(
-            f"Cancelling task:\ncid: {cid}\nfunc: {func}\n"
-            f"peer: {chan.uid}\n")
+            'Cancel request for RPC task\n'
+            f'<= canceller: {requesting_uid}\n\n'
+
+            # TODO: better ascii repr for "supervisor" like
+            # a nursery or context scope?
+            f'=> ipc-parent: {parent_chan}\n'
+
+            # TODO: simplified `Context.__repr__()` fields output
+            # shows only application state-related stuff like,
+            # - ._stream
+            # - .closed
+            # - .started_called
+            # - .. etc.
+            f'  |_ctx: {cid}\n'
+            f'     >> {ctx._nsf}()\n'
+        )
        if (
            ctx._canceller is None
            and requesting_uid
@@ -1571,6 +1630,7 @@ class Actor:
        # don't allow cancelling this function mid-execution
        # (is this necessary?)
        if func is self._cancel_task:
+            log.error('Do not cancel a cancel!?')
            return True

        # TODO: shouldn't we eventually be calling ``Context.cancel()``
@@ -1580,23 +1640,29 @@ class Actor:
        scope.cancel()

        # wait for _invoke to mark the task complete
+        flow_info: str = (
+            f'<= canceller: {requesting_uid}\n'
+            f'=> ipc-parent: {parent_chan}\n'
+            f'  |_{ctx}\n'
+        )
        log.runtime(
-            'Waiting on task to cancel:\n'
-            f'cid: {cid}\nfunc: {func}\n'
-            f'peer: {chan.uid}\n'
+            'Waiting on RPC task to cancel\n'
+            f'{flow_info}'
        )
        await is_complete.wait()

        log.runtime(
-            f"Sucessfully cancelled task:\ncid: {cid}\nfunc: {func}\n"
-            f"peer: {chan.uid}\n")
+            f'Sucessfully cancelled RPC task\n'
+            f'{flow_info}'
+        )
        return True

    async def cancel_rpc_tasks(
        self,
-        only_chan: Channel | None = None,
-        requesting_uid: tuple[str, str] | None = None,
+        req_uid: tuple[str, str],
+
+        # NOTE: when None is passed we cancel **all** rpc
+        # tasks running in this actor!
+        parent_chan: Channel|None,

    ) -> None:
        '''
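A stub demo of why the new `parent_chan` filter is trivial to apply: the task table (mirroring `Actor._rpc_tasks`) is keyed by `(chan, cid)` where `chan` is the channel that spawned, i.e. "parents", the task. Strings stand in for the real `Channel` and task objects.

chan_a, chan_b = 'chan-A', 'chan-B'

rpc_tasks = {
    (chan_a, 'cid-1'): 'task-1',
    (chan_a, 'cid-2'): 'task-2',
    (chan_b, 'cid-3'): 'task-3',
}

def cancellable_by(parent_chan) -> list:
    # `parent_chan=None` => a "self cancel" of **all** rpc tasks;
    # otherwise only tasks parented by exactly that channel match.
    return [
        task
        for (chan, _cid), task in rpc_tasks.items()
        if parent_chan is None or chan == parent_chan
    ]

assert cancellable_by(chan_a) == ['task-1', 'task-2']
assert len(cancellable_by(None)) == 3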
@@ -1605,38 +1671,76 @@ class Actor:

        '''
        tasks: dict = self._rpc_tasks
-        if tasks:
-            tasks_str: str = ''
-            for (ctx, func, _) in tasks.values():
-                tasks_str += (
-                    f' |_{func.__name__}() [cid={ctx.cid[-6:]}..]\n'
-                )
-
-            log.cancel(
-                f'Cancelling all {len(tasks)} rpc tasks:\n'
-                f'{tasks_str}'
+        if not tasks:
+            log.warning(
+                'Actor has no cancellable RPC tasks?\n'
+                f'<= cancel requester: {req_uid}\n'
+                f'=> {self}\n\n'
            )
-            for (
-                (chan, cid),
-                (ctx, func, is_complete),
-            ) in tasks.copy().items():
-                if only_chan is not None:
-                    if only_chan != chan:
-                        continue
+            return

-                # TODO: this should really done in a nursery batch
-                if func != self._cancel_task:
-                    await self._cancel_task(
-                        cid,
-                        chan,
-                        requesting_uid=requesting_uid,
-                    )
+        # TODO: seriously factor this into some helper funcs XD
+        tasks_str: str = ''
+        for (ctx, func, _) in tasks.values():

-            log.cancel(
-                'Waiting for remaining rpc tasks to complete:\n'
-                f'{tasks}'
-            )
-            await self._ongoing_rpc_tasks.wait()
+            # TODO: std repr of all primitives in
+            # a hierarchical tree format, since we can!!
+            # like => repr for funcs/addrs/msg-typing:
+            #
+            # -[ ] use a proper utf8 "arm" like
+            #      `stackscope` has!
+            # -[ ] for typed msging, show the
+            #      py-type-annot style?
+            #  - maybe auto-gen via `inspect` / `typing` type-sig:
+            #    https://stackoverflow.com/a/57110117
+            #    => see ex. code pasted into `.msg.types`
+            #
+            # -[ ] proper .maddr() for IPC primitives?
+            #  - `Channel.maddr() -> str:` obvi!
+            #  - `Context.maddr() -> str:`
+            tasks_str += (
+                f' |_@ /ipv4/tcp/cid="{ctx.cid[-16:]} .."\n'
+                f'   |>> {ctx._nsf}() -> dict:\n'
+            )
+
+        log.cancel(
+            f'Cancelling all {len(tasks)} rpc tasks:\n\n'
+            f'<= .cancel() from {req_uid}\n'
+            f'{self}\n'
+            f'{tasks_str}'
+        )
+        for (
+            (task_caller_chan, cid),
+            (ctx, func, is_complete),
+        ) in tasks.copy().items():
+
+            if (
+                # maybe filter to specific IPC channel?
+                (parent_chan
+                 and
+                 task_caller_chan != parent_chan)
+
+                # never "cancel-a-cancel" XD
+                or (func == self._cancel_task)
+            ):
+                continue
+
+            # if func == self._cancel_task:
+            #     continue
+
+            # TODO: this maybe block on the task cancellation
+            # and so should really done in a nursery batch?
+            await self._cancel_task(
+                cid,
+                task_caller_chan,
+                requesting_uid=req_uid,
+            )
+
+        log.cancel(
+            'Waiting for remaining rpc tasks to complete\n'
+            f'|_{tasks}'
+        )
+        await self._ongoing_rpc_tasks.wait()

    def cancel_server(self) -> None:
        '''
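The TODO block above imagines `.maddr()` helpers for IPC primitives; nothing like that exists in `tractor` yet, so the function below is purely hypothetical, just showing the multiaddress-style layout the WIP repr strings (`/ipv4/tcp/cid="..."`) hint at.

def maddr(host: str, port: int, cid: str) -> str:
    # protocol segments first (multiaddr style), then a truncated ctx id
    return f'/ipv4/{host}/tcp/{port}/cid/{cid[-16:]}..'

print(maddr('127.0.0.1', 61029, 'a3f0c9d2e1b84756f0a1'))
# prints: /ipv4/127.0.0.1/tcp/61029/cid/c9d2e1b84756f0a1..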
@@ -2092,10 +2196,11 @@ async def process_messages(
                    f'=> {ns}.{funcname}({kwargs})\n'
                )
                if ns == 'self':
                    uid: tuple = chan.uid
                    if funcname == 'cancel':
                        func: Callable = actor.cancel
-                        kwargs['requesting_uid'] = uid
+                        kwargs |= {
+                            'req_chan': chan,
+                        }

                        # don't start entire actor runtime cancellation
                        # if this actor is currently in debug mode!
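A quick demo of the in-place dict-merge operator (Python 3.9+, PEP 584) that now builds the RPC `kwargs`; the values here are stand-ins, the real code merges in the live `Channel` instance.

kwargs: dict = {'cid': 'abc123'}
kwargs |= {
    'req_chan': '<Channel peer-uid=...>',  # stand-in for a `Channel`
}
assert kwargs == {
    'cid': 'abc123',
    'req_chan': '<Channel peer-uid=...>',
}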
@@ -2109,11 +2214,6 @@ async def process_messages(
                        # and immediately start the core runtime
                        # machinery shutdown!
                        with CancelScope(shield=True):
-                            log.cancel(
-                                f'Cancel request for `Actor` runtime\n'
-                                f'<= canceller: {uid}\n'
-                                # f'=> uid: {actor.uid}\n'
-                            )
                            await _invoke(
                                actor,
                                cid,
@@ -2123,25 +2223,32 @@ async def process_messages(
                                is_rpc=False,
                            )

-                        log.cancel(
-                            f'Cancelling IPC msg-loop with {chan.uid}'
+                        log.runtime(
+                            'Cancelling IPC transport msg-loop with peer:\n'
+                            f'|_{chan}\n'
                        )
                        loop_cs.cancel()
                        break

                if funcname == '_cancel_task':
-                    func = actor._cancel_task
+                    func: Callable = actor._cancel_task

                    # we immediately start the runtime machinery
                    # shutdown
                    # with CancelScope(shield=True):
-                    kwargs['chan'] = chan
-                    target_cid = kwargs['cid']
-                    kwargs['requesting_uid'] = chan.uid
+                    target_cid: str = kwargs['cid']
+                    kwargs |= {
+                        # NOTE: ONLY the rpc-task-owning
+                        # parent IPC channel should be able to
+                        # cancel it!
+                        'parent_chan': chan,
+                        'requesting_uid': chan.uid,
+                    }
                    log.cancel(
                        f'Rx task cancel request\n'
                        f'<= canceller: {chan.uid}\n'
-                        f'=> uid: {actor.uid}\n'
+                        f'   |_{chan}\n\n'
+                        f'=> {actor}\n'
+                        f'   |_cid: {target_cid}\n'
                    )
                    try:
@@ -2154,8 +2261,13 @@ async def process_messages(
                            is_rpc=False,
                        )
                    except BaseException:
-                        log.exception("failed to cancel task?")
+
+                        log.exception(
+                            'Failed to cancel task?\n'
+                            f'<= canceller: {chan.uid}\n'
+                            f'   |_{chan}\n\n'
+                            f'=> {actor}\n'
+                            f'   |_cid: {target_cid}\n'
+                        )
                    continue
                else:
                    # normally registry methods, eg.
@@ -2174,9 +2286,25 @@ async def process_messages(
                    await chan.send(err_msg)
                    continue

-            # spin up a task for the requested function
-            log.runtime(f"Spawning task for {func}")
-            assert actor._service_n
+            # schedule a task for the requested RPC function
+            # in the actor's main "service nursery".
+            # TODO: possibly a service-tn per IPC channel for
+            # supervision isolation? would avoid having to
+            # manage RPC tasks individually in `._rpc_tasks`
+            # table?
+            log.runtime(
+                f'Spawning task for RPC request\n'
+                f'<= caller: {chan.uid}\n'
+                f'   |_{chan}\n\n'
+
+                # TODO: maddr style repr?
+                # f'   |_@ /ipv4/{chan.raddr}/tcp/{chan.rport}/'
+                # f'cid="{cid[-16:]} .."\n\n'
+
+                f'=> {actor}\n'
+                f'   |_cid: {cid}\n'
+                f'   |>> {func}()\n'
+            )
+            assert actor._service_n  # wait why? do it at top?
            try:
                ctx: Context = await actor._service_n.start(
                    partial(
@@ -2234,7 +2362,13 @@ async def process_messages(
            log.runtime(
                f"{chan} for {chan.uid} disconnected, cancelling tasks"
            )
-            await actor.cancel_rpc_tasks(chan)
+            await actor.cancel_rpc_tasks(
+                req_uid=actor.uid,
+                # a "self cancel" in terms of the lifetime of the
+                # IPC connection which is presumed to be the
+                # source of any requests for spawned tasks.
+                parent_chan=chan,
+            )

    except (
        TransportClosed,