diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 39f6222..5e286c1 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -19,13 +19,14 @@ Inter-process comms abstractions """ from __future__ import annotations -import platform import struct -import typing +import platform +from pprint import pformat from collections.abc import ( AsyncGenerator, AsyncIterator, ) +import typing from typing import ( Any, runtime_checkable, @@ -370,7 +371,10 @@ class Channel: async def send(self, item: Any) -> None: - log.transport(f"send `{item}`") # type: ignore + log.transport( + '=> send IPC msg:\n\n' + f'{pformat(item)}\n' + ) # type: ignore assert self.msgstream await self.msgstream.send(item) diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 5808065..f25d3e5 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -48,6 +48,10 @@ import trio from trio import ( CancelScope, ) +from trio.lowlevel import ( + current_task, + Task, +) from trio_typing import ( Nursery, TaskStatus, @@ -67,7 +71,11 @@ from ._exceptions import ( ContextCancelled, TransportClosed, ) -from .devx import _debug +from .devx import ( + # pause, + maybe_wait_for_debugger, + _debug, +) from ._discovery import get_registry from ._portal import Portal from . import _state @@ -80,6 +88,26 @@ if TYPE_CHECKING: log = get_logger('tractor') +_gb_mod: ModuleType|None|False = None + + +async def maybe_import_gb(): + global _gb_mod + if _gb_mod is False: + return + + try: + import greenback + _gb_mod = greenback + await greenback.ensure_portal() + + except ModuleNotFoundError: + log.warning( + '`greenback` is not installed.\n' + 'No sync debug support!' + ) + _gb_mod = False + async def _invoke( @@ -106,17 +134,11 @@ async def _invoke( failed_resp: bool = False if _state.debug_mode(): - try: - import greenback - await greenback.ensure_portal() - except ModuleNotFoundError: - log.warning( - '`greenback` is not installed.\n' - 'No sync debug support!' - ) + await maybe_import_gb() - # possibly a traceback (not sure what typing is for this..) - tb = None + # TODO: possibly a specially formatted traceback + # (not sure what typing is for this..)? + # tb = None cancel_scope = CancelScope() # activated cancel scope ref @@ -237,15 +259,27 @@ async def _invoke( # wrapper that calls `Context.started()` and then does # the `await coro()`? elif context: - # context func with support for bi-dir streaming - await chan.send({'functype': 'context', 'cid': cid}) + + # a "context" endpoint type is the most general and + # "least sugary" type of RPC ep with support for + # bi-dir streaming B) + await chan.send({ + 'functype': 'context', + 'cid': cid + }) try: async with trio.open_nursery() as nurse: ctx._scope_nursery = nurse ctx._scope = nurse.cancel_scope task_status.started(ctx) + + # TODO: should would be nice to have our + # `TaskMngr` nursery here! + # res: Any = await coro res = await coro + + # deliver final result to caller side. await chan.send({ 'return': res, 'cid': cid @@ -279,11 +313,12 @@ async def _invoke( # don't pop the local context until we know the # associated child isn't in debug any more - await _debug.maybe_wait_for_debugger() + await maybe_wait_for_debugger() ctx: Context = actor._contexts.pop((chan.uid, cid)) - log.runtime( - f'Context entrypoint {func} was terminated:\n' - f'{ctx}' + log.cancel( + f'Context task was terminated:\n' + f'func: {func}\n' + f'ctx: {pformat(ctx)}' ) if ctx.cancelled_caught: @@ -295,13 +330,14 @@ async def _invoke( if re := ctx._remote_error: ctx._maybe_raise_remote_err(re) - fname: str = func.__name__ + # fname: str = func.__name__ + task: Task = current_task() cs: CancelScope = ctx._scope if cs.cancel_called: our_uid: tuple = actor.uid canceller: tuple = ctx.canceller msg: str = ( - f'`{fname}()`@{our_uid} cancelled by ' + 'actor was cancelled by ' ) # NOTE / TODO: if we end up having @@ -320,16 +356,37 @@ async def _invoke( # some actor who calls `Portal.cancel_actor()` # and by side-effect cancels this ctx. elif canceller == ctx.chan.uid: - msg += f'its caller {canceller} ' + msg += 'its caller' else: - msg += f'remote actor {canceller}' + msg += 'a remote peer' + + div_chars: str = '------ - ------' + div_offset: int = ( + round(len(msg)/2)+1 + + + round(len(div_chars)/2)+1 + ) + div_str: str = ( + '\n' + + + ' '*div_offset + + + f'{div_chars}\n' + ) + msg += ( + div_str + + f'<= canceller: {canceller}\n' + f'=> uid: {our_uid}\n' + f' |_ task: `{task.name}()`' + ) # TODO: does this ever get set any more or can # we remove it? if ctx._cancel_msg: msg += ( - ' with msg:\n' + '------ - ------\n' + 'IPC msg:\n' f'{ctx._cancel_msg}' ) @@ -449,9 +506,9 @@ async def _invoke( # always ship errors back to caller err_msg: dict[str, dict] = pack_error( err, - tb=tb, + # tb=tb, # TODO: special tb fmting? + cid=cid, ) - err_msg['cid'] = cid if is_rpc: try: @@ -518,19 +575,28 @@ async def try_ship_error_to_parent( err: Exception | BaseExceptionGroup, ) -> None: + ''' + Box, pack and encode a local runtime(-internal) exception for + an IPC channel `.send()` with transport/network failures and + local cancellation ignored but logged as critical(ly bad). + + ''' with CancelScope(shield=True): try: - # internal error so ship to parent without cid - await channel.send(pack_error(err)) + await channel.send( + # NOTE: normally only used for internal runtime errors + # so ship to peer actor without a cid. + pack_error(err) + ) except ( trio.ClosedResourceError, trio.BrokenResourceError, ): # in SC terms this is one of the worst things that can - # happen and creates the 2-general's dilemma. + # happen and provides for a 2-general's dilemma.. log.critical( - f"Failed to ship error to parent " - f"{channel.uid}, channel was closed" + f'Failed to ship error to parent ' + f'{channel.uid}, IPC transport failure!' ) @@ -588,6 +654,11 @@ class Actor: # if started on ``asycio`` running ``trio`` in guest mode _infected_aio: bool = False + # _ans: dict[ + # tuple[str, str], + # list[ActorNursery], + # ] = {} + # Process-global stack closed at end on actor runtime teardown. # NOTE: this is currently an undocumented public api. lifetime_stack: ExitStack = ExitStack() @@ -612,7 +683,10 @@ class Actor: ''' self.name = name - self.uid = (name, uid or str(uuid.uuid4())) + self.uid = ( + name, + uid or str(uuid.uuid4()) + ) self._cancel_complete = trio.Event() self._cancel_called_by_remote: tuple[str, tuple] | None = None @@ -827,7 +901,10 @@ class Actor: return # channel tracking - event = self._peer_connected.pop(uid, None) + event: trio.Event|None = self._peer_connected.pop( + uid, + None, + ) if event: # Instructing connection: this is likely a new channel to # a recently spawned actor which we'd like to control via @@ -836,46 +913,43 @@ class Actor: # Alert any task waiting on this connection to come up event.set() - chans = self._peers[uid] - - # TODO: re-use channels for new connections instead - # of always new ones; will require changing all the - # discovery funcs + chans: list[Channel] = self._peers[uid] if chans: + # TODO: re-use channels for new connections instead + # of always new ones? + # => will require changing all the discovery funcs.. log.runtime( f"already have channel(s) for {uid}:{chans}?" ) - log.runtime(f"Registered {chan} for {uid}") # type: ignore # append new channel + log.runtime(f"Registered {chan} for {uid}") # type: ignore + # TODO: can we just use list-ref directly? + # chans.append(chan) self._peers[uid].append(chan) - local_nursery: ActorNursery | None = None # noqa - disconnected: bool = False - # Begin channel management - respond to remote requests and # process received reponses. + disconnected: bool = False try: - disconnected = await process_messages(self, chan) - - except ( - trio.Cancelled, - ): - log.cancel(f"Msg loop was cancelled for {chan}") + disconnected: bool = await process_messages(self, chan) + except trio.Cancelled: + log.cancel(f'Msg loop was cancelled for {chan}') raise finally: - local_nursery = self._actoruid2nursery.get(uid, local_nursery) + local_nursery: ( + ActorNursery|None + ) = self._actoruid2nursery.get(uid) # This is set in ``Portal.cancel_actor()``. So if # the peer was cancelled we try to wait for them # to tear down their side of the connection before # moving on with closing our own side. - if ( - local_nursery - ): + if local_nursery: if chan._cancel_called: - log.cancel(f"Waiting on cancel request to peer {chan.uid}") + log.cancel(f'Waiting on cancel request to peer {chan.uid}') + # XXX: this is a soft wait on the channel (and its # underlying transport protocol) to close from the # remote peer side since we presume that any channel @@ -920,6 +994,7 @@ class Actor: # other downstream errors. entry = local_nursery._children.get(uid) if entry: + proc: trio.Process _, proc, _ = entry if ( @@ -927,22 +1002,42 @@ class Actor: and poll() is None ): log.cancel( - f'Actor {uid} IPC broke but proc is alive?\n' - 'Attempting to self cancel..' + f'Peer actor IPC broke but proc is alive?\n' + f'uid: {uid}\n' + f'|_{proc}\n' ) # ``Channel`` teardown and closure sequence # Drop ref to channel so it can be gc-ed and disconnected - log.runtime(f"Releasing channel {chan} from {chan.uid}") + log.runtime( + f'Disconnected IPC channel:\n' + f'uid: {chan.uid}\n' + f'|_{pformat(chan)}\n' + ) chans = self._peers.get(chan.uid) chans.remove(chan) if not chans: - log.runtime(f"No more channels for {chan.uid}") + log.runtime( + f'No more channels with {chan.uid}' + ) self._peers.pop(uid, None) - log.runtime(f"Peers is {self._peers}") + peers_str: str = '' + for uid, chans in self._peers.items(): + peers_str += ( + f'- uid: {uid}\n' + ) + for i, chan in enumerate(chans): + peers_str += ( + f' |_[{i}] {pformat(chan)}\n' + ) + + log.runtime( + f'Remaining IPC {len(self._peers)} peers:\n' + + peers_str + ) # No more channels to other actors (at all) registered # as connected. @@ -958,15 +1053,58 @@ class Actor: if _state.is_root_process(): pdb_lock = _debug.Lock pdb_lock._blocked.add(uid) - log.runtime(f"{uid} blocked from pdb locking") + # TODO: NEEEDS TO BE TESTED! + # actually, no idea if this ever even enters.. XD + pdb_user_uid: tuple = pdb_lock.global_actor_in_debug + if ( + pdb_user_uid + and local_nursery + ): + entry: tuple|None = local_nursery._children.get(pdb_user_uid) + if entry: + proc: trio.Process + _, proc, _ = entry + + if ( + (poll := getattr(proc, 'poll', None)) + and poll() is None + ): + log.cancel( + 'Root actor reports no-more-peers, BUT ' + 'a DISCONNECTED child still has the debug ' + 'lock!\n' + f'root uid: {self.uid}\n' + f'last disconnected child uid: {uid}\n' + f'locking child uid: {pdb_user_uid}\n' + ) + await maybe_wait_for_debugger( + child_in_debug=True + ) + + # TODO: just bc a child's transport dropped + # doesn't mean it's not still using the pdb + # REPL! so, + # -[ ] ideally we can check out child proc + # tree to ensure that its alive (and + # actually using the REPL) before we cancel + # it's lock acquire by doing the below! + # -[ ] create a way to read the tree of each actor's + # grandchildren such that when an + # intermediary parent is cancelled but their + # child has locked the tty, the grandparent + # will not allow the parent to cancel or + # zombie reap the child! see open issue: + # - https://github.com/goodboy/tractor/issues/320 + # ------ - ------ # if a now stale local task has the TTY lock still # we cancel it to allow servicing other requests for # the lock. - db_cs = pdb_lock._root_local_task_cs_in_debug + db_cs: trio.CancelScope|None = pdb_lock._root_local_task_cs_in_debug if ( db_cs and not db_cs.cancel_called + and uid == pdb_user_uid ): log.critical( f'STALE DEBUG LOCK DETECTED FOR {uid}' @@ -998,15 +1136,16 @@ class Actor: chan: Channel, cid: str, msg: dict[str, Any], - ) -> None: + + ) -> None|bool: ''' Push an RPC result to the local consumer's queue. ''' - uid = chan.uid + uid: tuple[str, str] = chan.uid assert uid, f"`chan.uid` can't be {uid}" try: - ctx = self._contexts[(uid, cid)] + ctx: Context = self._contexts[(uid, cid)] except KeyError: log.warning( f'Ignoring msg from [no-longer/un]known context {uid}:' @@ -1137,6 +1276,16 @@ class Actor: ) accept_addrs: list[tuple[str, int]] = parent_data.pop('bind_addrs') rvs = parent_data.pop('_runtime_vars') + + if rvs['_debug_mode']: + try: + from .devx import enable_stack_on_sig + enable_stack_on_sig() + except ImportError: + log.warning( + '`stackscope` not installed for use in debug mode!' + ) + log.runtime(f"Runtime vars are: {rvs}") rvs['_is_root'] = False _state._runtime_vars.update(rvs) @@ -1374,9 +1523,15 @@ class Actor: ''' tasks: dict = self._rpc_tasks if tasks: + tasks_str: str = '' + for (ctx, func, _) in tasks.values(): + tasks_str += ( + f' |_{func.__name__}() [cid={ctx.cid[-6:]}..]\n' + ) + log.cancel( f'Cancelling all {len(tasks)} rpc tasks:\n' - f'{tasks}' + f'{tasks_str}' ) for ( (chan, cid), @@ -1660,7 +1815,10 @@ async def async_main( ) if actor._parent_chan: - await try_ship_error_to_parent(actor._parent_chan, err) + await try_ship_error_to_parent( + actor._parent_chan, + err, + ) # always! match err: @@ -1750,43 +1908,53 @@ async def process_messages( or boxed errors back to the remote caller (task). ''' - # TODO: once https://github.com/python-trio/trio/issues/467 gets - # worked out we'll likely want to use that! - msg: dict | None = None + # TODO: once `trio` get's an "obvious way" for req/resp we + # should use it? + # https://github.com/python-trio/trio/issues/467 + log.runtime( + 'Entering IPC msg loop:\n' + f'peer: {chan.uid}\n' + f'|_{chan}' + ) nursery_cancelled_before_task: bool = False - - log.runtime(f"Entering msg loop for {chan} from {chan.uid}") + msg: dict | None = None try: + # NOTE: this internal scope allows for keeping this + # message loop running despite the current task having + # been cancelled (eg. `open_portal()` may call this method + # from a locally spawned task) and recieve this scope + # using ``scope = Nursery.start()`` with CancelScope(shield=shield) as loop_cs: - # this internal scope allows for keeping this message - # loop running despite the current task having been - # cancelled (eg. `open_portal()` may call this method from - # a locally spawned task) and recieve this scope using - # ``scope = Nursery.start()`` task_status.started(loop_cs) async for msg in chan: - if msg is None: # loop terminate sentinel + # dedicated loop terminate sentinel + if msg is None: + tasks: dict[ + tuple[Channel, str], + tuple[Context, Callable, trio.Event] + ] = actor._rpc_tasks.copy() log.cancel( - f"Channel to {chan.uid} terminated?\n" - "Cancelling all associated tasks..") - - for (channel, cid) in actor._rpc_tasks.copy(): + f'Peer IPC channel terminated via `None` setinel msg?\n' + f'=> Cancelling all {len(tasks)} local RPC tasks..\n' + f'peer: {chan.uid}\n' + f'|_{chan}\n' + ) + for (channel, cid) in tasks: if channel is chan: await actor._cancel_task( cid, channel, ) - - log.runtime( - f"Msg loop signalled to terminate for" - f" {chan} from {chan.uid}") - break log.transport( # type: ignore - f"Received msg {msg} from {chan.uid}") + f'<= IPC msg from peer: {chan.uid}\n\n' + # TODO: conditionally avoid fmting depending + # on log level (for perf)? + f'{pformat(msg)}\n' + ) cid = msg.get('cid') if cid: @@ -1795,7 +1963,10 @@ async def process_messages( await actor._push_result(chan, cid, msg) log.runtime( - f"Waiting on next msg for {chan} from {chan.uid}") + f'Waiting on next IPC msg from {chan.uid}:\n' + # f'last msg: {msg}\n' + f'|_{chan}' + ) continue # TODO: implement with ``match:`` syntax? @@ -1848,7 +2019,7 @@ async def process_messages( ) log.cancel( - f'Cancelling msg loop for {chan.uid}' + f'Cancelling IPC msg-loop with {chan.uid}' ) loop_cs.cancel() break @@ -1890,8 +2061,10 @@ async def process_messages( try: func = actor._get_rpc_func(ns, funcname) except (ModuleNotExposed, AttributeError) as err: - err_msg = pack_error(err) - err_msg['cid'] = cid + err_msg: dict[str, dict] = pack_error( + err, + cid=cid, + ) await chan.send(err_msg) continue @@ -1993,7 +2166,10 @@ async def process_messages( log.exception("Actor errored:") if actor._parent_chan: - await try_ship_error_to_parent(actor._parent_chan, err) + await try_ship_error_to_parent( + actor._parent_chan, + err, + ) # if this is the `MainProcess` we expect the error broadcasting # above to trigger an error at consuming portal "checkpoints" @@ -2002,8 +2178,9 @@ async def process_messages( finally: # msg debugging for when he machinery is brokey log.runtime( - f"Exiting msg loop for {chan} from {chan.uid} " - f"with last msg:\n{msg}" + f'Exiting IPC msg loop with {chan.uid} ' + f'final msg: {msg}\n' + f'|_{chan}' ) # transport **was not** disconnected