diff --git a/tractor/_actor.py b/tractor/_actor.py index a9ce438..dbc34b0 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -2,6 +2,7 @@ Actor primitives and helpers """ +from __future__ import annotations from collections import defaultdict from functools import partial from itertools import chain @@ -57,6 +58,8 @@ async def _invoke( ''' Invoke local func and deliver result(s) over provided channel. + This is the core "RPC task" starting machinery. + ''' __tracebackhide__ = True treat_as_gen = False @@ -263,14 +266,51 @@ def _get_mod_abspath(module): _lifetime_stack: ExitStack = ExitStack() -class Actor: - """The fundamental concurrency primitive. +async def try_ship_error_to_parent( + actor: Actor, + err: Exception, - An *actor* is the combination of a regular Python process - executing a ``trio`` task tree, communicating - with other actors through "portals" which provide a native async API - around various IPC transport "channels". - """ +) -> None: + with trio.CancelScope(shield=True): + try: + # internal error so ship to parent without cid + await actor._parent_chan.send(pack_error(err)) + except ( + trio.ClosedResourceError, + trio.BrokenResourceError, + ): + log.error( + f"Failed to ship error to parent " + f"{actor._parent_chan.uid}, channel was closed" + ) + + +class Actor: + ''' + The fundamental "runtime" concurrency primitive. + + An *actor* is the combination of a regular Python process executing + a ``trio`` task tree, communicating with other actors through + "memory boundary portals" - which provide a native async API around + IPC transport "channels" which themselves encapsulate various + (swappable) network protocols. + + + Each "actor" is ``trio.run()`` scheduled "runtime" composed of many + concurrent tasks in a single thread. 
The "runtime" tasks conduct + a slew of low(er) level functions to make it possible for message + passing between actors as well as the ability to create new actors + (aka new "runtimes" in new processes which are supervised via + a nursery construct). Each task which sends messages to a task in + a "peer" (not necessarily a parent-child, depth hierarchy) is able + to do so via an "address", which maps IPC connections across memory + boundaries, and a task request id which allows for per-actor + tasks to send and receive messages to specific peer-actor tasks with + which there is an ongoing RPC/IPC dialog. + + ''' + # ugh, we need to get rid of this and replace with a "registry" sys + # https://github.com/goodboy/tractor/issues/216 is_arbiter: bool = False # nursery placeholders filled in by `_async_main()` after fork @@ -441,8 +481,8 @@ class Actor: # we need this for ``msgspec`` for some reason? # for now, it's been put in the stream backend. # trio.BrokenResourceError, - # trio.ClosedResourceError, + TransportClosed, ): # XXX: This may propagate up from ``Channel._aiter_recv()`` @@ -482,7 +522,49 @@ class Actor: # process received reponses. try: await self._process_messages(chan) + + except trio.Cancelled: + log.cancel(f"Msg loop was cancelled for {chan}") + raise + finally: + # This is set in ``Portal.cancel_actor()``. So if + # the peer was cancelled we try to wait for them + # to tear down their side of the connection before + # moving on with closing our own side. + local_nursery = self._actoruid2nursery.get(chan.uid) + if ( + local_nursery + ): + log.cancel(f"Waiting on cancel request to peer {chan.uid}") + # XXX: this is a soft wait on the channel (and its + # underlying transport protocol) to close from the remote + # peer side since we presume that any channel which + # is mapped to a sub-actor (i.e.
it's managed by + # one of our local nurseries) has a + # message sent to the peer, likely by this actor (which is + # now in a cancelled condition), when the local runtime here + # is now cancelled while (presumably) in the middle of msg + # loop processing. + with trio.move_on_after(0.1) as cs: + cs.shield = True + # Attempt to wait for the far end to close the channel + # and bail after timeout (2-generals on closure). + async for msg in chan.msgstream.drain(): + # try to deliver any lingering msgs + # before we destroy the channel. + # This accomplishes deterministic + # ``Portal.cancel_actor()`` cancellation by + # making sure any RPC response to that call is + # delivered to the local calling task. + # TODO: factor this into a helper? + log.runtime(f'drained {msg} for {chan.uid}') + cid = msg.get('cid') + if cid: + # deliver response to local caller/waiter + await self._push_result(chan, cid, msg) + + await local_nursery.exited.wait() # channel cleanup sequence @@ -593,10 +675,12 @@ class Actor: func: str, kwargs: dict ) -> Tuple[str, trio.abc.ReceiveChannel]: - """Send a ``'cmd'`` message to a remote actor and return a + ''' + Send a ``'cmd'`` message to a remote actor and return a caller id and a ``trio.Queue`` that can be used to wait for responses delivered by the local message processing loop. - """ + + ''' cid = str(uuid.uuid4()) assert chan.uid send_chan, recv_chan = self.get_memchans(chan.uid, cid) @@ -609,11 +693,14 @@ class Actor: chan: Channel, shield: bool = False, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, + ) -> None: - """Process messages for the channel async-RPC style. + ''' + Process messages for the channel async-RPC style. Receive multiplexed RPC requests and deliver responses over ``chan``. - """ + + ''' # TODO: once https://github.com/python-trio/trio/issues/467 gets # worked out we'll likely want to use that!
msg = None @@ -692,8 +779,9 @@ class Actor: # msg loop and break out into # ``_async_main()`` log.cancel( - f"Actor {self.uid} was remotely cancelled;" - " waiting on cancellation completion..") + f"Actor {self.uid} was remotely cancelled " + f"by {chan.uid}" + ) await _invoke( self, cid, chan, func, kwargs, is_rpc=False ) @@ -789,17 +877,12 @@ class Actor: # machinery not from an rpc task) to parent log.exception("Actor errored:") if self._parent_chan: - await self._parent_chan.send(pack_error(err)) + await try_ship_error_to_parent(self, err) # if this is the `MainProcess` we expect the error broadcasting # above to trigger an error at consuming portal "checkpoints" raise - except trio.Cancelled: - # debugging only - log.runtime(f"Msg loop was cancelled for {chan}") - raise - finally: # msg debugging for when he machinery is brokey log.runtime( @@ -891,6 +974,7 @@ class Actor: # establish primary connection with immediate parent self._parent_chan = None if parent_addr is not None: + self._parent_chan, accept_addr_rent = await self._from_parent( parent_addr) @@ -994,14 +1078,7 @@ class Actor: ) if self._parent_chan: - with trio.CancelScope(shield=True): - try: - # internal error so ship to parent without cid - await self._parent_chan.send(pack_error(err)) - except trio.ClosedResourceError: - log.error( - f"Failed to ship error to parent " - f"{self._parent_chan.uid}, channel was closed") + await try_ship_error_to_parent(self, err) # always! log.exception("Actor errored:")