diff --git a/tractor/_rpc.py b/tractor/_rpc.py index 9da8690..75e5951 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -810,61 +810,15 @@ async def process_messages( log.transport( # type: ignore f'<= IPC msg from peer: {chan.uid}\n\n' - # TODO: conditionally avoid fmting depending - # on log level (for perf)? - # => specifically `pformat()` sub-call..? + # TODO: avoid fmting depending on loglevel for perf? + # -[ ] specifically `pformat()` sub-call..? + # -[ ] use `.msg.pretty_struct` here now instead! f'{pformat(msg)}\n' ) match msg: - - # NOTE: this *was a dedicated - # "graceful-terminate-loop" mechanism using - # a `None`-msg-sentinel which would cancel all RPC - # tasks parented by this loop's IPC channel; that - # is all rpc-scheduled-tasks started over the - # connection were explicitly per-task cancelled - # normally prior to the `Channel`'s underlying - # transport being later closed. - # - # * all `.send(None)`s were # removed as part of - # typed-msging requirements - # - # TODO: if this mechanism is still desired going - # forward it should be implemented as part of the - # normal runtime-cancel-RPC endpoints with either, - # - a special `msg.types.Msg` to trigger the loop endpoint - # (like `None` was used prior) or, - # - it should just be accomplished using A - # `Start(ns='self', func='cancel_rpc_tasks())` - # request instead? - # - # if msg is None: - # case None: - # tasks: dict[ - # tuple[Channel, str], - # tuple[Context, Callable, trio.Event] - # ] = actor._rpc_tasks.copy() - # log.cancel( - # f'Peer IPC channel terminated via `None` setinel msg?\n' - # f'=> Cancelling all {len(tasks)} local RPC tasks..\n' - # f'peer: {chan.uid}\n' - # f'|_{chan}\n' - # ) - # # TODO: why aren't we just calling - # # `.cancel_rpc_tasks()` with the parent - # # chan as input instead? - # for (channel, cid) in tasks: - # if channel is chan: - # await actor._cancel_task( - # cid, - # channel, - # requesting_uid=channel.uid, - # ipc_msg=msg, - # ) - # # immediately break out of this loop! - # break - + # msg for an ongoing IPC ctx session, deliver msg to + # local task. case ( StartAck(cid=cid) | Started(cid=cid) @@ -872,7 +826,7 @@ async def process_messages( | Stop(cid=cid) | Return(cid=cid) | CancelAck(cid=cid) - | Error(cid=cid) + | Error(cid=cid) # RPC-task ctx specific ): # deliver response to local caller/waiter # via its per-remote-context memory channel. @@ -881,10 +835,8 @@ async def process_messages( cid, msg, ) - # TODO: can remove right? - # continue - # runtime-internal cancellation endpoints + # `Actor`(-internal) runtime cancel requests case Start( ns='self', func='cancel', @@ -959,11 +911,9 @@ async def process_messages( ) # the "MAIN" RPC endpoint to schedule-a-`trio.Task` - # - # TODO: impl with native `msgspec.Struct` support !! - # -[ ] implement with ``match:`` syntax? - # -[ ] discard un-authed msgs as per, - # + # ------ - ------ + # -[x] discard un-authed msgs as per, + # case Start( cid=cid, ns=ns, @@ -987,7 +937,10 @@ async def process_messages( # application RPC endpoint else: try: - func: Callable = actor._get_rpc_func(ns, funcname) + func: Callable = actor._get_rpc_func( + ns, + funcname, + ) except ( ModuleNotExposed, AttributeError, @@ -1065,6 +1018,8 @@ async def process_messages( trio.Event(), ) + # XXX remote (runtime scoped) error or uknown + # msg (type). case Error() | _: # NOTE: this is the non-rpc error case, # that is, an error **not** raised inside @@ -1090,8 +1045,9 @@ async def process_messages( f'|_{chan}\n' ) - # end of async for, channel disconnect vis - # ``trio.EndOfChannel`` + # END-OF `async for`: + # IPC disconnected via `trio.EndOfChannel`, likely + # due to a (graceful) `Channel.aclose()`. log.runtime( f'channel for {chan.uid} disconnected, cancelling RPC tasks\n' f'|_{chan}\n' @@ -1111,9 +1067,10 @@ async def process_messages( # connection-reset) is ok since we don't have a teardown # handshake for them (yet) and instead we simply bail out of # the message loop and expect the teardown sequence to clean - # up. - # TODO: don't show this msg if it's an emphemeral - # discovery ep call? + # up.. + # TODO: add a teardown handshake? and, + # -[ ] don't show this msg if it's an ephemeral discovery ep call? + # -[ ] figure out how this will break with other transports? log.runtime( f'channel closed abruptly with\n' f'peer: {chan.uid}\n' diff --git a/tractor/_runtime.py b/tractor/_runtime.py index e08d074..854db3a 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -393,8 +393,9 @@ class Actor: raise mne + # TODO: maybe change to mod-func and rename for implied + # multi-transport semantics? async def _stream_handler( - self, stream: trio.SocketStream, @@ -713,51 +714,6 @@ class Actor: # TODO: figure out why this breaks tests.. db_cs.cancel() - # XXX TODO XXX: DO WE NEED THIS? - # -[ ] is it necessary any more (GC should do it) now - # that we have strict(er) graceful cancellation - # semantics? - # XXX WARNING XXX - # Be AWARE OF THE INDENT LEVEL HERE - # -> ONLY ENTER THIS BLOCK WHEN ._peers IS - # EMPTY!!!! - # - # if the channel is still connected it may mean the far - # end has not closed and we may have gotten here due to - # an error and so we should at least try to terminate - # the channel from this end gracefully. - #if ( - # not self._peers - # and chan.connected() - #): - # log.runtime( - # 'Terminating channel with `None` setinel msg\n' - # f'|_{chan}\n' - # ) - # try: - # # ORIGINALLY we sent a msg loop terminate - # # sentinel (`None`) which triggers - # # cancellation of all remotely started - # # tasks. - # # - # # HOWEVER, after we added typed msging, - # # you can't just willy nilly send `None` - # # wherever since it might be invalid given - # # the currently configured msg-spec. - # # - # # SO, this was all removed and I'm pretty - # # confident we don't need it replaced with - # # a manual RPC to - # # a `Actor.cancel_rpc_tasks()` right? - # await chan.send(None) - - # # XXX: do we want this? NO RIGHT? - # # causes "[104] connection reset by peer" on other end - # # await chan.aclose() - - # except trio.BrokenResourceError: - # log.runtime(f"Channel {chan.uid} was already closed") - # TODO: rename to `._deliver_payload()` since this handles # more then just `result` msgs now obvi XD async def _push_result(