forked from goodboy/tractor
Merge pull request #1 from goodboy/drop-trip-update-trio
Drop trip update triodrop-trip-update-trio
commit
2b09818ed0
|
@ -197,7 +197,7 @@ async def test_cancel_infinite_streamer(start_method):
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
@tractor_test
|
@tractor_test
|
||||||
async def test_some_cancels_all(num_actors_and_errs, start_method):
|
async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
|
||||||
"""Verify a subset of failed subactors causes all others in
|
"""Verify a subset of failed subactors causes all others in
|
||||||
the nursery to be cancelled just like the strategy in trio.
|
the nursery to be cancelled just like the strategy in trio.
|
||||||
|
|
||||||
|
|
|
@ -259,7 +259,7 @@ class Actor:
|
||||||
code (if it exists).
|
code (if it exists).
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
if self._spawn_method == 'trio_run_in_process':
|
if self._spawn_method == 'trio':
|
||||||
parent_data = self._parent_main_data
|
parent_data = self._parent_main_data
|
||||||
if 'init_main_from_name' in parent_data:
|
if 'init_main_from_name' in parent_data:
|
||||||
_mp_fixup_main._fixup_main_from_name(
|
_mp_fixup_main._fixup_main_from_name(
|
||||||
|
|
|
@ -62,7 +62,19 @@ async def _trio_main(
|
||||||
accept_addr: Tuple[str, int],
|
accept_addr: Tuple[str, int],
|
||||||
parent_addr: Tuple[str, int] = None
|
parent_addr: Tuple[str, int] = None
|
||||||
) -> None:
|
) -> None:
|
||||||
|
"""Entry point for a `trio_run_in_process` subactor.
|
||||||
|
|
||||||
|
Here we don't need to call `trio.run()` since trip does that as
|
||||||
|
part of its subprocess startup sequence.
|
||||||
|
"""
|
||||||
|
if actor.loglevel is not None:
|
||||||
|
log.info(
|
||||||
|
f"Setting loglevel for {actor.uid} to {actor.loglevel}")
|
||||||
|
get_console_log(actor.loglevel)
|
||||||
|
|
||||||
|
log.info(f"Started new trio process for {actor.uid}")
|
||||||
|
|
||||||
_state._current_actor = actor
|
_state._current_actor = actor
|
||||||
|
|
||||||
await actor._async_main(accept_addr, parent_addr=parent_addr)
|
await actor._async_main(accept_addr, parent_addr=parent_addr)
|
||||||
|
log.info(f"Actor {actor.uid} terminated")
|
||||||
|
|
|
@ -168,11 +168,13 @@ async def run_in_process(async_fn, *args, **kwargs):
|
||||||
stdin=subprocess.PIPE
|
stdin=subprocess.PIPE
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# send over func to call
|
||||||
await p.stdin.send_all(encoded_job)
|
await p.stdin.send_all(encoded_job)
|
||||||
|
|
||||||
yield p
|
yield p
|
||||||
|
|
||||||
#return cloudpickle.loads(p.stdout)
|
# wait for termination
|
||||||
|
await p.wait()
|
||||||
|
|
||||||
|
|
||||||
async def new_proc(
|
async def new_proc(
|
||||||
|
|
Loading…
Reference in New Issue