Tried out an alt approach for `.to_asyncio` crashes
This change is masked out now BUT i'm leaving it in for reference.
I was debugging a multi-actor fault where the primary source actor was
an infected-aio-subactor (`brokerd.ib`) and it seemed like the REPL was only
entering on the `trio` side (at a `.open_channel_from()`) and not
eventually breaking in the `asyncio.Task`. But, since (changing
something?) it seems to be working now, it's just that the `trio` side
seems to sometimes handle before the (source/causing and more
child-ish) `asyncio`-task, which is a bit odd and not expected..
We could likely refine (maybe with an inter-loop-task REPL lock?) this
at some point and ensure a child-`asyncio` task which errors always
grabs the REPL **first**?
Lowlevel deats/further-todos,
- add (masked) `maybe_open_crash_handler()` block around
`asyncio.Task` execution with notes about weird parent-addr
delivery bug in `test_sync_pause_from_aio_task`
* yeah dunno what that's about but made a bug; seems to be IPC
serialization of the `TCPAddress` struct somewhere??
- add inter-loop lock TODO for avoiding aio-task clobbering
trio-tasks when both crash in debug-mode
Also,
- change import from `tractor.devx.debug` to `tractor.devx`
- adjust `get_logger()` call to use new implicit mod-name detection
added to `.log.get_logger()`, i.e. sin `name=__name__`.
- some teensie refinements to `open_channel_from()`:
* swap return type annotation for to `tuple[LinkedTaskChannel, Any]`
(was `Any`).
* update doc-string to clarify started-value delivery
* add err-log before `.pause()` in what should be an unreachable path.
* add todo to swap the `(first, chan)` pair to match that of ctx..
(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
to_asyncio_channel_iface
parent
b5fd2a40b1
commit
2616f4b976
|
|
@ -48,7 +48,7 @@ from tractor._state import (
|
||||||
_runtime_vars,
|
_runtime_vars,
|
||||||
)
|
)
|
||||||
from tractor._context import Unresolved
|
from tractor._context import Unresolved
|
||||||
from tractor.devx import debug
|
from tractor import devx
|
||||||
from tractor.log import (
|
from tractor.log import (
|
||||||
get_logger,
|
get_logger,
|
||||||
StackLevelAdapter,
|
StackLevelAdapter,
|
||||||
|
|
@ -439,7 +439,23 @@ def _run_asyncio_task(
|
||||||
|
|
||||||
orig = result = id(coro)
|
orig = result = id(coro)
|
||||||
try:
|
try:
|
||||||
|
# XXX TODO UGH!
|
||||||
|
# this seems to break a `test_sync_pause_from_aio_task`
|
||||||
|
# in a REALLY weird way where a `dict` value for
|
||||||
|
# `_runtime_vars['_root_addrs']` is delivered from the
|
||||||
|
# parent actor??
|
||||||
|
#
|
||||||
|
# XXX => see masked `.set_trace()` block in
|
||||||
|
# `Actor.from_parent()`..
|
||||||
|
#
|
||||||
|
# with devx.maybe_open_crash_handler(
|
||||||
|
# # XXX, if trio-side exits (intentionally) we
|
||||||
|
# # shouldn't care bc it should have its own crash
|
||||||
|
# # handling logic.
|
||||||
|
# ignore={TrioTaskExited,},
|
||||||
|
# ) as _bxerr:
|
||||||
result: Any = await coro
|
result: Any = await coro
|
||||||
|
|
||||||
chan._aio_result = result
|
chan._aio_result = result
|
||||||
except BaseException as aio_err:
|
except BaseException as aio_err:
|
||||||
chan._aio_err = aio_err
|
chan._aio_err = aio_err
|
||||||
|
|
@ -546,7 +562,7 @@ def _run_asyncio_task(
|
||||||
if (
|
if (
|
||||||
debug_mode()
|
debug_mode()
|
||||||
and
|
and
|
||||||
(greenback := debug.maybe_import_greenback(
|
(greenback := devx.debug.maybe_import_greenback(
|
||||||
force_reload=True,
|
force_reload=True,
|
||||||
raise_not_found=False,
|
raise_not_found=False,
|
||||||
))
|
))
|
||||||
|
|
@ -946,7 +962,11 @@ async def translate_aio_errors(
|
||||||
except BaseException as _trio_err:
|
except BaseException as _trio_err:
|
||||||
trio_err = chan._trio_err = _trio_err
|
trio_err = chan._trio_err = _trio_err
|
||||||
# await tractor.pause(shield=True) # workx!
|
# await tractor.pause(shield=True) # workx!
|
||||||
entered: bool = await debug._maybe_enter_pm(
|
|
||||||
|
# !TODO! we need an inter-loop lock here to avoid aio-tasks
|
||||||
|
# clobbering trio ones when both crash in debug-mode!
|
||||||
|
#
|
||||||
|
entered: bool = await devx.debug._maybe_enter_pm(
|
||||||
trio_err,
|
trio_err,
|
||||||
api_frame=inspect.currentframe(),
|
api_frame=inspect.currentframe(),
|
||||||
)
|
)
|
||||||
|
|
@ -1280,10 +1300,17 @@ async def open_channel_from(
|
||||||
suppress_graceful_exits: bool = True,
|
suppress_graceful_exits: bool = True,
|
||||||
**target_kwargs,
|
**target_kwargs,
|
||||||
|
|
||||||
) -> AsyncIterator[Any]:
|
) -> AsyncIterator[
|
||||||
|
tuple[LinkedTaskChannel, Any]
|
||||||
|
]:
|
||||||
'''
|
'''
|
||||||
Open an inter-loop linked task channel for streaming between a target
|
Start an `asyncio.Task` as `target()` and open an inter-loop
|
||||||
spawned ``asyncio`` task and ``trio``.
|
(linked) channel for streaming between it and the current
|
||||||
|
`trio.Task`.
|
||||||
|
|
||||||
|
A pair `(chan: LinkedTaskChannel, Any)` is delivered to the caller
|
||||||
|
where the 2nd element is the value provided by the
|
||||||
|
`asyncio.Task`'s unblocking call to `chan.started_nowait()`.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
chan: LinkedTaskChannel = _run_asyncio_task(
|
chan: LinkedTaskChannel = _run_asyncio_task(
|
||||||
|
|
@ -1308,6 +1335,7 @@ async def open_channel_from(
|
||||||
|
|
||||||
# deliver stream handle upward
|
# deliver stream handle upward
|
||||||
yield first, chan
|
yield first, chan
|
||||||
|
# ^TODO! swap these!!
|
||||||
except trio.Cancelled as taskc:
|
except trio.Cancelled as taskc:
|
||||||
if cs.cancel_called:
|
if cs.cancel_called:
|
||||||
if isinstance(chan._trio_to_raise, AsyncioCancelled):
|
if isinstance(chan._trio_to_raise, AsyncioCancelled):
|
||||||
|
|
@ -1338,7 +1366,8 @@ async def open_channel_from(
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
# XXX SHOULD NEVER HAPPEN!
|
# XXX SHOULD NEVER HAPPEN!
|
||||||
await tractor.pause()
|
log.error("SHOULD NEVER GET HERE !?!?")
|
||||||
|
await tractor.pause(shield=True)
|
||||||
else:
|
else:
|
||||||
chan._to_trio.close()
|
chan._to_trio.close()
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue