Handle kb interrupt gracefully in sub-actors

Fail gracefully (by "aborting") the same way `trio` does. This avoids
ugly sub-proc tracebacks thrown at the console. Unset the local actor
when `tractor._main` completes. Cancel all tasks for a peer channel on
disconnect.
asyncgen_closing_fix
Tyler Goodlet 2018-06-27 11:34:22 -04:00
parent 0aa49dcbdf
commit fa6f8185b6
1 changed files with 45 additions and 36 deletions

View File

@ -52,23 +52,24 @@ async def _invoke(
"""Invoke local func and return results over provided channel.
"""
try:
is_async_func = False
is_async_partial = False
if isinstance(func, partial):
is_async_func = inspect.iscoroutinefunction(func.func)
is_async_partial = inspect.iscoroutinefunction(func.func)
if not inspect.iscoroutinefunction(func) and not is_async_func:
if not inspect.iscoroutinefunction(func) and not is_async_partial:
await chan.send({'return': func(**kwargs), 'cid': cid})
else:
coro = func(**kwargs)
if inspect.isasyncgen(coro):
# await chan.send('gen')
async for item in coro:
# TODO: can we send values back in here?
# How do we do it, spawn another task?
# to_send = await chan.recv()
# it's gonna require a `while True:` and
# some non-blocking way to retrieve new `asend()`
# values from the channel:
# to_send = await chan.recv_nowait()
# if to_send is not None:
# await coro.send(to_send)
# to_yield = await coro.asend(to_send)
await chan.send({'yield': item, 'cid': cid})
else:
if treat_as_gen:
@ -276,9 +277,13 @@ class Actor:
_invoke, cid, chan, func, kwargs, treat_as_gen,
name=funcname
)
else: # channel disconnect
log.warn(f"Cancelling all tasks for {chan}")
nursery.cancel_scope.cancel()
log.debug(f"Exiting msg loop for {chan}")
def _fork_main(self, accept_addr, parent_addr=None, loglevel=None):
def _fork_main(self, accept_addr, parent_addr=None, loglevel='debug'):
# after fork routine which invokes a fresh ``trio.run``
log.info(
f"Started new {ctx.current_process()} for actor {self.uid}")
@ -287,8 +292,11 @@ class Actor:
if loglevel:
get_console_log(loglevel)
log.debug(f"parent_addr is {parent_addr}")
trio.run(
partial(self._async_main, accept_addr, parent_addr=parent_addr))
try:
trio.run(partial(
self._async_main, accept_addr, parent_addr=parent_addr))
except KeyboardInterrupt:
pass # handle it the same way trio does?
log.debug(f"Actor {self.uid} terminated")
async def _async_main(self, accept_addr, parent_addr=None, nursery=None):
@ -415,8 +423,10 @@ class Actor:
def accept_addr(self):
"""Primary address to which the channel server is bound.
"""
return self._listeners[0].socket.getsockname() \
if self._listeners else None
try:
return self._listeners[0].socket.getsockname()
except OSError:
return
def get_parent(self):
return Portal(self._parent_chan)
@ -495,11 +505,14 @@ class Portal:
async def yield_from_q():
yield first_msg['yield']
async for msg in q:
try:
yield msg['yield']
except KeyError:
raise RemoteActorError(msg['error'])
try:
async for msg in q:
try:
yield msg['yield']
except KeyError:
raise RemoteActorError(msg['error'])
except GeneratorExit:
log.warn(f"Cancelling async gen call {cid} to {chan.uid}")
return yield_from_q()
@ -558,6 +571,7 @@ class ActorNursery:
outlive_main=outlive_main,
)
parent_addr = self._actor.accept_addr
assert parent_addr
proc = ctx.Process(
target=actor._fork_main,
args=(bind_addr, parent_addr, loglevel),
@ -611,14 +625,10 @@ class ActorNursery:
actor = self._actor
chan = actor.get_chan(subactor.uid)
if chan:
cid, q = await actor.send_cmd(
chan, # channel lookup
'self',
'cancel',
{},
)
await actor.send_cmd(chan, 'self', 'cancel', {})
else:
log.warn(f"Channel for {subactor.uid} is already down?")
log.warn(
f"Channel for {subactor.uid} is already down?")
log.debug(f"Waiting on all subactors to complete")
await self.wait()
log.debug(f"All subactors for {self} have terminated")
@ -637,7 +647,7 @@ class ActorNursery:
if etype is not None:
log.warn(f"{current_actor().uid} errored with {etype}, "
"cancelling nursery")
"cancelling actor nursery")
await self.cancel()
else:
log.debug(f"Waiting on subactors to complete")
@ -687,7 +697,7 @@ class NoArbiterFound:
@asynccontextmanager
async def get_arbiter(host='127.0.0.1', port=1616, main=None, **kwargs):
async def get_arbiter(host='127.0.0.1', port=1617, main=None, **kwargs):
actor = current_actor()
if actor and not actor.is_arbiter:
try:
@ -698,16 +708,11 @@ async def get_arbiter(host='127.0.0.1', port=1616, main=None, **kwargs):
raise NoArbiterFound(err)
else:
if actor and actor.is_arbiter:
# we're already the arbiter (re-entrant call from the arbiter actor)
# we're already the arbiter
# (likely a re-entrant call from the arbiter actor)
yield LocalPortal(actor)
else:
arbiter = Arbiter(
'arbiter',
# rpc_module_paths=[], # the arbiter doesn't allow module rpc
# statespace={}, # global proc state vars
main=main, # main coroutine to be invoked
**kwargs,
)
arbiter = Arbiter('arbiter', main=main, **kwargs)
# assign process-local actor
global _current_actor
_current_actor = arbiter
@ -754,10 +759,10 @@ async def _main(async_fn, args, kwargs, name):
# implemented yet FYI).
async with get_arbiter(
host=kwargs.pop('arbiter_host', '127.0.0.1'),
port=kwargs.pop('arbiter_port', 1616),
port=kwargs.pop('arbiter_port', 1617),
main=main,
**kwargs,
) as portal:
):
if not current_actor().is_arbiter:
# create a local actor and start it up its main routine
actor = Actor(
@ -775,6 +780,10 @@ async def _main(async_fn, args, kwargs, name):
# block waiting for the arbiter main task to complete
pass
# unset module state
global _current_actor
_current_actor = None
def run(async_fn, *args, arbiter_host=None, name='anonymous', **kwargs):
"""Run a trio-actor async function in process.