forked from goodboy/tractor
1
0
Fork 0

Cache the return message instead of the value

Thanks to @richardsheridan for pointing out the limitations of using
*any* kind of value as the result-cached-flag and how it might cause
problems for anyone returning pickled blob-data. This changes the
`Portal` internal result value tracking to stash the full message from
which the value can be retrieved by any `Portal.result()` caller.
The internal change is that `Portal._return_once()` now returns a tuple
of the message *and* its value.
faster_daemon_cancels
Tyler Goodlet 2021-11-29 07:27:14 -05:00
parent 83da92d4cb
commit 0e7234aa68
2 changed files with 25 additions and 16 deletions

View File

@ -3,5 +3,6 @@ Fix ``Portal.run_in_actor()`` returns ``None`` result.
``None`` was being used as the cached result flag and obviously breaks ``None`` was being used as the cached result flag and obviously breaks
on a ``None`` returned from the remote target task. This would cause an on a ``None`` returned from the remote target task. This would cause an
infinite hang if user code ever called ``Portal.result()`` *before* the infinite hang if user code ever called ``Portal.result()`` *before* the
nursery exit. The simple fix is to use the ``Channel.uid`` as the nursery exit. The simple fix is to use the *return message* as the
initial "no-result-received-yet" flag. initial "no-result-received-yet" flag value and, once received, the
return value is read from the message to avoid the cache logic error.

View File

@ -77,7 +77,8 @@ class Portal:
# when this is set to a tuple returned from ``_submit()`` then # when this is set to a tuple returned from ``_submit()`` then
# it is expected that ``result()`` will be awaited at some point # it is expected that ``result()`` will be awaited at some point
# during the portal's lifetime # during the portal's lifetime
self._result: Optional[Any] = channel.uid self._result_msg: Optional[dict] = None
# set when _submit_for_result is called # set when _submit_for_result is called
self._expect_result: Optional[ self._expect_result: Optional[
Tuple[str, Any, str, Dict[str, Any]] Tuple[str, Any, str, Dict[str, Any]]
@ -129,12 +130,12 @@ class Portal:
resptype: str, resptype: str,
first_msg: dict first_msg: dict
) -> Any: ) -> tuple[Any, dict[str, Any]]:
assert resptype == 'asyncfunc' # single response assert resptype == 'asyncfunc' # single response
msg = await recv_chan.receive() msg = await recv_chan.receive()
try: try:
return msg['return'] return msg['return'], msg
except KeyError: except KeyError:
# internal error should never get here # internal error should never get here
assert msg.get('cid'), "Received internal error at portal?" assert msg.get('cid'), "Received internal error at portal?"
@ -159,17 +160,21 @@ class Portal:
# expecting a "main" result # expecting a "main" result
assert self._expect_result assert self._expect_result
if self._result is self.channel.uid:
if self._result_msg is None:
try: try:
self._result = await self._return_once(*self._expect_result) result, self._result_msg = await self._return_once(
*self._expect_result)
except RemoteActorError as err: except RemoteActorError as err:
self._result = err result = err
else:
result = self._result_msg['return']
# re-raise error on every call # re-raise error on every call
if isinstance(self._result, RemoteActorError): if isinstance(result, RemoteActorError):
raise self._result raise result
return self._result return result
async def _cancel_streams(self): async def _cancel_streams(self):
# terminate all locally running async generator # terminate all locally running async generator
@ -244,9 +249,10 @@ class Portal:
instance methods in the remote runtime. Currently this should only instance methods in the remote runtime. Currently this should only
be used for `tractor` internals. be used for `tractor` internals.
""" """
return await self._return_once( value, _ = await self._return_once(
*(await self._submit(namespace_path, function_name, kwargs)) *(await self._submit(namespace_path, function_name, kwargs))
) )
return value
async def run( async def run(
self, self,
@ -254,12 +260,14 @@ class Portal:
fn_name: Optional[str] = None, fn_name: Optional[str] = None,
**kwargs **kwargs
) -> Any: ) -> Any:
"""Submit a remote function to be scheduled and run by actor, in '''
Submit a remote function to be scheduled and run by actor, in
a new task, wrap and return its (stream of) result(s). a new task, wrap and return its (stream of) result(s).
This is a blocking call and returns either a value from the This is a blocking call and returns either a value from the
remote rpc task or a local async generator instance. remote rpc task or a local async generator instance.
"""
'''
if isinstance(func, str): if isinstance(func, str):
warnings.warn( warnings.warn(
"`Portal.run(namespace: str, funcname: str)` is now" "`Portal.run(namespace: str, funcname: str)` is now"
@ -285,9 +293,9 @@ class Portal:
fn_mod_path, fn_name = func_deats(func) fn_mod_path, fn_name = func_deats(func)
return await self._return_once( return (await self._return_once(
*(await self._submit(fn_mod_path, fn_name, kwargs)) *(await self._submit(fn_mod_path, fn_name, kwargs))
) ))[0]
@asynccontextmanager @asynccontextmanager
async def open_stream_from( async def open_stream_from(