diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 258ba515..09f0781c 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -48,7 +48,7 @@ from tractor._state import ( _runtime_vars, ) from tractor._context import Unresolved -from tractor.devx import debug +from tractor import devx from tractor.log import ( get_logger, StackLevelAdapter, @@ -439,7 +439,23 @@ def _run_asyncio_task( orig = result = id(coro) try: + # XXX TODO UGH! + # this seems to break a `test_sync_pause_from_aio_task` + # in a REALLY weird way where a `dict` value for + # `_runtime_vars['_root_addrs']` is delivered from the + # parent actor?? + # + # XXX => see masked `.set_trace()` block in + # `Actor.from_parent()`.. + # + # with devx.maybe_open_crash_handler( + # # XXX, if trio-side exits (intentionally) we + # # shouldn't care bc it should have its own crash + # # handling logic. + # ignore={TrioTaskExited,}, + # ) as _bxerr: result: Any = await coro + chan._aio_result = result except BaseException as aio_err: chan._aio_err = aio_err @@ -546,7 +562,7 @@ def _run_asyncio_task( if ( debug_mode() and - (greenback := debug.maybe_import_greenback( + (greenback := devx.debug.maybe_import_greenback( force_reload=True, raise_not_found=False, )) @@ -946,7 +962,11 @@ async def translate_aio_errors( except BaseException as _trio_err: trio_err = chan._trio_err = _trio_err # await tractor.pause(shield=True) # workx! - entered: bool = await debug._maybe_enter_pm( + + # !TODO! we need an inter-loop lock here to avoid aio-tasks + # clobbering trio ones when both crash in debug-mode! + # + entered: bool = await devx.debug._maybe_enter_pm( trio_err, api_frame=inspect.currentframe(), ) @@ -1280,10 +1300,17 @@ async def open_channel_from( suppress_graceful_exits: bool = True, **target_kwargs, -) -> AsyncIterator[Any]: +) -> AsyncIterator[ + tuple[LinkedTaskChannel, Any] +]: ''' - Open an inter-loop linked task channel for streaming between a target - spawned ``asyncio`` task and ``trio``. + Start an `asyncio.Task` as `target()` and open an inter-loop + (linked) channel for streaming between it and the current + `trio.Task`. + + A pair `(chan: LinkedTaskChannel, Any)` is delivered to the caller + where the 2nd element is the value provided by the + `asyncio.Task`'s unblocking call to `chan.started_nowait()`. ''' chan: LinkedTaskChannel = _run_asyncio_task( @@ -1308,6 +1335,7 @@ async def open_channel_from( # deliver stream handle upward yield first, chan + # ^TODO! swap these!! except trio.Cancelled as taskc: if cs.cancel_called: if isinstance(chan._trio_to_raise, AsyncioCancelled): @@ -1338,7 +1366,8 @@ async def open_channel_from( ) else: # XXX SHOULD NEVER HAPPEN! - await tractor.pause() + log.error("SHOULD NEVER GET HERE !?!?") + await tractor.pause(shield=True) else: chan._to_trio.close()