From 582144830f1cdcd4bbec4626cfff50e6f95033f1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 28 May 2024 09:36:26 -0400 Subject: [PATCH] Parameterize the `return_msg_type` in `._invoke()` Since we also handle a runtime-specific `CancelAck`, allow the caller-scheduler to pass in the expected return-type msg per the RPC msg endpoint loop. --- tractor/_rpc.py | 38 +++++++++++++++++++++----------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/tractor/_rpc.py b/tractor/_rpc.py index df79c65..9b92d4e 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -64,6 +64,7 @@ from .log import get_logger from .msg import ( current_codec, MsgCodec, + PayloadT, NamespacePath, pretty_struct, ) @@ -98,7 +99,7 @@ async def _invoke_non_context( treat_as_gen: bool, is_rpc: bool, - return_msg: Return|CancelAck = Return, + return_msg_type: Return|CancelAck = Return, task_status: TaskStatus[ Context | BaseException @@ -220,7 +221,7 @@ async def _invoke_non_context( and chan.connected() ): try: - ret_msg = return_msg( + ret_msg = return_msg_type( cid=cid, pld=result, ) @@ -419,7 +420,7 @@ async def _invoke( is_rpc: bool = True, hide_tb: bool = True, - return_msg: Return|CancelAck = Return, + return_msg_type: Return|CancelAck = Return, task_status: TaskStatus[ Context | BaseException @@ -533,7 +534,7 @@ async def _invoke( kwargs, treat_as_gen, is_rpc, - return_msg, + return_msg_type, task_status, ) # XXX below fallthrough is ONLY for `@context` eps @@ -593,18 +594,21 @@ async def _invoke( ctx._scope = tn.cancel_scope task_status.started(ctx) - # TODO: should would be nice to have our - # `TaskMngr` nursery here! - res: Any = await coro - ctx._result = res - - # deliver final result to caller side. - await chan.send( - return_msg( - cid=cid, - pld=res, - ) + # TODO: better `trionics` tooling: + # -[ ] should would be nice to have our `TaskMngr` + # nursery here! + # -[ ] payload value checking like we do with + # `.started()` such that the debbuger can engage + # here in the child task instead of waiting for the + # parent to crash with it's own MTE.. + res: Any|PayloadT = await coro + return_msg: Return|CancelAck = return_msg_type( + cid=cid, + pld=res, ) + # set and shuttle final result to "parent"-side task. + ctx._result = res + await chan.send(return_msg) # NOTE: this happens IFF `ctx._scope.cancel()` is # called by any of, @@ -940,7 +944,7 @@ async def process_messages( actor.cancel, kwargs, is_rpc=False, - return_msg=CancelAck, + return_msg_type=CancelAck, ) log.runtime( @@ -974,7 +978,7 @@ async def process_messages( actor._cancel_task, kwargs, is_rpc=False, - return_msg=CancelAck, + return_msg_type=CancelAck, ) except BaseException: log.exception(