forked from goodboy/tractor
				
			WIP: solved the modden client hang..
							parent
							
								
									4b0aa5e379
								
							
						
					
					
						commit
						ddc2e5f0f8
					
				|  | @ -43,12 +43,17 @@ import warnings | |||
| 
 | ||||
| import trio | ||||
| 
 | ||||
| # from .devx import ( | ||||
| #     maybe_wait_for_debugger, | ||||
| #     pause, | ||||
| # ) | ||||
| from ._exceptions import ( | ||||
|     # _raise_from_no_key_in_msg, | ||||
|     unpack_error, | ||||
|     pack_error, | ||||
|     ContextCancelled, | ||||
|     # MessagingError, | ||||
|     RemoteActorError, | ||||
|     StreamOverrun, | ||||
| ) | ||||
| from .log import get_logger | ||||
|  | @ -64,6 +69,164 @@ if TYPE_CHECKING: | |||
| log = get_logger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| async def _drain_to_final_msg( | ||||
|     ctx: Context, | ||||
| ) -> list[dict]: | ||||
| 
 | ||||
| # ) -> tuple[ | ||||
| #     Any|Exception, | ||||
| #     list[dict], | ||||
| # ]: | ||||
|     raise_overrun: bool = not ctx._allow_overruns | ||||
| 
 | ||||
|     # wait for a final context result by collecting (but | ||||
|     # basically ignoring) any bi-dir-stream msgs still in transit | ||||
|     # from the far end. | ||||
|     pre_result_drained: list[dict] = [] | ||||
|     while not ctx._remote_error: | ||||
|         try: | ||||
|             # NOTE: this REPL usage actually works here dawg! Bo | ||||
|             # from .devx._debug import pause | ||||
|             # await pause() | ||||
|             # if re := ctx._remote_error: | ||||
|             #     ctx._maybe_raise_remote_err( | ||||
|             #         re, | ||||
|             #         # NOTE: obvi we don't care if we | ||||
|             #         # overran the far end if we're already | ||||
|             #         # waiting on a final result (msg). | ||||
|             #         raise_overrun_from_self=raise_overrun, | ||||
|             #     ) | ||||
| 
 | ||||
|             # TODO: bad idea? | ||||
|             # with trio.CancelScope() as res_cs: | ||||
|             #     ctx._res_scope = res_cs | ||||
|             #     msg: dict = await ctx._recv_chan.receive() | ||||
|             # if res_cs.cancelled_caught: | ||||
| 
 | ||||
|             # from .devx._debug import pause | ||||
|             # await pause() | ||||
|             msg: dict = await ctx._recv_chan.receive() | ||||
|             ctx._result: Any = msg['return'] | ||||
|             log.runtime( | ||||
|                 'Context delivered final result msg:\n' | ||||
|                 f'{pformat(msg)}' | ||||
|             ) | ||||
|             pre_result_drained.append(msg) | ||||
|             # NOTE: we don't need to do this right? | ||||
|             # XXX: only close the rx mem chan AFTER | ||||
|             # a final result is retreived. | ||||
|             # if ctx._recv_chan: | ||||
|             #     await ctx._recv_chan.aclose() | ||||
|             break | ||||
| 
 | ||||
|         # NOTE: we get here if the far end was | ||||
|         # `ContextCancelled` in 2 cases: | ||||
|         # 1. we requested the cancellation and thus | ||||
|         #    SHOULD NOT raise that far end error, | ||||
|         # 2. WE DID NOT REQUEST that cancel and thus | ||||
|         #    SHOULD RAISE HERE! | ||||
|         except trio.Cancelled: | ||||
| 
 | ||||
|             # CASE 2: mask the local cancelled-error(s) | ||||
|             # only when we are sure the remote error is | ||||
|             # the source cause of this local task's | ||||
|             # cancellation. | ||||
|             if re := ctx._remote_error: | ||||
|                 ctx._maybe_raise_remote_err(re) | ||||
| 
 | ||||
|             # CASE 1: we DID request the cancel we simply | ||||
|             # continue to bubble up as normal. | ||||
|             raise | ||||
| 
 | ||||
|         except KeyError: | ||||
| 
 | ||||
|             if 'yield' in msg: | ||||
|                 # far end task is still streaming to us so discard | ||||
|                 log.warning(f'Discarding std "yield"\n{msg}') | ||||
|                 pre_result_drained.append(msg) | ||||
|                 continue | ||||
| 
 | ||||
|             # TODO: work out edge cases here where | ||||
|             # a stream is open but the task also calls | ||||
|             # this? | ||||
|             # -[ ] should be a runtime error if a stream is open | ||||
|             #   right? | ||||
|             elif 'stop' in msg: | ||||
|                 log.cancel( | ||||
|                     'Remote stream terminated due to "stop" msg:\n' | ||||
|                     f'{msg}' | ||||
|                 ) | ||||
|                 pre_result_drained.append(msg) | ||||
|                 continue | ||||
| 
 | ||||
|             # internal error should never get here | ||||
|             assert msg.get('cid'), ( | ||||
|                 "Received internal error at portal?" | ||||
|             ) | ||||
| 
 | ||||
|             # XXX fallthrough to handle expected error XXX | ||||
|             re: Exception|None = ctx._remote_error | ||||
|             if re: | ||||
|                 log.critical( | ||||
|                     'Remote ctx terminated due to "error" msg:\n' | ||||
|                     f'{re}' | ||||
|                 ) | ||||
|                 assert msg is ctx._cancel_msg | ||||
|                 # NOTE: this solved a super dupe edge case XD | ||||
|                 # this was THE super duper edge case of: | ||||
|                 # - local task opens a remote task, | ||||
|                 # - requests remote cancellation of far end | ||||
|                 #   ctx/tasks, | ||||
|                 # - needs to wait for the cancel ack msg | ||||
|                 #   (ctxc) or some result in the race case | ||||
|                 #   where the other side's task returns | ||||
|                 #   before the cancel request msg is ever | ||||
|                 #   rxed and processed, | ||||
|                 # - here this surrounding drain loop (which | ||||
|                 #   iterates all ipc msgs until the ack or | ||||
|                 #   an early result arrives) was NOT exiting | ||||
|                 #   since we are the edge case: local task | ||||
|                 #   does not re-raise any ctxc it receives | ||||
|                 #   IFF **it** was the cancellation | ||||
|                 #   requester.. | ||||
|                 # will raise if necessary, ow break from | ||||
|                 # loop presuming any error terminates the | ||||
|                 # context! | ||||
|                 ctx._maybe_raise_remote_err( | ||||
|                     re, | ||||
|                     # NOTE: obvi we don't care if we | ||||
|                     # overran the far end if we're already | ||||
|                     # waiting on a final result (msg). | ||||
|                     # raise_overrun_from_self=False, | ||||
|                     raise_overrun_from_self=raise_overrun, | ||||
|                 ) | ||||
| 
 | ||||
|                 break  # OOOOOF, yeah obvi we need this.. | ||||
| 
 | ||||
|             # XXX we should never really get here | ||||
|             # right! since `._deliver_msg()` should | ||||
|             # always have detected an {'error': ..} | ||||
|             # msg and already called this right!?! | ||||
|             elif error := unpack_error( | ||||
|                 msg=msg, | ||||
|                 chan=ctx._portal.channel, | ||||
|                 hide_tb=False, | ||||
|             ): | ||||
|                 log.critical('SHOULD NEVER GET HERE!?') | ||||
|                 assert msg is ctx._cancel_msg | ||||
|                 assert error.msgdata == ctx._remote_error.msgdata | ||||
|                 from .devx._debug import pause | ||||
|                 await pause() | ||||
|                 ctx._maybe_cancel_and_set_remote_error(error) | ||||
|                 ctx._maybe_raise_remote_err(error) | ||||
| 
 | ||||
|             else: | ||||
|                 # bubble the original src key error | ||||
|                 raise | ||||
| 
 | ||||
|     return pre_result_drained | ||||
| 
 | ||||
| 
 | ||||
| # TODO: make this a msgspec.Struct! | ||||
| @dataclass | ||||
| class Context: | ||||
|  | @ -118,6 +281,7 @@ class Context: | |||
|     # which is exactly the primitive that allows for | ||||
|     # cross-actor-task-supervision and thus SC. | ||||
|     _scope: trio.CancelScope | None = None | ||||
|     # _res_scope: trio.CancelScope|None = None | ||||
| 
 | ||||
|     # on a clean exit there should be a final value | ||||
|     # delivered from the far end "callee" task, so | ||||
|  | @ -205,6 +369,10 @@ class Context: | |||
|             ) | ||||
|         ) | ||||
| 
 | ||||
|     # @property | ||||
|     # def is_waiting_result(self) -> bool: | ||||
|     #     return bool(self._res_scope) | ||||
| 
 | ||||
|     @property | ||||
|     def side(self) -> str: | ||||
|         ''' | ||||
|  | @ -247,7 +415,11 @@ class Context: | |||
|         await self.chan.send({'yield': data, 'cid': self.cid}) | ||||
| 
 | ||||
|     async def send_stop(self) -> None: | ||||
|         await self.chan.send({'stop': True, 'cid': self.cid}) | ||||
|         # await pause() | ||||
|         await self.chan.send({ | ||||
|             'stop': True, | ||||
|             'cid': self.cid | ||||
|         }) | ||||
| 
 | ||||
|     def _maybe_cancel_and_set_remote_error( | ||||
|         self, | ||||
|  | @ -320,27 +492,37 @@ class Context: | |||
|         # XXX: set the remote side's error so that after we cancel | ||||
|         # whatever task is the opener of this context it can raise | ||||
|         # that error as the reason. | ||||
|         # if self._remote_error: | ||||
|         #     return | ||||
| 
 | ||||
|         # breakpoint() | ||||
|         log.cancel( | ||||
|             'Setting remote error for ctx \n' | ||||
|             f'<= remote ctx uid: {self.chan.uid}\n' | ||||
|             f'=>\n{error}' | ||||
|         ) | ||||
|         self._remote_error: BaseException = error | ||||
| 
 | ||||
|         if ( | ||||
|             isinstance(error, ContextCancelled) | ||||
|         ): | ||||
|             # always record the cancelling actor's uid since its cancellation | ||||
|             # state is linked and we want to know which process was | ||||
|             # the cause / requester of the cancellation. | ||||
|             self._canceller = error.canceller | ||||
| 
 | ||||
|             log.cancel( | ||||
|                 'Remote task-context was cancelled for ' | ||||
|                 f'actor: {self.chan.uid}\n' | ||||
|                 f'task: {self.cid}\n' | ||||
|                 f'canceller: {error.canceller}\n' | ||||
|             ) | ||||
|             # always record the cancelling actor's uid since its cancellation | ||||
|             # state is linked and we want to know which process was | ||||
|             # the cause / requester of the cancellation. | ||||
|             # if error.canceller is None: | ||||
|             #     import pdbp; pdbp.set_trace() | ||||
| 
 | ||||
|                 # breakpoint() | ||||
|             self._canceller = error.canceller | ||||
| 
 | ||||
| 
 | ||||
|             if self._cancel_called: | ||||
|                 # from ._debug import breakpoint | ||||
|                 # await breakpoint() | ||||
| 
 | ||||
|                 # this is an expected cancel request response message | ||||
|                 # and we **don't need to raise it** in local cancel | ||||
|                 # scope since it will potentially override a real error. | ||||
|  | @ -348,10 +530,11 @@ class Context: | |||
| 
 | ||||
|         else: | ||||
|             log.error( | ||||
|                 f'Remote context error,\n' | ||||
|                 f'remote actor: {self.chan.uid}\n' | ||||
|                 f'task: {self.cid}\n' | ||||
|                 f'{error}' | ||||
|                 f'Remote context error:\n' | ||||
|                 f'{error}\n' | ||||
|                 f'{pformat(self)}\n' | ||||
|                 # f'remote actor: {self.chan.uid}\n' | ||||
|                 # f'cid: {self.cid}\n' | ||||
|             ) | ||||
|             self._canceller = self.chan.uid | ||||
| 
 | ||||
|  | @ -376,9 +559,11 @@ class Context: | |||
|             self._scope.cancel() | ||||
| 
 | ||||
|             # NOTE: this REPL usage actually works here dawg! Bo | ||||
|             # from .devx._debug import pause | ||||
|             # await pause() | ||||
| 
 | ||||
|         # TODO: maybe we have to use `._res_scope.cancel()` if it | ||||
|         # exists? | ||||
| 
 | ||||
|     async def cancel( | ||||
|         self, | ||||
|         timeout: float = 0.616, | ||||
|  | @ -395,6 +580,8 @@ class Context: | |||
|         log.cancel( | ||||
|             f'Cancelling {side} side of context to {self.chan.uid}' | ||||
|         ) | ||||
| 
 | ||||
|         # await pause() | ||||
|         self._cancel_called: bool = True | ||||
| 
 | ||||
|         # caller side who entered `Portal.open_context()` | ||||
|  | @ -484,13 +671,11 @@ class Context: | |||
|         ''' | ||||
|         actor: Actor = current_actor() | ||||
| 
 | ||||
|         # here we create a mem chan that corresponds to the | ||||
|         # far end caller / callee. | ||||
| 
 | ||||
|         # Likewise if the surrounding context has been cancelled we error here | ||||
|         # since it likely means the surrounding block was exited or | ||||
|         # killed | ||||
| 
 | ||||
|         # If the surrounding context has been cancelled by some | ||||
|         # task with a handle to THIS, we error here immediately | ||||
|         # since it likely means the surrounding lexical-scope has | ||||
|         # errored, been `trio.Cancelled` or at the least | ||||
|         # `Context.cancel()` was called by some task. | ||||
|         if self._cancel_called: | ||||
| 
 | ||||
|             # XXX NOTE: ALWAYS RAISE any remote error here even if | ||||
|  | @ -503,6 +688,11 @@ class Context: | |||
|             # actually try to stream - a cancel msg was already | ||||
|             # sent to the other side! | ||||
|             if self._remote_error: | ||||
|                 # NOTE: this is diff then calling | ||||
|                 # `._maybe_raise_from_remote_msg()` specifically | ||||
|                 # because any task entering this `.open_stream()` | ||||
|                 # AFTER cancellation has already been requested, | ||||
|                 # we DO NOT want to absorb any ctxc ACK silently! | ||||
|                 raise self._remote_error | ||||
| 
 | ||||
|             # XXX NOTE: if no `ContextCancelled` has been responded | ||||
|  | @ -529,7 +719,7 @@ class Context: | |||
|         # to send a stop from the caller to the callee in the | ||||
|         # single-direction-stream case you'll get a lookup error | ||||
|         # currently. | ||||
|         ctx = actor.get_context( | ||||
|         ctx: Context = actor.get_context( | ||||
|             self.chan, | ||||
|             self.cid, | ||||
|             msg_buffer_size=msg_buffer_size, | ||||
|  | @ -548,6 +738,19 @@ class Context: | |||
|                 'The underlying channel for this stream was already closed!?' | ||||
|             ) | ||||
| 
 | ||||
|         # NOTE: implicitly this will call `MsgStream.aclose()` on | ||||
|         # `.__aexit__()` due to stream's parent `Channel` type! | ||||
|         # | ||||
|         # XXX NOTE XXX: ensures the stream is "one-shot use", | ||||
|         # which specifically means that on exit, | ||||
|         # - signal ``trio.EndOfChannel``/``StopAsyncIteration`` to | ||||
|         #   the far end indicating that the caller exited | ||||
|         #   the streaming context purposefully by letting | ||||
|         #   the exit block exec. | ||||
|         # - this is diff from the cancel/error case where | ||||
|         #   a cancel request from this side or an error | ||||
|         #   should be sent to the far end indicating the | ||||
|         #   stream WAS NOT just closed normally/gracefully. | ||||
|         async with MsgStream( | ||||
|             ctx=self, | ||||
|             rx_chan=ctx._recv_chan, | ||||
|  | @ -567,11 +770,37 @@ class Context: | |||
|                 # await trio.lowlevel.checkpoint() | ||||
|                 yield stream | ||||
| 
 | ||||
|                 # NOTE: Make the stream "one-shot use".  On exit, | ||||
|                 # signal | ||||
|                 # ``trio.EndOfChannel``/``StopAsyncIteration`` to | ||||
|                 # the far end. | ||||
|                 await stream.aclose() | ||||
| 
 | ||||
|                 # XXX: (MEGA IMPORTANT) if this is a root opened process we | ||||
|                 # wait for any immediate child in debug before popping the | ||||
|                 # context from the runtime msg loop otherwise inside | ||||
|                 # ``Actor._push_result()`` the msg will be discarded and in | ||||
|                 # the case where that msg is global debugger unlock (via | ||||
|                 # a "stop" msg for a stream), this can result in a deadlock | ||||
|                 # where the root is waiting on the lock to clear but the | ||||
|                 # child has already cleared it and clobbered IPC. | ||||
|                 # | ||||
|                 # await maybe_wait_for_debugger() | ||||
| 
 | ||||
|                 # XXX TODO: pretty sure this isn't needed (see | ||||
|                 # note above this block) AND will result in | ||||
|                 # a double `.send_stop()` call. The only reason to | ||||
|                 # put it here would be to due with "order" in | ||||
|                 # terms of raising any remote error (as per | ||||
|                 # directly below) or bc the stream's | ||||
|                 # `.__aexit__()` block might not get run | ||||
|                 # (doubtful)? Either way if we did put this back | ||||
|                 # in we also need a state var to avoid the double | ||||
|                 # stop-msg send.. | ||||
|                 # | ||||
|                 # await stream.aclose() | ||||
| 
 | ||||
|                 # if re := ctx._remote_error: | ||||
|                 #     ctx._maybe_raise_remote_err( | ||||
|                 #         re, | ||||
|                 #         raise_ctxc_from_self_call=True, | ||||
|                 #     ) | ||||
|                 # await trio.lowlevel.checkpoint() | ||||
| 
 | ||||
|             finally: | ||||
|                 if self._portal: | ||||
|  | @ -587,7 +816,10 @@ class Context: | |||
|     def _maybe_raise_remote_err( | ||||
|         self, | ||||
|         err: Exception, | ||||
|     ) -> None: | ||||
|         raise_ctxc_from_self_call: bool = False, | ||||
|         raise_overrun_from_self: bool = True, | ||||
| 
 | ||||
|     ) -> ContextCancelled|None: | ||||
|         ''' | ||||
|         Maybe raise a remote error depending on who (which task from | ||||
|         which actor) requested a cancellation (if any). | ||||
|  | @ -603,13 +835,21 @@ class Context: | |||
|         # "error"-msg. | ||||
|         our_uid: tuple[str, str] = current_actor().uid | ||||
|         if ( | ||||
|             isinstance(err, ContextCancelled) | ||||
|             and ( | ||||
|             (not raise_ctxc_from_self_call | ||||
|              and isinstance(err, ContextCancelled) | ||||
|              and ( | ||||
|                 self._cancel_called | ||||
|                 or self.chan._cancel_called | ||||
|                 or self.canceller == our_uid | ||||
|                 or tuple(err.canceller) == our_uid | ||||
|                 or tuple(err.canceller) == our_uid) | ||||
|             ) | ||||
|             or | ||||
|             (not raise_overrun_from_self | ||||
|              and isinstance(err, RemoteActorError) | ||||
|              and err.msgdata['type_str'] == 'StreamOverrun' | ||||
|              and tuple(err.msgdata['sender']) == our_uid | ||||
|             ) | ||||
| 
 | ||||
|         ): | ||||
|             # NOTE: we set the local scope error to any "self | ||||
|             # cancellation" error-response thus "absorbing" | ||||
|  | @ -661,77 +901,196 @@ class Context: | |||
|         assert self._portal, "Context.result() can not be called from callee!" | ||||
|         assert self._recv_chan | ||||
| 
 | ||||
|         if re := self._remote_error: | ||||
|             return self._maybe_raise_remote_err(re) | ||||
|         raise_overrun: bool = not self._allow_overruns | ||||
|         # if re := self._remote_error: | ||||
|         #     return self._maybe_raise_remote_err( | ||||
|         #         re, | ||||
|         #         # NOTE: obvi we don't care if we | ||||
|         #         # overran the far end if we're already | ||||
|         #         # waiting on a final result (msg). | ||||
|         #         raise_overrun_from_self=raise_overrun, | ||||
|         #     ) | ||||
| 
 | ||||
|         res_placeholder: int = id(self) | ||||
|         if ( | ||||
|             self._result == id(self) | ||||
|             self._result == res_placeholder | ||||
|             and not self._remote_error | ||||
|             and not self._recv_chan._closed  # type: ignore | ||||
|         ): | ||||
|             # wait for a final context result consuming | ||||
|             # and discarding any bi dir stream msgs still | ||||
|             # in transit from the far end. | ||||
|             while True: | ||||
|                 try: | ||||
|                     msg = await self._recv_chan.receive() | ||||
|                     self._result: Any = msg['return'] | ||||
| 
 | ||||
|                     # NOTE: we don't need to do this right? | ||||
|                     # XXX: only close the rx mem chan AFTER | ||||
|                     # a final result is retreived. | ||||
|                     # if self._recv_chan: | ||||
|                     #     await self._recv_chan.aclose() | ||||
|             # wait for a final context result by collecting (but | ||||
|             # basically ignoring) any bi-dir-stream msgs still in transit | ||||
|             # from the far end. | ||||
|             drained_msgs: list[dict] = await _drain_to_final_msg(ctx=self) | ||||
|             log.runtime( | ||||
|                 'Ctx drained pre-result msgs:\n' | ||||
|                 f'{drained_msgs}' | ||||
|             ) | ||||
| 
 | ||||
|                     break | ||||
|             # TODO: implement via helper func ^^^^ | ||||
|             # pre_result_drained: list[dict] = [] | ||||
|             # while not self._remote_error: | ||||
|             #     try: | ||||
|             #         # NOTE: this REPL usage actually works here dawg! Bo | ||||
|             #         # from .devx._debug import pause | ||||
|             #         # await pause() | ||||
|             #         # if re := self._remote_error: | ||||
|             #         #     self._maybe_raise_remote_err( | ||||
|             #         #         re, | ||||
|             #         #         # NOTE: obvi we don't care if we | ||||
|             #         #         # overran the far end if we're already | ||||
|             #         #         # waiting on a final result (msg). | ||||
|             #         #         raise_overrun_from_self=raise_overrun, | ||||
|             #         #     ) | ||||
| 
 | ||||
|                 # NOTE: we get here if the far end was | ||||
|                 # `ContextCancelled` in 2 cases: | ||||
|                 # 1. we requested the cancellation and thus | ||||
|                 #    SHOULD NOT raise that far end error, | ||||
|                 # 2. WE DID NOT REQUEST that cancel and thus | ||||
|                 #    SHOULD RAISE HERE! | ||||
|                 except trio.Cancelled: | ||||
|             #         # TODO: bad idea? | ||||
|             #         # with trio.CancelScope() as res_cs: | ||||
|             #         #     self._res_scope = res_cs | ||||
|             #         #     msg: dict = await self._recv_chan.receive() | ||||
|             #         # if res_cs.cancelled_caught: | ||||
| 
 | ||||
|                     # CASE 2: mask the local cancelled-error(s) | ||||
|                     # only when we are sure the remote error is the | ||||
|                     # (likely) source cause of this local runtime | ||||
|                     # task's cancellation. | ||||
|                     if re := self._remote_error: | ||||
|                         self._maybe_raise_remote_err(re) | ||||
|             #         # from .devx._debug import pause | ||||
|             #         # await pause() | ||||
|             #         msg: dict = await self._recv_chan.receive() | ||||
|             #         self._result: Any = msg['return'] | ||||
|             #         log.runtime( | ||||
|             #             'Context delivered final result msg:\n' | ||||
|             #             f'{pformat(msg)}' | ||||
|             #         ) | ||||
|             #         # NOTE: we don't need to do this right? | ||||
|             #         # XXX: only close the rx mem chan AFTER | ||||
|             #         # a final result is retreived. | ||||
|             #         # if self._recv_chan: | ||||
|             #         #     await self._recv_chan.aclose() | ||||
|             #         break | ||||
| 
 | ||||
|                     # CASE 1: we DID request the cancel we simply | ||||
|                     # continue to bubble up as normal. | ||||
|                     raise | ||||
|             #     # NOTE: we get here if the far end was | ||||
|             #     # `ContextCancelled` in 2 cases: | ||||
|             #     # 1. we requested the cancellation and thus | ||||
|             #     #    SHOULD NOT raise that far end error, | ||||
|             #     # 2. WE DID NOT REQUEST that cancel and thus | ||||
|             #     #    SHOULD RAISE HERE! | ||||
|             #     except trio.Cancelled: | ||||
| 
 | ||||
|                 except KeyError:  # as msgerr: | ||||
|             #         # CASE 2: mask the local cancelled-error(s) | ||||
|             #         # only when we are sure the remote error is | ||||
|             #         # the source cause of this local task's | ||||
|             #         # cancellation. | ||||
|             #         if re := self._remote_error: | ||||
|             #             self._maybe_raise_remote_err(re) | ||||
| 
 | ||||
|                     if 'yield' in msg: | ||||
|                         # far end task is still streaming to us so discard | ||||
|                         log.warning(f'Discarding stream delivered {msg}') | ||||
|                         continue | ||||
|             #         # CASE 1: we DID request the cancel we simply | ||||
|             #         # continue to bubble up as normal. | ||||
|             #         raise | ||||
| 
 | ||||
|                     elif 'stop' in msg: | ||||
|                         log.debug('Remote stream terminated') | ||||
|                         continue | ||||
|             #     except KeyError: | ||||
| 
 | ||||
|                     # internal error should never get here | ||||
|                     assert msg.get('cid'), ( | ||||
|                         "Received internal error at portal?" | ||||
|                     ) | ||||
|             #         if 'yield' in msg: | ||||
|             #             # far end task is still streaming to us so discard | ||||
|             #             log.warning(f'Discarding std "yield"\n{msg}') | ||||
|             #             pre_result_drained.append(msg) | ||||
|             #             continue | ||||
| 
 | ||||
|                     if err:= unpack_error( | ||||
|                         msg, | ||||
|                         self._portal.channel | ||||
|                     ):  # from msgerr | ||||
|                         self._maybe_cancel_and_set_remote_error(err) | ||||
|                         self._maybe_raise_remote_err(err) | ||||
|             #         # TODO: work out edge cases here where | ||||
|             #         # a stream is open but the task also calls | ||||
|             #         # this? | ||||
|             #         # -[ ] should be a runtime error if a stream is open | ||||
|             #         #   right? | ||||
|             #         elif 'stop' in msg: | ||||
|             #             log.cancel( | ||||
|             #                 'Remote stream terminated due to "stop" msg:\n' | ||||
|             #                 f'{msg}' | ||||
|             #             ) | ||||
|             #             pre_result_drained.append(msg) | ||||
|             #             continue | ||||
| 
 | ||||
|                     else: | ||||
|                         raise | ||||
|             #         # internal error should never get here | ||||
|             #         assert msg.get('cid'), ( | ||||
|             #             "Received internal error at portal?" | ||||
|             #         ) | ||||
| 
 | ||||
|         if re := self._remote_error: | ||||
|             return self._maybe_raise_remote_err(re) | ||||
|             #         # XXX fallthrough to handle expected error XXX | ||||
|             #         re: Exception|None = self._remote_error | ||||
|             #         if re: | ||||
|             #             log.critical( | ||||
|             #                 'Remote ctx terminated due to "error" msg:\n' | ||||
|             #                 f'{re}' | ||||
|             #             ) | ||||
|             #             assert msg is self._cancel_msg | ||||
|             #             # NOTE: this solved a super dupe edge case XD | ||||
|             #             # this was THE super duper edge case of: | ||||
|             #             # - local task opens a remote task, | ||||
|             #             # - requests remote cancellation of far end | ||||
|             #             #   ctx/tasks, | ||||
|             #             # - needs to wait for the cancel ack msg | ||||
|             #             #   (ctxc) or some result in the race case | ||||
|             #             #   where the other side's task returns | ||||
|             #             #   before the cancel request msg is ever | ||||
|             #             #   rxed and processed, | ||||
|             #             # - here this surrounding drain loop (which | ||||
|             #             #   iterates all ipc msgs until the ack or | ||||
|             #             #   an early result arrives) was NOT exiting | ||||
|             #             #   since we are the edge case: local task | ||||
|             #             #   does not re-raise any ctxc it receives | ||||
|             #             #   IFF **it** was the cancellation | ||||
|             #             #   requester.. | ||||
|             #             # will raise if necessary, ow break from | ||||
|             #             # loop presuming any error terminates the | ||||
|             #             # context! | ||||
|             #             self._maybe_raise_remote_err( | ||||
|             #                 re, | ||||
|             #                 # NOTE: obvi we don't care if we | ||||
|             #                 # overran the far end if we're already | ||||
|             #                 # waiting on a final result (msg). | ||||
|             #                 # raise_overrun_from_self=False, | ||||
|             #                 raise_overrun_from_self=raise_overrun, | ||||
|             #             ) | ||||
| 
 | ||||
|             #             break  # OOOOOF, yeah obvi we need this.. | ||||
| 
 | ||||
|             #         # XXX we should never really get here | ||||
|             #         # right! since `._deliver_msg()` should | ||||
|             #         # always have detected an {'error': ..} | ||||
|             #         # msg and already called this right!?! | ||||
|             #         elif error := unpack_error( | ||||
|             #             msg=msg, | ||||
|             #             chan=self._portal.channel, | ||||
|             #             hide_tb=False, | ||||
|             #         ): | ||||
|             #             log.critical('SHOULD NEVER GET HERE!?') | ||||
|             #             assert msg is self._cancel_msg | ||||
|             #             assert error.msgdata == self._remote_error.msgdata | ||||
|             #             from .devx._debug import pause | ||||
|             #             await pause() | ||||
|             #             self._maybe_cancel_and_set_remote_error(error) | ||||
|             #             self._maybe_raise_remote_err(error) | ||||
| 
 | ||||
|             #         else: | ||||
|             #             # bubble the original src key error | ||||
|             #             raise | ||||
| 
 | ||||
|         if ( | ||||
|             (re := self._remote_error) | ||||
|             and self._result == res_placeholder | ||||
|         ): | ||||
|             maybe_err: Exception|None = self._maybe_raise_remote_err( | ||||
|                 re, | ||||
|                 # NOTE: obvi we don't care if we | ||||
|                 # overran the far end if we're already | ||||
|                 # waiting on a final result (msg). | ||||
|                 # raise_overrun_from_self=False, | ||||
|                 raise_overrun_from_self=( | ||||
|                     raise_overrun | ||||
|                     and | ||||
|                     # only when we ARE NOT the canceller | ||||
|                     # should we raise overruns, bc ow we're | ||||
|                     # raising something we know might happen | ||||
|                     # during cancellation ;) | ||||
|                     (not self._cancel_called) | ||||
|                 ), | ||||
|             ) | ||||
|             if maybe_err: | ||||
|                 self._result = maybe_err | ||||
| 
 | ||||
|         return self._result | ||||
| 
 | ||||
|  | @ -779,7 +1138,7 @@ class Context: | |||
|             while self._overflow_q: | ||||
|                 # NOTE: these msgs should never be errors since we always do | ||||
|                 # the check prior to checking if we're in an overrun state | ||||
|                 # inside ``.deliver_msg()``. | ||||
|                 # inside ``._deliver_msg()``. | ||||
|                 msg = self._overflow_q.popleft() | ||||
|                 try: | ||||
|                     await self._send_chan.send(msg) | ||||
|  | @ -830,34 +1189,50 @@ class Context: | |||
|         messages are eventually sent if possible. | ||||
| 
 | ||||
|         ''' | ||||
|         cid = self.cid | ||||
|         chan = self.chan | ||||
|         uid = chan.uid | ||||
|         cid: str = self.cid | ||||
|         chan: Channel = self.chan | ||||
|         from_uid: tuple[str, str]  = chan.uid | ||||
|         send_chan: trio.MemorySendChannel = self._send_chan | ||||
| 
 | ||||
|         log.runtime( | ||||
|             f"Delivering {msg} from {uid} to caller {cid}" | ||||
|         ) | ||||
| 
 | ||||
|         if ( | ||||
|             msg.get('error')  # check for field | ||||
|             and ( | ||||
|                 error := unpack_error( | ||||
|                     msg, | ||||
|                     self.chan, | ||||
|                 ) | ||||
|         if re := unpack_error( | ||||
|             msg, | ||||
|             self.chan, | ||||
|         ): | ||||
|             log.error( | ||||
|                 f'Delivering error-msg from {from_uid} to caller {cid}' | ||||
|                 f'{re}' | ||||
|             ) | ||||
|         ): | ||||
|             self._cancel_msg = msg | ||||
|             self._maybe_cancel_and_set_remote_error(error) | ||||
|             self._maybe_cancel_and_set_remote_error(re) | ||||
| 
 | ||||
|         if ( | ||||
|             self._in_overrun | ||||
|         ): | ||||
|             # XXX NEVER do this XXX..!! | ||||
|             # bc if the error is a ctxc and there is a task | ||||
|             # waiting on `.result()` we need the msg to be sent | ||||
|             # over the `send_chan`/`._recv_chan` so that the error | ||||
|             # is relayed to that waiter task.. | ||||
|             # return True | ||||
|             # | ||||
|             # XXX ALSO NO!! XXX | ||||
|             # if self._remote_error: | ||||
|             #     self._maybe_raise_remote_err(error) | ||||
| 
 | ||||
|         if self._in_overrun: | ||||
|             log.warning( | ||||
|                 f'Capturing overrun-msg from {from_uid} to caller {cid}' | ||||
|                 f'{msg}' | ||||
|             ) | ||||
|             self._overflow_q.append(msg) | ||||
|             return False | ||||
| 
 | ||||
|         try: | ||||
|             log.runtime( | ||||
|                 f'Delivering IPC `Context` msg:\n' | ||||
|                 f'<= {from_uid}\n' | ||||
|                 f'=> caller: {cid}\n' | ||||
|                 f'{msg}' | ||||
|             ) | ||||
|             # from .devx._debug import pause | ||||
|             # await pause() | ||||
|             send_chan.send_nowait(msg) | ||||
|             return True | ||||
|             # if an error is deteced we should always | ||||
|  | @ -890,7 +1265,8 @@ class Context: | |||
|             lines = [ | ||||
|                 f'OVERRUN on actor-task context {cid}@{local_uid}!\n' | ||||
|                 # TODO: put remote task name here if possible? | ||||
|                 f'remote sender actor: {uid}', | ||||
|                 f'sender: {from_uid}', | ||||
|                 f'msg: {msg}', | ||||
|                 # TODO: put task func name here and maybe an arrow | ||||
|                 # from sender to overrunner? | ||||
|                 # f'local task {self.func_name}' | ||||
|  | @ -926,11 +1302,19 @@ class Context: | |||
|                         # anything different. | ||||
|                         return False | ||||
|             else: | ||||
|                 # raise local overrun and immediately pack as IPC | ||||
|                 # msg for far end. | ||||
|                 try: | ||||
|                     raise StreamOverrun(text) | ||||
|                     raise StreamOverrun( | ||||
|                         text, | ||||
|                         sender=from_uid, | ||||
|                     ) | ||||
|                 except StreamOverrun as err: | ||||
|                     err_msg = pack_error(err) | ||||
|                     err_msg['cid'] = cid | ||||
|                     err_msg: dict[str, dict] = pack_error( | ||||
|                         err, | ||||
|                         cid=cid, | ||||
|                     ) | ||||
|                     # err_msg['cid']: str = cid | ||||
|                     try: | ||||
|                         await chan.send(err_msg) | ||||
|                     except trio.BrokenResourceError: | ||||
|  |  | |||
|  | @ -39,7 +39,15 @@ import trio | |||
| from async_generator import asynccontextmanager | ||||
| 
 | ||||
| from .trionics import maybe_open_nursery | ||||
| from ._state import current_actor | ||||
| from .devx import ( | ||||
|     # acquire_debug_lock, | ||||
|     # pause, | ||||
|     maybe_wait_for_debugger, | ||||
| ) | ||||
| from ._state import ( | ||||
|     current_actor, | ||||
|     debug_mode, | ||||
| ) | ||||
| from ._ipc import Channel | ||||
| from .log import get_logger | ||||
| from .msg import NamespacePath | ||||
|  | @ -48,6 +56,7 @@ from ._exceptions import ( | |||
|     unpack_error, | ||||
|     NoResult, | ||||
|     ContextCancelled, | ||||
|     RemoteActorError, | ||||
| ) | ||||
| from ._context import ( | ||||
|     Context, | ||||
|  | @ -468,7 +477,6 @@ class Portal: | |||
|             ctx._started_called: bool = True | ||||
| 
 | ||||
|         except KeyError as src_error: | ||||
| 
 | ||||
|             _raise_from_no_key_in_msg( | ||||
|                 ctx=ctx, | ||||
|                 msg=msg, | ||||
|  | @ -493,6 +501,33 @@ class Portal: | |||
|                 # in enter tuple. | ||||
|                 yield ctx, first | ||||
| 
 | ||||
|                 # between the caller exiting and arriving here the | ||||
|                 # far end may have sent a ctxc-msg or other error, | ||||
|                 # so check for it here immediately and maybe raise | ||||
|                 # so as to engage the ctxc handling block below! | ||||
|                 # if re := ctx._remote_error: | ||||
|                 #     maybe_ctxc: ContextCancelled|None = ctx._maybe_raise_remote_err( | ||||
|                 #         re, | ||||
| 
 | ||||
|                 #         # TODO: do we want this to always raise? | ||||
|                 #         # - means that on self-ctxc, if/when the | ||||
|                 #         #   block is exited before the msg arrives | ||||
|                 #         #   but then the msg during __exit__ | ||||
|                 #         #   calling we may not activate the | ||||
|                 #         #   ctxc-handler block below? should we | ||||
|                 #         #   be? | ||||
|                 #         # - if there's a remote error that arrives | ||||
|                 #         #   after the child has exited, we won't | ||||
|                 #         #   handle until the `finally:` block | ||||
|                 #         #   where `.result()` is always called, | ||||
|                 #         #   again in which case we handle it | ||||
|                 #         #   differently then in the handler block | ||||
|                 #         #   that would normally engage from THIS | ||||
|                 #         #   block? | ||||
|                 #         raise_ctxc_from_self_call=True, | ||||
|                 #     ) | ||||
|                 #     assert maybe_ctxc | ||||
| 
 | ||||
|                 # when in allow_overruns mode there may be | ||||
|                 # lingering overflow sender tasks remaining? | ||||
|                 if nurse.child_tasks: | ||||
|  | @ -538,7 +573,7 @@ class Portal: | |||
|         #   `.canceller: tuple[str, str]` to be same value as | ||||
|         #   caught here in a `ContextCancelled.canceller`. | ||||
|         # | ||||
|         # Again, there are 2 cases: | ||||
|         # AGAIN to restate the above, there are 2 cases: | ||||
|         # | ||||
|         # 1-some other context opened in this `.open_context()` | ||||
|         #   block cancelled due to a self or peer cancellation | ||||
|  | @ -554,6 +589,16 @@ class Portal: | |||
|         except ContextCancelled as ctxc: | ||||
|             scope_err = ctxc | ||||
| 
 | ||||
|             # XXX TODO XXX: FIX THIS debug_mode BUGGGG!!! | ||||
|             # using this code and then resuming the REPL will | ||||
|             # cause a SIGINT-ignoring HANG! | ||||
|             # -> prolly due to a stale debug lock entry.. | ||||
|             # -[ ] USE `.stackscope` to demonstrate that (possibly | ||||
|             #   documenting it as a definittive example of | ||||
|             #   debugging the tractor-runtime itself using it's | ||||
|             #   own `.devx.` tooling! | ||||
|             # await pause() | ||||
| 
 | ||||
|             # CASE 2: context was cancelled by local task calling | ||||
|             # `.cancel()`, we don't raise and the exit block should | ||||
|             # exit silently. | ||||
|  | @ -561,18 +606,23 @@ class Portal: | |||
|                 ctx._cancel_called | ||||
|                 and ( | ||||
|                     ctxc is ctx._remote_error | ||||
|                     or | ||||
|                     ctxc.canceller is self.canceller | ||||
|                     # ctxc.msgdata == ctx._remote_error.msgdata | ||||
| 
 | ||||
|                     # TODO: uhh `Portal.canceller` ain't a thangg | ||||
|                     # dawg? (was `self.canceller` before?!?) | ||||
|                     and | ||||
|                     ctxc.canceller == self.actor.uid | ||||
|                 ) | ||||
|             ): | ||||
|                 log.debug( | ||||
|                     f'Context {ctx} cancelled gracefully with:\n' | ||||
|                 log.cancel( | ||||
|                     f'Context (cid=[{ctx.cid[-6:]}..] cancelled gracefully with:\n' | ||||
|                     f'{ctxc}' | ||||
|                 ) | ||||
|             # CASE 1: this context was never cancelled via a local | ||||
|             # task (tree) having called `Context.cancel()`, raise | ||||
|             # the error since it was caused by someone else! | ||||
|             else: | ||||
|                 # await pause() | ||||
|                 raise | ||||
| 
 | ||||
|         # the above `._scope` can be cancelled due to: | ||||
|  | @ -601,8 +651,8 @@ class Portal: | |||
|             trio.Cancelled,  # NOTE: NOT from inside the ctx._scope | ||||
|             KeyboardInterrupt, | ||||
| 
 | ||||
|         ) as err: | ||||
|             scope_err = err | ||||
|         ) as caller_err: | ||||
|             scope_err = caller_err | ||||
| 
 | ||||
|             # XXX: ALWAYS request the context to CANCEL ON any ERROR. | ||||
|             # NOTE: `Context.cancel()` is conversely NEVER CALLED in | ||||
|  | @ -610,11 +660,26 @@ class Portal: | |||
|             # handled in the block above! | ||||
|             log.cancel( | ||||
|                 'Context cancelled for task due to\n' | ||||
|                 f'{err}\n' | ||||
|                 f'{caller_err}\n' | ||||
|                 'Sending cancel request..\n' | ||||
|                 f'task:{cid}\n' | ||||
|                 f'actor:{uid}' | ||||
|             ) | ||||
| 
 | ||||
|             if debug_mode(): | ||||
|                 log.pdb( | ||||
|                     'Delaying `ctx.cancel()` until debug lock ' | ||||
|                     'acquired..' | ||||
|                 ) | ||||
|                 # async with acquire_debug_lock(self.actor.uid): | ||||
|                 #     pass | ||||
|                 # TODO: factor ^ into below for non-root cases? | ||||
|                 await maybe_wait_for_debugger() | ||||
|                 log.pdb( | ||||
|                     'Acquired debug lock! ' | ||||
|                     'Calling `ctx.cancel()`!' | ||||
|                 ) | ||||
| 
 | ||||
|             try: | ||||
|                 await ctx.cancel() | ||||
|             except trio.BrokenResourceError: | ||||
|  | @ -628,6 +693,33 @@ class Portal: | |||
| 
 | ||||
|         # no local scope error, the "clean exit with a result" case. | ||||
|         else: | ||||
|             # between the caller exiting and arriving here the | ||||
|             # far end may have sent a ctxc-msg or other error, | ||||
|             # so check for it here immediately and maybe raise | ||||
|             # so as to engage the ctxc handling block below! | ||||
|             # if re := ctx._remote_error: | ||||
|             #     maybe_ctxc: ContextCancelled|None = ctx._maybe_raise_remote_err( | ||||
|             #         re, | ||||
| 
 | ||||
|             #         # TODO: do we want this to always raise? | ||||
|             #         # - means that on self-ctxc, if/when the | ||||
|             #         #   block is exited before the msg arrives | ||||
|             #         #   but then the msg during __exit__ | ||||
|             #         #   calling we may not activate the | ||||
|             #         #   ctxc-handler block below? should we | ||||
|             #         #   be? | ||||
|             #         # - if there's a remote error that arrives | ||||
|             #         #   after the child has exited, we won't | ||||
|             #         #   handle until the `finally:` block | ||||
|             #         #   where `.result()` is always called, | ||||
|             #         #   again in which case we handle it | ||||
|             #         #   differently then in the handler block | ||||
|             #         #   that would normally engage from THIS | ||||
|             #         #   block? | ||||
|             #         raise_ctxc_from_self_call=True, | ||||
|             #     ) | ||||
|             #     assert maybe_ctxc | ||||
| 
 | ||||
|             if ctx.chan.connected(): | ||||
|                 log.info( | ||||
|                     'Waiting on final context-task result for\n' | ||||
|  | @ -644,13 +736,8 @@ class Portal: | |||
|                 # As per `Context._deliver_msg()`, that error IS | ||||
|                 # ALWAYS SET any time "callee" side fails and causes "caller | ||||
|                 # side" cancellation via a `ContextCancelled` here. | ||||
|                 # result = await ctx.result() | ||||
|                 try: | ||||
|                     result = await ctx.result() | ||||
|                     log.runtime( | ||||
|                         f'Context {fn_name} returned value from callee:\n' | ||||
|                         f'`{result}`' | ||||
|                     ) | ||||
|                     result_or_err: Exception|Any = await ctx.result() | ||||
|                 except BaseException as berr: | ||||
|                     # on normal teardown, if we get some error | ||||
|                     # raised in `Context.result()` we still want to | ||||
|  | @ -662,7 +749,48 @@ class Portal: | |||
|                     scope_err = berr | ||||
|                     raise | ||||
| 
 | ||||
|                 # an exception type boxed in a `RemoteActorError` | ||||
|                 # is returned (meaning it was obvi not raised). | ||||
|                 msgdata: str|None = getattr( | ||||
|                     result_or_err, | ||||
|                     'msgdata', | ||||
|                     None | ||||
|                 ) | ||||
|                 # yes! this worx Bp | ||||
|                 # from .devx import _debug | ||||
|                 # await _debug.pause() | ||||
|                 match (msgdata, result_or_err): | ||||
|                     case ( | ||||
|                         {'tb_str': tbstr}, | ||||
|                         ContextCancelled(), | ||||
|                     ): | ||||
|                         log.cancel(tbstr) | ||||
| 
 | ||||
|                     case ( | ||||
|                         {'tb_str': tbstr}, | ||||
|                         RemoteActorError(), | ||||
|                     ): | ||||
|                         log.exception( | ||||
|                             f'Context `{fn_name}` remotely errored:\n' | ||||
|                             f'`{tbstr}`' | ||||
|                         ) | ||||
|                     case (None, _): | ||||
|                         log.runtime( | ||||
|                             f'Context {fn_name} returned value from callee:\n' | ||||
|                             f'`{result_or_err}`' | ||||
|                         ) | ||||
| 
 | ||||
|         finally: | ||||
|             # XXX: (MEGA IMPORTANT) if this is a root opened process we | ||||
|             # wait for any immediate child in debug before popping the | ||||
|             # context from the runtime msg loop otherwise inside | ||||
|             # ``Actor._push_result()`` the msg will be discarded and in | ||||
|             # the case where that msg is global debugger unlock (via | ||||
|             # a "stop" msg for a stream), this can result in a deadlock | ||||
|             # where the root is waiting on the lock to clear but the | ||||
|             # child has already cleared it and clobbered IPC. | ||||
|             await maybe_wait_for_debugger() | ||||
| 
 | ||||
|             # though it should be impossible for any tasks | ||||
|             # operating *in* this scope to have survived | ||||
|             # we tear down the runtime feeder chan last | ||||
|  | @ -707,6 +835,10 @@ class Portal: | |||
|                 # out any exception group or legit (remote) ctx | ||||
|                 # error that sourced from the remote task or its | ||||
|                 # runtime. | ||||
|                 # | ||||
|                 # NOTE: further, this should be the only place the | ||||
|                 # underlying feeder channel is | ||||
|                 # once-and-only-CLOSED! | ||||
|                 with trio.CancelScope(shield=True): | ||||
|                     await ctx._recv_chan.aclose() | ||||
| 
 | ||||
|  | @ -747,6 +879,9 @@ class Portal: | |||
| 
 | ||||
|             # FINALLY, remove the context from runtime tracking and | ||||
|             # exit! | ||||
|             log.runtime( | ||||
|                 f'Exiting context opened with {ctx.chan.uid}' | ||||
|             ) | ||||
|             self.actor._contexts.pop( | ||||
|                 (self.channel.uid, ctx.cid), | ||||
|                 None, | ||||
|  |  | |||
|  | @ -21,8 +21,9 @@ The machinery and types behind ``Context.open_stream()`` | |||
| 
 | ||||
| ''' | ||||
| from __future__ import annotations | ||||
| import inspect | ||||
| from contextlib import asynccontextmanager as acm | ||||
| import inspect | ||||
| from pprint import pformat | ||||
| from typing import ( | ||||
|     Any, | ||||
|     Callable, | ||||
|  | @ -35,6 +36,7 @@ import trio | |||
| 
 | ||||
| from ._exceptions import ( | ||||
|     _raise_from_no_key_in_msg, | ||||
|     ContextCancelled, | ||||
| ) | ||||
| from .log import get_logger | ||||
| from .trionics import ( | ||||
|  | @ -84,8 +86,8 @@ class MsgStream(trio.abc.Channel): | |||
|         self._broadcaster = _broadcaster | ||||
| 
 | ||||
|         # flag to denote end of stream | ||||
|         self._eoc: bool = False | ||||
|         self._closed: bool = False | ||||
|         self._eoc: bool|trio.EndOfChannel = False | ||||
|         self._closed: bool|trio.ClosedResourceError = False | ||||
| 
 | ||||
|     # delegate directly to underlying mem channel | ||||
|     def receive_nowait(self): | ||||
|  | @ -93,6 +95,9 @@ class MsgStream(trio.abc.Channel): | |||
|         try: | ||||
|             return msg['yield'] | ||||
|         except KeyError as kerr: | ||||
|             # if 'return' in msg: | ||||
|             #     return msg | ||||
| 
 | ||||
|             _raise_from_no_key_in_msg( | ||||
|                 ctx=self._ctx, | ||||
|                 msg=msg, | ||||
|  | @ -122,30 +127,43 @@ class MsgStream(trio.abc.Channel): | |||
|         # see ``.aclose()`` for notes on the old behaviour prior to | ||||
|         # introducing this | ||||
|         if self._eoc: | ||||
|             raise trio.EndOfChannel | ||||
|             raise self._eoc | ||||
|             # raise trio.EndOfChannel | ||||
| 
 | ||||
|         if self._closed: | ||||
|             raise trio.ClosedResourceError('This stream was closed') | ||||
|             raise self._closed | ||||
|             # raise trio.ClosedResourceError( | ||||
|             #     'This stream was already closed' | ||||
|             # ) | ||||
| 
 | ||||
|         src_err: Exception|None = None | ||||
|         try: | ||||
|             msg = await self._rx_chan.receive() | ||||
|             return msg['yield'] | ||||
|             try: | ||||
|                 msg = await self._rx_chan.receive() | ||||
|                 return msg['yield'] | ||||
| 
 | ||||
|         except KeyError as kerr: | ||||
|             _raise_from_no_key_in_msg( | ||||
|                 ctx=self._ctx, | ||||
|                 msg=msg, | ||||
|                 src_err=kerr, | ||||
|                 log=log, | ||||
|                 expect_key='yield', | ||||
|                 stream=self, | ||||
|             ) | ||||
|             except KeyError as kerr: | ||||
|                 src_err = kerr | ||||
| 
 | ||||
|                 # NOTE: may raise any of the below error types | ||||
|                 # includg EoC when a 'stop' msg is found. | ||||
|                 _raise_from_no_key_in_msg( | ||||
|                     ctx=self._ctx, | ||||
|                     msg=msg, | ||||
|                     src_err=kerr, | ||||
|                     log=log, | ||||
|                     expect_key='yield', | ||||
|                     stream=self, | ||||
|                 ) | ||||
| 
 | ||||
|         # XXX: we close the stream on any of these error conditions: | ||||
|         except ( | ||||
|             trio.ClosedResourceError,  # by self._rx_chan | ||||
|             # trio.ClosedResourceError,  # by self._rx_chan | ||||
|             trio.EndOfChannel,  # by self._rx_chan or `stop` msg from far end | ||||
|         ): | ||||
|             # XXX: we close the stream on any of these error conditions: | ||||
|         ) as eoc: | ||||
|             src_err = eoc | ||||
|             self._eoc = eoc | ||||
|             # await trio.sleep(1) | ||||
| 
 | ||||
|             # a ``ClosedResourceError`` indicates that the internal | ||||
|             # feeder memory receive channel was closed likely by the | ||||
|  | @ -168,14 +186,53 @@ class MsgStream(trio.abc.Channel): | |||
|             # closing this stream and not flushing a final value to | ||||
|             # remaining (clone) consumers who may not have been | ||||
|             # scheduled to receive it yet. | ||||
|             # try: | ||||
|             #     maybe_err_msg_or_res: dict = self._rx_chan.receive_nowait() | ||||
|             #     if maybe_err_msg_or_res: | ||||
|             #         log.warning( | ||||
|             #             'Discarding un-processed msg:\n' | ||||
|             #             f'{maybe_err_msg_or_res}' | ||||
|             #         ) | ||||
|             # except trio.WouldBlock: | ||||
|             #     # no queued msgs that might be another remote | ||||
|             #     # error, so just raise the original EoC | ||||
|             #     pass | ||||
| 
 | ||||
|             # when the send is closed we assume the stream has | ||||
|             # terminated and signal this local iterator to stop | ||||
|             await self.aclose() | ||||
|             # raise eoc | ||||
| 
 | ||||
|             raise  # propagate | ||||
|         except trio.ClosedResourceError as cre:  # by self._rx_chan | ||||
|             src_err = cre | ||||
|             log.warning( | ||||
|                 '`Context._rx_chan` was already closed?' | ||||
|             ) | ||||
|             self._closed = cre | ||||
| 
 | ||||
|     async def aclose(self): | ||||
|         # when the send is closed we assume the stream has | ||||
|         # terminated and signal this local iterator to stop | ||||
|         drained: list[Exception|dict] = await self.aclose() | ||||
|         if drained: | ||||
|             log.warning( | ||||
|                 'Drained context msgs during closure:\n' | ||||
|                 f'{drained}' | ||||
|             ) | ||||
|         # TODO: pass these to the `._ctx._drained_msgs: deque` | ||||
|         # and then iterate them as part of any `.result()` call? | ||||
| 
 | ||||
|         # NOTE XXX: if the context was cancelled or remote-errored | ||||
|         # but we received the stream close msg first, we | ||||
|         # probably want to instead raise the remote error | ||||
|         # over the end-of-stream connection error since likely | ||||
|         # the remote error was the source cause? | ||||
|         ctx: Context = self._ctx | ||||
|         if re := ctx._remote_error: | ||||
|             ctx._maybe_raise_remote_err( | ||||
|                 re, | ||||
|                 raise_ctxc_from_self_call=True, | ||||
|             ) | ||||
| 
 | ||||
|         raise src_err  # propagate | ||||
| 
 | ||||
|     async def aclose(self) -> list[Exception|dict]: | ||||
|         ''' | ||||
|         Cancel associated remote actor task and local memory channel on | ||||
|         close. | ||||
|  | @ -185,15 +242,55 @@ class MsgStream(trio.abc.Channel): | |||
|         # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose | ||||
|         rx_chan = self._rx_chan | ||||
| 
 | ||||
|         if rx_chan._closed: | ||||
|             log.cancel(f"{self} is already closed") | ||||
|         if ( | ||||
|             rx_chan._closed | ||||
|             or | ||||
|             self._closed | ||||
|         ): | ||||
|             log.cancel( | ||||
|                 f'`MsgStream` is already closed\n' | ||||
|                 f'.cid: {self._ctx.cid}\n' | ||||
|                 f'._rx_chan`: {rx_chan}\n' | ||||
|                 f'._eoc: {self._eoc}\n' | ||||
|                 f'._closed: {self._eoc}\n' | ||||
|             ) | ||||
| 
 | ||||
|             # this stream has already been closed so silently succeed as | ||||
|             # per ``trio.AsyncResource`` semantics. | ||||
|             # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose | ||||
|             return | ||||
|             return [] | ||||
| 
 | ||||
|         self._eoc = True | ||||
|         ctx: Context = self._ctx | ||||
|         # caught_eoc: bool = False | ||||
|         drained: list[Exception|dict] = [] | ||||
|         while not drained: | ||||
|             try: | ||||
|                 maybe_final_msg = self.receive_nowait() | ||||
|                 if maybe_final_msg: | ||||
|                     log.cancel( | ||||
|                         'Drained un-processed stream msg:\n' | ||||
|                         f'{pformat(maybe_final_msg)}' | ||||
|                     ) | ||||
|                     # TODO: inject into parent `Context` buf? | ||||
|                     drained.append(maybe_final_msg) | ||||
| 
 | ||||
|             except trio.WouldBlock as be: | ||||
|                 drained.append(be) | ||||
|                 break | ||||
| 
 | ||||
|             except trio.EndOfChannel as eoc: | ||||
|                 drained.append(eoc) | ||||
|                 # caught_eoc = True | ||||
|                 self._eoc: bool = eoc | ||||
|                 break | ||||
| 
 | ||||
|             except ContextCancelled as ctxc: | ||||
|                 log.cancel( | ||||
|                     'Context was cancelled during stream closure:\n' | ||||
|                     f'canceller: {ctxc.canceller}\n' | ||||
|                     f'{pformat(ctxc.msgdata)}' | ||||
|                 ) | ||||
|                 break | ||||
| 
 | ||||
|         # NOTE: this is super subtle IPC messaging stuff: | ||||
|         # Relay stop iteration to far end **iff** we're | ||||
|  | @ -224,26 +321,33 @@ class MsgStream(trio.abc.Channel): | |||
|         except ( | ||||
|             trio.BrokenResourceError, | ||||
|             trio.ClosedResourceError | ||||
|         ): | ||||
|         ) as re: | ||||
|             # the underlying channel may already have been pulled | ||||
|             # in which case our stop message is meaningless since | ||||
|             # it can't traverse the transport. | ||||
|             ctx = self._ctx | ||||
|             log.warning( | ||||
|                 f'Stream was already destroyed?\n' | ||||
|                 f'actor: {ctx.chan.uid}\n' | ||||
|                 f'ctx id: {ctx.cid}' | ||||
|             ) | ||||
|             drained.append(re) | ||||
|             self._closed = re | ||||
| 
 | ||||
|         self._closed = True | ||||
|         # if caught_eoc: | ||||
|         #     # from .devx import _debug | ||||
|         #     # await _debug.pause() | ||||
|         #     with trio.CancelScope(shield=True): | ||||
|         #         await rx_chan.aclose() | ||||
| 
 | ||||
|         # Do we close the local mem chan ``self._rx_chan`` ??!? | ||||
|         # self._eoc: bool = caught_eoc | ||||
| 
 | ||||
|         # NO, DEFINITELY NOT if we're a bi-dir ``MsgStream``! | ||||
|         # BECAUSE this same core-msg-loop mem recv-chan is used to deliver | ||||
|         # the potential final result from the surrounding inter-actor | ||||
|         # `Context` so we don't want to close it until that context has | ||||
|         # run to completion. | ||||
|         # ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX? | ||||
|         # => NO, DEFINITELY NOT! <= | ||||
|         # if we're a bi-dir ``MsgStream`` BECAUSE this same | ||||
|         # core-msg-loop mem recv-chan is used to deliver the | ||||
|         # potential final result from the surrounding inter-actor | ||||
|         # `Context` so we don't want to close it until that | ||||
|         # context has run to completion. | ||||
| 
 | ||||
|         # XXX: Notes on old behaviour: | ||||
|         # await rx_chan.aclose() | ||||
|  | @ -272,6 +376,8 @@ class MsgStream(trio.abc.Channel): | |||
|         # runtime's closure of ``rx_chan`` in the case where we may | ||||
|         # still need to consume msgs that are "in transit" from the far | ||||
|         # end (eg. for ``Context.result()``). | ||||
|         # self._closed = True | ||||
|         return drained | ||||
| 
 | ||||
|     @acm | ||||
|     async def subscribe( | ||||
|  | @ -337,9 +443,13 @@ class MsgStream(trio.abc.Channel): | |||
|             raise self._ctx._remote_error  # from None | ||||
| 
 | ||||
|         if self._closed: | ||||
|             raise trio.ClosedResourceError('This stream was already closed') | ||||
|             raise self._closed | ||||
|             # raise trio.ClosedResourceError('This stream was already closed') | ||||
| 
 | ||||
|         await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) | ||||
|         await self._ctx.chan.send({ | ||||
|             'yield': data, | ||||
|             'cid': self._ctx.cid, | ||||
|         }) | ||||
| 
 | ||||
| 
 | ||||
| def stream(func: Callable) -> Callable: | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue