diff --git a/tractor/_context.py b/tractor/_context.py index 4d56fb3..54e309e 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -43,12 +43,17 @@ import warnings import trio +# from .devx import ( +# maybe_wait_for_debugger, +# pause, +# ) from ._exceptions import ( # _raise_from_no_key_in_msg, unpack_error, pack_error, ContextCancelled, # MessagingError, + RemoteActorError, StreamOverrun, ) from .log import get_logger @@ -64,6 +69,164 @@ if TYPE_CHECKING: log = get_logger(__name__) +async def _drain_to_final_msg( + ctx: Context, +) -> list[dict]: + +# ) -> tuple[ +# Any|Exception, +# list[dict], +# ]: + raise_overrun: bool = not ctx._allow_overruns + + # wait for a final context result by collecting (but + # basically ignoring) any bi-dir-stream msgs still in transit + # from the far end. + pre_result_drained: list[dict] = [] + while not ctx._remote_error: + try: + # NOTE: this REPL usage actually works here dawg! Bo + # from .devx._debug import pause + # await pause() + # if re := ctx._remote_error: + # ctx._maybe_raise_remote_err( + # re, + # # NOTE: obvi we don't care if we + # # overran the far end if we're already + # # waiting on a final result (msg). + # raise_overrun_from_self=raise_overrun, + # ) + + # TODO: bad idea? + # with trio.CancelScope() as res_cs: + # ctx._res_scope = res_cs + # msg: dict = await ctx._recv_chan.receive() + # if res_cs.cancelled_caught: + + # from .devx._debug import pause + # await pause() + msg: dict = await ctx._recv_chan.receive() + ctx._result: Any = msg['return'] + log.runtime( + 'Context delivered final result msg:\n' + f'{pformat(msg)}' + ) + pre_result_drained.append(msg) + # NOTE: we don't need to do this right? + # XXX: only close the rx mem chan AFTER + # a final result is retreived. + # if ctx._recv_chan: + # await ctx._recv_chan.aclose() + break + + # NOTE: we get here if the far end was + # `ContextCancelled` in 2 cases: + # 1. we requested the cancellation and thus + # SHOULD NOT raise that far end error, + # 2. WE DID NOT REQUEST that cancel and thus + # SHOULD RAISE HERE! + except trio.Cancelled: + + # CASE 2: mask the local cancelled-error(s) + # only when we are sure the remote error is + # the source cause of this local task's + # cancellation. + if re := ctx._remote_error: + ctx._maybe_raise_remote_err(re) + + # CASE 1: we DID request the cancel we simply + # continue to bubble up as normal. + raise + + except KeyError: + + if 'yield' in msg: + # far end task is still streaming to us so discard + log.warning(f'Discarding std "yield"\n{msg}') + pre_result_drained.append(msg) + continue + + # TODO: work out edge cases here where + # a stream is open but the task also calls + # this? + # -[ ] should be a runtime error if a stream is open + # right? + elif 'stop' in msg: + log.cancel( + 'Remote stream terminated due to "stop" msg:\n' + f'{msg}' + ) + pre_result_drained.append(msg) + continue + + # internal error should never get here + assert msg.get('cid'), ( + "Received internal error at portal?" + ) + + # XXX fallthrough to handle expected error XXX + re: Exception|None = ctx._remote_error + if re: + log.critical( + 'Remote ctx terminated due to "error" msg:\n' + f'{re}' + ) + assert msg is ctx._cancel_msg + # NOTE: this solved a super dupe edge case XD + # this was THE super duper edge case of: + # - local task opens a remote task, + # - requests remote cancellation of far end + # ctx/tasks, + # - needs to wait for the cancel ack msg + # (ctxc) or some result in the race case + # where the other side's task returns + # before the cancel request msg is ever + # rxed and processed, + # - here this surrounding drain loop (which + # iterates all ipc msgs until the ack or + # an early result arrives) was NOT exiting + # since we are the edge case: local task + # does not re-raise any ctxc it receives + # IFF **it** was the cancellation + # requester.. + # will raise if necessary, ow break from + # loop presuming any error terminates the + # context! + ctx._maybe_raise_remote_err( + re, + # NOTE: obvi we don't care if we + # overran the far end if we're already + # waiting on a final result (msg). + # raise_overrun_from_self=False, + raise_overrun_from_self=raise_overrun, + ) + + break # OOOOOF, yeah obvi we need this.. + + # XXX we should never really get here + # right! since `._deliver_msg()` should + # always have detected an {'error': ..} + # msg and already called this right!?! + elif error := unpack_error( + msg=msg, + chan=ctx._portal.channel, + hide_tb=False, + ): + log.critical('SHOULD NEVER GET HERE!?') + assert msg is ctx._cancel_msg + assert error.msgdata == ctx._remote_error.msgdata + from .devx._debug import pause + await pause() + ctx._maybe_cancel_and_set_remote_error(error) + ctx._maybe_raise_remote_err(error) + + else: + # bubble the original src key error + raise + + return pre_result_drained + + # TODO: make this a msgspec.Struct! @dataclass class Context: @@ -118,6 +281,7 @@ class Context: # which is exactly the primitive that allows for # cross-actor-task-supervision and thus SC. _scope: trio.CancelScope | None = None + # _res_scope: trio.CancelScope|None = None # on a clean exit there should be a final value # delivered from the far end "callee" task, so @@ -205,6 +369,10 @@ class Context: ) ) + # @property + # def is_waiting_result(self) -> bool: + # return bool(self._res_scope) + @property def side(self) -> str: ''' @@ -247,7 +415,11 @@ class Context: await self.chan.send({'yield': data, 'cid': self.cid}) async def send_stop(self) -> None: - await self.chan.send({'stop': True, 'cid': self.cid}) + # await pause() + await self.chan.send({ + 'stop': True, + 'cid': self.cid + }) def _maybe_cancel_and_set_remote_error( self, @@ -320,27 +492,37 @@ class Context: # XXX: set the remote side's error so that after we cancel # whatever task is the opener of this context it can raise # that error as the reason. + # if self._remote_error: + # return + + # breakpoint() + log.cancel( + 'Setting remote error for ctx \n' + f'<= remote ctx uid: {self.chan.uid}\n' + f'=>\n{error}' + ) self._remote_error: BaseException = error if ( isinstance(error, ContextCancelled) ): - # always record the cancelling actor's uid since its cancellation - # state is linked and we want to know which process was - # the cause / requester of the cancellation. - self._canceller = error.canceller - log.cancel( 'Remote task-context was cancelled for ' f'actor: {self.chan.uid}\n' f'task: {self.cid}\n' f'canceller: {error.canceller}\n' ) + # always record the cancelling actor's uid since its cancellation + # state is linked and we want to know which process was + # the cause / requester of the cancellation. + # if error.canceller is None: + # import pdbp; pdbp.set_trace() + + # breakpoint() + self._canceller = error.canceller + if self._cancel_called: - # from .devx._debug import breakpoint - # await breakpoint() - # this is an expected cancel request response message # and we **don't need to raise it** in local cancel # scope since it will potentially override a real error. @@ -348,10 +530,11 @@ class Context: else: log.error( - f'Remote context error,\n' - f'remote actor: {self.chan.uid}\n' - f'task: {self.cid}\n' - f'{error}' + f'Remote context error:\n' + f'{error}\n' + f'{pformat(self)}\n' + # f'remote actor: {self.chan.uid}\n' + # f'cid: {self.cid}\n' ) self._canceller = self.chan.uid @@ -376,9 +559,11 @@ class Context: self._scope.cancel() # NOTE: this REPL usage actually works here dawg! Bo - # from .devx._debug import pause # await pause() + # TODO: maybe we have to use `._res_scope.cancel()` if it + # exists? + async def cancel( self, timeout: float = 0.616, @@ -395,6 +580,8 @@ class Context: log.cancel( f'Cancelling {side} side of context to {self.chan.uid}' ) + + # await pause() self._cancel_called: bool = True # caller side who entered `Portal.open_context()` @@ -484,13 +671,11 @@ class Context: ''' actor: Actor = current_actor() - # here we create a mem chan that corresponds to the - # far end caller / callee. - - # Likewise if the surrounding context has been cancelled we error here - # since it likely means the surrounding block was exited or - # killed - + # If the surrounding context has been cancelled by some + # task with a handle to THIS, we error here immediately + # since it likely means the surrounding lexical-scope has + # errored, been `trio.Cancelled` or at the least + # `Context.cancel()` was called by some task. if self._cancel_called: # XXX NOTE: ALWAYS RAISE any remote error here even if @@ -503,6 +688,11 @@ class Context: # actually try to stream - a cancel msg was already # sent to the other side! if self._remote_error: + # NOTE: this is diff then calling + # `._maybe_raise_from_remote_msg()` specifically + # because any task entering this `.open_stream()` + # AFTER cancellation has already been requested, + # we DO NOT want to absorb any ctxc ACK silently! raise self._remote_error # XXX NOTE: if no `ContextCancelled` has been responded @@ -529,7 +719,7 @@ class Context: # to send a stop from the caller to the callee in the # single-direction-stream case you'll get a lookup error # currently. - ctx = actor.get_context( + ctx: Context = actor.get_context( self.chan, self.cid, msg_buffer_size=msg_buffer_size, @@ -548,6 +738,19 @@ class Context: 'The underlying channel for this stream was already closed!?' ) + # NOTE: implicitly this will call `MsgStream.aclose()` on + # `.__aexit__()` due to stream's parent `Channel` type! + # + # XXX NOTE XXX: ensures the stream is "one-shot use", + # which specifically means that on exit, + # - signal ``trio.EndOfChannel``/``StopAsyncIteration`` to + # the far end indicating that the caller exited + # the streaming context purposefully by letting + # the exit block exec. + # - this is diff from the cancel/error case where + # a cancel request from this side or an error + # should be sent to the far end indicating the + # stream WAS NOT just closed normally/gracefully. async with MsgStream( ctx=self, rx_chan=ctx._recv_chan, @@ -567,11 +770,37 @@ class Context: # await trio.lowlevel.checkpoint() yield stream - # NOTE: Make the stream "one-shot use". On exit, - # signal - # ``trio.EndOfChannel``/``StopAsyncIteration`` to - # the far end. - await stream.aclose() + + # XXX: (MEGA IMPORTANT) if this is a root opened process we + # wait for any immediate child in debug before popping the + # context from the runtime msg loop otherwise inside + # ``Actor._push_result()`` the msg will be discarded and in + # the case where that msg is global debugger unlock (via + # a "stop" msg for a stream), this can result in a deadlock + # where the root is waiting on the lock to clear but the + # child has already cleared it and clobbered IPC. + # + # await maybe_wait_for_debugger() + + # XXX TODO: pretty sure this isn't needed (see + # note above this block) AND will result in + # a double `.send_stop()` call. The only reason to + # put it here would be to due with "order" in + # terms of raising any remote error (as per + # directly below) or bc the stream's + # `.__aexit__()` block might not get run + # (doubtful)? Either way if we did put this back + # in we also need a state var to avoid the double + # stop-msg send.. + # + # await stream.aclose() + + # if re := ctx._remote_error: + # ctx._maybe_raise_remote_err( + # re, + # raise_ctxc_from_self_call=True, + # ) + # await trio.lowlevel.checkpoint() finally: if self._portal: @@ -587,7 +816,10 @@ class Context: def _maybe_raise_remote_err( self, err: Exception, - ) -> None: + raise_ctxc_from_self_call: bool = False, + raise_overrun_from_self: bool = True, + + ) -> ContextCancelled|None: ''' Maybe raise a remote error depending on who (which task from which actor) requested a cancellation (if any). @@ -603,13 +835,21 @@ class Context: # "error"-msg. our_uid: tuple[str, str] = current_actor().uid if ( - isinstance(err, ContextCancelled) - and ( + (not raise_ctxc_from_self_call + and isinstance(err, ContextCancelled) + and ( self._cancel_called or self.chan._cancel_called or self.canceller == our_uid - or tuple(err.canceller) == our_uid + or tuple(err.canceller) == our_uid) ) + or + (not raise_overrun_from_self + and isinstance(err, RemoteActorError) + and err.msgdata['type_str'] == 'StreamOverrun' + and tuple(err.msgdata['sender']) == our_uid + ) + ): # NOTE: we set the local scope error to any "self # cancellation" error-response thus "absorbing" @@ -661,77 +901,196 @@ class Context: assert self._portal, "Context.result() can not be called from callee!" assert self._recv_chan - if re := self._remote_error: - return self._maybe_raise_remote_err(re) + raise_overrun: bool = not self._allow_overruns + # if re := self._remote_error: + # return self._maybe_raise_remote_err( + # re, + # # NOTE: obvi we don't care if we + # # overran the far end if we're already + # # waiting on a final result (msg). + # raise_overrun_from_self=raise_overrun, + # ) + res_placeholder: int = id(self) if ( - self._result == id(self) + self._result == res_placeholder and not self._remote_error and not self._recv_chan._closed # type: ignore ): - # wait for a final context result consuming - # and discarding any bi dir stream msgs still - # in transit from the far end. - while True: - try: - msg = await self._recv_chan.receive() - self._result: Any = msg['return'] - # NOTE: we don't need to do this right? - # XXX: only close the rx mem chan AFTER - # a final result is retreived. - # if self._recv_chan: - # await self._recv_chan.aclose() + # wait for a final context result by collecting (but + # basically ignoring) any bi-dir-stream msgs still in transit + # from the far end. + drained_msgs: list[dict] = await _drain_to_final_msg(ctx=self) + log.runtime( + 'Ctx drained pre-result msgs:\n' + f'{drained_msgs}' + ) - break + # TODO: implement via helper func ^^^^ + # pre_result_drained: list[dict] = [] + # while not self._remote_error: + # try: + # # NOTE: this REPL usage actually works here dawg! Bo + # # from .devx._debug import pause + # # await pause() + # # if re := self._remote_error: + # # self._maybe_raise_remote_err( + # # re, + # # # NOTE: obvi we don't care if we + # # # overran the far end if we're already + # # # waiting on a final result (msg). + # # raise_overrun_from_self=raise_overrun, + # # ) - # NOTE: we get here if the far end was - # `ContextCancelled` in 2 cases: - # 1. we requested the cancellation and thus - # SHOULD NOT raise that far end error, - # 2. WE DID NOT REQUEST that cancel and thus - # SHOULD RAISE HERE! - except trio.Cancelled: + # # TODO: bad idea? + # # with trio.CancelScope() as res_cs: + # # self._res_scope = res_cs + # # msg: dict = await self._recv_chan.receive() + # # if res_cs.cancelled_caught: - # CASE 2: mask the local cancelled-error(s) - # only when we are sure the remote error is the - # (likely) source cause of this local runtime - # task's cancellation. - if re := self._remote_error: - self._maybe_raise_remote_err(re) + # # from .devx._debug import pause + # # await pause() + # msg: dict = await self._recv_chan.receive() + # self._result: Any = msg['return'] + # log.runtime( + # 'Context delivered final result msg:\n' + # f'{pformat(msg)}' + # ) + # # NOTE: we don't need to do this right? + # # XXX: only close the rx mem chan AFTER + # # a final result is retreived. + # # if self._recv_chan: + # # await self._recv_chan.aclose() + # break - # CASE 1: we DID request the cancel we simply - # continue to bubble up as normal. - raise + # # NOTE: we get here if the far end was + # # `ContextCancelled` in 2 cases: + # # 1. we requested the cancellation and thus + # # SHOULD NOT raise that far end error, + # # 2. WE DID NOT REQUEST that cancel and thus + # # SHOULD RAISE HERE! + # except trio.Cancelled: - except KeyError: # as msgerr: + # # CASE 2: mask the local cancelled-error(s) + # # only when we are sure the remote error is + # # the source cause of this local task's + # # cancellation. + # if re := self._remote_error: + # self._maybe_raise_remote_err(re) - if 'yield' in msg: - # far end task is still streaming to us so discard - log.warning(f'Discarding stream delivered {msg}') - continue + # # CASE 1: we DID request the cancel we simply + # # continue to bubble up as normal. + # raise - elif 'stop' in msg: - log.debug('Remote stream terminated') - continue + # except KeyError: - # internal error should never get here - assert msg.get('cid'), ( - "Received internal error at portal?" - ) + # if 'yield' in msg: + # # far end task is still streaming to us so discard + # log.warning(f'Discarding std "yield"\n{msg}') + # pre_result_drained.append(msg) + # continue - if err:= unpack_error( - msg, - self._portal.channel - ): # from msgerr - self._maybe_cancel_and_set_remote_error(err) - self._maybe_raise_remote_err(err) + # # TODO: work out edge cases here where + # # a stream is open but the task also calls + # # this? + # # -[ ] should be a runtime error if a stream is open + # # right? + # elif 'stop' in msg: + # log.cancel( + # 'Remote stream terminated due to "stop" msg:\n' + # f'{msg}' + # ) + # pre_result_drained.append(msg) + # continue - else: - raise + # # internal error should never get here + # assert msg.get('cid'), ( + # "Received internal error at portal?" + # ) - if re := self._remote_error: - return self._maybe_raise_remote_err(re) + # # XXX fallthrough to handle expected error XXX + # re: Exception|None = self._remote_error + # if re: + # log.critical( + # 'Remote ctx terminated due to "error" msg:\n' + # f'{re}' + # ) + # assert msg is self._cancel_msg + # # NOTE: this solved a super dupe edge case XD + # # this was THE super duper edge case of: + # # - local task opens a remote task, + # # - requests remote cancellation of far end + # # ctx/tasks, + # # - needs to wait for the cancel ack msg + # # (ctxc) or some result in the race case + # # where the other side's task returns + # # before the cancel request msg is ever + # # rxed and processed, + # # - here this surrounding drain loop (which + # # iterates all ipc msgs until the ack or + # # an early result arrives) was NOT exiting + # # since we are the edge case: local task + # # does not re-raise any ctxc it receives + # # IFF **it** was the cancellation + # # requester.. + # # will raise if necessary, ow break from + # # loop presuming any error terminates the + # # context! + # self._maybe_raise_remote_err( + # re, + # # NOTE: obvi we don't care if we + # # overran the far end if we're already + # # waiting on a final result (msg). + # # raise_overrun_from_self=False, + # raise_overrun_from_self=raise_overrun, + # ) + + # break # OOOOOF, yeah obvi we need this.. + + # # XXX we should never really get here + # # right! since `._deliver_msg()` should + # # always have detected an {'error': ..} + # # msg and already called this right!?! + # elif error := unpack_error( + # msg=msg, + # chan=self._portal.channel, + # hide_tb=False, + # ): + # log.critical('SHOULD NEVER GET HERE!?') + # assert msg is self._cancel_msg + # assert error.msgdata == self._remote_error.msgdata + # from .devx._debug import pause + # await pause() + # self._maybe_cancel_and_set_remote_error(error) + # self._maybe_raise_remote_err(error) + + # else: + # # bubble the original src key error + # raise + + if ( + (re := self._remote_error) + and self._result == res_placeholder + ): + maybe_err: Exception|None = self._maybe_raise_remote_err( + re, + # NOTE: obvi we don't care if we + # overran the far end if we're already + # waiting on a final result (msg). + # raise_overrun_from_self=False, + raise_overrun_from_self=( + raise_overrun + and + # only when we ARE NOT the canceller + # should we raise overruns, bc ow we're + # raising something we know might happen + # during cancellation ;) + (not self._cancel_called) + ), + ) + if maybe_err: + self._result = maybe_err return self._result @@ -779,7 +1138,7 @@ class Context: while self._overflow_q: # NOTE: these msgs should never be errors since we always do # the check prior to checking if we're in an overrun state - # inside ``.deliver_msg()``. + # inside ``._deliver_msg()``. msg = self._overflow_q.popleft() try: await self._send_chan.send(msg) @@ -830,34 +1189,50 @@ class Context: messages are eventually sent if possible. ''' - cid = self.cid - chan = self.chan - uid = chan.uid + cid: str = self.cid + chan: Channel = self.chan + from_uid: tuple[str, str] = chan.uid send_chan: trio.MemorySendChannel = self._send_chan - log.runtime( - f"Delivering {msg} from {uid} to caller {cid}" - ) - - if ( - msg.get('error') # check for field - and ( - error := unpack_error( - msg, - self.chan, - ) + if re := unpack_error( + msg, + self.chan, + ): + log.error( + f'Delivering error-msg from {from_uid} to caller {cid}' + f'{re}' ) - ): self._cancel_msg = msg - self._maybe_cancel_and_set_remote_error(error) + self._maybe_cancel_and_set_remote_error(re) - if ( - self._in_overrun - ): + # XXX NEVER do this XXX..!! + # bc if the error is a ctxc and there is a task + # waiting on `.result()` we need the msg to be sent + # over the `send_chan`/`._recv_chan` so that the error + # is relayed to that waiter task.. + # return True + # + # XXX ALSO NO!! XXX + # if self._remote_error: + # self._maybe_raise_remote_err(error) + + if self._in_overrun: + log.warning( + f'Capturing overrun-msg from {from_uid} to caller {cid}' + f'{msg}' + ) self._overflow_q.append(msg) return False try: + log.runtime( + f'Delivering IPC `Context` msg:\n' + f'<= {from_uid}\n' + f'=> caller: {cid}\n' + f'{msg}' + ) + # from .devx._debug import pause + # await pause() send_chan.send_nowait(msg) return True # if an error is deteced we should always @@ -890,7 +1265,8 @@ class Context: lines = [ f'OVERRUN on actor-task context {cid}@{local_uid}!\n' # TODO: put remote task name here if possible? - f'remote sender actor: {uid}', + f'sender: {from_uid}', + f'msg: {msg}', # TODO: put task func name here and maybe an arrow # from sender to overrunner? # f'local task {self.func_name}' @@ -926,11 +1302,19 @@ class Context: # anything different. return False else: + # raise local overrun and immediately pack as IPC + # msg for far end. try: - raise StreamOverrun(text) + raise StreamOverrun( + text, + sender=from_uid, + ) except StreamOverrun as err: - err_msg = pack_error(err) - err_msg['cid'] = cid + err_msg: dict[str, dict] = pack_error( + err, + cid=cid, + ) + # err_msg['cid']: str = cid try: await chan.send(err_msg) except trio.BrokenResourceError: diff --git a/tractor/_portal.py b/tractor/_portal.py index 378f6a2..14f6fbf 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -39,7 +39,15 @@ import trio from async_generator import asynccontextmanager from .trionics import maybe_open_nursery -from ._state import current_actor +from .devx import ( + # acquire_debug_lock, + # pause, + maybe_wait_for_debugger, +) +from ._state import ( + current_actor, + debug_mode, +) from ._ipc import Channel from .log import get_logger from .msg import NamespacePath @@ -48,6 +56,7 @@ from ._exceptions import ( unpack_error, NoResult, ContextCancelled, + RemoteActorError, ) from ._context import ( Context, @@ -55,7 +64,6 @@ from ._context import ( from ._streaming import ( MsgStream, ) -from .devx._debug import maybe_wait_for_debugger log = get_logger(__name__) @@ -469,7 +477,6 @@ class Portal: ctx._started_called: bool = True except KeyError as src_error: - _raise_from_no_key_in_msg( ctx=ctx, msg=msg, @@ -494,6 +501,33 @@ class Portal: # in enter tuple. yield ctx, first + # between the caller exiting and arriving here the + # far end may have sent a ctxc-msg or other error, + # so check for it here immediately and maybe raise + # so as to engage the ctxc handling block below! + # if re := ctx._remote_error: + # maybe_ctxc: ContextCancelled|None = ctx._maybe_raise_remote_err( + # re, + + # # TODO: do we want this to always raise? + # # - means that on self-ctxc, if/when the + # # block is exited before the msg arrives + # # but then the msg during __exit__ + # # calling we may not activate the + # # ctxc-handler block below? should we + # # be? + # # - if there's a remote error that arrives + # # after the child has exited, we won't + # # handle until the `finally:` block + # # where `.result()` is always called, + # # again in which case we handle it + # # differently then in the handler block + # # that would normally engage from THIS + # # block? + # raise_ctxc_from_self_call=True, + # ) + # assert maybe_ctxc + # when in allow_overruns mode there may be # lingering overflow sender tasks remaining? if nurse.child_tasks: @@ -539,7 +573,7 @@ class Portal: # `.canceller: tuple[str, str]` to be same value as # caught here in a `ContextCancelled.canceller`. # - # Again, there are 2 cases: + # AGAIN to restate the above, there are 2 cases: # # 1-some other context opened in this `.open_context()` # block cancelled due to a self or peer cancellation @@ -555,6 +589,16 @@ class Portal: except ContextCancelled as ctxc: scope_err = ctxc + # XXX TODO XXX: FIX THIS debug_mode BUGGGG!!! + # using this code and then resuming the REPL will + # cause a SIGINT-ignoring HANG! + # -> prolly due to a stale debug lock entry.. + # -[ ] USE `.stackscope` to demonstrate that (possibly + # documenting it as a definittive example of + # debugging the tractor-runtime itself using it's + # own `.devx.` tooling! + # await pause() + # CASE 2: context was cancelled by local task calling # `.cancel()`, we don't raise and the exit block should # exit silently. @@ -562,18 +606,23 @@ class Portal: ctx._cancel_called and ( ctxc is ctx._remote_error - or - ctxc.canceller is self.canceller + # ctxc.msgdata == ctx._remote_error.msgdata + + # TODO: uhh `Portal.canceller` ain't a thangg + # dawg? (was `self.canceller` before?!?) + and + ctxc.canceller == self.actor.uid ) ): - log.debug( - f'Context {ctx} cancelled gracefully with:\n' + log.cancel( + f'Context (cid=[{ctx.cid[-6:]}..] cancelled gracefully with:\n' f'{ctxc}' ) # CASE 1: this context was never cancelled via a local # task (tree) having called `Context.cancel()`, raise # the error since it was caused by someone else! else: + # await pause() raise # the above `._scope` can be cancelled due to: @@ -602,8 +651,8 @@ class Portal: trio.Cancelled, # NOTE: NOT from inside the ctx._scope KeyboardInterrupt, - ) as err: - scope_err = err + ) as caller_err: + scope_err = caller_err # XXX: ALWAYS request the context to CANCEL ON any ERROR. # NOTE: `Context.cancel()` is conversely NEVER CALLED in @@ -611,11 +660,26 @@ class Portal: # handled in the block above! log.cancel( 'Context cancelled for task due to\n' - f'{err}\n' + f'{caller_err}\n' 'Sending cancel request..\n' f'task:{cid}\n' f'actor:{uid}' ) + + if debug_mode(): + log.pdb( + 'Delaying `ctx.cancel()` until debug lock ' + 'acquired..' + ) + # async with acquire_debug_lock(self.actor.uid): + # pass + # TODO: factor ^ into below for non-root cases? + await maybe_wait_for_debugger() + log.pdb( + 'Acquired debug lock! ' + 'Calling `ctx.cancel()`!' + ) + try: await ctx.cancel() except trio.BrokenResourceError: @@ -629,6 +693,33 @@ class Portal: # no local scope error, the "clean exit with a result" case. else: + # between the caller exiting and arriving here the + # far end may have sent a ctxc-msg or other error, + # so check for it here immediately and maybe raise + # so as to engage the ctxc handling block below! + # if re := ctx._remote_error: + # maybe_ctxc: ContextCancelled|None = ctx._maybe_raise_remote_err( + # re, + + # # TODO: do we want this to always raise? + # # - means that on self-ctxc, if/when the + # # block is exited before the msg arrives + # # but then the msg during __exit__ + # # calling we may not activate the + # # ctxc-handler block below? should we + # # be? + # # - if there's a remote error that arrives + # # after the child has exited, we won't + # # handle until the `finally:` block + # # where `.result()` is always called, + # # again in which case we handle it + # # differently then in the handler block + # # that would normally engage from THIS + # # block? + # raise_ctxc_from_self_call=True, + # ) + # assert maybe_ctxc + if ctx.chan.connected(): log.info( 'Waiting on final context-task result for\n' @@ -645,13 +736,8 @@ class Portal: # As per `Context._deliver_msg()`, that error IS # ALWAYS SET any time "callee" side fails and causes "caller # side" cancellation via a `ContextCancelled` here. - # result = await ctx.result() try: - result = await ctx.result() - log.runtime( - f'Context {fn_name} returned value from callee:\n' - f'`{result}`' - ) + result_or_err: Exception|Any = await ctx.result() except BaseException as berr: # on normal teardown, if we get some error # raised in `Context.result()` we still want to @@ -663,7 +749,48 @@ class Portal: scope_err = berr raise + # an exception type boxed in a `RemoteActorError` + # is returned (meaning it was obvi not raised). + msgdata: str|None = getattr( + result_or_err, + 'msgdata', + None + ) + # yes! this worx Bp + # from .devx import _debug + # await _debug.pause() + match (msgdata, result_or_err): + case ( + {'tb_str': tbstr}, + ContextCancelled(), + ): + log.cancel(tbstr) + + case ( + {'tb_str': tbstr}, + RemoteActorError(), + ): + log.exception( + f'Context `{fn_name}` remotely errored:\n' + f'`{tbstr}`' + ) + case (None, _): + log.runtime( + f'Context {fn_name} returned value from callee:\n' + f'`{result_or_err}`' + ) + finally: + # XXX: (MEGA IMPORTANT) if this is a root opened process we + # wait for any immediate child in debug before popping the + # context from the runtime msg loop otherwise inside + # ``Actor._push_result()`` the msg will be discarded and in + # the case where that msg is global debugger unlock (via + # a "stop" msg for a stream), this can result in a deadlock + # where the root is waiting on the lock to clear but the + # child has already cleared it and clobbered IPC. + await maybe_wait_for_debugger() + # though it should be impossible for any tasks # operating *in* this scope to have survived # we tear down the runtime feeder chan last @@ -708,6 +835,10 @@ class Portal: # out any exception group or legit (remote) ctx # error that sourced from the remote task or its # runtime. + # + # NOTE: further, this should be the only place the + # underlying feeder channel is + # once-and-only-CLOSED! with trio.CancelScope(shield=True): await ctx._recv_chan.aclose() @@ -737,18 +868,11 @@ class Portal: f'actor:{uid}' ) - # XXX: (MEGA IMPORTANT) if this is a root opened process we - # wait for any immediate child in debug before popping the - # context from the runtime msg loop otherwise inside - # ``Actor._push_result()`` the msg will be discarded and in - # the case where that msg is global debugger unlock (via - # a "stop" msg for a stream), this can result in a deadlock - # where the root is waiting on the lock to clear but the - # child has already cleared it and clobbered IPC. - await maybe_wait_for_debugger() - # FINALLY, remove the context from runtime tracking and # exit! + log.runtime( + f'Exiting context opened with {ctx.chan.uid}' + ) self.actor._contexts.pop( (self.channel.uid, ctx.cid), None, diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 4530e14..e8f735e 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -21,8 +21,9 @@ The machinery and types behind ``Context.open_stream()`` ''' from __future__ import annotations -import inspect from contextlib import asynccontextmanager as acm +import inspect +from pprint import pformat from typing import ( Any, Callable, @@ -35,6 +36,7 @@ import trio from ._exceptions import ( _raise_from_no_key_in_msg, + ContextCancelled, ) from .log import get_logger from .trionics import ( @@ -84,8 +86,8 @@ class MsgStream(trio.abc.Channel): self._broadcaster = _broadcaster # flag to denote end of stream - self._eoc: bool = False - self._closed: bool = False + self._eoc: bool|trio.EndOfChannel = False + self._closed: bool|trio.ClosedResourceError = False # delegate directly to underlying mem channel def receive_nowait(self): @@ -93,6 +95,9 @@ class MsgStream(trio.abc.Channel): try: return msg['yield'] except KeyError as kerr: + # if 'return' in msg: + # return msg + _raise_from_no_key_in_msg( ctx=self._ctx, msg=msg, @@ -122,30 +127,43 @@ class MsgStream(trio.abc.Channel): # see ``.aclose()`` for notes on the old behaviour prior to # introducing this if self._eoc: - raise trio.EndOfChannel + raise self._eoc + # raise trio.EndOfChannel if self._closed: - raise trio.ClosedResourceError('This stream was closed') + raise self._closed + # raise trio.ClosedResourceError( + # 'This stream was already closed' + # ) + src_err: Exception|None = None try: - msg = await self._rx_chan.receive() - return msg['yield'] + try: + msg = await self._rx_chan.receive() + return msg['yield'] - except KeyError as kerr: - _raise_from_no_key_in_msg( - ctx=self._ctx, - msg=msg, - src_err=kerr, - log=log, - expect_key='yield', - stream=self, - ) + except KeyError as kerr: + src_err = kerr + # NOTE: may raise any of the below error types + # includg EoC when a 'stop' msg is found. + _raise_from_no_key_in_msg( + ctx=self._ctx, + msg=msg, + src_err=kerr, + log=log, + expect_key='yield', + stream=self, + ) + + # XXX: we close the stream on any of these error conditions: except ( - trio.ClosedResourceError, # by self._rx_chan + # trio.ClosedResourceError, # by self._rx_chan trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end - ): - # XXX: we close the stream on any of these error conditions: + ) as eoc: + src_err = eoc + self._eoc = eoc + # await trio.sleep(1) # a ``ClosedResourceError`` indicates that the internal # feeder memory receive channel was closed likely by the @@ -168,14 +186,53 @@ class MsgStream(trio.abc.Channel): # closing this stream and not flushing a final value to # remaining (clone) consumers who may not have been # scheduled to receive it yet. + # try: + # maybe_err_msg_or_res: dict = self._rx_chan.receive_nowait() + # if maybe_err_msg_or_res: + # log.warning( + # 'Discarding un-processed msg:\n' + # f'{maybe_err_msg_or_res}' + # ) + # except trio.WouldBlock: + # # no queued msgs that might be another remote + # # error, so just raise the original EoC + # pass - # when the send is closed we assume the stream has - # terminated and signal this local iterator to stop - await self.aclose() + # raise eoc - raise # propagate + except trio.ClosedResourceError as cre: # by self._rx_chan + src_err = cre + log.warning( + '`Context._rx_chan` was already closed?' + ) + self._closed = cre - async def aclose(self): + # when the send is closed we assume the stream has + # terminated and signal this local iterator to stop + drained: list[Exception|dict] = await self.aclose() + if drained: + log.warning( + 'Drained context msgs during closure:\n' + f'{drained}' + ) + # TODO: pass these to the `._ctx._drained_msgs: deque` + # and then iterate them as part of any `.result()` call? + + # NOTE XXX: if the context was cancelled or remote-errored + # but we received the stream close msg first, we + # probably want to instead raise the remote error + # over the end-of-stream connection error since likely + # the remote error was the source cause? + ctx: Context = self._ctx + if re := ctx._remote_error: + ctx._maybe_raise_remote_err( + re, + raise_ctxc_from_self_call=True, + ) + + raise src_err # propagate + + async def aclose(self) -> list[Exception|dict]: ''' Cancel associated remote actor task and local memory channel on close. @@ -185,15 +242,55 @@ class MsgStream(trio.abc.Channel): # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose rx_chan = self._rx_chan - if rx_chan._closed: - log.cancel(f"{self} is already closed") + if ( + rx_chan._closed + or + self._closed + ): + log.cancel( + f'`MsgStream` is already closed\n' + f'.cid: {self._ctx.cid}\n' + f'._rx_chan`: {rx_chan}\n' + f'._eoc: {self._eoc}\n' + f'._closed: {self._eoc}\n' + ) # this stream has already been closed so silently succeed as # per ``trio.AsyncResource`` semantics. # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose - return + return [] - self._eoc = True + ctx: Context = self._ctx + # caught_eoc: bool = False + drained: list[Exception|dict] = [] + while not drained: + try: + maybe_final_msg = self.receive_nowait() + if maybe_final_msg: + log.cancel( + 'Drained un-processed stream msg:\n' + f'{pformat(maybe_final_msg)}' + ) + # TODO: inject into parent `Context` buf? + drained.append(maybe_final_msg) + + except trio.WouldBlock as be: + drained.append(be) + break + + except trio.EndOfChannel as eoc: + drained.append(eoc) + # caught_eoc = True + self._eoc: bool = eoc + break + + except ContextCancelled as ctxc: + log.cancel( + 'Context was cancelled during stream closure:\n' + f'canceller: {ctxc.canceller}\n' + f'{pformat(ctxc.msgdata)}' + ) + break # NOTE: this is super subtle IPC messaging stuff: # Relay stop iteration to far end **iff** we're @@ -224,26 +321,33 @@ class MsgStream(trio.abc.Channel): except ( trio.BrokenResourceError, trio.ClosedResourceError - ): + ) as re: # the underlying channel may already have been pulled # in which case our stop message is meaningless since # it can't traverse the transport. - ctx = self._ctx log.warning( f'Stream was already destroyed?\n' f'actor: {ctx.chan.uid}\n' f'ctx id: {ctx.cid}' ) + drained.append(re) + self._closed = re - self._closed = True + # if caught_eoc: + # # from .devx import _debug + # # await _debug.pause() + # with trio.CancelScope(shield=True): + # await rx_chan.aclose() - # Do we close the local mem chan ``self._rx_chan`` ??!? + # self._eoc: bool = caught_eoc - # NO, DEFINITELY NOT if we're a bi-dir ``MsgStream``! - # BECAUSE this same core-msg-loop mem recv-chan is used to deliver - # the potential final result from the surrounding inter-actor - # `Context` so we don't want to close it until that context has - # run to completion. + # ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX? + # => NO, DEFINITELY NOT! <= + # if we're a bi-dir ``MsgStream`` BECAUSE this same + # core-msg-loop mem recv-chan is used to deliver the + # potential final result from the surrounding inter-actor + # `Context` so we don't want to close it until that + # context has run to completion. # XXX: Notes on old behaviour: # await rx_chan.aclose() @@ -272,6 +376,8 @@ class MsgStream(trio.abc.Channel): # runtime's closure of ``rx_chan`` in the case where we may # still need to consume msgs that are "in transit" from the far # end (eg. for ``Context.result()``). + # self._closed = True + return drained @acm async def subscribe( @@ -337,9 +443,13 @@ class MsgStream(trio.abc.Channel): raise self._ctx._remote_error # from None if self._closed: - raise trio.ClosedResourceError('This stream was already closed') + raise self._closed + # raise trio.ClosedResourceError('This stream was already closed') - await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) + await self._ctx.chan.send({ + 'yield': data, + 'cid': self._ctx.cid, + }) def stream(func: Callable) -> Callable: