diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index ce74fdc..e121baf 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -197,7 +197,7 @@ async def test_cancel_infinite_streamer(start_method): ], ) @tractor_test -async def test_some_cancels_all(num_actors_and_errs, start_method): +async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel): """Verify a subset of failed subactors causes all others in the nursery to be cancelled just like the strategy in trio. diff --git a/tractor/_actor.py b/tractor/_actor.py index 867403f..df8d0c8 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -259,7 +259,7 @@ class Actor: code (if it exists). """ try: - if self._spawn_method == 'trio_run_in_process': + if self._spawn_method == 'trio': parent_data = self._parent_main_data if 'init_main_from_name' in parent_data: _mp_fixup_main._fixup_main_from_name( diff --git a/tractor/_entry.py b/tractor/_entry.py index fc10e88..a6ff506 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -62,7 +62,19 @@ async def _trio_main( accept_addr: Tuple[str, int], parent_addr: Tuple[str, int] = None ) -> None: + """Entry point for a `trio_run_in_process` subactor. + + Here we don't need to call `trio.run()` since trip does that as + part of its subprocess startup sequence. + """ + if actor.loglevel is not None: + log.info( + f"Setting loglevel for {actor.uid} to {actor.loglevel}") + get_console_log(actor.loglevel) + + log.info(f"Started new trio process for {actor.uid}") _state._current_actor = actor await actor._async_main(accept_addr, parent_addr=parent_addr) + log.info(f"Actor {actor.uid} terminated") diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 4ae2ce4..eaa7270 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -168,11 +168,13 @@ async def run_in_process(async_fn, *args, **kwargs): stdin=subprocess.PIPE ) + # send over func to call await p.stdin.send_all(encoded_job) yield p - #return cloudpickle.loads(p.stdout) + # wait for termination + await p.wait() async def new_proc(