diff --git a/tractor/_portal.py b/tractor/_portal.py
index 3d1bd9c..ed5892d 100644
--- a/tractor/_portal.py
+++ b/tractor/_portal.py
@@ -177,6 +177,7 @@ class Portal:
                 f"Cancelling all streams with {self.channel.uid}")
             for stream in self._streams.copy():
                 try:
+                    # with trio.CancelScope(shield=True):
                     await stream.aclose()
                 except trio.ClosedResourceError:
                     # don't error the stream having already been closed
@@ -369,64 +370,78 @@ class Portal:
         recv_chan: Optional[trio.MemoryReceiveChannel] = None
 
+        cid, recv_chan, functype, first_msg = await self._submit(
+            fn_mod_path, fn_name, kwargs)
+
+        assert functype == 'context'
+        msg = await recv_chan.receive()
+
         try:
-            cid, recv_chan, functype, first_msg = await self._submit(
-                fn_mod_path, fn_name, kwargs)
+            # the "first" value here is delivered by the callee's
+            # ``Context.started()`` call.
+            first = msg['started']
 
-            assert functype == 'context'
-            msg = await recv_chan.receive()
+        except KeyError:
+            assert msg.get('cid'), ("Received internal error at context?")
 
-            try:
-                # the "first" value here is delivered by the callee's
-                # ``Context.started()`` call.
-                first = msg['started']
-
-            except KeyError:
-                assert msg.get('cid'), ("Received internal error at context?")
-
-                if msg.get('error'):
-                    # raise the error message
-                    raise unpack_error(msg, self.channel)
-                else:
-                    raise
-
-            # deliver context instance and .started() msg value in open
-            # tuple.
-            ctx = Context(
-                self.channel,
-                cid,
-                _portal=self,
-                _recv_chan=recv_chan,
-            )
-
-            try:
-                yield ctx, first
-
-                if cancel_on_exit:
-                    await ctx.cancel()
-
-                else:
-                    if not ctx._cancel_called:
-                        await ctx.result()
-
-            except ContextCancelled:
-                # if the context was cancelled by client code
-                # then we don't need to raise since user code
-                # is expecting this.
-                if not ctx._cancel_called:
-                    raise
-
-            except BaseException:
-                # the context cancels itself on any deviation
-                await ctx.cancel()
+            if msg.get('error'):
+                # raise the error message
+                raise unpack_error(msg, self.channel)
+            else:
                 raise
 
-        finally:
-            log.info(f'Context for {func.__name__} completed')
+        # deliver the context instance and the callee's ``.started()``
+        # msg value as a tuple.
+        try:
+            async with trio.open_nursery() as scope_nursery:
+                ctx = Context(
+                    self.channel,
+                    cid,
+                    _portal=self,
+                    _recv_chan=recv_chan,
+                    _scope_nursery=scope_nursery,
+                )
+                recv_chan._ctx = ctx
+
+                yield ctx, first
+
+                log.info(f'Context for {func.__name__} completed')
+
+                if cancel_on_exit:
+                    await ctx.cancel()
+
+                else:
+                    if not ctx._cancel_called:
+                        await ctx.result()
+
+                await recv_chan.aclose()
+
+            # except TypeError:
+            #     # if fn_name == '_emsd_main':
+            #     import tractor
+            #     await tractor.breakpoint()
+
+        except ContextCancelled:
+            if not ctx._cancel_called:
+                raise
+
+            else:
+                # if the context was cancelled by client code
+                # then we don't need to raise since user code
+                # is expecting this and the block should exit.
+                log.debug(f'Context {ctx} cancelled gracefully')
+
+        except trio.Cancelled:
+            # the context cancels itself on any deviation
+            await ctx.cancel()
+            raise
+
+        # finally:
+        #     log.info(f'Context for {func.__name__} completed')
+
+        # finally:
+        #     if recv_chan is not None:
 
-        finally:
-            if recv_chan is not None:
-                await recv_chan.aclose()
 
 
 @dataclass
 class LocalPortal:
@@ -464,6 +479,7 @@ async def open_portal(
     was_connected = False
 
     async with maybe_open_nursery(nursery, shield=shield) as nursery:
+
         if not channel.connected():
             await channel.connect()
             was_connected = True
@@ -485,11 +501,12 @@ async def open_portal(
         portal = Portal(channel)
         try:
             yield portal
+
         finally:
            await portal.aclose()
 
             if was_connected:
-                # cancel remote channel-msg loop
+                # gracefully signal remote channel-msg loop
                 await channel.send(None)
 
             # cancel background msg loop task
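
To make the reworked control flow concrete, here is a rough caller-side sketch of the new ``Portal.open_context()`` usage implied by the hunk above; the callee task, its kwarg, and the started-value are hypothetical stand-ins. The manager now yields a ``(ctx, first)`` tuple where ``first`` is whatever the callee passed to ``Context.started()``, and a cancel the caller requests itself is absorbed by the ``except ContextCancelled`` branch instead of being raised into user code.

async def worker(ctx, greeting: str) -> None:
    # hypothetical callee task: the value handed to ``.started()``
    # shows up as ``first`` on the caller side
    await ctx.started(greeting)
    ...

async def caller(portal) -> None:
    async with portal.open_context(
        worker,            # resolved remotely via mod-path/fn-name
        greeting='ready',  # forwarded as ``kwargs``
    ) as (ctx, first):

        assert first == 'ready'

        # a self-requested cancel: swallowed by ``open_context()``
        # since ``ctx._cancel_called`` is set
        await ctx.cancel()
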
diff --git a/tractor/_streaming.py b/tractor/_streaming.py
index 6b544d0..beb5eb2 100644
--- a/tractor/_streaming.py
+++ b/tractor/_streaming.py
@@ -7,7 +7,7 @@ from contextlib import contextmanager, asynccontextmanager
 from dataclasses import dataclass
 from typing import (
     Any, Iterator, Optional, Callable,
-    AsyncGenerator,
+    AsyncGenerator, Dict,
 )
 
 import warnings
@@ -67,6 +67,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
             raise trio.EndOfChannel
 
         try:
+            # if self._ctx.chan.uid[0] == 'brokerd.ib':
+            #     breakpoint()
+
             msg = await self._rx_chan.receive()
             return msg['yield']
 
@@ -134,6 +137,11 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
 
            raise  # propagate
 
+        # except trio.Cancelled:
+        #     if not self._shielded:
+        #         # if shielded we don't propagate a cancelled
+        #         raise
+
         # except trio.Cancelled:
         #     # relay cancels to the remote task
         #     await self.aclose()
@@ -165,7 +173,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
         # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
         rx_chan = self._rx_chan
 
-        if rx_chan._closed:
+        if rx_chan._closed:  # or self._eoc:
             log.warning(f"{self} is already closed")
 
             # this stream has already been closed so silently succeed as
@@ -212,7 +220,10 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
                 # stop from the caller side
                 await self._ctx.send_stop()
 
-            except trio.BrokenResourceError:
+            except (
+                trio.BrokenResourceError,
+                trio.ClosedResourceError
+            ):
                 # the underlying channel may already have been pulled
                 # in which case our stop message is meaningless since
                 # it can't traverse the transport.
@@ -254,18 +265,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
     # still need to consume msgs that are "in transit" from the far
     # end (eg. for ``Context.result()``).
 
-    # TODO: but make it broadcasting to consumers
-    # def clone(self):
-    #     """Clone this receive channel allowing for multi-task
-    #     consumption from the same channel.
-
-    #     """
-    #     return ReceiveStream(
-    #         self._cid,
-    #         self._rx_chan.clone(),
-    #         self._portal,
-    #     )
-
 
 class MsgStream(ReceiveMsgStream, trio.abc.Channel):
     """
@@ -282,6 +281,17 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel):
         '''
         await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})
 
+    # TODO: but make it broadcasting to consumers
+    def clone(self):
+        """Clone this receive channel allowing for multi-task
+        consumption from the same channel.
+
+        """
+        return MsgStream(
+            self._ctx,
+            self._rx_chan.clone(),
+        )
+
 
 @dataclass
 class Context:
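
The new ``MsgStream.clone()`` above simply wraps ``trio.MemoryReceiveChannel.clone()``, so, per the TODO, clones are competing consumers rather than broadcast subscribers: each ``yield`` msg is delivered to exactly one of the cloned streams. A minimal sketch of the multi-task consumption this enables, with hypothetical task names:

import trio


async def consume(stream, name: str) -> None:
    # clones race for msgs; each msg goes to exactly one consumer
    async for msg in stream:
        print(f'{name} <- {msg}')


async def fan_out(stream) -> None:
    # ``stream`` is an open ``MsgStream``; its clone shares the
    # same underlying memory channel
    async with trio.open_nursery() as n:
        n.start_soon(consume, stream, 'task_a')
        n.start_soon(consume, stream.clone(), 'task_b')
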
@@ -308,7 +318,7 @@ class Context:
     _cancel_called: bool = False
 
     # only set on the callee side
-    _cancel_scope: Optional[trio.CancelScope] = None
+    _scope_nursery: Optional[trio.Nursery] = None
 
     async def send_yield(self, data: Any) -> None:
 
@@ -323,6 +333,16 @@ class Context:
     async def send_stop(self) -> None:
         await self.chan.send({'stop': True, 'cid': self.cid})
 
+    def _error_from_remote_msg(
+        self,
+        msg: Dict[str, Any],
+
+    ) -> None:
+        async def raiser():
+            raise unpack_error(msg, self.chan)
+
+        self._scope_nursery.start_soon(raiser)
+
     async def cancel(self) -> None:
         '''Cancel this inter-actor-task context.
 
@@ -361,13 +381,16 @@ class Context:
                 f"{cid} for {self._portal.channel.uid}")
         else:
             # ensure callee side
-            assert self._cancel_scope
+            assert self._scope_nursery
 
             # TODO: should we have an explicit cancel message
             # or is relaying the local `trio.Cancelled` as an
             # {'error': trio.Cancelled, cid: "blah"} enough?
             # This probably gets into the discussion in
             # https://github.com/goodboy/tractor/issues/36
-            self._cancel_scope.cancel()
+            self._scope_nursery.cancel_scope.cancel()
+
+            if self._recv_chan:
+                await self._recv_chan.aclose()
 
     @asynccontextmanager
     async def open_stream(
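
The ``_error_from_remote_msg()`` + ``_scope_nursery`` pairing is the crux of the callee-side change: instead of poking a bare ``trio.CancelScope``, a relayed remote error is re-raised inside a task spawned on the scope nursery, so it propagates (and cancels sibling tasks) exactly like a local crash would. A self-contained sketch of that pattern, using a stand-in error type in place of whatever ``unpack_error()`` builds:

import trio


class RemoteError(Exception):
    '''Stand-in for the error type built by ``unpack_error()``.'''


async def callee_scope() -> None:
    async with trio.open_nursery() as scope_nursery:

        async def raiser():
            raise RemoteError('faulted on the far end')

        # the error surfaces through the nursery, cancelling
        # ``sleep_forever()`` and tearing down the scope
        scope_nursery.start_soon(raiser)
        await trio.sleep_forever()


try:
    trio.run(callee_scope)
except Exception as err:
    # trio of this era re-raises a lone child error bare; newer trio
    # wraps it in an ``ExceptionGroup``
    print(f'propagated: {err!r}')
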