diff --git a/tractor/_portal.py b/tractor/_portal.py index 17871aa..e61ac37 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -103,7 +103,7 @@ class Portal: # When set to a ``Context`` (when _submit_for_result is called) # it is expected that ``result()`` will be awaited at some # point. - self._expect_result: Optional[Context] = None + self._expect_result: Context | None = None self._streams: set[MsgStream] = set() self.actor = current_actor() @@ -209,7 +209,10 @@ class Portal: try: # send cancel cmd - might not get response # XXX: sure would be nice to make this work with a proper shield - with trio.move_on_after(timeout or self.cancel_timeout) as cs: + with trio.move_on_after( + timeout + or self.cancel_timeout + ) as cs: cs.shield = True await self.run_from_ns('self', 'cancel') @@ -330,7 +333,9 @@ class Portal: f'{async_gen_func} must be an async generator function!') fn_mod_path, fn_name = NamespacePath.from_ref( - async_gen_func).to_tuple() + async_gen_func + ).to_tuple() + ctx = await self.actor.start_remote_task( self.channel, fn_mod_path, @@ -396,13 +401,16 @@ class Portal: raise TypeError( f'{func} must be an async generator function!') + # TODO: i think from here onward should probably + # just be factored into an `@acm` inside a new + # a new `_context.py` mod. fn_mod_path, fn_name = NamespacePath.from_ref(func).to_tuple() ctx = await self.actor.start_remote_task( self.channel, fn_mod_path, fn_name, - kwargs + kwargs, ) assert ctx._remote_func_type == 'context' @@ -426,29 +434,47 @@ class Portal: f' but received a non-error msg:\n{pformat(msg)}' ) - _err: Optional[BaseException] = None - ctx._portal = self + _err: BaseException | None = None + ctx._portal: Portal = self - uid = self.channel.uid - cid = ctx.cid - etype: Optional[Type[BaseException]] = None + uid: tuple = self.channel.uid + cid: str = ctx.cid + etype: Type[BaseException] | None = None - # deliver context instance and .started() msg value in open tuple. + # deliver context instance and .started() msg value in enter + # tuple. try: - async with trio.open_nursery() as scope_nursery: - ctx._scope_nursery = scope_nursery - - # do we need this? - # await trio.lowlevel.checkpoint() + async with trio.open_nursery() as nurse: + ctx._scope_nursery = nurse + ctx._scope = nurse.cancel_scope yield ctx, first + # when in allow_ovveruns mode there may be lingering + # overflow sender tasks remaining? + if nurse.child_tasks: + # ensure we are in overrun state with + # ``._allow_overruns=True`` bc otherwise + # there should be no tasks in this nursery! + if ( + not ctx._allow_overruns + or len(nurse.child_tasks) > 1 + ): + raise RuntimeError( + 'Context has sub-tasks but is ' + 'not in `allow_overruns=True` Mode!?' + ) + ctx._scope.cancel() + except ContextCancelled as err: _err = err + + # swallow and mask cross-actor task context cancels that + # were initiated by *this* side's task. if not ctx._cancel_called: - # context was cancelled at the far end but was - # not part of this end requesting that cancel - # so raise for the local task to respond and handle. + # XXX: this should NEVER happen! + # from ._debug import breakpoint + # await breakpoint() raise # if the context was cancelled by client code @@ -468,17 +494,17 @@ class Portal: ) as err: etype = type(err) - # the context cancels itself on any cancel - # causing error. - if ctx.chan.connected(): - log.cancel( - 'Context cancelled for task, sending cancel request..\n' - f'task:{cid}\n' - f'actor:{uid}' - ) + # cancel ourselves on any error. + log.cancel( + 'Context cancelled for task, sending cancel request..\n' + f'task:{cid}\n' + f'actor:{uid}' + ) + try: + await ctx.cancel() - else: + except trio.BrokenResourceError: log.warning( 'IPC connection for context is broken?\n' f'task:{cid}\n' @@ -487,12 +513,7 @@ class Portal: raise - finally: - # in the case where a runtime nursery (due to internal bug) - # or a remote actor transmits an error we want to be - # sure we get the error the underlying feeder mem chan. - # if it's not raised here it *should* be raised from the - # msg loop nursery right? + else: if ctx.chan.connected(): log.info( 'Waiting on final context-task result for\n' @@ -505,6 +526,7 @@ class Portal: f'value from callee `{result}`' ) + finally: # though it should be impossible for any tasks # operating *in* this scope to have survived # we tear down the runtime feeder chan last diff --git a/tractor/_root.py b/tractor/_root.py index 64652a1..a2d3158 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -254,7 +254,9 @@ async def open_root_actor( # tempn.start_soon(an.exited.wait) logger.cancel("Shutting down root actor") - await actor.cancel() + await actor.cancel( + requesting_uid=actor.uid, + ) finally: _state._current_actor = None diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 97f3025..9d9427d 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -28,9 +28,11 @@ import inspect import signal import sys from typing import ( - Any, Optional, - Union, TYPE_CHECKING, + Any, Callable, + Optional, + Union, + TYPE_CHECKING, ) import uuid from types import ModuleType @@ -44,7 +46,10 @@ import trio # type: ignore from trio_typing import TaskStatus from ._ipc import Channel -from ._streaming import Context +from ._streaming import ( + mk_context, + Context, +) from .log import get_logger from ._exceptions import ( pack_error, @@ -53,7 +58,6 @@ from ._exceptions import ( is_multi_cancelled, ContextCancelled, TransportClosed, - StreamOverrun, ) from . import _debug from ._discovery import get_arbiter @@ -79,7 +83,7 @@ async def _invoke( is_rpc: bool = True, task_status: TaskStatus[ - Union[trio.CancelScope, BaseException] + Union[Context, BaseException] ] = trio.TASK_STATUS_IGNORED, ): ''' @@ -99,7 +103,14 @@ async def _invoke( # activated cancel scope ref cs: Optional[trio.CancelScope] = None - ctx = actor.get_context(chan, cid) + ctx = actor.get_context( + chan, + cid, + # We shouldn't ever need to pass this through right? + # it's up to the soon-to-be called rpc task to + # open the stream with this option. + # allow_overruns=True, + ) context: bool = False if getattr(func, '_tractor_stream_function', False): @@ -138,7 +149,10 @@ async def _invoke( ): raise TypeError(f'{func} must be an async function!') - coro = func(**kwargs) + try: + coro = func(**kwargs) + except TypeError: + raise if inspect.isasyncgen(coro): await chan.send({'functype': 'asyncgen', 'cid': cid}) @@ -150,7 +164,8 @@ async def _invoke( # of the async gen in order to be sure the cancel # is propagated! with cancel_scope as cs: - task_status.started(cs) + ctx._scope = cs + task_status.started(ctx) async with aclosing(coro) as agen: async for item in agen: # TODO: can we send values back in here? @@ -176,7 +191,8 @@ async def _invoke( # manualy construct the response dict-packet-responses as # above with cancel_scope as cs: - task_status.started(cs) + ctx._scope = cs + task_status.started(ctx) await coro if not cs.cancelled_caught: @@ -189,19 +205,25 @@ async def _invoke( await chan.send({'functype': 'context', 'cid': cid}) try: - async with trio.open_nursery() as scope_nursery: - ctx._scope_nursery = scope_nursery - cs = scope_nursery.cancel_scope - task_status.started(cs) + with cancel_scope as cs: + ctx._scope = cs + task_status.started(ctx) res = await coro await chan.send({'return': res, 'cid': cid}) - except BaseExceptionGroup: + # XXX: do we ever trigger this block any more? + except ( + BaseExceptionGroup, + trio.Cancelled, + ): # if a context error was set then likely # thei multierror was raised due to that - if ctx._error is not None: - raise ctx._error from None + if ctx._remote_error is not None: + raise ctx._remote_error + # maybe TODO: pack in ``trio.Cancelled.__traceback__`` here + # so they can be unwrapped and displayed on the caller + # side? raise finally: @@ -213,7 +235,6 @@ async def _invoke( # associated child isn't in debug any more await _debug.maybe_wait_for_debugger() ctx = actor._contexts.pop((chan.uid, cid)) - if ctx: log.runtime( f'Context entrypoint {func} was terminated:\n{ctx}' @@ -221,32 +242,60 @@ async def _invoke( assert cs if cs.cancelled_caught: - # if 'brokerd.kraken' in actor.uid: - # await _debug.breakpoint() - # TODO: pack in ``trio.Cancelled.__traceback__`` here - # so they can be unwrapped and displayed on the caller - # side! + # first check for and raise any remote error + # before raising any context cancelled case + # so that real remote errors don't get masked as + # ``ContextCancelled``s. + re = ctx._remote_error + if re: + ctx._maybe_raise_remote_err(re) fname = func.__name__ - if ctx._cancel_called: - msg = f'`{fname}()`@{actor.uid} cancelled itself' + if cs.cancel_called: + canceller = ctx._cancel_called_remote + # await _debug.breakpoint() - elif cs.cancel_called: - msg = ( - f'`{fname}()`@{actor.uid} was remotely cancelled by its caller ' - f'{ctx.chan.uid}' + # NOTE / TODO: if we end up having + # ``Actor._cancel_task()`` call + # ``Context.cancel()`` directly, we're going to + # need to change this logic branch since it will + # always enter.. + if ctx._cancel_called: + msg = f'`{fname}()`@{actor.uid} cancelled itself' + + else: + msg = ( + f'`{fname}()`@{actor.uid} ' + 'was remotely cancelled by ' + ) + + # if the channel which spawned the ctx is the + # one that cancelled it then we report that, vs. + # it being some other random actor that for ex. + # some actor who calls `Portal.cancel_actor()` + # and by side-effect cancels this ctx. + if canceller == ctx.chan.uid: + msg += f'its caller {canceller}' + else: + msg += f'remote actor {canceller}' + + # TODO: does this ever get set any more or can + # we remove it? + if ctx._cancel_msg: + msg += f' with msg:\n{ctx._cancel_msg}' + + # task-contex was either cancelled by request using + # ``Portal.cancel_actor()`` or ``Context.cancel()`` + # on the far end, or it was cancelled by the local + # (callee) task, so relay this cancel signal to the + # other side. + raise ContextCancelled( + msg, + suberror_type=trio.Cancelled, + canceller=canceller, ) - if ctx._cancel_msg: - msg += f' with msg:\n{ctx._cancel_msg}' - - # task-contex was cancelled so relay to the cancel to caller - raise ContextCancelled( - msg, - suberror_type=trio.Cancelled, - ) - else: # regular async function try: @@ -261,12 +310,17 @@ async def _invoke( ) with cancel_scope as cs: - task_status.started(cs) + ctx._scope = cs + task_status.started(ctx) result = await coro - log.cancel(f'result: {result}') + fname = func.__name__ + log.runtime(f'{fname}() result: {result}') if not failed_resp: # only send result if we know IPC isn't down - await chan.send({'return': result, 'cid': cid}) + await chan.send( + {'return': result, + 'cid': cid} + ) except ( Exception, @@ -309,6 +363,7 @@ async def _invoke( # always ship errors back to caller err_msg = pack_error(err, tb=tb) err_msg['cid'] = cid + try: await chan.send(err_msg) @@ -325,14 +380,21 @@ async def _invoke( f"Failed to ship error to caller @ {chan.uid} !?" ) + # error is probably from above coro running code *not from the + # underlyingn rpc invocation* since a scope was never allocated + # around actual coroutine await. if cs is None: - # error is from above code not from rpc invocation + # we don't ever raise directly here to allow the + # msg-loop-scheduler to continue running for this + # channel. task_status.started(err) finally: # RPC task bookeeping try: - scope, func, is_complete = actor._rpc_tasks.pop((chan, cid)) + ctx, func, is_complete = actor._rpc_tasks.pop( + (chan, cid) + ) is_complete.set() except KeyError: @@ -341,6 +403,9 @@ async def _invoke( # cancel scope will not have been inserted yet log.warning( f"Task {func} likely errored or cancelled before start") + else: + log.cancel(f'{func.__name__}({kwargs}) failed?') + finally: if not actor._rpc_tasks: log.runtime("All RPC tasks have completed") @@ -439,6 +504,7 @@ class Actor: self.uid = (name, uid or str(uuid.uuid4())) self._cancel_complete = trio.Event() + self._cancel_called_remote: tuple[str, tuple] | None = None self._cancel_called: bool = False # retreive and store parent `__main__` data which @@ -477,7 +543,7 @@ class Actor: # (chan, cid) -> (cancel_scope, func) self._rpc_tasks: dict[ tuple[Channel, str], - tuple[trio.CancelScope, Callable, trio.Event] + tuple[Context, Callable, trio.Event] ] = {} # map {actor uids -> Context} @@ -652,8 +718,8 @@ class Actor: if ( local_nursery ): - - log.cancel(f"Waiting on cancel request to peer {chan.uid}") + if chan._cancel_called: + log.cancel(f"Waiting on cancel request to peer {chan.uid}") # XXX: this is a soft wait on the channel (and its # underlying transport protocol) to close from the # remote peer side since we presume that any channel @@ -786,76 +852,15 @@ class Actor: f'\n{msg}') return - send_chan = ctx._send_chan - - log.runtime(f"Delivering {msg} from {chan.uid} to caller {cid}") - - # XXX: we do **not** maintain backpressure and instead - # opt to relay stream overrun errors to the sender. - try: - send_chan.send_nowait(msg) - # if an error is deteced we should always - # expect it to be raised by any context (stream) - # consumer task - await ctx._maybe_raise_from_remote_msg(msg) - - except trio.BrokenResourceError: - # TODO: what is the right way to handle the case where the - # local task has already sent a 'stop' / StopAsyncInteration - # to the other side but and possibly has closed the local - # feeder mem chan? Do we wait for some kind of ack or just - # let this fail silently and bubble up (currently)? - - # XXX: local consumer has closed their side - # so cancel the far end streaming task - log.warning(f"{send_chan} consumer is already closed") - return - - except trio.WouldBlock: - # XXX: always push an error even if the local - # receiver is in overrun state. - await ctx._maybe_raise_from_remote_msg(msg) - - uid = chan.uid - lines = [ - 'Task context stream was overrun', - f'local task: {cid} @ {self.uid}', - f'remote sender: {uid}', - ] - if not ctx._stream_opened: - lines.insert( - 1, - f'\n*** No stream open on `{self.uid[0]}` side! ***\n' - ) - - text = '\n'.join(lines) - - if ctx._backpressure: - log.warning(text) - try: - await send_chan.send(msg) - except trio.BrokenResourceError: - # XXX: local consumer has closed their side - # so cancel the far end streaming task - log.warning(f"{chan} is already closed") - else: - try: - raise StreamOverrun(text) from None - except StreamOverrun as err: - err_msg = pack_error(err) - err_msg['cid'] = cid - try: - await chan.send(err_msg) - except trio.BrokenResourceError: - # XXX: local consumer has closed their side - # so cancel the far end streaming task - log.warning(f"{chan} is already closed") + return await ctx._deliver_msg(msg) def get_context( self, chan: Channel, cid: str, - msg_buffer_size: Optional[int] = None, + + msg_buffer_size: int | None = None, + allow_overruns: bool = False, ) -> Context: ''' @@ -871,6 +876,7 @@ class Actor: assert actor_uid try: ctx = self._contexts[(actor_uid, cid)] + ctx._allow_overruns = allow_overruns # adjust buffer size if specified state = ctx._send_chan._state # type: ignore @@ -878,15 +884,11 @@ class Actor: state.max_buffer_size = msg_buffer_size except KeyError: - send_chan: trio.MemorySendChannel - recv_chan: trio.MemoryReceiveChannel - send_chan, recv_chan = trio.open_memory_channel( - msg_buffer_size or self.msg_buffer_size) - ctx = Context( + ctx = mk_context( chan, cid, - _send_chan=send_chan, - _recv_chan=recv_chan, + msg_buffer_size=msg_buffer_size or self.msg_buffer_size, + _allow_overruns=allow_overruns, ) self._contexts[(actor_uid, cid)] = ctx @@ -898,7 +900,8 @@ class Actor: ns: str, func: str, kwargs: dict, - msg_buffer_size: Optional[int] = None, + msg_buffer_size: int | None = None, + allow_overruns: bool = False, ) -> Context: ''' @@ -916,6 +919,7 @@ class Actor: chan, cid, msg_buffer_size=msg_buffer_size, + allow_overruns=allow_overruns, ) log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") await chan.send( @@ -1046,7 +1050,11 @@ class Actor: assert self._service_n self._service_n.start_soon(self.cancel) - async def cancel(self) -> bool: + async def cancel( + self, + requesting_uid: tuple[str, str], + + ) -> bool: ''' Cancel this actor's runtime. @@ -1060,6 +1068,7 @@ class Actor: ''' log.cancel(f"{self.uid} is trying to cancel") + self._cancel_called_remote: tuple = requesting_uid self._cancel_called = True # cancel all ongoing rpc tasks @@ -1073,7 +1082,7 @@ class Actor: dbcs.cancel() # kill all ongoing tasks - await self.cancel_rpc_tasks() + await self.cancel_rpc_tasks(requesting_uid=requesting_uid) # stop channel server self.cancel_server() @@ -1099,7 +1108,13 @@ class Actor: # for n in root.child_nurseries: # n.cancel_scope.cancel() - async def _cancel_task(self, cid, chan): + async def _cancel_task( + self, + cid: str, + chan: Channel, + + requesting_uid: tuple[str, str] | None = None, + ) -> bool: ''' Cancel a local task by call-id / channel. @@ -1114,35 +1129,51 @@ class Actor: try: # this ctx based lookup ensures the requested task to # be cancelled was indeed spawned by a request from this channel - scope, func, is_complete = self._rpc_tasks[(chan, cid)] + ctx, func, is_complete = self._rpc_tasks[(chan, cid)] + scope = ctx._scope except KeyError: log.cancel(f"{cid} has already completed/terminated?") - return + return True log.cancel( f"Cancelling task:\ncid: {cid}\nfunc: {func}\n" f"peer: {chan.uid}\n") + if ( + ctx._cancel_called_remote is None + and requesting_uid + ): + ctx._cancel_called_remote: tuple = requesting_uid + # don't allow cancelling this function mid-execution # (is this necessary?) if func is self._cancel_task: - return + return True + # TODO: shouldn't we eventually be calling ``Context.cancel()`` + # directly here instead (since that method can handle both + # side's calls into it? scope.cancel() # wait for _invoke to mark the task complete log.runtime( - f"Waiting on task to cancel:\ncid: {cid}\nfunc: {func}\n" - f"peer: {chan.uid}\n") + 'Waiting on task to cancel:\n' + f'cid: {cid}\nfunc: {func}\n' + f'peer: {chan.uid}\n' + ) await is_complete.wait() log.runtime( f"Sucessfully cancelled task:\ncid: {cid}\nfunc: {func}\n" f"peer: {chan.uid}\n") + return True + async def cancel_rpc_tasks( self, only_chan: Channel | None = None, + requesting_uid: tuple[str, str] | None = None, + ) -> None: ''' Cancel all existing RPC responder tasks using the cancel scope @@ -1154,7 +1185,7 @@ class Actor: log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ") for ( (chan, cid), - (scope, func, is_complete), + (ctx, func, is_complete), ) in tasks.copy().items(): if only_chan is not None: if only_chan != chan: @@ -1162,7 +1193,11 @@ class Actor: # TODO: this should really done in a nursery batch if func != self._cancel_task: - await self._cancel_task(cid, chan) + await self._cancel_task( + cid, + chan, + requesting_uid=requesting_uid, + ) log.cancel( f"Waiting for remaining rpc tasks to complete {tasks}") @@ -1248,8 +1283,8 @@ async def async_main( Actor runtime entrypoint; start the IPC channel server, maybe connect back to the parent, and startup all core machinery tasks. - A "root-most" (or "top-level") nursery for this actor is opened here - and when cancelled effectively cancels the actor. + A "root" (or "top-level") nursery for this actor is opened here and + when cancelled/terminated effectively closes the actor's "runtime". ''' # attempt to retreive ``trio``'s sigint handler and stash it @@ -1446,15 +1481,16 @@ async def process_messages( ) -> bool: ''' - Process messages for the IPC transport channel async-RPC style. + This is the per-channel, low level RPC task scheduler loop. - Receive multiplexed RPC requests, spawn handler tasks and deliver - responses over or boxed errors back to the "caller" task. + Receive multiplexed RPC request messages from some remote process, + spawn handler tasks depending on request type and deliver responses + or boxed errors back to the remote caller (task). ''' # TODO: once https://github.com/python-trio/trio/issues/467 gets # worked out we'll likely want to use that! - msg = None + msg: dict | None = None nursery_cancelled_before_task: bool = False log.runtime(f"Entering msg loop for {chan} from {chan.uid}") @@ -1476,7 +1512,10 @@ async def process_messages( for (channel, cid) in actor._rpc_tasks.copy(): if channel is chan: - await actor._cancel_task(cid, channel) + await actor._cancel_task( + cid, + channel, + ) log.runtime( f"Msg loop signalled to terminate for" @@ -1490,12 +1529,14 @@ async def process_messages( cid = msg.get('cid') if cid: # deliver response to local caller/waiter + # via its per-remote-context memory channel. await actor._push_result(chan, cid, msg) log.runtime( f"Waiting on next msg for {chan} from {chan.uid}") continue + # TODO: implement with ``match:`` syntax? # process command request try: ns, funcname, kwargs, actorid, cid = msg['cmd'] @@ -1515,13 +1556,12 @@ async def process_messages( f"{ns}.{funcname}({kwargs})") if ns == 'self': - func = getattr(actor, funcname) - if funcname == 'cancel': + func = actor.cancel + kwargs['requesting_uid'] = chan.uid - # don't start entire actor runtime - # cancellation if this actor is in debug - # mode + # don't start entire actor runtime cancellation + # if this actor is currently in debug mode! pdb_complete = _debug.Lock.local_pdb_complete if pdb_complete: await pdb_complete.wait() @@ -1533,43 +1573,56 @@ async def process_messages( # msg loop and break out into # ``async_main()`` log.cancel( - f"Actor {actor.uid} was remotely cancelled " + "Actor runtime for was remotely cancelled " f"by {chan.uid}" ) await _invoke( - actor, cid, chan, func, kwargs, is_rpc=False + actor, + cid, + chan, + func, + kwargs, + is_rpc=False, ) + log.cancel( + f'Cancelling msg loop for {chan.uid}' + ) loop_cs.cancel() break if funcname == '_cancel_task': + func = actor._cancel_task # we immediately start the runtime machinery # shutdown - with trio.CancelScope(shield=True): - # actor.cancel() was called so kill this - # msg loop and break out into - # ``async_main()`` - kwargs['chan'] = chan - log.cancel( - f'Remote request to cancel task\n' - f'remote actor: {chan.uid}\n' - f'task: {cid}' + # with trio.CancelScope(shield=True): + kwargs['chan'] = chan + target_cid = kwargs['cid'] + kwargs['requesting_uid'] = chan.uid + log.cancel( + f'Remote request to cancel task\n' + f'remote actor: {chan.uid}\n' + f'task: {target_cid}' + ) + try: + await _invoke( + actor, + cid, + chan, + func, + kwargs, + is_rpc=False, ) - try: - await _invoke( - actor, - cid, - chan, - func, - kwargs, - is_rpc=False, - ) - except BaseException: - log.exception("failed to cancel task?") + except BaseException: + log.exception("failed to cancel task?") + + continue + else: + # normally registry methods, eg. + # ``.register_actor()`` etc. + func = getattr(actor, funcname) - continue else: # complain to client about restricted modules try: @@ -1584,34 +1637,49 @@ async def process_messages( log.runtime(f"Spawning task for {func}") assert actor._service_n try: - cs = await actor._service_n.start( - partial(_invoke, actor, cid, chan, func, kwargs), + ctx: Context = await actor._service_n.start( + partial( + _invoke, + actor, + cid, + chan, + func, + kwargs, + ), name=funcname, ) + except ( RuntimeError, BaseExceptionGroup, ): # avoid reporting a benign race condition # during actor runtime teardown. - nursery_cancelled_before_task = True + nursery_cancelled_before_task: bool = True break - # never allow cancelling cancel requests (results in - # deadlock and other weird behaviour) - # if func != actor.cancel: - if isinstance(cs, Exception): + # in the lone case where a ``Context`` is not + # delivered, it's likely going to be a locally + # scoped exception from ``_invoke()`` itself. + if isinstance(ctx, Exception): log.warning( f"Task for RPC func {func} failed with" - f"{cs}") + f"{ctx}" + ) + continue + else: # mark that we have ongoing rpc tasks actor._ongoing_rpc_tasks = trio.Event() log.runtime(f"RPC func is {func}") + # store cancel scope such that the rpc task can be # cancelled gracefully if requested actor._rpc_tasks[(chan, cid)] = ( - cs, func, trio.Event()) + ctx, + func, + trio.Event(), + ) log.runtime( f"Waiting on next msg for {chan} from {chan.uid}") @@ -1655,7 +1723,7 @@ async def process_messages( match err: case ContextCancelled(): log.cancel( - f'Actor: {actor.uid} was task-context-cancelled with,\n' + f'Actor: {actor.uid} was context-cancelled with,\n' f'str(err)' ) case _: @@ -1672,7 +1740,8 @@ async def process_messages( # msg debugging for when he machinery is brokey log.runtime( f"Exiting msg loop for {chan} from {chan.uid} " - f"with last msg:\n{msg}") + f"with last msg:\n{msg}" + ) # transport **was not** disconnected return False diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 9908ab6..450c712 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -21,24 +21,41 @@ Message stream types and APIs. from __future__ import annotations import inspect from contextlib import asynccontextmanager -from dataclasses import dataclass +from collections import deque +from dataclasses import ( + dataclass, + field, +) +from functools import partial +from pprint import pformat from typing import ( Any, Optional, Callable, AsyncGenerator, - AsyncIterator + AsyncIterator, + TYPE_CHECKING, ) - import warnings import trio from ._ipc import Channel -from ._exceptions import unpack_error, ContextCancelled -from ._state import current_actor +from ._exceptions import ( + unpack_error, + pack_error, + ContextCancelled, + StreamOverrun, +) from .log import get_logger -from .trionics import broadcast_receiver, BroadcastReceiver +from ._state import current_actor +from .trionics import ( + broadcast_receiver, + BroadcastReceiver, +) + +if TYPE_CHECKING: + from ._portal import Portal log = get_logger(__name__) @@ -335,8 +352,8 @@ class MsgStream(trio.abc.Channel): Send a message over this stream to the far end. ''' - if self._ctx._error: - raise self._ctx._error # from None + if self._ctx._remote_error: + raise self._ctx._remote_error # from None if self._closed: raise trio.ClosedResourceError('This stream was already closed') @@ -373,25 +390,61 @@ class Context: _recv_chan: trio.MemoryReceiveChannel _send_chan: trio.MemorySendChannel - _remote_func_type: Optional[str] = None + _remote_func_type: str | None = None # only set on the caller side - _portal: Optional['Portal'] = None # type: ignore # noqa - _result: Optional[Any] = False - _error: Optional[BaseException] = None + _portal: Portal | None = None # type: ignore # noqa + _result: Any | int = None + _remote_error: BaseException | None = None - # status flags + # cancellation state _cancel_called: bool = False - _cancel_msg: Optional[str] = None + _cancel_called_remote: tuple | None = None + _cancel_msg: str | None = None + _scope: trio.CancelScope | None = None _enter_debugger_on_cancel: bool = True + + @property + def cancel_called(self) -> bool: + ''' + Records whether cancellation has been requested for this context + by either an explicit call to ``.cancel()`` or an implicit call + due to an error caught inside the ``Portal.open_context()`` + block. + + ''' + return self._cancel_called + + @property + def cancel_called_remote(self) -> tuple[str, str] | None: + ''' + ``Actor.uid`` of the remote actor who's task was cancelled + causing this side of the context to also be cancelled. + + ''' + remote_uid = self._cancel_called_remote + if remote_uid: + return tuple(remote_uid) + + # init and streaming state _started_called: bool = False _started_received: bool = False _stream_opened: bool = False - # only set on the callee side - _scope_nursery: Optional[trio.Nursery] = None - - _backpressure: bool = True + # overrun handling machinery + # NOTE: none of this provides "backpressure" to the remote + # task, only an ability to not lose messages when the local + # task is configured to NOT transmit ``StreamOverrun``s back + # to the other side. + _overflow_q: deque[dict] = field( + default_factory=partial( + deque, + maxlen=616, + ) + ) + _scope_nursery: trio.Nursery | None = None + _in_overrun: bool = False + _allow_overruns: bool = False async def send_yield( self, @@ -410,9 +463,9 @@ class Context: async def send_stop(self) -> None: await self.chan.send({'stop': True, 'cid': self.cid}) - async def _maybe_raise_from_remote_msg( + async def _maybe_cancel_and_set_remote_error( self, - msg: dict[str, Any], + error_msg: dict[str, Any], ) -> None: ''' @@ -423,55 +476,77 @@ class Context: in the corresponding remote callee task. ''' - error = msg.get('error') - if error: - # If this is an error message from a context opened by - # ``Portal.open_context()`` we want to interrupt any ongoing - # (child) tasks within that context to be notified of the remote - # error relayed here. - # - # The reason we may want to raise the remote error immediately - # is that there is no guarantee the associated local task(s) - # will attempt to read from any locally opened stream any time - # soon. - # - # NOTE: this only applies when - # ``Portal.open_context()`` has been called since it is assumed - # (currently) that other portal APIs (``Portal.run()``, - # ``.run_in_actor()``) do their own error checking at the point - # of the call and result processing. - log.error( - f'Remote context error for {self.chan.uid}:{self.cid}:\n' - f'{msg["error"]["tb_str"]}' + # If this is an error message from a context opened by + # ``Portal.open_context()`` we want to interrupt any ongoing + # (child) tasks within that context to be notified of the remote + # error relayed here. + # + # The reason we may want to raise the remote error immediately + # is that there is no guarantee the associated local task(s) + # will attempt to read from any locally opened stream any time + # soon. + # + # NOTE: this only applies when + # ``Portal.open_context()`` has been called since it is assumed + # (currently) that other portal APIs (``Portal.run()``, + # ``.run_in_actor()``) do their own error checking at the point + # of the call and result processing. + error = unpack_error( + error_msg, + self.chan, + ) + + # XXX: set the remote side's error so that after we cancel + # whatever task is the opener of this context it can raise + # that error as the reason. + self._remote_error = error + + if ( + isinstance(error, ContextCancelled) + ): + log.cancel( + 'Remote task-context sucessfully cancelled for ' + f'{self.chan.uid}:{self.cid}' ) - error = unpack_error(msg, self.chan) - if ( - isinstance(error, ContextCancelled) and - self._cancel_called - ): + + if self._cancel_called: # this is an expected cancel request response message # and we don't need to raise it in scope since it will # potentially override a real error return + else: + log.error( + f'Remote context error for {self.chan.uid}:{self.cid}:\n' + f'{error_msg["error"]["tb_str"]}' + ) + # TODO: tempted to **not** do this by-reraising in a + # nursery and instead cancel a surrounding scope, detect + # the cancellation, then lookup the error that was set? + # YES! this is way better and simpler! + if ( + self._scope + ): + # from trio.testing import wait_all_tasks_blocked + # await wait_all_tasks_blocked() + self._cancel_called_remote = self.chan.uid + self._scope.cancel() - self._error = error + # NOTE: this usage actually works here B) + # from ._debug import breakpoint + # await breakpoint() - # TODO: tempted to **not** do this by-reraising in a - # nursery and instead cancel a surrounding scope, detect - # the cancellation, then lookup the error that was set? - if self._scope_nursery: - async def raiser(): - raise self._error from None - - # from trio.testing import wait_all_tasks_blocked - # await wait_all_tasks_blocked() - if not self._scope_nursery._closed: # type: ignore - self._scope_nursery.start_soon(raiser) + # XXX: this will break early callee results sending + # since when `.result()` is finally called, this + # chan will be closed.. + # if self._recv_chan: + # await self._recv_chan.aclose() async def cancel( self, msg: str | None = None, + timeout: float = 0.5, + # timeout: float = 1000, ) -> None: ''' @@ -488,6 +563,8 @@ class Context: log.cancel(f'Cancelling {side} side of context to {self.chan.uid}') self._cancel_called = True + # await _debug.breakpoint() + # breakpoint() if side == 'caller': if not self._portal: @@ -496,8 +573,8 @@ class Context: ) cid = self.cid - with trio.move_on_after(0.5) as cs: - cs.shield = True + with trio.move_on_after(timeout) as cs: + # cs.shield = True log.cancel( f"Cancelling stream {cid} to " f"{self._portal.channel.uid}") @@ -505,7 +582,12 @@ class Context: # NOTE: we're telling the far end actor to cancel a task # corresponding to *this actor*. The far end local channel # instance is passed to `Actor._cancel_task()` implicitly. - await self._portal.run_from_ns('self', '_cancel_task', cid=cid) + await self._portal.run_from_ns( + 'self', + '_cancel_task', + cid=cid, + ) + # print("EXITING CANCEL CALL") if cs.cancelled_caught: # XXX: there's no way to know if the remote task was indeed @@ -530,17 +612,14 @@ class Context: # {'error': trio.Cancelled, cid: "blah"} enough? # This probably gets into the discussion in # https://github.com/goodboy/tractor/issues/36 - assert self._scope_nursery - self._scope_nursery.cancel_scope.cancel() - - if self._recv_chan: - await self._recv_chan.aclose() + assert self._scope + self._scope.cancel() @asynccontextmanager async def open_stream( self, - backpressure: bool | None = True, + allow_overruns: bool | None = False, msg_buffer_size: int | None = None, ) -> AsyncGenerator[MsgStream, None]: @@ -592,8 +671,9 @@ class Context: self.chan, self.cid, msg_buffer_size=msg_buffer_size, + allow_overruns=allow_overruns, ) - ctx._backpressure = backpressure + ctx._allow_overruns = allow_overruns assert ctx is self # XXX: If the underlying channel feeder receive mem chan has @@ -637,48 +717,115 @@ class Context: f'ctx id: {self.cid}' ) - async def result(self) -> Any: + def _maybe_raise_remote_err( + self, + err: Exception, + ) -> None: + # NOTE: whenever the context's "opener" side (task) **is** + # the side which requested the cancellation (likekly via + # ``Context.cancel()``), we don't want to re-raise that + # cancellation signal locally (would be akin to + # a ``trio.Nursery`` nursery raising ``trio.Cancelled`` + # whenever ``CancelScope.cancel()`` was called) and instead + # silently reap the expected cancellation "error"-msg. + # if 'pikerd' in err.msgdata['tb_str']: + # # from . import _debug + # # await _debug.breakpoint() + # breakpoint() + + if ( + isinstance(err, ContextCancelled) + and ( + self._cancel_called + or self.chan._cancel_called + or tuple(err.canceller) == current_actor().uid + ) + ): + return err + + raise err from None + + async def result(self) -> Any | Exception: ''' - From a caller side, wait for and return the final result from - the callee side task. + From some (caller) side task, wait for and return the final + result from the remote (callee) side's task. + + This provides a mechanism for one task running in some actor to wait + on another task at the other side, in some other actor, to terminate. + + If the remote task is still in a streaming state (it is delivering + values from inside a ``Context.open_stream():`` block, then those + msgs are drained but discarded since it is presumed this side of + the context has already finished with its own streaming logic. + + If the remote context (or its containing actor runtime) was + canceled, either by a local task calling one of + ``Context.cancel()`` or `Portal.cancel_actor()``, we ignore the + received ``ContextCancelled`` exception if the context or + underlying IPC channel is marked as having been "cancel called". + This is similar behavior to using ``trio.Nursery.cancel()`` + wherein tasks which raise ``trio.Cancel`` are silently reaped; + the main different in this API is in the "cancel called" case, + instead of just not raising, we also return the exception *as + the result* since client code may be interested in the details + of the remote cancellation. ''' assert self._portal, "Context.result() can not be called from callee!" assert self._recv_chan - if self._result is False: + # from . import _debug + # await _debug.breakpoint() - if not self._recv_chan._closed: # type: ignore + re = self._remote_error + if re: + self._maybe_raise_remote_err(re) + return re - # wait for a final context result consuming - # and discarding any bi dir stream msgs still - # in transit from the far end. - while True: + if ( + self._result == id(self) + and not self._remote_error + and not self._recv_chan._closed # type: ignore + ): + # wait for a final context result consuming + # and discarding any bi dir stream msgs still + # in transit from the far end. + while True: + msg = await self._recv_chan.receive() + try: + self._result = msg['return'] - msg = await self._recv_chan.receive() - try: - self._result = msg['return'] - break - except KeyError as msgerr: + # NOTE: we don't need to do this right? + # XXX: only close the rx mem chan AFTER + # a final result is retreived. + # if self._recv_chan: + # await self._recv_chan.aclose() - if 'yield' in msg: - # far end task is still streaming to us so discard - log.warning(f'Discarding stream delivered {msg}') - continue + break + except KeyError: # as msgerr: - elif 'stop' in msg: - log.debug('Remote stream terminated') - continue + if 'yield' in msg: + # far end task is still streaming to us so discard + log.warning(f'Discarding stream delivered {msg}') + continue - # internal error should never get here - assert msg.get('cid'), ( - "Received internal error at portal?") + elif 'stop' in msg: + log.debug('Remote stream terminated') + continue - raise unpack_error( - msg, self._portal.channel - ) from msgerr + # internal error should never get here + assert msg.get('cid'), ( + "Received internal error at portal?") - return self._result + err = unpack_error( + msg, + self._portal.channel + ) # from msgerr + + err = self._maybe_raise_remote_err(err) + self._remote_err = err + + return self._remote_error or self._result async def started( self, @@ -708,6 +855,187 @@ class Context: # async def restart(self) -> None: # pass + async def _drain_overflows( + self, + ) -> None: + ''' + Private task spawned to push newly received msgs to the local + task which getting overrun by the remote side. + + In order to not block the rpc msg loop, but also not discard + msgs received in this context, we need to async push msgs in + a new task which only runs for as long as the local task is in + an overrun state. + + ''' + self._in_overrun = True + try: + while self._overflow_q: + # NOTE: these msgs should never be errors since we always do + # the check prior to checking if we're in an overrun state + # inside ``.deliver_msg()``. + msg = self._overflow_q.popleft() + try: + await self._send_chan.send(msg) + except trio.BrokenResourceError: + log.warning( + f"{self._send_chan} consumer is already closed" + ) + return + except trio.Cancelled: + # we are obviously still in overrun + # but the context is being closed anyway + # so we just warn that there are un received + # msgs still.. + self._overflow_q.appendleft(msg) + fmt_msgs = '' + for msg in self._overflow_q: + fmt_msgs += f'{pformat(msg)}\n' + + log.warning( + f'Context for {self.cid} is being closed while ' + 'in an overrun state!\n' + 'Discarding the following msgs:\n' + f'{fmt_msgs}\n' + ) + raise + + finally: + # task is now finished with the backlog so mark us as + # no longer in backlog. + self._in_overrun = False + + async def _deliver_msg( + self, + msg: dict, + + draining: bool = False, + + ) -> bool: + + cid = self.cid + chan = self.chan + uid = chan.uid + send_chan: trio.MemorySendChannel = self._send_chan + + log.runtime( + f"Delivering {msg} from {uid} to caller {cid}" + ) + + error = msg.get('error') + if error: + await self._maybe_cancel_and_set_remote_error(msg) + + if ( + self._in_overrun + ): + self._overflow_q.append(msg) + return False + + try: + send_chan.send_nowait(msg) + return True + # if an error is deteced we should always + # expect it to be raised by any context (stream) + # consumer task + + except trio.BrokenResourceError: + # TODO: what is the right way to handle the case where the + # local task has already sent a 'stop' / StopAsyncInteration + # to the other side but and possibly has closed the local + # feeder mem chan? Do we wait for some kind of ack or just + # let this fail silently and bubble up (currently)? + + # XXX: local consumer has closed their side + # so cancel the far end streaming task + log.warning(f"{send_chan} consumer is already closed") + return False + + # NOTE XXX: by default we do **not** maintain context-stream + # backpressure and instead opt to relay stream overrun errors to + # the sender; the main motivation is that using bp can block the + # msg handling loop which calls into this method! + except trio.WouldBlock: + # XXX: always push an error even if the local + # receiver is in overrun state. + # await self._maybe_cancel_and_set_remote_error(msg) + + local_uid = current_actor().uid + lines = [ + f'Actor-task context {cid}@{local_uid} was overrun by remote!', + f'sender actor: {uid}', + ] + if not self._stream_opened: + lines.insert( + 1, + f'\n*** No stream open on `{local_uid[0]}` side! ***\n' + ) + + text = '\n'.join(lines) + + # XXX: lul, this really can't be backpressure since any + # blocking here will block the entire msg loop rpc sched for + # a whole channel.. maybe we should rename it? + if self._allow_overruns: + text += f'\nStarting overflow queuing task on msg: {msg}' + log.warning(text) + if ( + not self._in_overrun + ): + self._overflow_q.append(msg) + n = self._scope_nursery + if n.child_tasks: + from . import _debug + await _debug.breakpoint() + assert not n.child_tasks + n.start_soon( + self._drain_overflows, + ) + else: + try: + raise StreamOverrun(text) + except StreamOverrun as err: + err_msg = pack_error(err) + err_msg['cid'] = cid + try: + await chan.send(err_msg) + except trio.BrokenResourceError: + # XXX: local consumer has closed their side + # so cancel the far end streaming task + log.warning(f"{chan} is already closed") + + return False + + +def mk_context( + chan: Channel, + cid: str, + msg_buffer_size: int = 2**6, + + **kwargs, + +) -> Context: + ''' + Internal factory to create an inter-actor task ``Context``. + + This is called by internals and should generally never be called + by user code. + + ''' + send_chan: trio.MemorySendChannel + recv_chan: trio.MemoryReceiveChannel + send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size) + + ctx = Context( + chan, + cid, + _send_chan=send_chan, + _recv_chan=recv_chan, + **kwargs, + ) + ctx._result = id(ctx) + return ctx + def stream(func: Callable) -> Callable: '''