Compare commits

..

10 Commits

Author SHA1 Message Date
Tyler Goodlet e1f128a79c Move debugger wait inside OCA nursery 2021-06-10 14:02:12 -04:00
Tyler Goodlet d85f4fda57 Add a multi-task streaming test 2021-06-10 14:00:09 -04:00
Tyler Goodlet 52f135d85d Avoid mutate on iterate race 2021-06-10 13:59:08 -04:00
Tyler Goodlet b29d2f7053 Only close recv chan if we get a ref 2021-06-10 13:58:06 -04:00
Tyler Goodlet 5960330413 Add error case 2021-06-10 13:57:16 -04:00
Tyler Goodlet 727a2084d6 Don't shield debugger status wait; it causes hangs 2021-06-02 08:24:59 -04:00
Tyler Goodlet f943ea0119 Drop bad .close() call 2021-06-02 08:22:51 -04:00
Tyler Goodlet cb2d2ed9d5 Proxy asyncio cancelleds as well 2021-05-31 09:29:45 -04:00
Tyler Goodlet 7b2543512c Power of 2 cuz puters 2021-05-31 09:29:45 -04:00
Tyler Goodlet 7d41492f53 Don't kill root's immediate children when in debug
If the root calls `trio.Process.kill()` on immediate child proc teardown
when the child is using pdb, we can get stdstreams clobbering that
results in a pdb++ repl where the user can't see what's been typed. Not
killing such children on cancellation / error seems to resolve this
issue whilst still giving reliable termination. For now, code that
special path until a time it becomes a problem for ensuring zombie
reaps.
2021-05-31 09:29:45 -04:00
7 changed files with 145 additions and 31 deletions

View File

@ -52,9 +52,14 @@ async def assert_state(value: bool):
assert _state == value assert _state == value
def test_simple_contex(): @pytest.mark.parametrize(
'error_parent',
[False, True],
)
def test_simple_context(error_parent):
async def main(): async def main():
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
portal = await n.start_actor( portal = await n.start_actor(
@ -74,9 +79,18 @@ def test_simple_contex():
# after cancellation # after cancellation
await portal.run(assert_state, value=False) await portal.run(assert_state, value=False)
if error_parent:
raise ValueError
# shut down daemon # shut down daemon
await portal.cancel_actor() await portal.cancel_actor()
if error_parent:
try:
trio.run(main)
except ValueError:
pass
else:
trio.run(main) trio.run(main)

View File

@ -140,3 +140,81 @@ def test_dynamic_pub_sub():
trio.run(main) trio.run(main)
except trio.TooSlowError: except trio.TooSlowError:
pass pass
@tractor.context
async def one_task_streams_and_one_handles_reqresp(
ctx: tractor.Context,
) -> None:
await ctx.started()
async with ctx.open_stream() as stream:
async def pingpong():
'''Run a simple req/response service.
'''
async for msg in stream:
print('rpc server ping')
assert msg == 'ping'
print('rpc server pong')
await stream.send('pong')
async with trio.open_nursery() as n:
n.start_soon(pingpong)
for _ in itertools.count():
await stream.send('yo')
await trio.sleep(0.01)
def test_reqresp_ontopof_streaming():
'''Test a subactor that both streams with one task and
spawns another which handles a small requests-response
dialogue over the same bidir-stream.
'''
async def main():
with trio.move_on_after(2):
async with tractor.open_nursery() as n:
# name of this actor will be same as target func
portal = await n.start_actor(
'dual_tasks',
enable_modules=[__name__]
)
# flat to make sure we get at least one pong
got_pong: bool = False
async with portal.open_context(
one_task_streams_and_one_handles_reqresp,
) as (ctx, first):
assert first is None
async with ctx.open_stream() as stream:
await stream.send('ping')
async for msg in stream:
print(f'client received: {msg}')
assert msg in {'pong', 'yo'}
if msg == 'pong':
got_pong = True
await stream.send('ping')
print('client sent ping')
assert got_pong
try:
trio.run(main)
except trio.TooSlowError:
pass

View File

@ -479,7 +479,7 @@ class Actor:
try: try:
send_chan, recv_chan = self._cids2qs[(actorid, cid)] send_chan, recv_chan = self._cids2qs[(actorid, cid)]
except KeyError: except KeyError:
send_chan, recv_chan = trio.open_memory_channel(1000) send_chan, recv_chan = trio.open_memory_channel(2*10)
send_chan.cid = cid # type: ignore send_chan.cid = cid # type: ignore
recv_chan.cid = cid # type: ignore recv_chan.cid = cid # type: ignore
self._cids2qs[(actorid, cid)] = send_chan, recv_chan self._cids2qs[(actorid, cid)] = send_chan, recv_chan
@ -528,11 +528,14 @@ class Actor:
task_status.started(loop_cs) task_status.started(loop_cs)
async for msg in chan: async for msg in chan:
if msg is None: # loop terminate sentinel if msg is None: # loop terminate sentinel
log.debug( log.debug(
f"Cancelling all tasks for {chan} from {chan.uid}") f"Cancelling all tasks for {chan} from {chan.uid}")
for (channel, cid) in self._rpc_tasks:
for (channel, cid) in self._rpc_tasks.copy():
if channel is chan: if channel is chan:
await self._cancel_task(cid, channel) await self._cancel_task(cid, channel)
log.debug( log.debug(
f"Msg loop signalled to terminate for" f"Msg loop signalled to terminate for"
f" {chan} from {chan.uid}") f" {chan} from {chan.uid}")

View File

@ -359,6 +359,7 @@ class Portal:
fn_mod_path, fn_name = func_deats(func) fn_mod_path, fn_name = func_deats(func)
recv_chan: trio.ReceiveMemoryChannel = None
try: try:
cid, recv_chan, functype, first_msg = await self._submit( cid, recv_chan, functype, first_msg = await self._submit(
fn_mod_path, fn_name, kwargs) fn_mod_path, fn_name, kwargs)
@ -390,6 +391,7 @@ class Portal:
await ctx.cancel() await ctx.cancel()
finally: finally:
if recv_chan is not None:
await recv_chan.aclose() await recv_chan.aclose()
@dataclass @dataclass

View File

@ -339,6 +339,7 @@ async def new_proc(
bind_addr=bind_addr, bind_addr=bind_addr,
parent_addr=parent_addr, parent_addr=parent_addr,
_runtime_vars=_runtime_vars, _runtime_vars=_runtime_vars,
infect_asyncio=infect_asyncio,
task_status=task_status, task_status=task_status,
) )
@ -354,6 +355,7 @@ async def mp_new_proc(
parent_addr: Tuple[str, int], parent_addr: Tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child _runtime_vars: Dict[str, Any], # serialized and sent to _child
*, *,
infect_asyncio: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None: ) -> None:

View File

@ -12,7 +12,7 @@ import trio
from async_generator import asynccontextmanager from async_generator import asynccontextmanager
from . import _debug from . import _debug
from ._state import current_actor, is_main_process from ._state import current_actor, is_main_process, is_root_process
from .log import get_logger, get_loglevel from .log import get_logger, get_loglevel
from ._actor import Actor from ._actor import Actor
from ._portal import Portal from ._portal import Portal
@ -263,6 +263,26 @@ async def _open_and_supervise_one_cancels_all_nursery(
"to complete" "to complete"
) )
except BaseException as err: except BaseException as err:
if is_root_process() and (
type(err) in {
Exception, trio.MultiError, trio.Cancelled
}
):
# if we error in the root but the debugger is
# engaged we don't want to prematurely kill (and
# thus clobber access to) the local tty streams.
# instead try to wait for pdb to be released before
# tearing down.
debug_complete = _debug._pdb_complete
if debug_complete and not debug_complete.is_set():
log.warning(
"Root has errored but pdb is active..waiting "
"on debug lock")
await _debug._pdb_complete.wait()
# raise
# if the caller's scope errored then we activate our # if the caller's scope errored then we activate our
# one-cancels-all supervisor strategy (don't # one-cancels-all supervisor strategy (don't
# worry more are coming). # worry more are coming).
@ -377,27 +397,12 @@ async def open_nursery(
async with open_root_actor(**kwargs) as actor: async with open_root_actor(**kwargs) as actor:
assert actor is current_actor() assert actor is current_actor()
try: # try:
async with _open_and_supervise_one_cancels_all_nursery( async with _open_and_supervise_one_cancels_all_nursery(
actor actor
) as anursery: ) as anursery:
yield anursery yield anursery
except (Exception, trio.MultiError, trio.Cancelled):
# if we error in the root but the debugger is
# engaged we don't want to prematurely kill (and
# thus clobber access to) the local tty streams.
# instead try to wait for pdb to be released before
# tearing down.
if not _debug._pdb_complete.is_set():
log.warning(
"Root has errored but pdb is active..waiting "
"on debug lock")
with trio.CancelScope(shield=True):
await _debug._pdb_complete.wait()
raise
else: # sub-nursery case else: # sub-nursery case
async with _open_and_supervise_one_cancels_all_nursery( async with _open_and_supervise_one_cancels_all_nursery(

View File

@ -101,14 +101,16 @@ def _run_asyncio_task(
"""Cancel the calling ``trio`` task on error. """Cancel the calling ``trio`` task on error.
""" """
nonlocal aio_err nonlocal aio_err
try:
aio_err = task.exception() aio_err = task.exception()
except asyncio.CancelledError as cerr:
aio_err = cerr
if aio_err: if aio_err:
log.exception(f"asyncio task errorred:\n{aio_err}") log.exception(f"asyncio task errorred:\n{aio_err}")
# cancel_scope.cancel() # cancel_scope.cancel()
from_aio._err = aio_err from_aio._err = aio_err
to_trio.close()
task.add_done_callback(cancel_trio) task.add_done_callback(cancel_trio)
@ -233,18 +235,26 @@ async def run_task(
# raise aio_err # raise aio_err
# Do we need this? # Do we need this?
except BaseException as err: except Exception as err:
# await tractor.breakpoint() # await tractor.breakpoint()
aio_err = from_aio._err aio_err = from_aio._err
# try:
if aio_err is not None: if aio_err is not None:
# always raise from any captured asyncio error # always raise from any captured asyncio error
raise err from aio_err raise err from aio_err
else: else:
raise raise
# finally:
# if not task.done():
# task.cancel()
finally: except trio.Cancelled:
if not task.done():
task.cancel() task.cancel()
raise
# async def stream_from_task # async def stream_from_task
# pass # pass