Compare commits

..

No commits in common. "e1f128a79cd7d4487987ee1d50c96d47cabcbdc5" and "bc468d9140edf29f6c286e12a393b52c6477f057" have entirely different histories.

7 changed files with 31 additions and 145 deletions

View File

@ -52,14 +52,9 @@ async def assert_state(value: bool):
assert _state == value assert _state == value
@pytest.mark.parametrize( def test_simple_contex():
'error_parent',
[False, True],
)
def test_simple_context(error_parent):
async def main(): async def main():
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
portal = await n.start_actor( portal = await n.start_actor(
@ -79,19 +74,10 @@ def test_simple_context(error_parent):
# after cancellation # after cancellation
await portal.run(assert_state, value=False) await portal.run(assert_state, value=False)
if error_parent:
raise ValueError
# shut down daemon # shut down daemon
await portal.cancel_actor() await portal.cancel_actor()
if error_parent: trio.run(main)
try:
trio.run(main)
except ValueError:
pass
else:
trio.run(main)
@tractor.context @tractor.context

View File

@ -140,81 +140,3 @@ def test_dynamic_pub_sub():
trio.run(main) trio.run(main)
except trio.TooSlowError: except trio.TooSlowError:
pass pass
@tractor.context
async def one_task_streams_and_one_handles_reqresp(
ctx: tractor.Context,
) -> None:
await ctx.started()
async with ctx.open_stream() as stream:
async def pingpong():
'''Run a simple req/response service.
'''
async for msg in stream:
print('rpc server ping')
assert msg == 'ping'
print('rpc server pong')
await stream.send('pong')
async with trio.open_nursery() as n:
n.start_soon(pingpong)
for _ in itertools.count():
await stream.send('yo')
await trio.sleep(0.01)
def test_reqresp_ontopof_streaming():
'''Test a subactor that both streams with one task and
spawns another which handles a small requests-response
dialogue over the same bidir-stream.
'''
async def main():
with trio.move_on_after(2):
async with tractor.open_nursery() as n:
# name of this actor will be same as target func
portal = await n.start_actor(
'dual_tasks',
enable_modules=[__name__]
)
# flat to make sure we get at least one pong
got_pong: bool = False
async with portal.open_context(
one_task_streams_and_one_handles_reqresp,
) as (ctx, first):
assert first is None
async with ctx.open_stream() as stream:
await stream.send('ping')
async for msg in stream:
print(f'client received: {msg}')
assert msg in {'pong', 'yo'}
if msg == 'pong':
got_pong = True
await stream.send('ping')
print('client sent ping')
assert got_pong
try:
trio.run(main)
except trio.TooSlowError:
pass

View File

@ -479,7 +479,7 @@ class Actor:
try: try:
send_chan, recv_chan = self._cids2qs[(actorid, cid)] send_chan, recv_chan = self._cids2qs[(actorid, cid)]
except KeyError: except KeyError:
send_chan, recv_chan = trio.open_memory_channel(2*10) send_chan, recv_chan = trio.open_memory_channel(1000)
send_chan.cid = cid # type: ignore send_chan.cid = cid # type: ignore
recv_chan.cid = cid # type: ignore recv_chan.cid = cid # type: ignore
self._cids2qs[(actorid, cid)] = send_chan, recv_chan self._cids2qs[(actorid, cid)] = send_chan, recv_chan
@ -528,14 +528,11 @@ class Actor:
task_status.started(loop_cs) task_status.started(loop_cs)
async for msg in chan: async for msg in chan:
if msg is None: # loop terminate sentinel if msg is None: # loop terminate sentinel
log.debug( log.debug(
f"Cancelling all tasks for {chan} from {chan.uid}") f"Cancelling all tasks for {chan} from {chan.uid}")
for (channel, cid) in self._rpc_tasks:
for (channel, cid) in self._rpc_tasks.copy():
if channel is chan: if channel is chan:
await self._cancel_task(cid, channel) await self._cancel_task(cid, channel)
log.debug( log.debug(
f"Msg loop signalled to terminate for" f"Msg loop signalled to terminate for"
f" {chan} from {chan.uid}") f" {chan} from {chan.uid}")

View File

@ -359,7 +359,6 @@ class Portal:
fn_mod_path, fn_name = func_deats(func) fn_mod_path, fn_name = func_deats(func)
recv_chan: trio.ReceiveMemoryChannel = None
try: try:
cid, recv_chan, functype, first_msg = await self._submit( cid, recv_chan, functype, first_msg = await self._submit(
fn_mod_path, fn_name, kwargs) fn_mod_path, fn_name, kwargs)
@ -391,8 +390,7 @@ class Portal:
await ctx.cancel() await ctx.cancel()
finally: finally:
if recv_chan is not None: await recv_chan.aclose()
await recv_chan.aclose()
@dataclass @dataclass
class LocalPortal: class LocalPortal:

View File

@ -339,7 +339,6 @@ async def new_proc(
bind_addr=bind_addr, bind_addr=bind_addr,
parent_addr=parent_addr, parent_addr=parent_addr,
_runtime_vars=_runtime_vars, _runtime_vars=_runtime_vars,
infect_asyncio=infect_asyncio,
task_status=task_status, task_status=task_status,
) )
@ -355,7 +354,6 @@ async def mp_new_proc(
parent_addr: Tuple[str, int], parent_addr: Tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child _runtime_vars: Dict[str, Any], # serialized and sent to _child
*, *,
infect_asyncio: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None: ) -> None:

View File

@ -12,7 +12,7 @@ import trio
from async_generator import asynccontextmanager from async_generator import asynccontextmanager
from . import _debug from . import _debug
from ._state import current_actor, is_main_process, is_root_process from ._state import current_actor, is_main_process
from .log import get_logger, get_loglevel from .log import get_logger, get_loglevel
from ._actor import Actor from ._actor import Actor
from ._portal import Portal from ._portal import Portal
@ -263,26 +263,6 @@ async def _open_and_supervise_one_cancels_all_nursery(
"to complete" "to complete"
) )
except BaseException as err: except BaseException as err:
if is_root_process() and (
type(err) in {
Exception, trio.MultiError, trio.Cancelled
}
):
# if we error in the root but the debugger is
# engaged we don't want to prematurely kill (and
# thus clobber access to) the local tty streams.
# instead try to wait for pdb to be released before
# tearing down.
debug_complete = _debug._pdb_complete
if debug_complete and not debug_complete.is_set():
log.warning(
"Root has errored but pdb is active..waiting "
"on debug lock")
await _debug._pdb_complete.wait()
# raise
# if the caller's scope errored then we activate our # if the caller's scope errored then we activate our
# one-cancels-all supervisor strategy (don't # one-cancels-all supervisor strategy (don't
# worry more are coming). # worry more are coming).
@ -397,11 +377,26 @@ async def open_nursery(
async with open_root_actor(**kwargs) as actor: async with open_root_actor(**kwargs) as actor:
assert actor is current_actor() assert actor is current_actor()
# try: try:
async with _open_and_supervise_one_cancels_all_nursery( async with _open_and_supervise_one_cancels_all_nursery(
actor actor
) as anursery: ) as anursery:
yield anursery yield anursery
except (Exception, trio.MultiError, trio.Cancelled):
# if we error in the root but the debugger is
# engaged we don't want to prematurely kill (and
# thus clobber access to) the local tty streams.
# instead try to wait for pdb to be released before
# tearing down.
if not _debug._pdb_complete.is_set():
log.warning(
"Root has errored but pdb is active..waiting "
"on debug lock")
with trio.CancelScope(shield=True):
await _debug._pdb_complete.wait()
raise
else: # sub-nursery case else: # sub-nursery case

View File

@ -101,16 +101,14 @@ def _run_asyncio_task(
"""Cancel the calling ``trio`` task on error. """Cancel the calling ``trio`` task on error.
""" """
nonlocal aio_err nonlocal aio_err
try: aio_err = task.exception()
aio_err = task.exception()
except asyncio.CancelledError as cerr:
aio_err = cerr
if aio_err: if aio_err:
log.exception(f"asyncio task errorred:\n{aio_err}") log.exception(f"asyncio task errorred:\n{aio_err}")
# cancel_scope.cancel() # cancel_scope.cancel()
from_aio._err = aio_err from_aio._err = aio_err
to_trio.close()
task.add_done_callback(cancel_trio) task.add_done_callback(cancel_trio)
@ -235,25 +233,17 @@ async def run_task(
# raise aio_err # raise aio_err
# Do we need this? # Do we need this?
except Exception as err: except BaseException as err:
# await tractor.breakpoint() # await tractor.breakpoint()
aio_err = from_aio._err aio_err = from_aio._err
# try:
if aio_err is not None: if aio_err is not None:
# always raise from any captured asyncio error # always raise from any captured asyncio error
raise err from aio_err raise err from aio_err
else: else:
raise raise
# finally:
# if not task.done():
# task.cancel()
except trio.Cancelled: finally:
if not task.done(): task.cancel()
task.cancel()
raise
# async def stream_from_task # async def stream_from_task