From 0e7234aa689bce3ff5d59269418050cef746ab07 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 29 Nov 2021 07:27:14 -0500 Subject: [PATCH] Cache the return message instead of the value Thanks to @richardsheridan for pointing out the limitations of using *any* kind of value as the result-cached-flag and how it might cause problems for anyone returning pickled blob-data. This changes the `Portal` internal result value tracking to stash the full message from which the value can be retrieved by any `Portal.result()` caller. The internal change is that `Portal._return_once()` now returns a tuple of the message *and* its value. --- newsfragments/264.bug.rst | 5 +++-- tractor/_portal.py | 36 ++++++++++++++++++++++-------------- 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/newsfragments/264.bug.rst b/newsfragments/264.bug.rst index 4272dab..dda3389 100644 --- a/newsfragments/264.bug.rst +++ b/newsfragments/264.bug.rst @@ -3,5 +3,6 @@ Fix ``Portal.run_in_actor()`` returns ``None`` result. ``None`` was being used as the cached result flag and obviously breaks on a ``None`` returned from the remote target task. This would cause an infinite hang if user code ever called ``Portal.result()`` *before* the -nursery exit. The simple fix is to use the ``Channel.uid`` as the -initial "no-result-received-yet" flag. +nursery exit. The simple fix is to use the *return message* as the +initial "no-result-received-yet" flag value and, once received, the +return value is read from the message to avoid the cache logic error. diff --git a/tractor/_portal.py b/tractor/_portal.py index e4e11b9..c2e7315 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -77,7 +77,8 @@ class Portal: # when this is set to a tuple returned from ``_submit()`` then # it is expected that ``result()`` will be awaited at some point # during the portal's lifetime - self._result: Optional[Any] = channel.uid + self._result_msg: Optional[dict] = None + # set when _submit_for_result is called self._expect_result: Optional[ Tuple[str, Any, str, Dict[str, Any]] @@ -129,12 +130,12 @@ class Portal: resptype: str, first_msg: dict - ) -> Any: + ) -> tuple[Any, dict[str, Any]]: assert resptype == 'asyncfunc' # single response msg = await recv_chan.receive() try: - return msg['return'] + return msg['return'], msg except KeyError: # internal error should never get here assert msg.get('cid'), "Received internal error at portal?" @@ -159,17 +160,21 @@ class Portal: # expecting a "main" result assert self._expect_result - if self._result is self.channel.uid: + + if self._result_msg is None: try: - self._result = await self._return_once(*self._expect_result) + result, self._result_msg = await self._return_once( + *self._expect_result) except RemoteActorError as err: - self._result = err + result = err + else: + result = self._result_msg['return'] # re-raise error on every call - if isinstance(self._result, RemoteActorError): - raise self._result + if isinstance(result, RemoteActorError): + raise result - return self._result + return result async def _cancel_streams(self): # terminate all locally running async generator @@ -244,9 +249,10 @@ class Portal: instance methods in the remote runtime. Currently this should only be used for `tractor` internals. """ - return await self._return_once( + value, _ = await self._return_once( *(await self._submit(namespace_path, function_name, kwargs)) ) + return value async def run( self, @@ -254,12 +260,14 @@ class Portal: fn_name: Optional[str] = None, **kwargs ) -> Any: - """Submit a remote function to be scheduled and run by actor, in + ''' + Submit a remote function to be scheduled and run by actor, in a new task, wrap and return its (stream of) result(s). This is a blocking call and returns either a value from the remote rpc task or a local async generator instance. - """ + + ''' if isinstance(func, str): warnings.warn( "`Portal.run(namespace: str, funcname: str)` is now" @@ -285,9 +293,9 @@ class Portal: fn_mod_path, fn_name = func_deats(func) - return await self._return_once( + return (await self._return_once( *(await self._submit(fn_mod_path, fn_name, kwargs)) - ) + ))[0] @asynccontextmanager async def open_stream_from(