diff --git a/tractor/_portal.py b/tractor/_portal.py index c2e7315..80fc902 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -21,7 +21,7 @@ from .log import get_logger from ._exceptions import ( unpack_error, NoResult, - RemoteActorError, + # RemoteActorError, ContextCancelled, ) from ._streaming import Context, ReceiveMsgStream @@ -54,8 +54,23 @@ def func_deats(func: Callable) -> Tuple[str, str]: ) +def _unwrap_msg( + + msg: dict[str, Any], + channel: Channel + +) -> Any: + try: + return msg['return'] + except KeyError: + # internal error should never get here + assert msg.get('cid'), "Received internal error at portal?" + raise unpack_error(msg, channel) + + class Portal: - """A 'portal' to a(n) (remote) ``Actor``. + ''' + A 'portal' to a(n) (remote) ``Actor``. A portal is "opened" (and eventually closed) by one side of an inter-actor communication context. The side which opens the portal @@ -71,7 +86,7 @@ class Portal: function calling semantics are supported transparently; hence it is like having a "portal" between the seperate actor memory spaces. - """ + ''' def __init__(self, channel: Channel) -> None: self.channel = channel # when this is set to a tuple returned from ``_submit()`` then @@ -130,16 +145,11 @@ class Portal: resptype: str, first_msg: dict - ) -> tuple[Any, dict[str, Any]]: - assert resptype == 'asyncfunc' # single response + ) -> dict[str, Any]: + assert resptype == 'asyncfunc' # single response msg = await recv_chan.receive() - try: - return msg['return'], msg - except KeyError: - # internal error should never get here - assert msg.get('cid'), "Received internal error at portal?" - raise unpack_error(msg, self.channel) + return msg async def result(self) -> Any: """Return the result(s) from the remote actor's "main" task. @@ -162,19 +172,9 @@ class Portal: assert self._expect_result if self._result_msg is None: - try: - result, self._result_msg = await self._return_once( - *self._expect_result) - except RemoteActorError as err: - result = err - else: - result = self._result_msg['return'] + self._result_msg = await self._return_once(*self._expect_result) - # re-raise error on every call - if isinstance(result, RemoteActorError): - raise result - - return result + return _unwrap_msg(self._result_msg, self.channel) async def _cancel_streams(self): # terminate all locally running async generator @@ -249,10 +249,10 @@ class Portal: instance methods in the remote runtime. Currently this should only be used for `tractor` internals. """ - value, _ = await self._return_once( + msg = await self._return_once( *(await self._submit(namespace_path, function_name, kwargs)) ) - return value + return _unwrap_msg(msg, self.channel) async def run( self, @@ -293,9 +293,12 @@ class Portal: fn_mod_path, fn_name = func_deats(func) - return (await self._return_once( - *(await self._submit(fn_mod_path, fn_name, kwargs)) - ))[0] + return _unwrap_msg( + await self._return_once( + *(await self._submit(fn_mod_path, fn_name, kwargs)), + ), + self.channel, + ) @asynccontextmanager async def open_stream_from(