diff --git a/tractor/_actor.py b/tractor/_actor.py index 4b215f8..f4711fb 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -46,6 +46,7 @@ class ActorFailure(Exception): async def _invoke( + actor: 'Actor', cid: str, chan: Channel, @@ -58,10 +59,12 @@ async def _invoke( """Invoke local func and deliver result(s) over provided channel. """ treat_as_gen = False - cs = None + cancel_scope = trio.CancelScope() - ctx = Context(chan, cid, _cancel_scope=cancel_scope) - context = False + cs: trio.CancelScope = None + + ctx = Context(chan, cid) + context: bool = False if getattr(func, '_tractor_stream_function', False): # handle decorated ``@tractor.stream`` async functions @@ -149,14 +152,22 @@ async def _invoke( # context func with support for bi-dir streaming await chan.send({'functype': 'context', 'cid': cid}) - with cancel_scope as cs: + async with trio.open_nursery() as scope_nursery: + ctx._scope_nursery = scope_nursery + cs = scope_nursery.cancel_scope task_status.started(cs) await chan.send({'return': await coro, 'cid': cid}) if cs.cancelled_caught: + if ctx._cancel_called: + msg = f'{func.__name__} cancelled itself', + + else: + msg = f'{func.__name__} was remotely cancelled', + # task-contex was cancelled so relay to the cancel to caller raise ContextCancelled( - f'{func.__name__} cancelled itself', + msg, suberror_type=trio.Cancelled, ) @@ -191,8 +202,10 @@ async def _invoke( await chan.send(err_msg) except trio.ClosedResourceError: - log.warning( - f"Failed to ship error to caller @ {chan.uid}") + # if we can't propagate the error that's a big boo boo + log.error( + f"Failed to ship error to caller @ {chan.uid} !?" + ) if cs is None: # error is from above code not from rpc invocation @@ -372,12 +385,16 @@ class Actor: raise mne async def _stream_handler( + self, stream: trio.SocketStream, + ) -> None: """Entry point for new inbound connections to the channel server. + """ self._no_more_peers = trio.Event() # unset + chan = Channel(stream=stream) log.info(f"New connection to us {chan}") @@ -423,10 +440,24 @@ class Actor: try: await self._process_messages(chan) finally: + + # channel cleanup sequence + + # for (channel, cid) in self._rpc_tasks.copy(): + # if channel is chan: + # with trio.CancelScope(shield=True): + # await self._cancel_task(cid, channel) + + # # close all consumer side task mem chans + # send_chan, _ = self._cids2qs[(chan.uid, cid)] + # assert send_chan.cid == cid # type: ignore + # await send_chan.aclose() + # Drop ref to channel so it can be gc-ed and disconnected log.debug(f"Releasing channel {chan} from {chan.uid}") chans = self._peers.get(chan.uid) chans.remove(chan) + if not chans: log.debug(f"No more channels for {chan.uid}") self._peers.pop(chan.uid, None) @@ -439,14 +470,22 @@ class Actor: # # XXX: is this necessary (GC should do it?) if chan.connected(): + # if the channel is still connected it may mean the far + # end has not closed and we may have gotten here due to + # an error and so we should at least try to terminate + # the channel from this end gracefully. + log.debug(f"Disconnecting channel {chan}") try: - # send our msg loop terminate sentinel + # send a msg loop terminate sentinel await chan.send(None) + + # XXX: do we want this? + # causes "[104] connection reset by peer" on other end # await chan.aclose() + except trio.BrokenResourceError: - log.exception( - f"Channel for {chan.uid} was already zonked..") + log.warning(f"Channel for {chan.uid} was already closed") async def _push_result( self, @@ -456,18 +495,22 @@ class Actor: ) -> None: """Push an RPC result to the local consumer's queue. """ - actorid = chan.uid - assert actorid, f"`actorid` can't be {actorid}" - send_chan, recv_chan = self._cids2qs[(actorid, cid)] + # actorid = chan.uid + assert chan.uid, f"`chan.uid` can't be {chan.uid}" + send_chan, recv_chan = self._cids2qs[(chan.uid, cid)] assert send_chan.cid == cid # type: ignore - # if 'stop' in msg: + if 'error' in msg: + ctx = getattr(recv_chan, '_ctx', None) + # if ctx: + # ctx._error_from_remote_msg(msg) + # log.debug(f"{send_chan} was terminated at remote end") # # indicate to consumer that far end has stopped # return await send_chan.aclose() try: - log.debug(f"Delivering {msg} from {actorid} to caller {cid}") + log.debug(f"Delivering {msg} from {chan.uid} to caller {cid}") # maintain backpressure await send_chan.send(msg) @@ -486,7 +529,9 @@ class Actor: self, actorid: Tuple[str, str], cid: str + ) -> Tuple[trio.abc.SendChannel, trio.abc.ReceiveChannel]: + log.debug(f"Getting result queue for {actorid} cid {cid}") try: send_chan, recv_chan = self._cids2qs[(actorid, cid)] @@ -548,6 +593,7 @@ class Actor: log.debug( f"Msg loop signalled to terminate for" f" {chan} from {chan.uid}") + break log.trace( # type: ignore diff --git a/tractor/_portal.py b/tractor/_portal.py index 3d1bd9c..ed5892d 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -177,6 +177,7 @@ class Portal: f"Cancelling all streams with {self.channel.uid}") for stream in self._streams.copy(): try: + # with trio.CancelScope(shield=True): await stream.aclose() except trio.ClosedResourceError: # don't error the stream having already been closed @@ -369,64 +370,78 @@ class Portal: recv_chan: Optional[trio.MemoryReceiveChannel] = None + cid, recv_chan, functype, first_msg = await self._submit( + fn_mod_path, fn_name, kwargs) + + assert functype == 'context' + msg = await recv_chan.receive() + try: - cid, recv_chan, functype, first_msg = await self._submit( - fn_mod_path, fn_name, kwargs) + # the "first" value here is delivered by the callee's + # ``Context.started()`` call. + first = msg['started'] - assert functype == 'context' - msg = await recv_chan.receive() + except KeyError: + assert msg.get('cid'), ("Received internal error at context?") - try: - # the "first" value here is delivered by the callee's - # ``Context.started()`` call. - first = msg['started'] - - except KeyError: - assert msg.get('cid'), ("Received internal error at context?") - - if msg.get('error'): - # raise the error message - raise unpack_error(msg, self.channel) - else: - raise - - # deliver context instance and .started() msg value in open - # tuple. - ctx = Context( - self.channel, - cid, - _portal=self, - _recv_chan=recv_chan, - ) - - try: - yield ctx, first - - if cancel_on_exit: - await ctx.cancel() - - else: - if not ctx._cancel_called: - await ctx.result() - - except ContextCancelled: - # if the context was cancelled by client code - # then we don't need to raise since user code - # is expecting this. - if not ctx._cancel_called: - raise - - except BaseException: - # the context cancels itself on any deviation - await ctx.cancel() + if msg.get('error'): + # raise the error message + raise unpack_error(msg, self.channel) + else: raise - finally: - log.info(f'Context for {func.__name__} completed') + # deliver context instance and .started() msg value in open + # tuple. + try: + async with trio.open_nursery() as scope_nursery: + ctx = Context( + self.channel, + cid, + _portal=self, + _recv_chan=recv_chan, + _scope_nursery=scope_nursery, + ) + recv_chan._ctx = ctx + + yield ctx, first + + log.info(f'Context for {func.__name__} completed') + + if cancel_on_exit: + await ctx.cancel() + + else: + if not ctx._cancel_called: + await ctx.result() + + await recv_chan.aclose() + + # except TypeError: + # # if fn_name == '_emsd_main': + # import tractor + # await tractor.breakpoint() + + except ContextCancelled: + if not ctx._cancel_called: + raise + + # if the context was cancelled by client code + # then we don't need to raise since user code + # is expecting this and the block should exit. + else: + log.debug(f'Context {ctx} cancelled gracefully') + + except trio.Cancelled: + # the context cancels itself on any deviation + await ctx.cancel() + raise + + # finally: + # log.info(f'Context for {func.__name__} completed') + + # finally: + # if recv_chan is not None: - finally: - if recv_chan is not None: - await recv_chan.aclose() @dataclass class LocalPortal: @@ -464,6 +479,7 @@ async def open_portal( was_connected = False async with maybe_open_nursery(nursery, shield=shield) as nursery: + if not channel.connected(): await channel.connect() was_connected = True @@ -485,11 +501,12 @@ async def open_portal( portal = Portal(channel) try: yield portal + finally: await portal.aclose() if was_connected: - # cancel remote channel-msg loop + # gracefully signal remote channel-msg loop await channel.send(None) # cancel background msg loop task diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 6b544d0..beb5eb2 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -7,7 +7,7 @@ from contextlib import contextmanager, asynccontextmanager from dataclasses import dataclass from typing import ( Any, Iterator, Optional, Callable, - AsyncGenerator, + AsyncGenerator, Dict, ) import warnings @@ -67,6 +67,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): raise trio.EndOfChannel try: + # if self._ctx.chan.uid[0] == 'brokerd.ib': + # breakpoint() + msg = await self._rx_chan.receive() return msg['yield'] @@ -134,6 +137,11 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): raise # propagate + # except trio.Cancelled: + # if not self._shielded: + # # if shielded we don't propagate a cancelled + # raise + # except trio.Cancelled: # # relay cancels to the remote task # await self.aclose() @@ -165,7 +173,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose rx_chan = self._rx_chan - if rx_chan._closed: + if rx_chan._closed: # or self._eoc: log.warning(f"{self} is already closed") # this stream has already been closed so silently succeed as @@ -212,7 +220,10 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # stop from the caller side await self._ctx.send_stop() - except trio.BrokenResourceError: + except ( + trio.BrokenResourceError, + trio.ClosedResourceError + ): # the underlying channel may already have been pulled # in which case our stop message is meaningless since # it can't traverse the transport. @@ -254,18 +265,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # still need to consume msgs that are "in transit" from the far # end (eg. for ``Context.result()``). - # TODO: but make it broadcasting to consumers - # def clone(self): - # """Clone this receive channel allowing for multi-task - # consumption from the same channel. - - # """ - # return ReceiveStream( - # self._cid, - # self._rx_chan.clone(), - # self._portal, - # ) - class MsgStream(ReceiveMsgStream, trio.abc.Channel): """ @@ -282,6 +281,17 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel): ''' await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) + # TODO: but make it broadcasting to consumers + def clone(self): + """Clone this receive channel allowing for multi-task + consumption from the same channel. + + """ + return MsgStream( + self._ctx, + self._rx_chan.clone(), + ) + @dataclass class Context: @@ -308,7 +318,7 @@ class Context: _cancel_called: bool = False # only set on the callee side - _cancel_scope: Optional[trio.CancelScope] = None + _scope_nursery: Optional[trio.Nursery] = None async def send_yield(self, data: Any) -> None: @@ -323,6 +333,16 @@ class Context: async def send_stop(self) -> None: await self.chan.send({'stop': True, 'cid': self.cid}) + def _error_from_remote_msg( + self, + msg: Dict[str, Any], + + ) -> None: + async def raiser(): + raise unpack_error(msg, self.chan) + + self._scope_nursery.start_soon(raiser) + async def cancel(self) -> None: '''Cancel this inter-actor-task context. @@ -361,13 +381,16 @@ class Context: f"{cid} for {self._portal.channel.uid}") else: # ensure callee side - assert self._cancel_scope + assert self._scope_nursery # TODO: should we have an explicit cancel message # or is relaying the local `trio.Cancelled` as an # {'error': trio.Cancelled, cid: "blah"} enough? # This probably gets into the discussion in # https://github.com/goodboy/tractor/issues/36 - self._cancel_scope.cancel() + self._scope_nursery.cancel_scope.cancel() + + if self._recv_chan: + await self._recv_chan.aclose() @asynccontextmanager async def open_stream(