forked from goodboy/tractor
1
0
Fork 0

Compare commits

...

23 Commits

Author SHA1 Message Date
Tyler Goodlet 00c4d69bd2 Change trace to transport level 2021-07-05 16:42:40 -04:00
Tyler Goodlet bd3832e70f Flip "trace" level to "transport" level logging 2021-07-05 16:42:40 -04:00
Tyler Goodlet af61d93bad Add fast fail test using the context api 2021-07-05 16:42:40 -04:00
Tyler Goodlet 1bca5c8144 Adjust debug tests to accomodate no more root clobbering
We may get multiple re-entries to debugger by `bp_forever` sub-actor
now since the root will incrementally try to cancel it only when the tty
lock is not held.
2021-07-05 16:42:40 -04:00
Tyler Goodlet c174a59758 Go back to only logging tbs on no debugger 2021-07-05 16:42:40 -04:00
Tyler Goodlet 91c70cad24 Comment hard-kill-sidestep for now since nursery version covers it? 2021-07-05 16:42:40 -04:00
Tyler Goodlet 9b4f7a3076 Go back to only logging crashes if no pdb gets engaged 2021-07-05 16:42:38 -04:00
Tyler Goodlet 3179eded73 Solve the root-cancels-child-in-tty-lock race
Finally this makes a cancelled root actor nursery not clobber child
tasks which request and lock the root's tty for the debugger repl.

Using an edge triggered event which is set after all fifo-lock-queued
tasks are complete, we can be sure that no lingering child tasks are
going to get interrupted during pdb use and tty lock acquisition.
Further, even if new tasks do queue up to get the lock, the root will
incrementally send cancel msgs to each sub-actor only once the tty is
not locked by a (set of) child request task(s). Add shielding around all
the critical sections where the child attempts to allocate the lock from
the root such that it won't be disrupted from cancel messages from the
root after the acquire lock transaction has started.
2021-07-05 16:40:58 -04:00
Tyler Goodlet b603904d3e Distinguish between a local pdb unlock and the tty unlock in root 2021-07-05 16:40:58 -04:00
Tyler Goodlet 1ec1743c48 Fix hard kill in debug mode; only do it when debug lock is empty 2021-07-05 16:40:58 -04:00
Tyler Goodlet 3412d344e2 Move some infos to runtime level 2021-07-05 16:40:58 -04:00
Tyler Goodlet 780476541a Add PDB level and make runtime below info but above debug 2021-07-05 16:40:58 -04:00
Tyler Goodlet ef4dbdcf85 Move debugger wait inside OCA nursery 2021-07-05 16:40:58 -04:00
Tyler Goodlet da9a33bf37 Don't shield debugger status wait; it causes hangs 2021-07-05 16:40:58 -04:00
Tyler Goodlet 11f8d14e38 Catch and delay errors in the root if debugger is active 2021-07-05 16:40:58 -04:00
Tyler Goodlet 65aa1f3e2a Don't shield on root cancel it can causes hangs 2021-07-05 16:40:58 -04:00
Tyler Goodlet 625fc87410 Don't kill root's immediate children when in debug
If the root calls `trio.Process.kill()` on immediate child proc teardown
when the child is using pdb, we can get stdstreams clobbering that
results in a pdb++ repl where the user can't see what's been typed. Not
killing such children on cancellation / error seems to resolve this
issue whilst still giving reliable termination. For now, code that
special path until a time it becomes a problem for ensuring zombie
reaps.
2021-07-05 16:40:58 -04:00
Tyler Goodlet 8cc41090dd Add debug example that causes pdb stdin clobbering 2021-07-05 16:40:58 -04:00
Tyler Goodlet cb1e208599 Add some brief todo notes on idea of shielded breakpoint 2021-07-05 16:39:37 -04:00
Tyler Goodlet 53d25b58a1 Wait for debugger lock task context termination 2021-07-05 16:39:37 -04:00
Tyler Goodlet 9729f66672 Avoid mutate on iterate race 2021-07-05 16:39:37 -04:00
Tyler Goodlet a5848effa4 Fix up var naming and typing 2021-07-05 16:39:37 -04:00
Tyler Goodlet f59bb1aaf1 Use context for remote debugger locking
A context is the natural fit (vs. a receive stream) for locking the root
proc's tty usage via it's `.started()` sync point. Simplify the
`_breakpoin()` routine to be a simple async func instead of all this
"returning a coroutine" stuff from before we decided that
`tractor.breakpoint()` must be async. Use `runtime` level for locking
logging making it easier to trace.
2021-07-05 16:39:37 -04:00
10 changed files with 590 additions and 170 deletions

View File

@ -0,0 +1,53 @@
'''
fast fail test with a context.
ensure the partially initialized sub-actor process
doesn't cause a hang on error/cancel of the parent
nrusery.
'''
import trio
import tractor
@tractor.context
async def sleep(
ctx: tractor.Context,
):
await trio.sleep(0.5)
await ctx.started()
await trio.sleep_forever()
async def open_ctx(
n: tractor._trionics.ActorNursery
):
# spawn both actors
portal = await n.start_actor(
name='sleeper',
enable_modules=[__name__],
)
async with portal.open_context(
sleep,
) as (ctx, first):
assert first is None
async def main():
async with tractor.open_nursery(
debug_mode=True,
loglevel='runtime',
) as an:
async with trio.open_nursery() as n:
n.start_soon(open_ctx, an)
await trio.sleep(0.2)
await trio.sleep(0.1)
assert 0
if __name__ == '__main__':
trio.run(main)

View File

@ -0,0 +1,31 @@
import trio
import tractor
async def key_error():
"Raise a ``NameError``"
return {}['doggy']
async def main():
"""Root dies
"""
async with tractor.open_nursery(
debug_mode=True,
loglevel='debug'
) as n:
# spawn both actors
portal = await n.run_in_actor(key_error)
# XXX: originally a bug causes by this
# where root would enter debugger even
# though child should have it locked.
with trio.fail_after(1):
await trio.Event().wait()
if __name__ == '__main__':
trio.run(main)

View File

@ -309,32 +309,58 @@ def test_multi_daemon_subactors(spawn, loglevel):
next_msg = name_error_msg
elif name_error_msg in before:
next_msg = None
next_msg = bp_forever_msg
else:
raise ValueError("Neither log msg was found !?")
child.sendline('c')
# NOTE: previously since we did not have clobber prevention
# in the root actor this final resume could result in the debugger
# tearing down since both child actors would be cancelled and it was
# unlikely that `bp_forever` would re-acquire the tty loack again.
# Now, we should have a final resumption in the root plus a possible
# second entry by `bp_forever`.
# first name_error failure
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
if next_msg:
assert next_msg in before
child.sendline('c')
# XXX: hoorayy the root clobering the child here was fixed!
# IMO, this demonstrates the true power of SC system design.
# now the root actor won't clobber the bp_forever child
# during it's first access to the debug lock, but will instead
# wait for the lock to release, by the edge triggered
# ``_debug._no_remote_has_tty`` event before sending cancel messages
# (via portals) to its underlings B)
# at some point here there should have been some warning msg from
# the root announcing it avoided a clobber of the child's lock, but
# it seems unreliable in testing here to gnab it:
# assert "in use by child ('bp_forever'," in before
# wait for final error in root
while True:
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
assert "tractor._exceptions.RemoteActorError: ('name_error'" in before
try:
# root error should be packed as remote error
assert "_exceptions.RemoteActorError: ('name_error'" in before
break
except AssertionError:
assert bp_forever_msg in before
try:
child.sendline('c')
child.expect(pexpect.EOF)
except pexpect.exceptions.TIMEOUT:
# Failed to exit using continue..?
child.sendline('q')
child.expect(pexpect.EOF)
@ -389,7 +415,7 @@ def test_multi_nested_subactors_error_through_nurseries(spawn):
child = spawn('multi_nested_subactors_error_up_through_nurseries')
# startup time can be iffy
time.sleep(1)
# time.sleep(1)
for i in range(12):
try:
@ -471,3 +497,21 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
assert "tractor._exceptions.RemoteActorError: ('spawner0'" in before
assert "tractor._exceptions.RemoteActorError: ('name_error'" in before
assert "NameError: name 'doggypants' is not defined" in before
def test_root_cancels_child_context_during_startup(
spawn,
):
'''Verify a fast fail in the root doesn't lock up the child reaping
and all while using the new context api.
'''
child = spawn('fast_error_in_root_after_spawn')
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
assert "AssertionError" in before
child.sendline('c')
child.expect(pexpect.EOF)

View File

@ -28,6 +28,7 @@ from ._exceptions import (
ModuleNotExposed,
is_multi_cancelled,
TransportClosed,
ContextCancelled,
)
from . import _debug
from ._discovery import get_arbiter
@ -125,18 +126,32 @@ async def _invoke(
except (Exception, trio.MultiError) as err:
# TODO: maybe we'll want differnet "levels" of debugging
if not is_multi_cancelled(err):
log.exception("Actor crashed:")
# TODO: maybe we'll want different "levels" of debugging
# eventualy such as ('app', 'supervisory', 'runtime') ?
if not isinstance(err, trio.ClosedResourceError) and (
not is_multi_cancelled(err)
# if not isinstance(err, trio.ClosedResourceError) and (
# if not is_multi_cancelled(err) and (
entered_debug: bool = False
if not isinstance(err, ContextCancelled) or (
isinstance(err, ContextCancelled) and ctx._cancel_called
):
# XXX: is there any case where we'll want to debug IPC
# disconnects? I can't think of a reason that inspecting
# disconnects as a default?
#
# I can't think of a reason that inspecting
# this type of failure will be useful for respawns or
# recovery logic - the only case is some kind of strange bug
# in `trio` itself?
entered = await _debug._maybe_enter_pm(err)
if not entered:
# in our transport layer itself? Going to keep this
# open ended for now.
entered_debug = await _debug._maybe_enter_pm(err)
if not entered_debug:
log.exception("Actor crashed:")
# always ship errors back to caller
@ -369,7 +384,8 @@ class Actor:
log.warning(
f"already have channel(s) for {uid}:{chans}?"
)
log.trace(f"Registered {chan} for {uid}") # type: ignore
log.runtime(f"Registered {chan} for {uid}") # type: ignore
# append new channel
self._peers[uid].append(chan)
@ -489,17 +505,20 @@ class Actor:
task_status.started(loop_cs)
async for msg in chan:
if msg is None: # loop terminate sentinel
log.debug(
f"Cancelling all tasks for {chan} from {chan.uid}")
for (channel, cid) in self._rpc_tasks:
for (channel, cid) in self._rpc_tasks.copy():
if channel is chan:
await self._cancel_task(cid, channel)
log.debug(
f"Msg loop signalled to terminate for"
f" {chan} from {chan.uid}")
break
log.trace( # type: ignore
log.transport( # type: ignore
f"Received msg {msg} from {chan.uid}")
cid = msg.get('cid')

View File

@ -1,13 +1,13 @@
"""
Multi-core debugging for da peeps!
"""
import bdb
import sys
from functools import partial
from contextlib import asynccontextmanager
from typing import Awaitable, Tuple, Optional, Callable, AsyncIterator
from typing import Tuple, Optional, Callable, AsyncIterator
from async_generator import aclosing
import tractor
import trio
@ -31,14 +31,22 @@ log = get_logger(__name__)
__all__ = ['breakpoint', 'post_mortem']
# TODO: wrap all these in a static global class: ``DebugLock`` maybe?
# placeholder for function to set a ``trio.Event`` on debugger exit
_pdb_release_hook: Optional[Callable] = None
# actor-wide variable pointing to current task name using debugger
_in_debug = False
_local_task_in_debug: Optional[str] = None
# actor tree-wide actor uid that supposedly has the tty lock
_global_actor_in_debug: Optional[Tuple[str, str]] = None
# lock in root actor preventing multi-access to local tty
_debug_lock = trio.StrictFIFOLock()
_debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
_local_pdb_complete: Optional[trio.Event] = None
_no_remote_has_tty: Optional[trio.Event] = None
# XXX: set by the current task waiting on the root tty lock
# and must be cancelled if this actor is cancelled via message
@ -61,19 +69,19 @@ class PdbwTeardown(pdbpp.Pdb):
# TODO: figure out how to dissallow recursive .set_trace() entry
# since that'll cause deadlock for us.
def set_continue(self):
global _in_debug
try:
super().set_continue()
finally:
_in_debug = False
global _local_task_in_debug
_local_task_in_debug = None
_pdb_release_hook()
def set_quit(self):
global _in_debug
try:
super().set_quit()
finally:
_in_debug = False
global _local_task_in_debug
_local_task_in_debug = None
_pdb_release_hook()
@ -102,7 +110,7 @@ class PdbwTeardown(pdbpp.Pdb):
# async with aclosing(async_stdin):
# async for msg in async_stdin:
# log.trace(f"Stdin input:\n{msg}")
# log.runtime(f"Stdin input:\n{msg}")
# # encode to bytes
# bmsg = str.encode(msg)
@ -116,20 +124,71 @@ class PdbwTeardown(pdbpp.Pdb):
@asynccontextmanager
async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]:
"""Acquire a actor local FIFO lock meant to mutex entry to a local
debugger entry point to avoid tty clobbering by multiple processes.
"""
'''Acquire a actor local FIFO lock meant to mutex entry to a local
debugger entry point to avoid tty clobbering a global root process.
'''
global _debug_lock, _global_actor_in_debug, _no_remote_has_tty
task_name = trio.lowlevel.current_task().name
log.pdb(
f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}"
)
we_acquired = False
if _no_remote_has_tty is None:
# mark the tty lock as being in use so that the runtime
# can try to avoid clobbering any connection from a child
# that's currently relying on it.
_no_remote_has_tty = trio.Event()
try:
log.debug(
f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}")
f"entering lock checkpoint, remote task: {task_name}:{uid}"
)
we_acquired = True
await _debug_lock.acquire()
# we_acquired = True
_global_actor_in_debug = uid
log.debug(f"TTY lock acquired, remote task: {task_name}:{uid}")
yield
# NOTE: critical section!
# this yield is unshielded.
# IF we received a cancel during the shielded lock
# entry of some next-in-queue requesting task,
# then the resumption here will result in that
# Cancelled being raised to our caller below!
# in this case the finally below should trigger
# and the surrounding calle side context should cancel
# normally relaying back to the caller.
yield _debug_lock
finally:
# if _global_actor_in_debug == uid:
if we_acquired and _debug_lock.locked():
_debug_lock.release()
# IFF there are no more requesting tasks queued up fire, the
# "tty-unlocked" event thereby alerting any monitors of the lock that
# we are now back in the "tty unlocked" state. This is basically
# and edge triggered signal around an empty queue of sub-actor
# tasks that may have tried to acquire the lock.
stats = _debug_lock.statistics()
if (
not stats.owner
):
log.pdb(f"No more tasks waiting on tty lock! says {uid}")
_no_remote_has_tty.set()
_no_remote_has_tty = None
_global_actor_in_debug = None
log.debug(f"TTY lock released, remote task: {task_name}:{uid}")
@ -144,78 +203,142 @@ async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]:
# signal.signal(signal.SIGINT, prior_handler)
@tractor.context
async def _hijack_stdin_relay_to_child(
subactor_uid: Tuple[str, str]
) -> AsyncIterator[str]:
# TODO: when we get to true remote debugging
# this will deliver stdin data
log.warning(f"Actor {subactor_uid} is WAITING on stdin hijack lock")
async with _acquire_debug_lock(subactor_uid):
log.warning(f"Actor {subactor_uid} ACQUIRED stdin hijack lock")
# with _disable_sigint():
ctx: tractor.Context,
subactor_uid: Tuple[str, str]
) -> str:
'''Hijack the tty in the root process of an actor tree such that
the pdbpp debugger console can be allocated to a sub-actor for repl
bossing.
'''
task_name = trio.lowlevel.current_task().name
# TODO: when we get to true remote debugging
# this will deliver stdin data?
log.debug(
"Attempting to acquire TTY lock\n"
f"remote task: {task_name}:{subactor_uid}"
)
log.debug(f"Actor {subactor_uid} is WAITING on stdin hijack lock")
with trio.CancelScope(shield=True):
async with _acquire_debug_lock(subactor_uid):
# indicate to child that we've locked stdio
yield 'Locked'
await ctx.started('Locked')
log.pdb( # type: ignore
f"Actor {subactor_uid} ACQUIRED stdin hijack lock")
# wait for cancellation of stream by child
# indicating debugger is dis-engaged
await trio.sleep_forever()
# wait for unlock pdb by child
async with ctx.open_stream() as stream:
try:
assert await stream.receive() == 'pdb_unlock'
log.debug(f"Actor {subactor_uid} RELEASED stdin hijack lock")
except trio.BrokenResourceError:
# XXX: there may be a race with the portal teardown
# with the calling actor which we can safely ignore
# the alternative would be sending an ack message
# and allowing the client to wait for us to teardown
# first?
pass
log.debug(
f"TTY lock released, remote task: {task_name}:{subactor_uid}")
return "pdb_unlock_complete"
# XXX: We only make this sync in case someone wants to
# overload the ``breakpoint()`` built-in.
def _breakpoint(debug_func) -> Awaitable[None]:
"""``tractor`` breakpoint entry for engaging pdb machinery
in subactors.
"""
async def _breakpoint(
debug_func,
# TODO:
# shield: bool = False
) -> None:
'''``tractor`` breakpoint entry for engaging pdb machinery
in the root or a subactor.
'''
# TODO: is it possible to debug a trio.Cancelled except block?
# right now it seems like we can kinda do with by shielding
# around ``tractor.breakpoint()`` but not if we move the shielded
# scope here???
# with trio.CancelScope(shield=shield):
actor = tractor.current_actor()
do_unlock = trio.Event()
task_name = trio.lowlevel.current_task().name
global _local_pdb_complete, _pdb_release_hook
global _local_task_in_debug, _global_actor_in_debug
await trio.lowlevel.checkpoint()
async def wait_for_parent_stdin_hijack(
task_status=trio.TASK_STATUS_IGNORED
):
global _debugger_request_cs
with trio.CancelScope() as cs:
with trio.CancelScope(shield=True) as cs:
_debugger_request_cs = cs
try:
async with get_root() as portal:
async with portal.open_stream_from(
log.error('got portal')
# this syncs to child's ``Context.started()`` call.
async with portal.open_context(
tractor._debug._hijack_stdin_relay_to_child,
subactor_uid=actor.uid,
) as stream:
# block until first yield above
async for val in stream:
) as (ctx, val):
log.error('locked context')
assert val == 'Locked'
async with ctx.open_stream() as stream:
log.error('opened stream')
# unblock local caller
task_status.started()
# with trio.CancelScope(shield=True):
await do_unlock.wait()
try:
await _local_pdb_complete.wait()
finally:
# TODO: shielding currently can cause hangs...
with trio.CancelScope(shield=True):
await stream.send('pdb_unlock')
# sync with callee termination
assert await ctx.result() == "pdb_unlock_complete"
except tractor.ContextCancelled:
log.warning('Root actor cancelled debug lock')
# trigger cancellation of remote stream
break
finally:
log.debug(f"Exiting debugger for actor {actor}")
global _in_debug
_in_debug = False
global _local_task_in_debug
_local_task_in_debug = None
log.debug(f"Child {actor} released parent stdio lock")
async def _bp():
"""Async breakpoint which schedules a parent stdio lock, and once complete
enters the ``pdbpp`` debugging console.
"""
task_name = trio.lowlevel.current_task().name
global _in_debug
if not _local_pdb_complete or _local_pdb_complete.is_set():
_local_pdb_complete = trio.Event()
# TODO: need a more robust check for the "root" actor
if actor._parent_chan and not is_root_process():
if _in_debug:
if _in_debug == task_name:
if _local_task_in_debug:
if _local_task_in_debug == task_name:
# this task already has the lock and is
# likely recurrently entering a breakpoint
return
@ -223,40 +346,71 @@ def _breakpoint(debug_func) -> Awaitable[None]:
# if **this** actor is already in debug mode block here
# waiting for the control to be released - this allows
# support for recursive entries to `tractor.breakpoint()`
log.warning(
f"Actor {actor.uid} already has a debug lock, waiting...")
await do_unlock.wait()
await trio.sleep(0.1)
log.warning(f"{actor.uid} already has a debug lock, waiting...")
# assign unlock callback for debugger teardown hooks
global _pdb_release_hook
_pdb_release_hook = do_unlock.set
await _local_pdb_complete.wait()
await trio.sleep(0.1)
# mark local actor as "in debug mode" to avoid recurrent
# entries/requests to the root process
_in_debug = task_name
_local_task_in_debug = task_name
# assign unlock callback for debugger teardown hooks
_pdb_release_hook = _local_pdb_complete.set
# this **must** be awaited by the caller and is done using the
# root nursery so that the debugger can continue to run without
# being restricted by the scope of a new task nursery.
# NOTE: if we want to debug a trio.Cancelled triggered exception
# we have to figure out how to avoid having the service nursery
# cancel on this task start? I *think* this works below?
# actor._service_n.cancel_scope.shield = shield
with trio.CancelScope(shield=True):
await actor._service_n.start(wait_for_parent_stdin_hijack)
elif is_root_process():
# we also wait in the root-parent for any child that
# may have the tty locked prior
if _debug_lock.locked(): # root process already has it; ignore
global _debug_lock
# TODO: wait, what about multiple root tasks acquiring
# it though.. shrug?
# root process (us) already has it; ignore
if _global_actor_in_debug == actor.uid:
return
# XXX: since we need to enter pdb synchronously below,
# we have to release the lock manually from pdb completion
# callbacks. Can't think of a nicer way then this atm.
if _debug_lock.locked():
log.warning(
'Root actor attempting to acquire active tty lock'
f' owned by {_global_actor_in_debug}')
await _debug_lock.acquire()
_pdb_release_hook = _debug_lock.release
_global_actor_in_debug = actor.uid
_local_task_in_debug = task_name
# the lock must be released on pdb completion
def teardown():
global _local_pdb_complete, _debug_lock
global _global_actor_in_debug, _local_task_in_debug
_debug_lock.release()
_global_actor_in_debug = None
_local_task_in_debug = None
_local_pdb_complete.set()
_pdb_release_hook = teardown
# block here one (at the appropriate frame *up* where
# ``breakpoint()`` was awaited and begin handling stdio
log.debug("Entering the synchronous world of pdb")
debug_func(actor)
# user code **must** await this!
return _bp()
def _mk_pdb():
# XXX: setting these flags on the pdb instance are absolutely
@ -276,7 +430,7 @@ def _set_trace(actor=None):
pdb = _mk_pdb()
if actor is not None:
log.runtime(f"\nAttaching pdb to actor: {actor.uid}\n")
log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n") # type: ignore
pdb.set_trace(
# start 2 levels up in user code
@ -285,8 +439,8 @@ def _set_trace(actor=None):
else:
# we entered the global ``breakpoint()`` built-in from sync code
global _in_debug, _pdb_release_hook
_in_debug = 'sync'
global _local_task_in_debug, _pdb_release_hook
_local_task_in_debug = 'sync'
def nuttin():
pass
@ -306,7 +460,7 @@ breakpoint = partial(
def _post_mortem(actor):
log.runtime(f"\nAttaching to pdb in crashed actor: {actor.uid}\n")
log.pdb(f"\nAttaching to pdb in crashed actor: {actor.uid}\n")
pdb = _mk_pdb()
# custom Pdb post-mortem entry

View File

@ -1,5 +1,6 @@
"""
Inter-process comms abstractions
"""
import platform
import typing
@ -61,7 +62,6 @@ class MsgpackTCPStream:
use_list=False,
)
while True:
try:
data = await self.stream.receive_some(2**10)
@ -88,7 +88,7 @@ class MsgpackTCPStream:
else:
raise
log.trace(f"received {data}") # type: ignore
log.transport(f"received {data}") # type: ignore
if data == b'':
raise TransportClosed(
@ -169,6 +169,7 @@ class Channel:
return self.msgstream.raddr if self.msgstream else None
async def connect(
self,
destaddr: Tuple[Any, ...] = None,
**kwargs
@ -180,13 +181,21 @@ class Channel:
destaddr = destaddr or self._destaddr
assert isinstance(destaddr, tuple)
stream = await trio.open_tcp_stream(*destaddr, **kwargs)
stream = await trio.open_tcp_stream(
*destaddr,
**kwargs
)
self.msgstream = MsgpackTCPStream(stream)
log.transport(
f'Opened channel to peer {self.laddr} -> {self.raddr}'
)
return stream
async def send(self, item: Any) -> None:
log.trace(f"send `{item}`") # type: ignore
log.transport(f"send `{item}`") # type: ignore
assert self.msgstream
await self.msgstream.send(item)
@ -205,7 +214,8 @@ class Channel:
raise
async def aclose(self) -> None:
log.debug(
log.transport(
f'Closing channel to {self.uid} '
f'{self.laddr} -> {self.raddr}'
)
@ -234,11 +244,11 @@ class Channel:
await self.connect()
cancelled = cancel_scope.cancelled_caught
if cancelled:
log.warning(
log.transport(
"Reconnect timed out after 3 seconds, retrying...")
continue
else:
log.warning("Stream connection re-established!")
log.transport("Stream connection re-established!")
# run any reconnection sequence
on_recon = self._recon_seq
if on_recon:
@ -247,7 +257,7 @@ class Channel:
except (OSError, ConnectionRefusedError):
if not down:
down = True
log.warning(
log.transport(
f"Connection to {self.raddr} went down, waiting"
" for re-establishment")
await trio.sleep(1)

View File

@ -171,15 +171,17 @@ async def open_root_actor(
yield actor
except (Exception, trio.MultiError) as err:
logger.exception("Actor crashed:")
await _debug._maybe_enter_pm(err)
# with trio.CancelScope(shield=True):
entered = await _debug._maybe_enter_pm(err)
if not entered:
logger.exception("Root actor crashed:")
# always re-raise
raise
finally:
logger.info("Shutting down root actor")
with trio.CancelScope(shield=True):
await actor.cancel()
finally:
_state._current_actor = None

View File

@ -22,7 +22,14 @@ from multiprocessing import forkserver # type: ignore
from typing import Tuple
from . import _forkserver_override
from ._state import current_actor, is_main_process
from ._state import (
current_actor,
is_main_process,
is_root_process,
_runtime_vars,
)
from ._debug import _global_actor_in_debug
from .log import get_logger
from ._portal import Portal
from ._actor import Actor, ActorFailure
@ -141,13 +148,34 @@ async def cancel_on_completion(
)
else:
log.info(
log.runtime(
f"Cancelling {portal.channel.uid} gracefully "
f"after result {result}")
# cancel the process now that we have a final result
await portal.cancel_actor()
async def do_hard_kill(
proc: trio.Process,
) -> None:
# NOTE: this timeout used to do nothing since we were shielding
# the ``.wait()`` inside ``new_proc()`` which will pretty much
# never release until the process exits, now it acts as
# a hard-kill time ultimatum.
with trio.move_on_after(3) as cs:
# NOTE: This ``__aexit__()`` shields internally.
async with proc: # calls ``trio.Process.aclose()``
log.debug(f"Terminating {proc}")
if cs.cancelled_caught:
# XXX: should pretty much never get here unless we have
# to move the bits from ``proc.__aexit__()`` out and
# into here.
log.critical(f"HARD KILLING {proc}")
proc.kill()
@asynccontextmanager
async def spawn_subactor(
@ -180,26 +208,48 @@ async def spawn_subactor(
proc = await trio.open_process(spawn_cmd)
try:
yield proc
finally:
log.runtime(f"Attempting to kill {proc}")
# XXX: do this **after** cancellation/tearfown
# to avoid killing the process too early
# since trio does this internally on ``__aexit__()``
# NOTE: we always "shield" join sub procs in
# the outer scope since no actor zombies are
# ever allowed. This ``__aexit__()`` also shields
# internally.
log.debug(f"Attempting to kill {proc}")
# if (
# is_root_process()
# NOTE: this timeout effectively does nothing right now since
# we are shielding the ``.wait()`` inside ``new_proc()`` which
# will pretty much never release until the process exits.
with trio.move_on_after(3) as cs:
async with proc:
log.debug(f"Terminating {proc}")
if cs.cancelled_caught:
log.critical(f"HARD KILLING {proc}")
proc.kill()
# # XXX: basically the pre-closing of stdstreams in a
# # root-processe's ``trio.Process.aclose()`` can clobber
# # any existing debugger session so we avoid
# and _runtime_vars['_debug_mode']
# and _global_actor_in_debug is not None
# ):
# # XXX: this is ``trio.Process.aclose()`` MINUS the
# # std-streams pre-closing steps inside ``proc.__aexit__()``
# # (see below) which incluses a ``Process.kill()`` call
# log.error(
# "Root process tty is locked in debug mode by "
# f"{_global_actor_in_debug}. If the console is hanging, you "
# "may need to trigger a KBI to kill any "
# "not-fully-initialized" " subprocesses and allow errors "
# "from `trio` to propagate"
# )
# try:
# # one more graceful wait try can can be cancelled by KBI
# # sent by user.
# await proc.wait()
# finally:
# if proc.returncode is None:
# # with trio.CancelScope(shield=True):
# # await proc.wait()
# await do_hard_kill(proc)
# else:
await do_hard_kill(proc)
async def new_proc(
@ -212,7 +262,6 @@ async def new_proc(
parent_addr: Tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child
*,
use_trio_run_in_process: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None:
"""Create a new ``multiprocessing.Process`` using the
@ -223,13 +272,13 @@ async def new_proc(
# mark the new actor with the global spawn method
subactor._spawn_method = _spawn_method
if use_trio_run_in_process or _spawn_method == 'trio':
if _spawn_method == 'trio':
async with trio.open_nursery() as nursery:
async with spawn_subactor(
subactor,
parent_addr,
) as proc:
log.info(f"Started {proc}")
log.runtime(f"Started {proc}")
# wait for actor to spawn and connect back to us
# channel should have handshake completed by the
@ -277,9 +326,14 @@ async def new_proc(
# reaping more stringently without the shield
# we used to have below...
# always "hard" join sub procs:
# no actor zombies allowed
# with trio.CancelScope(shield=True):
# async with proc:
# Always "hard" join sub procs since no actor zombies
# are allowed!
# this is a "light" (cancellable) join, the hard join is
# in the enclosing scope (see above).
await proc.wait()
log.debug(f"Joined {proc}")
@ -320,7 +374,6 @@ async def mp_new_proc(
parent_addr: Tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child
*,
use_trio_run_in_process: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None:
@ -380,7 +433,7 @@ async def mp_new_proc(
if not proc.is_alive():
raise ActorFailure("Couldn't start sub-actor?")
log.info(f"Started {proc}")
log.runtime(f"Started {proc}")
try:
# wait for actor to spawn and connect back to us

View File

@ -11,7 +11,8 @@ import warnings
import trio
from async_generator import asynccontextmanager
from ._state import current_actor, is_main_process
from . import _debug
from ._state import current_actor, is_main_process, is_root_process
from .log import get_logger, get_loglevel
from ._actor import Actor
from ._portal import Portal
@ -169,16 +170,25 @@ class ActorNursery:
log.warning(f"Cancelling nursery in {self._actor.uid}")
with trio.move_on_after(3) as cs:
async with trio.open_nursery() as nursery:
for subactor, proc, portal in self._children.values():
# TODO: are we ever even going to use this or
# is the spawning backend responsible for such
# things? I'm thinking latter.
if hard_kill:
proc.terminate()
else:
if portal is None: # actor hasn't fully spawned yet
event = self._actor._peer_connected[subactor.uid]
log.warning(
f"{subactor.uid} wasn't finished spawning?")
await event.wait()
# channel/portal should now be up
_, _, portal = self._children[subactor.uid]
@ -238,6 +248,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
# As such if the strategy propagates any error(s) upwards
# the above "daemon actor" nursery will be notified.
async with trio.open_nursery() as ria_nursery:
anursery = ActorNursery(
actor,
ria_nursery,
@ -248,15 +259,53 @@ async def _open_and_supervise_one_cancels_all_nursery(
# spawning of actors happens in the caller's scope
# after we yield upwards
yield anursery
log.debug(
log.runtime(
f"Waiting on subactors {anursery._children} "
"to complete"
)
# Last bit before first nursery block ends in the case
# where we didn't error in the caller's scope
# signal all process monitor tasks to conduct
# hard join phase.
anursery._join_procs.set()
except BaseException as err:
# If we error in the root but the debugger is
# engaged we don't want to prematurely kill (and
# thus clobber access to) the local tty since it
# will make the pdb repl unusable.
# Instead try to wait for pdb to be released before
# tearing down.
if is_root_process():
log.exception(f"we're root with {err}")
# wait to see if a sub-actor task
# will be scheduled and grab the tty
# lock on the next tick
# await trio.testing.wait_all_tasks_blocked()
debug_complete = _debug._no_remote_has_tty
if (
debug_complete and
not debug_complete.is_set()
):
log.warning(
'Root has errored but pdb is in use by '
f'child {_debug._global_actor_in_debug}\n'
'Waiting on tty lock to release..')
with trio.CancelScope(shield=True):
await debug_complete.wait()
# if the caller's scope errored then we activate our
# one-cancels-all supervisor strategy (don't
# worry more are coming).
anursery._join_procs.set()
try:
# XXX: hypothetically an error could be
# raised and then a cancel signal shows up
@ -292,15 +341,18 @@ async def _open_and_supervise_one_cancels_all_nursery(
else:
raise
# Last bit before first nursery block ends in the case
# where we didn't error in the caller's scope
log.debug("Waiting on all subactors to complete")
anursery._join_procs.set()
# ria_nursery scope end
# XXX: do we need a `trio.Cancelled` catch here as well?
except (Exception, trio.MultiError, trio.Cancelled) as err:
# this is the catch around the ``.run_in_actor()`` nursery
except (
Exception,
trio.MultiError,
trio.Cancelled
) as err:
# If actor-local error was raised while waiting on
# ".run_in_actor()" actors then we also want to cancel all
# remaining sub-actors (due to our lone strategy:
@ -366,6 +418,7 @@ async def open_nursery(
async with open_root_actor(**kwargs) as actor:
assert actor is current_actor()
# try:
async with _open_and_supervise_one_cancels_all_nursery(
actor
) as anursery:

View File

@ -29,19 +29,20 @@ LOG_FORMAT = (
DATE_FORMAT = '%b %d %H:%M:%S'
LEVELS = {
'GARBAGE': 1,
'TRACE': 5,
'PROFILE': 15,
'RUNTIME': 500,
'TRANSPORT': 5,
'RUNTIME': 15,
'PDB': 500,
'QUIET': 1000,
}
STD_PALETTE = {
'CRITICAL': 'red',
'ERROR': 'red',
'RUNTIME': 'white',
'PDB': 'white',
'WARNING': 'yellow',
'INFO': 'green',
'RUNTIME': 'white',
'DEBUG': 'white',
'TRACE': 'cyan',
'TRANSPORT': 'cyan',
'GARBAGE': 'blue',
}
BOLD_PALETTE = {
@ -76,7 +77,7 @@ def get_logger(
# additional levels
for name, val in LEVELS.items():
logging.addLevelName(val, name)
# ex. create ``logger.trace()``
# ex. create ``logger.runtime()``
setattr(logger, name.lower(), partial(logger.log, val))
return logger