Compare commits

..

No commits in common. "71562e0af7e9d13f04f8069640307b42557cdfe3" and "cd1761e44db7fca60ea34020e12de9a038024a22" have entirely different histories.

5 changed files with 58 additions and 102 deletions

View File

@ -117,9 +117,7 @@ async def open_actor_local_nursery(
ctx: tractor.Context, ctx: tractor.Context,
): ):
global _nursery global _nursery
async with trio.open_nursery( async with trio.open_nursery() as n:
strict_exception_groups=False,
) as n:
_nursery = n _nursery = n
await ctx.started() await ctx.started()
await trio.sleep(10) await trio.sleep(10)

View File

@ -533,10 +533,6 @@ async def open_portal(
async with maybe_open_nursery( async with maybe_open_nursery(
tn, tn,
shield=shield, shield=shield,
strict_exception_groups=False,
# ^XXX^ TODO? soo roll our own then ??
# -> since we kinda want the "if only one `.exception` then
# just raise that" interface?
) as tn: ) as tn:
if not channel.connected(): if not channel.connected():

View File

@ -317,6 +317,8 @@ class Lock:
we_released: bool = False we_released: bool = False
ctx_in_debug: Context|None = cls.ctx_in_debug ctx_in_debug: Context|None = cls.ctx_in_debug
repl_task: Task|Thread|None = DebugStatus.repl_task repl_task: Task|Thread|None = DebugStatus.repl_task
message: str = ''
try: try:
if not DebugStatus.is_main_trio_thread(): if not DebugStatus.is_main_trio_thread():
thread: threading.Thread = threading.current_thread() thread: threading.Thread = threading.current_thread()
@ -331,10 +333,6 @@ class Lock:
return False return False
task: Task = current_task() task: Task = current_task()
message: str = (
'TTY NOT RELEASED on behalf of caller\n'
f'|_{task}\n'
)
# sanity check that if we're the root actor # sanity check that if we're the root actor
# the lock is marked as such. # the lock is marked as such.
@ -349,6 +347,11 @@ class Lock:
else: else:
assert DebugStatus.repl_task is not task assert DebugStatus.repl_task is not task
message: str = (
'TTY lock was NOT released on behalf of caller\n'
f'|_{task}\n'
)
lock: trio.StrictFIFOLock = cls._debug_lock lock: trio.StrictFIFOLock = cls._debug_lock
owner: Task = lock.statistics().owner owner: Task = lock.statistics().owner
if ( if (
@ -363,21 +366,23 @@ class Lock:
# correct task, greenback-spawned-task and/or thread # correct task, greenback-spawned-task and/or thread
# being set to the `.repl_task` such that the above # being set to the `.repl_task` such that the above
# condition matches and we actually release the lock. # condition matches and we actually release the lock.
#
# This is particular of note from `.pause_from_sync()`! # This is particular of note from `.pause_from_sync()`!
): ):
cls._debug_lock.release() cls._debug_lock.release()
we_released: bool = True we_released: bool = True
if repl_task: if repl_task:
message: str = ( message: str = (
'TTY released on behalf of root-actor-local REPL owner\n' 'Lock released on behalf of root-actor-local REPL owner\n'
f'|_{repl_task}\n' f'|_{repl_task}\n'
) )
else: else:
message: str = ( message: str = (
'TTY released by us on behalf of remote peer?\n' 'TTY lock released by us on behalf of remote peer?\n'
f'{ctx_in_debug}\n' f'|_ctx_in_debug: {ctx_in_debug}\n\n'
) )
# mk_pdb().set_trace()
# elif owner:
except RuntimeError as rte: except RuntimeError as rte:
log.exception( log.exception(
@ -395,8 +400,7 @@ class Lock:
req_handler_finished: trio.Event|None = Lock.req_handler_finished req_handler_finished: trio.Event|None = Lock.req_handler_finished
if ( if (
not lock_stats.owner not lock_stats.owner
and and req_handler_finished is None
req_handler_finished is None
): ):
message += ( message += (
'-> No new task holds the TTY lock!\n\n' '-> No new task holds the TTY lock!\n\n'
@ -414,8 +418,8 @@ class Lock:
repl_task repl_task
) )
message += ( message += (
f'A non-caller task still owns this lock on behalf of\n' f'A non-caller task still owns this lock on behalf of '
f'{behalf_of_task}\n' f'`{behalf_of_task}`\n'
f'lock owner task: {lock_stats.owner}\n' f'lock owner task: {lock_stats.owner}\n'
) )
@ -443,6 +447,8 @@ class Lock:
if message: if message:
log.devx(message) log.devx(message)
else:
import pdbp; pdbp.set_trace()
return we_released return we_released
@ -662,11 +668,10 @@ async def lock_stdio_for_peer(
fail_reason: str = ( fail_reason: str = (
f'on behalf of peer\n\n' f'on behalf of peer\n\n'
f'x)<=\n' f'x)<=\n'
f' |_{subactor_task_uid!r}@{ctx.chan.uid!r}\n' f' |_{subactor_task_uid!r}@{ctx.chan.uid!r}\n\n'
f'\n'
'Forcing `Lock.release()` due to acquire failure!\n\n' 'Forcing `Lock.release()` due to acquire failure!\n\n'
f'x)=>\n' f'x)=> {ctx}\n'
f' {ctx}'
) )
if isinstance(req_err, trio.Cancelled): if isinstance(req_err, trio.Cancelled):
fail_reason = ( fail_reason = (
@ -1174,7 +1179,7 @@ async def request_root_stdio_lock(
log.devx( log.devx(
'Initing stdio-lock request task with root actor' 'Initing stdio-lock request task with root actor'
) )
# TODO: can we implement this mutex more generally as # TODO: likely we can implement this mutex more generally as
# a `._sync.Lock`? # a `._sync.Lock`?
# -[ ] simply add the wrapping needed for the debugger specifics? # -[ ] simply add the wrapping needed for the debugger specifics?
# - the `__pld_spec__` impl and maybe better APIs for the client # - the `__pld_spec__` impl and maybe better APIs for the client
@ -1185,7 +1190,6 @@ async def request_root_stdio_lock(
# - https://docs.python.org/3.8/library/multiprocessing.html#multiprocessing.RLock # - https://docs.python.org/3.8/library/multiprocessing.html#multiprocessing.RLock
DebugStatus.req_finished = trio.Event() DebugStatus.req_finished = trio.Event()
DebugStatus.req_task = current_task() DebugStatus.req_task = current_task()
req_err: BaseException|None = None
try: try:
from tractor._discovery import get_root from tractor._discovery import get_root
# NOTE: we need this to ensure that this task exits # NOTE: we need this to ensure that this task exits
@ -1208,7 +1212,6 @@ async def request_root_stdio_lock(
# ) # )
DebugStatus.req_cs = req_cs DebugStatus.req_cs = req_cs
req_ctx: Context|None = None req_ctx: Context|None = None
ctx_eg: BaseExceptionGroup|None = None
try: try:
# TODO: merge into single async with ? # TODO: merge into single async with ?
async with get_root() as portal: async with get_root() as portal:
@ -1239,12 +1242,7 @@ async def request_root_stdio_lock(
) )
# try: # try:
if (locker := status.subactor_uid) != actor_uid: assert status.subactor_uid == actor_uid
raise DebugStateError(
f'Root actor locked by another peer !?\n'
f'locker: {locker!r}\n'
f'actor_uid: {actor_uid}\n'
)
assert status.cid assert status.cid
# except AttributeError: # except AttributeError:
# log.exception('failed pldspec asserts!') # log.exception('failed pldspec asserts!')
@ -1281,11 +1279,10 @@ async def request_root_stdio_lock(
f'Exitting {req_ctx.side!r}-side of locking req_ctx\n' f'Exitting {req_ctx.side!r}-side of locking req_ctx\n'
) )
except* ( except (
tractor.ContextCancelled, tractor.ContextCancelled,
trio.Cancelled, trio.Cancelled,
) as _taskc_eg: ):
ctx_eg = _taskc_eg
log.cancel( log.cancel(
'Debug lock request was CANCELLED?\n\n' 'Debug lock request was CANCELLED?\n\n'
f'<=c) {req_ctx}\n' f'<=c) {req_ctx}\n'
@ -1294,23 +1291,21 @@ async def request_root_stdio_lock(
) )
raise raise
except* ( except (
BaseException, BaseException,
) as _ctx_eg: ) as ctx_err:
ctx_eg = _ctx_eg
message: str = ( message: str = (
'Failed during debug request dialog with root actor?\n' 'Failed during debug request dialog with root actor?\n\n'
) )
if (req_ctx := DebugStatus.req_ctx): if (req_ctx := DebugStatus.req_ctx):
message += ( message += (
f'<=x)\n' f'<=x) {req_ctx}\n\n'
f' |_{req_ctx}\n'
f'Cancelling IPC ctx!\n' f'Cancelling IPC ctx!\n'
) )
try: try:
await req_ctx.cancel() await req_ctx.cancel()
except trio.ClosedResourceError as terr: except trio.ClosedResourceError as terr:
ctx_eg.add_note( ctx_err.add_note(
# f'Failed with {type(terr)!r} x)> `req_ctx.cancel()` ' # f'Failed with {type(terr)!r} x)> `req_ctx.cancel()` '
f'Failed with `req_ctx.cancel()` <x) {type(terr)!r} ' f'Failed with `req_ctx.cancel()` <x) {type(terr)!r} '
) )
@ -1319,45 +1314,21 @@ async def request_root_stdio_lock(
message += 'Failed in `Portal.open_context()` call ??\n' message += 'Failed in `Portal.open_context()` call ??\n'
log.exception(message) log.exception(message)
ctx_eg.add_note(message) ctx_err.add_note(message)
raise ctx_eg raise ctx_err
except BaseException as _req_err: except (
req_err = _req_err
# XXX NOTE, since new `trio` enforces strict egs by default
# we have to always handle the eg explicitly given the
# `Portal.open_context()` call above (which implicitly opens
# a nursery).
match req_err:
case BaseExceptionGroup():
# for an eg of just one taskc, just unpack and raise
# since we want to propagate a plane ol' `Cancelled`
# up from the `.pause()` call.
excs: list[BaseException] = req_err.exceptions
if (
len(excs) == 1
and
type(exc := excs[0]) in (
tractor.ContextCancelled, tractor.ContextCancelled,
trio.Cancelled, trio.Cancelled,
)
): ):
log.cancel( log.cancel(
'Debug lock request CANCELLED?\n' 'Debug lock request CANCELLED?\n'
f'{req_ctx}\n' f'{req_ctx}\n'
) )
raise exc raise
case (
tractor.ContextCancelled(),
trio.Cancelled(),
):
log.cancel(
'Debug lock request CANCELLED?\n'
f'{req_ctx}\n'
)
raise exc
except BaseException as req_err:
# log.error('Failed to request root stdio-lock?')
DebugStatus.req_err = req_err DebugStatus.req_err = req_err
DebugStatus.release() DebugStatus.release()
@ -1372,7 +1343,7 @@ async def request_root_stdio_lock(
'Failed during stdio-locking dialog from root actor\n\n' 'Failed during stdio-locking dialog from root actor\n\n'
f'<=x)\n' f'<=x)\n'
f' |_{DebugStatus.req_ctx}\n' f'|_{DebugStatus.req_ctx}\n'
) from req_err ) from req_err
finally: finally:
@ -1435,7 +1406,7 @@ def any_connected_locker_child() -> bool:
actor: Actor = current_actor() actor: Actor = current_actor()
if not is_root_process(): if not is_root_process():
raise InternalError('This is a root-actor only API!') raise RuntimeError('This is a root-actor only API!')
if ( if (
(ctx := Lock.ctx_in_debug) (ctx := Lock.ctx_in_debug)
@ -2172,12 +2143,11 @@ async def _pause(
# `_enter_repl_sync()` into a common @cm? # `_enter_repl_sync()` into a common @cm?
except BaseException as _pause_err: except BaseException as _pause_err:
pause_err: BaseException = _pause_err pause_err: BaseException = _pause_err
_repl_fail_report: str|None = _repl_fail_msg
if isinstance(pause_err, bdb.BdbQuit): if isinstance(pause_err, bdb.BdbQuit):
log.devx( log.devx(
'REPL for pdb was explicitly quit!\n' 'REPL for pdb was explicitly quit!\n'
) )
_repl_fail_report = None _repl_fail_msg = None
# when the actor is mid-runtime cancellation the # when the actor is mid-runtime cancellation the
# `Actor._service_n` might get closed before we can spawn # `Actor._service_n` might get closed before we can spawn
@ -2197,16 +2167,16 @@ async def _pause(
return return
elif isinstance(pause_err, trio.Cancelled): elif isinstance(pause_err, trio.Cancelled):
_repl_fail_report += ( _repl_fail_msg = (
'You called `tractor.pause()` from an already cancelled scope!\n\n' 'You called `tractor.pause()` from an already cancelled scope!\n\n'
'Consider `await tractor.pause(shield=True)` to make it work B)\n' 'Consider `await tractor.pause(shield=True)` to make it work B)\n'
) )
else: else:
_repl_fail_report += f'on behalf of {repl_task} ??\n' _repl_fail_msg += f'on behalf of {repl_task} ??\n'
if _repl_fail_report: if _repl_fail_msg:
log.exception(_repl_fail_report) log.exception(_repl_fail_msg)
if not actor.is_infected_aio(): if not actor.is_infected_aio():
DebugStatus.release(cancel_req_task=True) DebugStatus.release(cancel_req_task=True)
@ -3081,8 +3051,7 @@ async def maybe_wait_for_debugger(
if ( if (
not debug_mode() not debug_mode()
and and not child_in_debug
not child_in_debug
): ):
return False return False
@ -3140,7 +3109,7 @@ async def maybe_wait_for_debugger(
logmeth( logmeth(
msg msg
+ +
'\n^^ Root is waiting on tty lock release.. ^^\n' '\nRoot is waiting on tty lock to release from\n\n'
# f'{caller_frame_info}\n' # f'{caller_frame_info}\n'
) )
@ -3203,11 +3172,11 @@ async def maybe_wait_for_debugger(
@cm @cm
def open_crash_handler( def open_crash_handler(
catch: set[BaseException] = { catch: set[BaseException] = {
# Exception,
BaseException, BaseException,
}, },
ignore: set[BaseException] = { ignore: set[BaseException] = {
KeyboardInterrupt, KeyboardInterrupt,
trio.Cancelled,
}, },
tb_hide: bool = True, tb_hide: bool = True,
): ):

View File

@ -15,7 +15,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
''' '''
`tokio` style broadcast channel. ``tokio`` style broadcast channel.
https://docs.rs/tokio/1.11.0/tokio/sync/broadcast/index.html https://docs.rs/tokio/1.11.0/tokio/sync/broadcast/index.html
''' '''

View File

@ -57,8 +57,6 @@ async def maybe_open_nursery(
shield: bool = False, shield: bool = False,
lib: ModuleType = trio, lib: ModuleType = trio,
**kwargs, # proxy thru
) -> AsyncGenerator[trio.Nursery, Any]: ) -> AsyncGenerator[trio.Nursery, Any]:
''' '''
Create a new nursery if None provided. Create a new nursery if None provided.
@ -69,7 +67,7 @@ async def maybe_open_nursery(
if nursery is not None: if nursery is not None:
yield nursery yield nursery
else: else:
async with lib.open_nursery(**kwargs) as nursery: async with lib.open_nursery() as nursery:
nursery.cancel_scope.shield = shield nursery.cancel_scope.shield = shield
yield nursery yield nursery
@ -145,14 +143,9 @@ async def gather_contexts(
'Use a non-lazy iterator or sequence type intead!' 'Use a non-lazy iterator or sequence type intead!'
) )
async with trio.open_nursery( async with trio.open_nursery() as n:
strict_exception_groups=False,
# ^XXX^ TODO? soo roll our own then ??
# -> since we kinda want the "if only one `.exception` then
# just raise that" interface?
) as tn:
for mngr in mngrs: for mngr in mngrs:
tn.start_soon( n.start_soon(
_enter_and_wait, _enter_and_wait,
mngr, mngr,
unwrapped, unwrapped,