forked from goodboy/tractor
Cancel remote streaming tasks on a local cancel
Use the new `Actor.cancel_task()` api to remotely cancel streaming tasks spawned by a portal. This guarantees that if an actor is cancelled all its (remote) portal spawned tasks will be as well. On portal teardown only cancel all async generator calls (though we should cancel all RPC requests in general eventually) and don't close the channel since it may have been passed in from some other context that wishes to keep it connected. In `open_portal()` run the message loop shielded so that if the local task is cancelled, messaging will continue until the internal scope is cancelled at end of block.contexts
parent
03e00886da
commit
97f709cc14
|
@ -5,9 +5,11 @@ import importlib
|
||||||
import inspect
|
import inspect
|
||||||
import typing
|
import typing
|
||||||
from typing import Tuple, Any, Dict, Optional, Set
|
from typing import Tuple, Any, Dict, Optional, Set
|
||||||
|
from functools import partial
|
||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
from async_generator import asynccontextmanager
|
from async_generator import asynccontextmanager, aclosing
|
||||||
|
|
||||||
from ._state import current_actor
|
from ._state import current_actor
|
||||||
from ._ipc import Channel
|
from ._ipc import Channel
|
||||||
|
@ -53,7 +55,7 @@ class Portal:
|
||||||
underlying ``tractor.Channel`` as though the remote (async)
|
underlying ``tractor.Channel`` as though the remote (async)
|
||||||
function / generator was invoked locally.
|
function / generator was invoked locally.
|
||||||
|
|
||||||
Think of this like an native async IPC API.
|
Think of this like a native async IPC API.
|
||||||
"""
|
"""
|
||||||
def __init__(self, channel: Channel) -> None:
|
def __init__(self, channel: Channel) -> None:
|
||||||
self.channel = channel
|
self.channel = channel
|
||||||
|
@ -124,7 +126,7 @@ class Portal:
|
||||||
# to make async-generators the fundamental IPC API over channels!
|
# to make async-generators the fundamental IPC API over channels!
|
||||||
# (think `yield from`, `gen.send()`, and functional reactive stuff)
|
# (think `yield from`, `gen.send()`, and functional reactive stuff)
|
||||||
|
|
||||||
if resptype == 'yield':
|
if resptype == 'yield': # stream response
|
||||||
|
|
||||||
async def yield_from_q():
|
async def yield_from_q():
|
||||||
try:
|
try:
|
||||||
|
@ -140,12 +142,21 @@ class Portal:
|
||||||
"Received internal error at portal?")
|
"Received internal error at portal?")
|
||||||
raise unpack_error(msg, self.channel)
|
raise unpack_error(msg, self.channel)
|
||||||
|
|
||||||
except GeneratorExit:
|
except (GeneratorExit, trio.Cancelled):
|
||||||
# for now this msg cancels an ongoing remote task
|
log.warning(
|
||||||
await self.channel.send({'cancel': True, 'cid': cid})
|
|
||||||
log.warn(
|
|
||||||
f"Cancelling async gen call {cid} to "
|
f"Cancelling async gen call {cid} to "
|
||||||
f"{self.channel.uid}")
|
f"{self.channel.uid}")
|
||||||
|
with trio.open_cancel_scope() as cleanup_scope:
|
||||||
|
cleanup_scope.shield = True
|
||||||
|
# TODO: yeah.. it'd be nice if this was just an
|
||||||
|
# async func on the far end. Gotta figure out a
|
||||||
|
# better way then implicitly feeding the ctx
|
||||||
|
# to declaring functions; likely a decorator
|
||||||
|
# sytsem.
|
||||||
|
agen = await self.run('self', 'cancel_task', cid=cid)
|
||||||
|
async with aclosing(agen) as agen:
|
||||||
|
async for _ in agen:
|
||||||
|
pass
|
||||||
raise
|
raise
|
||||||
|
|
||||||
# TODO: use AsyncExitStack to aclose() all agens
|
# TODO: use AsyncExitStack to aclose() all agens
|
||||||
|
@ -154,7 +165,7 @@ class Portal:
|
||||||
self._agens.add(agen)
|
self._agens.add(agen)
|
||||||
return agen
|
return agen
|
||||||
|
|
||||||
elif resptype == 'return':
|
elif resptype == 'return': # single response
|
||||||
msg = await q.get()
|
msg = await q.get()
|
||||||
try:
|
try:
|
||||||
return msg['return']
|
return msg['return']
|
||||||
|
@ -176,7 +187,7 @@ class Portal:
|
||||||
|
|
||||||
# not expecting a "main" result
|
# not expecting a "main" result
|
||||||
if self._expect_result is None:
|
if self._expect_result is None:
|
||||||
log.warn(
|
log.warning(
|
||||||
f"Portal for {self.channel.uid} not expecting a final"
|
f"Portal for {self.channel.uid} not expecting a final"
|
||||||
" result?\nresult() should only be called if subactor"
|
" result?\nresult() should only be called if subactor"
|
||||||
" was spawned with `ActorNursery.run_in_actor()`")
|
" was spawned with `ActorNursery.run_in_actor()`")
|
||||||
|
@ -198,22 +209,34 @@ class Portal:
|
||||||
|
|
||||||
return self._result
|
return self._result
|
||||||
|
|
||||||
|
async def _cancel_streams(self):
|
||||||
|
# terminate all locally running async generator
|
||||||
|
# IPC calls
|
||||||
|
if self._agens:
|
||||||
|
log.warning(
|
||||||
|
f"Cancelling all streams with {self.channel.uid}")
|
||||||
|
for agen in self._agens:
|
||||||
|
await agen.aclose()
|
||||||
|
|
||||||
async def close(self) -> None:
|
async def close(self) -> None:
|
||||||
# trigger remote msg loop `break`
|
await self._cancel_streams()
|
||||||
chan = self.channel
|
|
||||||
log.debug(f"Closing portal for {chan} to {chan.uid}")
|
|
||||||
await self.channel.send(None)
|
|
||||||
|
|
||||||
async def cancel_actor(self) -> bool:
|
async def cancel_actor(self) -> bool:
|
||||||
"""Cancel the actor on the other end of this portal.
|
"""Cancel the actor on the other end of this portal.
|
||||||
"""
|
"""
|
||||||
|
if not self.channel.connected():
|
||||||
|
log.warning("This portal is already closed can't cancel")
|
||||||
|
return False
|
||||||
|
|
||||||
|
await self._cancel_streams()
|
||||||
|
|
||||||
log.warning(
|
log.warning(
|
||||||
f"Sending cancel request to {self.channel.uid} on "
|
f"Sending actor cancel request to {self.channel.uid} on "
|
||||||
f"{self.channel}")
|
f"{self.channel}")
|
||||||
try:
|
try:
|
||||||
with trio.move_on_after(0.1) as cancel_scope:
|
|
||||||
cancel_scope.shield = True
|
|
||||||
# send cancel cmd - might not get response
|
# send cancel cmd - might not get response
|
||||||
|
with trio.move_on_after(0.5) as cancel_scope:
|
||||||
|
cancel_scope.shield = True
|
||||||
await self.run('self', 'cancel')
|
await self.run('self', 'cancel')
|
||||||
return True
|
return True
|
||||||
except trio.ClosedResourceError:
|
except trio.ClosedResourceError:
|
||||||
|
@ -225,17 +248,14 @@ class Portal:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
class LocalPortal:
|
class LocalPortal:
|
||||||
"""A 'portal' to a local ``Actor``.
|
"""A 'portal' to a local ``Actor``.
|
||||||
|
|
||||||
A compatibility shim for normal portals but for invoking functions
|
A compatibility shim for normal portals but for invoking functions
|
||||||
using an in process actor instance.
|
using an in process actor instance.
|
||||||
"""
|
"""
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
actor: 'Actor' # type: ignore
|
actor: 'Actor' # type: ignore
|
||||||
) -> None:
|
|
||||||
self.actor = actor
|
|
||||||
|
|
||||||
async def run(self, ns: str, func_name: str, **kwargs) -> Any:
|
async def run(self, ns: str, func_name: str, **kwargs) -> Any:
|
||||||
"""Run a requested function locally and return it's result.
|
"""Run a requested function locally and return it's result.
|
||||||
|
@ -270,20 +290,26 @@ async def open_portal(
|
||||||
if channel.uid is None:
|
if channel.uid is None:
|
||||||
await _do_handshake(actor, channel)
|
await _do_handshake(actor, channel)
|
||||||
|
|
||||||
nursery.start_soon(actor._process_messages, channel)
|
msg_loop_cs = await nursery.start(
|
||||||
|
partial(
|
||||||
|
actor._process_messages,
|
||||||
|
channel,
|
||||||
|
# if the local task is cancelled we want to keep
|
||||||
|
# the msg loop running until our block ends
|
||||||
|
shield=True,
|
||||||
|
)
|
||||||
|
)
|
||||||
portal = Portal(channel)
|
portal = Portal(channel)
|
||||||
try:
|
try:
|
||||||
yield portal
|
yield portal
|
||||||
finally:
|
finally:
|
||||||
# tear down all async generators
|
|
||||||
for agen in portal._agens:
|
|
||||||
await agen.aclose()
|
|
||||||
|
|
||||||
# cancel remote channel-msg loop
|
|
||||||
if channel.connected():
|
|
||||||
await portal.close()
|
await portal.close()
|
||||||
|
|
||||||
# cancel background msg loop task
|
|
||||||
nursery.cancel_scope.cancel()
|
|
||||||
if was_connected:
|
if was_connected:
|
||||||
|
# cancel remote channel-msg loop
|
||||||
|
await channel.send(None)
|
||||||
await channel.aclose()
|
await channel.aclose()
|
||||||
|
|
||||||
|
# cancel background msg loop task
|
||||||
|
msg_loop_cs.cancel()
|
||||||
|
nursery.cancel_scope.cancel()
|
||||||
|
|
Loading…
Reference in New Issue