diff --git a/tractor/_debug.py b/tractor/_debug.py
index af4e748..53d1f44 100644
--- a/tractor/_debug.py
+++ b/tractor/_debug.py
@@ -124,10 +124,11 @@ class PdbwTeardown(pdbpp.Pdb):
 
 @asynccontextmanager
 async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]:
-    """Acquire a actor local FIFO lock meant to mutex entry to a local
-    debugger entry point to avoid tty clobbering by multiple processes.
-    """
-    global _debug_lock, _global_actor_in_debug
+    '''Acquire an actor-local FIFO lock meant to mutex entry to a local
+    debugger entry point and avoid clobbering the global root process's tty.
+
+    '''
+    global _debug_lock, _global_actor_in_debug, _no_remote_has_tty
 
     task_name = trio.lowlevel.current_task().name
 
@@ -135,15 +136,60 @@ async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]:
         f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}"
     )
 
-    async with _debug_lock:
+    we_acquired = False
+
+    if _no_remote_has_tty is None:
+        # mark the tty lock as being in use so that the runtime
+        # can try to avoid clobbering any connection from a child
+        # that's currently relying on it.
+        _no_remote_has_tty = trio.Event()
+
+    try:
+        log.debug(
+            f"entering lock checkpoint, remote task: {task_name}:{uid}"
+        )
+        await _debug_lock.acquire()
+        # only flag ownership once the lock is actually held so the
+        # ``finally`` below can never release a lock owned by another task.
+        we_acquired = True
 
         _global_actor_in_debug = uid
         log.debug(f"TTY lock acquired, remote task: {task_name}:{uid}")
 
-        yield
+        # NOTE: critical section!
+        # This yield is unshielded.
+        # IF we received a cancel during the shielded lock
+        # entry of some next-in-queue requesting task,
+        # then the resumption here will result in that
+        # Cancelled being raised to our caller below!
 
-        _global_actor_in_debug = None
-        log.debug(f"TTY lock released, remote task: {task_name}:{uid}")
+        # In this case the finally below should trigger
+        # and the surrounding callee side context should cancel
+        # normally, relaying back to the caller.
+
+        yield _debug_lock
+
+    finally:
+        # if _global_actor_in_debug == uid:
+        if we_acquired and _debug_lock.locked():
+            _debug_lock.release()
+
+        # IFF there are no more requesting tasks queued up, fire the
+        # "tty-unlocked" event thereby alerting any monitors of the lock that
+        # we are now back in the "tty unlocked" state. This is basically
+        # an edge-triggered signal around an empty queue of sub-actor
+        # tasks that may have tried to acquire the lock.
+        stats = _debug_lock.statistics()
+        if (
+            not stats.owner
+        ):
+            log.pdb(f"No more tasks waiting on tty lock! says {uid}")
+            _no_remote_has_tty.set()
+            _no_remote_has_tty = None
+
+        _global_actor_in_debug = None
+
+        log.debug(f"TTY lock released, remote task: {task_name}:{uid}")
 
 
 # @contextmanager
@@ -169,53 +215,43 @@ async def _hijack_stdin_relay_to_child(
     bossing.
 
     '''
-    global _no_remote_has_tty
-
-    # mark the tty lock as being in use so that the runtime
-    # can try to avoid clobbering any connection from a child
-    # that's currently relying on it.
-    _no_remote_has_tty = trio.Event()
-
     task_name = trio.lowlevel.current_task().name
 
     # TODO: when we get to true remote debugging
     # this will deliver stdin data?
 
     log.debug(
-        "Attempting to acquire TTY lock, "
+        "Attempting to acquire TTY lock\n"
         f"remote task: {task_name}:{subactor_uid}"
     )
 
     log.debug(f"Actor {subactor_uid} is WAITING on stdin hijack lock")
 
-    async with _acquire_debug_lock(subactor_uid):
+    with trio.CancelScope(shield=True):
 
-        # XXX: only shield the context sync step!
-        with trio.CancelScope(shield=True):
+        async with _acquire_debug_lock(subactor_uid):
 
             # indicate to child that we've locked stdio
             await ctx.started('Locked')
             log.pdb(  # type: ignore
                 f"Actor {subactor_uid} ACQUIRED stdin hijack lock")
 
-        # wait for unlock pdb by child
-        async with ctx.open_stream() as stream:
-            try:
-                assert await stream.receive() == 'pdb_unlock'
+            # wait for unlock pdb by child
+            async with ctx.open_stream() as stream:
+                try:
+                    assert await stream.receive() == 'pdb_unlock'
 
-            except trio.BrokenResourceError:
-                # XXX: there may be a race with the portal teardown
-                # with the calling actor which we can safely ignore
-                # the alternative would be sending an ack message
-                # and allowing the client to wait for us to teardown
-                # first?
-                pass
+                except trio.BrokenResourceError:
+                    # XXX: there may be a race with the portal teardown
+                    # with the calling actor which we can safely ignore
+                    # the alternative would be sending an ack message
+                    # and allowing the client to wait for us to teardown
+                    # first?
+                    pass
 
     log.debug(
         f"TTY lock released, remote task: {task_name}:{subactor_uid}")
 
-    log.debug(f"Actor {subactor_uid} RELEASED stdin hijack lock")
-    _no_remote_has_tty.set()
 
     return "pdb_unlock_complete"
@@ -243,17 +279,21 @@ async def _breakpoint(
     global _local_pdb_complete, _pdb_release_hook
     global _local_task_in_debug, _global_actor_in_debug
 
+    await trio.lowlevel.checkpoint()
+
     async def wait_for_parent_stdin_hijack(
         task_status=trio.TASK_STATUS_IGNORED
    ):
         global _debugger_request_cs
 
-        with trio.CancelScope() as cs:
+        with trio.CancelScope(shield=True) as cs:
            _debugger_request_cs = cs
 
            try:
                async with get_root() as portal:
 
+                   log.error('got portal')
+
                    # this syncs to child's ``Context.started()`` call.
                    async with portal.open_context(
@@ -262,18 +302,22 @@ async def _breakpoint(
                    ) as (ctx, val):
 
+                        log.error('locked context')
                        assert val == 'Locked'
 
                        async with ctx.open_stream() as stream:
 
+                            log.error('opened stream')
                            # unblock local caller
                            task_status.started()
 
-                            # TODO: shielding currently can cause hangs...
-                            # with trio.CancelScope(shield=True):
+                            try:
+                                await _local_pdb_complete.wait()
 
-                            await _local_pdb_complete.wait()
-                            await stream.send('pdb_unlock')
+                            finally:
+                                # TODO: shielding currently can cause hangs...
+                                with trio.CancelScope(shield=True):
+                                    await stream.send('pdb_unlock')
 
                        # sync with callee termination
                        assert await ctx.result() == "pdb_unlock_complete"
@@ -292,6 +336,7 @@ async def _breakpoint(
     # TODO: need a more robust check for the "root" actor
     if actor._parent_chan and not is_root_process():
+
         if _local_task_in_debug:
             if _local_task_in_debug == task_name:
                 # this task already has the lock and is
@@ -316,7 +361,13 @@ async def _breakpoint(
         # this **must** be awaited by the caller and is done using the
         # root nursery so that the debugger can continue to run without
         # being restricted by the scope of a new task nursery.
-        await actor._service_n.start(wait_for_parent_stdin_hijack)
+
+        # NOTE: if we want to debug a trio.Cancelled triggered exception
+        # we have to figure out how to avoid having the service nursery
+        # cancel on this task start? I *think* this works below?
+        # actor._service_n.cancel_scope.shield = shield
+        with trio.CancelScope(shield=True):
+            await actor._service_n.start(wait_for_parent_stdin_hijack)
 
    elif is_root_process():
@@ -333,6 +384,11 @@ async def _breakpoint(
         # XXX: since we need to enter pdb synchronously below,
         # we have to release the lock manually from pdb completion
         # callbacks. Can't think of a nicer way then this atm.
+        if _debug_lock.locked():
+            log.warning(
+                'Root actor attempting to acquire active tty lock'
+                f' owned by {_global_actor_in_debug}')
+
         await _debug_lock.acquire()
 
         _global_actor_in_debug = actor.uid
diff --git a/tractor/_trionics.py b/tractor/_trionics.py
index 9630d2a..cc818d9 100644
--- a/tractor/_trionics.py
+++ b/tractor/_trionics.py
@@ -170,16 +170,25 @@ class ActorNursery:
         log.warning(f"Cancelling nursery in {self._actor.uid}")
         with trio.move_on_after(3) as cs:
+
             async with trio.open_nursery() as nursery:
+
                 for subactor, proc, portal in self._children.values():
+
+                    # TODO: are we ever even going to use this or
+                    # is the spawning backend responsible for such
+                    # things? I'm thinking the latter.
                     if hard_kill:
                         proc.terminate()
+
                     else:
                         if portal is None:  # actor hasn't fully spawned yet
                             event = self._actor._peer_connected[subactor.uid]
                             log.warning(
                                 f"{subactor.uid} wasn't finished spawning?")
+
                             await event.wait()
+
                             # channel/portal should now be up
                             _, _, portal = self._children[subactor.uid]
@@ -239,6 +248,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
             # As such if the strategy propagates any error(s) upwards
             # the above "daemon actor" nursery will be notified.
             async with trio.open_nursery() as ria_nursery:
+
                 anursery = ActorNursery(
                     actor,
                     ria_nursery,
@@ -249,35 +259,53 @@
                     # spawning of actors happens in the caller's scope
                     # after we yield upwards
                     yield anursery
-                    log.debug(
+
+                    log.runtime(
                         f"Waiting on subactors {anursery._children} "
                         "to complete"
                     )
+
+                    # Last bit before first nursery block ends in the case
+                    # where we didn't error in the caller's scope
+
+                    # signal all process monitor tasks to conduct
+                    # hard join phase.
+                    anursery._join_procs.set()
+
                 except BaseException as err:
-                    if is_root_process() and (
-                        type(err) in {
-                            Exception, trio.MultiError, trio.Cancelled
-                        }
-                    ):
-                        # if we error in the root but the debugger is
-                        # engaged we don't want to prematurely kill (and
-                        # thus clobber access to) the local tty streams.
-                        # instead try to wait for pdb to be released before
-                        # tearing down.
-                        debug_complete = _debug._pdb_complete
-                        if debug_complete and not debug_complete.is_set():
-                            log.warning(
-                                "Root has errored but pdb is active..waiting "
-                                "on debug lock")
-                            await _debug._pdb_complete.wait()
+                    # If we error in the root but the debugger is
+                    # engaged, we don't want to prematurely kill (and
+                    # thus clobber access to) the local tty since it
+                    # will make the pdb repl unusable.
+                    # Instead try to wait for pdb to be released before
+                    # tearing down.
+                    if is_root_process():
+                        log.exception(f"we're root with {err}")
 
-                        # raise
+                        # wait to see if a sub-actor task
+                        # will be scheduled and grab the tty
+                        # lock on the next tick
+                        # await trio.testing.wait_all_tasks_blocked()
+
+                        debug_complete = _debug._no_remote_has_tty
+                        if (
+                            debug_complete and
+                            not debug_complete.is_set()
+                        ):
+                            log.warning(
+                                'Root has errored but pdb is in use by '
+                                f'child {_debug._global_actor_in_debug}\n'
+                                'Waiting on tty lock to release..')
+
+                            with trio.CancelScope(shield=True):
+                                await debug_complete.wait()
 
                     # if the caller's scope errored then we activate our
                     # one-cancels-all supervisor strategy (don't
                     # worry more are coming).
                     anursery._join_procs.set()
+
                     try:
                         # XXX: hypothetically an error could be
                         # raised and then a cancel signal shows up
@@ -313,15 +341,18 @@
                             else:
                                 raise
 
-                # Last bit before first nursery block ends in the case
-                # where we didn't error in the caller's scope
-                log.debug("Waiting on all subactors to complete")
-                anursery._join_procs.set()
-
             # ria_nursery scope end
 
         # XXX: do we need a `trio.Cancelled` catch here as well?
-        except (Exception, trio.MultiError, trio.Cancelled) as err:
+        # this is the catch around the ``.run_in_actor()`` nursery
+        except (
+
+            Exception,
+            trio.MultiError,
+            trio.Cancelled
+
+        ) as err:
+
             # If actor-local error was raised while waiting on
             # ".run_in_actor()" actors then we also want to cancel all
             # remaining sub-actors (due to our lone strategy:
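
The core pattern the `_debug.py` hunks introduce is a FIFO lock guarding the tty plus an edge-triggered event that only fires once no requesting task owns, or is queued on, the lock. Below is a minimal standalone sketch of that pattern in plain trio; the names (`tty_lock`, `no_one_has_tty`, `acquire_tty`) are illustrative only and are not part of the tractor API:

from contextlib import asynccontextmanager
from typing import AsyncIterator, Optional

import trio

tty_lock = trio.StrictFIFOLock()
no_one_has_tty: Optional[trio.Event] = None


@asynccontextmanager
async def acquire_tty() -> AsyncIterator[trio.StrictFIFOLock]:
    # FIFO-ordered entry: requesters are served in arrival order.
    global no_one_has_tty
    we_acquired = False

    if no_one_has_tty is None:
        # the first requester marks the tty as "in use" so that
        # monitors can later wait for it to be released.
        no_one_has_tty = trio.Event()

    try:
        await tty_lock.acquire()
        we_acquired = True
        yield tty_lock

    finally:
        if we_acquired and tty_lock.locked():
            tty_lock.release()

        # edge-triggered signal: only fire once *no* task owns or is
        # still waiting on the lock (trio hands the lock directly to
        # the next waiter on release, so ``owner`` stays set until the
        # queue is empty).
        stats = tty_lock.statistics()
        if no_one_has_tty is not None and not stats.owner:
            no_one_has_tty.set()
            no_one_has_tty = None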
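
On the supervision side (the `_trionics.py` hunks), the root waits on that event from inside a shielded cancel scope so an error in its own scope can't rip the tty away while a child is still sitting in the debugger. A rough sketch of that usage, again with hypothetical names rather than the actual tractor nursery code:

from typing import Optional

import trio


async def supervise_children(no_one_has_tty: Optional[trio.Event]) -> None:
    try:
        ...  # spawn and wait on child tasks/actors here

    except BaseException:
        # a child may still hold the tty for its pdb session; wait for
        # the release signal before tearing anything down. The shield
        # keeps this wait alive even while we are being cancelled.
        if no_one_has_tty is not None and not no_one_has_tty.is_set():
            with trio.CancelScope(shield=True):
                await no_one_has_tty.wait()

        raise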