forked from goodboy/tractor
				
			Raise explicitly on missing `greenback` portal
When `.pause_from_sync()` is called from an `asyncio.Task` which was never bestowed a portal we want to be mega pedantic about it; indicate that the task was NOT spawned from our `.to_asyncio` API and likely by some out-of-our-control code (normally using `asyncio.ensure_future()/.create_task()`). Though `greenback` already errors on such usage, it's not always clear why no portal exists; explaining the situation of a 3rd-party-bg-spawned-task should avoid dev confusion for most cases. Impl deats, - distinguish between an actor in infected mode versus the actual caller of `.pause_from_sync()` being an `asyncio.Task` with more explicit `asyncio_task` and `is_infected_aio` vars. - ONLY in the case of being both an infected-mode-actor AND detecting that the caller is an `asyncio.Task`, check `greenback.has_portal()` such that when not bestowed we presume the aforementioned 3rd-party-bg-task case above and raise a new explicit RTE with a detailed explanatory message. - add some masked draft code for handling the speical case of a root actor `asyncio.Task` caller which could (in theory) not actually require gb portal use since the `Lock` can be acquired directly without IPC. |_this will likely require factoring of various pause machinery funcs into a `_pause_from_root_task()` to mk the impl sane XD Other, - expose a new `debug_filter: Callable` which can be provided by the caller of `_maybe_enter_pm()` to predicate whether to enter the debugger REPL based on the caught `BaseException|BaseExceptionGroup`; this is handy for customizing the meaning of "graceful cancellations" so as to avoid crash handling on expected egs of more then `trioCancelled`. |_ make the default as it was implemented: `not is_multi_cancelled(err)` - pass-through a new `ignore: set[BaseException]` as `open_crash_handler(ignore_nested=ignore)` to allow for the same silent-cancellation-egs-swallowing as desired from outside the actor runtime.remotes/1757153874605917753/main
							parent
							
								
									b71d96fdee
								
							
						
					
					
						commit
						79f4197d26
					
				|  | @ -75,6 +75,7 @@ from tractor import _state | |||
| from tractor._exceptions import ( | ||||
|     InternalError, | ||||
|     NoRuntime, | ||||
|     is_multi_cancelled, | ||||
| ) | ||||
| from tractor._state import ( | ||||
|     current_actor, | ||||
|  | @ -1743,7 +1744,7 @@ async def _pause( | |||
|     ] = trio.TASK_STATUS_IGNORED, | ||||
|     **debug_func_kwargs, | ||||
| 
 | ||||
| ) -> tuple[PdbREPL, Task]|None: | ||||
| ) -> tuple[Task, PdbREPL]|None: | ||||
|     ''' | ||||
|     Inner impl for `pause()` to avoid the `trio.CancelScope.__exit__()` | ||||
|     stack frame when not shielded (since apparently i can't figure out | ||||
|  | @ -1929,7 +1930,7 @@ async def _pause( | |||
|                 ) | ||||
|                 with trio.CancelScope(shield=shield): | ||||
|                     await trio.lowlevel.checkpoint() | ||||
|                 return repl, task | ||||
|                 return (repl, task) | ||||
| 
 | ||||
|             # elif repl_task: | ||||
|             #     log.warning( | ||||
|  | @ -2530,26 +2531,17 @@ def pause_from_sync( | |||
|             f'{actor.uid} task called `tractor.pause_from_sync()`\n' | ||||
|         ) | ||||
| 
 | ||||
|         # TODO: once supported, remove this AND the one | ||||
|         # inside `._pause()`! | ||||
|         # outstanding impl fixes: | ||||
|         # -[ ] need to make `.shield_sigint()` below work here! | ||||
|         # -[ ] how to handle `asyncio`'s new SIGINT-handler | ||||
|         #     injection? | ||||
|         # -[ ] should `breakpoint()` work and what does it normally | ||||
|         #     do in `asyncio` ctxs? | ||||
|         # if actor.is_infected_aio(): | ||||
|         #     raise RuntimeError( | ||||
|         #         '`tractor.pause[_from_sync]()` not yet supported ' | ||||
|         #         'for infected `asyncio` mode!' | ||||
|         #     ) | ||||
| 
 | ||||
|         repl: PdbREPL = mk_pdb() | ||||
| 
 | ||||
|         # message += f'-> created local REPL {repl}\n' | ||||
|         is_trio_thread: bool = DebugStatus.is_main_trio_thread() | ||||
|         is_root: bool = is_root_process() | ||||
|         is_aio: bool = actor.is_infected_aio() | ||||
|         is_infected_aio: bool = actor.is_infected_aio() | ||||
|         thread: Thread = threading.current_thread() | ||||
| 
 | ||||
|         asyncio_task: asyncio.Task|None = None | ||||
|         if is_infected_aio: | ||||
|             asyncio_task = asyncio.current_task() | ||||
| 
 | ||||
|         # TODO: we could also check for a non-`.to_thread` context | ||||
|         # using `trio.from_thread.check_cancelled()` (says | ||||
|  | @ -2565,24 +2557,18 @@ def pause_from_sync( | |||
|         if ( | ||||
|             not is_trio_thread | ||||
|             and | ||||
|             not is_aio  # see below for this usage | ||||
|             not asyncio_task | ||||
|         ): | ||||
|             # TODO: `threading.Lock()` this so we don't get races in | ||||
|             # multi-thr cases where they're acquiring/releasing the | ||||
|             # REPL and setting request/`Lock` state, etc.. | ||||
|             thread: threading.Thread = threading.current_thread() | ||||
|             repl_owner = thread | ||||
|             repl_owner: Thread = thread | ||||
| 
 | ||||
|             # TODO: make root-actor bg thread usage work! | ||||
|             if ( | ||||
|                 is_root | ||||
|                 # or | ||||
|                 # is_aio | ||||
|             ): | ||||
|                 if is_root: | ||||
|                     message += ( | ||||
|                         f'-> called from a root-actor bg {thread}\n' | ||||
|                     ) | ||||
|             if is_root: | ||||
|                 message += ( | ||||
|                     f'-> called from a root-actor bg {thread}\n' | ||||
|                 ) | ||||
| 
 | ||||
|                 message += ( | ||||
|                     '-> scheduling `._pause_from_bg_root_thread()`..\n' | ||||
|  | @ -2637,34 +2623,95 @@ def pause_from_sync( | |||
|                 DebugStatus.shield_sigint() | ||||
|                 assert bg_task is not DebugStatus.repl_task | ||||
| 
 | ||||
|         # TODO: once supported, remove this AND the one | ||||
|         # inside `._pause()`! | ||||
|         # outstanding impl fixes: | ||||
|         # -[ ] need to make `.shield_sigint()` below work here! | ||||
|         # -[ ] how to handle `asyncio`'s new SIGINT-handler | ||||
|         #     injection? | ||||
|         # -[ ] should `breakpoint()` work and what does it normally | ||||
|         #     do in `asyncio` ctxs? | ||||
|         # if actor.is_infected_aio(): | ||||
|         #     raise RuntimeError( | ||||
|         #         '`tractor.pause[_from_sync]()` not yet supported ' | ||||
|         #         'for infected `asyncio` mode!' | ||||
|         #     ) | ||||
|         elif ( | ||||
|             not is_trio_thread | ||||
|             and | ||||
|             is_aio | ||||
|             is_infected_aio  # as in, the special actor-runtime mode | ||||
|             # ^NOTE XXX, that doesn't mean the caller is necessarily | ||||
|             # an `asyncio.Task` just that `trio` has been embedded on | ||||
|             # the `asyncio` event loop! | ||||
|             and | ||||
|             asyncio_task  # transitive caller is an actual `asyncio.Task` | ||||
|         ): | ||||
|             greenback: ModuleType = maybe_import_greenback() | ||||
|             repl_owner: Task = asyncio.current_task() | ||||
|             DebugStatus.shield_sigint() | ||||
|             fute: asyncio.Future = run_trio_task_in_future( | ||||
|                 partial( | ||||
|                     _pause, | ||||
|                     debug_func=None, | ||||
|                     repl=repl, | ||||
|                     hide_tb=hide_tb, | ||||
| 
 | ||||
|                     # XXX to prevent `._pause()` for setting | ||||
|                     # `DebugStatus.repl_task` to the gb task! | ||||
|                     called_from_sync=True, | ||||
|                     called_from_bg_thread=True, | ||||
|             if greenback.has_portal(): | ||||
|                 DebugStatus.shield_sigint() | ||||
|                 fute: asyncio.Future = run_trio_task_in_future( | ||||
|                     partial( | ||||
|                         _pause, | ||||
|                         debug_func=None, | ||||
|                         repl=repl, | ||||
|                         hide_tb=hide_tb, | ||||
| 
 | ||||
|                     **_pause_kwargs | ||||
|                         # XXX to prevent `._pause()` for setting | ||||
|                         # `DebugStatus.repl_task` to the gb task! | ||||
|                         called_from_sync=True, | ||||
|                         called_from_bg_thread=True, | ||||
| 
 | ||||
|                         **_pause_kwargs | ||||
|                     ) | ||||
|                 ) | ||||
|             ) | ||||
|                 repl_owner = asyncio_task | ||||
|                 bg_task, _ = greenback.await_(fute) | ||||
|                 # TODO: ASYNC version -> `.pause_from_aio()`? | ||||
|                 # bg_task, _ = await fute | ||||
| 
 | ||||
|             # TODO: for async version -> `.pause_from_aio()`? | ||||
|             # bg_task, _ = await fute | ||||
|             bg_task, _ = greenback.await_(fute) | ||||
|             bg_task: asyncio.Task = asyncio.current_task() | ||||
|             # handle the case where an `asyncio` task has been | ||||
|             # spawned WITHOUT enabling a `greenback` portal.. | ||||
|             # => can often happen in 3rd party libs. | ||||
|             else: | ||||
|                 bg_task = repl_owner | ||||
| 
 | ||||
|                 # TODO, ostensibly we can just acquire the | ||||
|                 # debug lock directly presuming we're the | ||||
|                 # root actor running in infected asyncio | ||||
|                 # mode? | ||||
|                 # | ||||
|                 # TODO, this would be a special case where | ||||
|                 # a `_pause_from_root()` would come in very | ||||
|                 # handy! | ||||
|                 # if is_root: | ||||
|                 #     import pdbp; pdbp.set_trace() | ||||
|                 #     log.warning( | ||||
|                 #         'Allowing `asyncio` task to acquire debug-lock in root-actor..\n' | ||||
|                 #         'This is not fully implemented yet; there may be teardown hangs!\n\n' | ||||
|                 #     ) | ||||
|                 # else: | ||||
| 
 | ||||
|                 # simply unsupported, since there exists no hack (i | ||||
|                 # can think of) to workaround this in a subactor | ||||
|                 # which needs to lock the root's REPL ow we're sure | ||||
|                 # to get prompt stdstreams clobbering.. | ||||
|                 cf_repr: str = '' | ||||
|                 if api_frame: | ||||
|                     caller_frame: FrameType = api_frame.f_back | ||||
|                     cf_repr: str = f'caller_frame: {caller_frame!r}\n' | ||||
| 
 | ||||
|                 raise RuntimeError( | ||||
|                     f"CAN'T USE `greenback._await()` without a portal !?\n\n" | ||||
|                     f'Likely this task was NOT spawned via the `tractor.to_asyncio` API..\n' | ||||
|                     f'{asyncio_task}\n' | ||||
|                     f'{cf_repr}\n' | ||||
| 
 | ||||
|                     f'Prolly the task was started out-of-band (from some lib?)\n' | ||||
|                     f'AND one of the below was never called ??\n' | ||||
|                     f'- greenback.ensure_portal()\n' | ||||
|                     f'- greenback.bestow_portal(<task>)\n' | ||||
|                 ) | ||||
| 
 | ||||
|         else:  # we are presumably the `trio.run()` + main thread | ||||
|             # raises on not-found by default | ||||
|  | @ -2915,8 +2962,14 @@ async def _maybe_enter_pm( | |||
|     tb: TracebackType|None = None, | ||||
|     api_frame: FrameType|None = None, | ||||
|     hide_tb: bool = False, | ||||
| 
 | ||||
|     # only enter debugger REPL when returns `True` | ||||
|     debug_filter: Callable[ | ||||
|         [BaseException|BaseExceptionGroup], | ||||
|         bool, | ||||
|     ] = lambda err: not is_multi_cancelled(err), | ||||
| 
 | ||||
| ): | ||||
|     from tractor._exceptions import is_multi_cancelled | ||||
|     if ( | ||||
|         debug_mode() | ||||
| 
 | ||||
|  | @ -2933,7 +2986,8 @@ async def _maybe_enter_pm( | |||
| 
 | ||||
|         # Really we just want to mostly avoid catching KBIs here so there | ||||
|         # might be a simpler check we can do? | ||||
|         and not is_multi_cancelled(err) | ||||
|         and | ||||
|         debug_filter(err) | ||||
|     ): | ||||
|         api_frame: FrameType = api_frame or inspect.currentframe() | ||||
|         tb: TracebackType = tb or sys.exc_info()[2] | ||||
|  | @ -3139,10 +3193,16 @@ def open_crash_handler( | |||
|     try: | ||||
|         yield | ||||
|     except tuple(catch) as err: | ||||
|         if type(err) not in ignore: | ||||
| 
 | ||||
|             # use our re-impl-ed version | ||||
|         if ( | ||||
|             type(err) not in ignore | ||||
|             and | ||||
|             not is_multi_cancelled( | ||||
|                 err, | ||||
|                 ignore_nested=ignore | ||||
|             ) | ||||
|         ): | ||||
|             try: | ||||
|                 # use our re-impl-ed version | ||||
|                 _post_mortem( | ||||
|                     repl=mk_pdb(), | ||||
|                     tb=sys.exc_info()[2], | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue