diff --git a/tractor/_portal.py b/tractor/_portal.py index cc9052b..957eae5 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -502,7 +502,7 @@ async def open_portal( ''' actor = current_actor() assert actor - was_connected = False + was_connected: bool = False async with maybe_open_nursery(nursery, shield=shield) as nursery: @@ -533,9 +533,7 @@ async def open_portal( await portal.aclose() if was_connected: - # gracefully signal remote channel-msg loop - await channel.send(None) - # await channel.aclose() + await channel.aclose() # cancel background msg loop task if msg_loop_cs: diff --git a/tractor/_rpc.py b/tractor/_rpc.py index 0549b0c..5559702 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -55,7 +55,6 @@ from ._exceptions import ( TransportClosed, ) from .devx import ( - pause, maybe_wait_for_debugger, _debug, ) @@ -429,8 +428,6 @@ async def _invoke( # XXX for .pause_from_sync()` usage we need to make sure # `greenback` is boostrapped in the subactor! await _debug.maybe_init_greenback() - # else: - # await pause() # TODO: possibly a specially formatted traceback # (not sure what typing is for this..)? @@ -855,30 +852,54 @@ async def process_messages( match msg: + # NOTE: this *was a dedicated + # "graceful-terminate-loop" mechanism using + # a `None`-msg-sentinel which would cancel all RPC + # tasks parented by this loop's IPC channel; that + # is all rpc-scheduled-tasks started over the + # connection were explicitly per-task cancelled + # normally prior to the `Channel`'s underlying + # transport being later closed. + # + # * all `.send(None)`s were # removed as part of + # typed-msging requirements + # + # TODO: if this mechanism is still desired going + # forward it should be implemented as part of the + # normal runtime-cancel-RPC endpoints with either, + # - a special `msg.types.Msg` to trigger the loop endpoint + # (like `None` was used prior) or, + # - it should just be accomplished using A + # `Start(ns='self', func='cancel_rpc_tasks())` + # request instead? + # # if msg is None: - # dedicated loop terminate sentinel - case None: + # case None: + # tasks: dict[ + # tuple[Channel, str], + # tuple[Context, Callable, trio.Event] + # ] = actor._rpc_tasks.copy() + # log.cancel( + # f'Peer IPC channel terminated via `None` setinel msg?\n' + # f'=> Cancelling all {len(tasks)} local RPC tasks..\n' + # f'peer: {chan.uid}\n' + # f'|_{chan}\n' + # ) + # # TODO: why aren't we just calling + # # `.cancel_rpc_tasks()` with the parent + # # chan as input instead? + # for (channel, cid) in tasks: + # if channel is chan: + # await actor._cancel_task( + # cid, + # channel, + # requesting_uid=channel.uid, - tasks: dict[ - tuple[Channel, str], - tuple[Context, Callable, trio.Event] - ] = actor._rpc_tasks.copy() - log.cancel( - f'Peer IPC channel terminated via `None` setinel msg?\n' - f'=> Cancelling all {len(tasks)} local RPC tasks..\n' - f'peer: {chan.uid}\n' - f'|_{chan}\n' - ) - for (channel, cid) in tasks: - if channel is chan: - await actor._cancel_task( - cid, - channel, - requesting_uid=channel.uid, + # ipc_msg=msg, + # ) - ipc_msg=msg, - ) - break + # # immediately break out of this loop! + # break # cid = msg.get('cid') # if cid: @@ -916,7 +937,7 @@ async def process_messages( cid=cid, ns=ns, func=funcname, - kwargs=kwargs, + kwargs=kwargs, # type-spec this? see `msg.types` uid=actorid, ): # try: diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 3bafada..e08d074 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -65,7 +65,11 @@ from trio import ( TaskStatus, ) -from .msg import NamespacePath +from tractor.msg import ( + pretty_struct, + NamespacePath, + types as msgtypes, +) from ._ipc import Channel from ._context import ( mk_context, @@ -91,10 +95,6 @@ from ._rpc import ( process_messages, try_ship_error_to_remote, ) -from tractor.msg import ( - types as msgtypes, - pretty_struct, -) # from tractor.msg.types import ( # Aid, # SpawnSpec, @@ -164,18 +164,15 @@ class Actor: # Information about `__main__` from parent _parent_main_data: dict[str, str] _parent_chan_cs: CancelScope|None = None - _spawn_spec: SpawnSpec|None = None + _spawn_spec: msgtypes.SpawnSpec|None = None # syncs for setup/teardown sequences _server_down: trio.Event|None = None - # user toggled crash handling (including monkey-patched in - # `trio.open_nursery()` via `.trionics._supervisor` B) - _debug_mode: bool = False - # if started on ``asycio`` running ``trio`` in guest mode _infected_aio: bool = False + # TODO: nursery tracking like `trio` does? # _ans: dict[ # tuple[str, str], # list[ActorNursery], @@ -716,35 +713,50 @@ class Actor: # TODO: figure out why this breaks tests.. db_cs.cancel() - # XXX: is this necessary (GC should do it)? + # XXX TODO XXX: DO WE NEED THIS? + # -[ ] is it necessary any more (GC should do it) now + # that we have strict(er) graceful cancellation + # semantics? # XXX WARNING XXX # Be AWARE OF THE INDENT LEVEL HERE # -> ONLY ENTER THIS BLOCK WHEN ._peers IS # EMPTY!!!! - if ( - not self._peers - and chan.connected() - ): - # if the channel is still connected it may mean the far - # end has not closed and we may have gotten here due to - # an error and so we should at least try to terminate - # the channel from this end gracefully. - log.runtime( - 'Terminating channel with `None` setinel msg\n' - f'|_{chan}\n' - ) - try: - # send msg loop terminate sentinel which - # triggers cancellation of all remotely - # started tasks. - await chan.send(None) + # + # if the channel is still connected it may mean the far + # end has not closed and we may have gotten here due to + # an error and so we should at least try to terminate + # the channel from this end gracefully. + #if ( + # not self._peers + # and chan.connected() + #): + # log.runtime( + # 'Terminating channel with `None` setinel msg\n' + # f'|_{chan}\n' + # ) + # try: + # # ORIGINALLY we sent a msg loop terminate + # # sentinel (`None`) which triggers + # # cancellation of all remotely started + # # tasks. + # # + # # HOWEVER, after we added typed msging, + # # you can't just willy nilly send `None` + # # wherever since it might be invalid given + # # the currently configured msg-spec. + # # + # # SO, this was all removed and I'm pretty + # # confident we don't need it replaced with + # # a manual RPC to + # # a `Actor.cancel_rpc_tasks()` right? + # await chan.send(None) - # XXX: do we want this? no right? - # causes "[104] connection reset by peer" on other end - # await chan.aclose() + # # XXX: do we want this? NO RIGHT? + # # causes "[104] connection reset by peer" on other end + # # await chan.aclose() - except trio.BrokenResourceError: - log.runtime(f"Channel {chan.uid} was already closed") + # except trio.BrokenResourceError: + # log.runtime(f"Channel {chan.uid} was already closed") # TODO: rename to `._deliver_payload()` since this handles # more then just `result` msgs now obvi XD @@ -774,9 +786,10 @@ class Actor: log.warning( 'Ignoring invalid IPC ctx msg!\n\n' f'<= sender: {uid}\n' - f'=> cid: {cid}\n\n' + # XXX don't need right since it's always in msg? + # f'=> cid: {cid}\n\n' - f'{msg}\n' + f'{pretty_struct.Struct.pformat(msg)}\n' ) return @@ -1437,7 +1450,7 @@ class Actor: ) await self._ongoing_rpc_tasks.wait() - def cancel_server(self) -> None: + def cancel_server(self) -> bool: ''' Cancel the internal IPC transport server nursery thereby preventing any new inbound IPC connections establishing. @@ -1446,6 +1459,9 @@ class Actor: if self._server_n: log.runtime("Shutting down channel server") self._server_n.cancel_scope.cancel() + return True + + return False @property def accept_addrs(self) -> list[tuple[str, int]]: