Compare commits

..

10 Commits

Author SHA1 Message Date
Tyler Goodlet da9bc1237d Change one infected-aio test to use `chan` in fn sig 2025-07-29 14:47:24 -04:00
Tyler Goodlet ab11ee4fbe Support `chan.started_nowait()` in `.open_channel_from()` target
That is, the `target` can declare a `chan: LinkedTaskChannel` param
instead of the separate `to_trio`/`from_aio` endpoints.

To support it,
- change `.started()` -> the more appropriate `.started_nowait()` which
  can be called sync from the aio child task.
- adjust the `provide_channels` assert to accept either fn sig
  declaration (for now).

Still needs test(s) obvi..
2025-07-29 14:42:15 -04:00
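
A minimal sketch (not part of this diff) of the new-style `target` signature this commit enables; it presumes running inside an actor spawned with `infect_asyncio=True`, as in the test suite touched below:

    import asyncio

    import tractor
    from tractor import to_asyncio


    async def aio_child(
        # new-style fn sig: a single linked channel instead of the
        # separate `to_trio`/`from_aio` endpoints.
        chan: to_asyncio.LinkedTaskChannel,
    ) -> None:
        # sync-callable from the aio-side child task, in place of
        # the old `.started()`.
        chan.started_nowait('ready')
        await asyncio.sleep(float('inf'))


    @tractor.context
    async def trio_parent(ctx: tractor.Context) -> None:
        await ctx.started()
        async with to_asyncio.open_channel_from(
            aio_child,
        ) as (first, chan):
            assert first == 'ready'
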
Tyler Goodlet 466dce8aed Relay `asyncio` errors via EoC and raise from rent
Makes the newly added `test_aio_side_raises_before_started` test pass by
ensuring errors raised by any `.to_asyncio.open_channel_from()`-spawned
child-`asyncio.Task` are relayed through any caught `trio.EndOfChannel`,
detected via a new `LinkedTaskChannel._closed_by_aio_task: bool` flag.

Impl deats,
- obvi add `LinkedTaskChannel._closed_by_aio_task: bool = False`
- in `translate_aio_errors()` always check for the new flag on EOC
  conditions and in such cases set `chan._trio_to_raise = aio_err` such
  that the `trio`-parent-task always raises the child's exception
  directly, OW keep original EoC passthrough in place.
- include *very* detailed per-case comments around the extended handler.
- adjust the re-raising logic with a new `raise_from` where we only give
  the `aio_err` priority if it's not already set as the `trio_to_raise`.

Also,
- hide the `_run_asyncio_task()` frame by def.
2025-07-29 14:30:42 -04:00
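
A condensed, hypothetical illustration of the behavioral change: an error raised by the aio child before it ever "starts" now surfaces trio-side instead of a bare `trio.EndOfChannel` (the full harness is in the new test suite below):

    import asyncio

    import tractor
    from tractor import to_asyncio


    async def fails_early(
        chan: to_asyncio.LinkedTaskChannel,
    ) -> None:
        # boom, before any `chan.started_nowait()`
        await asyncio.sleep(0.1)
        raise RuntimeError('pre-started failure')


    @tractor.context
    async def parent_ep(ctx: tractor.Context) -> None:
        await ctx.started()
        # with this commit the `RuntimeError` is re-raised here
        # (and boxed as a `RemoteActorError` further up the actor
        # tree) instead of surfacing as a bare `trio.EndOfChannel`.
        async with to_asyncio.open_channel_from(
            fails_early,
        ) as (first, chan):
            ...
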
Tyler Goodlet 808dd9d73c Add "raises-pre-started" `open_channel_from()` test
Verifying that if any exc is raised pre `chan.send_nowait()` (our
currently shite version of a `chan.started()`) then that exc is indeed
raised through on the `trio`-parent task side. This case was reproduced
from a `piker.brokers.ib` issue with a similar embedded
`.trionics.maybe_open_context()` call.

Deats,
- call the suite `test_aio_side_raises_before_started`.
- mk the `@context` simply `maybe_open_context(acm_func=open_channel_from)`
  with a `target=raise_before_started` which,
- simply sleeps then immediately raises an RTE.
- expect the RTE from the aio-child-side to propagate all the way up to
  the root-actor's task right up through the `trio.run()`.
2025-07-29 13:58:48 -04:00
Tyler Goodlet aef306465d Add `never_warn_on: dict` support to unmasker
Such that key->value pairs can be defined which should *never* be
unmasked, where
- the keys are exc-types which might be masked, and
- the values are the exc-types which masked them.

For example, the default includes:
- KBI->taskc: a kbi should never be unmasked from its masking
  `trio.Cancelled`.

For the impl, a new `do_warn: bool` in the fn-body determines the
primary guard for whether a warning or re-raising is necessary.
2025-07-28 12:57:48 -04:00
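
A usage sketch of the new mapping (the import path is assumed; adjust to wherever the unmasker acm actually lives):

    import trio
    from tractor.trionics import maybe_raise_from_masking_exc  # path assumed


    async def guarded() -> None:
        # acm which re-raises exceptions "masked" by a later exc
        # raised at a checkpoint inside an `except`/`finally` block;
        # pairs listed in `never_warn_on` are never unmasked nor
        # warned about.
        async with maybe_raise_from_masking_exc(
            never_warn_on={
                # a KBI masked by a `trio.Cancelled` should stay masked
                KeyboardInterrupt: trio.Cancelled,
            },
        ):
            await trio.sleep_forever()
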
Tyler Goodlet 7459a4127c Accept `tn` to `gather_contexts()/maybe_open_context()`
Such that the caller can be responsible for their own (nursery) scoping
as needed; for the latter fn's case we add
a `trio.Nursery.CancelStatus.encloses()` check to ensure the provided
`tn` is a valid parent(-ish) scope.

Some deats,
- in `gather_contexts()`, mv the `try/finally` outside the nursery block
  to ensure we always do the `parent_exit`.
- for `maybe_open_context()` we do a naive task-tree hierarchy audit to
  ensure the provided scope is not *too* child-ish (with what APIs `trio`
  gives us, see above), OW go with the old approach of using the actor's
  private service nursery.
  Also,
  * better report `trio.Cancelled` around the cache-miss `yield`
    cases and ensure we **never** unmask triggering key-errors.
  * report on any stale-state with the mutex in the `finally` block.
2025-07-27 13:55:11 -04:00
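
A rough usage sketch of the caller-provided `tn`; the toy acm is hypothetical and `open_root_actor()` is included only to be safe about runtime requirements:

    from contextlib import asynccontextmanager as acm

    import trio
    import tractor


    @acm
    async def open_thing(name: str):
        # stand-in for some expensive-to-setup resource
        yield f'thing-{name}'


    async def main() -> None:
        async with (
            tractor.open_root_actor(),  # may not be strictly required
            trio.open_nursery() as tn,
        ):
            # `gather_contexts()` now runs its entry tasks in the
            # caller-provided nursery instead of an internal one.
            async with tractor.trionics.gather_contexts(
                mngrs=[open_thing('a'), open_thing('b')],
                tn=tn,
            ) as (a, b):
                assert (a, b) == ('thing-a', 'thing-b')

            async def cache_user() -> None:
                # this task is a direct child of `tn`, so `tn` passes
                # the `.encloses()` parent-ish audit described above.
                async with tractor.trionics.maybe_open_context(
                    acm_func=open_thing,
                    kwargs={'name': 'c'},
                    tn=tn,
                ) as (cache_hit, val):
                    assert val == 'thing-c'

            tn.start_soon(cache_user)


    if __name__ == '__main__':
        trio.run(main)
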
Tyler Goodlet fc77e6eca5 Suppress beg tbs from `collapse_eg()`
It was originally this way; I forgot to flip it back when discarding the
`except*` handler impl..

Specially handle the `exc.__cause__` case where we raise from any
detected underlying cause and OW `from None` to suppress the eg's tb.
2025-07-25 20:05:51 -04:00
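
Generic illustration (not the library code) of the cause-vs-`None` re-raise pattern described above:

    def reraise_collapsed(exc: BaseException) -> None:
        '''
        Re-raise a lone sub-exc collapsed out of an eg: chain it to any
        real underlying `__cause__`, otherwise raise `from None` to
        suppress the group's own "during handling of ..." tb noise.
        '''
        if (cause := exc.__cause__):
            raise exc from cause
        else:
            raise exc from None
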
Tyler Goodlet 26526b86c3 Facepalm, actually use `.log.cancel()`-level to report parent-side taskc.. 2025-07-25 19:03:21 -04:00
Tyler Goodlet d079675dd4 UDS: implicitly create `Address.bindspace: Path`
Since it's merely a local-file-sys subdirectory, there should be no
reason its creation would conflict with other bind spaces.

Also add 2 test suites to match,
- `tests/ipc/test_each_tpt::test_uds_bindspace_created_implicitly` to
  verify the dir creation when DNE.
- `..test_uds_double_listen_raises_connerr` to ensure a double bind
  raises a `ConnectionError` from the src `OSError`.
2025-07-25 13:32:23 -04:00
Tyler Goodlet c2acc4f55c Rm `assert` from `Channel.from_addr()`; for UDS the addr is re-created to extract the peer PID 2025-07-25 11:27:30 -04:00
9 changed files with 497 additions and 139 deletions

View File

@ -0,0 +1,113 @@
'''
Unit-ish tests for specific IPC transport protocol backends.

'''
from __future__ import annotations
from pathlib import Path

import pytest
import trio
import tractor
from tractor import (
    Actor,
    _state,
    _addr,
)


@pytest.fixture
def bindspace_dir_str() -> str:

    bs_dir_str: str = '/run/user/1000/doggy'
    bs_dir = Path(bs_dir_str)
    assert not bs_dir.is_dir()

    yield bs_dir_str

    # delete it on suite teardown.
    # ?TODO? should we support this internally
    # or is leaking it ok?
    if bs_dir.is_dir():
        bs_dir.rmdir()


def test_uds_bindspace_created_implicitly(
    debug_mode: bool,
    bindspace_dir_str: str,
):
    registry_addr: tuple = (
        f'{bindspace_dir_str}',
        'registry@doggy.sock',
    )
    bs_dir_str: str = registry_addr[0]

    # XXX, ensure bindspace-dir DNE beforehand!
    assert not Path(bs_dir_str).is_dir()

    async def main():
        async with tractor.open_nursery(
            enable_transports=['uds'],
            registry_addrs=[registry_addr],
            debug_mode=debug_mode,
        ) as _an:

            # XXX MUST be created implicitly by
            # `.ipc._uds.start_listener()`!
            assert Path(bs_dir_str).is_dir()

            root: Actor = tractor.current_actor()
            assert root.is_registrar
            assert registry_addr in root.reg_addrs
            assert (
                registry_addr
                in
                _state._runtime_vars['_registry_addrs']
            )
            assert (
                _addr.wrap_address(registry_addr)
                in
                root.registry_addrs
            )

    trio.run(main)


def test_uds_double_listen_raises_connerr(
    debug_mode: bool,
    bindspace_dir_str: str,
):
    registry_addr: tuple = (
        f'{bindspace_dir_str}',
        'registry@doggy.sock',
    )

    async def main():
        async with tractor.open_nursery(
            enable_transports=['uds'],
            registry_addrs=[registry_addr],
            debug_mode=debug_mode,
        ) as _an:
            # runtime up
            root: Actor = tractor.current_actor()

            from tractor.ipc._uds import (
                start_listener,
                UDSAddress,
            )
            ya_bound_addr: UDSAddress = root.registry_addrs[0]
            try:
                await start_listener(
                    addr=ya_bound_addr,
                )
            except ConnectionError as connerr:
                assert type(src_exc := connerr.__context__) is OSError
                assert 'Address already in use' in src_exc.args
                # complete, exit test.

            else:
                pytest.fail('It dint raise a connerr !?')

    trio.run(main)

View File

@ -573,6 +573,8 @@ def test_basic_interloop_channel_stream(
     fan_out: bool,
 ):
     async def main():
+        # TODO, figure out min timeout here!
+        with trio.fail_after(6):
         async with tractor.open_nursery() as an:
             portal = await an.run_in_actor(
                 stream_from_aio,
@ -1088,6 +1090,108 @@ def test_sigint_closes_lifetime_stack(
     trio.run(main)


+# ?TODO asyncio.Task fn-deco?
+# -[ ] do sig checking at import time like @context?
+# -[ ] maybe name it @aio_task ??
+# -[ ] chan: to_asyncio.InterloopChannel ??
+async def raise_before_started(
+    # from_trio: asyncio.Queue,
+    # to_trio: trio.abc.SendChannel,
+    chan: to_asyncio.LinkedTaskChannel,
+) -> None:
+    '''
+    `asyncio.Task` entry point which RTEs before calling
+    `to_trio.send_nowait()`.
+
+    '''
+    await asyncio.sleep(0.2)
+    raise RuntimeError('Some shite went wrong before `.send_nowait()`!!')
+
+    # to_trio.send_nowait('Uhh we shouldve RTE-d ^^ ??')
+    chan.started_nowait('Uhh we shouldve RTE-d ^^ ??')
+    await asyncio.sleep(float('inf'))
+
+
+@tractor.context
+async def caching_ep(
+    ctx: tractor.Context,
+):
+
+    log = tractor.log.get_logger('caching_ep')
+    log.info('syncing via `ctx.started()`')
+    await ctx.started()
+
+    # XXX, allocate the `open_channel_from()` inside
+    # a `.trionics.maybe_open_context()`.
+    chan: to_asyncio.LinkedTaskChannel
+    async with (
+        tractor.trionics.maybe_open_context(
+            acm_func=tractor.to_asyncio.open_channel_from,
+            kwargs={
+                'target': raise_before_started,
+                # ^XXX, kwarg to `open_channel_from()`
+            },
+
+            # lock around current actor task access
+            key=tractor.current_actor().uid,
+
+        ) as (cache_hit, (clients, chan)),
+    ):
+        if cache_hit:
+            log.error(
+                'Re-using cached `.open_from_channel()` call!\n'
+            )
+        else:
+            log.info(
+                'Allocating SHOULD-FAIL `.open_from_channel()`\n'
+            )
+
+        await trio.sleep_forever()
+
+
+def test_aio_side_raises_before_started(
+    reg_addr: tuple[str, int],
+    debug_mode: bool,
+    loglevel: str,
+):
+    '''
+    Simulates a connection-err from `piker.brokers.ib.api`..
+
+    Ensure any error raised by the child-`asyncio.Task` BEFORE
+    `chan.started()` is relayed through to the `trio`-parent task.
+
+    '''
+    # delay = 999 if debug_mode else 1
+    async def main():
+        with trio.fail_after(3):
+            an: tractor.ActorNursery
+            async with tractor.open_nursery(
+                debug_mode=debug_mode,
+                loglevel=loglevel,
+            ) as an:
+                p: tractor.Portal = await an.start_actor(
+                    'lchan_cacher_that_raises_fast',
+                    enable_modules=[__name__],
+                    infect_asyncio=True,
+                )
+                async with p.open_context(
+                    caching_ep,
+                ) as (ctx, first):
+                    assert not first
+
+    with pytest.raises(
+        expected_exception=(RemoteActorError),
+    ) as excinfo:
+        trio.run(main)
+
+    # ensure the `asyncio.Task` exception is bubbled
+    # allll the way erp!!
+    rae = excinfo.value
+    assert rae.boxed_type is RuntimeError
+

 # TODO: debug_mode tests once we get support for `asyncio`!
 #
 # -[ ] need tests to wrap both scripts:

View File

@ -2265,7 +2265,7 @@ async def open_context_from_portal(
             # await debug.pause()
             # log.cancel(
             match scope_err:
-                case trio.Cancelled:
+                case trio.Cancelled():
                     logmeth = log.cancel

                     # XXX explicitly report on any non-graceful-taskc cases

View File

@ -185,7 +185,9 @@ class Channel:
             addr,
             **kwargs,
         )
-        assert transport.raddr == addr
+        # XXX, for UDS *no!* since we recv the peer-pid and build out
+        # a new addr..
+        # assert transport.raddr == addr
         chan = Channel(transport=transport)

         # ?TODO, compact this into adapter level-methods?
@ -301,7 +303,7 @@ class Channel:
         self,
         payload: Any,

-        hide_tb: bool = True,
+        hide_tb: bool = False,

     ) -> None:
         '''

View File

@ -103,8 +103,6 @@ class UDSAddress(
             self.filedir
             or
             self.def_bindspace
-            # or
-            # get_rt_dir()
         )

     @property
@ -230,7 +228,14 @@ async def start_listener(
     addr: UDSAddress,
     **kwargs,
 ) -> SocketListener:
-    # sock = addr._sock = socket.socket(
+    '''
+    Start listening for inbound connections via
+    a `trio.SocketListener` (task) which `socket.bind()`s on `addr`.
+
+    Note, if the `UDSAddress.bindspace: Path` directory dne it is
+    implicitly created.
+
+    '''
     sock = socket.socket(
         socket.AF_UNIX,
         socket.SOCK_STREAM
@ -241,7 +246,17 @@ async def start_listener(
         f'|_{addr}\n'
     )

+    # ?TODO? should we use the `actor.lifetime_stack`
+    # to rm on shutdown?
     bindpath: Path = addr.sockpath
+    if not (bs := addr.bindspace).is_dir():
+        log.info(
+            'Creating bindspace dir in file-sys\n'
+            f'>{{\n'
+            f'|_{bs!r}\n'
+        )
+        bs.mkdir()
+
     with _reraise_as_connerr(
         src_excs=(
             FileNotFoundError,

View File

@ -130,6 +130,7 @@ class LinkedTaskChannel(
     _trio_task: trio.Task
     _aio_task_complete: trio.Event
+    _closed_by_aio_task: bool = False
     _suppress_graceful_exits: bool = True

     _trio_err: BaseException|None = None
@ -208,10 +209,15 @@ class LinkedTaskChannel(
     async def aclose(self) -> None:
         await self._from_aio.aclose()

-    def started(
+    # ?TODO? async version of this?
+    def started_nowait(
         self,
         val: Any = None,
     ) -> None:
+        '''
+        Synchronize the aio-side with its trio-parent.
+
+        '''
         self._aio_started_val = val
         return self._to_trio.send_nowait(val)
@ -242,6 +248,7 @@ class LinkedTaskChannel(
                 # cycle on the trio side?
                 # await trio.lowlevel.checkpoint()
                 return await self._from_aio.receive()
+
         except BaseException as err:
             async with translate_aio_errors(
                 chan=self,
@ -319,7 +326,7 @@ def _run_asyncio_task(
     qsize: int = 1,
     provide_channels: bool = False,
     suppress_graceful_exits: bool = True,
-    hide_tb: bool = False,
+    hide_tb: bool = True,
     **kwargs,

 ) -> LinkedTaskChannel:
@ -347,18 +354,6 @@ def _run_asyncio_task(
     # value otherwise it would just return ;P
     assert qsize > 1

-    if provide_channels:
-        assert 'to_trio' in args
-
-    # allow target func to accept/stream results manually by name
-    if 'to_trio' in args:
-        kwargs['to_trio'] = to_trio
-
-    if 'from_trio' in args:
-        kwargs['from_trio'] = from_trio
-
-    coro = func(**kwargs)
-
     trio_task: trio.Task = trio.lowlevel.current_task()
     trio_cs = trio.CancelScope()
     aio_task_complete = trio.Event()
@ -373,6 +368,25 @@ def _run_asyncio_task(
         _suppress_graceful_exits=suppress_graceful_exits,
     )

+    # allow target func to accept/stream results manually by name
+    if 'to_trio' in args:
+        kwargs['to_trio'] = to_trio
+
+    if 'from_trio' in args:
+        kwargs['from_trio'] = from_trio
+
+    if 'chan' in args:
+        kwargs['chan'] = chan
+
+    if provide_channels:
+        assert (
+            'to_trio' in args
+            or
+            'chan' in args
+        )
+
+    coro = func(**kwargs)
+
     async def wait_on_coro_final_result(
         to_trio: trio.MemorySendChannel,
         coro: Awaitable,
@ -445,9 +459,15 @@ def _run_asyncio_task(
                 f'Task exited with final result: {result!r}\n'
             )

-            # only close the sender side which will relay
-            # a `trio.EndOfChannel` to the trio (consumer) side.
+            # only close the aio (child) side which will relay
+            # a `trio.EndOfChannel` to the trio (parent) side.
+            #
+            # XXX NOTE, that trio-side MUST then in such cases
+            # check for a `chan._aio_err` and raise it!!
             to_trio.close()
+            # specially mark the closure as due to the
+            # asyncio.Task terminating!
+            chan._closed_by_aio_task = True

             aio_task_complete.set()
             log.runtime(
@ -645,8 +665,9 @@ def _run_asyncio_task(
             not trio_cs.cancel_called
         ):
             log.cancel(
-                f'Cancelling `trio` side due to aio-side src exc\n'
-                f'{curr_aio_err}\n'
+                f'Cancelling trio-side due to aio-side src exc\n'
+                f'\n'
+                f'{curr_aio_err!r}\n'
                 f'\n'
                 f'(c>\n'
                 f' |_{trio_task}\n'
@ -758,6 +779,7 @@ async def translate_aio_errors(
     aio_done_before_trio: bool = aio_task.done()
     assert aio_task
     trio_err: BaseException|None = None
+    eoc: trio.EndOfChannel|None = None
     try:
         yield  # back to one of the cross-loop apis
     except trio.Cancelled as taskc:
@ -789,12 +811,50 @@ async def translate_aio_errors(
         # )
         # raise

-    # XXX always passthrough EoC since this translator is often
-    # called from `LinkedTaskChannel.receive()` which we want
-    # passthrough and further we have no special meaning for it in
-    # terms of relaying errors or signals from the aio side!
-    except trio.EndOfChannel as eoc:
-        trio_err = chan._trio_err = eoc
+    # XXX EoC is a special SIGNAL from the aio-side here!
+    # There are 2 cases to handle:
+    # 1. the "EoC passthrough" case.
+    #   - the aio-task actually closed the channel "gracefully" and
+    #     the trio-task should unwind any ongoing channel
+    #     iteration/receiving,
+    #    |_this exc-translator wraps calls to `LinkedTaskChannel.receive()`
+    #      in which case we want to relay the actual "end-of-chan" for
+    #      iteration purposes.
+    #
+    # 2. relaying the "asyncio.Task termination" case.
+    #   - if the aio-task terminates, maybe with an error, AND the
+    #     `open_channel_from()` API was used, it will always signal
+    #     that termination.
+    #    |_`wait_on_coro_final_result()` always calls
+    #      `to_trio.close()` when `provide_channels=True` so we need to
+    #      always check if there is an aio-side exc which needs to be
+    #      relayed to the parent trio side!
+    #    |_in this case the special `chan._closed_by_aio_task` is
+    #      ALWAYS set.
+    #
+    except trio.EndOfChannel as _eoc:
+        eoc = _eoc
+        if (
+            chan._closed_by_aio_task
+            and
+            aio_err
+        ):
+            log.cancel(
+                f'The asyncio-child task terminated due to error\n'
+                f'{aio_err!r}\n'
+            )
+            chan._trio_to_raise = aio_err
+            trio_err = chan._trio_err = eoc
+            #
+            # await tractor.pause(shield=True)
+            #
+            # ?TODO?, raise something like a,
+            # chan._trio_to_raise = AsyncioErrored()
+            # BUT, with the tb rewritten to reflect the underlying
+            # call stack?
+        else:
+            trio_err = chan._trio_err = eoc
+
         raise eoc

     # NOTE ALSO SEE the matching note in the `cancel_trio()` asyncio
@ -1047,7 +1107,7 @@ async def translate_aio_errors(
         #
         if wait_on_aio_task:
             await chan._aio_task_complete.wait()
-            log.info(
+            log.debug(
                 'asyncio-task is done and unblocked trio-side!\n'
             )
@ -1064,11 +1124,17 @@ async def translate_aio_errors(
         trio_to_raise: (
             AsyncioCancelled|
             AsyncioTaskExited|
+            Exception|  # relayed from aio-task
             None
         ) = chan._trio_to_raise

+        raise_from: Exception = (
+            trio_err if (aio_err is trio_to_raise)
+            else aio_err
+        )
+
         if not suppress_graceful_exits:
-            raise trio_to_raise from (aio_err or trio_err)
+            raise trio_to_raise from raise_from

         if trio_to_raise:
             match (
@ -1101,7 +1167,7 @@ async def translate_aio_errors(
                     )
                     return

                 case _:
-                    raise trio_to_raise from (aio_err or trio_err)
+                    raise trio_to_raise from raise_from

         # Check if the asyncio-side is the cause of the trio-side
         # error.
@ -1167,7 +1233,6 @@ async def run_task(
 @acm
 async def open_channel_from(

     target: Callable[..., Any],
-
     suppress_graceful_exits: bool = True,
     **target_kwargs,
@ -1201,7 +1266,6 @@ async def open_channel_from(
             # deliver stream handle upward
             yield first, chan
         except trio.Cancelled as taskc:
-            # await tractor.pause(shield=True)  # ya it worx ;)
             if cs.cancel_called:
                 if isinstance(chan._trio_to_raise, AsyncioCancelled):
                     log.cancel(

View File

@ -97,11 +97,11 @@ def get_collapsed_eg(
 async def collapse_eg(
     hide_tb: bool = True,

+    # XXX, for ex. will always show begs containing single taskc
     ignore: set[Type[BaseException]] = {
         # trio.Cancelled,
     },
     add_notes: bool = True,

-    bp: bool = False,
-
 ):
''' '''
If `BaseExceptionGroup` raised in the body scope is If `BaseExceptionGroup` raised in the body scope is
@ -115,30 +115,6 @@ async def collapse_eg(
         yield
     except BaseExceptionGroup as _beg:
         beg = _beg

-        # TODO, remove this rant..
-        #
-        # except* BaseException as beg:
-        # ^XXX WOW.. good job cpython. ^
-        # like, never ever EVER use this!! XD
-        #
-        # turns out rasing from an `except*`-block has the opposite
-        # behaviour of normal `except` and further can *never* be used to
-        # get the equiv of,
-        # `trio.open_nursery(strict_exception_groups=False)`
-        #
-        # ------ docs ------
-        # https://docs.python.org/3/reference/compound_stmts.html#except-star
-        #
-        # > Any remaining exceptions that were not handled by any
-        # > except* clause are re-raised at the end, along with all
-        # > exceptions that were raised from within the except*
-        # > clauses. If this list contains more than one exception to
-        # > reraise, they are combined into an exception group.
-        if bp:
-            from tractor.devx import pause
-            await pause(shield=True)
-
         if (
             (exc := get_collapsed_eg(beg))
             and
@ -158,16 +134,17 @@ async def collapse_eg(
         ):
             exc.add_note(from_group_note)

-            raise exc
-
-            # ?TODO? not needed right?
-            # if cause := exc.__cause__:
-            #     raise exc# from cause
-            # else:
-            #     # raise exc from beg
-            #     # suppress "during handling of <the beg>"
-            #     # output in tb/console.
-            #     raise exc from None
+            # raise exc
+            # ^^ this will leave the orig beg tb above with the
+            # "during the handling of <beg> the following.."
+            # So, instead do..
+            #
+            if cause := exc.__cause__:
+                raise exc from cause
+            else:
+                # suppress "during handling of <the beg>"
+                # output in tb/console.
+                raise exc from None

         # keep original
         raise  # beg

View File

@ -41,6 +41,9 @@ import trio
 from tractor._state import current_actor
 from tractor.log import get_logger
 # from ._beg import collapse_eg
+# from ._taskc import (
+#     maybe_raise_from_masking_exc,
+# )

 if TYPE_CHECKING:
@ -106,6 +109,9 @@ async def _enter_and_wait(
 async def gather_contexts(
     mngrs: Sequence[AsyncContextManager[T]],

+    # caller can provide their own scope
+    tn: trio.Nursery|None = None,
+
 ) -> AsyncGenerator[
     tuple[
         T | None,
@ -148,17 +154,22 @@ async def gather_contexts(
             '`.trionics.gather_contexts()` input mngrs is empty?\n'
             '\n'
             'Did try to use inline generator syntax?\n'
-            'Use a non-lazy iterator or sequence-type intead!\n'
+            'Check that list({mngrs}) works!\n'
+            # 'or sequence-type intead!\n'
+            # 'Use a non-lazy iterator or sequence-type intead!\n'
         )

+    try:
         async with (
+            #
+            # ?TODO, does including these (eg-collapsing,
+            # taskc-unmasking) improve tb noise-reduction/legibility?
+            #
             # collapse_eg(),
-            trio.open_nursery(
-                strict_exception_groups=False,
-                # ^XXX^ TODO? soo roll our own then ??
-                # -> since we kinda want the "if only one `.exception` then
-                # just raise that" interface?
+            maybe_open_nursery(
+                nursery=tn,
             ) as tn,
+            # maybe_raise_from_masking_exc(),
         ):
             for mngr in mngrs:
                 tn.start_soon(
@ -170,11 +181,12 @@ async def gather_contexts(
                     seed,
                 )

-            # deliver control once all managers have started up
+            # deliver control to caller once all ctx-managers have
+            # started (yielded back to us).
             await all_entered.wait()
-
-            try:
             yield tuple(unwrapped.values())
+            parent_exit.set()

     finally:
         # XXX NOTE: this is ABSOLUTELY REQUIRED to avoid
         # the following wacky bug:
@ -233,6 +245,9 @@ async def maybe_open_context(
     kwargs: dict = {},
     key: Hashable | Callable[..., Hashable] = None,

+    # caller can provide their own scope
+    tn: trio.Nursery|None = None,
+
 ) -> AsyncIterator[tuple[bool, T]]:
     '''
     Maybe open an async-context-manager (acm) if there is not already
@ -265,6 +280,22 @@ async def maybe_open_context(
     # have it not be closed until all consumers have exited (which is
     # currently difficult to implement any other way besides using our
     # pre-allocated runtime instance..)
-    service_n: trio.Nursery = current_actor()._service_n
+    if tn:
+        # TODO, assert tn is eventual parent of this task!
+        task: trio.Task = trio.lowlevel.current_task()
+        task_tn: trio.Nursery = task.parent_nursery
+        if not tn._cancel_status.encloses(
+            task_tn._cancel_status
+        ):
+            raise RuntimeError(
+                f'Mis-nesting of task under provided {tn} !?\n'
+                f'Current task is NOT a child(-ish)!!\n'
+                f'\n'
+                f'task: {task}\n'
+                f'task_tn: {task_tn}\n'
+            )
+        service_n = tn
+    else:
+        service_n: trio.Nursery = current_actor()._service_n

     # TODO: is there any way to allocate
@ -273,13 +304,18 @@ async def maybe_open_context(
     # async with maybe_open_nursery(_Cache.service_n) as service_n:
     #     _Cache.service_n = service_n

+    cache_miss_ke: KeyError|None = None
+    maybe_taskc: trio.Cancelled|None = None
     try:
         # **critical section** that should prevent other tasks from
         # checking the _Cache until complete otherwise the scheduler
         # may switch and by accident we create more then one resource.
         yielded = _Cache.values[ctx_key]

-    except KeyError:
+    except KeyError as _ke:
+        # XXX, stay mutexed up to cache-miss yield
+        try:
+            cache_miss_ke = _ke
             log.debug(
                 f'Allocating new @acm-func entry\n'
                 f'ctx_key={ctx_key}\n'
@ -289,23 +325,12 @@ async def maybe_open_context(
             resources = _Cache.resources
             assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
             resources[ctx_key] = (service_n, trio.Event())

-            # sync up to the mngr's yielded value
-            try:
             yielded: Any = await service_n.start(
                 _Cache.run_ctx,
                 mngr,
                 ctx_key,
             )
             _Cache.users += 1
-            except trio.Cancelled as taskc:
-                log.cancel(
-                    f'Cancelled during caching?\n'
-                    f'\n'
-                    f'ctx_key: {ctx_key!r}\n'
-                    f'mngr: {mngr!r}\n'
-                )
-                raise taskc
         finally:
             # XXX, since this runs from an `except` it's a checkpoint
             # whih can be `trio.Cancelled`-masked.
@ -318,10 +343,27 @@ async def maybe_open_context(
         # SO just always unlock!
         lock.release()

+        try:
             yield (
                 False,  # cache_hit = "no"
                 yielded,
             )
+        except trio.Cancelled as taskc:
+            maybe_taskc = taskc
+            log.cancel(
+                f'Cancelled from cache-miss entry\n'
+                f'\n'
+                f'ctx_key: {ctx_key!r}\n'
+                f'mngr: {mngr!r}\n'
+            )
+            # XXX, always unset ke from cancelled context
+            # since we never consider it a masked exc case!
+            # - bc this can be called directly ty `._rpc._invoke()`?
+            #
+            if maybe_taskc.__context__ is cache_miss_ke:
+                maybe_taskc.__context__ = None
+
+            raise taskc

     else:
         _Cache.users += 1
@ -341,6 +383,13 @@ async def maybe_open_context(
         )

     finally:
+        if lock.locked():
+            stats: trio.LockStatistics = lock.statistics()
+            log.error(
+                f'Lock left locked by last owner !?\n'
+                f'{stats}\n'
+            )
+
         _Cache.users -= 1

         if yielded is not None:

View File

@ -22,7 +22,10 @@ from __future__ import annotations
 from contextlib import (
     asynccontextmanager as acm,
 )
-from typing import TYPE_CHECKING
+from typing import (
+    Type,
+    TYPE_CHECKING,
+)

 import trio

 from tractor.log import get_logger
@ -80,9 +83,19 @@ async def maybe_raise_from_masking_exc(
         # ^TODO? other cases?
     ),

-    always_warn_on: tuple[BaseException] = (
+    always_warn_on: tuple[Type[BaseException]] = (
         trio.Cancelled,
     ),

+    # don't ever unmask or warn on any masking pair,
+    # {<masked-excT-key> -> <masking-excT-value>}
+    never_warn_on: dict[
+        Type[BaseException],
+        Type[BaseException],
+    ] = {
+        KeyboardInterrupt: trio.Cancelled,
+        trio.Cancelled: trio.Cancelled,
+    },
     # ^XXX, special case(s) where we warn-log bc likely
     # there will be no operational diff since the exc
     # is always expected to be consumed.
@ -144,7 +157,10 @@ async def maybe_raise_from_masking_exc(
                 maybe_masker=exc_match,
                 unmask_from=set(unmask_from),
             ):
-                masked.append((exc_ctx, exc_match))
+                masked.append((
+                    exc_ctx,
+                    exc_match,
+                ))
                 boxed_maybe_exc.value = exc_match
                 note: str = (
                     f'\n'
@ -156,18 +172,36 @@ async def maybe_raise_from_masking_exc(
                     f'\n'
                     f'{extra_note}\n'
                 )
-                exc_ctx.add_note(note)

-                if type(exc_match) in always_warn_on:
+                do_warn: bool = (
+                    never_warn_on.get(
+                        type(exc_ctx)  # masking type
+                    )
+                    is not
+                    type(exc_match)  # masked type
+                )
+
+                if do_warn:
+                    exc_ctx.add_note(note)
+
+                if (
+                    do_warn
+                    and
+                    type(exc_match) in always_warn_on
+                ):
                     log.warning(note)

-                if raise_unmasked:
+                if (
+                    do_warn
+                    and
+                    raise_unmasked
+                ):
                     if len(masked) < 2:
                         raise exc_ctx from exc_match

-                    # ??TODO, see above but, possibly unmasking sub-exc
-                    # entries if there are > 1
                     # else:
+                    #     # ?TODO, see above but, possibly unmasking sub-exc
+                    #     # entries if there are > 1
                     #     await pause(shield=True)
                 else:
                     raise