Compare commits
No commits in common. "5d74490f1e4650783549637b3c10f377454066a6" and "38c22dd82b456e17724f96ce4d85d48c205102d6" have entirely different histories.
5d74490f1e
...
38c22dd82b
|
@ -1,53 +0,0 @@
|
||||||
'''
|
|
||||||
fast fail test with a context.
|
|
||||||
ensure the partially initialized sub-actor process
|
|
||||||
doesn't cause a hang on error/cancel of the parent
|
|
||||||
nursery.
|
|
||||||
|
|
||||||
'''
|
|
||||||
import trio
|
|
||||||
import tractor
|
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
|
||||||
async def sleep(
|
|
||||||
ctx: tractor.Context,
|
|
||||||
):
|
|
||||||
await trio.sleep(0.5)
|
|
||||||
await ctx.started()
|
|
||||||
await trio.sleep_forever()
|
|
||||||
|
|
||||||
|
|
||||||
async def open_ctx(
|
|
||||||
n: tractor._trionics.ActorNursery
|
|
||||||
):
|
|
||||||
|
|
||||||
# spawn both actors
|
|
||||||
portal = await n.start_actor(
|
|
||||||
name='sleeper',
|
|
||||||
enable_modules=[__name__],
|
|
||||||
)
|
|
||||||
|
|
||||||
async with portal.open_context(
|
|
||||||
sleep,
|
|
||||||
) as (ctx, first):
|
|
||||||
assert first is None
|
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
|
||||||
|
|
||||||
async with tractor.open_nursery(
|
|
||||||
debug_mode=True,
|
|
||||||
loglevel='runtime',
|
|
||||||
) as an:
|
|
||||||
|
|
||||||
async with trio.open_nursery() as n:
|
|
||||||
n.start_soon(open_ctx, an)
|
|
||||||
|
|
||||||
await trio.sleep(0.2)
|
|
||||||
await trio.sleep(0.1)
|
|
||||||
assert 0
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
trio.run(main)
|
|
|
@ -317,58 +317,32 @@ def test_multi_daemon_subactors(spawn, loglevel):
|
||||||
next_msg = name_error_msg
|
next_msg = name_error_msg
|
||||||
|
|
||||||
elif name_error_msg in before:
|
elif name_error_msg in before:
|
||||||
next_msg = bp_forever_msg
|
next_msg = None
|
||||||
|
|
||||||
else:
|
else:
|
||||||
raise ValueError("Neither log msg was found !?")
|
raise ValueError("Neither log msg was found !?")
|
||||||
|
|
||||||
# NOTE: previously since we did not have clobber prevention
|
|
||||||
# in the root actor this final resume could result in the debugger
|
|
||||||
# tearing down since both child actors would be cancelled and it was
|
|
||||||
# unlikely that `bp_forever` would re-acquire the tty lock again.
|
|
||||||
# Now, we should have a final resumption in the root plus a possible
|
|
||||||
# second entry by `bp_forever`.
|
|
||||||
|
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
|
|
||||||
|
# first name_error failure
|
||||||
child.expect(r"\(Pdb\+\+\)")
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
before = str(child.before.decode())
|
before = str(child.before.decode())
|
||||||
|
|
||||||
|
if next_msg:
|
||||||
assert next_msg in before
|
assert next_msg in before
|
||||||
|
|
||||||
# XXX: hooray the root clobbering the child here was fixed!
|
|
||||||
# IMO, this demonstrates the true power of SC system design.
|
|
||||||
|
|
||||||
# now the root actor won't clobber the bp_forever child
|
|
||||||
# during its first access to the debug lock, but will instead
|
|
||||||
# wait for the lock to release, by the edge triggered
|
|
||||||
# ``_debug._no_remote_has_tty`` event before sending cancel messages
|
|
||||||
# (via portals) to its underlings B)
|
|
||||||
|
|
||||||
# at some point here there should have been some warning msg from
|
|
||||||
# the root announcing it avoided a clobber of the child's lock, but
|
|
||||||
# it seems unreliable in testing here to gnab it:
|
|
||||||
# assert "in use by child ('bp_forever'," in before
|
|
||||||
|
|
||||||
# wait for final error in root
|
|
||||||
while True:
|
|
||||||
|
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
|
|
||||||
child.expect(r"\(Pdb\+\+\)")
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
before = str(child.before.decode())
|
before = str(child.before.decode())
|
||||||
try:
|
assert "tractor._exceptions.RemoteActorError: ('name_error'" in before
|
||||||
|
|
||||||
# root error should be packed as remote error
|
|
||||||
assert "_exceptions.RemoteActorError: ('name_error'" in before
|
|
||||||
break
|
|
||||||
|
|
||||||
except AssertionError:
|
|
||||||
assert bp_forever_msg in before
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
child.expect(pexpect.EOF)
|
child.expect(pexpect.EOF)
|
||||||
|
|
||||||
except pexpect.exceptions.TIMEOUT:
|
except pexpect.exceptions.TIMEOUT:
|
||||||
|
|
||||||
# Failed to exit using continue..?
|
# Failed to exit using continue..?
|
||||||
child.sendline('q')
|
child.sendline('q')
|
||||||
child.expect(pexpect.EOF)
|
child.expect(pexpect.EOF)
|
||||||
|
@ -423,7 +397,7 @@ def test_multi_nested_subactors_error_through_nurseries(spawn):
|
||||||
child = spawn('multi_nested_subactors_error_up_through_nurseries')
|
child = spawn('multi_nested_subactors_error_up_through_nurseries')
|
||||||
|
|
||||||
# startup time can be iffy
|
# startup time can be iffy
|
||||||
# time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
for i in range(12):
|
for i in range(12):
|
||||||
try:
|
try:
|
||||||
|
@ -505,21 +479,3 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
|
||||||
assert "tractor._exceptions.RemoteActorError: ('spawner0'" in before
|
assert "tractor._exceptions.RemoteActorError: ('spawner0'" in before
|
||||||
assert "tractor._exceptions.RemoteActorError: ('name_error'" in before
|
assert "tractor._exceptions.RemoteActorError: ('name_error'" in before
|
||||||
assert "NameError: name 'doggypants' is not defined" in before
|
assert "NameError: name 'doggypants' is not defined" in before
|
||||||
|
|
||||||
|
|
||||||
def test_root_cancels_child_context_during_startup(
|
|
||||||
spawn,
|
|
||||||
):
|
|
||||||
'''Verify a fast fail in the root doesn't lock up the child reaping
|
|
||||||
and all while using the new context api.
|
|
||||||
|
|
||||||
'''
|
|
||||||
child = spawn('fast_error_in_root_after_spawn')
|
|
||||||
|
|
||||||
child.expect(r"\(Pdb\+\+\)")
|
|
||||||
|
|
||||||
before = str(child.before.decode())
|
|
||||||
assert "AssertionError" in before
|
|
||||||
|
|
||||||
child.sendline('c')
|
|
||||||
child.expect(pexpect.EOF)
|
|
||||||
|
|
|
@ -463,8 +463,7 @@ class Actor:
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f"already have channel(s) for {uid}:{chans}?"
|
f"already have channel(s) for {uid}:{chans}?"
|
||||||
)
|
)
|
||||||
|
log.trace(f"Registered {chan} for {uid}") # type: ignore
|
||||||
log.runtime(f"Registered {chan} for {uid}") # type: ignore
|
|
||||||
# append new channel
|
# append new channel
|
||||||
self._peers[uid].append(chan)
|
self._peers[uid].append(chan)
|
||||||
|
|
||||||
|
@ -632,7 +631,7 @@ class Actor:
|
||||||
|
|
||||||
break
|
break
|
||||||
|
|
||||||
log.transport( # type: ignore
|
log.trace( # type: ignore
|
||||||
f"Received msg {msg} from {chan.uid}")
|
f"Received msg {msg} from {chan.uid}")
|
||||||
|
|
||||||
cid = msg.get('cid')
|
cid = msg.get('cid')
|
||||||
|
|
|
@ -45,8 +45,7 @@ _global_actor_in_debug: Optional[Tuple[str, str]] = None
|
||||||
|
|
||||||
# lock in root actor preventing multi-access to local tty
|
# lock in root actor preventing multi-access to local tty
|
||||||
_debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
|
_debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
|
||||||
_local_pdb_complete: Optional[trio.Event] = None
|
_pdb_complete: Optional[trio.Event] = None
|
||||||
_no_remote_has_tty: Optional[trio.Event] = None
|
|
||||||
|
|
||||||
# XXX: set by the current task waiting on the root tty lock
|
# XXX: set by the current task waiting on the root tty lock
|
||||||
# and must be cancelled if this actor is cancelled via message
|
# and must be cancelled if this actor is cancelled via message
|
||||||
|
@ -110,7 +109,7 @@ class PdbwTeardown(pdbpp.Pdb):
|
||||||
|
|
||||||
# async with aclosing(async_stdin):
|
# async with aclosing(async_stdin):
|
||||||
# async for msg in async_stdin:
|
# async for msg in async_stdin:
|
||||||
# log.runtime(f"Stdin input:\n{msg}")
|
# log.trace(f"Stdin input:\n{msg}")
|
||||||
# # encode to bytes
|
# # encode to bytes
|
||||||
# bmsg = str.encode(msg)
|
# bmsg = str.encode(msg)
|
||||||
|
|
||||||
|
@ -124,71 +123,24 @@ class PdbwTeardown(pdbpp.Pdb):
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]:
|
async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]:
|
||||||
'''Acquire a actor local FIFO lock meant to mutex entry to a local
|
"""Acquire a actor local FIFO lock meant to mutex entry to a local
|
||||||
debugger entry point to avoid tty clobbering a global root process.
|
debugger entry point to avoid tty clobbering by multiple processes.
|
||||||
|
"""
|
||||||
'''
|
global _debug_lock, _global_actor_in_debug
|
||||||
global _debug_lock, _global_actor_in_debug, _no_remote_has_tty
|
|
||||||
|
|
||||||
task_name = trio.lowlevel.current_task().name
|
task_name = trio.lowlevel.current_task().name
|
||||||
|
|
||||||
log.pdb(
|
|
||||||
f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}"
|
|
||||||
)
|
|
||||||
|
|
||||||
we_acquired = False
|
|
||||||
|
|
||||||
if _no_remote_has_tty is None:
|
|
||||||
# mark the tty lock as being in use so that the runtime
|
|
||||||
# can try to avoid clobbering any connection from a child
|
|
||||||
# that's currently relying on it.
|
|
||||||
_no_remote_has_tty = trio.Event()
|
|
||||||
|
|
||||||
try:
|
|
||||||
log.debug(
|
log.debug(
|
||||||
f"entering lock checkpoint, remote task: {task_name}:{uid}"
|
f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}")
|
||||||
)
|
|
||||||
we_acquired = True
|
|
||||||
await _debug_lock.acquire()
|
|
||||||
|
|
||||||
# we_acquired = True
|
async with _debug_lock:
|
||||||
|
|
||||||
|
# _debug_lock._uid = uid
|
||||||
_global_actor_in_debug = uid
|
_global_actor_in_debug = uid
|
||||||
log.debug(f"TTY lock acquired, remote task: {task_name}:{uid}")
|
log.debug(f"TTY lock acquired, remote task: {task_name}:{uid}")
|
||||||
|
yield
|
||||||
# NOTE: critical section!
|
|
||||||
# this yield is unshielded.
|
|
||||||
# IF we received a cancel during the shielded lock
|
|
||||||
# entry of some next-in-queue requesting task,
|
|
||||||
# then the resumption here will result in that
|
|
||||||
# Cancelled being raised to our caller below!
|
|
||||||
|
|
||||||
# in this case the finally below should trigger
|
|
||||||
# and the surrounding callee side context should cancel
|
|
||||||
# normally relaying back to the caller.
|
|
||||||
|
|
||||||
yield _debug_lock
|
|
||||||
|
|
||||||
finally:
|
|
||||||
# if _global_actor_in_debug == uid:
|
|
||||||
if we_acquired and _debug_lock.locked():
|
|
||||||
_debug_lock.release()
|
|
||||||
|
|
||||||
# IFF there are no more requesting tasks queued up, fire the
|
|
||||||
# "tty-unlocked" event thereby alerting any monitors of the lock that
|
|
||||||
# we are now back in the "tty unlocked" state. This is basically
|
|
||||||
# an edge triggered signal around an empty queue of sub-actor
|
|
||||||
# tasks that may have tried to acquire the lock.
|
|
||||||
stats = _debug_lock.statistics()
|
|
||||||
if (
|
|
||||||
not stats.owner
|
|
||||||
):
|
|
||||||
log.pdb(f"No more tasks waiting on tty lock! says {uid}")
|
|
||||||
_no_remote_has_tty.set()
|
|
||||||
_no_remote_has_tty = None
|
|
||||||
|
|
||||||
_global_actor_in_debug = None
|
_global_actor_in_debug = None
|
||||||
|
|
||||||
log.debug(f"TTY lock released, remote task: {task_name}:{uid}")
|
log.debug(f"TTY lock released, remote task: {task_name}:{uid}")
|
||||||
|
|
||||||
|
|
||||||
|
@ -210,30 +162,29 @@ async def _hijack_stdin_relay_to_child(
|
||||||
subactor_uid: Tuple[str, str]
|
subactor_uid: Tuple[str, str]
|
||||||
|
|
||||||
) -> str:
|
) -> str:
|
||||||
'''Hijack the tty in the root process of an actor tree such that
|
|
||||||
the pdbpp debugger console can be allocated to a sub-actor for repl
|
|
||||||
bossing.
|
|
||||||
|
|
||||||
'''
|
global _pdb_complete
|
||||||
|
|
||||||
task_name = trio.lowlevel.current_task().name
|
task_name = trio.lowlevel.current_task().name
|
||||||
|
|
||||||
# TODO: when we get to true remote debugging
|
# TODO: when we get to true remote debugging
|
||||||
# this will deliver stdin data?
|
# this will deliver stdin data?
|
||||||
|
|
||||||
log.debug(
|
log.debug(
|
||||||
"Attempting to acquire TTY lock\n"
|
"Attempting to acquire TTY lock, "
|
||||||
f"remote task: {task_name}:{subactor_uid}"
|
f"remote task: {task_name}:{subactor_uid}"
|
||||||
)
|
)
|
||||||
|
|
||||||
log.debug(f"Actor {subactor_uid} is WAITING on stdin hijack lock")
|
log.debug(f"Actor {subactor_uid} is WAITING on stdin hijack lock")
|
||||||
|
|
||||||
with trio.CancelScope(shield=True):
|
|
||||||
|
|
||||||
async with _acquire_debug_lock(subactor_uid):
|
async with _acquire_debug_lock(subactor_uid):
|
||||||
|
|
||||||
|
# XXX: only shield the context sync step!
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
|
|
||||||
# indicate to child that we've locked stdio
|
# indicate to child that we've locked stdio
|
||||||
await ctx.started('Locked')
|
await ctx.started('Locked')
|
||||||
log.pdb( # type: ignore
|
log.runtime( # type: ignore
|
||||||
f"Actor {subactor_uid} ACQUIRED stdin hijack lock")
|
f"Actor {subactor_uid} ACQUIRED stdin hijack lock")
|
||||||
|
|
||||||
# wait for unlock pdb by child
|
# wait for unlock pdb by child
|
||||||
|
@ -252,6 +203,7 @@ async def _hijack_stdin_relay_to_child(
|
||||||
log.debug(
|
log.debug(
|
||||||
f"TTY lock released, remote task: {task_name}:{subactor_uid}")
|
f"TTY lock released, remote task: {task_name}:{subactor_uid}")
|
||||||
|
|
||||||
|
log.debug(f"Actor {subactor_uid} RELEASED stdin hijack lock")
|
||||||
return "pdb_unlock_complete"
|
return "pdb_unlock_complete"
|
||||||
|
|
||||||
|
|
||||||
|
@ -263,24 +215,20 @@ async def _breakpoint(debug_func) -> None:
|
||||||
actor = tractor.current_actor()
|
actor = tractor.current_actor()
|
||||||
task_name = trio.lowlevel.current_task().name
|
task_name = trio.lowlevel.current_task().name
|
||||||
|
|
||||||
global _local_pdb_complete, _pdb_release_hook
|
global _pdb_complete, _pdb_release_hook
|
||||||
global _local_task_in_debug, _global_actor_in_debug
|
global _local_task_in_debug, _global_actor_in_debug
|
||||||
|
|
||||||
await trio.lowlevel.checkpoint()
|
|
||||||
|
|
||||||
async def wait_for_parent_stdin_hijack(
|
async def wait_for_parent_stdin_hijack(
|
||||||
task_status=trio.TASK_STATUS_IGNORED
|
task_status=trio.TASK_STATUS_IGNORED
|
||||||
):
|
):
|
||||||
global _debugger_request_cs
|
global _debugger_request_cs
|
||||||
|
|
||||||
with trio.CancelScope(shield=True) as cs:
|
with trio.CancelScope() as cs:
|
||||||
_debugger_request_cs = cs
|
_debugger_request_cs = cs
|
||||||
|
|
||||||
try:
|
try:
|
||||||
async with get_root() as portal:
|
async with get_root() as portal:
|
||||||
|
|
||||||
log.error('got portal')
|
|
||||||
|
|
||||||
# this syncs to child's ``Context.started()`` call.
|
# this syncs to child's ``Context.started()`` call.
|
||||||
async with portal.open_context(
|
async with portal.open_context(
|
||||||
|
|
||||||
|
@ -289,21 +237,17 @@ async def _breakpoint(debug_func) -> None:
|
||||||
|
|
||||||
) as (ctx, val):
|
) as (ctx, val):
|
||||||
|
|
||||||
log.error('locked context')
|
|
||||||
assert val == 'Locked'
|
assert val == 'Locked'
|
||||||
|
|
||||||
async with ctx.open_stream() as stream:
|
async with ctx.open_stream() as stream:
|
||||||
|
|
||||||
log.error('opened stream')
|
|
||||||
# unblock local caller
|
# unblock local caller
|
||||||
task_status.started()
|
task_status.started()
|
||||||
|
|
||||||
try:
|
|
||||||
await _local_pdb_complete.wait()
|
|
||||||
|
|
||||||
finally:
|
|
||||||
# TODO: shielding currently can cause hangs...
|
# TODO: shielding currently can cause hangs...
|
||||||
with trio.CancelScope(shield=True):
|
# with trio.CancelScope(shield=True):
|
||||||
|
|
||||||
|
await _pdb_complete.wait()
|
||||||
await stream.send('pdb_unlock')
|
await stream.send('pdb_unlock')
|
||||||
|
|
||||||
# sync with callee termination
|
# sync with callee termination
|
||||||
|
@ -318,12 +262,11 @@ async def _breakpoint(debug_func) -> None:
|
||||||
_local_task_in_debug = None
|
_local_task_in_debug = None
|
||||||
log.debug(f"Child {actor} released parent stdio lock")
|
log.debug(f"Child {actor} released parent stdio lock")
|
||||||
|
|
||||||
if not _local_pdb_complete or _local_pdb_complete.is_set():
|
if not _pdb_complete or _pdb_complete.is_set():
|
||||||
_local_pdb_complete = trio.Event()
|
_pdb_complete = trio.Event()
|
||||||
|
|
||||||
# TODO: need a more robust check for the "root" actor
|
# TODO: need a more robust check for the "root" actor
|
||||||
if actor._parent_chan and not is_root_process():
|
if actor._parent_chan and not is_root_process():
|
||||||
|
|
||||||
if _local_task_in_debug:
|
if _local_task_in_debug:
|
||||||
if _local_task_in_debug == task_name:
|
if _local_task_in_debug == task_name:
|
||||||
# this task already has the lock and is
|
# this task already has the lock and is
|
||||||
|
@ -335,7 +278,7 @@ async def _breakpoint(debug_func) -> None:
|
||||||
# support for recursive entries to `tractor.breakpoint()`
|
# support for recursive entries to `tractor.breakpoint()`
|
||||||
log.warning(f"{actor.uid} already has a debug lock, waiting...")
|
log.warning(f"{actor.uid} already has a debug lock, waiting...")
|
||||||
|
|
||||||
await _local_pdb_complete.wait()
|
await _pdb_complete.wait()
|
||||||
await trio.sleep(0.1)
|
await trio.sleep(0.1)
|
||||||
|
|
||||||
# mark local actor as "in debug mode" to avoid recurrent
|
# mark local actor as "in debug mode" to avoid recurrent
|
||||||
|
@ -343,17 +286,11 @@ async def _breakpoint(debug_func) -> None:
|
||||||
_local_task_in_debug = task_name
|
_local_task_in_debug = task_name
|
||||||
|
|
||||||
# assign unlock callback for debugger teardown hooks
|
# assign unlock callback for debugger teardown hooks
|
||||||
_pdb_release_hook = _local_pdb_complete.set
|
_pdb_release_hook = _pdb_complete.set
|
||||||
|
|
||||||
# this **must** be awaited by the caller and is done using the
|
# this **must** be awaited by the caller and is done using the
|
||||||
# root nursery so that the debugger can continue to run without
|
# root nursery so that the debugger can continue to run without
|
||||||
# being restricted by the scope of a new task nursery.
|
# being restricted by the scope of a new task nursery.
|
||||||
|
|
||||||
# NOTE: if we want to debug a trio.Cancelled triggered exception
|
|
||||||
# we have to figure out how to avoid having the service nursery
|
|
||||||
# cancel on this task start? I *think* this works below?
|
|
||||||
# actor._service_n.cancel_scope.shield = shield
|
|
||||||
with trio.CancelScope(shield=True):
|
|
||||||
await actor._service_n.start(wait_for_parent_stdin_hijack)
|
await actor._service_n.start(wait_for_parent_stdin_hijack)
|
||||||
|
|
||||||
elif is_root_process():
|
elif is_root_process():
|
||||||
|
@ -371,11 +308,6 @@ async def _breakpoint(debug_func) -> None:
|
||||||
# XXX: since we need to enter pdb synchronously below,
|
# XXX: since we need to enter pdb synchronously below,
|
||||||
# we have to release the lock manually from pdb completion
|
# we have to release the lock manually from pdb completion
|
||||||
# callbacks. Can't think of a nicer way than this atm.
|
# callbacks. Can't think of a nicer way than this atm.
|
||||||
if _debug_lock.locked():
|
|
||||||
log.warning(
|
|
||||||
'Root actor attempting to acquire active tty lock'
|
|
||||||
f' owned by {_global_actor_in_debug}')
|
|
||||||
|
|
||||||
await _debug_lock.acquire()
|
await _debug_lock.acquire()
|
||||||
|
|
||||||
_global_actor_in_debug = actor.uid
|
_global_actor_in_debug = actor.uid
|
||||||
|
@ -383,13 +315,13 @@ async def _breakpoint(debug_func) -> None:
|
||||||
|
|
||||||
# the lock must be released on pdb completion
|
# the lock must be released on pdb completion
|
||||||
def teardown():
|
def teardown():
|
||||||
global _local_pdb_complete, _debug_lock
|
global _pdb_complete, _debug_lock
|
||||||
global _global_actor_in_debug, _local_task_in_debug
|
global _global_actor_in_debug, _local_task_in_debug
|
||||||
|
|
||||||
_debug_lock.release()
|
_debug_lock.release()
|
||||||
_global_actor_in_debug = None
|
_global_actor_in_debug = None
|
||||||
_local_task_in_debug = None
|
_local_task_in_debug = None
|
||||||
_local_pdb_complete.set()
|
_pdb_complete.set()
|
||||||
|
|
||||||
_pdb_release_hook = teardown
|
_pdb_release_hook = teardown
|
||||||
|
|
||||||
|
@ -417,7 +349,7 @@ def _set_trace(actor=None):
|
||||||
pdb = _mk_pdb()
|
pdb = _mk_pdb()
|
||||||
|
|
||||||
if actor is not None:
|
if actor is not None:
|
||||||
log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n") # type: ignore
|
log.runtime(f"\nAttaching pdb to actor: {actor.uid}\n") # type: ignore
|
||||||
|
|
||||||
pdb.set_trace(
|
pdb.set_trace(
|
||||||
# start 2 levels up in user code
|
# start 2 levels up in user code
|
||||||
|
@ -447,7 +379,7 @@ breakpoint = partial(
|
||||||
|
|
||||||
|
|
||||||
def _post_mortem(actor):
|
def _post_mortem(actor):
|
||||||
log.pdb(f"\nAttaching to pdb in crashed actor: {actor.uid}\n")
|
log.runtime(f"\nAttaching to pdb in crashed actor: {actor.uid}\n")
|
||||||
pdb = _mk_pdb()
|
pdb = _mk_pdb()
|
||||||
|
|
||||||
# custom Pdb post-mortem entry
|
# custom Pdb post-mortem entry
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
"""
|
"""
|
||||||
Inter-process comms abstractions
|
Inter-process comms abstractions
|
||||||
|
|
||||||
"""
|
"""
|
||||||
import platform
|
import platform
|
||||||
import typing
|
import typing
|
||||||
|
@ -62,6 +61,7 @@ class MsgpackTCPStream:
|
||||||
use_list=False,
|
use_list=False,
|
||||||
)
|
)
|
||||||
while True:
|
while True:
|
||||||
|
|
||||||
try:
|
try:
|
||||||
data = await self.stream.receive_some(2**10)
|
data = await self.stream.receive_some(2**10)
|
||||||
|
|
||||||
|
@ -88,7 +88,7 @@ class MsgpackTCPStream:
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
log.transport(f"received {data}") # type: ignore
|
log.trace(f"received {data}") # type: ignore
|
||||||
|
|
||||||
if data == b'':
|
if data == b'':
|
||||||
raise TransportClosed(
|
raise TransportClosed(
|
||||||
|
@ -169,7 +169,6 @@ class Channel:
|
||||||
return self.msgstream.raddr if self.msgstream else None
|
return self.msgstream.raddr if self.msgstream else None
|
||||||
|
|
||||||
async def connect(
|
async def connect(
|
||||||
|
|
||||||
self,
|
self,
|
||||||
destaddr: Tuple[Any, ...] = None,
|
destaddr: Tuple[Any, ...] = None,
|
||||||
**kwargs
|
**kwargs
|
||||||
|
@ -181,21 +180,13 @@ class Channel:
|
||||||
|
|
||||||
destaddr = destaddr or self._destaddr
|
destaddr = destaddr or self._destaddr
|
||||||
assert isinstance(destaddr, tuple)
|
assert isinstance(destaddr, tuple)
|
||||||
|
stream = await trio.open_tcp_stream(*destaddr, **kwargs)
|
||||||
stream = await trio.open_tcp_stream(
|
|
||||||
*destaddr,
|
|
||||||
**kwargs
|
|
||||||
)
|
|
||||||
self.msgstream = MsgpackTCPStream(stream)
|
self.msgstream = MsgpackTCPStream(stream)
|
||||||
|
|
||||||
log.transport(
|
|
||||||
f'Opened channel to peer {self.laddr} -> {self.raddr}'
|
|
||||||
)
|
|
||||||
return stream
|
return stream
|
||||||
|
|
||||||
async def send(self, item: Any) -> None:
|
async def send(self, item: Any) -> None:
|
||||||
|
|
||||||
log.transport(f"send `{item}`") # type: ignore
|
log.trace(f"send `{item}`") # type: ignore
|
||||||
assert self.msgstream
|
assert self.msgstream
|
||||||
|
|
||||||
await self.msgstream.send(item)
|
await self.msgstream.send(item)
|
||||||
|
@ -214,8 +205,7 @@ class Channel:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
async def aclose(self) -> None:
|
async def aclose(self) -> None:
|
||||||
|
log.debug(
|
||||||
log.transport(
|
|
||||||
f'Closing channel to {self.uid} '
|
f'Closing channel to {self.uid} '
|
||||||
f'{self.laddr} -> {self.raddr}'
|
f'{self.laddr} -> {self.raddr}'
|
||||||
)
|
)
|
||||||
|
@ -244,11 +234,11 @@ class Channel:
|
||||||
await self.connect()
|
await self.connect()
|
||||||
cancelled = cancel_scope.cancelled_caught
|
cancelled = cancel_scope.cancelled_caught
|
||||||
if cancelled:
|
if cancelled:
|
||||||
log.transport(
|
log.warning(
|
||||||
"Reconnect timed out after 3 seconds, retrying...")
|
"Reconnect timed out after 3 seconds, retrying...")
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
log.transport("Stream connection re-established!")
|
log.warning("Stream connection re-established!")
|
||||||
# run any reconnection sequence
|
# run any reconnection sequence
|
||||||
on_recon = self._recon_seq
|
on_recon = self._recon_seq
|
||||||
if on_recon:
|
if on_recon:
|
||||||
|
@ -257,7 +247,7 @@ class Channel:
|
||||||
except (OSError, ConnectionRefusedError):
|
except (OSError, ConnectionRefusedError):
|
||||||
if not down:
|
if not down:
|
||||||
down = True
|
down = True
|
||||||
log.transport(
|
log.warning(
|
||||||
f"Connection to {self.raddr} went down, waiting"
|
f"Connection to {self.raddr} went down, waiting"
|
||||||
" for re-establishment")
|
" for re-establishment")
|
||||||
await trio.sleep(1)
|
await trio.sleep(1)
|
||||||
|
|
|
@ -171,11 +171,8 @@ async def open_root_actor(
|
||||||
yield actor
|
yield actor
|
||||||
|
|
||||||
except (Exception, trio.MultiError) as err:
|
except (Exception, trio.MultiError) as err:
|
||||||
# with trio.CancelScope(shield=True):
|
logger.exception("Actor crashed:")
|
||||||
entered = await _debug._maybe_enter_pm(err)
|
await _debug._maybe_enter_pm(err)
|
||||||
|
|
||||||
if not entered:
|
|
||||||
logger.exception("Root actor crashed:")
|
|
||||||
|
|
||||||
# always re-raise
|
# always re-raise
|
||||||
raise
|
raise
|
||||||
|
|
|
@ -28,7 +28,6 @@ from ._state import (
|
||||||
is_root_process,
|
is_root_process,
|
||||||
_runtime_vars,
|
_runtime_vars,
|
||||||
)
|
)
|
||||||
from ._debug import _global_actor_in_debug
|
|
||||||
|
|
||||||
from .log import get_logger
|
from .log import get_logger
|
||||||
from ._portal import Portal
|
from ._portal import Portal
|
||||||
|
@ -155,27 +154,6 @@ async def cancel_on_completion(
|
||||||
# cancel the process now that we have a final result
|
# cancel the process now that we have a final result
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
|
|
||||||
async def do_hard_kill(
|
|
||||||
proc: trio.Process,
|
|
||||||
|
|
||||||
) -> None:
|
|
||||||
# NOTE: this timeout used to do nothing since we were shielding
|
|
||||||
# the ``.wait()`` inside ``new_proc()`` which will pretty much
|
|
||||||
# never release until the process exits, now it acts as
|
|
||||||
# a hard-kill time ultimatum.
|
|
||||||
with trio.move_on_after(3) as cs:
|
|
||||||
|
|
||||||
# NOTE: This ``__aexit__()`` shields internally.
|
|
||||||
async with proc: # calls ``trio.Process.aclose()``
|
|
||||||
log.debug(f"Terminating {proc}")
|
|
||||||
|
|
||||||
if cs.cancelled_caught:
|
|
||||||
# XXX: should pretty much never get here unless we have
|
|
||||||
# to move the bits from ``proc.__aexit__()`` out and
|
|
||||||
# into here.
|
|
||||||
log.critical(f"HARD KILLING {proc}")
|
|
||||||
proc.kill()
|
|
||||||
|
|
||||||
|
|
||||||
async def do_hard_kill(
|
async def do_hard_kill(
|
||||||
proc: trio.Process,
|
proc: trio.Process,
|
||||||
|
@ -231,46 +209,46 @@ async def spawn_subactor(
|
||||||
yield proc
|
yield proc
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
log.runtime(f"Attempting to kill {proc}")
|
log.debug(f"Attempting to kill {proc}")
|
||||||
|
|
||||||
# XXX: do this **after** cancellation/teardown
|
# XXX: do this **after** cancellation/teardown
|
||||||
# to avoid killing the process too early
|
# to avoid killing the process too early
|
||||||
# since trio does this internally on ``__aexit__()``
|
# since trio does this internally on ``__aexit__()``
|
||||||
|
|
||||||
# if (
|
if (
|
||||||
# is_root_process()
|
is_root_process()
|
||||||
|
|
||||||
# # XXX: basically the pre-closing of stdstreams in a
|
# XXX: basically the pre-closing of stdstreams in a
|
||||||
# # root process's ``trio.Process.aclose()`` can clobber
|
# root process's ``trio.Process.aclose()`` can clobber
|
||||||
# # any existing debugger session so we avoid
|
# any existing debugger session so we avoid
|
||||||
# and _runtime_vars['_debug_mode']
|
and _runtime_vars['_debug_mode']
|
||||||
# and _global_actor_in_debug is not None
|
):
|
||||||
# ):
|
# XXX: this is ``trio.Process.aclose()`` minus
|
||||||
# # XXX: this is ``trio.Process.aclose()`` MINUS the
|
# the std-streams pre-closing steps and ``Process.kill()``
|
||||||
# # std-streams pre-closing steps inside ``proc.__aexit__()``
|
# calls.
|
||||||
# # (see below) which includes a ``Process.kill()`` call
|
try:
|
||||||
|
await proc.wait()
|
||||||
|
finally:
|
||||||
|
if proc.returncode is None:
|
||||||
|
# XXX: skip this when in debug and a session might
|
||||||
|
# still be live
|
||||||
|
# proc.kill()
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
|
await proc.wait()
|
||||||
|
else:
|
||||||
|
# NOTE: this timeout used to do nothing since we were shielding
|
||||||
|
# the ``.wait()`` inside ``new_proc()`` which will pretty much
|
||||||
|
# never release until the process exits, now it acts as
|
||||||
|
# a hard-kill time ultimatum.
|
||||||
|
with trio.move_on_after(3) as cs:
|
||||||
|
|
||||||
# log.error(
|
# NOTE: This ``__aexit__()`` shields internally.
|
||||||
# "Root process tty is locked in debug mode by "
|
async with proc: # calls ``trio.Process.aclose()``
|
||||||
# f"{_global_actor_in_debug}. If the console is hanging, you "
|
log.debug(f"Terminating {proc}")
|
||||||
# "may need to trigger a KBI to kill any "
|
|
||||||
# "not-fully-initialized" " subprocesses and allow errors "
|
|
||||||
# "from `trio` to propagate"
|
|
||||||
# )
|
|
||||||
# try:
|
|
||||||
# # one more graceful wait try that can be cancelled by KBI
|
|
||||||
# # sent by user.
|
|
||||||
# await proc.wait()
|
|
||||||
|
|
||||||
# finally:
|
if cs.cancelled_caught:
|
||||||
# if proc.returncode is None:
|
log.critical(f"HARD KILLING {proc}")
|
||||||
# # with trio.CancelScope(shield=True):
|
proc.kill()
|
||||||
# # await proc.wait()
|
|
||||||
|
|
||||||
# await do_hard_kill(proc)
|
|
||||||
# else:
|
|
||||||
|
|
||||||
await do_hard_kill(proc)
|
|
||||||
|
|
||||||
|
|
||||||
async def new_proc(
|
async def new_proc(
|
||||||
|
|
|
@ -12,7 +12,7 @@ import trio
|
||||||
from async_generator import asynccontextmanager
|
from async_generator import asynccontextmanager
|
||||||
|
|
||||||
from . import _debug
|
from . import _debug
|
||||||
from ._state import current_actor, is_main_process, is_root_process
|
from ._state import current_actor, is_main_process
|
||||||
from .log import get_logger, get_loglevel
|
from .log import get_logger, get_loglevel
|
||||||
from ._actor import Actor
|
from ._actor import Actor
|
||||||
from ._portal import Portal
|
from ._portal import Portal
|
||||||
|
@ -170,25 +170,16 @@ class ActorNursery:
|
||||||
|
|
||||||
log.warning(f"Cancelling nursery in {self._actor.uid}")
|
log.warning(f"Cancelling nursery in {self._actor.uid}")
|
||||||
with trio.move_on_after(3) as cs:
|
with trio.move_on_after(3) as cs:
|
||||||
|
|
||||||
async with trio.open_nursery() as nursery:
|
async with trio.open_nursery() as nursery:
|
||||||
|
|
||||||
for subactor, proc, portal in self._children.values():
|
for subactor, proc, portal in self._children.values():
|
||||||
|
|
||||||
# TODO: are we ever even going to use this or
|
|
||||||
# is the spawning backend responsible for such
|
|
||||||
# things? I'm thinking latter.
|
|
||||||
if hard_kill:
|
if hard_kill:
|
||||||
proc.terminate()
|
proc.terminate()
|
||||||
|
|
||||||
else:
|
else:
|
||||||
if portal is None: # actor hasn't fully spawned yet
|
if portal is None: # actor hasn't fully spawned yet
|
||||||
event = self._actor._peer_connected[subactor.uid]
|
event = self._actor._peer_connected[subactor.uid]
|
||||||
log.warning(
|
log.warning(
|
||||||
f"{subactor.uid} wasn't finished spawning?")
|
f"{subactor.uid} wasn't finished spawning?")
|
||||||
|
|
||||||
await event.wait()
|
await event.wait()
|
||||||
|
|
||||||
# channel/portal should now be up
|
# channel/portal should now be up
|
||||||
_, _, portal = self._children[subactor.uid]
|
_, _, portal = self._children[subactor.uid]
|
||||||
|
|
||||||
|
@ -248,7 +239,6 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
# As such if the strategy propagates any error(s) upwards
|
# As such if the strategy propagates any error(s) upwards
|
||||||
# the above "daemon actor" nursery will be notified.
|
# the above "daemon actor" nursery will be notified.
|
||||||
async with trio.open_nursery() as ria_nursery:
|
async with trio.open_nursery() as ria_nursery:
|
||||||
|
|
||||||
anursery = ActorNursery(
|
anursery = ActorNursery(
|
||||||
actor,
|
actor,
|
||||||
ria_nursery,
|
ria_nursery,
|
||||||
|
@ -259,53 +249,21 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
# spawning of actors happens in the caller's scope
|
# spawning of actors happens in the caller's scope
|
||||||
# after we yield upwards
|
# after we yield upwards
|
||||||
yield anursery
|
yield anursery
|
||||||
|
log.debug(
|
||||||
log.runtime(
|
|
||||||
f"Waiting on subactors {anursery._children} "
|
f"Waiting on subactors {anursery._children} "
|
||||||
"to complete"
|
"to complete"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Last bit before first nursery block ends in the case
|
# Last bit before first nursery block ends in the case
|
||||||
# where we didn't error in the caller's scope
|
# where we didn't error in the caller's scope
|
||||||
|
log.debug("Waiting on all subactors to complete")
|
||||||
# signal all process monitor tasks to conduct
|
|
||||||
# hard join phase.
|
|
||||||
anursery._join_procs.set()
|
anursery._join_procs.set()
|
||||||
|
|
||||||
except BaseException as err:
|
except BaseException as err:
|
||||||
|
|
||||||
# If we error in the root but the debugger is
|
|
||||||
# engaged we don't want to prematurely kill (and
|
|
||||||
# thus clobber access to) the local tty since it
|
|
||||||
# will make the pdb repl unusable.
|
|
||||||
# Instead try to wait for pdb to be released before
|
|
||||||
# tearing down.
|
|
||||||
if is_root_process():
|
|
||||||
log.exception(f"we're root with {err}")
|
|
||||||
|
|
||||||
# wait to see if a sub-actor task
|
|
||||||
# will be scheduled and grab the tty
|
|
||||||
# lock on the next tick
|
|
||||||
# await trio.testing.wait_all_tasks_blocked()
|
|
||||||
|
|
||||||
debug_complete = _debug._no_remote_has_tty
|
|
||||||
if (
|
|
||||||
debug_complete and
|
|
||||||
not debug_complete.is_set()
|
|
||||||
):
|
|
||||||
log.warning(
|
|
||||||
'Root has errored but pdb is in use by '
|
|
||||||
f'child {_debug._global_actor_in_debug}\n'
|
|
||||||
'Waiting on tty lock to release..')
|
|
||||||
|
|
||||||
with trio.CancelScope(shield=True):
|
|
||||||
await debug_complete.wait()
|
|
||||||
|
|
||||||
# if the caller's scope errored then we activate our
|
# if the caller's scope errored then we activate our
|
||||||
# one-cancels-all supervisor strategy (don't
|
# one-cancels-all supervisor strategy (don't
|
||||||
# worry more are coming).
|
# worry more are coming).
|
||||||
anursery._join_procs.set()
|
anursery._join_procs.set()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# XXX: hypothetically an error could be
|
# XXX: hypothetically an error could be
|
||||||
# raised and then a cancel signal shows up
|
# raised and then a cancel signal shows up
|
||||||
|
@ -344,15 +302,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
# ria_nursery scope end
|
# ria_nursery scope end
|
||||||
|
|
||||||
# XXX: do we need a `trio.Cancelled` catch here as well?
|
# XXX: do we need a `trio.Cancelled` catch here as well?
|
||||||
# this is the catch around the ``.run_in_actor()`` nursery
|
except (Exception, trio.MultiError, trio.Cancelled) as err:
|
||||||
except (
|
|
||||||
|
|
||||||
Exception,
|
|
||||||
trio.MultiError,
|
|
||||||
trio.Cancelled
|
|
||||||
|
|
||||||
) as err:
|
|
||||||
|
|
||||||
# If actor-local error was raised while waiting on
|
# If actor-local error was raised while waiting on
|
||||||
# ".run_in_actor()" actors then we also want to cancel all
|
# ".run_in_actor()" actors then we also want to cancel all
|
||||||
# remaining sub-actors (due to our lone strategy:
|
# remaining sub-actors (due to our lone strategy:
|
||||||
|
@ -419,12 +369,26 @@ async def open_nursery(
|
||||||
async with open_root_actor(**kwargs) as actor:
|
async with open_root_actor(**kwargs) as actor:
|
||||||
assert actor is current_actor()
|
assert actor is current_actor()
|
||||||
|
|
||||||
# try:
|
try:
|
||||||
async with _open_and_supervise_one_cancels_all_nursery(
|
async with _open_and_supervise_one_cancels_all_nursery(
|
||||||
actor
|
actor
|
||||||
) as anursery:
|
) as anursery:
|
||||||
yield anursery
|
yield anursery
|
||||||
|
|
||||||
|
except (Exception, trio.MultiError, trio.Cancelled):
|
||||||
|
# if we error in the root but the debugger is
|
||||||
|
# engaged we don't want to prematurely kill (and
|
||||||
|
# thus clobber access to) the local tty streams.
|
||||||
|
# instead try to wait for pdb to be released before
|
||||||
|
# tearing down.
|
||||||
|
if not _debug._pdb_complete.is_set():
|
||||||
|
log.warning(
|
||||||
|
"Root has errored but pdb is active..waiting "
|
||||||
|
"on debug lock")
|
||||||
|
await _debug._pdb_complete.wait()
|
||||||
|
|
||||||
|
raise
|
||||||
|
|
||||||
else: # sub-nursery case
|
else: # sub-nursery case
|
||||||
|
|
||||||
async with _open_and_supervise_one_cancels_all_nursery(
|
async with _open_and_supervise_one_cancels_all_nursery(
|
||||||
|
|
|
@ -29,20 +29,19 @@ LOG_FORMAT = (
|
||||||
DATE_FORMAT = '%b %d %H:%M:%S'
|
DATE_FORMAT = '%b %d %H:%M:%S'
|
||||||
LEVELS = {
|
LEVELS = {
|
||||||
'GARBAGE': 1,
|
'GARBAGE': 1,
|
||||||
'TRANSPORT': 5,
|
'TRACE': 5,
|
||||||
'RUNTIME': 15,
|
'PROFILE': 15,
|
||||||
'PDB': 500,
|
'RUNTIME': 500,
|
||||||
'QUIET': 1000,
|
'QUIET': 1000,
|
||||||
}
|
}
|
||||||
STD_PALETTE = {
|
STD_PALETTE = {
|
||||||
'CRITICAL': 'red',
|
'CRITICAL': 'red',
|
||||||
'ERROR': 'red',
|
'ERROR': 'red',
|
||||||
'PDB': 'white',
|
'RUNTIME': 'white',
|
||||||
'WARNING': 'yellow',
|
'WARNING': 'yellow',
|
||||||
'INFO': 'green',
|
'INFO': 'green',
|
||||||
'RUNTIME': 'white',
|
|
||||||
'DEBUG': 'white',
|
'DEBUG': 'white',
|
||||||
'TRANSPORT': 'cyan',
|
'TRACE': 'cyan',
|
||||||
'GARBAGE': 'blue',
|
'GARBAGE': 'blue',
|
||||||
}
|
}
|
||||||
BOLD_PALETTE = {
|
BOLD_PALETTE = {
|
||||||
|
@ -77,7 +76,7 @@ def get_logger(
|
||||||
# additional levels
|
# additional levels
|
||||||
for name, val in LEVELS.items():
|
for name, val in LEVELS.items():
|
||||||
logging.addLevelName(val, name)
|
logging.addLevelName(val, name)
|
||||||
# ex. create ``logger.runtime()``
|
# ex. create ``logger.trace()``
|
||||||
setattr(logger, name.lower(), partial(logger.log, val))
|
setattr(logger, name.lower(), partial(logger.log, val))
|
||||||
|
|
||||||
return logger
|
return logger
|
||||||
|
|
Loading…
Reference in New Issue