Compare commits
	
		
			No commits in common. "e1f128a79cd7d4487987ee1d50c96d47cabcbdc5" and "bc468d9140edf29f6c286e12a393b52c6477f057" have entirely different histories. 
		
	
	
		
			e1f128a79c
			...
			bc468d9140
		
	
		| 
						 | 
					@ -52,14 +52,9 @@ async def assert_state(value: bool):
 | 
				
			||||||
    assert _state == value
 | 
					    assert _state == value
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@pytest.mark.parametrize(
 | 
					def test_simple_contex():
 | 
				
			||||||
    'error_parent',
 | 
					 | 
				
			||||||
    [False, True],
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
def test_simple_context(error_parent):
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async def main():
 | 
					    async def main():
 | 
				
			||||||
 | 
					 | 
				
			||||||
        async with tractor.open_nursery() as n:
 | 
					        async with tractor.open_nursery() as n:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            portal = await n.start_actor(
 | 
					            portal = await n.start_actor(
 | 
				
			||||||
| 
						 | 
					@ -79,18 +74,9 @@ def test_simple_context(error_parent):
 | 
				
			||||||
            # after cancellation
 | 
					            # after cancellation
 | 
				
			||||||
            await portal.run(assert_state, value=False)
 | 
					            await portal.run(assert_state, value=False)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            if error_parent:
 | 
					 | 
				
			||||||
                raise ValueError
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            # shut down daemon
 | 
					            # shut down daemon
 | 
				
			||||||
            await portal.cancel_actor()
 | 
					            await portal.cancel_actor()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if error_parent:
 | 
					 | 
				
			||||||
        try:
 | 
					 | 
				
			||||||
            trio.run(main)
 | 
					 | 
				
			||||||
        except ValueError:
 | 
					 | 
				
			||||||
            pass
 | 
					 | 
				
			||||||
    else:
 | 
					 | 
				
			||||||
    trio.run(main)
 | 
					    trio.run(main)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -140,81 +140,3 @@ def test_dynamic_pub_sub():
 | 
				
			||||||
        trio.run(main)
 | 
					        trio.run(main)
 | 
				
			||||||
    except trio.TooSlowError:
 | 
					    except trio.TooSlowError:
 | 
				
			||||||
        pass
 | 
					        pass
 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
@tractor.context
 | 
					 | 
				
			||||||
async def one_task_streams_and_one_handles_reqresp(
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    ctx: tractor.Context,
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
) -> None:
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    await ctx.started()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    async with ctx.open_stream() as stream:
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        async def pingpong():
 | 
					 | 
				
			||||||
            '''Run a simple req/response service.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            '''
 | 
					 | 
				
			||||||
            async for msg in stream:
 | 
					 | 
				
			||||||
                print('rpc server ping')
 | 
					 | 
				
			||||||
                assert msg == 'ping'
 | 
					 | 
				
			||||||
                print('rpc server pong')
 | 
					 | 
				
			||||||
                await stream.send('pong')
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        async with trio.open_nursery() as n:
 | 
					 | 
				
			||||||
            n.start_soon(pingpong)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            for _ in itertools.count():
 | 
					 | 
				
			||||||
                await stream.send('yo')
 | 
					 | 
				
			||||||
                await trio.sleep(0.01)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
def test_reqresp_ontopof_streaming():
 | 
					 | 
				
			||||||
    '''Test a subactor that both streams with one task and
 | 
					 | 
				
			||||||
    spawns another which handles a small requests-response
 | 
					 | 
				
			||||||
    dialogue over the same bidir-stream.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    '''
 | 
					 | 
				
			||||||
    async def main():
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        with trio.move_on_after(2):
 | 
					 | 
				
			||||||
            async with tractor.open_nursery() as n:
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                # name of this actor will be same as target func
 | 
					 | 
				
			||||||
                portal = await n.start_actor(
 | 
					 | 
				
			||||||
                    'dual_tasks',
 | 
					 | 
				
			||||||
                    enable_modules=[__name__]
 | 
					 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                # flat to make sure we get at least one pong
 | 
					 | 
				
			||||||
                got_pong: bool = False
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                async with portal.open_context(
 | 
					 | 
				
			||||||
                    one_task_streams_and_one_handles_reqresp,
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                ) as (ctx, first):
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                    assert first is None
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                    async with ctx.open_stream() as stream:
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                        await stream.send('ping')
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                        async for msg in stream:
 | 
					 | 
				
			||||||
                            print(f'client received: {msg}')
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                            assert msg in {'pong', 'yo'}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                            if msg == 'pong':
 | 
					 | 
				
			||||||
                                got_pong = True
 | 
					 | 
				
			||||||
                                await stream.send('ping')
 | 
					 | 
				
			||||||
                                print('client sent ping')
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        assert got_pong
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    try:
 | 
					 | 
				
			||||||
        trio.run(main)
 | 
					 | 
				
			||||||
    except trio.TooSlowError:
 | 
					 | 
				
			||||||
        pass
 | 
					 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -479,7 +479,7 @@ class Actor:
 | 
				
			||||||
        try:
 | 
					        try:
 | 
				
			||||||
            send_chan, recv_chan = self._cids2qs[(actorid, cid)]
 | 
					            send_chan, recv_chan = self._cids2qs[(actorid, cid)]
 | 
				
			||||||
        except KeyError:
 | 
					        except KeyError:
 | 
				
			||||||
            send_chan, recv_chan = trio.open_memory_channel(2*10)
 | 
					            send_chan, recv_chan = trio.open_memory_channel(1000)
 | 
				
			||||||
            send_chan.cid = cid  # type: ignore
 | 
					            send_chan.cid = cid  # type: ignore
 | 
				
			||||||
            recv_chan.cid = cid  # type: ignore
 | 
					            recv_chan.cid = cid  # type: ignore
 | 
				
			||||||
            self._cids2qs[(actorid, cid)] = send_chan, recv_chan
 | 
					            self._cids2qs[(actorid, cid)] = send_chan, recv_chan
 | 
				
			||||||
| 
						 | 
					@ -528,14 +528,11 @@ class Actor:
 | 
				
			||||||
                task_status.started(loop_cs)
 | 
					                task_status.started(loop_cs)
 | 
				
			||||||
                async for msg in chan:
 | 
					                async for msg in chan:
 | 
				
			||||||
                    if msg is None:  # loop terminate sentinel
 | 
					                    if msg is None:  # loop terminate sentinel
 | 
				
			||||||
 | 
					 | 
				
			||||||
                        log.debug(
 | 
					                        log.debug(
 | 
				
			||||||
                            f"Cancelling all tasks for {chan} from {chan.uid}")
 | 
					                            f"Cancelling all tasks for {chan} from {chan.uid}")
 | 
				
			||||||
 | 
					                        for (channel, cid) in self._rpc_tasks:
 | 
				
			||||||
                        for (channel, cid) in self._rpc_tasks.copy():
 | 
					 | 
				
			||||||
                            if channel is chan:
 | 
					                            if channel is chan:
 | 
				
			||||||
                                await self._cancel_task(cid, channel)
 | 
					                                await self._cancel_task(cid, channel)
 | 
				
			||||||
 | 
					 | 
				
			||||||
                        log.debug(
 | 
					                        log.debug(
 | 
				
			||||||
                                f"Msg loop signalled to terminate for"
 | 
					                                f"Msg loop signalled to terminate for"
 | 
				
			||||||
                                f" {chan} from {chan.uid}")
 | 
					                                f" {chan} from {chan.uid}")
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -359,7 +359,6 @@ class Portal:
 | 
				
			||||||
        fn_mod_path, fn_name = func_deats(func)
 | 
					        fn_mod_path, fn_name = func_deats(func)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        recv_chan: trio.ReceiveMemoryChannel = None
 | 
					 | 
				
			||||||
        try:
 | 
					        try:
 | 
				
			||||||
            cid, recv_chan, functype, first_msg = await self._submit(
 | 
					            cid, recv_chan, functype, first_msg = await self._submit(
 | 
				
			||||||
                fn_mod_path, fn_name, kwargs)
 | 
					                fn_mod_path, fn_name, kwargs)
 | 
				
			||||||
| 
						 | 
					@ -391,7 +390,6 @@ class Portal:
 | 
				
			||||||
                await ctx.cancel()
 | 
					                await ctx.cancel()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        finally:
 | 
					        finally:
 | 
				
			||||||
            if recv_chan is not None:
 | 
					 | 
				
			||||||
            await recv_chan.aclose()
 | 
					            await recv_chan.aclose()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@dataclass
 | 
					@dataclass
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -339,7 +339,6 @@ async def new_proc(
 | 
				
			||||||
            bind_addr=bind_addr,
 | 
					            bind_addr=bind_addr,
 | 
				
			||||||
            parent_addr=parent_addr,
 | 
					            parent_addr=parent_addr,
 | 
				
			||||||
            _runtime_vars=_runtime_vars,
 | 
					            _runtime_vars=_runtime_vars,
 | 
				
			||||||
            infect_asyncio=infect_asyncio,
 | 
					 | 
				
			||||||
            task_status=task_status,
 | 
					            task_status=task_status,
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -355,7 +354,6 @@ async def mp_new_proc(
 | 
				
			||||||
    parent_addr: Tuple[str, int],
 | 
					    parent_addr: Tuple[str, int],
 | 
				
			||||||
    _runtime_vars: Dict[str, Any],  # serialized and sent to _child
 | 
					    _runtime_vars: Dict[str, Any],  # serialized and sent to _child
 | 
				
			||||||
    *,
 | 
					    *,
 | 
				
			||||||
    infect_asyncio: bool = False,
 | 
					 | 
				
			||||||
    task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
 | 
					    task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
 | 
				
			||||||
 | 
					
 | 
				
			||||||
) -> None:
 | 
					) -> None:
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -12,7 +12,7 @@ import trio
 | 
				
			||||||
from async_generator import asynccontextmanager
 | 
					from async_generator import asynccontextmanager
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from . import _debug
 | 
					from . import _debug
 | 
				
			||||||
from ._state import current_actor, is_main_process, is_root_process
 | 
					from ._state import current_actor, is_main_process
 | 
				
			||||||
from .log import get_logger, get_loglevel
 | 
					from .log import get_logger, get_loglevel
 | 
				
			||||||
from ._actor import Actor
 | 
					from ._actor import Actor
 | 
				
			||||||
from ._portal import Portal
 | 
					from ._portal import Portal
 | 
				
			||||||
| 
						 | 
					@ -263,26 +263,6 @@ async def _open_and_supervise_one_cancels_all_nursery(
 | 
				
			||||||
                        "to complete"
 | 
					                        "to complete"
 | 
				
			||||||
                    )
 | 
					                    )
 | 
				
			||||||
                except BaseException as err:
 | 
					                except BaseException as err:
 | 
				
			||||||
 | 
					 | 
				
			||||||
                    if is_root_process() and (
 | 
					 | 
				
			||||||
                        type(err) in {
 | 
					 | 
				
			||||||
                            Exception, trio.MultiError, trio.Cancelled
 | 
					 | 
				
			||||||
                        }
 | 
					 | 
				
			||||||
                    ):
 | 
					 | 
				
			||||||
                        # if we error in the root but the debugger is
 | 
					 | 
				
			||||||
                        # engaged we don't want to prematurely kill (and
 | 
					 | 
				
			||||||
                        # thus clobber access to) the local tty streams.
 | 
					 | 
				
			||||||
                        # instead try to wait for pdb to be released before
 | 
					 | 
				
			||||||
                        # tearing down.
 | 
					 | 
				
			||||||
                        debug_complete = _debug._pdb_complete
 | 
					 | 
				
			||||||
                        if debug_complete and not debug_complete.is_set():
 | 
					 | 
				
			||||||
                            log.warning(
 | 
					 | 
				
			||||||
                                "Root has errored but pdb is active..waiting "
 | 
					 | 
				
			||||||
                                "on debug lock")
 | 
					 | 
				
			||||||
                            await _debug._pdb_complete.wait()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                        # raise
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                    # if the caller's scope errored then we activate our
 | 
					                    # if the caller's scope errored then we activate our
 | 
				
			||||||
                    # one-cancels-all supervisor strategy (don't
 | 
					                    # one-cancels-all supervisor strategy (don't
 | 
				
			||||||
                    # worry more are coming).
 | 
					                    # worry more are coming).
 | 
				
			||||||
| 
						 | 
					@ -397,12 +377,27 @@ async def open_nursery(
 | 
				
			||||||
            async with open_root_actor(**kwargs) as actor:
 | 
					            async with open_root_actor(**kwargs) as actor:
 | 
				
			||||||
                assert actor is current_actor()
 | 
					                assert actor is current_actor()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                # try:
 | 
					                try:
 | 
				
			||||||
                    async with _open_and_supervise_one_cancels_all_nursery(
 | 
					                    async with _open_and_supervise_one_cancels_all_nursery(
 | 
				
			||||||
                        actor
 | 
					                        actor
 | 
				
			||||||
                    ) as anursery:
 | 
					                    ) as anursery:
 | 
				
			||||||
                        yield anursery
 | 
					                        yield anursery
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                except (Exception, trio.MultiError, trio.Cancelled):
 | 
				
			||||||
 | 
					                    # if we error in the root but the debugger is
 | 
				
			||||||
 | 
					                    # engaged we don't want to prematurely kill (and
 | 
				
			||||||
 | 
					                    # thus clobber access to) the local tty streams.
 | 
				
			||||||
 | 
					                    # instead try to wait for pdb to be released before
 | 
				
			||||||
 | 
					                    # tearing down.
 | 
				
			||||||
 | 
					                    if not _debug._pdb_complete.is_set():
 | 
				
			||||||
 | 
					                        log.warning(
 | 
				
			||||||
 | 
					                            "Root has errored but pdb is active..waiting "
 | 
				
			||||||
 | 
					                            "on debug lock")
 | 
				
			||||||
 | 
					                        with trio.CancelScope(shield=True):
 | 
				
			||||||
 | 
					                            await _debug._pdb_complete.wait()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                    raise
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        else:  # sub-nursery case
 | 
					        else:  # sub-nursery case
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            async with _open_and_supervise_one_cancels_all_nursery(
 | 
					            async with _open_and_supervise_one_cancels_all_nursery(
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -101,16 +101,14 @@ def _run_asyncio_task(
 | 
				
			||||||
        """Cancel the calling ``trio`` task on error.
 | 
					        """Cancel the calling ``trio`` task on error.
 | 
				
			||||||
        """
 | 
					        """
 | 
				
			||||||
        nonlocal aio_err
 | 
					        nonlocal aio_err
 | 
				
			||||||
        try:
 | 
					 | 
				
			||||||
        aio_err = task.exception()
 | 
					        aio_err = task.exception()
 | 
				
			||||||
        except asyncio.CancelledError as cerr:
 | 
					 | 
				
			||||||
            aio_err = cerr
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if aio_err:
 | 
					        if aio_err:
 | 
				
			||||||
            log.exception(f"asyncio task errorred:\n{aio_err}")
 | 
					            log.exception(f"asyncio task errorred:\n{aio_err}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # cancel_scope.cancel()
 | 
					        # cancel_scope.cancel()
 | 
				
			||||||
        from_aio._err = aio_err
 | 
					        from_aio._err = aio_err
 | 
				
			||||||
 | 
					        to_trio.close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    task.add_done_callback(cancel_trio)
 | 
					    task.add_done_callback(cancel_trio)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -235,26 +233,18 @@ async def run_task(
 | 
				
			||||||
        #         raise aio_err
 | 
					        #         raise aio_err
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # Do we need this?
 | 
					    # Do we need this?
 | 
				
			||||||
    except Exception as err:
 | 
					    except BaseException as err:
 | 
				
			||||||
        # await tractor.breakpoint()
 | 
					        # await tractor.breakpoint()
 | 
				
			||||||
        aio_err = from_aio._err
 | 
					        aio_err = from_aio._err
 | 
				
			||||||
 | 
					 | 
				
			||||||
        # try:
 | 
					 | 
				
			||||||
        if aio_err is not None:
 | 
					        if aio_err is not None:
 | 
				
			||||||
            # always raise from any captured asyncio error
 | 
					            # always raise from any captured asyncio error
 | 
				
			||||||
            raise err from aio_err
 | 
					            raise err from aio_err
 | 
				
			||||||
        else:
 | 
					        else:
 | 
				
			||||||
            raise
 | 
					            raise
 | 
				
			||||||
        # finally:
 | 
					 | 
				
			||||||
        #     if not task.done():
 | 
					 | 
				
			||||||
        #         task.cancel()
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    except trio.Cancelled:
 | 
					    finally:
 | 
				
			||||||
        if not task.done():
 | 
					 | 
				
			||||||
        task.cancel()
 | 
					        task.cancel()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        raise
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
# async def stream_from_task
 | 
					# async def stream_from_task
 | 
				
			||||||
#     pass
 | 
					#     pass
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue