Fix actor nursery __exit__ handling
When an error is raised inside a nursery block (in the local actor) cancel all spawned children and ensure the error is properly unsuppressed. Also, - change `invoke_cmd` to `send_cmd` and expect the caller to use `result_from_q` (avoids implicit blocking for responses that might never arrive) - `nursery.start()` the channel server block such that we wait for the underlying listener to spawn before making outbound connections - cancel the channel server when an actor's main task completes (given that `outlive_main == False`) - raise subactor errors directly in the local actors's msg loop - enforce that `treat_as_gen` async functions respond with a caller id (`cid`) in each yield packetkivy_mainline_and_py3.8
parent
84cd29644e
commit
37eb8a8552
|
@ -24,7 +24,7 @@ log = get_logger('tractor')
|
||||||
_current_actor = None
|
_current_actor = None
|
||||||
|
|
||||||
# for debugging
|
# for debugging
|
||||||
log = get_console_log('trace')
|
log = get_console_log('info')
|
||||||
|
|
||||||
|
|
||||||
class ActorFailure(Exception):
|
class ActorFailure(Exception):
|
||||||
|
@ -219,14 +219,16 @@ class Actor:
|
||||||
log.debug(f"Registering for result from call id {cid}")
|
log.debug(f"Registering for result from call id {cid}")
|
||||||
return cids2qs.setdefault(cid, trio.Queue(1000))
|
return cids2qs.setdefault(cid, trio.Queue(1000))
|
||||||
|
|
||||||
async def invoke_cmd(self, chan, ns, func, kwargs):
|
async def send_cmd(self, chan, ns, func, kwargs):
|
||||||
"""Invoke a remote command by sending a `cmd` message and waiting
|
"""Send a ``'cmd'`` message to a remote actor and return a
|
||||||
on the msg processing loop for its response(s).
|
caller id and a ``trio.Queue`` that can be used to wait for
|
||||||
|
responses delivered by the local message processing loop.
|
||||||
"""
|
"""
|
||||||
cid = str(uuid.uuid1())
|
cid = str(uuid.uuid1())
|
||||||
q = self.get_waitq(chan.uid, cid)
|
q = self.get_waitq(chan.uid, cid)
|
||||||
|
log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
|
||||||
await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)})
|
await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)})
|
||||||
return await result_from_q(q)
|
return cid, q
|
||||||
|
|
||||||
async def _process_messages(self, chan, treat_as_gen=False):
|
async def _process_messages(self, chan, treat_as_gen=False):
|
||||||
"""Process messages async-RPC style.
|
"""Process messages async-RPC style.
|
||||||
|
@ -240,16 +242,15 @@ class Actor:
|
||||||
log.debug(f"Terminating msg loop for {chan}")
|
log.debug(f"Terminating msg loop for {chan}")
|
||||||
break
|
break
|
||||||
log.debug(f"Received msg {msg}")
|
log.debug(f"Received msg {msg}")
|
||||||
# try:
|
|
||||||
cid = msg.get('cid')
|
cid = msg.get('cid')
|
||||||
if cid: # deliver response to local caller/waiter
|
if cid: # deliver response to local caller/waiter
|
||||||
self._push_result(chan.uid, cid, msg)
|
self._push_result(chan.uid, cid, msg)
|
||||||
|
if 'error' in msg:
|
||||||
|
# TODO: need something better then this slop
|
||||||
|
raise RemoteActorError(msg['error'])
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
ns, funcname, kwargs, actorid, cid = msg['cmd']
|
ns, funcname, kwargs, actorid, cid = msg['cmd']
|
||||||
# except Exception:
|
|
||||||
# await chan.send({'error': traceback.format_exc()})
|
|
||||||
# break
|
|
||||||
|
|
||||||
log.debug(
|
log.debug(
|
||||||
f"Processing request from {actorid}\n"
|
f"Processing request from {actorid}\n"
|
||||||
|
@ -264,7 +265,10 @@ class Actor:
|
||||||
sig = inspect.signature(func)
|
sig = inspect.signature(func)
|
||||||
treat_as_gen = False
|
treat_as_gen = False
|
||||||
if 'chan' in sig.parameters:
|
if 'chan' in sig.parameters:
|
||||||
|
assert 'cid' in sig.parameters, \
|
||||||
|
f"{func} must accept a `cid` (caller id) kwarg"
|
||||||
kwargs['chan'] = chan
|
kwargs['chan'] = chan
|
||||||
|
kwargs['cid'] = cid
|
||||||
# TODO: eventually we want to be more stringent
|
# TODO: eventually we want to be more stringent
|
||||||
# about what is considered a far-end async-generator.
|
# about what is considered a far-end async-generator.
|
||||||
# Right now both actual async gens and any async
|
# Right now both actual async gens and any async
|
||||||
|
@ -302,7 +306,7 @@ class Actor:
|
||||||
|
|
||||||
# Startup up channel server
|
# Startup up channel server
|
||||||
host, port = accept_addr
|
host, port = accept_addr
|
||||||
nursery.start_soon(partial(
|
await nursery.start(partial(
|
||||||
self._serve_forever, accept_host=host, accept_port=port)
|
self._serve_forever, accept_host=host, accept_port=port)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -323,8 +327,6 @@ class Actor:
|
||||||
log.warn(
|
log.warn(
|
||||||
f"already have channel for {uid} registered?"
|
f"already have channel for {uid} registered?"
|
||||||
)
|
)
|
||||||
# else:
|
|
||||||
# self._peers[uid] = chan
|
|
||||||
|
|
||||||
# handle new connection back to parent optionally
|
# handle new connection back to parent optionally
|
||||||
# begin responding to RPC
|
# begin responding to RPC
|
||||||
|
@ -354,7 +356,7 @@ class Actor:
|
||||||
# tear down channel server
|
# tear down channel server
|
||||||
if not self._outlive_main:
|
if not self._outlive_main:
|
||||||
log.debug(f"Shutting down channel server")
|
log.debug(f"Shutting down channel server")
|
||||||
self._server_nursery.cancel_scope.cancel()
|
self.cancel_server()
|
||||||
|
|
||||||
# blocks here as expected if no nursery was provided until
|
# blocks here as expected if no nursery was provided until
|
||||||
# the channel server is killed (i.e. this actor is
|
# the channel server is killed (i.e. this actor is
|
||||||
|
@ -397,8 +399,6 @@ class Actor:
|
||||||
)
|
)
|
||||||
self._listeners.extend(listeners)
|
self._listeners.extend(listeners)
|
||||||
log.debug(f"Spawned {listeners}")
|
log.debug(f"Spawned {listeners}")
|
||||||
|
|
||||||
# when launched in-process, trigger awaiter's completion
|
|
||||||
task_status.started()
|
task_status.started()
|
||||||
|
|
||||||
def cancel(self):
|
def cancel(self):
|
||||||
|
@ -407,6 +407,12 @@ class Actor:
|
||||||
"""
|
"""
|
||||||
self._root_nursery.cancel_scope.cancel()
|
self._root_nursery.cancel_scope.cancel()
|
||||||
|
|
||||||
|
def cancel_server(self):
|
||||||
|
"""Cancel the internal channel server nursery thereby
|
||||||
|
preventing any new inbound connections from being established.
|
||||||
|
"""
|
||||||
|
self._server_nursery.cancel_scope.cancel()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def accept_addr(self):
|
def accept_addr(self):
|
||||||
"""Primary address to which the channel server is bound.
|
"""Primary address to which the channel server is bound.
|
||||||
|
@ -417,6 +423,9 @@ class Actor:
|
||||||
def get_parent(self):
|
def get_parent(self):
|
||||||
return Portal(self._parent_chan)
|
return Portal(self._parent_chan)
|
||||||
|
|
||||||
|
def get_chan(self, actorid):
|
||||||
|
return self._peers[actorid]
|
||||||
|
|
||||||
|
|
||||||
class Arbiter(Actor):
|
class Arbiter(Actor):
|
||||||
"""A special actor who knows all the other actors and always has
|
"""A special actor who knows all the other actors and always has
|
||||||
|
@ -480,12 +489,14 @@ class Portal:
|
||||||
# ship a function call request to the remote actor
|
# ship a function call request to the remote actor
|
||||||
actor = current_actor()
|
actor = current_actor()
|
||||||
|
|
||||||
resptype, first_msg, q = await actor.invoke_cmd(chan, ns, func, kwargs)
|
cid, q = await actor.send_cmd(chan, ns, func, kwargs)
|
||||||
|
# wait on first response msg
|
||||||
|
resptype, first_msg, q = await result_from_q(q)
|
||||||
|
|
||||||
if resptype == 'yield':
|
if resptype == 'yield':
|
||||||
|
|
||||||
async def yield_from_q():
|
async def yield_from_q():
|
||||||
yield first_msg
|
yield first_msg['yield']
|
||||||
for msg in q:
|
for msg in q:
|
||||||
try:
|
try:
|
||||||
yield msg['yield']
|
yield msg['yield']
|
||||||
|
@ -583,26 +594,27 @@ class ActorNursery:
|
||||||
nursery.start_soon(wait_for_proc, proc)
|
nursery.start_soon(wait_for_proc, proc)
|
||||||
|
|
||||||
async def cancel(self, hard_kill=False):
|
async def cancel(self, hard_kill=False):
|
||||||
|
log.debug(f"Cancelling nursery")
|
||||||
for subactor, proc, main_q in self._children.values():
|
for subactor, proc, main_q in self._children.values():
|
||||||
if proc is mp.current_process():
|
if proc is mp.current_process():
|
||||||
# XXX: does this even make sense?
|
# XXX: does this even make sense?
|
||||||
subactor.cancel()
|
await subactor.cancel()
|
||||||
else:
|
else:
|
||||||
if hard_kill:
|
if hard_kill:
|
||||||
|
log.warn(f"Hard killing subactors {self._children}")
|
||||||
proc.terminate()
|
proc.terminate()
|
||||||
# send KeyBoardInterrupt (trio abort signal) to underlying
|
# send KeyBoardInterrupt (trio abort signal) to underlying
|
||||||
# sub-actors
|
# sub-actors
|
||||||
# os.kill(proc.pid, signal.SIGINT)
|
# os.kill(proc.pid, signal.SIGINT)
|
||||||
else:
|
else:
|
||||||
# invoke cancel cmd
|
# send cancel cmd - likely no response from subactor
|
||||||
actor = self._actor
|
actor = self._actor
|
||||||
resptype, first_msg, q = await actor.invoke_cmd(
|
cid, q = await actor.send_cmd(
|
||||||
actor._peers[subactor.uid], # channel
|
actor.get_chan(subactor.uid), # channel lookup
|
||||||
'self',
|
'self',
|
||||||
'cancel',
|
'cancel',
|
||||||
{},
|
{},
|
||||||
)
|
)
|
||||||
|
|
||||||
log.debug(f"Waiting on all subactors to complete")
|
log.debug(f"Waiting on all subactors to complete")
|
||||||
await self.wait()
|
await self.wait()
|
||||||
log.debug(f"All subactors for {self} have terminated")
|
log.debug(f"All subactors for {self} have terminated")
|
||||||
|
@ -611,16 +623,25 @@ class ActorNursery:
|
||||||
"""Wait on all subactor's main routines to complete.
|
"""Wait on all subactor's main routines to complete.
|
||||||
"""
|
"""
|
||||||
async def wait_for_actor(actor, proc, q):
|
async def wait_for_actor(actor, proc, q):
|
||||||
ret_type, msg, q = await result_from_q(q)
|
if proc.is_alive():
|
||||||
log.info(f"{actor.uid} main task completed with {msg}")
|
ret_type, msg, q = await result_from_q(q)
|
||||||
if not actor._outlive_main:
|
log.info(f"{actor.uid} main task completed with {msg}")
|
||||||
# trigger msg loop to break
|
if not actor._outlive_main:
|
||||||
log.info(f"Signalling msg loop exit for {actor.uid}")
|
# trigger msg loop to break
|
||||||
await self._actor._peers[actor.uid].send(None)
|
log.info(f"Signalling msg loop exit for {actor.uid}")
|
||||||
|
await self._actor.get_chan(actor.uid).send(None)
|
||||||
|
|
||||||
async with trio.open_nursery() as nursery:
|
if etype is not None:
|
||||||
for subactor, proc, main_q in self._children.values():
|
log.warn(f"{current_actor().uid} errored with {etype}, "
|
||||||
nursery.start_soon(wait_for_actor, subactor, proc, main_q)
|
"cancelling nursery")
|
||||||
|
await self.cancel()
|
||||||
|
else:
|
||||||
|
log.debug(f"Waiting on subactors to complete")
|
||||||
|
async with trio.open_nursery() as nursery:
|
||||||
|
for subactor, proc, main_q in self._children.values():
|
||||||
|
nursery.start_soon(wait_for_actor, subactor, proc, main_q)
|
||||||
|
|
||||||
|
log.debug(f"Nursery teardown complete")
|
||||||
|
|
||||||
|
|
||||||
def current_actor() -> Actor:
|
def current_actor() -> Actor:
|
||||||
|
@ -693,9 +714,10 @@ async def get_arbiter(host='127.0.0.1', port=1616, main=None):
|
||||||
|
|
||||||
yield LocalPortal(arbiter)
|
yield LocalPortal(arbiter)
|
||||||
|
|
||||||
# If spawned locally, the arbiter is cancelled when this context
|
# XXX: If spawned locally, the arbiter is cancelled when this
|
||||||
# is complete?
|
# context is complete given that there are no more active
|
||||||
# nursery.cancel_scope.cancel()
|
# peer channels connected to it.
|
||||||
|
arbiter.cancel_server()
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
|
@ -737,6 +759,8 @@ async def _main(async_fn, args, kwargs, name):
|
||||||
await serve_local_actor(
|
await serve_local_actor(
|
||||||
actor, accept_addr=kwargs.pop('accept_addr', (None, 0)))
|
actor, accept_addr=kwargs.pop('accept_addr', (None, 0)))
|
||||||
log.info("Completed async main")
|
log.info("Completed async main")
|
||||||
|
# TODO: when the local actor's main has completed we cancel?
|
||||||
|
# actor.cancel()
|
||||||
else:
|
else:
|
||||||
# block waiting for the arbiter main task to complete
|
# block waiting for the arbiter main task to complete
|
||||||
pass
|
pass
|
||||||
|
|
Loading…
Reference in New Issue