Move to `trio.CancelScope`
parent
02e0c0e1a4
commit
78ddd33e3a
|
@ -70,7 +70,7 @@ async def test_cancel_remote_arbiter(daemon, arb_addr):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
async def test_register_duplicate_name(daemon, arb_addr):
|
def test_register_duplicate_name(daemon, arb_addr):
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
assert not tractor.current_actor().is_arbiter
|
assert not tractor.current_actor().is_arbiter
|
||||||
|
@ -85,4 +85,4 @@ async def test_register_duplicate_name(daemon, arb_addr):
|
||||||
|
|
||||||
# run it manually since we want to start **after**
|
# run it manually since we want to start **after**
|
||||||
# the other "daemon" program
|
# the other "daemon" program
|
||||||
tractor.run(main, arb_addr=arbiter_addr)
|
tractor.run(main, arbiter_addr=arb_addr)
|
||||||
|
|
|
@ -14,7 +14,7 @@ async def stream_seq(sequence):
|
||||||
await trio.sleep(0.1)
|
await trio.sleep(0.1)
|
||||||
|
|
||||||
# block indefinitely waiting to be cancelled by ``aclose()`` call
|
# block indefinitely waiting to be cancelled by ``aclose()`` call
|
||||||
with trio.open_cancel_scope() as cs:
|
with trio.CancelScope() as cs:
|
||||||
await trio.sleep(float('inf'))
|
await trio.sleep(float('inf'))
|
||||||
assert 0
|
assert 0
|
||||||
assert cs.cancelled_caught
|
assert cs.cancelled_caught
|
||||||
|
|
|
@ -73,7 +73,7 @@ async def _invoke(
|
||||||
not is_async_gen_partial
|
not is_async_gen_partial
|
||||||
):
|
):
|
||||||
await chan.send({'functype': 'function', 'cid': cid})
|
await chan.send({'functype': 'function', 'cid': cid})
|
||||||
with trio.open_cancel_scope() as cs:
|
with trio.CancelScope() as cs:
|
||||||
task_status.started(cs)
|
task_status.started(cs)
|
||||||
await chan.send({'return': func(**kwargs), 'cid': cid})
|
await chan.send({'return': func(**kwargs), 'cid': cid})
|
||||||
else:
|
else:
|
||||||
|
@ -88,7 +88,7 @@ async def _invoke(
|
||||||
# have to properly handle the closing (aclosing)
|
# have to properly handle the closing (aclosing)
|
||||||
# of the async gen in order to be sure the cancel
|
# of the async gen in order to be sure the cancel
|
||||||
# is propagated!
|
# is propagated!
|
||||||
with trio.open_cancel_scope() as cs:
|
with trio.CancelScope() as cs:
|
||||||
task_status.started(cs)
|
task_status.started(cs)
|
||||||
async with aclosing(coro) as agen:
|
async with aclosing(coro) as agen:
|
||||||
async for item in agen:
|
async for item in agen:
|
||||||
|
@ -113,7 +113,7 @@ async def _invoke(
|
||||||
# back values like an async-generator would but must
|
# back values like an async-generator would but must
|
||||||
# manualy construct the response dict-packet-responses as
|
# manualy construct the response dict-packet-responses as
|
||||||
# above
|
# above
|
||||||
with trio.open_cancel_scope() as cs:
|
with trio.CancelScope() as cs:
|
||||||
task_status.started(cs)
|
task_status.started(cs)
|
||||||
await coro
|
await coro
|
||||||
if not cs.cancelled_caught:
|
if not cs.cancelled_caught:
|
||||||
|
@ -122,7 +122,7 @@ async def _invoke(
|
||||||
await chan.send({'stop': True, 'cid': cid})
|
await chan.send({'stop': True, 'cid': cid})
|
||||||
else:
|
else:
|
||||||
await chan.send({'functype': 'asyncfunction', 'cid': cid})
|
await chan.send({'functype': 'asyncfunction', 'cid': cid})
|
||||||
with trio.open_cancel_scope() as cs:
|
with trio.CancelScope() as cs:
|
||||||
task_status.started(cs)
|
task_status.started(cs)
|
||||||
await chan.send({'return': await coro, 'cid': cid})
|
await chan.send({'return': await coro, 'cid': cid})
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
|
@ -377,7 +377,7 @@ class Actor:
|
||||||
# loop running despite the current task having been
|
# loop running despite the current task having been
|
||||||
# cancelled (eg. `open_portal()` may call this method from
|
# cancelled (eg. `open_portal()` may call this method from
|
||||||
# a locally spawned task)
|
# a locally spawned task)
|
||||||
with trio.open_cancel_scope(shield=shield) as cs:
|
with trio.CancelScope(shield=shield) as cs:
|
||||||
task_status.started(cs)
|
task_status.started(cs)
|
||||||
async for msg in chan:
|
async for msg in chan:
|
||||||
if msg is None: # loop terminate sentinel
|
if msg is None: # loop terminate sentinel
|
||||||
|
|
|
@ -50,7 +50,7 @@ async def _do_handshake(
|
||||||
|
|
||||||
class StreamReceiveChannel(trio.abc.ReceiveChannel):
|
class StreamReceiveChannel(trio.abc.ReceiveChannel):
|
||||||
"""A wrapper around a ``trio._channel.MemoryReceiveChannel`` with
|
"""A wrapper around a ``trio._channel.MemoryReceiveChannel`` with
|
||||||
special behaviour for signalling stream termination on across an
|
special behaviour for signalling stream termination across an
|
||||||
inter-actor ``Channel``. This is the type returned to a local task
|
inter-actor ``Channel``. This is the type returned to a local task
|
||||||
which invoked a remote streaming function using `Portal.run()`.
|
which invoked a remote streaming function using `Portal.run()`.
|
||||||
|
|
||||||
|
@ -126,7 +126,7 @@ class StreamReceiveChannel(trio.abc.ReceiveChannel):
|
||||||
"May have failed to cancel remote task "
|
"May have failed to cancel remote task "
|
||||||
f"{cid} for {self._portal.channel.uid}")
|
f"{cid} for {self._portal.channel.uid}")
|
||||||
|
|
||||||
with trio.open_cancel_scope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
await self._rx_chan.aclose()
|
await self._rx_chan.aclose()
|
||||||
|
|
||||||
def clone(self):
|
def clone(self):
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
"""
|
"""
|
||||||
Per process state
|
Per process state
|
||||||
"""
|
"""
|
||||||
|
import multiprocessing as mp
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
|
|
||||||
|
@ -13,3 +14,9 @@ def current_actor() -> 'Actor': # type: ignore
|
||||||
if not _current_actor:
|
if not _current_actor:
|
||||||
raise RuntimeError("No actor instance has been defined yet?")
|
raise RuntimeError("No actor instance has been defined yet?")
|
||||||
return _current_actor
|
return _current_actor
|
||||||
|
|
||||||
|
|
||||||
|
def is_main_process():
|
||||||
|
"""Bool determining if this actor is running in the top-most process.
|
||||||
|
"""
|
||||||
|
return mp.current_process().name == 'MainProcess'
|
||||||
|
|
|
@ -185,7 +185,7 @@ class ActorNursery:
|
||||||
|
|
||||||
Should only be called for actors spawned with `run_in_actor()`.
|
Should only be called for actors spawned with `run_in_actor()`.
|
||||||
"""
|
"""
|
||||||
with trio.open_cancel_scope() as cs:
|
with trio.CancelScope() as cs:
|
||||||
task_status.started(cs)
|
task_status.started(cs)
|
||||||
# if this call errors we store the exception for later
|
# if this call errors we store the exception for later
|
||||||
# in ``errors`` which will be reraised inside
|
# in ``errors`` which will be reraised inside
|
||||||
|
@ -316,7 +316,7 @@ class ActorNursery:
|
||||||
# a cancel signal shows up slightly after in which case
|
# a cancel signal shows up slightly after in which case
|
||||||
# the `else:` block here might not complete?
|
# the `else:` block here might not complete?
|
||||||
# For now, shield both.
|
# For now, shield both.
|
||||||
with trio.open_cancel_scope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
if etype in (trio.Cancelled, KeyboardInterrupt):
|
if etype in (trio.Cancelled, KeyboardInterrupt):
|
||||||
log.warning(
|
log.warning(
|
||||||
f"Nursery for {current_actor().uid} was "
|
f"Nursery for {current_actor().uid} was "
|
||||||
|
@ -355,9 +355,3 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]:
|
||||||
# TODO: figure out supervisors from erlang
|
# TODO: figure out supervisors from erlang
|
||||||
async with ActorNursery(actor) as nursery:
|
async with ActorNursery(actor) as nursery:
|
||||||
yield nursery
|
yield nursery
|
||||||
|
|
||||||
|
|
||||||
def is_main_process():
|
|
||||||
"""Bool determining if this actor is running in the top-most process.
|
|
||||||
"""
|
|
||||||
return mp.current_process().name == 'MainProcess'
|
|
||||||
|
|
Loading…
Reference in New Issue