diff --git a/tractor/_actor.py b/tractor/_actor.py index 36f0092..b78b872 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -32,6 +32,7 @@ from ._exceptions import ( is_multi_cancelled, ContextCancelled, TransportClosed, + StreamOverrun, ) from . import _debug from ._discovery import get_arbiter @@ -161,16 +162,27 @@ async def _invoke( # context func with support for bi-dir streaming await chan.send({'functype': 'context', 'cid': cid}) - async with trio.open_nursery() as scope_nursery: - ctx._scope_nursery = scope_nursery - cs = scope_nursery.cancel_scope - task_status.started(cs) - try: + try: + async with trio.open_nursery() as scope_nursery: + ctx._scope_nursery = scope_nursery + cs = scope_nursery.cancel_scope + task_status.started(cs) await chan.send({'return': await coro, 'cid': cid}) - except trio.Cancelled as err: - tb = err.__traceback__ - if cs.cancelled_caught: + except trio.Cancelled as err: + tb = err.__traceback__ + if ctx._error is not None: + tb = ctx._error.__traceback__ + raise ctx._error + + except trio.MultiError as err: + if ctx._error is not None: + tb = ctx._error.__traceback__ + raise ctx._error from err + else: + raise + + if cs.cancelled_caught or ctx._error: # TODO: pack in ``trio.Cancelled.__traceback__`` here # so they can be unwrapped and displayed on the caller @@ -314,6 +326,7 @@ class Actor: # ugh, we need to get rid of this and replace with a "registry" sys # https://github.com/goodboy/tractor/issues/216 is_arbiter: bool = False + msg_buffer_size: int = 2**6 # nursery placeholders filled in by `_async_main()` after fork _root_n: Optional[trio.Nursery] = None @@ -548,7 +561,7 @@ class Actor: # now in a cancelled condition) when the local runtime here # is now cancelled while (presumably) in the middle of msg # loop processing. - with trio.move_on_after(0.1) as cs: + with trio.move_on_after(0.5) as cs: cs.shield = True # Attempt to wait for the far end to close the channel # and bail after timeout (2-generals on closure). @@ -611,23 +624,54 @@ class Actor: cid: str, msg: dict[str, Any], ) -> None: - """Push an RPC result to the local consumer's queue. - """ + ''' + Push an RPC result to the local consumer's queue. + + ''' assert chan.uid, f"`chan.uid` can't be {chan.uid}" ctx = self._contexts[(chan.uid, cid)] send_chan = ctx._send_chan assert send_chan - # TODO: relaying far end context errors to the local - # context through nursery raising? - # if 'error' in msg: - # ctx._error_from_remote_msg(msg) - # log.runtime(f"{send_chan} was terminated at remote end") + if msg.get('error'): + # If this is an error message from a context opened by + # ``Portal.open_context()`` we want to interrupt any ongoing + # (child) tasks within that context to be notified of the remote + # error relayed here. + # + # The reason we may want to raise the remote error immediately + # is that there is no guarantee the associated local task(s) + # will attempt to read from any locally opened stream any time + # soon. + # + # NOTE: this only applies when + # ``Portal.open_context()`` has been called since it is assumed + # (currently) that other portal APIs (``Portal.run()``, + # ``.run_in_actor()``) do their own error checking at the point + # of the call and result processing. + log.warning(f'Remote context for {chan.uid}:{cid} errored') + ctx._maybe_error_from_remote_msg(msg) try: log.runtime(f"Delivering {msg} from {chan.uid} to caller {cid}") - # maintain backpressure - await send_chan.send(msg) + + # XXX: we do **not** maintain backpressure and instead + # opt to relay stream overrun errors to the sender. + try: + send_chan.send_nowait(msg) + except trio.WouldBlock: + log.warning(f'Caller task {cid} was overrun!?') + if ctx._backpressure: + await send_chan.send(msg) + else: + try: + raise StreamOverrun( + f'Context stream {cid} for {chan.uid} was overrun!' + ) + except StreamOverrun as err: + err_msg = pack_error(err) + err_msg['cid'] = cid + await chan.send(err_msg) except trio.BrokenResourceError: # TODO: what is the right way to handle the case where the @@ -644,7 +688,7 @@ class Actor: self, chan: Channel, cid: str, - max_buffer_size: int = 2**6, + msg_buffer_size: Optional[int] = None, ) -> Context: ''' @@ -660,10 +704,17 @@ class Actor: assert actor_uid try: ctx = self._contexts[(actor_uid, cid)] + + # adjust buffer size if specified + state = ctx._send_chan._state + if msg_buffer_size and state.max_buffer_size != msg_buffer_size: + state.max_buffer_size = msg_buffer_size + except KeyError: send_chan: trio.MemorySendChannel recv_chan: trio.MemoryReceiveChannel - send_chan, recv_chan = trio.open_memory_channel(max_buffer_size) + send_chan, recv_chan = trio.open_memory_channel( + msg_buffer_size or self.msg_buffer_size) ctx = Context( chan, cid, @@ -679,7 +730,8 @@ class Actor: chan: Channel, ns: str, func: str, - kwargs: dict + kwargs: dict, + msg_buffer_size: Optional[int] = None, ) -> Context: ''' @@ -693,7 +745,7 @@ class Actor: ''' cid = str(uuid.uuid4()) assert chan.uid - ctx = self.get_context(chan, cid) + ctx = self.get_context(chan, cid, msg_buffer_size=msg_buffer_size) log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)}) @@ -743,7 +795,8 @@ class Actor: if msg is None: # loop terminate sentinel log.cancel( - f"Cancelling all tasks for {chan} from {chan.uid}") + f"Channerl to {chan.uid} terminated?\n" + "Cancelling all associated tasks..") for (channel, cid) in self._rpc_tasks.copy(): if channel is chan: