Fix some bugs to get tests working

Fix quite a few little bugs:
- async gen func detection in `_invoke()`
- always cancel channel server on main task exit
- wait for remaining channel peers after unsub from arbiter
- return result from main task(s) all the way up to `tractor.run()`

Also add a `Portal.result()` for getting the final result(s) from the
actor's main task and fix up a bunch of docs.
asyncgen_closing_fix
Tyler Goodlet 2018-07-06 02:36:21 -04:00
parent 2cc03965d8
commit 36fd75e217
1 changed files with 72 additions and 49 deletions

View File

@ -58,7 +58,10 @@ async def _invoke(
if isinstance(func, partial): if isinstance(func, partial):
is_async_partial = inspect.iscoroutinefunction(func.func) is_async_partial = inspect.iscoroutinefunction(func.func)
if not inspect.iscoroutinefunction(func) and not is_async_partial: if (
not inspect.iscoroutinefunction(func) and not is_async_partial
and not inspect.isasyncgenfunction(func)
):
await chan.send({'return': func(**kwargs), 'cid': cid}) await chan.send({'return': func(**kwargs), 'cid': cid})
else: else:
coro = func(**kwargs) coro = func(**kwargs)
@ -276,9 +279,6 @@ class Actor:
cid = msg.get('cid') cid = msg.get('cid')
if cid: # deliver response to local caller/waiter if cid: # deliver response to local caller/waiter
self._push_result(chan.uid, cid, msg) self._push_result(chan.uid, cid, msg)
if 'error' in msg:
# TODO: need something better then this slop
raise RemoteActorError(msg['error'])
log.debug(f"Waiting on next msg for {chan}") log.debug(f"Waiting on next msg for {chan}")
continue continue
else: else:
@ -343,7 +343,8 @@ class Actor:
parent_addr=None, parent_addr=None,
nursery=None nursery=None
): ):
"""Start the channel server and main task. """Start the channel server, maybe connect back to the parent, and
start the main task.
A "root-most" (or "top-level") nursery for this actor is opened here A "root-most" (or "top-level") nursery for this actor is opened here
and when cancelled effectively cancels the actor. and when cancelled effectively cancels the actor.
@ -387,6 +388,7 @@ class Actor:
name=self.name, sockaddr=self.accept_addr) name=self.name, sockaddr=self.accept_addr)
if self.main: if self.main:
try:
if self._parent_chan: if self._parent_chan:
log.debug(f"Starting main task `{self.main}`") log.debug(f"Starting main task `{self.main}`")
# start "main" routine in a task # start "main" routine in a task
@ -399,12 +401,7 @@ class Actor:
log.debug(f"Running `{self.main}` directly") log.debug(f"Running `{self.main}` directly")
result = await self.main() result = await self.main()
# terminate local in-proc once its main completes finally:
log.debug(
f"Waiting for remaining peers {self._peers} to clear")
await self._no_more_peers.wait()
log.debug(f"All peer channels are complete")
# tear down channel server # tear down channel server
if not self._outlive_main: if not self._outlive_main:
log.debug(f"Shutting down channel server") log.debug(f"Shutting down channel server")
@ -431,6 +428,12 @@ class Actor:
except OSError: except OSError:
log.warn(f"Unable to unregister {self.name} from arbiter") log.warn(f"Unable to unregister {self.name} from arbiter")
# terminate local in-proc once its main completes
log.debug(
f"Waiting for remaining peers {self._peers} to clear")
await self._no_more_peers.wait()
log.debug(f"All peer channels are complete")
return result return result
async def _serve_forever( async def _serve_forever(
@ -441,9 +444,10 @@ class Actor:
accept_port=0, accept_port=0,
task_status=trio.TASK_STATUS_IGNORED task_status=trio.TASK_STATUS_IGNORED
): ):
"""Main coroutine: connect back to the parent, spawn main task, begin """Start the channel server, begin listening for new connections.
listening for new messages.
This will cause an actor to continue living (blocking) until
``cancel_server()`` is called.
""" """
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
self._server_nursery = nursery self._server_nursery = nursery
@ -454,6 +458,8 @@ class Actor:
partial( partial(
trio.serve_tcp, trio.serve_tcp,
self._stream_handler, self._stream_handler,
# new connections will stay alive even if this server
# is cancelled
handler_nursery=self._root_nursery, handler_nursery=self._root_nursery,
port=accept_port, host=accept_host, port=accept_port, host=accept_host,
) )
@ -543,13 +549,13 @@ class Portal:
# TODO: not this needs some serious work and thinking about how # TODO: not this needs some serious work and thinking about how
# to make async-generators the fundamental IPC API over channels! # to make async-generators the fundamental IPC API over channels!
# (think `yield from`, `gen.send()`, and functional reactive stuff) # (think `yield from`, `gen.send()`, and functional reactive stuff)
chan = self.channel
# ship a function call request to the remote actor
actor = current_actor() actor = current_actor()
# ship a function call request to the remote actor
cid, q = await actor.send_cmd(self.channel, ns, func, kwargs)
# wait on first response msg and handle
return await self._return_from_resptype(cid, *(await result_from_q(q)))
cid, q = await actor.send_cmd(chan, ns, func, kwargs) async def _return_from_resptype(self, cid, resptype, first_msg, q):
# wait on first response msg
resptype, first_msg, q = await result_from_q(q)
if resptype == 'yield': if resptype == 'yield':
@ -562,7 +568,9 @@ class Portal:
except KeyError: except KeyError:
raise RemoteActorError(msg['error']) raise RemoteActorError(msg['error'])
except GeneratorExit: except GeneratorExit:
log.debug(f"Cancelling async gen call {cid} to {chan.uid}") log.debug(
f"Cancelling async gen call {cid} to "
"{self.channel.uid}")
return yield_from_q() return yield_from_q()
@ -571,12 +579,22 @@ class Portal:
else: else:
raise ValueError(f"Unknown msg response type: {first_msg}") raise ValueError(f"Unknown msg response type: {first_msg}")
async def result(self):
"""Return the result(s) from the remote actor's "main" task.
"""
q = current_actor().get_waitq(self.channel.uid, 'main')
first_msg = (await result_from_q(q))
val = await self._return_from_resptype(
'main', *first_msg)
await q.put(first_msg) # for next consumer (e.g. nursery)
return val
@asynccontextmanager @asynccontextmanager
async def open_portal(channel, nursery=None): async def open_portal(channel, nursery=None):
"""Open a ``Portal`` through the provided ``channel``. """Open a ``Portal`` through the provided ``channel``.
Spawns a background task to handle rpc message processing. Spawns a background task to handle message processing.
""" """
actor = current_actor() actor = current_actor()
assert actor assert actor
@ -643,12 +661,12 @@ class ActorNursery:
statespace=None, statespace=None,
rpc_module_paths=None, rpc_module_paths=None,
outlive_main=False, # sub-actors die when their main task completes outlive_main=False, # sub-actors die when their main task completes
loglevel=None, # set console logging per subactor loglevel=None, # set log level per subactor
): ):
actor = Actor( actor = Actor(
name, name,
# modules allowed to invoked funcs from # modules allowed to invoked funcs from
rpc_module_paths=rpc_module_paths, rpc_module_paths=rpc_module_paths or [],
statespace=statespace, # global proc state vars statespace=statespace, # global proc state vars
main=main, # main coroutine to be invoked main=main, # main coroutine to be invoked
outlive_main=outlive_main, outlive_main=outlive_main,
@ -783,7 +801,7 @@ async def start_actor(actor, host, port, arbiter_addr, nursery=None):
# NOTE: this won't block since we provide the nursery # NOTE: this won't block since we provide the nursery
log.info(f"Starting local {actor} @ {host}:{port}") log.info(f"Starting local {actor} @ {host}:{port}")
await actor._async_main( result = await actor._async_main(
accept_addr=(host, port), accept_addr=(host, port),
parent_addr=None, parent_addr=None,
arbiter_addr=arbiter_addr, arbiter_addr=arbiter_addr,
@ -799,11 +817,14 @@ async def start_actor(actor, host, port, arbiter_addr, nursery=None):
_current_actor = None _current_actor = None
log.info("Completed async main") log.info("Completed async main")
return result
@asynccontextmanager @asynccontextmanager
async def _connect_chan(host, port): async def _connect_chan(host, port):
"""Attempt to connect to an arbiter's channel server. """Attempt to connect to an arbiter's channel server.
Return the channel on success or None on failure.
Return the channel on success or ``None`` on failure.
""" """
chan = Channel((host, port)) chan = Channel((host, port))
await chan.connect() await chan.connect()
@ -837,8 +858,8 @@ async def find_actor(
): ):
"""Ask the arbiter to find actor(s) by name. """Ask the arbiter to find actor(s) by name.
Returns a sequence of unconnected portals for each matching actor Returns a connected portal to the last registered matching actor
known to the arbiter (client code is expected to connect the portals). known to the arbiter.
""" """
actor = current_actor() actor = current_actor()
if not actor: if not actor:
@ -847,14 +868,14 @@ async def find_actor(
async with get_arbiter(*arbiter_sockaddr) as arb_portal: async with get_arbiter(*arbiter_sockaddr) as arb_portal:
sockaddrs = await arb_portal.run('self', 'find_actor', name=name) sockaddrs = await arb_portal.run('self', 'find_actor', name=name)
# TODO: return portals to all available actors - for now just # TODO: return portals to all available actors - for now just
# the first one we find # the last one that registered
if sockaddrs: if sockaddrs:
sockaddr = sockaddrs[-1] sockaddr = sockaddrs[-1]
async with _connect_chan(*sockaddr) as chan: async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal: async with open_portal(chan) as portal:
yield portal yield portal
else: else:
yield yield None
async def _main(async_fn, args, kwargs, name, arbiter_addr): async def _main(async_fn, args, kwargs, name, arbiter_addr):
@ -871,9 +892,9 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr):
except OSError: except OSError:
log.warn(f"No actor could be found @ {host}:{port}") log.warn(f"No actor could be found @ {host}:{port}")
# create a local actor and start up its main routine/task
if arbiter_found: # we were able to connect to an arbiter if arbiter_found: # we were able to connect to an arbiter
log.info(f"Arbiter seems to exist @ {host}:{port}") log.info(f"Arbiter seems to exist @ {host}:{port}")
# create a local actor and start up its main routine/task
actor = Actor( actor = Actor(
name or 'anonymous', name or 'anonymous',
main=main, main=main,
@ -882,13 +903,15 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr):
host, port = (_default_arbiter_host, 0) host, port = (_default_arbiter_host, 0)
else: else:
# start this local actor as the arbiter # start this local actor as the arbiter
# this should eventually get passed `outlive_main=True`?
actor = Arbiter(name or 'arbiter', main=main, **kwargs) actor = Arbiter(name or 'arbiter', main=main, **kwargs)
await start_actor(actor, host, port, arbiter_addr=arbiter_addr) # ``Actor._async_main()`` creates an internal nursery if one is not
# Creates an internal nursery which shouldn't be cancelled even if # provided and thus blocks here until it's main task completes.
# the one opened below is (this is desirable because the arbiter should # Note that if the current actor is the arbiter it is desirable
# stay up until a re-election process has taken place - which is not # for it to stay up indefinitely until a re-election process has
# implemented yet FYI). # taken place - which is not implemented yet FYI).
return await start_actor(actor, host, port, arbiter_addr=arbiter_addr)
def run(async_fn, *args, name=None, arbiter_addr=None, **kwargs): def run(async_fn, *args, name=None, arbiter_addr=None, **kwargs):