diff --git a/tractor/_actor.py b/tractor/_actor.py index 4be6381..270a339 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -46,6 +46,19 @@ async def _invoke( ): """Invoke local func and return results over provided channel. """ + sig = inspect.signature(func) + treat_as_gen = False + if 'chan' in sig.parameters: + assert 'cid' in sig.parameters, \ + f"{func} must accept a `cid` (caller id) kwarg" + kwargs['chan'] = chan + kwargs['cid'] = cid + # TODO: eventually we want to be more stringent + # about what is considered a far-end async-generator. + # Right now both actual async gens and any async + # function which declares a `chan` kwarg in its + # signature will be treated as one. + treat_as_gen = True try: is_async_partial = False is_async_gen_partial = False @@ -183,11 +196,13 @@ class Actor: log.debug(f"{uid} successfully connected back to us") return event, self._peers[uid][-1] - def load_namespaces(self) -> None: - # We load namespaces after fork since this actor may - # be spawned on a different machine from the original nursery - # and we need to try and load the local module code (if it - # exists) + def load_modules(self) -> None: + """Load allowed RPC modules locally (after fork). + + Since this actor may be spawned on a different machine from + the original nursery we need to try and load the local module + code (if it exists). + """ for path in self.rpc_module_paths: self._mods[path] = importlib.import_module(path) @@ -261,6 +276,8 @@ class Actor: await chan.aclose() async def _push_result(self, actorid, cid: str, msg: dict) -> None: + """Push an RPC result to the local consumer's queue. + """ assert actorid, f"`actorid` can't be {actorid}" q = self.get_waitq(actorid, cid) log.debug(f"Delivering {msg} from {actorid} to caller {cid}") @@ -288,9 +305,9 @@ class Actor: async def _process_messages( self, chan: Channel, treat_as_gen: bool = False ) -> None: - """Process messages async-RPC style. + """Process messages for the channel async-RPC style. - Process rpc requests and deliver retrieved responses from channels. + Receive multiplexed RPC requests and deliver responses over ``chan``. """ # TODO: once https://github.com/python-trio/trio/issues/467 gets # worked out we'll likely want to use that! @@ -336,23 +353,9 @@ class Actor: func = getattr(self._mods[ns], funcname) # spin up a task for the requested function - sig = inspect.signature(func) - treat_as_gen = False - if 'chan' in sig.parameters: - assert 'cid' in sig.parameters, \ - f"{func} must accept a `cid` (caller id) kwarg" - kwargs['chan'] = chan - kwargs['cid'] = cid - # TODO: eventually we want to be more stringent - # about what is considered a far-end async-generator. - # Right now both actual async gens and any async - # function which declares a `chan` kwarg in its - # signature will be treated as one. - treat_as_gen = True - log.debug(f"Spawning task for {func}") cs = await self._root_nursery.start( - _invoke, self, cid, chan, func, kwargs, treat_as_gen, + _invoke, self, cid, chan, func, kwargs, name=funcname ) # never allow cancelling cancel requests (results in @@ -421,7 +424,7 @@ class Actor: self._root_nursery = nursery # load allowed RPC module - self.load_namespaces() + self.load_modules() # Startup up channel server host, port = accept_addr