More fixes after unit testing

- Allow passing in a program-wide `loglevel` (usage sketch below)
- Add detailed debug logging particularly to do with channel msg processing
  and connection handling
- Don't daemonize subprocesses for now as it prevents use of
  sub-sub-actors (need to solve #6 first)
- Add a `Portal.close()` which just tells the remote actor to tear down
  the channel (for now)
- Add a message to signal the remote `StopAsyncIteration` from an async
  gen such that the client side terminates properly as well
- Make `Actor.cancel()` cancel the channel server first
- Actors *must* complete the arbiter registration steps before moving
  on with their main tasks and RPC handling
- When delivering rpc responses (using the local per caller queue) use
  the blocking interface (`trio.Queue.put()`) to get backpressure
- Properly detect `partial`-wrapped async generators in `_invoke`
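
A minimal usage sketch of the new program-wide `loglevel` (the `main()` body here is illustrative; the keyword path is grounded in the `run()`/`_main()` hunks below):

```python
import tractor

async def main():
    ...  # spawn sub-actors, make rpc calls, etc.

# `loglevel` rides through `run()`'s **kwargs into `_main()`, which now
# passes it to `get_console_log()` (see the `_main()` hunk below)
tractor.run(main, name='demo', loglevel='debug')
```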
asyncgen_closing_fix
Tyler Goodlet 2018-07-07 16:50:59 -04:00
parent 10417303aa
commit 77e34049b8
1 changed file with 108 additions and 52 deletions


@@ -23,6 +23,7 @@ log = get_logger('tractor')
 _current_actor = None
 _default_arbiter_host = '127.0.0.1'
 _default_arbiter_port = 1616
+_default_loglevel = None


 class ActorFailure(Exception):
@@ -55,12 +56,16 @@ async def _invoke(
     """
     try:
         is_async_partial = False
+        is_async_gen_partial = False
         if isinstance(func, partial):
             is_async_partial = inspect.iscoroutinefunction(func.func)
+            is_async_gen_partial = inspect.isasyncgenfunction(func.func)

         if (
-            not inspect.iscoroutinefunction(func) and not is_async_partial
-            and not inspect.isasyncgenfunction(func)
+            not inspect.iscoroutinefunction(func) and
+            not inspect.isasyncgenfunction(func) and
+            not is_async_partial and
+            not is_async_gen_partial
        ):
            await chan.send({'return': func(**kwargs), 'cid': cid})
        else:
@@ -76,6 +81,12 @@ async def _invoke(
                        # if to_send is not None:
                        #     to_yield = await coro.asend(to_send)
                        await chan.send({'yield': item, 'cid': cid})
+
+                log.warn(f"Finished iterating {coro}")
+                # TODO: we should really support a proper
+                # `StopAsyncIteration` system here for returning a final
+                # value if desired
+                await chan.send({'stop': None, 'cid': cid})
            else:
                if treat_as_gen:
                    # XXX: the async-func may spawn further tasks which push
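
For orientation, these are the per-call message shapes `_invoke` can now ship over a channel; the values are illustrative and only the `stop` variant is new in this commit:

```python
cid = '42'  # per-call id routing responses to the caller's queue
msgs = [
    {'return': 10, 'cid': cid},    # plain (non-gen) function result
    {'yield': 1, 'cid': cid},      # one item from an async generator
    {'stop': None, 'cid': cid},    # async gen exhausted (new here)
    {'error': '<traceback>', 'cid': 'main'},  # remote error shipped home
]
```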
@@ -139,6 +150,7 @@ class Actor:
        uid: str = None,
        allow_rpc: bool = True,
        outlive_main: bool = False,
+        loglevel: str = None,
    ):
        self.name = name
        self.uid = (name, uid or str(uuid.uuid1()))
@@ -150,6 +162,7 @@ class Actor:
        self.statespace = statespace
        self._allow_rpc = allow_rpc
        self._outlive_main = outlive_main
+        self.loglevel = loglevel

        # filled in by `_async_main` after fork
        self._peers = defaultdict(list)
@@ -226,6 +239,10 @@ class Actor:
            log.debug(f"Releasing channel {chan}")
            chans = self._peers.get(chan.uid)
            chans.remove(chan)
+            if chan.connected():
+                log.debug(f"Disconnecting channel {chan}")
+                await chan.send(None)
+                await chan.aclose()
            if not chans:
                log.debug(f"No more channels for {chan.uid}")
                self._peers.pop(chan.uid, None)
@@ -233,15 +250,12 @@ class Actor:
                self._no_more_peers.set()
                log.debug(f"No more peer channels")

-    def _push_result(self, actorid, cid, msg):
+    async def _push_result(self, actorid, cid, msg):
        assert actorid, f"`actorid` can't be {actorid}"
        q = self.get_waitq(actorid, cid)
        log.debug(f"Delivering {msg} from {actorid} to caller {cid}")
-        waiters = q.statistics().tasks_waiting_get
-        if not waiters:
-            log.warn(
-                f"No tasks are currently waiting for results from call {cid}?")
-        q.put_nowait(msg)
+        # maintain backpressure
+        await q.put(msg)

    def get_waitq(self, actorid, cid):
        log.debug(f"Registering for callid {cid} queue results from {actorid}")
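
A minimal sketch of the behavioural difference, written against the trio `Queue` API of this era (it has since been removed from trio in favor of memory channels):

```python
import trio

async def drain(q, n=2):
    for _ in range(n):
        print(await q.get())

async def demo():
    q = trio.Queue(1)  # bounded, like the per-caller response queues
    q.put_nowait('msg1')      # old style: never blocks...
    try:
        q.put_nowait('msg2')  # ...but raises once the queue is full
    except trio.WouldBlock:
        pass
    async with trio.open_nursery() as nursery:
        nursery.start_soon(drain, q)
        # new style: suspend until the consumer makes room; inside
        # `_process_messages` this stalls further channel reads and so
        # pushes back on the remote sender
        await q.put('msg2')

trio.run(demo)
```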
@@ -266,20 +280,23 @@ class Actor:
        """
        # TODO: once https://github.com/python-trio/trio/issues/467 gets
        # worked out we'll likely want to use that!
-        log.debug(f"Entering msg loop for {chan}")
+        log.debug(f"Entering msg loop for {chan} from {chan.uid}")
        async with trio.open_nursery() as nursery:
            try:
                async for msg in chan.aiter_recv():
                    if msg is None:  # terminate sentinel
-                        log.debug(f"Cancelling all tasks for {chan}")
+                        log.debug(
+                            f"Cancelling all tasks for {chan} from {chan.uid}")
                        nursery.cancel_scope.cancel()
-                        log.debug(f"Terminating msg loop for {chan}")
+                        log.debug(
+                            f"Terminating msg loop for {chan} from {chan.uid}")
                        break
-                    log.debug(f"Received msg {msg}")
+                    log.debug(f"Received msg {msg} from {chan.uid}")
                    cid = msg.get('cid')
                    if cid:  # deliver response to local caller/waiter
-                        self._push_result(chan.uid, cid, msg)
-                        log.debug(f"Waiting on next msg for {chan}")
+                        await self._push_result(chan.uid, cid, msg)
+                        log.debug(
+                            f"Waiting on next msg for {chan} from {chan.uid}")
                        continue
                    else:
                        ns, funcname, kwargs, actorid, cid = msg['cmd']
@@ -312,22 +329,23 @@ class Actor:
                        _invoke, cid, chan, func, kwargs, treat_as_gen,
                        name=funcname
                    )
-                    log.debug(f"Waiting on next msg for {chan}")
+                    log.debug(
+                        f"Waiting on next msg for {chan} from {chan.uid}")
                else:  # channel disconnect
-                    log.debug(f"{chan} disconnected")
+                    log.debug(f"{chan} from {chan.uid} disconnected")
            except trio.ClosedStreamError:
-                log.error(f"{chan} broke")
+                log.error(f"{chan} from {chan.uid} broke")

-        log.debug(f"Exiting msg loop for {chan}")
+        log.debug(f"Exiting msg loop for {chan} from {chan.uid}")

-    def _fork_main(self, accept_addr, parent_addr=None, loglevel=None):
+    def _fork_main(self, accept_addr, parent_addr=None):
        # after fork routine which invokes a fresh ``trio.run``
+        if self.loglevel:
+            get_console_log(self.loglevel)
        log.info(
            f"Started new {ctx.current_process()} for actor {self.uid}")
        global _current_actor
        _current_actor = self
-        if loglevel:
-            get_console_log(loglevel)
        log.debug(f"parent_addr is {parent_addr}")
        try:
            trio.run(partial(
@@ -372,6 +390,13 @@ class Actor:
                # initial handshake, report who we are, who they are
                await _do_handshake(self, chan)

+            # register with the arbiter if we're told its addr
+            log.debug(f"Registering {self} for role `{self.name}`")
+            async with get_arbiter(*arbiter_addr) as arb_portal:
+                await arb_portal.run(
+                    'self', 'register_actor',
+                    name=self.name, sockaddr=self.accept_addr)
+
            # handle new connection back to parent optionally
            # begin responding to RPC
            if self._allow_rpc:
@@ -380,20 +405,14 @@ class Actor:
                    nursery.start_soon(
                        self._process_messages, self._parent_chan)

-            # register with the arbiter if we're told its addr
-            log.debug(f"Registering {self} for role `{self.name}`")
-            async with get_arbiter(*arbiter_addr) as arb_portal:
-                await arb_portal.run(
-                    'self', 'register_actor',
-                    name=self.name, sockaddr=self.accept_addr)
-
            if self.main:
                try:
                    if self._parent_chan:
                        log.debug(f"Starting main task `{self.main}`")
                        # start "main" routine in a task
                        await nursery.start(
-                            _invoke, 'main', self._parent_chan, self.main, {},
+                            _invoke, 'main',
+                            self._parent_chan, self.main, {},
                            False, True  # treat_as_gen, raise_errs params
                        )
                    else:
@@ -412,9 +431,14 @@ class Actor:
                # cancelled or signalled by the parent actor)
            except Exception:
                if self._parent_chan:
-                    log.exception("Actor errored:")
-                    await self._parent_chan.send(
-                        {'error': traceback.format_exc(), 'cid': 'main'})
+                    try:
+                        await self._parent_chan.send(
+                            {'error': traceback.format_exc(), 'cid': 'main'})
+                    except trio.ClosedStreamError:
+                        log.error(
+                            f"Failed to ship error to parent "
+                            f"{self._parent_chan.uid}, channel was closed")
+                    log.exception("Actor errored:")
                else:
                    raise
        finally:
@@ -423,7 +447,7 @@ class Actor:
            if arbiter_addr is not None:
                async with get_arbiter(*arbiter_addr) as arb_portal:
                    await arb_portal.run(
-                        'self', 'register_actor',
+                        'self', 'unregister_actor',
                        name=self.name, sockaddr=self.accept_addr)
        except OSError:
            log.warn(f"Unable to unregister {self.name} from arbiter")
@@ -434,6 +458,11 @@ class Actor:
        await self._no_more_peers.wait()
        log.debug(f"All peer channels are complete")

+        # tear down channel server
+        if not self._outlive_main:
+            log.debug(f"Shutting down channel server")
+            self.cancel_server()
+
        return result

    async def _serve_forever(
@@ -473,6 +502,7 @@ class Actor:
        """This cancels the internal root-most nursery thereby gracefully
        cancelling (for all intents and purposes) this actor.
        """
+        self.cancel_server()
        self._root_nursery.cancel_scope.cancel()

    def cancel_server(self):
@@ -535,6 +565,7 @@ class Portal:
    """
    def __init__(self, channel):
        self.channel = channel
+        self._result = None

    async def aclose(self):
        log.debug(f"Closing {self}")
@@ -566,11 +597,14 @@ class Portal:
                    try:
                        yield msg['yield']
                    except KeyError:
-                        raise RemoteActorError(msg['error'])
+                        if 'stop' in msg:
+                            break  # far end async gen terminated
+                        else:
+                            raise RemoteActorError(msg['error'])
            except GeneratorExit:
                log.debug(
                    f"Cancelling async gen call {cid} to "
-                    "{self.channel.uid}")
+                    f"{self.channel.uid}")

        return yield_from_q()
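
Taken together with the `stop` message added in `_invoke` above, a hypothetical round trip now terminates cleanly; the module and function names here are made up:

```python
# hypothetical module `mymod.py`, served to peers via `rpc_module_paths`
async def stream_squares(n):
    for i in range(n):
        yield i ** 2
    # exhaustion now makes `_invoke` send {'stop': None, 'cid': cid}

async def consume(portal):  # `portal`: an already-connected Portal
    # `Portal.run()` hands back the async gen from `yield_from_q()`;
    # the loop ends on the 'stop' msg instead of hanging forever
    async for sq in await portal.run('mymod', 'stream_squares', n=4):
        print(sq)  # 0, 1, 4, 9
```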
@@ -582,12 +616,19 @@ class Portal:
    async def result(self):
        """Return the result(s) from the remote actor's "main" task.
        """
-        q = current_actor().get_waitq(self.channel.uid, 'main')
-        first_msg = (await result_from_q(q))
-        val = await self._return_from_resptype(
-            'main', *first_msg)
-        await q.put(first_msg)  # for next consumer (e.g. nursery)
-        return val
+        if self._result is None:
+            q = current_actor().get_waitq(self.channel.uid, 'main')
+            resptype, first_msg, q = (await result_from_q(q))
+            self._result = await self._return_from_resptype(
+                'main', resptype, first_msg, q)
+            # await q.put(first_msg)  # for next consumer (e.g. nursery)
+        return self._result
+
+    async def close(self):
+        # trigger remote msg loop `break`
+        chan = self.channel
+        log.debug(f"Closing portal for {chan} to {chan.uid}")
+        await self.channel.send(None)


 @asynccontextmanager
@@ -614,7 +655,12 @@ async def open_portal(channel, nursery=None):
            actor._peers[channel.uid].append(channel)
            nursery.start_soon(actor._process_messages, channel)

-        yield Portal(channel)
+        portal = Portal(channel)
+        yield portal
+
+        # cancel remote channel-msg loop
+        if channel.connected():
+            await portal.close()

        # cancel background msg loop task
        nursery.cancel_scope.cancel()
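
A sketch of the resulting portal lifetime, assuming an already-established `channel` to a peer actor (the target module and function are hypothetical):

```python
from tractor import open_portal  # defined in this same module

async def query(channel):
    # on exit `open_portal()` now calls `portal.close()`, sending the
    # `None` sentinel that breaks the peer's msg loop, before the
    # local background msg-loop task is cancelled
    async with open_portal(channel) as portal:
        return await portal.run('some.mod', 'some_func')
```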
@@ -644,7 +690,7 @@ class ActorNursery:
    """Spawn scoped subprocess actors.
    """
    def __init__(self, actor, supervisor=None):
-        self.supervisor = supervisor
+        self.supervisor = supervisor  # TODO
        self._actor = actor
        # We'll likely want some way to cancel all sub-actors eventually
        # self.cancel_scope = cancel_scope
@@ -661,7 +707,7 @@ class ActorNursery:
        statespace=None,
        rpc_module_paths=None,
        outlive_main=False,  # sub-actors die when their main task completes
-        loglevel=None,  # set log level per subactor
+        loglevel=_default_loglevel,  # set log level per subactor
    ):
        actor = Actor(
            name,
@@ -670,19 +716,21 @@ class ActorNursery:
            statespace=statespace,  # global proc state vars
            main=main,  # main coroutine to be invoked
            outlive_main=outlive_main,
+            loglevel=loglevel,
        )
        parent_addr = self._actor.accept_addr
        assert parent_addr
        proc = ctx.Process(
            target=actor._fork_main,
-            args=(bind_addr, parent_addr, loglevel),
-            daemon=True,
+            args=(bind_addr, parent_addr),
+            # daemon=True,
            name=name,
        )
        proc.start()
        if not proc.is_alive():
            raise ActorFailure("Couldn't start sub-actor?")
+        log.info(f"Started {proc}")

        # wait for actor to spawn and connect back to us
        # channel should have handshake completed by the
        # local actor by the time we get a ref to it
@@ -709,6 +757,12 @@ class ActorNursery:
            nursery.start_soon(wait_for_proc, proc)

    async def cancel(self, hard_kill=False):
+        """Cancel this nursery by instructing each subactor to cancel
+        itself and wait for all subprocesses to terminate.
+
+        If ``hard_kill`` is set to ``True`` then kill the processes
+        directly without any far end graceful ``trio`` cancellation.
+        """
        log.debug(f"Cancelling nursery")
        for subactor, proc, main_q in self._children.values():
            if proc is mp.current_process():
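
A sketch of the two cancellation modes the new docstring describes, assuming `tractor.open_nursery()` as the public entry point (it lives elsewhere in this module) and a hypothetical `mymod`:

```python
import tractor

async def supervise():
    async with tractor.open_nursery() as nursery:
        await nursery.start_actor('worker', rpc_module_paths=['mymod'])
        # graceful: ships a cancel cmd to each subactor then waits for
        # the subprocesses to exit
        await nursery.cancel()
        # blunt: proc.terminate() per child, no far-end trio cancellation
        # await nursery.cancel(hard_kill=True)
```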
@@ -718,8 +772,8 @@ class ActorNursery:
            if hard_kill:
                log.warn(f"Hard killing subactors {self._children}")
                proc.terminate()
-                # send KeyboardInterrupt (trio abort signal) to underlying
-                # sub-actors
+                # XXX: doesn't seem to work?
+                # send KeyboardInterrupt (trio abort signal) to sub-actors
                # os.kill(proc.pid, signal.SIGINT)
            else:
                # send cancel cmd - likely no response from subactor
@@ -750,11 +804,11 @@ class ActorNursery:
                await chan.send(None)

        if etype is not None:
-            log.warn(f"{current_actor().uid} errored with {etype}, "
-                     "cancelling actor nursery")
+            log.exception(f"{current_actor().uid} errored with {etype}, "
+                          "cancelling actor nursery")
            await self.cancel()
        else:
-            log.debug(f"Waiting on subactors to complete")
+            log.debug(f"Waiting on subactors {self._children} to complete")
            async with trio.open_nursery() as nursery:
                for subactor, proc, main_q in self._children.values():
                    nursery.start_soon(wait_for_actor, subactor, proc, main_q)
@@ -786,7 +840,7 @@ class NoArbiterFound(Exception):
    "Couldn't find the arbiter?"


-async def start_actor(actor, host, port, arbiter_addr, nursery=None):
+async def _start_actor(actor, host, port, arbiter_addr, nursery=None):
    """Spawn a local actor by starting a task to execute its main
    async function.
@@ -884,6 +938,8 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr):
    main = partial(async_fn, *args) if async_fn else None
    arbiter_addr = (host, port) = arbiter_addr or (
        _default_arbiter_host, _default_arbiter_port)
+    get_console_log(kwargs.get('loglevel', _default_loglevel))
+
    # make a temporary connection to see if an arbiter exists
    arbiter_found = False
    try:
@@ -911,7 +967,7 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr):
    # Note that if the current actor is the arbiter it is desirable
    # for it to stay up indefinitely until a re-election process has
    # taken place - which is not implemented yet FYI).
-    return await start_actor(actor, host, port, arbiter_addr=arbiter_addr)
+    return await _start_actor(actor, host, port, arbiter_addr=arbiter_addr)


 def run(async_fn, *args, name=None, arbiter_addr=None, **kwargs):