forked from goodboy/tractor
Avoid actor-nursery-exit warns on registrees
Since a local-actor-nursery-parented subactor might also use the root as
its registry, we need to avoid warning when short lived IPC `Channel`
connections establish and then disconnect (quickly, because apparently the
subactor isn't re-using an already cached parent-peer<->child conn as
you'd expect for efficiency..) since such cases are currently considered
normal operation of our super shoddy/naive "discovery sys" XD

As such, guard the whole local-actor-nursery OR channel-draining waiting
blocks with the additional `or Actor._cancel_called` branch, since really
we should also (for sure and always) be waiting on the parent nursery to
exit when the local `Actor` has indeed been "globally" cancel-called.
Further, add separate timeout warnings for channel-draining vs.
local-actor-nursery-exit waiting, since they are technically orthogonal
cases (at least, afaik).

Also,
- adjust the `Actor._stream_handler()` connection status log-emit to
  `.runtime()`, especially to reduce noise around the aforementioned
  ephemeral registree connection-requests.
- if we do wait on a local actor-nursery to exit, report its `._children`
  table (which should help figure out going forward how useful the
  warning is, if at all).
parent 83d69fe395
commit 9292d73b40
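Both waiting blocks in the diff below share one shape: a short `trio.move_on_after(0.5)` scope (shielded via `cs.shield = True`) around an `Event.wait()`, followed by a warning keyed on `cancelled_caught`. A minimal sketch of that soft-wait shape, using stdlib `asyncio` in place of `trio`; the `soft_wait` helper and its names are illustrative only, not tractor API:

```python
import asyncio

async def soft_wait(
    event: asyncio.Event,
    what: str,
    timeout: float = 0.5,
) -> bool:
    # Bounded "soft wait": give `event` up to `timeout` seconds to
    # fire, then warn and move on -- the `cancelled_caught` warning
    # path in the diff. (The real blocks also set `cs.shield = True`
    # so an in-progress runtime cancellation can't skip the wait;
    # that's a trio cancel-scope feature with no direct asyncio
    # equivalent, so it's omitted here.)
    try:
        await asyncio.wait_for(event.wait(), timeout)
        return True
    except asyncio.TimeoutError:
        print(f'Timed out waiting on {what}?')
        return False

async def demo() -> tuple:
    exited = asyncio.Event()
    # nothing ever sets the event -> timeout branch, like a local
    # actor-nursery that hasn't exited within the 0.5s window
    first = await soft_wait(exited, 'local actor-nursery to exit', timeout=0.05)
    exited.set()
    # already-set event -> returns immediately, no warning emitted
    second = await soft_wait(exited, 'local actor-nursery to exit', timeout=0.05)
    return first, second

if __name__ == '__main__':
    print(asyncio.run(demo()))  # -> (False, True)
```

The patch uses two separate scopes (`drain_cs`, `an_exit_cs`) precisely so that the channel-drain and nursery-exit cases each get their own timeout warning.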
@@ -444,7 +444,7 @@ class Actor:
         # inside ``open_root_actor()`` where there is a check for
         # a bound listener on the "arbiter" addr. the reset will be
         # because the handshake was never meant took place.
-        log.warning(
+        log.runtime(
             con_status
             +
             ' -> But failed to handshake? Ignoring..\n'
@@ -520,24 +520,50 @@ class Actor:
         # the peer was cancelled we try to wait for them
         # to tear down their side of the connection before
         # moving on with closing our own side.
-        if local_nursery:
-            if chan._cancel_called:
-                log.cancel(
-                    'Waiting on cancel request to peer\n'
-                    f'`Portal.cancel_actor()` => {chan.uid}\n'
-                )
+        if (
+            local_nursery
+            and (
+                self._cancel_called
+                or
+                chan._cancel_called
+            )
+            #
+            # ^-TODO-^ along with this is there another condition
+            # that we should filter with to avoid entering this
+            # waiting block needlessly?
+            # -[ ] maybe `and local_nursery.cancelled` and/or
+            #      only if the `._children` table is empty or has
+            #      only `Portal`s with .chan._cancel_called ==
+            #      True` as per what we had below; the MAIN DIFF
+            #      BEING that just bc one `Portal.cancel_actor()`
+            #      was called, doesn't mean the whole actor-nurse
+            #      is gonna exit any time soon right!?
+            #
+            # or
+            # all(chan._cancel_called for chan in chans)
+
+        ):
+            log.cancel(
+                'Waiting on cancel request to peer\n'
+                f'`Portal.cancel_actor()` => {chan.uid}\n'
+            )

             # XXX: this is a soft wait on the channel (and its
             # underlying transport protocol) to close from the
             # remote peer side since we presume that any channel
-            # which is mapped to a sub-actor (i.e. it's managed by
-            # one of our local nurseries) has a message is sent to
-            # the peer likely by this actor (which is now in
-            # a cancelled condition) when the local runtime here is
-            # now cancelled while (presumably) in the middle of msg
-            # loop processing.
-            with trio.move_on_after(0.5) as cs:
-                cs.shield = True
+            # which is mapped to a sub-actor (i.e. it's managed
+            # by local actor-nursery) has a message that is sent
+            # to the peer likely by this actor (which may be in
+            # a shutdown sequence due to cancellation) when the
+            # local runtime here is now cancelled while
+            # (presumably) in the middle of msg loop processing.
+            chan_info: str = (
+                f'{chan.uid}\n'
+                f'|_{chan}\n'
+                f' |_{chan.transport}\n\n'
+            )
+            with trio.move_on_after(0.5) as drain_cs:
+                drain_cs.shield = True
+
                 # attempt to wait for the far end to close the
                 # channel and bail after timeout (a 2-generals
@@ -554,10 +580,7 @@ class Actor:
                     # TODO: factor this into a helper?
                     log.warning(
                         'Draining msg from disconnected peer\n'
-                        f'{chan.uid}\n'
-                        f'|_{chan}\n'
-                        f' |_{chan.transport}\n\n'
-
+                        f'{chan_info}'
                         f'{pformat(msg)}\n'
                     )
                     # cid: str|None = msg.get('cid')
@@ -569,31 +592,62 @@ class Actor:
                         cid,
                         msg,
                     )

-                # NOTE: when no call to `open_root_actor()` was
-                # made, we implicitly make that call inside
-                # the first `.open_nursery()`, in this case we
-                # can assume that we are the root actor and do
-                # not have to wait for the nursery-enterer to
-                # exit before shutting down the actor runtime.
-                #
-                # see matching note inside `._supervise.open_nursery()`
-                if not local_nursery._implicit_runtime_started:
-                    log.runtime(
-                        'Waiting on local actor nursery to exit..\n'
-                        f'|_{local_nursery}\n'
-                    )
-                    await local_nursery.exited.wait()
-
-                if (
-                    cs.cancelled_caught
-                    and not local_nursery._implicit_runtime_started
-                ):
-                    log.warning(
-                        'Failed to exit local actor nursery?\n'
-                        f'|_{local_nursery}\n'
-                    )
-                    # await _debug.pause()
+            if drain_cs.cancelled_caught:
+                log.warning(
+                    'Timed out waiting on IPC transport channel to drain?\n'
+                    f'{chan_info}'
+                )
+
+            # XXX NOTE XXX when no explicit call to
+            # `open_root_actor()` was made by the application
+            # (normally we implicitly make that call inside
+            # the first `.open_nursery()` in root-actor
+            # user/app code), we can assume that either we
+            # are NOT the root actor or are root but the
+            # runtime was started manually. and thus DO have
+            # to wait for the nursery-enterer to exit before
+            # shutting down the local runtime to avoid
+            # clobbering any ongoing subactor
+            # teardown/debugging/graceful-cancel.
+            #
+            # see matching note inside `._supervise.open_nursery()`
+            #
+            # TODO: should we have a separate cs + timeout
+            # block here?
+            if (
+                # XXX SO either,
+                # - not root OR,
+                # - is root but `open_root_actor()` was
+                #   entered manually (in which case we do
+                #   the equiv wait there using the
+                #   `devx._debug` sub-sys APIs).
+                not local_nursery._implicit_runtime_started
+            ):
+                log.runtime(
+                    'Waiting on local actor nursery to exit..\n'
+                    f'|_{local_nursery}\n'
+                )
+                with trio.move_on_after(0.5) as an_exit_cs:
+                    an_exit_cs.shield = True
+                    await local_nursery.exited.wait()
+
+                # TODO: currently this is always triggering for every
+                # sub-daemon spawned from the `piker.services._mngr`?
+                # -[ ] how do we ensure that the IPC is supposed to
+                #      be long lived and isn't just a register?
+                # |_ in the register case how can we signal that the
+                #    ephemeral msg loop was intentional?
+                if (
+                    # not local_nursery._implicit_runtime_started
+                    # and
+                    an_exit_cs.cancelled_caught
+                ):
+                    log.warning(
+                        'Timed out waiting on local actor-nursery to exit?\n'
+                        f'{local_nursery}\n'
+                        f' |_{pformat(local_nursery._children)}\n'
+                    )
+                    # await _debug.pause()

         if disconnected:
             # if the transport died and this actor is still
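The core behavioral change in the second hunk above is the widened guard: the old code entered the peer-teardown wait whenever `local_nursery` was truthy, while the new code also requires that either the whole local actor or just this channel's peer was cancel-called. A tiny truth-table check of that condition, using a hypothetical `should_wait_on_peer` helper extracted purely for illustration:

```python
def should_wait_on_peer(
    local_nursery,
    actor_cancel_called: bool,
    chan_cancel_called: bool,
) -> bool:
    # Mirrors the new `if (local_nursery and (self._cancel_called or
    # chan._cancel_called))` guard; this free function is illustrative
    # only and does not exist in tractor.
    return bool(
        local_nursery
        and (
            actor_cancel_called
            or chan_cancel_called
        )
    )

# ephemeral registree conn, no cancel requested -> skip the wait
# (and hence the old spurious nursery-exit warning)
assert not should_wait_on_peer(object(), False, False)
# a root-wide cancel now triggers the wait even when this chan was
# never individually `Portal.cancel_actor()`-ed
assert should_wait_on_peer(object(), True, False)
assert should_wait_on_peer(object(), False, True)
# no local actor-nursery mapped to the chan -> never wait here
assert not should_wait_on_peer(None, True, True)
```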