forked from goodboy/tractor
1
0
Fork 0

Use `.recv_msg_w_pld()` for final `Portal.result()`

Woops, due to a `None` test against the `._final_result`, any actual
final `None` result would be received but not acked as such causing
a spawning test to hang. Fix it by instead receiving and assigning both
a `._final_result_msg: PayloadMsg` and `._final_result_pld`.

NB: as mentioned in many recent comments surrounding this API layer,
really this whole `Portal`-has-final-result interface/semantics should
be entirely removed as should the `ActorNursery.run_in_actor()` API(s).
Instead it should all be replaced by a wrapping "high level" API
(`tractor.hilevel` ?) which combines a task nursery, `Portal.open_context()`
and underlying `Context` APIs + an `outcome.Outcome` to accomplish the
same "run a single task in a spawned actor and return it's result"; aka
a "one-shot-task-actor".
runtime_to_msgspec
Tyler Goodlet 2024-05-09 09:37:47 -04:00
parent c5a0cfc639
commit d6ca4771ce
1 changed files with 11 additions and 6 deletions

View File

@ -47,6 +47,7 @@ from ._ipc import Channel
from .log import get_logger
from .msg import (
# Error,
PayloadMsg,
NamespacePath,
Return,
)
@ -98,7 +99,8 @@ class Portal:
self.chan = channel
# during the portal's lifetime
self._final_result: Any|None = None
self._final_result_pld: Any|None = None
self._final_result_msg: PayloadMsg|None = None
# When set to a ``Context`` (when _submit_for_result is called)
# it is expected that ``result()`` will be awaited at some
@ -132,7 +134,7 @@ class Portal:
'A pending main result has already been submitted'
)
self._expect_result_ctx = await self.actor.start_remote_task(
self._expect_result_ctx: Context = await self.actor.start_remote_task(
self.channel,
nsf=NamespacePath(f'{ns}:{func}'),
kwargs=kwargs,
@ -163,13 +165,16 @@ class Portal:
# expecting a "main" result
assert self._expect_result_ctx
if self._final_result is None:
self._final_result: Any = await self._expect_result_ctx._pld_rx.recv_pld(
ctx=self._expect_result_ctx,
if self._final_result_msg is None:
(
self._final_result_msg,
self._final_result_pld,
) = await self._expect_result_ctx._pld_rx.recv_msg_w_pld(
ipc=self._expect_result_ctx,
expect_msg=Return,
)
return self._final_result
return self._final_result_pld
async def _cancel_streams(self):
# terminate all locally running async generator