diff --git a/tractor/_rpc.py b/tractor/_rpc.py index 5559702..9da8690 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -61,13 +61,15 @@ from .devx import ( from . import _state from .log import get_logger from tractor.msg.types import ( + CancelAck, + Error, + Msg, + Return, Start, StartAck, Started, Stop, Yield, - Return, - Error, ) @@ -89,6 +91,7 @@ async def _invoke_non_context( treat_as_gen: bool, is_rpc: bool, + return_msg: Return|CancelAck = Return, task_status: TaskStatus[ Context | BaseException @@ -97,7 +100,6 @@ async def _invoke_non_context( # TODO: can we unify this with the `context=True` impl below? if inspect.isasyncgen(coro): - # await chan.send({ await chan.send( StartAck( cid=cid, @@ -123,11 +125,6 @@ async def _invoke_non_context( # to_send = await chan.recv_nowait() # if to_send is not None: # to_yield = await coro.asend(to_send) - # await chan.send({ - # # Yield() - # 'cid': cid, - # 'yield': item, - # }) await chan.send( Yield( cid=cid, @@ -142,11 +139,6 @@ async def _invoke_non_context( await chan.send( Stop(cid=cid) ) - # await chan.send({ - # # Stop( - # 'cid': cid, - # 'stop': True, - # }) # one way @stream func that gets treated like an async gen # TODO: can we unify this with the `context=True` impl below? @@ -157,11 +149,6 @@ async def _invoke_non_context( functype='asyncgen', ) ) - # await chan.send({ - # # StartAck() - # 'cid': cid, - # 'functype': 'asyncgen', - # }) # XXX: the async-func may spawn further tasks which push # back values like an async-generator would but must # manualy construct the response dict-packet-responses as @@ -177,11 +164,6 @@ async def _invoke_non_context( await chan.send( Stop(cid=cid) ) - # await chan.send({ - # # Stop( - # 'cid': cid, - # 'stop': True, - # }) else: # regular async function/method # XXX: possibly just a scheduled `Actor._cancel_task()` @@ -199,11 +181,6 @@ async def _invoke_non_context( functype='asyncfunc', ) ) - # await chan.send({ - # # StartAck() - # 'cid': cid, - # 'functype': 'asyncfunc', - # }) except ( trio.ClosedResourceError, trio.BrokenResourceError, @@ -237,13 +214,8 @@ async def _invoke_non_context( and chan.connected() ): try: - # await chan.send({ - # # Return() - # 'cid': cid, - # 'return': result, - # }) await chan.send( - Return( + return_msg( cid=cid, pld=result, ) @@ -408,6 +380,7 @@ async def _invoke( is_rpc: bool = True, hide_tb: bool = True, + return_msg: Return|CancelAck = Return, task_status: TaskStatus[ Context | BaseException @@ -517,6 +490,7 @@ async def _invoke( kwargs, treat_as_gen, is_rpc, + return_msg, task_status, ) # below is only for `@context` funcs @@ -547,11 +521,6 @@ async def _invoke( functype='context', ) ) - # await chan.send({ - # # StartAck() - # 'cid': cid, - # 'functype': 'context', - # }) # TODO: should we also use an `.open_context()` equiv # for this callee side by factoring the impl from @@ -576,16 +545,11 @@ async def _invoke( # deliver final result to caller side. await chan.send( - Return( + return_msg( cid=cid, pld=res, ) ) - # await chan.send({ - # # Return() - # 'cid': cid, - # 'return': res, - # }) # NOTE: this happens IFF `ctx._scope.cancel()` is # called by any of, @@ -674,7 +638,6 @@ async def _invoke( ctxc = ContextCancelled( msg, boxed_type=trio.Cancelled, - # boxed_type_str='Cancelled', canceller=canceller, ) # assign local error so that the `.outcome` @@ -775,12 +738,12 @@ async def try_ship_error_to_remote( trio.BrokenResourceError, BrokenPipeError, ): - # err_msg: dict = msg['error']['tb_str'] log.critical( 'IPC transport failure -> ' f'failed to ship error to {remote_descr}!\n\n' f'X=> {channel.uid}\n\n' - # f'{err_msg}\n' + + # TODO: use `.msg.preetty_struct` for this! f'{msg}\n' ) @@ -822,6 +785,8 @@ async def process_messages( ''' + assert actor._service_n # state sanity + # TODO: once `trio` get's an "obvious way" for req/resp we # should use it? # https://github.com/python-trio/trio/issues/467 @@ -831,7 +796,7 @@ async def process_messages( f'|_{chan}\n' ) nursery_cancelled_before_task: bool = False - msg: dict | None = None + msg: Msg|None = None try: # NOTE: this internal scope allows for keeping this # message loop running despite the current task having @@ -840,6 +805,7 @@ async def process_messages( # using ``scope = Nursery.start()`` with CancelScope(shield=shield) as loop_cs: task_status.started(loop_cs) + async for msg in chan: log.transport( # type: ignore f'<= IPC msg from peer: {chan.uid}\n\n' @@ -894,21 +860,18 @@ async def process_messages( # cid, # channel, # requesting_uid=channel.uid, - # ipc_msg=msg, # ) - # # immediately break out of this loop! # break - # cid = msg.get('cid') - # if cid: case ( StartAck(cid=cid) | Started(cid=cid) | Yield(cid=cid) | Stop(cid=cid) | Return(cid=cid) + | CancelAck(cid=cid) | Error(cid=cid) ): # deliver response to local caller/waiter @@ -918,17 +881,85 @@ async def process_messages( cid, msg, ) + # TODO: can remove right? + # continue + + # runtime-internal cancellation endpoints + case Start( + ns='self', + func='cancel', + cid=cid, + kwargs=kwargs, + ): + kwargs |= {'req_chan': chan} + + # XXX NOTE XXX don't start entire actor + # runtime cancellation if this actor is + # currently in debug mode! + pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete + if pdb_complete: + await pdb_complete.wait() + + # Either of `Actor.cancel()`/`.cancel_soon()` + # was called, so terminate this IPC msg + # loop, exit back out into `async_main()`, + # and immediately start the core runtime + # machinery shutdown! + with CancelScope(shield=True): + await _invoke( + actor, + cid, + chan, + actor.cancel, + kwargs, + is_rpc=False, + return_msg=CancelAck, + ) log.runtime( - 'Waiting on next IPC msg from\n' - f'peer: {chan.uid}:\n' + 'Cancelling IPC transport msg-loop with peer:\n' f'|_{chan}\n' - - # f'last msg: {msg}\n' ) - continue + loop_cs.cancel() + break - # process a 'cmd' request-msg upack + case Start( + ns='self', + func='_cancel_task', + cid=cid, + kwargs=kwargs, + ): + target_cid: str = kwargs['cid'] + kwargs |= { + 'requesting_uid': chan.uid, + 'ipc_msg': msg, + + # XXX NOTE! ONLY the rpc-task-owning + # parent IPC channel should be able to + # cancel it! + 'parent_chan': chan, + } + try: + await _invoke( + actor, + cid, + chan, + actor._cancel_task, + kwargs, + is_rpc=False, + return_msg=CancelAck, + ) + except BaseException: + log.exception( + 'Failed to cancel task?\n' + f'<= canceller: {chan.uid}\n' + f' |_{chan}\n\n' + f'=> {actor}\n' + f' |_cid: {target_cid}\n' + ) + + # the "MAIN" RPC endpoint to schedule-a-`trio.Task` + # # TODO: impl with native `msgspec.Struct` support !! # -[ ] implement with ``match:`` syntax? # -[ ] discard un-authed msgs as per, @@ -940,139 +971,29 @@ async def process_messages( kwargs=kwargs, # type-spec this? see `msg.types` uid=actorid, ): - # try: - # ( - # ns, - # funcname, - # kwargs, - # actorid, - # cid, - # ) = msg['cmd'] - - # # TODO: put in `case Error():` right? - # except KeyError: - # # This is the non-rpc error case, that is, an - # # error **not** raised inside a call to ``_invoke()`` - # # (i.e. no cid was provided in the msg - see above). - # # Push this error to all local channel consumers - # # (normally portals) by marking the channel as errored - # assert chan.uid - # exc = unpack_error(msg, chan=chan) - # chan._exc = exc - # raise exc - log.runtime( 'Handling RPC `Start` request from\n' f'peer: {actorid}\n' '\n' f'=> {ns}.{funcname}({kwargs})\n' ) - # case Start( - # ns='self', - # funcname='cancel', - # ): + + # runtime-internal endpoint: `Actor.` + # only registry methods exist now yah, + # like ``.register_actor()`` etc. ? if ns == 'self': - if funcname == 'cancel': - func: Callable = actor.cancel - kwargs |= { - 'req_chan': chan, - } + func: Callable = getattr(actor, funcname) - # don't start entire actor runtime cancellation - # if this actor is currently in debug mode! - pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete - if pdb_complete: - await pdb_complete.wait() - - # Either of `Actor.cancel()`/`.cancel_soon()` - # was called, so terminate this IPC msg - # loop, exit back out into `async_main()`, - # and immediately start the core runtime - # machinery shutdown! - with CancelScope(shield=True): - await _invoke( - actor, - cid, - chan, - func, - kwargs, - is_rpc=False, - ) - - log.runtime( - 'Cancelling IPC transport msg-loop with peer:\n' - f'|_{chan}\n' - ) - loop_cs.cancel() - break - - # case Start( - # ns='self', - # funcname='_cancel_task', - # ): - if funcname == '_cancel_task': - func: Callable = actor._cancel_task - - # we immediately start the runtime machinery - # shutdown - # with CancelScope(shield=True): - target_cid: str = kwargs['cid'] - kwargs |= { - # NOTE: ONLY the rpc-task-owning - # parent IPC channel should be able to - # cancel it! - 'parent_chan': chan, - 'requesting_uid': chan.uid, - 'ipc_msg': msg, - } - # TODO: remove? already have emit in meth. - # log.runtime( - # f'Rx RPC task cancel request\n' - # f'<= canceller: {chan.uid}\n' - # f' |_{chan}\n\n' - # f'=> {actor}\n' - # f' |_cid: {target_cid}\n' - # ) - try: - await _invoke( - actor, - cid, - chan, - func, - kwargs, - is_rpc=False, - ) - except BaseException: - log.exception( - 'Failed to cancel task?\n' - f'<= canceller: {chan.uid}\n' - f' |_{chan}\n\n' - f'=> {actor}\n' - f' |_cid: {target_cid}\n' - ) - continue - - # case Start( - # ns='self', - # funcname='register_actor', - # ): - else: - # normally registry methods, eg. - # ``.register_actor()`` etc. - func: Callable = getattr(actor, funcname) - - # case Start( - # ns=str(), - # funcname=funcname, - # ): + # application RPC endpoint else: - # complain to client about restricted modules try: - func = actor._get_rpc_func(ns, funcname) + func: Callable = actor._get_rpc_func(ns, funcname) except ( ModuleNotExposed, AttributeError, ) as err: + # always complain to requester + # client about un-enabled modules err_msg: dict[str, dict] = pack_error( err, cid=cid, @@ -1082,6 +1003,7 @@ async def process_messages( # schedule a task for the requested RPC function # in the actor's main "service nursery". + # # TODO: possibly a service-tn per IPC channel for # supervision isolation? would avoid having to # manage RPC tasks individually in `._rpc_tasks` @@ -1090,7 +1012,7 @@ async def process_messages( f'Spawning task for RPC request\n' f'<= caller: {chan.uid}\n' f' |_{chan}\n\n' - # TODO: maddr style repr? + # ^-TODO-^ maddr style repr? # f' |_@ /ipv4/{chan.raddr}/tcp/{chan.rport}/' # f'cid="{cid[-16:]} .."\n\n' @@ -1098,7 +1020,6 @@ async def process_messages( f' |_cid: {cid}\n' f' |>> {func}()\n' ) - assert actor._service_n # wait why? do it at top? try: ctx: Context = await actor._service_n.start( partial( @@ -1128,13 +1049,12 @@ async def process_messages( log.warning( 'Task for RPC failed?' f'|_ {func}()\n\n' - f'{err}' ) continue else: - # mark that we have ongoing rpc tasks + # mark our global state with ongoing rpc tasks actor._ongoing_rpc_tasks = trio.Event() # store cancel scope such that the rpc task can be @@ -1145,23 +1065,24 @@ async def process_messages( trio.Event(), ) - case Error()|_: - # This is the non-rpc error case, that is, an - # error **not** raised inside a call to ``_invoke()`` - # (i.e. no cid was provided in the msg - see above). - # Push this error to all local channel consumers - # (normally portals) by marking the channel as errored + case Error() | _: + # NOTE: this is the non-rpc error case, + # that is, an error **not** raised inside + # a call to ``_invoke()`` (i.e. no cid was + # provided in the msg - see above). Push + # this error to all local channel + # consumers (normally portals) by marking + # the channel as errored log.exception( f'Unhandled IPC msg:\n\n' f'{msg}\n' ) - assert chan.uid - exc = unpack_error( + # assert chan.uid + chan._exc: Exception = unpack_error( msg, chan=chan, ) - chan._exc = exc - raise exc + raise chan._exc log.runtime( 'Waiting on next IPC msg from\n' @@ -1172,7 +1093,8 @@ async def process_messages( # end of async for, channel disconnect vis # ``trio.EndOfChannel`` log.runtime( - f"{chan} for {chan.uid} disconnected, cancelling tasks" + f'channel for {chan.uid} disconnected, cancelling RPC tasks\n' + f'|_{chan}\n' ) await actor.cancel_rpc_tasks( req_uid=actor.uid, diff --git a/tractor/msg/types.py b/tractor/msg/types.py index 3e7a2d7..7355a61 100644 --- a/tractor/msg/types.py +++ b/tractor/msg/types.py @@ -454,6 +454,10 @@ _runtime_msgs: list[Msg] = [ # emission from `MsgStream.aclose()` Stop, + # `Return` sub-type that we always accept from + # runtime-internal cancel endpoints + CancelAck, + # box remote errors, normally subtypes # of `RemoteActorError`. Error,