From f32ccd76aaeca6eb791d2544d6844f4fd46f8c2c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 20 Nov 2021 12:51:29 -0500 Subject: [PATCH 1/6] Add `Portal.result()` is None test case This demonstrates a bug where if the remote `.run_in_actor()` task returns `None` then multiple calls to `Portal.result()` will hang forever... --- tests/test_spawning.py | 36 +++++++++++++++++++++++++----------- 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/tests/test_spawning.py b/tests/test_spawning.py index b1110fe..ba3f6f9 100644 --- a/tests/test_spawning.py +++ b/tests/test_spawning.py @@ -1,7 +1,7 @@ """ Spawning basics """ -from typing import Dict, Tuple +from typing import Dict, Tuple, Optional import pytest import trio @@ -93,24 +93,38 @@ async def test_movie_theatre_convo(start_method): await portal.cancel_actor() -async def cellar_door(): - return "Dang that's beautiful" +async def cellar_door(return_value: Optional[str]): + return return_value +@pytest.mark.parametrize( + 'return_value', ["Dang that's beautiful", None], + ids=['return_str', 'return_None'], +) @tractor_test -async def test_most_beautiful_word(start_method): - """The main ``tractor`` routine. - """ - async with tractor.open_nursery() as n: +async def test_most_beautiful_word( + start_method, + return_value +): + ''' + The main ``tractor`` routine. - portal = await n.run_in_actor( - cellar_door, - name='some_linguist', - ) + ''' + with trio.fail_after(0.5): + async with tractor.open_nursery() as n: + portal = await n.run_in_actor( + cellar_door, + return_value=return_value, + name='some_linguist', + ) + + print(await portal.result()) # The ``async with`` will unblock here since the 'some_linguist' # actor has completed its main task ``cellar_door``. + # this should pull the cached final result already captured during + # the nursery block exit. print(await portal.result()) From 095c94b1d2b2192c524a63df285a424f72155712 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 20 Nov 2021 12:56:38 -0500 Subject: [PATCH 2/6] Fix `Portal.run_in_actor()` returns `None` bug Fixes the issue where if the main remote task returns `None`, `Portal.result()` would erroneously wait again on the underlying feeder mem chan since `None` was being used as the cache flag. Instead set the flag as the channel uid and consider the result collected when set to anything else (since it would be odd to return that value from a remote task when you already can read it as part of portal/channel apis). --- tractor/_portal.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index 382f8e4..e4e11b9 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -77,7 +77,7 @@ class Portal: # when this is set to a tuple returned from ``_submit()`` then # it is expected that ``result()`` will be awaited at some point # during the portal's lifetime - self._result: Optional[Any] = None + self._result: Optional[Any] = channel.uid # set when _submit_for_result is called self._expect_result: Optional[ Tuple[str, Any, str, Dict[str, Any]] @@ -128,6 +128,7 @@ class Portal: recv_chan: trio.abc.ReceiveChannel, resptype: str, first_msg: dict + ) -> Any: assert resptype == 'asyncfunc' # single response @@ -158,7 +159,7 @@ class Portal: # expecting a "main" result assert self._expect_result - if self._result is None: + if self._result is self.channel.uid: try: self._result = await self._return_once(*self._expect_result) except RemoteActorError as err: From 57e98b25e70900aeaf57ce8fb1c61d244da25a72 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 20 Nov 2021 13:08:19 -0500 Subject: [PATCH 3/6] Increase timeout, windows... --- tests/test_spawning.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_spawning.py b/tests/test_spawning.py index ba3f6f9..cdeacba 100644 --- a/tests/test_spawning.py +++ b/tests/test_spawning.py @@ -110,7 +110,7 @@ async def test_most_beautiful_word( The main ``tractor`` routine. ''' - with trio.fail_after(0.5): + with trio.fail_after(1): async with tractor.open_nursery() as n: portal = await n.run_in_actor( From 83da92d4cb0807301c31575272e3398a196ad548 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 28 Nov 2021 19:16:47 -0500 Subject: [PATCH 4/6] Add nooz --- newsfragments/264.bug.rst | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 newsfragments/264.bug.rst diff --git a/newsfragments/264.bug.rst b/newsfragments/264.bug.rst new file mode 100644 index 0000000..4272dab --- /dev/null +++ b/newsfragments/264.bug.rst @@ -0,0 +1,7 @@ +Fix ``Portal.run_in_actor()`` returns ``None`` result. + +``None`` was being used as the cached result flag and obviously breaks +on a ``None`` returned from the remote target task. This would cause an +infinite hang if user code ever called ``Portal.result()`` *before* the +nursery exit. The simple fix is to use the ``Channel.uid`` as the +initial "no-result-received-yet" flag. From 0e7234aa689bce3ff5d59269418050cef746ab07 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 29 Nov 2021 07:27:14 -0500 Subject: [PATCH 5/6] Cache the return message instead of the value Thanks to @richardsheridan for pointing out the limitations of using *any* kind of value as the result-cached-flag and how it might cause problems for anyone returning pickled blob-data. This changes the `Portal` internal result value tracking to stash the full message from which the value can be retrieved by any `Portal.result()` caller. The internal change is that `Portal._return_once()` now returns a tuple of the message *and* its value. --- newsfragments/264.bug.rst | 5 +++-- tractor/_portal.py | 36 ++++++++++++++++++++++-------------- 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/newsfragments/264.bug.rst b/newsfragments/264.bug.rst index 4272dab..dda3389 100644 --- a/newsfragments/264.bug.rst +++ b/newsfragments/264.bug.rst @@ -3,5 +3,6 @@ Fix ``Portal.run_in_actor()`` returns ``None`` result. ``None`` was being used as the cached result flag and obviously breaks on a ``None`` returned from the remote target task. This would cause an infinite hang if user code ever called ``Portal.result()`` *before* the -nursery exit. The simple fix is to use the ``Channel.uid`` as the -initial "no-result-received-yet" flag. +nursery exit. The simple fix is to use the *return message* as the +initial "no-result-received-yet" flag value and, once received, the +return value is read from the message to avoid the cache logic error. diff --git a/tractor/_portal.py b/tractor/_portal.py index e4e11b9..c2e7315 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -77,7 +77,8 @@ class Portal: # when this is set to a tuple returned from ``_submit()`` then # it is expected that ``result()`` will be awaited at some point # during the portal's lifetime - self._result: Optional[Any] = channel.uid + self._result_msg: Optional[dict] = None + # set when _submit_for_result is called self._expect_result: Optional[ Tuple[str, Any, str, Dict[str, Any]] @@ -129,12 +130,12 @@ class Portal: resptype: str, first_msg: dict - ) -> Any: + ) -> tuple[Any, dict[str, Any]]: assert resptype == 'asyncfunc' # single response msg = await recv_chan.receive() try: - return msg['return'] + return msg['return'], msg except KeyError: # internal error should never get here assert msg.get('cid'), "Received internal error at portal?" @@ -159,17 +160,21 @@ class Portal: # expecting a "main" result assert self._expect_result - if self._result is self.channel.uid: + + if self._result_msg is None: try: - self._result = await self._return_once(*self._expect_result) + result, self._result_msg = await self._return_once( + *self._expect_result) except RemoteActorError as err: - self._result = err + result = err + else: + result = self._result_msg['return'] # re-raise error on every call - if isinstance(self._result, RemoteActorError): - raise self._result + if isinstance(result, RemoteActorError): + raise result - return self._result + return result async def _cancel_streams(self): # terminate all locally running async generator @@ -244,9 +249,10 @@ class Portal: instance methods in the remote runtime. Currently this should only be used for `tractor` internals. """ - return await self._return_once( + value, _ = await self._return_once( *(await self._submit(namespace_path, function_name, kwargs)) ) + return value async def run( self, @@ -254,12 +260,14 @@ class Portal: fn_name: Optional[str] = None, **kwargs ) -> Any: - """Submit a remote function to be scheduled and run by actor, in + ''' + Submit a remote function to be scheduled and run by actor, in a new task, wrap and return its (stream of) result(s). This is a blocking call and returns either a value from the remote rpc task or a local async generator instance. - """ + + ''' if isinstance(func, str): warnings.warn( "`Portal.run(namespace: str, funcname: str)` is now" @@ -285,9 +293,9 @@ class Portal: fn_mod_path, fn_name = func_deats(func) - return await self._return_once( + return (await self._return_once( *(await self._submit(fn_mod_path, fn_name, kwargs)) - ) + ))[0] @asynccontextmanager async def open_stream_from( From f6de7e0afdff6aecbd4db77ee09c90fe7c58d4db Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 29 Nov 2021 08:40:59 -0500 Subject: [PATCH 6/6] Factor out msg unwrapping into a func --- tractor/_portal.py | 59 ++++++++++++++++++++++++---------------------- 1 file changed, 31 insertions(+), 28 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index c2e7315..80fc902 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -21,7 +21,7 @@ from .log import get_logger from ._exceptions import ( unpack_error, NoResult, - RemoteActorError, + # RemoteActorError, ContextCancelled, ) from ._streaming import Context, ReceiveMsgStream @@ -54,8 +54,23 @@ def func_deats(func: Callable) -> Tuple[str, str]: ) +def _unwrap_msg( + + msg: dict[str, Any], + channel: Channel + +) -> Any: + try: + return msg['return'] + except KeyError: + # internal error should never get here + assert msg.get('cid'), "Received internal error at portal?" + raise unpack_error(msg, channel) + + class Portal: - """A 'portal' to a(n) (remote) ``Actor``. + ''' + A 'portal' to a(n) (remote) ``Actor``. A portal is "opened" (and eventually closed) by one side of an inter-actor communication context. The side which opens the portal @@ -71,7 +86,7 @@ class Portal: function calling semantics are supported transparently; hence it is like having a "portal" between the seperate actor memory spaces. - """ + ''' def __init__(self, channel: Channel) -> None: self.channel = channel # when this is set to a tuple returned from ``_submit()`` then @@ -130,16 +145,11 @@ class Portal: resptype: str, first_msg: dict - ) -> tuple[Any, dict[str, Any]]: - assert resptype == 'asyncfunc' # single response + ) -> dict[str, Any]: + assert resptype == 'asyncfunc' # single response msg = await recv_chan.receive() - try: - return msg['return'], msg - except KeyError: - # internal error should never get here - assert msg.get('cid'), "Received internal error at portal?" - raise unpack_error(msg, self.channel) + return msg async def result(self) -> Any: """Return the result(s) from the remote actor's "main" task. @@ -162,19 +172,9 @@ class Portal: assert self._expect_result if self._result_msg is None: - try: - result, self._result_msg = await self._return_once( - *self._expect_result) - except RemoteActorError as err: - result = err - else: - result = self._result_msg['return'] + self._result_msg = await self._return_once(*self._expect_result) - # re-raise error on every call - if isinstance(result, RemoteActorError): - raise result - - return result + return _unwrap_msg(self._result_msg, self.channel) async def _cancel_streams(self): # terminate all locally running async generator @@ -249,10 +249,10 @@ class Portal: instance methods in the remote runtime. Currently this should only be used for `tractor` internals. """ - value, _ = await self._return_once( + msg = await self._return_once( *(await self._submit(namespace_path, function_name, kwargs)) ) - return value + return _unwrap_msg(msg, self.channel) async def run( self, @@ -293,9 +293,12 @@ class Portal: fn_mod_path, fn_name = func_deats(func) - return (await self._return_once( - *(await self._submit(fn_mod_path, fn_name, kwargs)) - ))[0] + return _unwrap_msg( + await self._return_once( + *(await self._submit(fn_mod_path, fn_name, kwargs)), + ), + self.channel, + ) @asynccontextmanager async def open_stream_from(