Compare commits

..

No commits in common. "5d74490f1e4650783549637b3c10f377454066a6" and "38c22dd82b456e17724f96ce4d85d48c205102d6" have entirely different histories.

9 changed files with 131 additions and 369 deletions

View File

@ -1,53 +0,0 @@
'''
fast fail test with a context.
ensure the partially initialized sub-actor process
doesn't cause a hang on error/cancel of the parent
nrusery.
'''
import trio
import tractor
@tractor.context
async def sleep(
ctx: tractor.Context,
):
await trio.sleep(0.5)
await ctx.started()
await trio.sleep_forever()
async def open_ctx(
n: tractor._trionics.ActorNursery
):
# spawn both actors
portal = await n.start_actor(
name='sleeper',
enable_modules=[__name__],
)
async with portal.open_context(
sleep,
) as (ctx, first):
assert first is None
async def main():
async with tractor.open_nursery(
debug_mode=True,
loglevel='runtime',
) as an:
async with trio.open_nursery() as n:
n.start_soon(open_ctx, an)
await trio.sleep(0.2)
await trio.sleep(0.1)
assert 0
if __name__ == '__main__':
trio.run(main)

View File

@ -317,58 +317,32 @@ def test_multi_daemon_subactors(spawn, loglevel):
next_msg = name_error_msg next_msg = name_error_msg
elif name_error_msg in before: elif name_error_msg in before:
next_msg = bp_forever_msg next_msg = None
else: else:
raise ValueError("Neither log msg was found !?") raise ValueError("Neither log msg was found !?")
# NOTE: previously since we did not have clobber prevention
# in the root actor this final resume could result in the debugger
# tearing down since both child actors would be cancelled and it was
# unlikely that `bp_forever` would re-acquire the tty loack again.
# Now, we should have a final resumption in the root plus a possible
# second entry by `bp_forever`.
child.sendline('c') child.sendline('c')
# first name_error failure
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode()) before = str(child.before.decode())
if next_msg:
assert next_msg in before assert next_msg in before
# XXX: hoorayy the root clobering the child here was fixed!
# IMO, this demonstrates the true power of SC system design.
# now the root actor won't clobber the bp_forever child
# during it's first access to the debug lock, but will instead
# wait for the lock to release, by the edge triggered
# ``_debug._no_remote_has_tty`` event before sending cancel messages
# (via portals) to its underlings B)
# at some point here there should have been some warning msg from
# the root announcing it avoided a clobber of the child's lock, but
# it seems unreliable in testing here to gnab it:
# assert "in use by child ('bp_forever'," in before
# wait for final error in root
while True:
child.sendline('c') child.sendline('c')
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode()) before = str(child.before.decode())
try: assert "tractor._exceptions.RemoteActorError: ('name_error'" in before
# root error should be packed as remote error
assert "_exceptions.RemoteActorError: ('name_error'" in before
break
except AssertionError:
assert bp_forever_msg in before
try: try:
child.sendline('c') child.sendline('c')
child.expect(pexpect.EOF) child.expect(pexpect.EOF)
except pexpect.exceptions.TIMEOUT: except pexpect.exceptions.TIMEOUT:
# Failed to exit using continue..? # Failed to exit using continue..?
child.sendline('q') child.sendline('q')
child.expect(pexpect.EOF) child.expect(pexpect.EOF)
@ -423,7 +397,7 @@ def test_multi_nested_subactors_error_through_nurseries(spawn):
child = spawn('multi_nested_subactors_error_up_through_nurseries') child = spawn('multi_nested_subactors_error_up_through_nurseries')
# startup time can be iffy # startup time can be iffy
# time.sleep(1) time.sleep(1)
for i in range(12): for i in range(12):
try: try:
@ -505,21 +479,3 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
assert "tractor._exceptions.RemoteActorError: ('spawner0'" in before assert "tractor._exceptions.RemoteActorError: ('spawner0'" in before
assert "tractor._exceptions.RemoteActorError: ('name_error'" in before assert "tractor._exceptions.RemoteActorError: ('name_error'" in before
assert "NameError: name 'doggypants' is not defined" in before assert "NameError: name 'doggypants' is not defined" in before
def test_root_cancels_child_context_during_startup(
spawn,
):
'''Verify a fast fail in the root doesn't lock up the child reaping
and all while using the new context api.
'''
child = spawn('fast_error_in_root_after_spawn')
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
assert "AssertionError" in before
child.sendline('c')
child.expect(pexpect.EOF)

View File

@ -463,8 +463,7 @@ class Actor:
log.runtime( log.runtime(
f"already have channel(s) for {uid}:{chans}?" f"already have channel(s) for {uid}:{chans}?"
) )
log.trace(f"Registered {chan} for {uid}") # type: ignore
log.runtime(f"Registered {chan} for {uid}") # type: ignore
# append new channel # append new channel
self._peers[uid].append(chan) self._peers[uid].append(chan)
@ -632,7 +631,7 @@ class Actor:
break break
log.transport( # type: ignore log.trace( # type: ignore
f"Received msg {msg} from {chan.uid}") f"Received msg {msg} from {chan.uid}")
cid = msg.get('cid') cid = msg.get('cid')

View File

@ -45,8 +45,7 @@ _global_actor_in_debug: Optional[Tuple[str, str]] = None
# lock in root actor preventing multi-access to local tty # lock in root actor preventing multi-access to local tty
_debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock() _debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
_local_pdb_complete: Optional[trio.Event] = None _pdb_complete: Optional[trio.Event] = None
_no_remote_has_tty: Optional[trio.Event] = None
# XXX: set by the current task waiting on the root tty lock # XXX: set by the current task waiting on the root tty lock
# and must be cancelled if this actor is cancelled via message # and must be cancelled if this actor is cancelled via message
@ -110,7 +109,7 @@ class PdbwTeardown(pdbpp.Pdb):
# async with aclosing(async_stdin): # async with aclosing(async_stdin):
# async for msg in async_stdin: # async for msg in async_stdin:
# log.runtime(f"Stdin input:\n{msg}") # log.trace(f"Stdin input:\n{msg}")
# # encode to bytes # # encode to bytes
# bmsg = str.encode(msg) # bmsg = str.encode(msg)
@ -124,71 +123,24 @@ class PdbwTeardown(pdbpp.Pdb):
@asynccontextmanager @asynccontextmanager
async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]: async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]:
'''Acquire a actor local FIFO lock meant to mutex entry to a local """Acquire a actor local FIFO lock meant to mutex entry to a local
debugger entry point to avoid tty clobbering a global root process. debugger entry point to avoid tty clobbering by multiple processes.
"""
''' global _debug_lock, _global_actor_in_debug
global _debug_lock, _global_actor_in_debug, _no_remote_has_tty
task_name = trio.lowlevel.current_task().name task_name = trio.lowlevel.current_task().name
log.pdb(
f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}"
)
we_acquired = False
if _no_remote_has_tty is None:
# mark the tty lock as being in use so that the runtime
# can try to avoid clobbering any connection from a child
# that's currently relying on it.
_no_remote_has_tty = trio.Event()
try:
log.debug( log.debug(
f"entering lock checkpoint, remote task: {task_name}:{uid}" f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}")
)
we_acquired = True
await _debug_lock.acquire()
# we_acquired = True async with _debug_lock:
# _debug_lock._uid = uid
_global_actor_in_debug = uid _global_actor_in_debug = uid
log.debug(f"TTY lock acquired, remote task: {task_name}:{uid}") log.debug(f"TTY lock acquired, remote task: {task_name}:{uid}")
yield
# NOTE: critical section!
# this yield is unshielded.
# IF we received a cancel during the shielded lock
# entry of some next-in-queue requesting task,
# then the resumption here will result in that
# Cancelled being raised to our caller below!
# in this case the finally below should trigger
# and the surrounding calle side context should cancel
# normally relaying back to the caller.
yield _debug_lock
finally:
# if _global_actor_in_debug == uid:
if we_acquired and _debug_lock.locked():
_debug_lock.release()
# IFF there are no more requesting tasks queued up fire, the
# "tty-unlocked" event thereby alerting any monitors of the lock that
# we are now back in the "tty unlocked" state. This is basically
# and edge triggered signal around an empty queue of sub-actor
# tasks that may have tried to acquire the lock.
stats = _debug_lock.statistics()
if (
not stats.owner
):
log.pdb(f"No more tasks waiting on tty lock! says {uid}")
_no_remote_has_tty.set()
_no_remote_has_tty = None
_global_actor_in_debug = None _global_actor_in_debug = None
log.debug(f"TTY lock released, remote task: {task_name}:{uid}") log.debug(f"TTY lock released, remote task: {task_name}:{uid}")
@ -210,30 +162,29 @@ async def _hijack_stdin_relay_to_child(
subactor_uid: Tuple[str, str] subactor_uid: Tuple[str, str]
) -> str: ) -> str:
'''Hijack the tty in the root process of an actor tree such that
the pdbpp debugger console can be allocated to a sub-actor for repl
bossing.
''' global _pdb_complete
task_name = trio.lowlevel.current_task().name task_name = trio.lowlevel.current_task().name
# TODO: when we get to true remote debugging # TODO: when we get to true remote debugging
# this will deliver stdin data? # this will deliver stdin data?
log.debug( log.debug(
"Attempting to acquire TTY lock\n" "Attempting to acquire TTY lock, "
f"remote task: {task_name}:{subactor_uid}" f"remote task: {task_name}:{subactor_uid}"
) )
log.debug(f"Actor {subactor_uid} is WAITING on stdin hijack lock") log.debug(f"Actor {subactor_uid} is WAITING on stdin hijack lock")
with trio.CancelScope(shield=True):
async with _acquire_debug_lock(subactor_uid): async with _acquire_debug_lock(subactor_uid):
# XXX: only shield the context sync step!
with trio.CancelScope(shield=True):
# indicate to child that we've locked stdio # indicate to child that we've locked stdio
await ctx.started('Locked') await ctx.started('Locked')
log.pdb( # type: ignore log.runtime( # type: ignore
f"Actor {subactor_uid} ACQUIRED stdin hijack lock") f"Actor {subactor_uid} ACQUIRED stdin hijack lock")
# wait for unlock pdb by child # wait for unlock pdb by child
@ -252,6 +203,7 @@ async def _hijack_stdin_relay_to_child(
log.debug( log.debug(
f"TTY lock released, remote task: {task_name}:{subactor_uid}") f"TTY lock released, remote task: {task_name}:{subactor_uid}")
log.debug(f"Actor {subactor_uid} RELEASED stdin hijack lock")
return "pdb_unlock_complete" return "pdb_unlock_complete"
@ -263,24 +215,20 @@ async def _breakpoint(debug_func) -> None:
actor = tractor.current_actor() actor = tractor.current_actor()
task_name = trio.lowlevel.current_task().name task_name = trio.lowlevel.current_task().name
global _local_pdb_complete, _pdb_release_hook global _pdb_complete, _pdb_release_hook
global _local_task_in_debug, _global_actor_in_debug global _local_task_in_debug, _global_actor_in_debug
await trio.lowlevel.checkpoint()
async def wait_for_parent_stdin_hijack( async def wait_for_parent_stdin_hijack(
task_status=trio.TASK_STATUS_IGNORED task_status=trio.TASK_STATUS_IGNORED
): ):
global _debugger_request_cs global _debugger_request_cs
with trio.CancelScope(shield=True) as cs: with trio.CancelScope() as cs:
_debugger_request_cs = cs _debugger_request_cs = cs
try: try:
async with get_root() as portal: async with get_root() as portal:
log.error('got portal')
# this syncs to child's ``Context.started()`` call. # this syncs to child's ``Context.started()`` call.
async with portal.open_context( async with portal.open_context(
@ -289,21 +237,17 @@ async def _breakpoint(debug_func) -> None:
) as (ctx, val): ) as (ctx, val):
log.error('locked context')
assert val == 'Locked' assert val == 'Locked'
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
log.error('opened stream')
# unblock local caller # unblock local caller
task_status.started() task_status.started()
try:
await _local_pdb_complete.wait()
finally:
# TODO: shielding currently can cause hangs... # TODO: shielding currently can cause hangs...
with trio.CancelScope(shield=True): # with trio.CancelScope(shield=True):
await _pdb_complete.wait()
await stream.send('pdb_unlock') await stream.send('pdb_unlock')
# sync with callee termination # sync with callee termination
@ -318,12 +262,11 @@ async def _breakpoint(debug_func) -> None:
_local_task_in_debug = None _local_task_in_debug = None
log.debug(f"Child {actor} released parent stdio lock") log.debug(f"Child {actor} released parent stdio lock")
if not _local_pdb_complete or _local_pdb_complete.is_set(): if not _pdb_complete or _pdb_complete.is_set():
_local_pdb_complete = trio.Event() _pdb_complete = trio.Event()
# TODO: need a more robust check for the "root" actor # TODO: need a more robust check for the "root" actor
if actor._parent_chan and not is_root_process(): if actor._parent_chan and not is_root_process():
if _local_task_in_debug: if _local_task_in_debug:
if _local_task_in_debug == task_name: if _local_task_in_debug == task_name:
# this task already has the lock and is # this task already has the lock and is
@ -335,7 +278,7 @@ async def _breakpoint(debug_func) -> None:
# support for recursive entries to `tractor.breakpoint()` # support for recursive entries to `tractor.breakpoint()`
log.warning(f"{actor.uid} already has a debug lock, waiting...") log.warning(f"{actor.uid} already has a debug lock, waiting...")
await _local_pdb_complete.wait() await _pdb_complete.wait()
await trio.sleep(0.1) await trio.sleep(0.1)
# mark local actor as "in debug mode" to avoid recurrent # mark local actor as "in debug mode" to avoid recurrent
@ -343,17 +286,11 @@ async def _breakpoint(debug_func) -> None:
_local_task_in_debug = task_name _local_task_in_debug = task_name
# assign unlock callback for debugger teardown hooks # assign unlock callback for debugger teardown hooks
_pdb_release_hook = _local_pdb_complete.set _pdb_release_hook = _pdb_complete.set
# this **must** be awaited by the caller and is done using the # this **must** be awaited by the caller and is done using the
# root nursery so that the debugger can continue to run without # root nursery so that the debugger can continue to run without
# being restricted by the scope of a new task nursery. # being restricted by the scope of a new task nursery.
# NOTE: if we want to debug a trio.Cancelled triggered exception
# we have to figure out how to avoid having the service nursery
# cancel on this task start? I *think* this works below?
# actor._service_n.cancel_scope.shield = shield
with trio.CancelScope(shield=True):
await actor._service_n.start(wait_for_parent_stdin_hijack) await actor._service_n.start(wait_for_parent_stdin_hijack)
elif is_root_process(): elif is_root_process():
@ -371,11 +308,6 @@ async def _breakpoint(debug_func) -> None:
# XXX: since we need to enter pdb synchronously below, # XXX: since we need to enter pdb synchronously below,
# we have to release the lock manually from pdb completion # we have to release the lock manually from pdb completion
# callbacks. Can't think of a nicer way then this atm. # callbacks. Can't think of a nicer way then this atm.
if _debug_lock.locked():
log.warning(
'Root actor attempting to acquire active tty lock'
f' owned by {_global_actor_in_debug}')
await _debug_lock.acquire() await _debug_lock.acquire()
_global_actor_in_debug = actor.uid _global_actor_in_debug = actor.uid
@ -383,13 +315,13 @@ async def _breakpoint(debug_func) -> None:
# the lock must be released on pdb completion # the lock must be released on pdb completion
def teardown(): def teardown():
global _local_pdb_complete, _debug_lock global _pdb_complete, _debug_lock
global _global_actor_in_debug, _local_task_in_debug global _global_actor_in_debug, _local_task_in_debug
_debug_lock.release() _debug_lock.release()
_global_actor_in_debug = None _global_actor_in_debug = None
_local_task_in_debug = None _local_task_in_debug = None
_local_pdb_complete.set() _pdb_complete.set()
_pdb_release_hook = teardown _pdb_release_hook = teardown
@ -417,7 +349,7 @@ def _set_trace(actor=None):
pdb = _mk_pdb() pdb = _mk_pdb()
if actor is not None: if actor is not None:
log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n") # type: ignore log.runtime(f"\nAttaching pdb to actor: {actor.uid}\n") # type: ignore
pdb.set_trace( pdb.set_trace(
# start 2 levels up in user code # start 2 levels up in user code
@ -447,7 +379,7 @@ breakpoint = partial(
def _post_mortem(actor): def _post_mortem(actor):
log.pdb(f"\nAttaching to pdb in crashed actor: {actor.uid}\n") log.runtime(f"\nAttaching to pdb in crashed actor: {actor.uid}\n")
pdb = _mk_pdb() pdb = _mk_pdb()
# custom Pdb post-mortem entry # custom Pdb post-mortem entry

View File

@ -1,6 +1,5 @@
""" """
Inter-process comms abstractions Inter-process comms abstractions
""" """
import platform import platform
import typing import typing
@ -62,6 +61,7 @@ class MsgpackTCPStream:
use_list=False, use_list=False,
) )
while True: while True:
try: try:
data = await self.stream.receive_some(2**10) data = await self.stream.receive_some(2**10)
@ -88,7 +88,7 @@ class MsgpackTCPStream:
else: else:
raise raise
log.transport(f"received {data}") # type: ignore log.trace(f"received {data}") # type: ignore
if data == b'': if data == b'':
raise TransportClosed( raise TransportClosed(
@ -169,7 +169,6 @@ class Channel:
return self.msgstream.raddr if self.msgstream else None return self.msgstream.raddr if self.msgstream else None
async def connect( async def connect(
self, self,
destaddr: Tuple[Any, ...] = None, destaddr: Tuple[Any, ...] = None,
**kwargs **kwargs
@ -181,21 +180,13 @@ class Channel:
destaddr = destaddr or self._destaddr destaddr = destaddr or self._destaddr
assert isinstance(destaddr, tuple) assert isinstance(destaddr, tuple)
stream = await trio.open_tcp_stream(*destaddr, **kwargs)
stream = await trio.open_tcp_stream(
*destaddr,
**kwargs
)
self.msgstream = MsgpackTCPStream(stream) self.msgstream = MsgpackTCPStream(stream)
log.transport(
f'Opened channel to peer {self.laddr} -> {self.raddr}'
)
return stream return stream
async def send(self, item: Any) -> None: async def send(self, item: Any) -> None:
log.transport(f"send `{item}`") # type: ignore log.trace(f"send `{item}`") # type: ignore
assert self.msgstream assert self.msgstream
await self.msgstream.send(item) await self.msgstream.send(item)
@ -214,8 +205,7 @@ class Channel:
raise raise
async def aclose(self) -> None: async def aclose(self) -> None:
log.debug(
log.transport(
f'Closing channel to {self.uid} ' f'Closing channel to {self.uid} '
f'{self.laddr} -> {self.raddr}' f'{self.laddr} -> {self.raddr}'
) )
@ -244,11 +234,11 @@ class Channel:
await self.connect() await self.connect()
cancelled = cancel_scope.cancelled_caught cancelled = cancel_scope.cancelled_caught
if cancelled: if cancelled:
log.transport( log.warning(
"Reconnect timed out after 3 seconds, retrying...") "Reconnect timed out after 3 seconds, retrying...")
continue continue
else: else:
log.transport("Stream connection re-established!") log.warning("Stream connection re-established!")
# run any reconnection sequence # run any reconnection sequence
on_recon = self._recon_seq on_recon = self._recon_seq
if on_recon: if on_recon:
@ -257,7 +247,7 @@ class Channel:
except (OSError, ConnectionRefusedError): except (OSError, ConnectionRefusedError):
if not down: if not down:
down = True down = True
log.transport( log.warning(
f"Connection to {self.raddr} went down, waiting" f"Connection to {self.raddr} went down, waiting"
" for re-establishment") " for re-establishment")
await trio.sleep(1) await trio.sleep(1)

View File

@ -171,11 +171,8 @@ async def open_root_actor(
yield actor yield actor
except (Exception, trio.MultiError) as err: except (Exception, trio.MultiError) as err:
# with trio.CancelScope(shield=True): logger.exception("Actor crashed:")
entered = await _debug._maybe_enter_pm(err) await _debug._maybe_enter_pm(err)
if not entered:
logger.exception("Root actor crashed:")
# always re-raise # always re-raise
raise raise

View File

@ -28,7 +28,6 @@ from ._state import (
is_root_process, is_root_process,
_runtime_vars, _runtime_vars,
) )
from ._debug import _global_actor_in_debug
from .log import get_logger from .log import get_logger
from ._portal import Portal from ._portal import Portal
@ -155,27 +154,6 @@ async def cancel_on_completion(
# cancel the process now that we have a final result # cancel the process now that we have a final result
await portal.cancel_actor() await portal.cancel_actor()
async def do_hard_kill(
proc: trio.Process,
) -> None:
# NOTE: this timeout used to do nothing since we were shielding
# the ``.wait()`` inside ``new_proc()`` which will pretty much
# never release until the process exits, now it acts as
# a hard-kill time ultimatum.
with trio.move_on_after(3) as cs:
# NOTE: This ``__aexit__()`` shields internally.
async with proc: # calls ``trio.Process.aclose()``
log.debug(f"Terminating {proc}")
if cs.cancelled_caught:
# XXX: should pretty much never get here unless we have
# to move the bits from ``proc.__aexit__()`` out and
# into here.
log.critical(f"HARD KILLING {proc}")
proc.kill()
async def do_hard_kill( async def do_hard_kill(
proc: trio.Process, proc: trio.Process,
@ -231,46 +209,46 @@ async def spawn_subactor(
yield proc yield proc
finally: finally:
log.runtime(f"Attempting to kill {proc}") log.debug(f"Attempting to kill {proc}")
# XXX: do this **after** cancellation/tearfown # XXX: do this **after** cancellation/tearfown
# to avoid killing the process too early # to avoid killing the process too early
# since trio does this internally on ``__aexit__()`` # since trio does this internally on ``__aexit__()``
# if ( if (
# is_root_process() is_root_process()
# # XXX: basically the pre-closing of stdstreams in a # XXX: basically the pre-closing of stdstreams in a
# # root-processe's ``trio.Process.aclose()`` can clobber # root-processe's ``trio.Process.aclose()`` can clobber
# # any existing debugger session so we avoid # any existing debugger session so we avoid
# and _runtime_vars['_debug_mode'] and _runtime_vars['_debug_mode']
# and _global_actor_in_debug is not None ):
# ): # XXX: this is ``trio.Process.aclose()`` minus
# # XXX: this is ``trio.Process.aclose()`` MINUS the # the std-streams pre-closing steps and ``Process.kill()``
# # std-streams pre-closing steps inside ``proc.__aexit__()`` # calls.
# # (see below) which incluses a ``Process.kill()`` call try:
await proc.wait()
finally:
if proc.returncode is None:
# XXX: skip this when in debug and a session might
# still be live
# proc.kill()
with trio.CancelScope(shield=True):
await proc.wait()
else:
# NOTE: this timeout used to do nothing since we were shielding
# the ``.wait()`` inside ``new_proc()`` which will pretty much
# never release until the process exits, now it acts as
# a hard-kill time ultimatum.
with trio.move_on_after(3) as cs:
# log.error( # NOTE: This ``__aexit__()`` shields internally.
# "Root process tty is locked in debug mode by " async with proc: # calls ``trio.Process.aclose()``
# f"{_global_actor_in_debug}. If the console is hanging, you " log.debug(f"Terminating {proc}")
# "may need to trigger a KBI to kill any "
# "not-fully-initialized" " subprocesses and allow errors "
# "from `trio` to propagate"
# )
# try:
# # one more graceful wait try can can be cancelled by KBI
# # sent by user.
# await proc.wait()
# finally: if cs.cancelled_caught:
# if proc.returncode is None: log.critical(f"HARD KILLING {proc}")
# # with trio.CancelScope(shield=True): proc.kill()
# # await proc.wait()
# await do_hard_kill(proc)
# else:
await do_hard_kill(proc)
async def new_proc( async def new_proc(

View File

@ -12,7 +12,7 @@ import trio
from async_generator import asynccontextmanager from async_generator import asynccontextmanager
from . import _debug from . import _debug
from ._state import current_actor, is_main_process, is_root_process from ._state import current_actor, is_main_process
from .log import get_logger, get_loglevel from .log import get_logger, get_loglevel
from ._actor import Actor from ._actor import Actor
from ._portal import Portal from ._portal import Portal
@ -170,25 +170,16 @@ class ActorNursery:
log.warning(f"Cancelling nursery in {self._actor.uid}") log.warning(f"Cancelling nursery in {self._actor.uid}")
with trio.move_on_after(3) as cs: with trio.move_on_after(3) as cs:
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
for subactor, proc, portal in self._children.values(): for subactor, proc, portal in self._children.values():
# TODO: are we ever even going to use this or
# is the spawning backend responsible for such
# things? I'm thinking latter.
if hard_kill: if hard_kill:
proc.terminate() proc.terminate()
else: else:
if portal is None: # actor hasn't fully spawned yet if portal is None: # actor hasn't fully spawned yet
event = self._actor._peer_connected[subactor.uid] event = self._actor._peer_connected[subactor.uid]
log.warning( log.warning(
f"{subactor.uid} wasn't finished spawning?") f"{subactor.uid} wasn't finished spawning?")
await event.wait() await event.wait()
# channel/portal should now be up # channel/portal should now be up
_, _, portal = self._children[subactor.uid] _, _, portal = self._children[subactor.uid]
@ -248,7 +239,6 @@ async def _open_and_supervise_one_cancels_all_nursery(
# As such if the strategy propagates any error(s) upwards # As such if the strategy propagates any error(s) upwards
# the above "daemon actor" nursery will be notified. # the above "daemon actor" nursery will be notified.
async with trio.open_nursery() as ria_nursery: async with trio.open_nursery() as ria_nursery:
anursery = ActorNursery( anursery = ActorNursery(
actor, actor,
ria_nursery, ria_nursery,
@ -259,53 +249,21 @@ async def _open_and_supervise_one_cancels_all_nursery(
# spawning of actors happens in the caller's scope # spawning of actors happens in the caller's scope
# after we yield upwards # after we yield upwards
yield anursery yield anursery
log.debug(
log.runtime(
f"Waiting on subactors {anursery._children} " f"Waiting on subactors {anursery._children} "
"to complete" "to complete"
) )
# Last bit before first nursery block ends in the case # Last bit before first nursery block ends in the case
# where we didn't error in the caller's scope # where we didn't error in the caller's scope
log.debug("Waiting on all subactors to complete")
# signal all process monitor tasks to conduct
# hard join phase.
anursery._join_procs.set() anursery._join_procs.set()
except BaseException as err: except BaseException as err:
# If we error in the root but the debugger is
# engaged we don't want to prematurely kill (and
# thus clobber access to) the local tty since it
# will make the pdb repl unusable.
# Instead try to wait for pdb to be released before
# tearing down.
if is_root_process():
log.exception(f"we're root with {err}")
# wait to see if a sub-actor task
# will be scheduled and grab the tty
# lock on the next tick
# await trio.testing.wait_all_tasks_blocked()
debug_complete = _debug._no_remote_has_tty
if (
debug_complete and
not debug_complete.is_set()
):
log.warning(
'Root has errored but pdb is in use by '
f'child {_debug._global_actor_in_debug}\n'
'Waiting on tty lock to release..')
with trio.CancelScope(shield=True):
await debug_complete.wait()
# if the caller's scope errored then we activate our # if the caller's scope errored then we activate our
# one-cancels-all supervisor strategy (don't # one-cancels-all supervisor strategy (don't
# worry more are coming). # worry more are coming).
anursery._join_procs.set() anursery._join_procs.set()
try: try:
# XXX: hypothetically an error could be # XXX: hypothetically an error could be
# raised and then a cancel signal shows up # raised and then a cancel signal shows up
@ -344,15 +302,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
# ria_nursery scope end # ria_nursery scope end
# XXX: do we need a `trio.Cancelled` catch here as well? # XXX: do we need a `trio.Cancelled` catch here as well?
# this is the catch around the ``.run_in_actor()`` nursery except (Exception, trio.MultiError, trio.Cancelled) as err:
except (
Exception,
trio.MultiError,
trio.Cancelled
) as err:
# If actor-local error was raised while waiting on # If actor-local error was raised while waiting on
# ".run_in_actor()" actors then we also want to cancel all # ".run_in_actor()" actors then we also want to cancel all
# remaining sub-actors (due to our lone strategy: # remaining sub-actors (due to our lone strategy:
@ -419,12 +369,26 @@ async def open_nursery(
async with open_root_actor(**kwargs) as actor: async with open_root_actor(**kwargs) as actor:
assert actor is current_actor() assert actor is current_actor()
# try: try:
async with _open_and_supervise_one_cancels_all_nursery( async with _open_and_supervise_one_cancels_all_nursery(
actor actor
) as anursery: ) as anursery:
yield anursery yield anursery
except (Exception, trio.MultiError, trio.Cancelled):
# if we error in the root but the debugger is
# engaged we don't want to prematurely kill (and
# thus clobber access to) the local tty streams.
# instead try to wait for pdb to be released before
# tearing down.
if not _debug._pdb_complete.is_set():
log.warning(
"Root has errored but pdb is active..waiting "
"on debug lock")
await _debug._pdb_complete.wait()
raise
else: # sub-nursery case else: # sub-nursery case
async with _open_and_supervise_one_cancels_all_nursery( async with _open_and_supervise_one_cancels_all_nursery(

View File

@ -29,20 +29,19 @@ LOG_FORMAT = (
DATE_FORMAT = '%b %d %H:%M:%S' DATE_FORMAT = '%b %d %H:%M:%S'
LEVELS = { LEVELS = {
'GARBAGE': 1, 'GARBAGE': 1,
'TRANSPORT': 5, 'TRACE': 5,
'RUNTIME': 15, 'PROFILE': 15,
'PDB': 500, 'RUNTIME': 500,
'QUIET': 1000, 'QUIET': 1000,
} }
STD_PALETTE = { STD_PALETTE = {
'CRITICAL': 'red', 'CRITICAL': 'red',
'ERROR': 'red', 'ERROR': 'red',
'PDB': 'white', 'RUNTIME': 'white',
'WARNING': 'yellow', 'WARNING': 'yellow',
'INFO': 'green', 'INFO': 'green',
'RUNTIME': 'white',
'DEBUG': 'white', 'DEBUG': 'white',
'TRANSPORT': 'cyan', 'TRACE': 'cyan',
'GARBAGE': 'blue', 'GARBAGE': 'blue',
} }
BOLD_PALETTE = { BOLD_PALETTE = {
@ -77,7 +76,7 @@ def get_logger(
# additional levels # additional levels
for name, val in LEVELS.items(): for name, val in LEVELS.items():
logging.addLevelName(val, name) logging.addLevelName(val, name)
# ex. create ``logger.runtime()`` # ex. create ``logger.trace()``
setattr(logger, name.lower(), partial(logger.log, val)) setattr(logger, name.lower(), partial(logger.log, val))
return logger return logger