Compare commits

...

3 Commits

Author SHA1 Message Date
Tyler Goodlet 3d12a7e005 Flip `infected_asyncio` status msg to `.runtime()` 2024-06-18 18:14:58 -04:00
Tyler Goodlet 9292d73b40 Avoid actor-nursery-exit warns on registrees
Since a local-actor-nursery-parented subactor might also use the root as
its registry, we need to avoid warning when short lived IPC `Channel`
connections establish and then disconnect (quickly, bc the apparently
the subactor isn't re-using an already cached parente-peer<->child conn
as you'd expect efficiency..) since such cases currently considered
normal operation of our super shoddy/naive "discovery sys" XD

As such, (un)guard the whole local-actor-nursery OR channel-draining
waiting blocks with the additional `or Actor._cancel_called` branch
since really we should also be waiting on the parent nurse to exit (at
least, for sure and always) when the local `Actor` indeed has been
"globally" cancelled-called. Further add separate timeout warnings for
channel-draining vs. local-actor-nursery-exit waiting since they are
technically orthogonal cases (at least, afaik).

Also,
- adjust the `Actor._stream_handler()` connection status log-emit to
  `.runtime()`, especially to reduce noise around the aforementioned
  ephemeral registree connection-requests.
- if we do wait on a local actor-nurse to exit, report its `._children`
  table (which should help figure out going forward how useful the
  warning is, if at all).
2024-06-18 16:10:36 -04:00
Tyler Goodlet 83d69fe395 Change `_Cache` reuse emit to `.runtime()` 2024-06-18 14:40:26 -04:00
3 changed files with 105 additions and 44 deletions

View File

@ -444,7 +444,7 @@ class Actor:
# inside ``open_root_actor()`` where there is a check for
# a bound listener on the "arbiter" addr. the reset will be
# because the handshake was never meant took place.
log.warning(
log.runtime(
con_status
+
' -> But failed to handshake? Ignoring..\n'
@ -520,24 +520,50 @@ class Actor:
# the peer was cancelled we try to wait for them
# to tear down their side of the connection before
# moving on with closing our own side.
if local_nursery:
if chan._cancel_called:
log.cancel(
'Waiting on cancel request to peer\n'
f'`Portal.cancel_actor()` => {chan.uid}\n'
)
if (
local_nursery
and (
self._cancel_called
or
chan._cancel_called
)
#
# ^-TODO-^ along with this is there another condition
# that we should filter with to avoid entering this
# waiting block needlessly?
# -[ ] maybe `and local_nursery.cancelled` and/or
# only if the `._children` table is empty or has
# only `Portal`s with .chan._cancel_called ==
# True` as per what we had below; the MAIN DIFF
# BEING that just bc one `Portal.cancel_actor()`
# was called, doesn't mean the whole actor-nurse
# is gonna exit any time soon right!?
#
# or
# all(chan._cancel_called for chan in chans)
):
log.cancel(
'Waiting on cancel request to peer\n'
f'`Portal.cancel_actor()` => {chan.uid}\n'
)
# XXX: this is a soft wait on the channel (and its
# underlying transport protocol) to close from the
# remote peer side since we presume that any channel
# which is mapped to a sub-actor (i.e. it's managed by
# one of our local nurseries) has a message is sent to
# the peer likely by this actor (which is now in
# a cancelled condition) when the local runtime here is
# now cancelled while (presumably) in the middle of msg
# loop processing.
with trio.move_on_after(0.5) as cs:
cs.shield = True
# which is mapped to a sub-actor (i.e. it's managed
# by local actor-nursery) has a message that is sent
# to the peer likely by this actor (which may be in
# a shutdown sequence due to cancellation) when the
# local runtime here is now cancelled while
# (presumably) in the middle of msg loop processing.
chan_info: str = (
f'{chan.uid}\n'
f'|_{chan}\n'
f' |_{chan.transport}\n\n'
)
with trio.move_on_after(0.5) as drain_cs:
drain_cs.shield = True
# attempt to wait for the far end to close the
# channel and bail after timeout (a 2-generals
@ -554,10 +580,7 @@ class Actor:
# TODO: factor this into a helper?
log.warning(
'Draining msg from disconnected peer\n'
f'{chan.uid}\n'
f'|_{chan}\n'
f' |_{chan.transport}\n\n'
f'{chan_info}'
f'{pformat(msg)}\n'
)
# cid: str|None = msg.get('cid')
@ -569,31 +592,62 @@ class Actor:
cid,
msg,
)
# NOTE: when no call to `open_root_actor()` was
# made, we implicitly make that call inside
# the first `.open_nursery()`, in this case we
# can assume that we are the root actor and do
# not have to wait for the nursery-enterer to
# exit before shutting down the actor runtime.
#
# see matching note inside `._supervise.open_nursery()`
if not local_nursery._implicit_runtime_started:
log.runtime(
'Waiting on local actor nursery to exit..\n'
f'|_{local_nursery}\n'
)
await local_nursery.exited.wait()
if (
cs.cancelled_caught
and not local_nursery._implicit_runtime_started
):
if drain_cs.cancelled_caught:
log.warning(
'Failed to exit local actor nursery?\n'
'Timed out waiting on IPC transport channel to drain?\n'
f'{chan_info}'
)
# XXX NOTE XXX when no explicit call to
# `open_root_actor()` was made by the application
# (normally we implicitly make that call inside
# the first `.open_nursery()` in root-actor
# user/app code), we can assume that either we
# are NOT the root actor or are root but the
# runtime was started manually. and thus DO have
# to wait for the nursery-enterer to exit before
# shutting down the local runtime to avoid
# clobbering any ongoing subactor
# teardown/debugging/graceful-cancel.
#
# see matching note inside `._supervise.open_nursery()`
#
# TODO: should we have a separate cs + timeout
# block here?
if (
# XXX SO either,
# - not root OR,
# - is root but `open_root_actor()` was
# entered manually (in which case we do
# the equiv wait there using the
# `devx._debug` sub-sys APIs).
not local_nursery._implicit_runtime_started
):
log.runtime(
'Waiting on local actor nursery to exit..\n'
f'|_{local_nursery}\n'
)
# await _debug.pause()
with trio.move_on_after(0.5) as an_exit_cs:
an_exit_cs.shield = True
await local_nursery.exited.wait()
# TODO: currently this is always triggering for every
# sub-daemon spawned from the `piker.services._mngr`?
# -[ ] how do we ensure that the IPC is supposed to
# be long lived and isn't just a register?
# |_ in the register case how can we signal that the
# ephemeral msg loop was intentional?
if (
# not local_nursery._implicit_runtime_started
# and
an_exit_cs.cancelled_caught
):
log.warning(
'Timed out waiting on local actor-nursery to exit?\n'
f'{local_nursery}\n'
f' |_{pformat(local_nursery._children)}\n'
)
# await _debug.pause()
if disconnected:
# if the transport died and this actor is still

View File

@ -577,14 +577,18 @@ def run_as_asyncio_guest(
log.runtime(f"trio_main finished: {main_outcome!r}")
# start the infection: run trio on the asyncio loop in "guest mode"
log.info(f"Infecting asyncio process with {trio_main}")
log.runtime(
'Infecting `asyncio`-process with a `trio` guest-run of\n\n'
f'{trio_main!r}\n\n'
f'{trio_done_callback}\n'
)
trio.lowlevel.start_guest_run(
trio_main,
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
done_callback=trio_done_callback,
)
# ``.unwrap()`` will raise here on error
# NOTE `.unwrap()` will raise on error
return (await trio_done_fut).unwrap()
# might as well if it's installed.

View File

@ -271,8 +271,11 @@ async def maybe_open_context(
yield False, yielded
else:
log.info(f'Reusing _Cached resource for {ctx_key}')
_Cache.users += 1
log.runtime(
f'Reusing resource for `_Cache` user {_Cache.users}\n\n'
f'{ctx_key!r} -> {yielded!r}\n'
)
lock.release()
yield True, yielded