diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index 040d8b6..80b66f1 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -261,13 +261,26 @@ async def test_some_cancels_all(num_actors_and_errs, start_method): pytest.fail("Should have gotten a remote assertion error?") -async def spawn_and_error(num) -> None: +async def spawn_and_error(breadth, depth) -> None: name = tractor.current_actor().name async with tractor.open_nursery() as nursery: - for i in range(num): - await nursery.run_in_actor( - f'{name}_errorer_{i}', assert_err - ) + for i in range(breadth): + if depth > 0: + args = ( + f'spawner_{i}_depth_{depth}', + spawn_and_error, + ) + kwargs = { + 'breadth': breadth, + 'depth': depth - 1, + } + else: + args = ( + f'{name}_errorer_{i}', + assert_err, + ) + kwargs = {} + await nursery.run_in_actor(*args, **kwargs) @tractor_test @@ -276,25 +289,34 @@ async def test_nested_multierrors(loglevel, start_method): This test goes only 2 nurseries deep but we should eventually have tests for arbitrary n-depth actor trees. """ - if platform.system() == 'Windows': - # Windows CI seems to be partifcularly fragile on Python 3.8.. - num_subactors = 2 - else: - # XXX: any more then this and the forkserver will - # start bailing hard...gotta look into it - num_subactors = 4 + # XXX: forkserver can't seem to handle any more then 2 depth + # process trees for whatever reason. + # Any more process levels then this and we start getting pretty slow anyway + depth = 3 + subactor_breadth = 2 - try: - async with tractor.open_nursery() as nursery: + with trio.fail_after(120): + try: + async with tractor.open_nursery() as nursery: + for i in range(subactor_breadth): + await nursery.run_in_actor( + f'spawner_{i}', + spawn_and_error, + breadth=subactor_breadth, + depth=depth, + ) + except trio.MultiError as err: + assert len(err.exceptions) == subactor_breadth + for subexc in err.exceptions: + assert isinstance(subexc, tractor.RemoteActorError) + if depth > 1 and subactor_breadth > 1: - for i in range(num_subactors): - await nursery.run_in_actor( - f'spawner_{i}', - spawn_and_error, - num=num_subactors, - ) - except trio.MultiError as err: - assert len(err.exceptions) == num_subactors - for subexc in err.exceptions: - assert isinstance(subexc, tractor.RemoteActorError) - assert subexc.type is trio.MultiError + # XXX not sure what's up with this.. + if platform.system() == 'Windows': + assert (subexc.type is trio.MultiError) or ( + subexc.type is tractor.RemoteActorError) + else: + assert subexc.type is trio.MultiError + else: + assert (subexc.type is tractor.RemoteActorError) or ( + subexc.type is trio.Cancelled) diff --git a/tractor/_actor.py b/tractor/_actor.py index c75647e..5b5e7ff 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -144,7 +144,7 @@ async def _invoke( if not actor._rpc_tasks: log.info(f"All RPC tasks have completed") - actor._no_more_rpc_tasks.set() + actor._ongoing_rpc_tasks.set() class Actor: @@ -183,9 +183,8 @@ class Actor: self._peer_connected: dict = {} self._no_more_peers = trio.Event() self._no_more_peers.set() - - self._no_more_rpc_tasks = trio.Event() - self._no_more_rpc_tasks.set() + self._ongoing_rpc_tasks = trio.Event() + self._ongoing_rpc_tasks.set() # (chan, cid) -> (cancel_scope, func) self._rpc_tasks: Dict[ Tuple[Channel, str], @@ -234,7 +233,7 @@ class Actor: ) -> None: """Entry point for new inbound connections to the channel server. """ - self._no_more_peers.clear() + self._no_more_peers = trio.Event() chan = Channel(stream=stream) log.info(f"New connection to us {chan}") @@ -448,7 +447,7 @@ class Actor: f"{cs}") else: # mark that we have ongoing rpc tasks - self._no_more_rpc_tasks.clear() + self._ongoing_rpc_tasks = trio.Event() log.info(f"RPC func is {func}") # store cancel scope such that the rpc task can be # cancelled gracefully if requested @@ -709,10 +708,9 @@ class Actor: for (chan, cid) in tasks.copy(): # TODO: this should really done in a nursery batch await self._cancel_task(cid, chan) - # if tasks: log.info( f"Waiting for remaining rpc tasks to complete {tasks}") - await self._no_more_rpc_tasks.wait() + await self._ongoing_rpc_tasks.wait() def cancel_server(self) -> None: """Cancel the internal channel server nursery thereby