From 78ddd33e3a9b1c333cc166a5a21139b48c4c0a02 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 16 Feb 2019 14:25:06 -0500 Subject: [PATCH] Move to `trio.CancelScope` --- tests/test_multi_program.py | 4 ++-- tests/test_streaming.py | 2 +- tractor/_actor.py | 10 +++++----- tractor/_portal.py | 4 ++-- tractor/_state.py | 7 +++++++ tractor/_trionics.py | 10 ++-------- 6 files changed, 19 insertions(+), 18 deletions(-) diff --git a/tests/test_multi_program.py b/tests/test_multi_program.py index 53413b7..299a698 100644 --- a/tests/test_multi_program.py +++ b/tests/test_multi_program.py @@ -70,7 +70,7 @@ async def test_cancel_remote_arbiter(daemon, arb_addr): pass -async def test_register_duplicate_name(daemon, arb_addr): +def test_register_duplicate_name(daemon, arb_addr): async def main(): assert not tractor.current_actor().is_arbiter @@ -85,4 +85,4 @@ async def test_register_duplicate_name(daemon, arb_addr): # run it manually since we want to start **after** # the other "daemon" program - tractor.run(main, arb_addr=arbiter_addr) + tractor.run(main, arbiter_addr=arb_addr) diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 55b4bee..2cddd41 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -14,7 +14,7 @@ async def stream_seq(sequence): await trio.sleep(0.1) # block indefinitely waiting to be cancelled by ``aclose()`` call - with trio.open_cancel_scope() as cs: + with trio.CancelScope() as cs: await trio.sleep(float('inf')) assert 0 assert cs.cancelled_caught diff --git a/tractor/_actor.py b/tractor/_actor.py index 02cdbc0..7a5e72e 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -73,7 +73,7 @@ async def _invoke( not is_async_gen_partial ): await chan.send({'functype': 'function', 'cid': cid}) - with trio.open_cancel_scope() as cs: + with trio.CancelScope() as cs: task_status.started(cs) await chan.send({'return': func(**kwargs), 'cid': cid}) else: @@ -88,7 +88,7 @@ async def _invoke( # have to properly handle the closing (aclosing) # of the async gen in order to be sure the cancel # is propagated! - with trio.open_cancel_scope() as cs: + with trio.CancelScope() as cs: task_status.started(cs) async with aclosing(coro) as agen: async for item in agen: @@ -113,7 +113,7 @@ async def _invoke( # back values like an async-generator would but must # manualy construct the response dict-packet-responses as # above - with trio.open_cancel_scope() as cs: + with trio.CancelScope() as cs: task_status.started(cs) await coro if not cs.cancelled_caught: @@ -122,7 +122,7 @@ async def _invoke( await chan.send({'stop': True, 'cid': cid}) else: await chan.send({'functype': 'asyncfunction', 'cid': cid}) - with trio.open_cancel_scope() as cs: + with trio.CancelScope() as cs: task_status.started(cs) await chan.send({'return': await coro, 'cid': cid}) except Exception as err: @@ -377,7 +377,7 @@ class Actor: # loop running despite the current task having been # cancelled (eg. `open_portal()` may call this method from # a locally spawned task) - with trio.open_cancel_scope(shield=shield) as cs: + with trio.CancelScope(shield=shield) as cs: task_status.started(cs) async for msg in chan: if msg is None: # loop terminate sentinel diff --git a/tractor/_portal.py b/tractor/_portal.py index 11ccdbf..160db19 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -50,7 +50,7 @@ async def _do_handshake( class StreamReceiveChannel(trio.abc.ReceiveChannel): """A wrapper around a ``trio._channel.MemoryReceiveChannel`` with - special behaviour for signalling stream termination on across an + special behaviour for signalling stream termination across an inter-actor ``Channel``. This is the type returned to a local task which invoked a remote streaming function using `Portal.run()`. @@ -126,7 +126,7 @@ class StreamReceiveChannel(trio.abc.ReceiveChannel): "May have failed to cancel remote task " f"{cid} for {self._portal.channel.uid}") - with trio.open_cancel_scope(shield=True): + with trio.CancelScope(shield=True): await self._rx_chan.aclose() def clone(self): diff --git a/tractor/_state.py b/tractor/_state.py index 704fae7..2606ff3 100644 --- a/tractor/_state.py +++ b/tractor/_state.py @@ -1,6 +1,7 @@ """ Per process state """ +import multiprocessing as mp from typing import Optional @@ -13,3 +14,9 @@ def current_actor() -> 'Actor': # type: ignore if not _current_actor: raise RuntimeError("No actor instance has been defined yet?") return _current_actor + + +def is_main_process(): + """Bool determining if this actor is running in the top-most process. + """ + return mp.current_process().name == 'MainProcess' diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 894f3ec..dd18158 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -185,7 +185,7 @@ class ActorNursery: Should only be called for actors spawned with `run_in_actor()`. """ - with trio.open_cancel_scope() as cs: + with trio.CancelScope() as cs: task_status.started(cs) # if this call errors we store the exception for later # in ``errors`` which will be reraised inside @@ -316,7 +316,7 @@ class ActorNursery: # a cancel signal shows up slightly after in which case # the `else:` block here might not complete? # For now, shield both. - with trio.open_cancel_scope(shield=True): + with trio.CancelScope(shield=True): if etype in (trio.Cancelled, KeyboardInterrupt): log.warning( f"Nursery for {current_actor().uid} was " @@ -355,9 +355,3 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: # TODO: figure out supervisors from erlang async with ActorNursery(actor) as nursery: yield nursery - - -def is_main_process(): - """Bool determining if this actor is running in the top-most process. - """ - return mp.current_process().name == 'MainProcess'