Compare commits

..

10 Commits

Author SHA1 Message Date
Tyler Goodlet acd955714f Add fast fail test using the context api 2021-07-01 19:37:47 -04:00
Tyler Goodlet 7f84d9f048 Avoid mutate during interate error 2021-07-01 19:37:47 -04:00
Tyler Goodlet fc831dbfd6 Expect context cancelled when we cancel 2021-07-01 19:37:47 -04:00
Tyler Goodlet 084becc62f Adjust debug tests to accomodate no more root clobbering 2021-07-01 19:37:47 -04:00
Tyler Goodlet f37d46585c Add pre-stream open error conditions 2021-07-01 19:37:46 -04:00
Tyler Goodlet 669559380d Change trace to transport level 2021-07-01 19:37:45 -04:00
Tyler Goodlet a5d27ebcf5 Flip "trace" level to "transport" level logging 2021-07-01 19:37:24 -04:00
Tyler Goodlet 1084ca99bf Go back to only logging tbs on no debugger 2021-07-01 19:37:24 -04:00
Tyler Goodlet e4d6810623 De-densify some code 2021-07-01 19:37:24 -04:00
Tyler Goodlet 2556a568e7 Comment hard-kill-sidestep for now since nursery version covers it? 2021-07-01 19:37:24 -04:00
11 changed files with 187 additions and 74 deletions

View File

@ -0,0 +1,53 @@
'''
fast fail test with a context.
ensure the partially initialized sub-actor process
doesn't cause a hang on error/cancel of the parent
nrusery.
'''
import trio
import tractor
@tractor.context
async def sleep(
ctx: tractor.Context,
):
await trio.sleep(0.5)
await ctx.started()
await trio.sleep_forever()
async def open_ctx(
n: tractor._trionics.ActorNursery
):
# spawn both actors
portal = await n.start_actor(
name='sleeper',
enable_modules=[__name__],
)
async with portal.open_context(
sleep,
) as (ctx, first):
assert first is None
async def main():
async with tractor.open_nursery(
debug_mode=True,
loglevel='runtime',
) as an:
async with trio.open_nursery() as n:
n.start_soon(open_ctx, an)
await trio.sleep(0.2)
await trio.sleep(0.1)
assert 0
if __name__ == '__main__':
trio.run(main)

View File

@ -262,7 +262,7 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
async for msg in stream: async for msg in stream:
pass pass
except trio.ClosedResourceError: except tractor.ContextCancelled:
pass pass
else: else:
assert 0, "Should have received closed resource error?" assert 0, "Should have received closed resource error?"

View File

@ -30,7 +30,7 @@ async def publisher(
sub = 'even' if is_even(val) else 'odd' sub = 'even' if is_even(val) else 'odd'
for sub_stream in _registry[sub]: for sub_stream in _registry[sub].copy():
await sub_stream.send(val) await sub_stream.send(val)
# throttle send rate to ~1kHz # throttle send rate to ~1kHz

View File

@ -307,19 +307,46 @@ def test_multi_daemon_subactors(spawn, loglevel):
before = str(child.before.decode()) before = str(child.before.decode())
assert "NameError" in before assert "NameError" in before
child.sendline('c') # XXX: hoorayy the root clobering the child here was fixed!
# now the root actor won't clobber the bp_forever child
# during it's first access to the debug lock, but will instead
# wait for the lock to release, by the edge triggered
# ``_debug._no_remote_has_tty`` event before sending cancel messages
# (via portals) to its underlings B)
# IMO, this demonstrates the true power of SC system design.
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
assert "Attaching pdb to actor: ('bp_forever'," in before
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
try:
# final error in root
assert "tractor._exceptions.RemoteActorError: ('name_error'" in before
except AssertionError:
# except pexpect.exceptions.TIMEOUT:
# one last entry in the root
child.sendline('c')
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode()) before = str(child.before.decode())
assert "tractor._exceptions.RemoteActorError: ('name_error'" in before assert "tractor._exceptions.RemoteActorError: ('name_error'" in before
try: # theory there should have been some msg like this from
child.sendline('c') # root announcing it avoided a clobber of the child's lock,
child.expect(pexpect.EOF) # but it seems unreliable in testing here to gnab it.
except pexpect.exceptions.TIMEOUT: # assert "in use by child ('bp_forever'," in before
# Failed to exit using continue..?
child.sendline('q') child.sendline('c')
# final error in root
assert "tractor._exceptions.RemoteActorError: ('name_error'" in before
child.expect(pexpect.EOF) child.expect(pexpect.EOF)
@ -372,7 +399,7 @@ def test_multi_nested_subactors_error_through_nurseries(spawn):
child = spawn('multi_nested_subactors_error_up_through_nurseries') child = spawn('multi_nested_subactors_error_up_through_nurseries')
# startup time can be iffy # startup time can be iffy
time.sleep(1) # time.sleep(1)
for i in range(12): for i in range(12):
try: try:
@ -454,3 +481,21 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
assert "tractor._exceptions.RemoteActorError: ('spawner0'" in before assert "tractor._exceptions.RemoteActorError: ('spawner0'" in before
assert "tractor._exceptions.RemoteActorError: ('name_error'" in before assert "tractor._exceptions.RemoteActorError: ('name_error'" in before
assert "NameError: name 'doggypants' is not defined" in before assert "NameError: name 'doggypants' is not defined" in before
def test_root_cancels_child_context_during_startup(
spawn,
):
'''Verify a fast fail in the root doesn't lock up the child reaping
and all while using the new context api.
'''
child = spawn('fast_error_in_root_after_spawn')
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
assert "AssertionError" in before
child.sendline('c')
child.expect(pexpect.EOF)

View File

@ -466,7 +466,7 @@ class Actor:
f"already have channel(s) for {uid}:{chans}?" f"already have channel(s) for {uid}:{chans}?"
) )
log.trace(f"Registered {chan} for {uid}") # type: ignore log.runtime(f"Registered {chan} for {uid}") # type: ignore
# append new channel # append new channel
self._peers[uid].append(chan) self._peers[uid].append(chan)
@ -640,7 +640,7 @@ class Actor:
break break
log.trace( # type: ignore log.transport( # type: ignore
f"Received msg {msg} from {chan.uid}") f"Received msg {msg} from {chan.uid}")
cid = msg.get('cid') cid = msg.get('cid')

View File

@ -110,7 +110,7 @@ class PdbwTeardown(pdbpp.Pdb):
# async with aclosing(async_stdin): # async with aclosing(async_stdin):
# async for msg in async_stdin: # async for msg in async_stdin:
# log.trace(f"Stdin input:\n{msg}") # log.runtime(f"Stdin input:\n{msg}")
# # encode to bytes # # encode to bytes
# bmsg = str.encode(msg) # bmsg = str.encode(msg)

View File

@ -16,12 +16,14 @@ from ._state import current_actor, _runtime_vars
@asynccontextmanager @asynccontextmanager
async def get_arbiter( async def get_arbiter(
host: str, host: str,
port: int, port: int,
) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]: ) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]:
"""Return a portal instance connected to a local or remote '''Return a portal instance connected to a local or remote
arbiter. arbiter.
""" '''
actor = current_actor() actor = current_actor()
if not actor: if not actor:
@ -33,16 +35,20 @@ async def get_arbiter(
yield LocalPortal(actor, Channel((host, port))) yield LocalPortal(actor, Channel((host, port)))
else: else:
async with _connect_chan(host, port) as chan: async with _connect_chan(host, port) as chan:
async with open_portal(chan) as arb_portal: async with open_portal(chan) as arb_portal:
yield arb_portal yield arb_portal
@asynccontextmanager @asynccontextmanager
async def get_root( async def get_root(
**kwargs, **kwargs,
) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]: ) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]:
host, port = _runtime_vars['_root_mailbox'] host, port = _runtime_vars['_root_mailbox']
assert host is not None assert host is not None
async with _connect_chan(host, port) as chan: async with _connect_chan(host, port) as chan:
async with open_portal(chan, **kwargs) as portal: async with open_portal(chan, **kwargs) as portal:
yield portal yield portal
@ -60,12 +66,16 @@ async def find_actor(
""" """
actor = current_actor() actor = current_actor()
async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal:
sockaddr = await arb_portal.run_from_ns('self', 'find_actor', name=name) sockaddr = await arb_portal.run_from_ns('self', 'find_actor', name=name)
# TODO: return portals to all available actors - for now just # TODO: return portals to all available actors - for now just
# the last one that registered # the last one that registered
if name == 'arbiter' and actor.is_arbiter: if name == 'arbiter' and actor.is_arbiter:
raise RuntimeError("The current actor is the arbiter") raise RuntimeError("The current actor is the arbiter")
elif sockaddr: elif sockaddr:
async with _connect_chan(*sockaddr) as chan: async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal: async with open_portal(chan) as portal:
yield portal yield portal
@ -83,9 +93,12 @@ async def wait_for_actor(
A portal to the first registered actor is returned. A portal to the first registered actor is returned.
""" """
actor = current_actor() actor = current_actor()
async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal:
sockaddrs = await arb_portal.run_from_ns('self', 'wait_for_actor', name=name) sockaddrs = await arb_portal.run_from_ns('self', 'wait_for_actor', name=name)
sockaddr = sockaddrs[-1] sockaddr = sockaddrs[-1]
async with _connect_chan(*sockaddr) as chan: async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal: async with open_portal(chan) as portal:
yield portal yield portal

View File

@ -166,8 +166,11 @@ async def open_root_actor(
yield actor yield actor
except (Exception, trio.MultiError) as err: except (Exception, trio.MultiError) as err:
logger.exception("Actor crashed:") # with trio.CancelScope(shield=True):
await _debug._maybe_enter_pm(err) entered = await _debug._maybe_enter_pm(err)
if not entered:
logger.exception("Root actor crashed:")
# always re-raise # always re-raise
raise raise

View File

@ -157,6 +157,7 @@ async def cancel_on_completion(
async def do_hard_kill( async def do_hard_kill(
proc: trio.Process, proc: trio.Process,
) -> None: ) -> None:
# NOTE: this timeout used to do nothing since we were shielding # NOTE: this timeout used to do nothing since we were shielding
# the ``.wait()`` inside ``new_proc()`` which will pretty much # the ``.wait()`` inside ``new_proc()`` which will pretty much
@ -209,43 +210,45 @@ async def spawn_subactor(
yield proc yield proc
finally: finally:
log.debug(f"Attempting to kill {proc}") log.runtime(f"Attempting to kill {proc}")
# XXX: do this **after** cancellation/tearfown # XXX: do this **after** cancellation/tearfown
# to avoid killing the process too early # to avoid killing the process too early
# since trio does this internally on ``__aexit__()`` # since trio does this internally on ``__aexit__()``
if ( # if (
is_root_process() # is_root_process()
# XXX: basically the pre-closing of stdstreams in a # # XXX: basically the pre-closing of stdstreams in a
# root-processe's ``trio.Process.aclose()`` can clobber # # root-processe's ``trio.Process.aclose()`` can clobber
# any existing debugger session so we avoid # # any existing debugger session so we avoid
and _runtime_vars['_debug_mode'] # and _runtime_vars['_debug_mode']
and _global_actor_in_debug is not None # and _global_actor_in_debug is not None
): # ):
# XXX: this is ``trio.Process.aclose()`` MINUS the # # XXX: this is ``trio.Process.aclose()`` MINUS the
# std-streams pre-closing steps inside ``proc.__aexit__()`` # # std-streams pre-closing steps inside ``proc.__aexit__()``
# (see below) which incluses a ``Process.kill()`` call # # (see below) which incluses a ``Process.kill()`` call
log.critical( # log.error(
"Root process tty is locked in debug mode by " # "Root process tty is locked in debug mode by "
f"{_global_actor_in_debug}. If the console is hanging, you " # f"{_global_actor_in_debug}. If the console is hanging, you "
"may need to trigger a KBI to kill any " # "may need to trigger a KBI to kill any "
"not-fully-initialized" " subprocesses and allow errors " # "not-fully-initialized" " subprocesses and allow errors "
"from `trio` to propagate" # "from `trio` to propagate"
) # )
try: # try:
# one more graceful wait try can can be cancelled by KBI # # one more graceful wait try can can be cancelled by KBI
# sent by user. # # sent by user.
await proc.wait() # await proc.wait()
# finally:
# if proc.returncode is None:
# # with trio.CancelScope(shield=True):
# # await proc.wait()
# await do_hard_kill(proc)
# else:
finally:
if proc.returncode is None:
# with trio.CancelScope(shield=True):
await do_hard_kill(proc)
else:
# with trio.CancelScope(shield=True):
await do_hard_kill(proc) await do_hard_kill(proc)

View File

@ -15,7 +15,7 @@ import warnings
import trio import trio
from ._ipc import Channel from ._ipc import Channel
from ._exceptions import unpack_error from ._exceptions import unpack_error, ContextCancelled
from ._state import current_actor from ._state import current_actor
from .log import get_logger from .log import get_logger
@ -135,16 +135,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
raise # propagate raise # propagate
# except trio.Cancelled:
# if not self._shielded:
# # if shielded we don't propagate a cancelled
# raise
# except trio.Cancelled:
# # relay cancels to the remote task
# await self.aclose()
# raise
@contextmanager @contextmanager
def shield( def shield(
self self
@ -171,7 +161,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
rx_chan = self._rx_chan rx_chan = self._rx_chan
if rx_chan._closed: # or self._eoc: if rx_chan._closed:
log.warning(f"{self} is already closed") log.warning(f"{self} is already closed")
# this stream has already been closed so silently succeed as # this stream has already been closed so silently succeed as
@ -440,19 +430,25 @@ class Context:
self.cid self.cid
) )
# XXX: If the underlying receive mem chan has been closed then # Likewise if the surrounding context has been cancelled we error here
# likely client code has already exited a ``.open_stream()`` # since it likely means the surrounding block was exited or
# block prior. we error here until such a time that we decide # killed
# allowing streams to be "re-connected" is supported and/or
# a good idea. if self._cancel_called:
if recv_chan._closed:
task = trio.lowlevel.current_task().name task = trio.lowlevel.current_task().name
raise trio.ClosedResourceError( raise ContextCancelled(
f'stream for {actor.uid[0]}:{task} has already been closed.' f'Context around {actor.uid[0]}:{task} was already cancelled!'
'\nRe-opening a closed stream is not yet supported!'
'\nConsider re-calling the containing `@tractor.context` func'
) )
# XXX: If the underlying channel feeder receive mem chan has
# been closed then likely client code has already exited
# a ``.open_stream()`` block prior or there was some other
# unanticipated error or cancellation from ``trio``.
if recv_chan._closed:
raise trio.ClosedResourceError(
'The underlying channel for this stream was already closed!?')
async with MsgStream( async with MsgStream(
ctx=self, ctx=self,
rx_chan=recv_chan, rx_chan=recv_chan,

View File

@ -29,7 +29,7 @@ LOG_FORMAT = (
DATE_FORMAT = '%b %d %H:%M:%S' DATE_FORMAT = '%b %d %H:%M:%S'
LEVELS = { LEVELS = {
'GARBAGE': 1, 'GARBAGE': 1,
'TRACE': 5, 'TRANSPORT': 5,
'RUNTIME': 15, 'RUNTIME': 15,
'PDB': 500, 'PDB': 500,
'QUIET': 1000, 'QUIET': 1000,
@ -42,7 +42,7 @@ STD_PALETTE = {
'INFO': 'green', 'INFO': 'green',
'RUNTIME': 'white', 'RUNTIME': 'white',
'DEBUG': 'white', 'DEBUG': 'white',
'TRACE': 'cyan', 'TRANSPORT': 'cyan',
'GARBAGE': 'blue', 'GARBAGE': 'blue',
} }
BOLD_PALETTE = { BOLD_PALETTE = {
@ -77,7 +77,7 @@ def get_logger(
# additional levels # additional levels
for name, val in LEVELS.items(): for name, val in LEVELS.items():
logging.addLevelName(val, name) logging.addLevelName(val, name)
# ex. create ``logger.trace()`` # ex. create ``logger.runtime()``
setattr(logger, name.lower(), partial(logger.log, val)) setattr(logger, name.lower(), partial(logger.log, val))
return logger return logger