Compare commits
	
		
			10 Commits 
		
	
	
		
			bc468d9140
			...
			e1f128a79c
		
	
	| Author | SHA1 | Date | 
|---|---|---|
| 
							
							
								 | 
						e1f128a79c | |
| 
							
							
								 | 
						d85f4fda57 | |
| 
							
							
								 | 
						52f135d85d | |
| 
							
							
								 | 
						b29d2f7053 | |
| 
							
							
								 | 
						5960330413 | |
| 
							
							
								 | 
						727a2084d6 | |
| 
							
							
								 | 
						f943ea0119 | |
| 
							
							
								 | 
						cb2d2ed9d5 | |
| 
							
							
								 | 
						7b2543512c | |
| 
							
							
								 | 
						7d41492f53 | 
| 
						 | 
				
			
			@ -52,9 +52,14 @@ async def assert_state(value: bool):
 | 
			
		|||
    assert _state == value
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_simple_contex():
 | 
			
		||||
@pytest.mark.parametrize(
 | 
			
		||||
    'error_parent',
 | 
			
		||||
    [False, True],
 | 
			
		||||
)
 | 
			
		||||
def test_simple_context(error_parent):
 | 
			
		||||
 | 
			
		||||
    async def main():
 | 
			
		||||
 | 
			
		||||
        async with tractor.open_nursery() as n:
 | 
			
		||||
 | 
			
		||||
            portal = await n.start_actor(
 | 
			
		||||
| 
						 | 
				
			
			@ -74,10 +79,19 @@ def test_simple_contex():
 | 
			
		|||
            # after cancellation
 | 
			
		||||
            await portal.run(assert_state, value=False)
 | 
			
		||||
 | 
			
		||||
            if error_parent:
 | 
			
		||||
                raise ValueError
 | 
			
		||||
 | 
			
		||||
            # shut down daemon
 | 
			
		||||
            await portal.cancel_actor()
 | 
			
		||||
 | 
			
		||||
    trio.run(main)
 | 
			
		||||
    if error_parent:
 | 
			
		||||
        try:
 | 
			
		||||
            trio.run(main)
 | 
			
		||||
        except ValueError:
 | 
			
		||||
            pass
 | 
			
		||||
    else:
 | 
			
		||||
        trio.run(main)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@tractor.context
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -140,3 +140,81 @@ def test_dynamic_pub_sub():
 | 
			
		|||
        trio.run(main)
 | 
			
		||||
    except trio.TooSlowError:
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@tractor.context
 | 
			
		||||
async def one_task_streams_and_one_handles_reqresp(
 | 
			
		||||
 | 
			
		||||
    ctx: tractor.Context,
 | 
			
		||||
 | 
			
		||||
) -> None:
 | 
			
		||||
 | 
			
		||||
    await ctx.started()
 | 
			
		||||
 | 
			
		||||
    async with ctx.open_stream() as stream:
 | 
			
		||||
 | 
			
		||||
        async def pingpong():
 | 
			
		||||
            '''Run a simple req/response service.
 | 
			
		||||
 | 
			
		||||
            '''
 | 
			
		||||
            async for msg in stream:
 | 
			
		||||
                print('rpc server ping')
 | 
			
		||||
                assert msg == 'ping'
 | 
			
		||||
                print('rpc server pong')
 | 
			
		||||
                await stream.send('pong')
 | 
			
		||||
 | 
			
		||||
        async with trio.open_nursery() as n:
 | 
			
		||||
            n.start_soon(pingpong)
 | 
			
		||||
 | 
			
		||||
            for _ in itertools.count():
 | 
			
		||||
                await stream.send('yo')
 | 
			
		||||
                await trio.sleep(0.01)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_reqresp_ontopof_streaming():
 | 
			
		||||
    '''Test a subactor that both streams with one task and
 | 
			
		||||
    spawns another which handles a small requests-response
 | 
			
		||||
    dialogue over the same bidir-stream.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    async def main():
 | 
			
		||||
 | 
			
		||||
        with trio.move_on_after(2):
 | 
			
		||||
            async with tractor.open_nursery() as n:
 | 
			
		||||
 | 
			
		||||
                # name of this actor will be same as target func
 | 
			
		||||
                portal = await n.start_actor(
 | 
			
		||||
                    'dual_tasks',
 | 
			
		||||
                    enable_modules=[__name__]
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
                # flat to make sure we get at least one pong
 | 
			
		||||
                got_pong: bool = False
 | 
			
		||||
 | 
			
		||||
                async with portal.open_context(
 | 
			
		||||
                    one_task_streams_and_one_handles_reqresp,
 | 
			
		||||
 | 
			
		||||
                ) as (ctx, first):
 | 
			
		||||
 | 
			
		||||
                    assert first is None
 | 
			
		||||
 | 
			
		||||
                    async with ctx.open_stream() as stream:
 | 
			
		||||
 | 
			
		||||
                        await stream.send('ping')
 | 
			
		||||
 | 
			
		||||
                        async for msg in stream:
 | 
			
		||||
                            print(f'client received: {msg}')
 | 
			
		||||
 | 
			
		||||
                            assert msg in {'pong', 'yo'}
 | 
			
		||||
 | 
			
		||||
                            if msg == 'pong':
 | 
			
		||||
                                got_pong = True
 | 
			
		||||
                                await stream.send('ping')
 | 
			
		||||
                                print('client sent ping')
 | 
			
		||||
 | 
			
		||||
        assert got_pong
 | 
			
		||||
 | 
			
		||||
    try:
 | 
			
		||||
        trio.run(main)
 | 
			
		||||
    except trio.TooSlowError:
 | 
			
		||||
        pass
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -479,7 +479,7 @@ class Actor:
 | 
			
		|||
        try:
 | 
			
		||||
            send_chan, recv_chan = self._cids2qs[(actorid, cid)]
 | 
			
		||||
        except KeyError:
 | 
			
		||||
            send_chan, recv_chan = trio.open_memory_channel(1000)
 | 
			
		||||
            send_chan, recv_chan = trio.open_memory_channel(2*10)
 | 
			
		||||
            send_chan.cid = cid  # type: ignore
 | 
			
		||||
            recv_chan.cid = cid  # type: ignore
 | 
			
		||||
            self._cids2qs[(actorid, cid)] = send_chan, recv_chan
 | 
			
		||||
| 
						 | 
				
			
			@ -528,11 +528,14 @@ class Actor:
 | 
			
		|||
                task_status.started(loop_cs)
 | 
			
		||||
                async for msg in chan:
 | 
			
		||||
                    if msg is None:  # loop terminate sentinel
 | 
			
		||||
 | 
			
		||||
                        log.debug(
 | 
			
		||||
                            f"Cancelling all tasks for {chan} from {chan.uid}")
 | 
			
		||||
                        for (channel, cid) in self._rpc_tasks:
 | 
			
		||||
 | 
			
		||||
                        for (channel, cid) in self._rpc_tasks.copy():
 | 
			
		||||
                            if channel is chan:
 | 
			
		||||
                                await self._cancel_task(cid, channel)
 | 
			
		||||
 | 
			
		||||
                        log.debug(
 | 
			
		||||
                                f"Msg loop signalled to terminate for"
 | 
			
		||||
                                f" {chan} from {chan.uid}")
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -359,6 +359,7 @@ class Portal:
 | 
			
		|||
        fn_mod_path, fn_name = func_deats(func)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
        recv_chan: trio.ReceiveMemoryChannel = None
 | 
			
		||||
        try:
 | 
			
		||||
            cid, recv_chan, functype, first_msg = await self._submit(
 | 
			
		||||
                fn_mod_path, fn_name, kwargs)
 | 
			
		||||
| 
						 | 
				
			
			@ -390,7 +391,8 @@ class Portal:
 | 
			
		|||
                await ctx.cancel()
 | 
			
		||||
 | 
			
		||||
        finally:
 | 
			
		||||
            await recv_chan.aclose()
 | 
			
		||||
            if recv_chan is not None:
 | 
			
		||||
                await recv_chan.aclose()
 | 
			
		||||
 | 
			
		||||
@dataclass
 | 
			
		||||
class LocalPortal:
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -339,6 +339,7 @@ async def new_proc(
 | 
			
		|||
            bind_addr=bind_addr,
 | 
			
		||||
            parent_addr=parent_addr,
 | 
			
		||||
            _runtime_vars=_runtime_vars,
 | 
			
		||||
            infect_asyncio=infect_asyncio,
 | 
			
		||||
            task_status=task_status,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -354,6 +355,7 @@ async def mp_new_proc(
 | 
			
		|||
    parent_addr: Tuple[str, int],
 | 
			
		||||
    _runtime_vars: Dict[str, Any],  # serialized and sent to _child
 | 
			
		||||
    *,
 | 
			
		||||
    infect_asyncio: bool = False,
 | 
			
		||||
    task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
 | 
			
		||||
 | 
			
		||||
) -> None:
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -12,7 +12,7 @@ import trio
 | 
			
		|||
from async_generator import asynccontextmanager
 | 
			
		||||
 | 
			
		||||
from . import _debug
 | 
			
		||||
from ._state import current_actor, is_main_process
 | 
			
		||||
from ._state import current_actor, is_main_process, is_root_process
 | 
			
		||||
from .log import get_logger, get_loglevel
 | 
			
		||||
from ._actor import Actor
 | 
			
		||||
from ._portal import Portal
 | 
			
		||||
| 
						 | 
				
			
			@ -263,6 +263,26 @@ async def _open_and_supervise_one_cancels_all_nursery(
 | 
			
		|||
                        "to complete"
 | 
			
		||||
                    )
 | 
			
		||||
                except BaseException as err:
 | 
			
		||||
 | 
			
		||||
                    if is_root_process() and (
 | 
			
		||||
                        type(err) in {
 | 
			
		||||
                            Exception, trio.MultiError, trio.Cancelled
 | 
			
		||||
                        }
 | 
			
		||||
                    ):
 | 
			
		||||
                        # if we error in the root but the debugger is
 | 
			
		||||
                        # engaged we don't want to prematurely kill (and
 | 
			
		||||
                        # thus clobber access to) the local tty streams.
 | 
			
		||||
                        # instead try to wait for pdb to be released before
 | 
			
		||||
                        # tearing down.
 | 
			
		||||
                        debug_complete = _debug._pdb_complete
 | 
			
		||||
                        if debug_complete and not debug_complete.is_set():
 | 
			
		||||
                            log.warning(
 | 
			
		||||
                                "Root has errored but pdb is active..waiting "
 | 
			
		||||
                                "on debug lock")
 | 
			
		||||
                            await _debug._pdb_complete.wait()
 | 
			
		||||
 | 
			
		||||
                        # raise
 | 
			
		||||
 | 
			
		||||
                    # if the caller's scope errored then we activate our
 | 
			
		||||
                    # one-cancels-all supervisor strategy (don't
 | 
			
		||||
                    # worry more are coming).
 | 
			
		||||
| 
						 | 
				
			
			@ -377,26 +397,11 @@ async def open_nursery(
 | 
			
		|||
            async with open_root_actor(**kwargs) as actor:
 | 
			
		||||
                assert actor is current_actor()
 | 
			
		||||
 | 
			
		||||
                try:
 | 
			
		||||
                    async with _open_and_supervise_one_cancels_all_nursery(
 | 
			
		||||
                        actor
 | 
			
		||||
                    ) as anursery:
 | 
			
		||||
                        yield anursery
 | 
			
		||||
 | 
			
		||||
                except (Exception, trio.MultiError, trio.Cancelled):
 | 
			
		||||
                    # if we error in the root but the debugger is
 | 
			
		||||
                    # engaged we don't want to prematurely kill (and
 | 
			
		||||
                    # thus clobber access to) the local tty streams.
 | 
			
		||||
                    # instead try to wait for pdb to be released before
 | 
			
		||||
                    # tearing down.
 | 
			
		||||
                    if not _debug._pdb_complete.is_set():
 | 
			
		||||
                        log.warning(
 | 
			
		||||
                            "Root has errored but pdb is active..waiting "
 | 
			
		||||
                            "on debug lock")
 | 
			
		||||
                        with trio.CancelScope(shield=True):
 | 
			
		||||
                            await _debug._pdb_complete.wait()
 | 
			
		||||
 | 
			
		||||
                    raise
 | 
			
		||||
                # try:
 | 
			
		||||
                async with _open_and_supervise_one_cancels_all_nursery(
 | 
			
		||||
                    actor
 | 
			
		||||
                ) as anursery:
 | 
			
		||||
                    yield anursery
 | 
			
		||||
 | 
			
		||||
        else:  # sub-nursery case
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -101,14 +101,16 @@ def _run_asyncio_task(
 | 
			
		|||
        """Cancel the calling ``trio`` task on error.
 | 
			
		||||
        """
 | 
			
		||||
        nonlocal aio_err
 | 
			
		||||
        aio_err = task.exception()
 | 
			
		||||
        try:
 | 
			
		||||
            aio_err = task.exception()
 | 
			
		||||
        except asyncio.CancelledError as cerr:
 | 
			
		||||
            aio_err = cerr
 | 
			
		||||
 | 
			
		||||
        if aio_err:
 | 
			
		||||
            log.exception(f"asyncio task errorred:\n{aio_err}")
 | 
			
		||||
 | 
			
		||||
        # cancel_scope.cancel()
 | 
			
		||||
        from_aio._err = aio_err
 | 
			
		||||
        to_trio.close()
 | 
			
		||||
 | 
			
		||||
    task.add_done_callback(cancel_trio)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -233,17 +235,25 @@ async def run_task(
 | 
			
		|||
        #         raise aio_err
 | 
			
		||||
 | 
			
		||||
    # Do we need this?
 | 
			
		||||
    except BaseException as err:
 | 
			
		||||
    except Exception as err:
 | 
			
		||||
        # await tractor.breakpoint()
 | 
			
		||||
        aio_err = from_aio._err
 | 
			
		||||
 | 
			
		||||
        # try:
 | 
			
		||||
        if aio_err is not None:
 | 
			
		||||
            # always raise from any captured asyncio error
 | 
			
		||||
            raise err from aio_err
 | 
			
		||||
        else:
 | 
			
		||||
            raise
 | 
			
		||||
        # finally:
 | 
			
		||||
        #     if not task.done():
 | 
			
		||||
        #         task.cancel()
 | 
			
		||||
 | 
			
		||||
    finally:
 | 
			
		||||
        task.cancel()
 | 
			
		||||
    except trio.Cancelled:
 | 
			
		||||
        if not task.done():
 | 
			
		||||
            task.cancel()
 | 
			
		||||
 | 
			
		||||
        raise
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# async def stream_from_task
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue