forked from goodboy/tractor
1
0
Fork 0

Merge pull request #264 from goodboy/runinactor_none_result

Fix `Portal.run_in_actor()` returns `None` result
faster_daemon_cancels
goodboy 2021-11-29 09:21:24 -05:00 committed by GitHub
commit ac821bdd94
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 71 additions and 37 deletions

View File

@ -0,0 +1,8 @@
Fix ``Portal.run_in_actor()`` returns ``None`` result.
``None`` was being used as the cached result flag and obviously breaks
on a ``None`` returned from the remote target task. This would cause an
infinite hang if user code ever called ``Portal.result()`` *before* the
nursery exit. The simple fix is to use the *return message* as the
initial "no-result-received-yet" flag value and, once received, the
return value is read from the message to avoid the cache logic error.

View File

@ -1,7 +1,7 @@
""" """
Spawning basics Spawning basics
""" """
from typing import Dict, Tuple from typing import Dict, Tuple, Optional
import pytest import pytest
import trio import trio
@ -93,24 +93,38 @@ async def test_movie_theatre_convo(start_method):
await portal.cancel_actor() await portal.cancel_actor()
async def cellar_door(): async def cellar_door(return_value: Optional[str]):
return "Dang that's beautiful" return return_value
@pytest.mark.parametrize(
'return_value', ["Dang that's beautiful", None],
ids=['return_str', 'return_None'],
)
@tractor_test @tractor_test
async def test_most_beautiful_word(start_method): async def test_most_beautiful_word(
"""The main ``tractor`` routine. start_method,
""" return_value
async with tractor.open_nursery() as n: ):
'''
The main ``tractor`` routine.
portal = await n.run_in_actor( '''
cellar_door, with trio.fail_after(1):
name='some_linguist', async with tractor.open_nursery() as n:
)
portal = await n.run_in_actor(
cellar_door,
return_value=return_value,
name='some_linguist',
)
print(await portal.result())
# The ``async with`` will unblock here since the 'some_linguist' # The ``async with`` will unblock here since the 'some_linguist'
# actor has completed its main task ``cellar_door``. # actor has completed its main task ``cellar_door``.
# this should pull the cached final result already captured during
# the nursery block exit.
print(await portal.result()) print(await portal.result())

View File

@ -21,7 +21,7 @@ from .log import get_logger
from ._exceptions import ( from ._exceptions import (
unpack_error, unpack_error,
NoResult, NoResult,
RemoteActorError, # RemoteActorError,
ContextCancelled, ContextCancelled,
) )
from ._streaming import Context, ReceiveMsgStream from ._streaming import Context, ReceiveMsgStream
@ -54,8 +54,23 @@ def func_deats(func: Callable) -> Tuple[str, str]:
) )
def _unwrap_msg(
msg: dict[str, Any],
channel: Channel
) -> Any:
try:
return msg['return']
except KeyError:
# internal error should never get here
assert msg.get('cid'), "Received internal error at portal?"
raise unpack_error(msg, channel)
class Portal: class Portal:
"""A 'portal' to a(n) (remote) ``Actor``. '''
A 'portal' to a(n) (remote) ``Actor``.
A portal is "opened" (and eventually closed) by one side of an A portal is "opened" (and eventually closed) by one side of an
inter-actor communication context. The side which opens the portal inter-actor communication context. The side which opens the portal
@ -71,13 +86,14 @@ class Portal:
function calling semantics are supported transparently; hence it is function calling semantics are supported transparently; hence it is
like having a "portal" between the seperate actor memory spaces. like having a "portal" between the seperate actor memory spaces.
""" '''
def __init__(self, channel: Channel) -> None: def __init__(self, channel: Channel) -> None:
self.channel = channel self.channel = channel
# when this is set to a tuple returned from ``_submit()`` then # when this is set to a tuple returned from ``_submit()`` then
# it is expected that ``result()`` will be awaited at some point # it is expected that ``result()`` will be awaited at some point
# during the portal's lifetime # during the portal's lifetime
self._result: Optional[Any] = None self._result_msg: Optional[dict] = None
# set when _submit_for_result is called # set when _submit_for_result is called
self._expect_result: Optional[ self._expect_result: Optional[
Tuple[str, Any, str, Dict[str, Any]] Tuple[str, Any, str, Dict[str, Any]]
@ -128,16 +144,12 @@ class Portal:
recv_chan: trio.abc.ReceiveChannel, recv_chan: trio.abc.ReceiveChannel,
resptype: str, resptype: str,
first_msg: dict first_msg: dict
) -> Any:
assert resptype == 'asyncfunc' # single response
) -> dict[str, Any]:
assert resptype == 'asyncfunc' # single response
msg = await recv_chan.receive() msg = await recv_chan.receive()
try: return msg
return msg['return']
except KeyError:
# internal error should never get here
assert msg.get('cid'), "Received internal error at portal?"
raise unpack_error(msg, self.channel)
async def result(self) -> Any: async def result(self) -> Any:
"""Return the result(s) from the remote actor's "main" task. """Return the result(s) from the remote actor's "main" task.
@ -158,17 +170,11 @@ class Portal:
# expecting a "main" result # expecting a "main" result
assert self._expect_result assert self._expect_result
if self._result is None:
try:
self._result = await self._return_once(*self._expect_result)
except RemoteActorError as err:
self._result = err
# re-raise error on every call if self._result_msg is None:
if isinstance(self._result, RemoteActorError): self._result_msg = await self._return_once(*self._expect_result)
raise self._result
return self._result return _unwrap_msg(self._result_msg, self.channel)
async def _cancel_streams(self): async def _cancel_streams(self):
# terminate all locally running async generator # terminate all locally running async generator
@ -243,9 +249,10 @@ class Portal:
instance methods in the remote runtime. Currently this should only instance methods in the remote runtime. Currently this should only
be used for `tractor` internals. be used for `tractor` internals.
""" """
return await self._return_once( msg = await self._return_once(
*(await self._submit(namespace_path, function_name, kwargs)) *(await self._submit(namespace_path, function_name, kwargs))
) )
return _unwrap_msg(msg, self.channel)
async def run( async def run(
self, self,
@ -253,12 +260,14 @@ class Portal:
fn_name: Optional[str] = None, fn_name: Optional[str] = None,
**kwargs **kwargs
) -> Any: ) -> Any:
"""Submit a remote function to be scheduled and run by actor, in '''
Submit a remote function to be scheduled and run by actor, in
a new task, wrap and return its (stream of) result(s). a new task, wrap and return its (stream of) result(s).
This is a blocking call and returns either a value from the This is a blocking call and returns either a value from the
remote rpc task or a local async generator instance. remote rpc task or a local async generator instance.
"""
'''
if isinstance(func, str): if isinstance(func, str):
warnings.warn( warnings.warn(
"`Portal.run(namespace: str, funcname: str)` is now" "`Portal.run(namespace: str, funcname: str)` is now"
@ -284,8 +293,11 @@ class Portal:
fn_mod_path, fn_name = func_deats(func) fn_mod_path, fn_name = func_deats(func)
return await self._return_once( return _unwrap_msg(
*(await self._submit(fn_mod_path, fn_name, kwargs)) await self._return_once(
*(await self._submit(fn_mod_path, fn_name, kwargs)),
),
self.channel,
) )
@asynccontextmanager @asynccontextmanager