forked from goodboy/tractor
				
			Much more limited `asyncio.Task.cancel()` use
Since it can not only cause the guest-mode run to abandon but also in some edge cases prevent `trio`-errors from propagating (at least on py3.12-13?) as discovered as part of supporting this mode officially in the *root actor*. As such try to avoid that method as much as possible instead opting to pass the `trio`-side error via the iter-task channel ref. Deats, - add a `LinkedTaskChannel._trio_err: BaseException|None` which gets set whenver the `trio.Task` error is caught; ONLY set `AsyncioCancelled` when the `trio` task was for sure the cause, whether itself cancelled or errored. - always check for this error when exiting the `asyncio` side (even when terminated via a call to `asyncio.Task.cancel()` or during any other `CancelledError` handling such that the `asyncio`-task can expect to handle `AsyncioCancelled` due to the above^^ cases. - never `cs.cancel()` the `trio` side unless that cancel scope has not yet been `.cancel_called` whatsoever; it's a noop anyway. - only raise any exc from `asyncio.Task.result()` when `chan._aio_err` does not already match it since the existence of the pre-existing `task_err` means `asyncio` prolly intends (or has already) raised and interrupted the task elsewhere. Various supporting tweaks, - don't bother maybe-init-ing `greenback` from the actor entrypoint since we already need to (and do) bestow the portals to each `asyncio` task spawned using the `run_task()`/`open_channel_from()` API; further the init-ing should be done already by client code that enables infected mode (even in the root actor). |_we should prolly also codify it from any `run_daemon(infected_aio=True, debug_mode=True)` usage we offer. - pass all the `_<field>`s to `Linked TaskChannel` explicitly in named kwarg style. - better sclang-style log reports throughout, particularly on teardowns. - generally more/better comments and docs around (not well understood) edge cases. - prep to just inline `maybe_raise_aio_side_err()` closure..remotes/1757153874605917753/main
							parent
							
								
									8f0ca44b79
								
							
						
					
					
						commit
						c5291b7f33
					
				|  | @ -33,13 +33,19 @@ from typing import ( | |||
| ) | ||||
| 
 | ||||
| import tractor | ||||
| from tractor._exceptions import AsyncioCancelled | ||||
| from tractor._exceptions import ( | ||||
|     AsyncioCancelled, | ||||
|     is_multi_cancelled, | ||||
| ) | ||||
| from tractor._state import ( | ||||
|     debug_mode, | ||||
|     _runtime_vars, | ||||
| ) | ||||
| from tractor.devx import _debug | ||||
| from tractor.log import get_logger | ||||
| from tractor.log import ( | ||||
|     get_logger, | ||||
|     StackLevelAdapter, | ||||
| ) | ||||
| from tractor.trionics._broadcast import ( | ||||
|     broadcast_receiver, | ||||
|     BroadcastReceiver, | ||||
|  | @ -50,7 +56,7 @@ from outcome import ( | |||
|     Outcome, | ||||
| ) | ||||
| 
 | ||||
| log = get_logger(__name__) | ||||
| log: StackLevelAdapter = get_logger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| __all__ = [ | ||||
|  | @ -70,9 +76,10 @@ class LinkedTaskChannel(trio.abc.Channel): | |||
|     _to_aio: asyncio.Queue | ||||
|     _from_aio: trio.MemoryReceiveChannel | ||||
|     _to_trio: trio.MemorySendChannel | ||||
| 
 | ||||
|     _trio_cs: trio.CancelScope | ||||
|     _aio_task_complete: trio.Event | ||||
| 
 | ||||
|     _trio_err: BaseException|None = None | ||||
|     _trio_exited: bool = False | ||||
| 
 | ||||
|     # set after ``asyncio.create_task()`` | ||||
|  | @ -84,28 +91,40 @@ class LinkedTaskChannel(trio.abc.Channel): | |||
|         await self._from_aio.aclose() | ||||
| 
 | ||||
|     async def receive(self) -> Any: | ||||
|         async with translate_aio_errors( | ||||
|             self, | ||||
| 
 | ||||
|             # XXX: obviously this will deadlock if an on-going stream is | ||||
|             # being procesed. | ||||
|             # wait_on_aio_task=False, | ||||
|         ): | ||||
|         ''' | ||||
|         Receive a value from the paired `asyncio.Task` with | ||||
|         exception/cancel handling to teardown both sides on any | ||||
|         unexpected error. | ||||
| 
 | ||||
|         ''' | ||||
|         try: | ||||
|             # TODO: do we need this to guarantee asyncio code get's | ||||
|             # cancelled in the case where the trio side somehow creates | ||||
|             # a state where the asyncio cycle-task isn't getting the | ||||
|             # cancel request sent by (in theory) the last checkpoint | ||||
|             # cycle on the trio side? | ||||
|             # await trio.lowlevel.checkpoint() | ||||
| 
 | ||||
|             return await self._from_aio.receive() | ||||
|         except BaseException as err: | ||||
|             async with translate_aio_errors( | ||||
|                 self, | ||||
| 
 | ||||
|                 # XXX: obviously this will deadlock if an on-going stream is | ||||
|                 # being procesed. | ||||
|                 # wait_on_aio_task=False, | ||||
|             ): | ||||
|                 raise err | ||||
| 
 | ||||
|     async def wait_asyncio_complete(self) -> None: | ||||
|         await self._aio_task_complete.wait() | ||||
| 
 | ||||
|     # def cancel_asyncio_task(self) -> None: | ||||
|     #     self._aio_task.cancel() | ||||
|     def cancel_asyncio_task( | ||||
|         self, | ||||
|         msg: str = '', | ||||
|     ) -> None: | ||||
|         self._aio_task.cancel( | ||||
|             msg=msg, | ||||
|         ) | ||||
| 
 | ||||
|     async def send(self, item: Any) -> None: | ||||
|         ''' | ||||
|  | @ -155,7 +174,6 @@ class LinkedTaskChannel(trio.abc.Channel): | |||
| 
 | ||||
| 
 | ||||
| def _run_asyncio_task( | ||||
| 
 | ||||
|     func: Callable, | ||||
|     *, | ||||
|     qsize: int = 1, | ||||
|  | @ -165,8 +183,9 @@ def _run_asyncio_task( | |||
| 
 | ||||
| ) -> LinkedTaskChannel: | ||||
|     ''' | ||||
|     Run an ``asyncio`` async function or generator in a task, return | ||||
|     or stream the result back to the caller `trio.lowleve.Task`. | ||||
|     Run an `asyncio`-compat async function or generator in a task, | ||||
|     return or stream the result back to the caller | ||||
|     `trio.lowleve.Task`. | ||||
| 
 | ||||
|     ''' | ||||
|     __tracebackhide__: bool = hide_tb | ||||
|  | @ -204,23 +223,23 @@ def _run_asyncio_task( | |||
|     aio_err: BaseException|None = None | ||||
| 
 | ||||
|     chan = LinkedTaskChannel( | ||||
|         aio_q,  # asyncio.Queue | ||||
|         from_aio,  # recv chan | ||||
|         to_trio,  # send chan | ||||
| 
 | ||||
|         cancel_scope, | ||||
|         aio_task_complete, | ||||
|         _to_aio=aio_q,  # asyncio.Queue | ||||
|         _from_aio=from_aio,  # recv chan | ||||
|         _to_trio=to_trio,  # send chan | ||||
|         _trio_cs=cancel_scope, | ||||
|         _aio_task_complete=aio_task_complete, | ||||
|     ) | ||||
| 
 | ||||
|     async def wait_on_coro_final_result( | ||||
| 
 | ||||
|         to_trio: trio.MemorySendChannel, | ||||
|         coro: Awaitable, | ||||
|         aio_task_complete: trio.Event, | ||||
| 
 | ||||
|     ) -> None: | ||||
|         ''' | ||||
|         Await ``coro`` and relay result back to ``trio``. | ||||
|         Await `coro` and relay result back to `trio`. | ||||
| 
 | ||||
|         This can only be run as an `asyncio.Task`! | ||||
| 
 | ||||
|         ''' | ||||
|         nonlocal aio_err | ||||
|  | @ -243,8 +262,10 @@ def _run_asyncio_task( | |||
| 
 | ||||
|         else: | ||||
|             if ( | ||||
|                 result != orig and | ||||
|                 aio_err is None and | ||||
|                 result != orig | ||||
|                 and | ||||
|                 aio_err is None | ||||
|                 and | ||||
| 
 | ||||
|                 # in the `open_channel_from()` case we don't | ||||
|                 # relay through the "return value". | ||||
|  | @ -260,12 +281,21 @@ def _run_asyncio_task( | |||
|                 # a ``trio.EndOfChannel`` to the trio (consumer) side. | ||||
|                 to_trio.close() | ||||
| 
 | ||||
|             # import pdbp; pdbp.set_trace() | ||||
|             aio_task_complete.set() | ||||
|             log.runtime(f'`asyncio` task: {task.get_name()} is complete') | ||||
|             # await asyncio.sleep(0.1) | ||||
|             log.info( | ||||
|                 f'`asyncio` task terminated\n' | ||||
|                 f'x)>\n' | ||||
|                 f'  |_{task}\n' | ||||
|             ) | ||||
| 
 | ||||
|     # start the asyncio task we submitted from trio | ||||
|     if not inspect.isawaitable(coro): | ||||
|         raise TypeError(f"No support for invoking {coro}") | ||||
|         raise TypeError( | ||||
|             f'Pass the async-fn NOT a coroutine\n' | ||||
|             f'{coro!r}' | ||||
|         ) | ||||
| 
 | ||||
|     task: asyncio.Task = asyncio.create_task( | ||||
|         wait_on_coro_final_result( | ||||
|  | @ -289,6 +319,10 @@ def _run_asyncio_task( | |||
|             raise_not_found=False, | ||||
|         )) | ||||
|     ): | ||||
|         log.info( | ||||
|             f'Bestowing `greenback` portal for `asyncio`-task\n' | ||||
|             f'{task}\n' | ||||
|         ) | ||||
|         greenback.bestow_portal(task) | ||||
| 
 | ||||
|     def cancel_trio(task: asyncio.Task) -> None: | ||||
|  | @ -304,11 +338,22 @@ def _run_asyncio_task( | |||
|         # task exceptions | ||||
|         try: | ||||
|             res: Any = task.result() | ||||
|             log.info( | ||||
|                 '`trio` received final result from {task}\n' | ||||
|                 f'|_{res}\n' | ||||
|             ) | ||||
|         except BaseException as terr: | ||||
|             task_err: BaseException = terr | ||||
| 
 | ||||
|             # read again AFTER the `asyncio` side errors in case | ||||
|             # it was cancelled due to an error from `trio` (or | ||||
|             # some other out of band exc). | ||||
|             aio_err: BaseException|None = chan._aio_err | ||||
| 
 | ||||
|             msg: str = ( | ||||
|                 'Infected `asyncio` task {etype_str}\n' | ||||
|                 '`trio`-side reports that the `asyncio`-side ' | ||||
|                 '{etype_str}\n' | ||||
|                 # ^NOTE filled in below | ||||
|             ) | ||||
|             if isinstance(terr, CancelledError): | ||||
|                 msg += ( | ||||
|  | @ -327,17 +372,18 @@ def _run_asyncio_task( | |||
|                     msg.format(etype_str='errored') | ||||
|                 ) | ||||
| 
 | ||||
|             assert type(terr) is type(aio_err), ( | ||||
|                 '`asyncio` task error mismatch?!?' | ||||
|             ) | ||||
|             assert ( | ||||
|                 type(terr) is type(aio_err) | ||||
|             ), '`asyncio` task error mismatch?!?' | ||||
| 
 | ||||
|         if aio_err is not None: | ||||
|             # import pdbp; pdbp.set_trace() | ||||
|             # XXX: uhh is this true? | ||||
|             # assert task_err, f'Asyncio task {task.get_name()} discrepancy!?' | ||||
| 
 | ||||
|             # NOTE: currently mem chan closure may act as a form | ||||
|             # of error relay (at least in the ``asyncio.CancelledError`` | ||||
|             # case) since we have no way to directly trigger a ``trio`` | ||||
|             # of error relay (at least in the `asyncio.CancelledError` | ||||
|             # case) since we have no way to directly trigger a `trio` | ||||
|             # task error without creating a nursery to throw one. | ||||
|             # We might want to change this in the future though. | ||||
|             from_aio.close() | ||||
|  | @ -359,29 +405,25 @@ def _run_asyncio_task( | |||
|             #     ) | ||||
|             #     raise aio_err from task_err | ||||
| 
 | ||||
|             # XXX: if not already, alway cancel the scope | ||||
|             # on a task error in case the trio task is blocking on | ||||
|             # XXX: if not already, alway cancel the scope on a task | ||||
|             # error in case the trio task is blocking on | ||||
|             # a checkpoint. | ||||
|             cancel_scope.cancel() | ||||
| 
 | ||||
|             if ( | ||||
|                 task_err | ||||
|                 and | ||||
|                 aio_err is not task_err | ||||
|                 not cancel_scope.cancelled_caught | ||||
|                 or | ||||
|                 not cancel_scope.cancel_called | ||||
|             ): | ||||
|                 raise aio_err from task_err | ||||
|                 # import pdbp; pdbp.set_trace() | ||||
|                 cancel_scope.cancel() | ||||
| 
 | ||||
|             # raise any `asyncio` side error. | ||||
|             raise aio_err | ||||
| 
 | ||||
|         log.info( | ||||
|             '`trio` received final result from {task}\n' | ||||
|             f'|_{res}\n' | ||||
|         ) | ||||
|         # TODO: do we need this? | ||||
|         # if task_err: | ||||
|         #     cancel_scope.cancel() | ||||
|         #     raise task_err | ||||
|             if task_err: | ||||
|                 # XXX raise any `asyncio` side error IFF it doesn't | ||||
|                 # match the one we just caught from the task above! | ||||
|                 # (that would indicate something weird/very-wrong | ||||
|                 # going on?) | ||||
|                 if aio_err is not task_err: | ||||
|                     # import pdbp; pdbp.set_trace() | ||||
|                     raise aio_err from task_err | ||||
| 
 | ||||
|     task.add_done_callback(cancel_trio) | ||||
|     return chan | ||||
|  | @ -389,13 +431,18 @@ def _run_asyncio_task( | |||
| 
 | ||||
| @acm | ||||
| async def translate_aio_errors( | ||||
| 
 | ||||
|     chan: LinkedTaskChannel, | ||||
|     wait_on_aio_task: bool = False, | ||||
|     cancel_aio_task_on_trio_exit: bool = True, | ||||
| 
 | ||||
| ) -> AsyncIterator[None]: | ||||
|     ''' | ||||
|     Error handling context around ``asyncio`` task spawns which | ||||
|     An error handling to cross-loop propagation context around | ||||
|     `asyncio.Task` spawns via one of this module's APIs: | ||||
| 
 | ||||
|     - `open_channel_from()` | ||||
|     - `run_task()` | ||||
| 
 | ||||
|     appropriately translates errors and cancels into ``trio`` land. | ||||
| 
 | ||||
|     ''' | ||||
|  | @ -403,88 +450,204 @@ async def translate_aio_errors( | |||
| 
 | ||||
|     aio_err: BaseException|None = None | ||||
| 
 | ||||
|     # TODO: make thisi a channel method? | ||||
|     def maybe_raise_aio_err( | ||||
|         err: Exception|None = None | ||||
|     ) -> None: | ||||
|         aio_err = chan._aio_err | ||||
|         if ( | ||||
|             aio_err is not None | ||||
|             and | ||||
|             # not isinstance(aio_err, CancelledError) | ||||
|             type(aio_err) != CancelledError | ||||
|         ): | ||||
|             # always raise from any captured asyncio error | ||||
|             if err: | ||||
|                 raise aio_err from err | ||||
|             else: | ||||
|                 raise aio_err | ||||
| 
 | ||||
|     task = chan._aio_task | ||||
|     assert task | ||||
|     aio_task: asyncio.Task = chan._aio_task | ||||
|     assert aio_task | ||||
|     trio_err: BaseException|None = None | ||||
|     try: | ||||
|         yield | ||||
| 
 | ||||
|         yield  # back to one of the cross-loop apis | ||||
|     except ( | ||||
|         trio.Cancelled, | ||||
|     ): | ||||
|         # relay cancel through to called ``asyncio`` task | ||||
|     ) as _trio_err: | ||||
|         trio_err = _trio_err | ||||
|         assert chan._aio_task | ||||
|         chan._aio_task.cancel( | ||||
|             msg=f'the `trio` caller task was cancelled: {trio_task.name}' | ||||
| 
 | ||||
|         # import pdbp; pdbp.set_trace()  # lolevel-debug | ||||
| 
 | ||||
|         # relay cancel through to called ``asyncio`` task | ||||
|         chan._aio_err = AsyncioCancelled( | ||||
|             f'trio`-side cancelled the `asyncio`-side,\n' | ||||
|             f'c)>\n' | ||||
|             f'  |_{trio_task}\n\n' | ||||
| 
 | ||||
| 
 | ||||
|             f'{trio_err!r}\n' | ||||
|         ) | ||||
|         raise | ||||
| 
 | ||||
|         # XXX NOTE XXX seems like we can get all sorts of unreliable | ||||
|         # behaviour from `asyncio` under various cancellation | ||||
|         # conditions (like SIGINT/kbi) when this is used.. | ||||
|         # SO FOR NOW, try to avoid it at most costs! | ||||
|         # | ||||
|         # aio_task.cancel( | ||||
|         #     msg=f'the `trio` parent task was cancelled: {trio_task.name}' | ||||
|         # ) | ||||
|         # raise | ||||
| 
 | ||||
|     except ( | ||||
|         # NOTE: see the note in the ``cancel_trio()`` asyncio task | ||||
|         # NOTE: also see note in the `cancel_trio()` asyncio task | ||||
|         # termination callback | ||||
|         trio.ClosedResourceError, | ||||
|         # trio.BrokenResourceError, | ||||
|     ): | ||||
| 
 | ||||
|     ) as _trio_err: | ||||
|         trio_err = _trio_err | ||||
|         aio_err = chan._aio_err | ||||
|         # import pdbp; pdbp.set_trace() | ||||
| 
 | ||||
|         # XXX if an underlying `asyncio.CancelledError` triggered | ||||
|         # this channel close, raise our (non-`BaseException`) wrapper | ||||
|         # exception (`AsyncioCancelled`) from that source error. | ||||
|         if ( | ||||
|             task.cancelled() | ||||
|             # NOTE, not until it terminates? | ||||
|             aio_task.cancelled() | ||||
|             and | ||||
|             type(aio_err) is CancelledError | ||||
|         ): | ||||
|             # if an underlying `asyncio.CancelledError` triggered this | ||||
|             # channel close, raise our (non-``BaseException``) wrapper | ||||
|             # error: ``AsyncioCancelled`` from that source error. | ||||
|             raise AsyncioCancelled( | ||||
|                 f'Task cancelled\n' | ||||
|                 f'|_{task}\n' | ||||
|                 f'asyncio`-side cancelled the `trio`-side,\n' | ||||
|                 f'c(>\n' | ||||
|                 f'  |_{aio_task}\n\n' | ||||
| 
 | ||||
|                 f'{trio_err!r}\n' | ||||
|             ) from aio_err | ||||
| 
 | ||||
|         else: | ||||
|             raise | ||||
| 
 | ||||
|     finally: | ||||
|     except BaseException as _trio_err: | ||||
|         trio_err = _trio_err | ||||
|         log.exception( | ||||
|             '`trio`-side task errored?' | ||||
|         ) | ||||
| 
 | ||||
|         entered: bool = await _debug._maybe_enter_pm( | ||||
|             trio_err, | ||||
|             api_frame=inspect.currentframe(), | ||||
|         ) | ||||
|         if ( | ||||
|             # NOTE: always cancel the ``asyncio`` task if we've made it | ||||
|             # this far and it's not done. | ||||
|             not task.done() and aio_err | ||||
|             not entered | ||||
|             and | ||||
|             not is_multi_cancelled(trio_err) | ||||
|         ): | ||||
|             log.exception('actor crashed\n') | ||||
| 
 | ||||
|         aio_taskc = AsyncioCancelled( | ||||
|             f'`trio`-side task errored!\n' | ||||
|             f'{trio_err}' | ||||
|         ) #from trio_err | ||||
| 
 | ||||
|         try: | ||||
|             aio_task.set_exception(aio_taskc) | ||||
|         except ( | ||||
|             asyncio.InvalidStateError, | ||||
|             RuntimeError, | ||||
|             # ^XXX, uhh bc apparently we can't use `.set_exception()` | ||||
|             # any more XD .. ?? | ||||
|         ): | ||||
|             wait_on_aio_task = False | ||||
| 
 | ||||
|         # import pdbp; pdbp.set_trace() | ||||
|         # raise aio_taskc from trio_err | ||||
| 
 | ||||
|     finally: | ||||
|         # record wtv `trio`-side error transpired | ||||
|         chan._trio_err = trio_err | ||||
| 
 | ||||
|         # NOTE! by default always cancel the `asyncio` task if | ||||
|         # we've made it this far and it's not done. | ||||
|         # TODO, how to detect if there's an out-of-band error that | ||||
|         # caused the exit? | ||||
|         if ( | ||||
|             cancel_aio_task_on_trio_exit | ||||
|             and | ||||
|             not aio_task.done() | ||||
|             and | ||||
|             aio_err | ||||
| 
 | ||||
|             # or the trio side has exited it's surrounding cancel scope | ||||
|             # indicating the lifetime of the ``asyncio``-side task | ||||
|             # should also be terminated. | ||||
|             or chan._trio_exited | ||||
|         ): | ||||
|             log.runtime( | ||||
|                 f'Cancelling `asyncio`-task: {task.get_name()}' | ||||
|             or ( | ||||
|                 chan._trio_exited | ||||
|                 and | ||||
|                 not chan._trio_err   # XXX CRITICAL, `asyncio.Task.cancel()` is cucked man.. | ||||
|             ) | ||||
|             # assert not aio_err, 'WTF how did asyncio do this?!' | ||||
|             task.cancel() | ||||
|         ): | ||||
|             # pass | ||||
|             msg: str = ( | ||||
|                 f'MANUALLY Cancelling `asyncio`-task: {aio_task.get_name()}!\n\n' | ||||
|                 f'**THIS CAN SILENTLY SUPPRESS ERRORS FYI\n\n' | ||||
| 
 | ||||
|         # Required to sync with the far end ``asyncio``-task to ensure | ||||
|                 f'trio-side exited silently!' | ||||
|             ) | ||||
|             # TODO XXX, figure out the case where calling this makes the | ||||
|             # `test_infected_asyncio.py::test_trio_closes_early_and_channel_exits` | ||||
|             # hang and then don't call it in that case! | ||||
|             # | ||||
|             aio_task.cancel(msg=msg) | ||||
|             log.warning(msg) | ||||
|             # assert not aio_err, 'WTF how did asyncio do this?!' | ||||
|             # import pdbp; pdbp.set_trace() | ||||
| 
 | ||||
|         # Required to sync with the far end `asyncio`-task to ensure | ||||
|         # any error is captured (via monkeypatching the | ||||
|         # ``channel._aio_err``) before calling ``maybe_raise_aio_err()`` | ||||
|         # `channel._aio_err`) before calling ``maybe_raise_aio_err()`` | ||||
|         # below! | ||||
|         # | ||||
|         # XXX NOTE XXX the `task.set_exception(aio_taskc)` call above | ||||
|         # MUST NOT EXCEPT or this WILL HANG!! | ||||
|         # | ||||
|         # so if you get a hang maybe step through and figure out why | ||||
|         # it erroed out up there! | ||||
|         # | ||||
|         if wait_on_aio_task: | ||||
|             # await chan.wait_asyncio_complete() | ||||
|             await chan._aio_task_complete.wait() | ||||
|             log.info( | ||||
|                 'asyncio-task is done and unblocked trio-side!\n' | ||||
|             ) | ||||
| 
 | ||||
|         # TODO? | ||||
|         # -[ ] make this a channel method, OR | ||||
|         # -[ ] just put back inline below? | ||||
|         # | ||||
|         def maybe_raise_aio_side_err( | ||||
|             trio_err: Exception, | ||||
|         ) -> None: | ||||
|             ''' | ||||
|             Raise any `trio`-side-caused cancellation or legit task | ||||
|             error normally propagated from the caller of either, | ||||
|               - `open_channel_from()` | ||||
|               - `run_task()` | ||||
| 
 | ||||
|             ''' | ||||
|             aio_err: BaseException|None = chan._aio_err | ||||
| 
 | ||||
|             # Check if the asyncio-side is the cause of the trio-side | ||||
|             # error. | ||||
|             if ( | ||||
|                 aio_err is not None | ||||
|                 and | ||||
|                 type(aio_err) is not AsyncioCancelled | ||||
| 
 | ||||
|                 # not isinstance(aio_err, CancelledError) | ||||
|                 # type(aio_err) is not CancelledError | ||||
|             ): | ||||
|                 # always raise from any captured asyncio error | ||||
|                 if trio_err: | ||||
|                     raise trio_err from aio_err | ||||
| 
 | ||||
|                 raise aio_err | ||||
| 
 | ||||
|             if trio_err: | ||||
|                 raise trio_err | ||||
| 
 | ||||
|         # NOTE: if any ``asyncio`` error was caught, raise it here inline | ||||
|         # here in the ``trio`` task | ||||
|         maybe_raise_aio_err() | ||||
|         # if trio_err: | ||||
|         maybe_raise_aio_side_err( | ||||
|             trio_err=trio_err | ||||
|         ) | ||||
| 
 | ||||
| 
 | ||||
| async def run_task( | ||||
|  | @ -496,8 +659,8 @@ async def run_task( | |||
| 
 | ||||
| ) -> Any: | ||||
|     ''' | ||||
|     Run an `asyncio` async function or generator in a task, return | ||||
|     or stream the result back to `trio`. | ||||
|     Run an `asyncio`-compat async function or generator in a task, | ||||
|     return or stream the result back to `trio`. | ||||
| 
 | ||||
|     ''' | ||||
|     # simple async func | ||||
|  | @ -537,6 +700,7 @@ async def open_channel_from( | |||
|         provide_channels=True, | ||||
|         **kwargs, | ||||
|     ) | ||||
|     # TODO, tuple form here? | ||||
|     async with chan._from_aio: | ||||
|         async with translate_aio_errors( | ||||
|             chan, | ||||
|  | @ -685,18 +849,21 @@ def run_as_asyncio_guest( | |||
|     # Uh, oh. | ||||
|     # | ||||
|     # :o | ||||
| 
 | ||||
|     # It looks like your event loop has caught a case of the ``trio``s. | ||||
| 
 | ||||
|     # :() | ||||
| 
 | ||||
|     # Don't worry, we've heard you'll barely notice. You might | ||||
|     # hallucinate a few more propagating errors and feel like your | ||||
|     # digestion has slowed but if anything get's too bad your parents | ||||
|     # will know about it. | ||||
| 
 | ||||
|     # | ||||
|     # looks like your stdlib event loop has caught a case of "the trios" ! | ||||
|     # | ||||
|     # :O | ||||
|     # | ||||
|     # Don't worry, we've heard you'll barely notice. | ||||
|     # | ||||
|     # :) | ||||
| 
 | ||||
|     # | ||||
|     # You might hallucinate a few more propagating errors and feel | ||||
|     # like your digestion has slowed, but if anything get's too bad | ||||
|     # your parents will know about it. | ||||
|     # | ||||
|     # B) | ||||
|     # | ||||
|     async def aio_main(trio_main): | ||||
|         ''' | ||||
|         Main `asyncio.Task` which calls | ||||
|  | @ -713,16 +880,20 @@ def run_as_asyncio_guest( | |||
|             '-> built a `trio`-done future\n' | ||||
|         ) | ||||
| 
 | ||||
|         # TODO: shoudn't this be done in the guest-run trio task? | ||||
|         # if debug_mode(): | ||||
|         #     # XXX make it obvi we know this isn't supported yet! | ||||
|         #     log.error( | ||||
|         #         'Attempting to enter unsupported `greenback` init ' | ||||
|         #         'from `asyncio` task..' | ||||
|         #     ) | ||||
|         #     await _debug.maybe_init_greenback( | ||||
|         #         force_reload=True, | ||||
|         #     ) | ||||
|         # TODO: is this evern run or needed? | ||||
|         # -[ ] pretty sure it never gets run for root-infected-aio | ||||
|         #     since this main task is always the parent of any | ||||
|         #     eventual `open_root_actor()` call? | ||||
|         if debug_mode(): | ||||
|             log.error( | ||||
|                 'Attempting to enter non-required `greenback` init ' | ||||
|                 'from `asyncio` task ???' | ||||
|             ) | ||||
|             # XXX make it obvi we know this isn't supported yet! | ||||
|             assert 0 | ||||
|             # await _debug.maybe_init_greenback( | ||||
|             #     force_reload=True, | ||||
|             # ) | ||||
| 
 | ||||
|         def trio_done_callback(main_outcome): | ||||
|             log.runtime( | ||||
|  | @ -732,6 +903,7 @@ def run_as_asyncio_guest( | |||
|             ) | ||||
| 
 | ||||
|             if isinstance(main_outcome, Error): | ||||
|                 # import pdbp; pdbp.set_trace() | ||||
|                 error: BaseException = main_outcome.error | ||||
| 
 | ||||
|                 # show an dedicated `asyncio`-side tb from the error | ||||
|  | @ -751,7 +923,7 @@ def run_as_asyncio_guest( | |||
|                 trio_done_fute.set_result(main_outcome) | ||||
| 
 | ||||
|             log.info( | ||||
|                 f'`trio` guest-run finished with outcome\n' | ||||
|                 f'`trio` guest-run finished with,\n' | ||||
|                 f')>\n' | ||||
|                 f'|_{trio_done_fute}\n' | ||||
|             ) | ||||
|  | @ -777,9 +949,20 @@ def run_as_asyncio_guest( | |||
|             done_callback=trio_done_callback, | ||||
|         ) | ||||
|         fute_err: BaseException|None = None | ||||
| 
 | ||||
|         try: | ||||
|             out: Outcome = await asyncio.shield(trio_done_fute) | ||||
|             # ^TODO still don't really understand why the `.shield()` | ||||
|             # is required ... ?? | ||||
|             # https://docs.python.org/3/library/asyncio-task.html#asyncio.shield | ||||
|             # ^ seems as though in combo with the try/except here | ||||
|             # we're BOLDLY INGORING cancel of the trio fute? | ||||
|             # | ||||
|             # I guess it makes sense bc we don't want `asyncio` to | ||||
|             # cancel trio just because they can't handle SIGINT | ||||
|             # sanely? XD .. kk | ||||
| 
 | ||||
|             # XXX, sin-shield causes guest-run abandons on SIGINT.. | ||||
|             # out: Outcome = await trio_done_fute | ||||
| 
 | ||||
|             # NOTE will raise (via `Error.unwrap()`) from any | ||||
|             # exception packed into the guest-run's `main_outcome`. | ||||
|  | @ -802,27 +985,32 @@ def run_as_asyncio_guest( | |||
|             fute_err = _fute_err | ||||
|             err_message: str = ( | ||||
|                 'main `asyncio` task ' | ||||
|                 'was cancelled!\n' | ||||
|             ) | ||||
|             if isinstance(fute_err, asyncio.CancelledError): | ||||
|                 err_message += 'was cancelled!\n' | ||||
|             else: | ||||
|                 err_message += f'errored with {out.error!r}\n' | ||||
| 
 | ||||
|             # TODO, handle possible edge cases with | ||||
|             # `open_root_actor()` closing before this is run! | ||||
|             # | ||||
|             actor: tractor.Actor = tractor.current_actor() | ||||
| 
 | ||||
|             log.exception( | ||||
|                 err_message | ||||
|                 + | ||||
|                 'Cancelling `trio`-side `tractor`-runtime..\n' | ||||
|                 f'c)>\n' | ||||
|                 f'c(>\n' | ||||
|                 f'  |_{actor}.cancel_soon()\n' | ||||
|             ) | ||||
| 
 | ||||
|             # XXX WARNING XXX the next LOCs are super important, since | ||||
|             # without them, we can get guest-run abandonment cases | ||||
|             # where `asyncio` will not schedule or wait on the `trio` | ||||
|             # guest-run task before final shutdown! This is | ||||
|             # particularly true if the `trio` side has tasks doing | ||||
|             # shielded work when a SIGINT condition occurs. | ||||
|             # XXX WARNING XXX the next LOCs are super important! | ||||
|             # | ||||
|             # SINCE without them, we can get guest-run ABANDONMENT | ||||
|             # cases where `asyncio` will not schedule or wait on the | ||||
|             # guest-run `trio.Task` nor invoke its registered | ||||
|             # `trio_done_callback()` before final shutdown! | ||||
|             # | ||||
|             # This is particularly true if the `trio` side has tasks | ||||
|             # in shielded sections when an OC-cancel (SIGINT) | ||||
|             # condition occurs! | ||||
|             # | ||||
|             # We now have the | ||||
|             # `test_infected_asyncio.test_sigint_closes_lifetime_stack()` | ||||
|  | @ -886,7 +1074,10 @@ def run_as_asyncio_guest( | |||
| 
 | ||||
|             try: | ||||
|                 return trio_done_fute.result() | ||||
|             except asyncio.exceptions.InvalidStateError as state_err: | ||||
|             except ( | ||||
|                 asyncio.InvalidStateError, | ||||
|                 # asyncio.CancelledError, | ||||
|             )as state_err: | ||||
| 
 | ||||
|                 # XXX be super dupere noisy about abandonment issues! | ||||
|                 aio_task: asyncio.Task = asyncio.current_task() | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue