Consider relaying context error via raised-in-scope-nursery task
							parent
							
								
									288e2b5db1
								
							
						
					
					
						commit
						17dc6aaa2d
					
				|  | @ -46,6 +46,7 @@ class ActorFailure(Exception): | |||
| 
 | ||||
| 
 | ||||
| async def _invoke( | ||||
| 
 | ||||
|     actor: 'Actor', | ||||
|     cid: str, | ||||
|     chan: Channel, | ||||
|  | @ -58,10 +59,12 @@ async def _invoke( | |||
|     """Invoke local func and deliver result(s) over provided channel. | ||||
|     """ | ||||
|     treat_as_gen = False | ||||
|     cs = None | ||||
| 
 | ||||
|     cancel_scope = trio.CancelScope() | ||||
|     ctx = Context(chan, cid, _cancel_scope=cancel_scope) | ||||
|     context = False | ||||
|     cs: trio.CancelScope = None | ||||
| 
 | ||||
|     ctx = Context(chan, cid) | ||||
|     context: bool = False | ||||
| 
 | ||||
|     if getattr(func, '_tractor_stream_function', False): | ||||
|         # handle decorated ``@tractor.stream`` async functions | ||||
|  | @ -149,14 +152,22 @@ async def _invoke( | |||
|             # context func with support for bi-dir streaming | ||||
|             await chan.send({'functype': 'context', 'cid': cid}) | ||||
| 
 | ||||
|             with cancel_scope as cs: | ||||
|             async with trio.open_nursery() as scope_nursery: | ||||
|                 ctx._scope_nursery = scope_nursery | ||||
|                 cs = scope_nursery.cancel_scope | ||||
|                 task_status.started(cs) | ||||
|                 await chan.send({'return': await coro, 'cid': cid}) | ||||
| 
 | ||||
|             if cs.cancelled_caught: | ||||
|                 if ctx._cancel_called: | ||||
|                     msg = f'{func.__name__} cancelled itself', | ||||
| 
 | ||||
|                 else: | ||||
|                     msg = f'{func.__name__} was remotely cancelled', | ||||
| 
 | ||||
|                 # task-contex was cancelled so relay to the cancel to caller | ||||
|                 raise ContextCancelled( | ||||
|                     f'{func.__name__} cancelled itself', | ||||
|                     msg, | ||||
|                     suberror_type=trio.Cancelled, | ||||
|                 ) | ||||
| 
 | ||||
|  | @ -191,8 +202,10 @@ async def _invoke( | |||
|             await chan.send(err_msg) | ||||
| 
 | ||||
|         except trio.ClosedResourceError: | ||||
|             log.warning( | ||||
|                 f"Failed to ship error to caller @ {chan.uid}") | ||||
|             # if we can't propagate the error that's a big boo boo | ||||
|             log.error( | ||||
|                 f"Failed to ship error to caller @ {chan.uid} !?" | ||||
|             ) | ||||
| 
 | ||||
|         if cs is None: | ||||
|             # error is from above code not from rpc invocation | ||||
|  | @ -372,12 +385,16 @@ class Actor: | |||
|             raise mne | ||||
| 
 | ||||
|     async def _stream_handler( | ||||
| 
 | ||||
|         self, | ||||
|         stream: trio.SocketStream, | ||||
| 
 | ||||
|     ) -> None: | ||||
|         """Entry point for new inbound connections to the channel server. | ||||
| 
 | ||||
|         """ | ||||
|         self._no_more_peers = trio.Event()  # unset | ||||
| 
 | ||||
|         chan = Channel(stream=stream) | ||||
|         log.info(f"New connection to us {chan}") | ||||
| 
 | ||||
|  | @ -423,10 +440,24 @@ class Actor: | |||
|         try: | ||||
|             await self._process_messages(chan) | ||||
|         finally: | ||||
| 
 | ||||
|             # channel cleanup sequence | ||||
| 
 | ||||
|             # for (channel, cid) in self._rpc_tasks.copy(): | ||||
|             #     if channel is chan: | ||||
|             #         with trio.CancelScope(shield=True): | ||||
|             #             await self._cancel_task(cid, channel) | ||||
| 
 | ||||
|             #             # close all consumer side task mem chans | ||||
|             #             send_chan, _ = self._cids2qs[(chan.uid, cid)] | ||||
|             #             assert send_chan.cid == cid  # type: ignore | ||||
|             #             await send_chan.aclose() | ||||
| 
 | ||||
|             # Drop ref to channel so it can be gc-ed and disconnected | ||||
|             log.debug(f"Releasing channel {chan} from {chan.uid}") | ||||
|             chans = self._peers.get(chan.uid) | ||||
|             chans.remove(chan) | ||||
| 
 | ||||
|             if not chans: | ||||
|                 log.debug(f"No more channels for {chan.uid}") | ||||
|                 self._peers.pop(chan.uid, None) | ||||
|  | @ -439,14 +470,22 @@ class Actor: | |||
| 
 | ||||
|             # # XXX: is this necessary (GC should do it?) | ||||
|             if chan.connected(): | ||||
|                 # if the channel is still connected it may mean the far | ||||
|                 # end has not closed and we may have gotten here due to | ||||
|                 # an error and so we should at least try to terminate | ||||
|                 # the channel from this end gracefully. | ||||
| 
 | ||||
|                 log.debug(f"Disconnecting channel {chan}") | ||||
|                 try: | ||||
|                     # send our msg loop terminate sentinel | ||||
|                     # send a msg loop terminate sentinel | ||||
|                     await chan.send(None) | ||||
| 
 | ||||
|                     # XXX: do we want this? | ||||
|                     # causes "[104] connection reset by peer" on other end | ||||
|                     # await chan.aclose() | ||||
| 
 | ||||
|                 except trio.BrokenResourceError: | ||||
|                     log.exception( | ||||
|                         f"Channel for {chan.uid} was already zonked..") | ||||
|                     log.warning(f"Channel for {chan.uid} was already closed") | ||||
| 
 | ||||
|     async def _push_result( | ||||
|         self, | ||||
|  | @ -456,18 +495,22 @@ class Actor: | |||
|     ) -> None: | ||||
|         """Push an RPC result to the local consumer's queue. | ||||
|         """ | ||||
|         actorid = chan.uid | ||||
|         assert actorid, f"`actorid` can't be {actorid}" | ||||
|         send_chan, recv_chan = self._cids2qs[(actorid, cid)] | ||||
|         # actorid = chan.uid | ||||
|         assert chan.uid, f"`chan.uid` can't be {chan.uid}" | ||||
|         send_chan, recv_chan = self._cids2qs[(chan.uid, cid)] | ||||
|         assert send_chan.cid == cid  # type: ignore | ||||
| 
 | ||||
|         # if 'stop' in msg: | ||||
|         if 'error' in msg: | ||||
|             ctx = getattr(recv_chan, '_ctx', None) | ||||
|             # if ctx: | ||||
|             #     ctx._error_from_remote_msg(msg) | ||||
| 
 | ||||
|         #     log.debug(f"{send_chan} was terminated at remote end") | ||||
|         #     # indicate to consumer that far end has stopped | ||||
|         #     return await send_chan.aclose() | ||||
| 
 | ||||
|         try: | ||||
|             log.debug(f"Delivering {msg} from {actorid} to caller {cid}") | ||||
|             log.debug(f"Delivering {msg} from {chan.uid} to caller {cid}") | ||||
|             # maintain backpressure | ||||
|             await send_chan.send(msg) | ||||
| 
 | ||||
|  | @ -486,7 +529,9 @@ class Actor: | |||
|         self, | ||||
|         actorid: Tuple[str, str], | ||||
|         cid: str | ||||
| 
 | ||||
|     ) -> Tuple[trio.abc.SendChannel, trio.abc.ReceiveChannel]: | ||||
| 
 | ||||
|         log.debug(f"Getting result queue for {actorid} cid {cid}") | ||||
|         try: | ||||
|             send_chan, recv_chan = self._cids2qs[(actorid, cid)] | ||||
|  | @ -548,9 +593,15 @@ class Actor: | |||
|                             if channel is chan: | ||||
|                                 await self._cancel_task(cid, channel) | ||||
| 
 | ||||
|                                 # close all consumer side task mem chans | ||||
|                                 # send_chan, _ = self._cids2qs[(chan.uid, cid)] | ||||
|                                 # assert send_chan.cid == cid  # type: ignore | ||||
|                                 # await send_chan.aclose() | ||||
| 
 | ||||
|                         log.debug( | ||||
|                                 f"Msg loop signalled to terminate for" | ||||
|                                 f" {chan} from {chan.uid}") | ||||
| 
 | ||||
|                         break | ||||
| 
 | ||||
|                     log.trace(   # type: ignore | ||||
|  |  | |||
|  | @ -177,6 +177,7 @@ class Portal: | |||
|                 f"Cancelling all streams with {self.channel.uid}") | ||||
|             for stream in self._streams.copy(): | ||||
|                 try: | ||||
|                     # with trio.CancelScope(shield=True): | ||||
|                     await stream.aclose() | ||||
|                 except trio.ClosedResourceError: | ||||
|                     # don't error the stream having already been closed | ||||
|  | @ -369,64 +370,78 @@ class Portal: | |||
| 
 | ||||
|         recv_chan: Optional[trio.MemoryReceiveChannel] = None | ||||
| 
 | ||||
|         cid, recv_chan, functype, first_msg = await self._submit( | ||||
|             fn_mod_path, fn_name, kwargs) | ||||
| 
 | ||||
|         assert functype == 'context' | ||||
|         msg = await recv_chan.receive() | ||||
| 
 | ||||
|         try: | ||||
|             cid, recv_chan, functype, first_msg = await self._submit( | ||||
|                 fn_mod_path, fn_name, kwargs) | ||||
|             # the "first" value here is delivered by the callee's | ||||
|             # ``Context.started()`` call. | ||||
|             first = msg['started'] | ||||
| 
 | ||||
|             assert functype == 'context' | ||||
|             msg = await recv_chan.receive() | ||||
|         except KeyError: | ||||
|             assert msg.get('cid'), ("Received internal error at context?") | ||||
| 
 | ||||
|             try: | ||||
|                 # the "first" value here is delivered by the callee's | ||||
|                 # ``Context.started()`` call. | ||||
|                 first = msg['started'] | ||||
| 
 | ||||
|             except KeyError: | ||||
|                 assert msg.get('cid'), ("Received internal error at context?") | ||||
| 
 | ||||
|                 if msg.get('error'): | ||||
|                     # raise the error message | ||||
|                     raise unpack_error(msg, self.channel) | ||||
|                 else: | ||||
|                     raise | ||||
| 
 | ||||
|             # deliver context instance and .started() msg value in open | ||||
|             # tuple. | ||||
|             ctx = Context( | ||||
|                 self.channel, | ||||
|                 cid, | ||||
|                 _portal=self, | ||||
|                 _recv_chan=recv_chan, | ||||
|             ) | ||||
| 
 | ||||
|             try: | ||||
|                 yield ctx, first | ||||
| 
 | ||||
|                 if cancel_on_exit: | ||||
|                     await ctx.cancel() | ||||
| 
 | ||||
|                 else: | ||||
|                     if not ctx._cancel_called: | ||||
|                         await ctx.result() | ||||
| 
 | ||||
|             except ContextCancelled: | ||||
|                 # if the context was cancelled by client code | ||||
|                 # then we don't need to raise since user code | ||||
|                 # is expecting this. | ||||
|                 if not ctx._cancel_called: | ||||
|                     raise | ||||
| 
 | ||||
|             except BaseException: | ||||
|                 # the context cancels itself on any deviation | ||||
|                 await ctx.cancel() | ||||
|             if msg.get('error'): | ||||
|                 # raise the error message | ||||
|                 raise unpack_error(msg, self.channel) | ||||
|             else: | ||||
|                 raise | ||||
| 
 | ||||
|             finally: | ||||
|                 log.info(f'Context for {func.__name__} completed') | ||||
|         # deliver context instance and .started() msg value in open | ||||
|         # tuple. | ||||
|         try: | ||||
|             async with trio.open_nursery() as scope_nursery: | ||||
|                 ctx = Context( | ||||
|                     self.channel, | ||||
|                     cid, | ||||
|                     _portal=self, | ||||
|                     _recv_chan=recv_chan, | ||||
|                     _scope_nursery=scope_nursery, | ||||
|                 ) | ||||
|                 recv_chan._ctx = ctx | ||||
| 
 | ||||
|                 yield ctx, first | ||||
| 
 | ||||
|             log.info(f'Context for {func.__name__} completed') | ||||
| 
 | ||||
|             if cancel_on_exit: | ||||
|                 await ctx.cancel() | ||||
| 
 | ||||
|             else: | ||||
|                 if not ctx._cancel_called: | ||||
|                     await ctx.result() | ||||
| 
 | ||||
|             await recv_chan.aclose() | ||||
| 
 | ||||
|             # except TypeError: | ||||
|             #     # if fn_name == '_emsd_main': | ||||
|             #     import tractor | ||||
|             #     await tractor.breakpoint() | ||||
| 
 | ||||
|         except ContextCancelled: | ||||
|             if not ctx._cancel_called: | ||||
|                 raise | ||||
| 
 | ||||
|             # if the context was cancelled by client code | ||||
|             # then we don't need to raise since user code | ||||
|             # is expecting this and the block should exit. | ||||
|             else: | ||||
|                 log.debug(f'Context {ctx} cancelled gracefully') | ||||
| 
 | ||||
|         except trio.Cancelled: | ||||
|             # the context cancels itself on any deviation | ||||
|             await ctx.cancel() | ||||
|             raise | ||||
| 
 | ||||
|         # finally: | ||||
|         #     log.info(f'Context for {func.__name__} completed') | ||||
| 
 | ||||
|         # finally: | ||||
|         #     if recv_chan is not None: | ||||
| 
 | ||||
|         finally: | ||||
|             if recv_chan is not None: | ||||
|                 await recv_chan.aclose() | ||||
| 
 | ||||
| @dataclass | ||||
| class LocalPortal: | ||||
|  | @ -464,6 +479,7 @@ async def open_portal( | |||
|     was_connected = False | ||||
| 
 | ||||
|     async with maybe_open_nursery(nursery, shield=shield) as nursery: | ||||
| 
 | ||||
|         if not channel.connected(): | ||||
|             await channel.connect() | ||||
|             was_connected = True | ||||
|  | @ -485,11 +501,12 @@ async def open_portal( | |||
|         portal = Portal(channel) | ||||
|         try: | ||||
|             yield portal | ||||
| 
 | ||||
|         finally: | ||||
|             await portal.aclose() | ||||
| 
 | ||||
|             if was_connected: | ||||
|                 # cancel remote channel-msg loop | ||||
|                 # gracefully signal remote channel-msg loop | ||||
|                 await channel.send(None) | ||||
| 
 | ||||
|             # cancel background msg loop task | ||||
|  |  | |||
|  | @ -7,7 +7,7 @@ from contextlib import contextmanager, asynccontextmanager | |||
| from dataclasses import dataclass | ||||
| from typing import ( | ||||
|     Any, Iterator, Optional, Callable, | ||||
|     AsyncGenerator, | ||||
|     AsyncGenerator, Dict, | ||||
| ) | ||||
| 
 | ||||
| import warnings | ||||
|  | @ -67,6 +67,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | |||
|             raise trio.EndOfChannel | ||||
| 
 | ||||
|         try: | ||||
|             # if self._ctx.chan.uid[0] == 'brokerd.ib': | ||||
|             #     breakpoint() | ||||
| 
 | ||||
|             msg = await self._rx_chan.receive() | ||||
|             return msg['yield'] | ||||
| 
 | ||||
|  | @ -134,6 +137,11 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | |||
| 
 | ||||
|             raise  # propagate | ||||
| 
 | ||||
|         # except trio.Cancelled: | ||||
|         #     if not self._shielded: | ||||
|         #         # if shielded we don't propagate a cancelled | ||||
|         #         raise | ||||
| 
 | ||||
|         # except trio.Cancelled: | ||||
|         #     # relay cancels to the remote task | ||||
|         #     await self.aclose() | ||||
|  | @ -165,7 +173,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | |||
|         # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose | ||||
|         rx_chan = self._rx_chan | ||||
| 
 | ||||
|         if rx_chan._closed: | ||||
|         if rx_chan._closed:  # or self._eoc: | ||||
|             log.warning(f"{self} is already closed") | ||||
| 
 | ||||
|             # this stream has already been closed so silently succeed as | ||||
|  | @ -212,7 +220,10 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | |||
|                 # stop from the caller side | ||||
|                 await self._ctx.send_stop() | ||||
| 
 | ||||
|             except trio.BrokenResourceError: | ||||
|             except ( | ||||
|                 trio.BrokenResourceError, | ||||
|                 trio.ClosedResourceError | ||||
|             ): | ||||
|                 # the underlying channel may already have been pulled | ||||
|                 # in which case our stop message is meaningless since | ||||
|                 # it can't traverse the transport. | ||||
|  | @ -254,18 +265,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | |||
|         # still need to consume msgs that are "in transit" from the far | ||||
|         # end (eg. for ``Context.result()``). | ||||
| 
 | ||||
|     # TODO: but make it broadcasting to consumers | ||||
|     # def clone(self): | ||||
|     #     """Clone this receive channel allowing for multi-task | ||||
|     #     consumption from the same channel. | ||||
| 
 | ||||
|     #     """ | ||||
|     #     return ReceiveStream( | ||||
|     #         self._cid, | ||||
|     #         self._rx_chan.clone(), | ||||
|     #         self._portal, | ||||
|     #     ) | ||||
| 
 | ||||
| 
 | ||||
| class MsgStream(ReceiveMsgStream, trio.abc.Channel): | ||||
|     """ | ||||
|  | @ -282,6 +281,17 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel): | |||
|         ''' | ||||
|         await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) | ||||
| 
 | ||||
|     # TODO: but make it broadcasting to consumers | ||||
|     def clone(self): | ||||
|         """Clone this receive channel allowing for multi-task | ||||
|         consumption from the same channel. | ||||
| 
 | ||||
|         """ | ||||
|         return MsgStream( | ||||
|             self._ctx, | ||||
|             self._rx_chan.clone(), | ||||
|         ) | ||||
| 
 | ||||
| 
 | ||||
| @dataclass | ||||
| class Context: | ||||
|  | @ -308,7 +318,7 @@ class Context: | |||
|     _cancel_called: bool = False | ||||
| 
 | ||||
|     # only set on the callee side | ||||
|     _cancel_scope: Optional[trio.CancelScope] = None | ||||
|     _scope_nursery: Optional[trio.Nursery] = None | ||||
| 
 | ||||
|     async def send_yield(self, data: Any) -> None: | ||||
| 
 | ||||
|  | @ -323,6 +333,16 @@ class Context: | |||
|     async def send_stop(self) -> None: | ||||
|         await self.chan.send({'stop': True, 'cid': self.cid}) | ||||
| 
 | ||||
|     def _error_from_remote_msg( | ||||
|         self, | ||||
|         msg: Dict[str, Any], | ||||
| 
 | ||||
|     ) -> None: | ||||
|         async def raiser(): | ||||
|             raise unpack_error(msg, self.chan) | ||||
| 
 | ||||
|         self._scope_nursery.start_soon(raiser) | ||||
| 
 | ||||
|     async def cancel(self) -> None: | ||||
|         '''Cancel this inter-actor-task context. | ||||
| 
 | ||||
|  | @ -361,13 +381,16 @@ class Context: | |||
|                         f"{cid} for {self._portal.channel.uid}") | ||||
|         else: | ||||
|             # ensure callee side | ||||
|             assert self._cancel_scope | ||||
|             assert self._scope_nursery | ||||
|             # TODO: should we have an explicit cancel message | ||||
|             # or is relaying the local `trio.Cancelled` as an | ||||
|             # {'error': trio.Cancelled, cid: "blah"} enough? | ||||
|             # This probably gets into the discussion in | ||||
|             # https://github.com/goodboy/tractor/issues/36 | ||||
|             self._cancel_scope.cancel() | ||||
|             self._scope_nursery.cancel_scope.cancel() | ||||
| 
 | ||||
|         if self._recv_chan: | ||||
|             await self._recv_chan.aclose() | ||||
| 
 | ||||
|     @asynccontextmanager | ||||
|     async def open_stream( | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue