From ad5eee5666eedddab1c55916162f9a83c552ca0f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 22 Feb 2024 18:33:18 -0500 Subject: [PATCH] WIP final impl of ctx-cancellation-semantics --- tractor/_context.py | 280 ++++++++++++++++++++++++++++++------------ tractor/_portal.py | 184 +++++++++++++++++---------- tractor/_streaming.py | 86 ++++++++----- 3 files changed, 378 insertions(+), 172 deletions(-) diff --git a/tractor/_context.py b/tractor/_context.py index 54e309e..ee05a2b 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -47,6 +47,7 @@ import trio # maybe_wait_for_debugger, # pause, # ) +from .msg import NamespacePath from ._exceptions import ( # _raise_from_no_key_in_msg, unpack_error, @@ -71,12 +72,23 @@ log = get_logger(__name__) async def _drain_to_final_msg( ctx: Context, -) -> list[dict]: -# ) -> tuple[ -# Any|Exception, -# list[dict], -# ]: + msg_limit: int = 6, + +) -> list[dict]: + ''' + Drain IPC msgs delivered to the underlying rx-mem-chan + `Context._recv_chan` from the runtime in search for a final + result or error msg. + + The motivation here is to ideally capture errors during ctxc + conditions where a canc-request/or local error is sent but the + local task also excepts and enters the + `Portal.open_context().__aexit__()` block wherein we prefer to + capture and raise any remote error or ctxc-ack as part of the + `ctx.result()` cleanup and teardown sequence. + + ''' raise_overrun: bool = not ctx._allow_overruns # wait for a final context result by collecting (but @@ -88,14 +100,14 @@ async def _drain_to_final_msg( # NOTE: this REPL usage actually works here dawg! Bo # from .devx._debug import pause # await pause() - # if re := ctx._remote_error: - # ctx._maybe_raise_remote_err( - # re, - # # NOTE: obvi we don't care if we - # # overran the far end if we're already - # # waiting on a final result (msg). - # raise_overrun_from_self=raise_overrun, - # ) + if re := ctx._remote_error: + ctx._maybe_raise_remote_err( + re, + # NOTE: obvi we don't care if we + # overran the far end if we're already + # waiting on a final result (msg). + raise_overrun_from_self=raise_overrun, + ) # TODO: bad idea? # with trio.CancelScope() as res_cs: @@ -108,7 +120,7 @@ async def _drain_to_final_msg( msg: dict = await ctx._recv_chan.receive() ctx._result: Any = msg['return'] log.runtime( - 'Context delivered final result msg:\n' + 'Context delivered final draining msg:\n' f'{pformat(msg)}' ) pre_result_drained.append(msg) @@ -142,9 +154,47 @@ async def _drain_to_final_msg( if 'yield' in msg: # far end task is still streaming to us so discard - log.warning(f'Discarding std "yield"\n{msg}') - pre_result_drained.append(msg) - continue + # and report per local context state. + if ( + (ctx._stream.closed + and (reason := 'stream was already closed') + ) + or (ctx._cancel_called + and (reason := 'ctx called `.cancel()`') + ) + or (ctx._cancelled_caught + and (reason := 'ctx caught a cancel') + ) + or (len(pre_result_drained) > msg_limit + and (reason := f'"yield" limit={msg_limit}') + ) + ): + log.cancel( + 'Cancelling `MsgStream` drain since ' + f'{reason}\n\n' + f'<= {ctx.chan.uid}\n' + f' |_{ctx._nsf}()\n\n' + f'=> {ctx._task}\n' + f' |_{ctx._stream}\n\n' + + f'{pformat(msg)}\n' + ) + return pre_result_drained + + # drain up to the `msg_limit` hoping to get + # a final result or error/ctxc. 
+ else: + log.warning( + 'Ignoring "yield" msg during `ctx.result()` drain..\n' + f'<= {ctx.chan.uid}\n' + f' |_{ctx._nsf}()\n\n' + f'=> {ctx._task}\n' + f' |_{ctx._stream}\n\n' + + f'{pformat(msg)}\n' + ) + pre_result_drained.append(msg) + continue # TODO: work out edge cases here where # a stream is open but the task also calls @@ -153,8 +203,8 @@ async def _drain_to_final_msg( # right? elif 'stop' in msg: log.cancel( - 'Remote stream terminated due to "stop" msg:\n' - f'{msg}' + 'Remote stream terminated due to "stop" msg:\n\n' + f'{pformat(msg)}\n' ) pre_result_drained.append(msg) continue @@ -260,12 +310,14 @@ class Context: ''' chan: Channel cid: str # "context id", more or less a unique linked-task-pair id - # the "feeder" channels for delivering message values to the # local task from the runtime's msg processing loop. _recv_chan: trio.MemoryReceiveChannel _send_chan: trio.MemorySendChannel + # full "namespace-path" to target RPC function + _nsf: NamespacePath + # the "invocation type" of the far end task-entry-point # function, normally matching a logic block inside # `._runtime.invoke()`. @@ -281,6 +333,7 @@ class Context: # which is exactly the primitive that allows for # cross-actor-task-supervision and thus SC. _scope: trio.CancelScope | None = None + _task: trio.lowlevel.Task|None = None # _res_scope: trio.CancelScope|None = None # on a clean exit there should be a final value @@ -384,6 +437,7 @@ class Context: # init and streaming state _started_called: bool = False _stream_opened: bool = False + _stream: MsgStream|None = None # overrun handling machinery # NOTE: none of this provides "backpressure" to the remote @@ -577,13 +631,14 @@ class Context: ''' side: str = self.side - log.cancel( - f'Cancelling {side} side of context to {self.chan.uid}' - ) - - # await pause() self._cancel_called: bool = True + header: str = f'Cancelling "{side.upper()}"-side of ctx with peer\n' + reminfo: str = ( + f'uid: {self.chan.uid}\n' + f' |_ {self._nsf}()\n' + ) + # caller side who entered `Portal.open_context()` # NOTE: on the call side we never manually call # `._scope.cancel()` since we expect the eventual @@ -601,8 +656,9 @@ class Context: with trio.move_on_after(timeout) as cs: cs.shield = True log.cancel( - f'Cancelling stream {cid} to ' - f'{self._portal.channel.uid}' + header + + + reminfo ) # NOTE: we're telling the far end actor to cancel a task @@ -621,13 +677,13 @@ class Context: # if not self._portal.channel.connected(): if not self.chan.connected(): log.cancel( - 'May have failed to cancel remote task ' - f'{cid} for {self._portal.channel.uid}' + 'May have failed to cancel remote task?\n' + f'{reminfo}' ) else: log.cancel( - 'Timed out on cancel request of remote task ' - f'{cid} for {self._portal.channel.uid}' + 'Timed out on cancel request of remote task?\n' + f'{reminfo}' ) # callee side remote task @@ -635,6 +691,11 @@ class Context: # the caller expects a `ContextCancelled` to be sent from # `._runtime._invoke()` back to the other side. else: + log.cancel( + header + + + reminfo + ) # TODO: should we have an explicit cancel message # or is relaying the local `trio.Cancelled` as an # {'error': trio.Cancelled, cid: "blah"} enough? @@ -720,8 +781,9 @@ class Context: # single-direction-stream case you'll get a lookup error # currently. 
         ctx: Context = actor.get_context(
-            self.chan,
-            self.cid,
+            chan=self.chan,
+            cid=self.cid,
+            nsf=self._nsf,
+
             msg_buffer_size=msg_buffer_size,
             allow_overruns=allow_overruns,
         )
@@ -735,7 +797,7 @@ class Context:
 
         if ctx._recv_chan._closed:
             raise trio.ClosedResourceError(
-                'The underlying channel for this stream was already closed!?'
+                'The underlying channel for this stream was already closed!\n'
             )
 
         # NOTE: implicitly this will call `MsgStream.aclose()` on
@@ -764,6 +826,7 @@ class Context:
 
         try:
             self._stream_opened: bool = True
+            self._stream = stream
 
             # XXX: do we need this?
             # ensure we aren't cancelled before yielding the stream
@@ -1174,35 +1237,47 @@ class Context:
         self,
         msg: dict,
 
-        # draining: bool = False,
-
     ) -> bool:
         '''
         Deliver an IPC msg received from a transport-channel to
-        this context's underlying mem chan for handling by
-        user operating tasks; deliver a bool indicating whether the
-        msg was immediately sent.
+        this context's underlying mem chan for handling by local
+        user application tasks; deliver `bool` indicating whether
+        the msg was able to be delivered.
 
         If `._allow_overruns == True` (maybe) append the msg to an
         "overflow queue" and start a "drainer task" (inside the
         `._scope_nursery: trio.Nursery`) which ensures that such
-        messages are eventually sent if possible.
+        messages are queued up and eventually sent if possible.
 
         '''
         cid: str = self.cid
         chan: Channel = self.chan
        from_uid: tuple[str, str] = chan.uid
         send_chan: trio.MemorySendChannel = self._send_chan
+        nsf: NamespacePath = self._nsf
 
+        re: Exception|None
         if re := unpack_error(
             msg,
             self.chan,
         ):
             log.error(
-                f'Delivering error-msg from {from_uid} to caller {cid}'
-                f'{re}'
+                f'Delivering error-msg to caller\n'
+                f'<= peer: {from_uid}\n'
+                f'  |_ {nsf}()\n\n'
+
+                f'=> cid: {cid}\n'
+                f'  |_{self._task}\n\n'
+
+                f'{pformat(re)}\n'
             )
-            self._cancel_msg = msg
+            self._cancel_msg: dict = msg
+
+            # NOTE: this will not raise an error, merely set
+            # `._remote_error` and maybe cancel any task currently
+            # entered in `Portal.open_context()` presuming the
+            # error is "cancel causing" (i.e. `ContextCancelled`
+            # or `RemoteActorError`).
             self._maybe_cancel_and_set_remote_error(re)
 
         # XXX NEVER do this XXX..!!
@@ -1218,26 +1293,44 @@ class Context:
 
         if self._in_overrun:
             log.warning(
-                f'Capturing overrun-msg from {from_uid} to caller {cid}'
-                f'{msg}'
+                f'Queueing OVERRUN msg on caller task:\n'
+                f'<= peer: {from_uid}\n'
+                f'  |_ {nsf}()\n\n'
+
+                f'=> cid: {cid}\n'
+                f'  |_{self._task}\n\n'
+
+                f'{pformat(msg)}\n'
             )
             self._overflow_q.append(msg)
             return False
 
         try:
             log.runtime(
-                f'Delivering IPC `Context` msg:\n'
+                f'Delivering msg from IPC ctx:\n'
                 f'<= {from_uid}\n'
-                f'=> caller: {cid}\n'
-                f'{msg}'
+                f'  |_ {nsf}()\n\n'
+
+                f'=> {self._task}\n'
+                f'  |_cid={self.cid}\n\n'
+
+                f'{pformat(msg)}\n'
             )
             # from .devx._debug import pause
             # await pause()
+
+            # NOTE: if an error is detected we should always still
+            # send it through the feeder-mem-chan and expect
+            # it to be raised by any context (stream) consumer
+            # task via the consumer APIs on both the `Context` and
+            # `MsgStream`!
+            #
+            # XXX the reason is that this method is always called
+            # by the IPC msg handling runtime task and that is not
+            # normally the task that should get cancelled/error
+            # from some remote fault!
             send_chan.send_nowait(msg)
             return True
-            # if an error is deteced we should always
-            # expect it to be raised by any context (stream)
-            # consumer task
 
         except trio.BrokenResourceError:
 
             # TODO: what is the right way to handle the case where the
@@ -1248,7 +1341,13 @@ class Context:
 
             # XXX: local consumer has closed their side
             # so cancel the far end streaming task
-            log.warning(f"{send_chan} consumer is already closed")
+            log.warning(
+                'Rx chan for `Context` already closed?\n'
+                f'cid: {self.cid}\n'
+                'Failed to deliver msg:\n'
+                f'send_chan: {send_chan}\n\n'
+                f'{pformat(msg)}\n'
+            )
             return False
 
         # NOTE XXX: by default we do **not** maintain context-stream
@@ -1257,44 +1356,54 @@ class Context:
         # msg handling loop which calls into this method!
         except trio.WouldBlock:
 
-            # XXX: always push an error even if the local
-            # receiver is in overrun state.
-            # self._maybe_cancel_and_set_remote_error(msg)
+            # XXX: always push an error even if the local receiver
+            # is in overrun state - i.e. if an 'error' msg is
+            # delivered then
+            # `._maybe_cancel_and_set_remote_error(msg)` should
+            # have already been called above!
+            #
+            # XXX QUESTION XXX: if we rx an error while in an
+            # overrun state and that msg isn't stuck in an
+            # overflow queue what happens?!?
 
             local_uid = current_actor().uid
-            lines = [
-                f'OVERRUN on actor-task context {cid}@{local_uid}!\n'
-                # TODO: put remote task name here if possible?
-                f'sender: {from_uid}',
-                f'msg: {msg}',
-                # TODO: put task func name here and maybe an arrow
-                # from sender to overrunner?
-                # f'local task {self.func_name}'
-            ]
-            if not self._stream_opened:
-                lines.insert(
-                    1,
-                    f'\n*** No stream open on `{local_uid[0]}` side! ***\n'
-                )
+            txt: str = (
+                'on IPC context:\n'
 
-            text = '\n'.join(lines)
+                f'<= sender: {from_uid}\n'
+                f'  |_ {self._nsf}()\n\n'
+
+                f'=> overrun: {local_uid}\n'
+                f'  |_cid: {cid}\n'
+                f'  |_task: {self._task}\n'
+            )
+            if not self._stream_opened:
+                txt += (
+                    f'\n*** No stream open on `{local_uid[0]}` side! ***\n\n'
+                    f'{msg}\n'
+                )
 
             # XXX: lul, this really can't be backpressure since any
             # blocking here will block the entire msg loop rpc sched for
             # a whole channel.. maybe we should rename it?
             if self._allow_overruns:
-                text += f'\nStarting overflow queuing task on msg: {msg}'
-                log.warning(text)
+                txt += (
+                    '\n*** Starting overflow queuing task on msg ***\n\n'
+                    f'{msg}\n'
+                )
+                log.warning(txt)
                 if (
                     not self._in_overrun
                 ):
                     self._overflow_q.append(msg)
-                    n = self._scope_nursery
-                    assert not n.child_tasks
+                    tn: trio.Nursery = self._scope_nursery
+                    assert not tn.child_tasks
                     try:
-                        n.start_soon(
+                        tn.start_soon(
                             self._drain_overflows,
                         )
+                        return True
+
                     except RuntimeError:
                         # if the nursery is already cancelled due to
                         # this context exiting or in error, we ignore
@@ -1302,11 +1411,12 @@ class Context:
                         # anything different.
                         return False
             else:
+                txt += f'\n{msg}\n'
                 # raise local overrun and immediately pack as IPC
                 # msg for far end.
try: raise StreamOverrun( - text, + txt, sender=from_uid, ) except StreamOverrun as err: @@ -1314,20 +1424,28 @@ class Context: err, cid=cid, ) - # err_msg['cid']: str = cid try: + # relay condition to sender side remote task await chan.send(err_msg) + return True + except trio.BrokenResourceError: # XXX: local consumer has closed their side # so cancel the far end streaming task - log.warning(f"{chan} is already closed") + log.warning( + 'Channel for ctx is already closed?\n' + f'|_{chan}\n' + ) + # ow, indicate unable to deliver by default return False def mk_context( chan: Channel, cid: str, + nsf: NamespacePath, + msg_buffer_size: int = 2**6, **kwargs, @@ -1345,10 +1463,12 @@ def mk_context( send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size) ctx = Context( - chan, - cid, + chan=chan, + cid=cid, _send_chan=send_chan, _recv_chan=recv_chan, + _nsf=nsf, + _task=trio.lowlevel.current_task(), **kwargs, ) ctx._result: int | Any = id(ctx) diff --git a/tractor/_portal.py b/tractor/_portal.py index 14f6fbf..a4f2f61 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -69,18 +69,35 @@ from ._streaming import ( log = get_logger(__name__) +# TODO: rename to `unwrap_result()` and use +# `._raise_from_no_key_in_msg()` (after tweak to +# accept a `chan: Channel` arg) in key block! def _unwrap_msg( msg: dict[str, Any], - channel: Channel + channel: Channel, + + hide_tb: bool = True, ) -> Any: - __tracebackhide__ = True + ''' + Unwrap a final result from a `{return: }` IPC msg. + + ''' + __tracebackhide__: bool = hide_tb + try: return msg['return'] except KeyError as ke: + # internal error should never get here - assert msg.get('cid'), "Received internal error at portal?" - raise unpack_error(msg, channel) from ke + assert msg.get('cid'), ( + "Received internal error at portal?" + ) + + raise unpack_error( + msg, + channel + ) from ke class Portal: @@ -107,7 +124,7 @@ class Portal: cancel_timeout: float = 0.5 def __init__(self, channel: Channel) -> None: - self.channel = channel + self.chan = channel # during the portal's lifetime self._result_msg: Optional[dict] = None @@ -118,6 +135,18 @@ class Portal: self._streams: set[MsgStream] = set() self.actor = current_actor() + @property + def channel(self) -> Channel: + ''' + Proxy to legacy attr name.. + + Consider the shorter `Portal.chan` instead of `.channel` ;) + ''' + log.debug( + 'Consider the shorter `Portal.chan` instead of `.channel` ;)' + ) + return self.chan + async def _submit_for_result( self, ns: str, @@ -125,14 +154,14 @@ class Portal: **kwargs ) -> None: - assert self._expect_result is None, \ - "A pending main result has already been submitted" + assert self._expect_result is None, ( + "A pending main result has already been submitted" + ) self._expect_result = await self.actor.start_remote_task( self.channel, - ns, - func, - kwargs + nsf=NamespacePath(f'{ns}:{func}'), + kwargs=kwargs ) async def _return_once( @@ -173,7 +202,10 @@ class Portal: self._expect_result ) - return _unwrap_msg(self._result_msg, self.channel) + return _unwrap_msg( + self._result_msg, + self.channel, + ) async def _cancel_streams(self): # terminate all locally running async generator @@ -215,26 +247,33 @@ class Portal: purpose. ''' - if not self.channel.connected(): - log.cancel("This channel is already closed can't cancel") + chan: Channel = self.channel + if not chan.connected(): + log.runtime( + 'This channel is already closed, skipping cancel request..' 
+ ) return False + reminfo: str = ( + f'uid: {self.channel.uid}\n' + f' |_{chan}\n' + ) log.cancel( - f"Sending actor cancel request to {self.channel.uid} on " - f"{self.channel}") - - self.channel._cancel_called = True + f'Sending actor cancel request to peer\n' + f'{reminfo}' + ) + self.channel._cancel_called: bool = True try: # send cancel cmd - might not get response # XXX: sure would be nice to make this work with # a proper shield with trio.move_on_after( timeout - or self.cancel_timeout + or + self.cancel_timeout ) as cs: - cs.shield = True - + cs.shield: bool = True await self.run_from_ns( 'self', 'cancel', @@ -242,7 +281,10 @@ class Portal: return True if cs.cancelled_caught: - log.cancel(f"May have failed to cancel {self.channel.uid}") + log.cancel( + 'May have failed to cancel peer?\n' + f'{reminfo}' + ) # if we get here some weird cancellation case happened return False @@ -272,27 +314,33 @@ class Portal: Note:: - A special namespace `self` can be used to invoke `Actor` - instance methods in the remote runtime. Currently this - should only be used solely for ``tractor`` runtime - internals. + A special namespace `self` can be used to invoke `Actor` + instance methods in the remote runtime. Currently this + should only ever be used for `Actor` (method) runtime + internals! ''' + nsf = NamespacePath( + f'{namespace_path}:{function_name}' + ) ctx = await self.actor.start_remote_task( - self.channel, - namespace_path, - function_name, - kwargs, + chan=self.channel, + nsf=nsf, + kwargs=kwargs, ) ctx._portal = self msg = await self._return_once(ctx) - return _unwrap_msg(msg, self.channel) + return _unwrap_msg( + msg, + self.channel, + ) async def run( self, func: str, - fn_name: Optional[str] = None, + fn_name: str|None = None, **kwargs + ) -> Any: ''' Submit a remote function to be scheduled and run by actor, in @@ -311,8 +359,9 @@ class Portal: DeprecationWarning, stacklevel=2, ) - fn_mod_path = func + fn_mod_path: str = func assert isinstance(fn_name, str) + nsf = NamespacePath(f'{fn_mod_path}:{fn_name}') else: # function reference was passed directly if ( @@ -325,13 +374,12 @@ class Portal: raise TypeError( f'{func} must be a non-streaming async function!') - fn_mod_path, fn_name = NamespacePath.from_ref(func).to_tuple() + nsf = NamespacePath.from_ref(func) ctx = await self.actor.start_remote_task( self.channel, - fn_mod_path, - fn_name, - kwargs, + nsf=nsf, + kwargs=kwargs, ) ctx._portal = self return _unwrap_msg( @@ -355,15 +403,10 @@ class Portal: raise TypeError( f'{async_gen_func} must be an async generator function!') - fn_mod_path, fn_name = NamespacePath.from_ref( - async_gen_func - ).to_tuple() - - ctx = await self.actor.start_remote_task( + ctx: Context = await self.actor.start_remote_task( self.channel, - fn_mod_path, - fn_name, - kwargs + nsf=NamespacePath.from_ref(async_gen_func), + kwargs=kwargs, ) ctx._portal = self @@ -405,7 +448,10 @@ class Portal: self, func: Callable, + allow_overruns: bool = False, + + # proxied to RPC **kwargs, ) -> AsyncGenerator[tuple[Context, Any], None]: @@ -448,13 +494,12 @@ class Portal: # TODO: i think from here onward should probably # just be factored into an `@acm` inside a new # a new `_context.py` mod. 
- fn_mod_path, fn_name = NamespacePath.from_ref(func).to_tuple() + nsf = NamespacePath.from_ref(func) - ctx = await self.actor.start_remote_task( + ctx: Context = await self.actor.start_remote_task( self.channel, - fn_mod_path, - fn_name, - kwargs, + nsf=nsf, + kwargs=kwargs, # NOTE: it's imporant to expose this since you might # get the case where the parent who opened the context does @@ -721,10 +766,10 @@ class Portal: # assert maybe_ctxc if ctx.chan.connected(): - log.info( - 'Waiting on final context-task result for\n' - f'task: {cid}\n' - f'actor: {uid}' + log.runtime( + 'Waiting on final context result for\n' + f'peer: {uid}\n' + f'|_{ctx._task}\n' ) # XXX NOTE XXX: the below call to # `Context.result()` will ALWAYS raise @@ -771,13 +816,19 @@ class Portal: RemoteActorError(), ): log.exception( - f'Context `{fn_name}` remotely errored:\n' - f'`{tbstr}`' + 'Context remotely errored!\n' + f'<= peer: {uid}\n' + f' |_ {nsf}()\n\n' + + f'{tbstr}' ) case (None, _): log.runtime( - f'Context {fn_name} returned value from callee:\n' - f'`{result_or_err}`' + 'Context returned final result from callee task:\n' + f'<= peer: {uid}\n' + f' |_ {nsf}()\n\n' + + f'`{result_or_err}`\n' ) finally: @@ -855,26 +906,31 @@ class Portal: # CASE 2 if ctx._cancel_called: log.cancel( - f'Context {fn_name} cancelled by caller with\n' + 'Context cancelled by caller task\n' + f'|_{ctx._task}\n\n' + f'{etype}' ) # CASE 1 else: log.cancel( - f'Context cancelled by callee with {etype}\n' - f'target: `{fn_name}`\n' - f'task:{cid}\n' - f'actor:{uid}' + f'Context cancelled by remote callee task\n' + f'peer: {uid}\n' + f'|_ {nsf}()\n\n' + + f'{etype}\n' ) # FINALLY, remove the context from runtime tracking and # exit! log.runtime( - f'Exiting context opened with {ctx.chan.uid}' + 'Removing IPC ctx opened with peer\n' + f'{uid}\n' + f'|_{ctx}\n' ) self.actor._contexts.pop( - (self.channel.uid, ctx.cid), + (uid, cid), None, ) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index e8f735e..64b5dd6 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -95,9 +95,6 @@ class MsgStream(trio.abc.Channel): try: return msg['yield'] except KeyError as kerr: - # if 'return' in msg: - # return msg - _raise_from_no_key_in_msg( ctx=self._ctx, msg=msg, @@ -128,13 +125,9 @@ class MsgStream(trio.abc.Channel): # introducing this if self._eoc: raise self._eoc - # raise trio.EndOfChannel if self._closed: raise self._closed - # raise trio.ClosedResourceError( - # 'This stream was already closed' - # ) src_err: Exception|None = None try: @@ -143,6 +136,7 @@ class MsgStream(trio.abc.Channel): return msg['yield'] except KeyError as kerr: + # log.exception('GOT KEYERROR') src_err = kerr # NOTE: may raise any of the below error types @@ -161,9 +155,9 @@ class MsgStream(trio.abc.Channel): # trio.ClosedResourceError, # by self._rx_chan trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end ) as eoc: + # log.exception('GOT EOC') src_err = eoc self._eoc = eoc - # await trio.sleep(1) # a ``ClosedResourceError`` indicates that the internal # feeder memory receive channel was closed likely by the @@ -201,6 +195,7 @@ class MsgStream(trio.abc.Channel): # raise eoc except trio.ClosedResourceError as cre: # by self._rx_chan + # log.exception('GOT CRE') src_err = cre log.warning( '`Context._rx_chan` was already closed?' 
@@ -211,6 +206,8 @@ class MsgStream(trio.abc.Channel):
             # terminated and signal this local iterator to stop
             drained: list[Exception|dict] = await self.aclose()
             if drained:
+                # from .devx import pause
+                # await pause()
                 log.warning(
                     'Drained context msgs during closure:\n'
                     f'{drained}'
@@ -237,31 +234,32 @@ class MsgStream(trio.abc.Channel):
         Cancel associated remote actor task and local memory channel on
         close.
 
+        Notes:
+         - REMEMBER that this is also called by `.__aexit__()` so
+             careful consideration must be made to handle whatever
+             internal state is mutated, particularly in terms of
+             draining IPC msgs!
+
+         - more or less we try to maintain adherence to trio's `.aclose()` semantics:
+             https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
         '''
-        # XXX: keep proper adherance to trio's `.aclose()` semantics:
-        # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
-        rx_chan = self._rx_chan
-        if (
-            rx_chan._closed
-            or
-            self._closed
-        ):
-            log.cancel(
-                f'`MsgStream` is already closed\n'
-                f'.cid: {self._ctx.cid}\n'
-                f'._rx_chan`: {rx_chan}\n'
-                f'._eoc: {self._eoc}\n'
-                f'._closed: {self._eoc}\n'
-            )
+        # rx_chan = self._rx_chan
 
+        # XXX NOTE XXX
+        # it's SUPER IMPORTANT that we ensure we don't DOUBLE
+        # DRAIN msgs on closure so avoid getting stuck hanging on
+        # the `._rx_chan` since we call this method on
+        # `.__aexit__()` as well!!!
+        # => SO ENSURE WE CATCH ALL TERMINATION STATES in this
+        # block including the EoC..
+        if self.closed:
             # this stream has already been closed so silently succeed as
             # per ``trio.AsyncResource`` semantics.
             # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
             return []
 
         ctx: Context = self._ctx
-        # caught_eoc: bool = False
         drained: list[Exception|dict] = []
         while not drained:
             try:
@@ -274,17 +272,26 @@ class MsgStream(trio.abc.Channel):
                     # TODO: inject into parent `Context` buf?
                     drained.append(maybe_final_msg)
 
+            # NOTE: we only need these handlers due to the
+            # `.receive_nowait()` call above which may re-raise
+            # one of these errors on a msg key error!
+
            except trio.WouldBlock as be:
                 drained.append(be)
                 break
 
             except trio.EndOfChannel as eoc:
+                self._eoc: Exception = eoc
                 drained.append(eoc)
-                # caught_eoc = True
-                self._eoc: bool = eoc
+                break
+
+            except trio.ClosedResourceError as cre:
+                self._closed = cre
+                drained.append(cre)
                 break
 
             except ContextCancelled as ctxc:
+                # log.exception('GOT CTXC')
                 log.cancel(
                     'Context was cancelled during stream closure:\n'
                     f'canceller: {ctxc.canceller}\n'
@@ -339,8 +346,11 @@ class MsgStream(trio.abc.Channel):
         # with trio.CancelScope(shield=True):
         #     await rx_chan.aclose()
 
-        # self._eoc: bool = caught_eoc
-
+        if not self._eoc:
+            self._eoc: bool = trio.EndOfChannel(
+                f'Context stream closed by {self._ctx.side}\n'
+                f'|_{self}\n'
+            )
         # ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?
         # => NO, DEFINITELY NOT! <=
         # if we're a bi-dir ``MsgStream`` BECAUSE this same
@@ -379,6 +389,26 @@ class MsgStream(trio.abc.Channel):
         # self._closed = True
         return drained
 
+    @property
+    def closed(self) -> bool:
+        if (
+            (rxc := self._rx_chan._closed)
+            or
+            (_closed := self._closed)
+            or
+            (_eoc := self._eoc)
+        ):
+            log.runtime(
+                f'`MsgStream` is already closed\n'
+                f'{self}\n'
+                f' |_cid: {self._ctx.cid}\n'
+                f' |_rx_chan._closed: {type(rxc)} = {rxc}\n'
+                f' |_closed: {type(_closed)} = {_closed}\n'
+                f' |_eoc: {type(_eoc)} = {_eoc}'
+            )
+            return True
+        return False
+
     @acm
     async def subscribe(
         self,
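
The patch above is easiest to read against the caller-side pattern it services. Below is an illustrative sketch only (not part of the patch): the actor name 'streamer' and both task functions are invented for the example, and the comments describe the intended semantics rather than a verified trace. It shows a parent opening an IPC context, streaming from a child, then requesting cancellation so that any in-flight 'yield' msgs are drained (per the new `_drain_to_final_msg()` logic) while the final ctxc-ack/result is collected on `Portal.open_context()` exit; only public `tractor` APIs are used.

import trio
import tractor


@tractor.context
async def streamer(
    ctx: tractor.Context,
) -> None:
    # child-side task (hypothetical): ack the context then stream
    # values until the parent cancels us.
    await ctx.started()
    async with ctx.open_stream() as stream:
        for i in range(1000):
            await stream.send(i)
            await trio.sleep(0.01)


async def main() -> None:
    async with tractor.open_nursery() as an:
        portal = await an.start_actor(
            'streamer',  # made-up actor name
            enable_modules=[__name__],
        )
        async with portal.open_context(streamer) as (ctx, _first):
            async with ctx.open_stream() as stream:
                async for i in stream:
                    if i >= 3:
                        # request remote-task cancellation; remaining
                        # in-flight 'yield' msgs are expected to be
                        # drained (and discarded) before the final
                        # ctxc-ack/result is consumed via `ctx.result()`
                        # on context exit.
                        await ctx.cancel()
                        break

        await portal.cancel_actor()


if __name__ == '__main__':
    trio.run(main)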