From 41f2096e864f17c552d0bd3f732bb006620d1bca Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 12 Jan 2019 15:27:38 -0500 Subject: [PATCH] Adopt `Context` in the RPC core Instead of chan/cid, whenever a remote function defines a `ctx` argument name deliver a `Context` instance to the function. This allows remote funcs to provide async generator like streaming replies (and maybe more later). Additionally, - load actor modules *after* establishing a connection to the spawning parent to avoid crashing before the error can be reported upwards - fix a bug to do with unpacking and raising local internal actor errors from received messages --- tractor/_actor.py | 37 ++++++++++++++++++++----------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index d9a5ee1..ed2e031 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -13,9 +13,14 @@ from typing import Dict, List, Tuple, Any, Optional, Union import trio # type: ignore from async_generator import asynccontextmanager, aclosing -from ._ipc import Channel, _connect_chan +from ._ipc import Channel, _connect_chan, Context from .log import get_console_log, get_logger -from ._exceptions import pack_error, InternalActorError, ModuleNotExposed +from ._exceptions import ( + pack_error, + unpack_error, + InternalActorError, + ModuleNotExposed +) from ._portal import ( Portal, open_portal, @@ -46,15 +51,13 @@ async def _invoke( sig = inspect.signature(func) treat_as_gen = False cs = None - if 'chan' in sig.parameters: - assert 'cid' in sig.parameters, \ - f"{func} must accept a `cid` (caller id) kwarg" - kwargs['chan'] = chan - kwargs['cid'] = cid + ctx = Context(chan, cid) + if 'ctx' in sig.parameters: + kwargs['ctx'] = ctx # TODO: eventually we want to be more stringent # about what is considered a far-end async-generator. # Right now both actual async gens and any async - # function which declares a `chan` kwarg in its + # function which declares a `ctx` kwarg in its # signature will be treated as one. treat_as_gen = True try: @@ -304,7 +307,8 @@ class Actor: await chan.send(None) await chan.aclose() except trio.BrokenResourceError: - log.exception(f"Channel for {chan.uid} was already zonked..") + log.exception( + f"Channel for {chan.uid} was already zonked..") async def _push_result(self, actorid, cid: str, msg: dict) -> None: """Push an RPC result to the local consumer's queue. @@ -389,12 +393,9 @@ class Actor: # (i.e. no cid was provided in the msg - see above). # Push this error to all local channel consumers # (normally portals) by marking the channel as errored - tb_str = msg.get('tb_str') assert chan.uid - exc = InternalActorError( - f"{chan.uid}\n" + tb_str, - **msg, - ) + exc = unpack_error( + msg, chan=chan, err_type=InternalActorError) chan._exc = exc raise exc @@ -493,9 +494,6 @@ class Actor: async with trio.open_nursery() as nursery: self._root_nursery = nursery - # load allowed RPC module - self.load_modules() - # Startup up channel server host, port = accept_addr await nursery.start(partial( @@ -524,6 +522,11 @@ class Actor: nursery.start_soon( self._process_messages, self._parent_chan) + # load exposed/allowed RPC modules + # XXX: do this **after** establishing connection to parent + # so that import errors are properly propagated upwards + self.load_modules() + # register with the arbiter if we're told its addr log.debug(f"Registering {self} for role `{self.name}`") async with get_arbiter(*arbiter_addr) as arb_portal: