Compare commits
No commits in common. "71562e0af7e9d13f04f8069640307b42557cdfe3" and "cd1761e44db7fca60ea34020e12de9a038024a22" have entirely different histories.
71562e0af7
...
cd1761e44d
|
@ -117,9 +117,7 @@ async def open_actor_local_nursery(
|
|||
ctx: tractor.Context,
|
||||
):
|
||||
global _nursery
|
||||
async with trio.open_nursery(
|
||||
strict_exception_groups=False,
|
||||
) as n:
|
||||
async with trio.open_nursery() as n:
|
||||
_nursery = n
|
||||
await ctx.started()
|
||||
await trio.sleep(10)
|
||||
|
|
|
@ -533,10 +533,6 @@ async def open_portal(
|
|||
async with maybe_open_nursery(
|
||||
tn,
|
||||
shield=shield,
|
||||
strict_exception_groups=False,
|
||||
# ^XXX^ TODO? soo roll our own then ??
|
||||
# -> since we kinda want the "if only one `.exception` then
|
||||
# just raise that" interface?
|
||||
) as tn:
|
||||
|
||||
if not channel.connected():
|
||||
|
|
|
@ -317,6 +317,8 @@ class Lock:
|
|||
we_released: bool = False
|
||||
ctx_in_debug: Context|None = cls.ctx_in_debug
|
||||
repl_task: Task|Thread|None = DebugStatus.repl_task
|
||||
message: str = ''
|
||||
|
||||
try:
|
||||
if not DebugStatus.is_main_trio_thread():
|
||||
thread: threading.Thread = threading.current_thread()
|
||||
|
@ -331,10 +333,6 @@ class Lock:
|
|||
return False
|
||||
|
||||
task: Task = current_task()
|
||||
message: str = (
|
||||
'TTY NOT RELEASED on behalf of caller\n'
|
||||
f'|_{task}\n'
|
||||
)
|
||||
|
||||
# sanity check that if we're the root actor
|
||||
# the lock is marked as such.
|
||||
|
@ -349,6 +347,11 @@ class Lock:
|
|||
else:
|
||||
assert DebugStatus.repl_task is not task
|
||||
|
||||
message: str = (
|
||||
'TTY lock was NOT released on behalf of caller\n'
|
||||
f'|_{task}\n'
|
||||
)
|
||||
|
||||
lock: trio.StrictFIFOLock = cls._debug_lock
|
||||
owner: Task = lock.statistics().owner
|
||||
if (
|
||||
|
@ -363,21 +366,23 @@ class Lock:
|
|||
# correct task, greenback-spawned-task and/or thread
|
||||
# being set to the `.repl_task` such that the above
|
||||
# condition matches and we actually release the lock.
|
||||
#
|
||||
# This is particular of note from `.pause_from_sync()`!
|
||||
|
||||
):
|
||||
cls._debug_lock.release()
|
||||
we_released: bool = True
|
||||
if repl_task:
|
||||
message: str = (
|
||||
'TTY released on behalf of root-actor-local REPL owner\n'
|
||||
'Lock released on behalf of root-actor-local REPL owner\n'
|
||||
f'|_{repl_task}\n'
|
||||
)
|
||||
else:
|
||||
message: str = (
|
||||
'TTY released by us on behalf of remote peer?\n'
|
||||
f'{ctx_in_debug}\n'
|
||||
'TTY lock released by us on behalf of remote peer?\n'
|
||||
f'|_ctx_in_debug: {ctx_in_debug}\n\n'
|
||||
)
|
||||
# mk_pdb().set_trace()
|
||||
# elif owner:
|
||||
|
||||
except RuntimeError as rte:
|
||||
log.exception(
|
||||
|
@ -395,8 +400,7 @@ class Lock:
|
|||
req_handler_finished: trio.Event|None = Lock.req_handler_finished
|
||||
if (
|
||||
not lock_stats.owner
|
||||
and
|
||||
req_handler_finished is None
|
||||
and req_handler_finished is None
|
||||
):
|
||||
message += (
|
||||
'-> No new task holds the TTY lock!\n\n'
|
||||
|
@ -414,8 +418,8 @@ class Lock:
|
|||
repl_task
|
||||
)
|
||||
message += (
|
||||
f'A non-caller task still owns this lock on behalf of\n'
|
||||
f'{behalf_of_task}\n'
|
||||
f'A non-caller task still owns this lock on behalf of '
|
||||
f'`{behalf_of_task}`\n'
|
||||
f'lock owner task: {lock_stats.owner}\n'
|
||||
)
|
||||
|
||||
|
@ -443,6 +447,8 @@ class Lock:
|
|||
|
||||
if message:
|
||||
log.devx(message)
|
||||
else:
|
||||
import pdbp; pdbp.set_trace()
|
||||
|
||||
return we_released
|
||||
|
||||
|
@ -662,11 +668,10 @@ async def lock_stdio_for_peer(
|
|||
fail_reason: str = (
|
||||
f'on behalf of peer\n\n'
|
||||
f'x)<=\n'
|
||||
f' |_{subactor_task_uid!r}@{ctx.chan.uid!r}\n'
|
||||
f'\n'
|
||||
f' |_{subactor_task_uid!r}@{ctx.chan.uid!r}\n\n'
|
||||
|
||||
'Forcing `Lock.release()` due to acquire failure!\n\n'
|
||||
f'x)=>\n'
|
||||
f' {ctx}'
|
||||
f'x)=> {ctx}\n'
|
||||
)
|
||||
if isinstance(req_err, trio.Cancelled):
|
||||
fail_reason = (
|
||||
|
@ -1174,7 +1179,7 @@ async def request_root_stdio_lock(
|
|||
log.devx(
|
||||
'Initing stdio-lock request task with root actor'
|
||||
)
|
||||
# TODO: can we implement this mutex more generally as
|
||||
# TODO: likely we can implement this mutex more generally as
|
||||
# a `._sync.Lock`?
|
||||
# -[ ] simply add the wrapping needed for the debugger specifics?
|
||||
# - the `__pld_spec__` impl and maybe better APIs for the client
|
||||
|
@ -1185,7 +1190,6 @@ async def request_root_stdio_lock(
|
|||
# - https://docs.python.org/3.8/library/multiprocessing.html#multiprocessing.RLock
|
||||
DebugStatus.req_finished = trio.Event()
|
||||
DebugStatus.req_task = current_task()
|
||||
req_err: BaseException|None = None
|
||||
try:
|
||||
from tractor._discovery import get_root
|
||||
# NOTE: we need this to ensure that this task exits
|
||||
|
@ -1208,7 +1212,6 @@ async def request_root_stdio_lock(
|
|||
# )
|
||||
DebugStatus.req_cs = req_cs
|
||||
req_ctx: Context|None = None
|
||||
ctx_eg: BaseExceptionGroup|None = None
|
||||
try:
|
||||
# TODO: merge into single async with ?
|
||||
async with get_root() as portal:
|
||||
|
@ -1239,12 +1242,7 @@ async def request_root_stdio_lock(
|
|||
)
|
||||
|
||||
# try:
|
||||
if (locker := status.subactor_uid) != actor_uid:
|
||||
raise DebugStateError(
|
||||
f'Root actor locked by another peer !?\n'
|
||||
f'locker: {locker!r}\n'
|
||||
f'actor_uid: {actor_uid}\n'
|
||||
)
|
||||
assert status.subactor_uid == actor_uid
|
||||
assert status.cid
|
||||
# except AttributeError:
|
||||
# log.exception('failed pldspec asserts!')
|
||||
|
@ -1281,11 +1279,10 @@ async def request_root_stdio_lock(
|
|||
f'Exitting {req_ctx.side!r}-side of locking req_ctx\n'
|
||||
)
|
||||
|
||||
except* (
|
||||
except (
|
||||
tractor.ContextCancelled,
|
||||
trio.Cancelled,
|
||||
) as _taskc_eg:
|
||||
ctx_eg = _taskc_eg
|
||||
):
|
||||
log.cancel(
|
||||
'Debug lock request was CANCELLED?\n\n'
|
||||
f'<=c) {req_ctx}\n'
|
||||
|
@ -1294,23 +1291,21 @@ async def request_root_stdio_lock(
|
|||
)
|
||||
raise
|
||||
|
||||
except* (
|
||||
except (
|
||||
BaseException,
|
||||
) as _ctx_eg:
|
||||
ctx_eg = _ctx_eg
|
||||
) as ctx_err:
|
||||
message: str = (
|
||||
'Failed during debug request dialog with root actor?\n'
|
||||
'Failed during debug request dialog with root actor?\n\n'
|
||||
)
|
||||
if (req_ctx := DebugStatus.req_ctx):
|
||||
message += (
|
||||
f'<=x)\n'
|
||||
f' |_{req_ctx}\n'
|
||||
f'<=x) {req_ctx}\n\n'
|
||||
f'Cancelling IPC ctx!\n'
|
||||
)
|
||||
try:
|
||||
await req_ctx.cancel()
|
||||
except trio.ClosedResourceError as terr:
|
||||
ctx_eg.add_note(
|
||||
ctx_err.add_note(
|
||||
# f'Failed with {type(terr)!r} x)> `req_ctx.cancel()` '
|
||||
f'Failed with `req_ctx.cancel()` <x) {type(terr)!r} '
|
||||
)
|
||||
|
@ -1319,45 +1314,21 @@ async def request_root_stdio_lock(
|
|||
message += 'Failed in `Portal.open_context()` call ??\n'
|
||||
|
||||
log.exception(message)
|
||||
ctx_eg.add_note(message)
|
||||
raise ctx_eg
|
||||
ctx_err.add_note(message)
|
||||
raise ctx_err
|
||||
|
||||
except BaseException as _req_err:
|
||||
req_err = _req_err
|
||||
|
||||
# XXX NOTE, since new `trio` enforces strict egs by default
|
||||
# we have to always handle the eg explicitly given the
|
||||
# `Portal.open_context()` call above (which implicitly opens
|
||||
# a nursery).
|
||||
match req_err:
|
||||
case BaseExceptionGroup():
|
||||
# for an eg of just one taskc, just unpack and raise
|
||||
# since we want to propagate a plane ol' `Cancelled`
|
||||
# up from the `.pause()` call.
|
||||
excs: list[BaseException] = req_err.exceptions
|
||||
if (
|
||||
len(excs) == 1
|
||||
and
|
||||
type(exc := excs[0]) in (
|
||||
tractor.ContextCancelled,
|
||||
trio.Cancelled,
|
||||
)
|
||||
):
|
||||
log.cancel(
|
||||
'Debug lock request CANCELLED?\n'
|
||||
f'{req_ctx}\n'
|
||||
)
|
||||
raise exc
|
||||
case (
|
||||
tractor.ContextCancelled(),
|
||||
trio.Cancelled(),
|
||||
):
|
||||
log.cancel(
|
||||
'Debug lock request CANCELLED?\n'
|
||||
f'{req_ctx}\n'
|
||||
)
|
||||
raise exc
|
||||
except (
|
||||
tractor.ContextCancelled,
|
||||
trio.Cancelled,
|
||||
):
|
||||
log.cancel(
|
||||
'Debug lock request CANCELLED?\n'
|
||||
f'{req_ctx}\n'
|
||||
)
|
||||
raise
|
||||
|
||||
except BaseException as req_err:
|
||||
# log.error('Failed to request root stdio-lock?')
|
||||
DebugStatus.req_err = req_err
|
||||
DebugStatus.release()
|
||||
|
||||
|
@ -1372,7 +1343,7 @@ async def request_root_stdio_lock(
|
|||
'Failed during stdio-locking dialog from root actor\n\n'
|
||||
|
||||
f'<=x)\n'
|
||||
f' |_{DebugStatus.req_ctx}\n'
|
||||
f'|_{DebugStatus.req_ctx}\n'
|
||||
) from req_err
|
||||
|
||||
finally:
|
||||
|
@ -1435,7 +1406,7 @@ def any_connected_locker_child() -> bool:
|
|||
actor: Actor = current_actor()
|
||||
|
||||
if not is_root_process():
|
||||
raise InternalError('This is a root-actor only API!')
|
||||
raise RuntimeError('This is a root-actor only API!')
|
||||
|
||||
if (
|
||||
(ctx := Lock.ctx_in_debug)
|
||||
|
@ -2172,12 +2143,11 @@ async def _pause(
|
|||
# `_enter_repl_sync()` into a common @cm?
|
||||
except BaseException as _pause_err:
|
||||
pause_err: BaseException = _pause_err
|
||||
_repl_fail_report: str|None = _repl_fail_msg
|
||||
if isinstance(pause_err, bdb.BdbQuit):
|
||||
log.devx(
|
||||
'REPL for pdb was explicitly quit!\n'
|
||||
)
|
||||
_repl_fail_report = None
|
||||
_repl_fail_msg = None
|
||||
|
||||
# when the actor is mid-runtime cancellation the
|
||||
# `Actor._service_n` might get closed before we can spawn
|
||||
|
@ -2197,16 +2167,16 @@ async def _pause(
|
|||
return
|
||||
|
||||
elif isinstance(pause_err, trio.Cancelled):
|
||||
_repl_fail_report += (
|
||||
_repl_fail_msg = (
|
||||
'You called `tractor.pause()` from an already cancelled scope!\n\n'
|
||||
'Consider `await tractor.pause(shield=True)` to make it work B)\n'
|
||||
)
|
||||
|
||||
else:
|
||||
_repl_fail_report += f'on behalf of {repl_task} ??\n'
|
||||
_repl_fail_msg += f'on behalf of {repl_task} ??\n'
|
||||
|
||||
if _repl_fail_report:
|
||||
log.exception(_repl_fail_report)
|
||||
if _repl_fail_msg:
|
||||
log.exception(_repl_fail_msg)
|
||||
|
||||
if not actor.is_infected_aio():
|
||||
DebugStatus.release(cancel_req_task=True)
|
||||
|
@ -3081,8 +3051,7 @@ async def maybe_wait_for_debugger(
|
|||
|
||||
if (
|
||||
not debug_mode()
|
||||
and
|
||||
not child_in_debug
|
||||
and not child_in_debug
|
||||
):
|
||||
return False
|
||||
|
||||
|
@ -3140,7 +3109,7 @@ async def maybe_wait_for_debugger(
|
|||
logmeth(
|
||||
msg
|
||||
+
|
||||
'\n^^ Root is waiting on tty lock release.. ^^\n'
|
||||
'\nRoot is waiting on tty lock to release from\n\n'
|
||||
# f'{caller_frame_info}\n'
|
||||
)
|
||||
|
||||
|
@ -3203,11 +3172,11 @@ async def maybe_wait_for_debugger(
|
|||
@cm
|
||||
def open_crash_handler(
|
||||
catch: set[BaseException] = {
|
||||
# Exception,
|
||||
BaseException,
|
||||
},
|
||||
ignore: set[BaseException] = {
|
||||
KeyboardInterrupt,
|
||||
trio.Cancelled,
|
||||
},
|
||||
tb_hide: bool = True,
|
||||
):
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
'''
|
||||
`tokio` style broadcast channel.
|
||||
``tokio`` style broadcast channel.
|
||||
https://docs.rs/tokio/1.11.0/tokio/sync/broadcast/index.html
|
||||
|
||||
'''
|
||||
|
|
|
@ -57,8 +57,6 @@ async def maybe_open_nursery(
|
|||
shield: bool = False,
|
||||
lib: ModuleType = trio,
|
||||
|
||||
**kwargs, # proxy thru
|
||||
|
||||
) -> AsyncGenerator[trio.Nursery, Any]:
|
||||
'''
|
||||
Create a new nursery if None provided.
|
||||
|
@ -69,7 +67,7 @@ async def maybe_open_nursery(
|
|||
if nursery is not None:
|
||||
yield nursery
|
||||
else:
|
||||
async with lib.open_nursery(**kwargs) as nursery:
|
||||
async with lib.open_nursery() as nursery:
|
||||
nursery.cancel_scope.shield = shield
|
||||
yield nursery
|
||||
|
||||
|
@ -145,14 +143,9 @@ async def gather_contexts(
|
|||
'Use a non-lazy iterator or sequence type intead!'
|
||||
)
|
||||
|
||||
async with trio.open_nursery(
|
||||
strict_exception_groups=False,
|
||||
# ^XXX^ TODO? soo roll our own then ??
|
||||
# -> since we kinda want the "if only one `.exception` then
|
||||
# just raise that" interface?
|
||||
) as tn:
|
||||
async with trio.open_nursery() as n:
|
||||
for mngr in mngrs:
|
||||
tn.start_soon(
|
||||
n.start_soon(
|
||||
_enter_and_wait,
|
||||
mngr,
|
||||
unwrapped,
|
||||
|
|
Loading…
Reference in New Issue