Improved log msg formatting in core

As part of solving some final edge cases to do with inter-peer remote
cancellation (particularly a remote cancel from a separate actor-tree
client hanging on the request side in `modden`..) I needed less dense,
more line-delimited log msg formats for understanding IPC channel and
context cancels from console logging; this adds a ton of that to:
- `._invoke()` which now does,
  - better formatting of `Context`-task info as multi-line
    `'<field>: <value>\n'` messages,
  - use of `trio.Task` (from `trio.lowlevel.current_task()`) for full
    rpc-func namespace-path info,
  - better "msg flow annotations" with `<=` for understanding
    `ContextCancelled` flow.
- `Actor._stream_handler()` wherein we now break down IPC peer
  reporting into multi-line `|_<Channel>` log msgs instead of jamming
  it all on one line..
- `._ipc.Channel.send()` which now uses `pformat()` for the packet
  repr; a sketch of the overall formatting style follows this list.
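For flavor, here's a minimal sketch (not the runtime code itself, just
the formatting idea) of the less dense, line-delimited style combined
with `pprint.pformat()` and the `<=`/`=>` msg-flow markers; the logger
here is a plain stdlib stand-in for the runtime's `get_logger()`:

```python
from pprint import pformat
import logging

# stand-in logger; the runtime uses its own `get_logger('tractor')`
log = logging.getLogger('tractor')


def log_sent_msg(msg: dict) -> None:
    # one field per line instead of a single jammed-together repr
    log.debug(
        '=> send IPC msg:\n\n'
        f'{pformat(msg)}\n'
    )


def log_cancel_flow(
    canceller: tuple,
    our_uid: tuple,
    task_name: str,
) -> None:
    # `<=` marks the (remote) canceller, `=>` the local cancelled side
    log.debug(
        f'<= canceller: {canceller}\n'
        f'=> uid: {our_uid}\n'
        f'  |_ task: `{task_name}()`'
    )
```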

Also tweak some optional-deps imports for debug mode:
- add `maybe_import_gb()` for attempting to import `greenback`; the
  general optional-import pattern is sketched below.
- maybe enable the `stackscope` tree pprinter on `SIGUSR1` if it's
  installed.
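The `greenback` helper follows the usual optional-dependency
try/except-around-import pattern; roughly (a hedged sketch mirroring
the `maybe_import_gb()` added in the diff below, with a stdlib logger
standing in):

```python
from types import ModuleType
import logging

log = logging.getLogger('tractor')

# module-level cache; `False` means "tried and not installed"
_gb_mod: ModuleType | None | bool = None


async def maybe_import_gb() -> ModuleType | None:
    global _gb_mod
    if _gb_mod is False:
        return None

    try:
        import greenback
        _gb_mod = greenback
        # enable `greenback`-based sync->async bridging for this task tree
        await greenback.ensure_portal()
    except ModuleNotFoundError:
        log.warning(
            '`greenback` is not installed.\n'
            'No sync debug support!'
        )
        _gb_mod = False

    return _gb_mod or None
```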

Add a further stale-debugger-lock guard before removal:
- read the `._debug.Lock.global_actor_in_debug: tuple` uid and
  possibly call `maybe_wait_for_debugger()` when the lock-holding
  child is known to have a live process in our tree.
- only cancel `Lock._root_local_task_cs_in_debug: CancelScope` when
  the disconnected channel maps to the `Lock.global_actor_in_debug`
  uid, though not sure this is correct yet?
A condensed sketch of the guard condition is included below.
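Condensed, the new guard boils down to a predicate like the following
(the names mirror the diff; the standalone helper itself is purely
illustrative and not part of the runtime):

```python
import trio


def should_cancel_stale_lock(
    disconnected_uid: tuple[str, str],
    pdb_user_uid: tuple[str, str] | None,
    db_cs: trio.CancelScope | None,
) -> bool:
    # only cancel the root-side locking task's scope when the peer
    # whose channel just dropped is the actor actually holding the
    # debug (TTY) lock and that scope hasn't already been cancelled.
    return bool(
        db_cs
        and not db_cs.cancel_called
        and pdb_user_uid is not None
        and disconnected_uid == pdb_user_uid
    )
```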

Started adding missing type annots in sections that were modified.
ctx_cancel_semantics_and_overruns_REVERSED_FACEPALM
Tyler Goodlet 2024-02-19 12:25:08 -05:00
parent c85757aee1
commit 6a303358df
2 changed files with 266 additions and 81 deletions


@@ -19,13 +19,14 @@ Inter-process comms abstractions
 """
 from __future__ import annotations
-import platform
 import struct
-import typing
+import platform
+from pprint import pformat
 from collections.abc import (
     AsyncGenerator,
     AsyncIterator,
 )
+import typing
 from typing import (
     Any,
     runtime_checkable,
@@ -370,7 +371,10 @@ class Channel:
     async def send(self, item: Any) -> None:

-        log.transport(f"send `{item}`")  # type: ignore
+        log.transport(
+            '=> send IPC msg:\n\n'
+            f'{pformat(item)}\n'
+        )  # type: ignore
         assert self.msgstream
         await self.msgstream.send(item)


@@ -28,6 +28,7 @@ from itertools import chain
 import importlib
 import importlib.util
 import inspect
+from pprint import pformat
 import signal
 import sys
 from typing import (
@@ -48,6 +49,10 @@ import trio
 from trio import (
     CancelScope,
 )
+from trio.lowlevel import (
+    current_task,
+    Task,
+)
 from trio_typing import (
     Nursery,
     TaskStatus,
@@ -80,6 +85,26 @@ if TYPE_CHECKING:

 log = get_logger('tractor')

+_gb_mod: ModuleType|None|False = None
+
+
+async def maybe_import_gb():
+    global _gb_mod
+    if _gb_mod is False:
+        return
+
+    try:
+        import greenback
+        _gb_mod = greenback
+        await greenback.ensure_portal()
+
+    except ModuleNotFoundError:
+        log.warning(
+            '`greenback` is not installed.\n'
+            'No sync debug support!'
+        )
+        _gb_mod = False
+

 async def _invoke(
@@ -227,15 +252,27 @@ async def _invoke(
     # wrapper that calls `Context.started()` and then does
     # the `await coro()`?
     elif context:
-        # context func with support for bi-dir streaming
-        await chan.send({'functype': 'context', 'cid': cid})
+
+        # a "context" endpoint type is the most general and
+        # "least sugary" type of RPC ep with support for
+        # bi-dir streaming B)
+        await chan.send({
+            'functype': 'context',
+            'cid': cid
+        })

         try:
             async with trio.open_nursery() as nurse:
                 ctx._scope_nursery = nurse
                 ctx._scope = nurse.cancel_scope
                 task_status.started(ctx)
+
+                # TODO: should would be nice to have our
+                # `TaskMngr` nursery here!
+                # res: Any = await coro
                 res = await coro

+            # deliver final result to caller side.
             await chan.send({
                 'return': res,
                 'cid': cid
@@ -271,9 +308,10 @@ async def _invoke(
         # associated child isn't in debug any more
         await _debug.maybe_wait_for_debugger()
         ctx: Context = actor._contexts.pop((chan.uid, cid))
-        log.runtime(
-            f'Context entrypoint {func} was terminated:\n'
-            f'{ctx}'
+        log.cancel(
+            f'Context task was terminated:\n'
+            f'func: {func}\n'
+            f'ctx: {pformat(ctx)}'
         )

         if ctx.cancelled_caught:
@@ -285,13 +323,14 @@ async def _invoke(
             if re := ctx._remote_error:
                 ctx._maybe_raise_remote_err(re)

-            fname: str = func.__name__
+            # fname: str = func.__name__
+            task: Task = current_task()
             cs: CancelScope = ctx._scope
             if cs.cancel_called:
                 our_uid: tuple = actor.uid
                 canceller: tuple = ctx.canceller
                 msg: str = (
-                    f'`{fname}()`@{our_uid} cancelled by '
+                    'actor was cancelled by '
                 )

                 # NOTE / TODO: if we end up having
@@ -310,16 +349,37 @@ async def _invoke(
                 # some actor who calls `Portal.cancel_actor()`
                 # and by side-effect cancels this ctx.
                 elif canceller == ctx.chan.uid:
-                    msg += f'its caller {canceller} '
+                    msg += 'its caller'

                 else:
-                    msg += f'remote actor {canceller}'
+                    msg += 'a remote peer'
+
+                div_chars: str = '------ - ------'
+                div_offset: int = (
+                    round(len(msg)/2)+1
+                    +
+                    round(len(div_chars)/2)+1
+                )
+                div_str: str = (
+                    '\n'
+                    +
+                    ' '*div_offset
+                    +
+                    f'{div_chars}\n'
+                )
+                msg += (
+                    div_str +
+                    f'<= canceller: {canceller}\n'
+                    f'=> uid: {our_uid}\n'
+                    f' |_ task: `{task.name}()`'
+                )

                 # TODO: does this ever get set any more or can
                 # we remove it?
                 if ctx._cancel_msg:
                     msg += (
-                        ' with msg:\n'
+                        '------ - ------\n'
+                        'IPC msg:\n'
                         f'{ctx._cancel_msg}'
                     )
@@ -439,9 +499,9 @@ async def _invoke(
         # always ship errors back to caller
         err_msg: dict[str, dict] = pack_error(
             err,
-            tb=tb,
+            # tb=tb, # TODO: special tb fmting?
+            cid=cid,
         )
-        err_msg['cid'] = cid

         if is_rpc:
             try:
@@ -508,19 +568,28 @@ async def try_ship_error_to_parent(
     err: Union[Exception, BaseExceptionGroup],

 ) -> None:
+    '''
+    Box, pack and encode a local runtime(-internal) exception for
+    an IPC channel `.send()` with transport/network failures and
+    local cancellation ignored but logged as critical(ly bad).
+
+    '''
     with CancelScope(shield=True):
         try:
-            # internal error so ship to parent without cid
-            await channel.send(pack_error(err))
+            await channel.send(
+                # NOTE: normally only used for internal runtime errors
+                # so ship to peer actor without a cid.
+                pack_error(err)
+            )
         except (
             trio.ClosedResourceError,
             trio.BrokenResourceError,
         ):
             # in SC terms this is one of the worst things that can
-            # happen and creates the 2-general's dilemma.
+            # happen and provides for a 2-general's dilemma..
             log.critical(
-                f"Failed to ship error to parent "
-                f"{channel.uid}, channel was closed"
+                f'Failed to ship error to parent '
+                f'{channel.uid}, IPC transport failure!'
             )
@@ -573,6 +642,11 @@ class Actor:
     # if started on ``asycio`` running ``trio`` in guest mode
     _infected_aio: bool = False

+    # _ans: dict[
+    #     tuple[str, str],
+    #     list[ActorNursery],
+    # ] = {}
+
     # Process-global stack closed at end on actor runtime teardown.
     # NOTE: this is currently an undocumented public api.
     lifetime_stack: ExitStack = ExitStack()
@@ -593,7 +667,10 @@ class Actor:
         '''
         self.name = name
-        self.uid = (name, uid or str(uuid.uuid4()))
+        self.uid = (
+            name,
+            uid or str(uuid.uuid4())
+        )

         self._cancel_complete = trio.Event()
         self._cancel_called_by_remote: tuple[str, tuple] | None = None
@@ -762,7 +839,10 @@ class Actor:
             return

         # channel tracking
-        event = self._peer_connected.pop(uid, None)
+        event: trio.Event|None = self._peer_connected.pop(
+            uid,
+            None,
+        )
         if event:
             # Instructing connection: this is likely a new channel to
             # a recently spawned actor which we'd like to control via
@@ -771,46 +851,43 @@ class Actor:
         # Alert any task waiting on this connection to come up
         event.set()

-        chans = self._peers[uid]
-
-        # TODO: re-use channels for new connections instead
-        # of always new ones; will require changing all the
-        # discovery funcs
+        chans: list[Channel] = self._peers[uid]
         if chans:
+            # TODO: re-use channels for new connections instead
+            # of always new ones?
+            # => will require changing all the discovery funcs..
             log.runtime(
                 f"already have channel(s) for {uid}:{chans}?"
             )

-        log.runtime(f"Registered {chan} for {uid}")  # type: ignore
         # append new channel
+        log.runtime(f"Registered {chan} for {uid}")  # type: ignore
+        # TODO: can we just use list-ref directly?
+        # chans.append(chan)
         self._peers[uid].append(chan)

-        local_nursery: ActorNursery | None = None  # noqa
-        disconnected: bool = False
-
         # Begin channel management - respond to remote requests and
         # process received reponses.
+        disconnected: bool = False
         try:
-            disconnected = await process_messages(self, chan)
-
-        except (
-            trio.Cancelled,
-        ):
-            log.cancel(f"Msg loop was cancelled for {chan}")
+            disconnected: bool = await process_messages(self, chan)
+
+        except trio.Cancelled:
+            log.cancel(f'Msg loop was cancelled for {chan}')
             raise

         finally:
-            local_nursery = self._actoruid2nursery.get(uid, local_nursery)
+            local_nursery: (
+                ActorNursery|None
+            ) = self._actoruid2nursery.get(uid)

             # This is set in ``Portal.cancel_actor()``. So if
             # the peer was cancelled we try to wait for them
             # to tear down their side of the connection before
             # moving on with closing our own side.
-            if (
-                local_nursery
-            ):
+            if local_nursery:
                 if chan._cancel_called:
-                    log.cancel(f"Waiting on cancel request to peer {chan.uid}")
+                    log.cancel(f'Waiting on cancel request to peer {chan.uid}')

                 # XXX: this is a soft wait on the channel (and its
                 # underlying transport protocol) to close from the
                 # remote peer side since we presume that any channel
@@ -853,26 +930,48 @@ class Actor:
                     # the cause of other downstream errors.
                     entry = local_nursery._children.get(uid)
                     if entry:
+                        proc: trio.Process
                         _, proc, _ = entry
                         poll = getattr(proc, 'poll', None)
                         if poll and poll() is None:
                             log.cancel(
-                                f'Actor {uid} IPC broke but proc is alive?'
+                                f'Peer actor IPC broke but proc is alive?\n'
+                                f'uid: {uid}\n'
+                                f'|_{proc}\n'
                             )

             # ``Channel`` teardown and closure sequence
             # Drop ref to channel so it can be gc-ed and disconnected
-            log.runtime(f"Releasing channel {chan} from {chan.uid}")
+            log.runtime(
+                f'Disconnected IPC channel:\n'
+                f'uid: {chan.uid}\n'
+                f'|_{pformat(chan)}\n'
+            )
             chans = self._peers.get(chan.uid)
             chans.remove(chan)

             if not chans:
-                log.runtime(f"No more channels for {chan.uid}")
+                log.runtime(
+                    f'No more channels with {chan.uid}'
+                )
                 self._peers.pop(uid, None)

-            log.runtime(f"Peers is {self._peers}")
+            peers_str: str = ''
+            for uid, chans in self._peers.items():
+                peers_str += (
+                    f'- uid: {uid}\n'
+                )
+                for i, chan in enumerate(chans):
+                    peers_str += (
+                        f' |_[{i}] {pformat(chan)}\n'
+                    )
+
+            log.runtime(
+                f'Remaining IPC {len(self._peers)} peers:\n'
+                + peers_str
+            )

             # No more channels to other actors (at all) registered
             # as connected.
@@ -888,15 +987,58 @@ class Actor:
             if _state.is_root_process():
                 pdb_lock = _debug.Lock
                 pdb_lock._blocked.add(uid)
-                log.runtime(f"{uid} blocked from pdb locking")
+
+                # TODO: NEEEDS TO BE TESTED!
+                # actually, no idea if this ever even enters.. XD
+                pdb_user_uid: tuple = pdb_lock.global_actor_in_debug
+                if (
+                    pdb_user_uid
+                    and local_nursery
+                ):
+                    entry: tuple|None = local_nursery._children.get(pdb_user_uid)
+                    if entry:
+                        proc: trio.Process
+                        _, proc, _ = entry
+                        if (
+                            (poll := getattr(proc, 'poll', None))
+                            and poll() is None
+                        ):
+                            log.cancel(
+                                'Root actor reports no-more-peers, BUT '
+                                'a DISCONNECTED child still has the debug '
+                                'lock!\n'
+                                f'root uid: {self.uid}\n'
+                                f'last disconnected child uid: {uid}\n'
+                                f'locking child uid: {pdb_user_uid}\n'
+                            )
+                            await _debug.maybe_wait_for_debugger(
+                                child_in_debug=True
+                            )
+
+                # TODO: just bc a child's transport dropped
+                # doesn't mean it's not still using the pdb
+                # REPL! so,
+                # -[ ] ideally we can check out child proc
+                #  tree to ensure that its alive (and
+                #  actually using the REPL) before we cancel
+                #  it's lock acquire by doing the below!
+                # -[ ] create a way to read the tree of each actor's
+                #  grandchildren such that when an
+                #  intermediary parent is cancelled but their
+                #  child has locked the tty, the grandparent
+                #  will not allow the parent to cancel or
+                #  zombie reap the child! see open issue:
+                #  - https://github.com/goodboy/tractor/issues/320
+                # ------ - ------
                 # if a now stale local task has the TTY lock still
                 # we cancel it to allow servicing other requests for
                 # the lock.
-                db_cs = pdb_lock._root_local_task_cs_in_debug
+                db_cs: trio.CancelScope|None = pdb_lock._root_local_task_cs_in_debug
                 if (
                     db_cs
                     and not db_cs.cancel_called
+                    and uid == pdb_user_uid
                 ):
                     log.warning(
                         f'STALE DEBUG LOCK DETECTED FOR {uid}'
@@ -928,15 +1070,16 @@ class Actor:
         chan: Channel,
         cid: str,
         msg: dict[str, Any],

-    ) -> None:
+    ) -> None|bool:
         '''
         Push an RPC result to the local consumer's queue.

         '''
-        uid = chan.uid
+        uid: tuple[str, str] = chan.uid
         assert uid, f"`chan.uid` can't be {uid}"
         try:
-            ctx = self._contexts[(uid, cid)]
+            ctx: Context = self._contexts[(uid, cid)]
         except KeyError:
             log.warning(
                 f'Ignoring msg from [no-longer/un]known context {uid}:'
@@ -1066,6 +1209,16 @@ class Actor:
                 parent_data.pop('bind_port'),
             )
             rvs = parent_data.pop('_runtime_vars')
+
+            if rvs['_debug_mode']:
+                try:
+                    from .devx import enable_stack_on_sig
+                    enable_stack_on_sig()
+                except ImportError:
+                    log.warning(
+                        '`stackscope` not installed for use in debug mode!'
+                    )
+
             log.runtime(f"Runtime vars are: {rvs}")
             rvs['_is_root'] = False
             _state._runtime_vars.update(rvs)
@@ -1284,9 +1437,15 @@ class Actor:
         '''
        tasks: dict = self._rpc_tasks
        if tasks:
+            tasks_str: str = ''
+            for (ctx, func, _) in tasks.values():
+                tasks_str += (
+                    f' |_{func.__name__}() [cid={ctx.cid[-6:]}..]\n'
+                )
+
             log.cancel(
                 f'Cancelling all {len(tasks)} rpc tasks:\n'
-                f'{tasks}'
+                f'{tasks_str}'
             )
             for (
                 (chan, cid),
@@ -1511,7 +1670,10 @@ async def async_main(
         )

         if actor._parent_chan:
-            await try_ship_error_to_parent(actor._parent_chan, err)
+            await try_ship_error_to_parent(
+                actor._parent_chan,
+                err,
+            )

         # always!
         match err:
@@ -1595,43 +1757,53 @@ async def process_messages(
     or boxed errors back to the remote caller (task).

     '''
-    # TODO: once https://github.com/python-trio/trio/issues/467 gets
-    # worked out we'll likely want to use that!
-    msg: dict | None = None
+    # TODO: once `trio` get's an "obvious way" for req/resp we
+    # should use it?
+    # https://github.com/python-trio/trio/issues/467
+    log.runtime(
+        'Entering IPC msg loop:\n'
+        f'peer: {chan.uid}\n'
+        f'|_{chan}'
+    )
     nursery_cancelled_before_task: bool = False
-    log.runtime(f"Entering msg loop for {chan} from {chan.uid}")
+    msg: dict | None = None
     try:
+        # NOTE: this internal scope allows for keeping this
+        # message loop running despite the current task having
+        # been cancelled (eg. `open_portal()` may call this method
+        # from a locally spawned task) and recieve this scope
+        # using ``scope = Nursery.start()``
         with CancelScope(shield=shield) as loop_cs:
-            # this internal scope allows for keeping this message
-            # loop running despite the current task having been
-            # cancelled (eg. `open_portal()` may call this method from
-            # a locally spawned task) and recieve this scope using
-            # ``scope = Nursery.start()``
             task_status.started(loop_cs)
             async for msg in chan:

-                if msg is None:  # loop terminate sentinel
+                # dedicated loop terminate sentinel
+                if msg is None:
+
+                    tasks: dict[
+                        tuple[Channel, str],
+                        tuple[Context, Callable, trio.Event]
+                    ] = actor._rpc_tasks.copy()
                     log.cancel(
-                        f"Channel to {chan.uid} terminated?\n"
-                        "Cancelling all associated tasks..")
-
-                    for (channel, cid) in actor._rpc_tasks.copy():
+                        f'Peer IPC channel terminated via `None` setinel msg?\n'
+                        f'=> Cancelling all {len(tasks)} local RPC tasks..\n'
+                        f'peer: {chan.uid}\n'
+                        f'|_{chan}\n'
+                    )
+                    for (channel, cid) in tasks:
                         if channel is chan:
                             await actor._cancel_task(
                                 cid,
                                 channel,
                             )
-
-                    log.runtime(
-                        f"Msg loop signalled to terminate for"
-                        f" {chan} from {chan.uid}")
-
                     break

                 log.transport(   # type: ignore
-                    f"Received msg {msg} from {chan.uid}")
+                    f'<= IPC msg from peer: {chan.uid}\n\n'
+
+                    # TODO: conditionally avoid fmting depending
+                    # on log level (for perf)?
+                    f'{pformat(msg)}\n'
+                )

                 cid = msg.get('cid')
                 if cid:
@@ -1640,7 +1812,10 @@ async def process_messages(
                     await actor._push_result(chan, cid, msg)
                     log.runtime(
-                        f"Waiting on next msg for {chan} from {chan.uid}")
+                        f'Waiting on next IPC msg from {chan.uid}:\n'
+                        # f'last msg: {msg}\n'
+                        f'|_{chan}'
+                    )
                     continue

                 # TODO: implement with ``match:`` syntax?
@@ -1693,7 +1868,7 @@ async def process_messages(
                         )
                     log.cancel(
-                        f'Cancelling msg loop for {chan.uid}'
+                        f'Cancelling IPC msg-loop with {chan.uid}'
                     )
                     loop_cs.cancel()
                     break
@@ -1735,8 +1910,10 @@ async def process_messages(
                 try:
                     func = actor._get_rpc_func(ns, funcname)
                 except (ModuleNotExposed, AttributeError) as err:
-                    err_msg = pack_error(err)
-                    err_msg['cid'] = cid
+                    err_msg: dict[str, dict] = pack_error(
+                        err,
+                        cid=cid,
+                    )
                     await chan.send(err_msg)
                     continue
@@ -1838,7 +2015,10 @@ async def process_messages(
         log.exception("Actor errored:")
         if actor._parent_chan:
-            await try_ship_error_to_parent(actor._parent_chan, err)
+            await try_ship_error_to_parent(
+                actor._parent_chan,
+                err,
+            )

         # if this is the `MainProcess` we expect the error broadcasting
         # above to trigger an error at consuming portal "checkpoints"
@@ -1847,8 +2027,9 @@ async def process_messages(
     finally:
         # msg debugging for when he machinery is brokey
         log.runtime(
-            f"Exiting msg loop for {chan} from {chan.uid} "
-            f"with last msg:\n{msg}"
+            f'Exiting IPC msg loop with {chan.uid} '
+            f'final msg: {msg}\n'
+            f'|_{chan}'
         )

     # transport **was not** disconnected