forked from goodboy/tractor
1
0
Fork 0

Improved log msg formatting in core

As part of solving some final edge cases todo with inter-peer remote
cancellation (particularly a remote cancel from a separate actor
tree-client hanging on the request side in `modden`..) I needed less
dense, more line-delimited log msg formats when understanding ipc
channel and context cancels from console logging; this adds a ton of
that to:
- `._invoke()` which now does,
  - better formatting of `Context`-task info as multi-line
    `'<field>: <value>\n'` messages,
  - use of `trio.Task` (from `.lowlevel.current_task()` for full
    rpc-func namespace-path info,
  - better "msg flow annotations" with `<=` for understanding
    `ContextCancelled` flow.
- `Actor._stream_handler()` where in we break down IPC peers reporting
  better as multi-line `|_<Channel>` log msgs instead of all jammed on
  one line..
- `._ipc.Channel.send()` use `pformat()` for repr of packet.

Also tweak some optional deps imports for debug mode:
- add `maybe_import_gb()` for attempting to import `greenback`.
- maybe enable `stackscope` tree pprinter on `SIGUSR1` if installed.

Add a further stale-debugger-lock guard before removal:
- read the `._debug.Lock.global_actor_in_debug: tuple` uid and possibly
  `maybe_wait_for_debugger()` when the child-user is known to have
  a live process in our tree.
- only cancel `Lock._root_local_task_cs_in_debug: CancelScope` when
  the disconnected channel maps to the `Lock.global_actor_in_debug`,
  though not sure this is correct yet?

Started adding missing type annots in sections that were modified.
modden_spawn_from_client_req
Tyler Goodlet 2024-02-19 12:25:08 -05:00
parent 7f29fd8dcf
commit 8ce26d692f
2 changed files with 275 additions and 94 deletions

View File

@ -19,13 +19,14 @@ Inter-process comms abstractions
"""
from __future__ import annotations
import platform
import struct
import typing
import platform
from pprint import pformat
from collections.abc import (
AsyncGenerator,
AsyncIterator,
)
import typing
from typing import (
Any,
runtime_checkable,
@ -370,7 +371,10 @@ class Channel:
async def send(self, item: Any) -> None:
log.transport(f"send `{item}`") # type: ignore
log.transport(
'=> send IPC msg:\n\n'
f'{pformat(item)}\n'
) # type: ignore
assert self.msgstream
await self.msgstream.send(item)

View File

@ -48,6 +48,10 @@ import trio
from trio import (
CancelScope,
)
from trio.lowlevel import (
current_task,
Task,
)
from trio_typing import (
Nursery,
TaskStatus,
@ -67,7 +71,11 @@ from ._exceptions import (
ContextCancelled,
TransportClosed,
)
from .devx import _debug
from .devx import (
# pause,
maybe_wait_for_debugger,
_debug,
)
from ._discovery import get_registry
from ._portal import Portal
from . import _state
@ -80,6 +88,26 @@ if TYPE_CHECKING:
log = get_logger('tractor')
_gb_mod: ModuleType|None|False = None
async def maybe_import_gb():
global _gb_mod
if _gb_mod is False:
return
try:
import greenback
_gb_mod = greenback
await greenback.ensure_portal()
except ModuleNotFoundError:
log.warning(
'`greenback` is not installed.\n'
'No sync debug support!'
)
_gb_mod = False
async def _invoke(
@ -106,17 +134,11 @@ async def _invoke(
failed_resp: bool = False
if _state.debug_mode():
try:
import greenback
await greenback.ensure_portal()
except ModuleNotFoundError:
log.warning(
'`greenback` is not installed.\n'
'No sync debug support!'
)
await maybe_import_gb()
# possibly a traceback (not sure what typing is for this..)
tb = None
# TODO: possibly a specially formatted traceback
# (not sure what typing is for this..)?
# tb = None
cancel_scope = CancelScope()
# activated cancel scope ref
@ -237,15 +259,27 @@ async def _invoke(
# wrapper that calls `Context.started()` and then does
# the `await coro()`?
elif context:
# context func with support for bi-dir streaming
await chan.send({'functype': 'context', 'cid': cid})
# a "context" endpoint type is the most general and
# "least sugary" type of RPC ep with support for
# bi-dir streaming B)
await chan.send({
'functype': 'context',
'cid': cid
})
try:
async with trio.open_nursery() as nurse:
ctx._scope_nursery = nurse
ctx._scope = nurse.cancel_scope
task_status.started(ctx)
# TODO: should would be nice to have our
# `TaskMngr` nursery here!
# res: Any = await coro
res = await coro
# deliver final result to caller side.
await chan.send({
'return': res,
'cid': cid
@ -279,11 +313,12 @@ async def _invoke(
# don't pop the local context until we know the
# associated child isn't in debug any more
await _debug.maybe_wait_for_debugger()
await maybe_wait_for_debugger()
ctx: Context = actor._contexts.pop((chan.uid, cid))
log.runtime(
f'Context entrypoint {func} was terminated:\n'
f'{ctx}'
log.cancel(
f'Context task was terminated:\n'
f'func: {func}\n'
f'ctx: {pformat(ctx)}'
)
if ctx.cancelled_caught:
@ -295,13 +330,14 @@ async def _invoke(
if re := ctx._remote_error:
ctx._maybe_raise_remote_err(re)
fname: str = func.__name__
# fname: str = func.__name__
task: Task = current_task()
cs: CancelScope = ctx._scope
if cs.cancel_called:
our_uid: tuple = actor.uid
canceller: tuple = ctx.canceller
msg: str = (
f'`{fname}()`@{our_uid} cancelled by '
'actor was cancelled by '
)
# NOTE / TODO: if we end up having
@ -320,16 +356,37 @@ async def _invoke(
# some actor who calls `Portal.cancel_actor()`
# and by side-effect cancels this ctx.
elif canceller == ctx.chan.uid:
msg += f'its caller {canceller} '
msg += 'its caller'
else:
msg += f'remote actor {canceller}'
msg += 'a remote peer'
div_chars: str = '------ - ------'
div_offset: int = (
round(len(msg)/2)+1
+
round(len(div_chars)/2)+1
)
div_str: str = (
'\n'
+
' '*div_offset
+
f'{div_chars}\n'
)
msg += (
div_str +
f'<= canceller: {canceller}\n'
f'=> uid: {our_uid}\n'
f' |_ task: `{task.name}()`'
)
# TODO: does this ever get set any more or can
# we remove it?
if ctx._cancel_msg:
msg += (
' with msg:\n'
'------ - ------\n'
'IPC msg:\n'
f'{ctx._cancel_msg}'
)
@ -449,9 +506,9 @@ async def _invoke(
# always ship errors back to caller
err_msg: dict[str, dict] = pack_error(
err,
tb=tb,
# tb=tb, # TODO: special tb fmting?
cid=cid,
)
err_msg['cid'] = cid
if is_rpc:
try:
@ -518,19 +575,28 @@ async def try_ship_error_to_parent(
err: Exception | BaseExceptionGroup,
) -> None:
'''
Box, pack and encode a local runtime(-internal) exception for
an IPC channel `.send()` with transport/network failures and
local cancellation ignored but logged as critical(ly bad).
'''
with CancelScope(shield=True):
try:
# internal error so ship to parent without cid
await channel.send(pack_error(err))
await channel.send(
# NOTE: normally only used for internal runtime errors
# so ship to peer actor without a cid.
pack_error(err)
)
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
):
# in SC terms this is one of the worst things that can
# happen and creates the 2-general's dilemma.
# happen and provides for a 2-general's dilemma..
log.critical(
f"Failed to ship error to parent "
f"{channel.uid}, channel was closed"
f'Failed to ship error to parent '
f'{channel.uid}, IPC transport failure!'
)
@ -588,6 +654,11 @@ class Actor:
# if started on ``asycio`` running ``trio`` in guest mode
_infected_aio: bool = False
# _ans: dict[
# tuple[str, str],
# list[ActorNursery],
# ] = {}
# Process-global stack closed at end on actor runtime teardown.
# NOTE: this is currently an undocumented public api.
lifetime_stack: ExitStack = ExitStack()
@ -612,7 +683,10 @@ class Actor:
'''
self.name = name
self.uid = (name, uid or str(uuid.uuid4()))
self.uid = (
name,
uid or str(uuid.uuid4())
)
self._cancel_complete = trio.Event()
self._cancel_called_by_remote: tuple[str, tuple] | None = None
@ -827,7 +901,10 @@ class Actor:
return
# channel tracking
event = self._peer_connected.pop(uid, None)
event: trio.Event|None = self._peer_connected.pop(
uid,
None,
)
if event:
# Instructing connection: this is likely a new channel to
# a recently spawned actor which we'd like to control via
@ -836,46 +913,43 @@ class Actor:
# Alert any task waiting on this connection to come up
event.set()
chans = self._peers[uid]
# TODO: re-use channels for new connections instead
# of always new ones; will require changing all the
# discovery funcs
chans: list[Channel] = self._peers[uid]
if chans:
# TODO: re-use channels for new connections instead
# of always new ones?
# => will require changing all the discovery funcs..
log.runtime(
f"already have channel(s) for {uid}:{chans}?"
)
log.runtime(f"Registered {chan} for {uid}") # type: ignore
# append new channel
log.runtime(f"Registered {chan} for {uid}") # type: ignore
# TODO: can we just use list-ref directly?
# chans.append(chan)
self._peers[uid].append(chan)
local_nursery: ActorNursery | None = None # noqa
disconnected: bool = False
# Begin channel management - respond to remote requests and
# process received reponses.
disconnected: bool = False
try:
disconnected = await process_messages(self, chan)
except (
trio.Cancelled,
):
log.cancel(f"Msg loop was cancelled for {chan}")
disconnected: bool = await process_messages(self, chan)
except trio.Cancelled:
log.cancel(f'Msg loop was cancelled for {chan}')
raise
finally:
local_nursery = self._actoruid2nursery.get(uid, local_nursery)
local_nursery: (
ActorNursery|None
) = self._actoruid2nursery.get(uid)
# This is set in ``Portal.cancel_actor()``. So if
# the peer was cancelled we try to wait for them
# to tear down their side of the connection before
# moving on with closing our own side.
if (
local_nursery
):
if local_nursery:
if chan._cancel_called:
log.cancel(f"Waiting on cancel request to peer {chan.uid}")
log.cancel(f'Waiting on cancel request to peer {chan.uid}')
# XXX: this is a soft wait on the channel (and its
# underlying transport protocol) to close from the
# remote peer side since we presume that any channel
@ -920,6 +994,7 @@ class Actor:
# other downstream errors.
entry = local_nursery._children.get(uid)
if entry:
proc: trio.Process
_, proc, _ = entry
if (
@ -927,22 +1002,42 @@ class Actor:
and poll() is None
):
log.cancel(
f'Actor {uid} IPC broke but proc is alive?\n'
'Attempting to self cancel..'
f'Peer actor IPC broke but proc is alive?\n'
f'uid: {uid}\n'
f'|_{proc}\n'
)
# ``Channel`` teardown and closure sequence
# Drop ref to channel so it can be gc-ed and disconnected
log.runtime(f"Releasing channel {chan} from {chan.uid}")
log.runtime(
f'Disconnected IPC channel:\n'
f'uid: {chan.uid}\n'
f'|_{pformat(chan)}\n'
)
chans = self._peers.get(chan.uid)
chans.remove(chan)
if not chans:
log.runtime(f"No more channels for {chan.uid}")
log.runtime(
f'No more channels with {chan.uid}'
)
self._peers.pop(uid, None)
log.runtime(f"Peers is {self._peers}")
peers_str: str = ''
for uid, chans in self._peers.items():
peers_str += (
f'- uid: {uid}\n'
)
for i, chan in enumerate(chans):
peers_str += (
f' |_[{i}] {pformat(chan)}\n'
)
log.runtime(
f'Remaining IPC {len(self._peers)} peers:\n'
+ peers_str
)
# No more channels to other actors (at all) registered
# as connected.
@ -958,15 +1053,58 @@ class Actor:
if _state.is_root_process():
pdb_lock = _debug.Lock
pdb_lock._blocked.add(uid)
log.runtime(f"{uid} blocked from pdb locking")
# TODO: NEEEDS TO BE TESTED!
# actually, no idea if this ever even enters.. XD
pdb_user_uid: tuple = pdb_lock.global_actor_in_debug
if (
pdb_user_uid
and local_nursery
):
entry: tuple|None = local_nursery._children.get(pdb_user_uid)
if entry:
proc: trio.Process
_, proc, _ = entry
if (
(poll := getattr(proc, 'poll', None))
and poll() is None
):
log.cancel(
'Root actor reports no-more-peers, BUT '
'a DISCONNECTED child still has the debug '
'lock!\n'
f'root uid: {self.uid}\n'
f'last disconnected child uid: {uid}\n'
f'locking child uid: {pdb_user_uid}\n'
)
await maybe_wait_for_debugger(
child_in_debug=True
)
# TODO: just bc a child's transport dropped
# doesn't mean it's not still using the pdb
# REPL! so,
# -[ ] ideally we can check out child proc
# tree to ensure that its alive (and
# actually using the REPL) before we cancel
# it's lock acquire by doing the below!
# -[ ] create a way to read the tree of each actor's
# grandchildren such that when an
# intermediary parent is cancelled but their
# child has locked the tty, the grandparent
# will not allow the parent to cancel or
# zombie reap the child! see open issue:
# - https://github.com/goodboy/tractor/issues/320
# ------ - ------
# if a now stale local task has the TTY lock still
# we cancel it to allow servicing other requests for
# the lock.
db_cs = pdb_lock._root_local_task_cs_in_debug
db_cs: trio.CancelScope|None = pdb_lock._root_local_task_cs_in_debug
if (
db_cs
and not db_cs.cancel_called
and uid == pdb_user_uid
):
log.critical(
f'STALE DEBUG LOCK DETECTED FOR {uid}'
@ -998,15 +1136,16 @@ class Actor:
chan: Channel,
cid: str,
msg: dict[str, Any],
) -> None:
) -> None|bool:
'''
Push an RPC result to the local consumer's queue.
'''
uid = chan.uid
uid: tuple[str, str] = chan.uid
assert uid, f"`chan.uid` can't be {uid}"
try:
ctx = self._contexts[(uid, cid)]
ctx: Context = self._contexts[(uid, cid)]
except KeyError:
log.warning(
f'Ignoring msg from [no-longer/un]known context {uid}:'
@ -1137,6 +1276,16 @@ class Actor:
)
accept_addrs: list[tuple[str, int]] = parent_data.pop('bind_addrs')
rvs = parent_data.pop('_runtime_vars')
if rvs['_debug_mode']:
try:
from .devx import enable_stack_on_sig
enable_stack_on_sig()
except ImportError:
log.warning(
'`stackscope` not installed for use in debug mode!'
)
log.runtime(f"Runtime vars are: {rvs}")
rvs['_is_root'] = False
_state._runtime_vars.update(rvs)
@ -1374,9 +1523,15 @@ class Actor:
'''
tasks: dict = self._rpc_tasks
if tasks:
tasks_str: str = ''
for (ctx, func, _) in tasks.values():
tasks_str += (
f' |_{func.__name__}() [cid={ctx.cid[-6:]}..]\n'
)
log.cancel(
f'Cancelling all {len(tasks)} rpc tasks:\n'
f'{tasks}'
f'{tasks_str}'
)
for (
(chan, cid),
@ -1660,7 +1815,10 @@ async def async_main(
)
if actor._parent_chan:
await try_ship_error_to_parent(actor._parent_chan, err)
await try_ship_error_to_parent(
actor._parent_chan,
err,
)
# always!
match err:
@ -1750,43 +1908,53 @@ async def process_messages(
or boxed errors back to the remote caller (task).
'''
# TODO: once https://github.com/python-trio/trio/issues/467 gets
# worked out we'll likely want to use that!
msg: dict | None = None
# TODO: once `trio` get's an "obvious way" for req/resp we
# should use it?
# https://github.com/python-trio/trio/issues/467
log.runtime(
'Entering IPC msg loop:\n'
f'peer: {chan.uid}\n'
f'|_{chan}'
)
nursery_cancelled_before_task: bool = False
log.runtime(f"Entering msg loop for {chan} from {chan.uid}")
msg: dict | None = None
try:
# NOTE: this internal scope allows for keeping this
# message loop running despite the current task having
# been cancelled (eg. `open_portal()` may call this method
# from a locally spawned task) and recieve this scope
# using ``scope = Nursery.start()``
with CancelScope(shield=shield) as loop_cs:
# this internal scope allows for keeping this message
# loop running despite the current task having been
# cancelled (eg. `open_portal()` may call this method from
# a locally spawned task) and recieve this scope using
# ``scope = Nursery.start()``
task_status.started(loop_cs)
async for msg in chan:
if msg is None: # loop terminate sentinel
# dedicated loop terminate sentinel
if msg is None:
tasks: dict[
tuple[Channel, str],
tuple[Context, Callable, trio.Event]
] = actor._rpc_tasks.copy()
log.cancel(
f"Channel to {chan.uid} terminated?\n"
"Cancelling all associated tasks..")
for (channel, cid) in actor._rpc_tasks.copy():
f'Peer IPC channel terminated via `None` setinel msg?\n'
f'=> Cancelling all {len(tasks)} local RPC tasks..\n'
f'peer: {chan.uid}\n'
f'|_{chan}\n'
)
for (channel, cid) in tasks:
if channel is chan:
await actor._cancel_task(
cid,
channel,
)
log.runtime(
f"Msg loop signalled to terminate for"
f" {chan} from {chan.uid}")
break
log.transport( # type: ignore
f"Received msg {msg} from {chan.uid}")
f'<= IPC msg from peer: {chan.uid}\n\n'
# TODO: conditionally avoid fmting depending
# on log level (for perf)?
f'{pformat(msg)}\n'
)
cid = msg.get('cid')
if cid:
@ -1795,7 +1963,10 @@ async def process_messages(
await actor._push_result(chan, cid, msg)
log.runtime(
f"Waiting on next msg for {chan} from {chan.uid}")
f'Waiting on next IPC msg from {chan.uid}:\n'
# f'last msg: {msg}\n'
f'|_{chan}'
)
continue
# TODO: implement with ``match:`` syntax?
@ -1848,7 +2019,7 @@ async def process_messages(
)
log.cancel(
f'Cancelling msg loop for {chan.uid}'
f'Cancelling IPC msg-loop with {chan.uid}'
)
loop_cs.cancel()
break
@ -1890,8 +2061,10 @@ async def process_messages(
try:
func = actor._get_rpc_func(ns, funcname)
except (ModuleNotExposed, AttributeError) as err:
err_msg = pack_error(err)
err_msg['cid'] = cid
err_msg: dict[str, dict] = pack_error(
err,
cid=cid,
)
await chan.send(err_msg)
continue
@ -1993,7 +2166,10 @@ async def process_messages(
log.exception("Actor errored:")
if actor._parent_chan:
await try_ship_error_to_parent(actor._parent_chan, err)
await try_ship_error_to_parent(
actor._parent_chan,
err,
)
# if this is the `MainProcess` we expect the error broadcasting
# above to trigger an error at consuming portal "checkpoints"
@ -2002,8 +2178,9 @@ async def process_messages(
finally:
# msg debugging for when he machinery is brokey
log.runtime(
f"Exiting msg loop for {chan} from {chan.uid} "
f"with last msg:\n{msg}"
f'Exiting IPC msg loop with {chan.uid} '
f'final msg: {msg}\n'
f'|_{chan}'
)
# transport **was not** disconnected