diff --git a/tractor/_actor.py b/tractor/_actor.py index 3ea99ab..36f0092 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -616,6 +616,7 @@ class Actor: assert chan.uid, f"`chan.uid` can't be {chan.uid}" ctx = self._contexts[(chan.uid, cid)] send_chan = ctx._send_chan + assert send_chan # TODO: relaying far end context errors to the local # context through nursery raising? @@ -655,9 +656,13 @@ class Actor: ''' log.runtime(f"Getting result queue for {chan.uid} cid {cid}") + actor_uid = chan.uid + assert actor_uid try: - ctx = self._contexts[(chan.uid, cid)] + ctx = self._contexts[(actor_uid, cid)] except KeyError: + send_chan: trio.MemorySendChannel + recv_chan: trio.MemoryReceiveChannel send_chan, recv_chan = trio.open_memory_channel(max_buffer_size) ctx = Context( chan, @@ -665,23 +670,25 @@ class Actor: _send_chan=send_chan, _recv_chan=recv_chan, ) - self._contexts[(chan.uid, cid)] = ctx + self._contexts[(actor_uid, cid)] = ctx return ctx - async def send_cmd( + async def start_remote_task( self, chan: Channel, ns: str, func: str, kwargs: dict - ) -> Tuple[str, trio.abc.MemoryReceiveChannel]: + ) -> Context: ''' - Send a ``'cmd'`` message to a remote actor and return a caller - id and a ``trio.MemoryReceiveChannel`` message "feeder" channel - that can be used to wait for responses delivered by the local - runtime's message processing loop. + Send a ``'cmd'`` message to a remote actor, which starts + a remote task-as-function entrypoint. + + Synchronously validates the endpoint type and return a caller + side task ``Context`` that can be used to wait for responses + delivered by the local runtime's message processing loop. ''' cid = str(uuid.uuid4()) @@ -689,7 +696,20 @@ class Actor: ctx = self.get_context(chan, cid) log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)}) - return ctx.cid, ctx._recv_chan + + # Wait on first response msg and validate; this should be + # immediate. + first_msg = await ctx._recv_chan.receive() + functype = first_msg.get('functype') + + if 'error' in first_msg: + raise unpack_error(first_msg, chan) + + elif functype not in ('asyncfunc', 'asyncgen', 'context'): + raise ValueError(f"{first_msg} is an invalid response packet?") + + ctx._remote_func_type = functype + return ctx async def _process_messages( self, diff --git a/tractor/_portal.py b/tractor/_portal.py index 89bd3a9..dcd2ed5 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -6,7 +6,7 @@ concurrency linked tasks running in disparate memory domains. import importlib import inspect from typing import ( - Tuple, Any, Dict, Optional, Set, + Any, Optional, Callable, AsyncGenerator ) from functools import partial @@ -49,7 +49,7 @@ async def maybe_open_nursery( yield nursery -def func_deats(func: Callable) -> Tuple[str, str]: +def func_deats(func: Callable) -> tuple[str, str]: return ( func.__module__, func.__name__, @@ -98,72 +98,45 @@ class Portal: # during the portal's lifetime self._result_msg: Optional[dict] = None - # When this is set to a tuple returned from ``_submit()`` then + # When set to a ``Context`` (when _submit_for_result is called) # it is expected that ``result()`` will be awaited at some - # point. Set when _submit_for_result is called - self._expect_result: Optional[ - Tuple[str, Any, str, Dict[str, Any]] - ] = None - self._streams: Set[ReceiveMsgStream] = set() + # point. + self._expect_result: Optional[Context] = None + self._streams: set[ReceiveMsgStream] = set() self.actor = current_actor() - async def _submit( + async def _submit_for_result( self, - ns: str, func: str, - kwargs, - - ) -> Tuple[str, trio.MemoryReceiveChannel, str, Dict[str, Any]]: - ''' - Submit a function to be scheduled and run by actor, return the - associated caller id, response queue, response type str, first - message packet as a tuple. - - This is an async call. - - ''' - # ship a function call request to the remote actor - cid, recv_chan = await self.actor.send_cmd( - self.channel, ns, func, kwargs) - - # wait on first response msg and handle (this should be - # in an immediate response) - - first_msg = await recv_chan.receive() - functype = first_msg.get('functype') - - if 'error' in first_msg: - raise unpack_error(first_msg, self.channel) - - elif functype not in ('asyncfunc', 'asyncgen', 'context'): - raise ValueError(f"{first_msg} is an invalid response packet?") - - return cid, recv_chan, functype, first_msg - - async def _submit_for_result(self, ns: str, func: str, **kwargs) -> None: + **kwargs + ) -> None: assert self._expect_result is None, \ "A pending main result has already been submitted" - self._expect_result = await self._submit(ns, func, kwargs) + self._expect_result = await self.actor.start_remote_task( + self.channel, + ns, + func, + kwargs + ) async def _return_once( self, - cid: str, - recv_chan: trio.abc.ReceiveChannel, - resptype: str, - first_msg: dict + ctx: Context, ) -> dict[str, Any]: - assert resptype == 'asyncfunc' # single response - msg = await recv_chan.receive() + assert ctx._remote_func_type == 'asyncfunc' # single response + msg = await ctx._recv_chan.receive() return msg async def result(self) -> Any: - """Return the result(s) from the remote actor's "main" task. - """ + ''' + Return the result(s) from the remote actor's "main" task. + + ''' # Check for non-rpc errors slapped on the # channel for which we always raise exc = self.channel._exc @@ -182,7 +155,9 @@ class Portal: assert self._expect_result if self._result_msg is None: - self._result_msg = await self._return_once(*self._expect_result) + self._result_msg = await self._return_once( + self._expect_result + ) return _unwrap_msg(self._result_msg, self.channel) @@ -275,7 +250,12 @@ class Portal: ''' msg = await self._return_once( - *(await self._submit(namespace_path, function_name, kwargs)) + await self.actor.start_remote_task( + self.channel, + namespace_path, + function_name, + kwargs, + ) ) return _unwrap_msg(msg, self.channel) @@ -320,7 +300,12 @@ class Portal: return _unwrap_msg( await self._return_once( - *(await self._submit(fn_mod_path, fn_name, kwargs)), + await self.actor.start_remote_task( + self.channel, + fn_mod_path, + fn_name, + kwargs, + ) ), self.channel, ) @@ -342,27 +327,21 @@ class Portal: f'{async_gen_func} must be an async generator function!') fn_mod_path, fn_name = func_deats(async_gen_func) - ( - cid, - recv_chan, - functype, - first_msg - ) = await self._submit(fn_mod_path, fn_name, kwargs) - - # receive only stream - assert functype == 'asyncgen' - - ctx = Context( + ctx = await self.actor.start_remote_task( self.channel, - cid, - # do we need this to be closed implicitly? - # _recv_chan=recv_chan, - _portal=self + fn_mod_path, + fn_name, + kwargs ) + ctx._portal = self + + # ensure receive-only stream entrypoint + assert ctx._remote_func_type == 'asyncgen' + try: # deliver receive only stream async with ReceiveMsgStream( - ctx, recv_chan, + ctx, ctx._recv_chan, ) as rchan: self._streams.add(rchan) yield rchan @@ -396,7 +375,7 @@ class Portal: func: Callable, **kwargs, - ) -> AsyncGenerator[Tuple[Context, Any], None]: + ) -> AsyncGenerator[tuple[Context, Any], None]: ''' Open an inter-actor task context. @@ -415,14 +394,14 @@ class Portal: f'{func} must be an async generator function!') fn_mod_path, fn_name = func_deats(func) - - recv_chan: Optional[trio.MemoryReceiveChannel] = None - - cid, recv_chan, functype, first_msg = await self._submit( - fn_mod_path, fn_name, kwargs) - - assert functype == 'context' - msg = await recv_chan.receive() + ctx = await self.actor.start_remote_task( + self.channel, + fn_mod_path, + fn_name, + kwargs + ) + assert ctx._remote_func_type == 'context' + msg = await ctx._recv_chan.receive() try: # the "first" value here is delivered by the callee's @@ -439,12 +418,6 @@ class Portal: raise _err: Optional[BaseException] = None - - # this should already have been created from the - # ``._submit()`` call above. - ctx = self.actor.get_context(self.channel, cid) - # pairs with handling in ``Actor._push_result()`` - assert ctx._recv_chan is recv_chan ctx._portal = self # deliver context instance and .started() msg value in open tuple. @@ -502,9 +475,9 @@ class Portal: # operating *in* this scope to have survived # we tear down the runtime feeder chan last # to avoid premature stream clobbers. - if recv_chan is not None: + if ctx._recv_chan is not None: # should we encapsulate this in the context api? - await recv_chan.aclose() + await ctx._recv_chan.aclose() if _err: if ctx._cancel_called: diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 29be554..79cc956 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -322,13 +322,14 @@ class Context: # these are the "feeder" channels for delivering # message values to the local task from the runtime # msg processing loop. - _recv_chan: Optional[trio.MemoryReceiveChannel] = None - _send_chan: Optional[trio.MemorySendChannel] = None + _recv_chan: trio.MemoryReceiveChannel + _send_chan: trio.MemorySendChannel + + _remote_func_type: Optional[str] = None # only set on the caller side _portal: Optional['Portal'] = None # type: ignore # noqa _result: Optional[Any] = False - _remote_func_type: str = None # status flags _cancel_called: bool = False @@ -474,7 +475,7 @@ class Context: ) # NOTE: in one way streaming this only happens on the - # caller side inside `Actor.send_cmd()` so if you try + # caller side inside `Actor.start_remote_task()` so if you try # to send a stop from the caller to the callee in the # single-direction-stream case you'll get a lookup error # currently.