forked from goodboy/tractor
Move "treat_as_gen" detection into `_invoke()`
parent
b0ceb308ba
commit
c3eee1f228
|
@ -46,6 +46,19 @@ async def _invoke(
|
||||||
):
|
):
|
||||||
"""Invoke local func and return results over provided channel.
|
"""Invoke local func and return results over provided channel.
|
||||||
"""
|
"""
|
||||||
|
sig = inspect.signature(func)
|
||||||
|
treat_as_gen = False
|
||||||
|
if 'chan' in sig.parameters:
|
||||||
|
assert 'cid' in sig.parameters, \
|
||||||
|
f"{func} must accept a `cid` (caller id) kwarg"
|
||||||
|
kwargs['chan'] = chan
|
||||||
|
kwargs['cid'] = cid
|
||||||
|
# TODO: eventually we want to be more stringent
|
||||||
|
# about what is considered a far-end async-generator.
|
||||||
|
# Right now both actual async gens and any async
|
||||||
|
# function which declares a `chan` kwarg in its
|
||||||
|
# signature will be treated as one.
|
||||||
|
treat_as_gen = True
|
||||||
try:
|
try:
|
||||||
is_async_partial = False
|
is_async_partial = False
|
||||||
is_async_gen_partial = False
|
is_async_gen_partial = False
|
||||||
|
@ -183,11 +196,13 @@ class Actor:
|
||||||
log.debug(f"{uid} successfully connected back to us")
|
log.debug(f"{uid} successfully connected back to us")
|
||||||
return event, self._peers[uid][-1]
|
return event, self._peers[uid][-1]
|
||||||
|
|
||||||
def load_namespaces(self) -> None:
|
def load_modules(self) -> None:
|
||||||
# We load namespaces after fork since this actor may
|
"""Load allowed RPC modules locally (after fork).
|
||||||
# be spawned on a different machine from the original nursery
|
|
||||||
# and we need to try and load the local module code (if it
|
Since this actor may be spawned on a different machine from
|
||||||
# exists)
|
the original nursery we need to try and load the local module
|
||||||
|
code (if it exists).
|
||||||
|
"""
|
||||||
for path in self.rpc_module_paths:
|
for path in self.rpc_module_paths:
|
||||||
self._mods[path] = importlib.import_module(path)
|
self._mods[path] = importlib.import_module(path)
|
||||||
|
|
||||||
|
@ -261,6 +276,8 @@ class Actor:
|
||||||
await chan.aclose()
|
await chan.aclose()
|
||||||
|
|
||||||
async def _push_result(self, actorid, cid: str, msg: dict) -> None:
|
async def _push_result(self, actorid, cid: str, msg: dict) -> None:
|
||||||
|
"""Push an RPC result to the local consumer's queue.
|
||||||
|
"""
|
||||||
assert actorid, f"`actorid` can't be {actorid}"
|
assert actorid, f"`actorid` can't be {actorid}"
|
||||||
q = self.get_waitq(actorid, cid)
|
q = self.get_waitq(actorid, cid)
|
||||||
log.debug(f"Delivering {msg} from {actorid} to caller {cid}")
|
log.debug(f"Delivering {msg} from {actorid} to caller {cid}")
|
||||||
|
@ -288,9 +305,9 @@ class Actor:
|
||||||
async def _process_messages(
|
async def _process_messages(
|
||||||
self, chan: Channel, treat_as_gen: bool = False
|
self, chan: Channel, treat_as_gen: bool = False
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Process messages async-RPC style.
|
"""Process messages for the channel async-RPC style.
|
||||||
|
|
||||||
Process rpc requests and deliver retrieved responses from channels.
|
Receive multiplexed RPC requests and deliver responses over ``chan``.
|
||||||
"""
|
"""
|
||||||
# TODO: once https://github.com/python-trio/trio/issues/467 gets
|
# TODO: once https://github.com/python-trio/trio/issues/467 gets
|
||||||
# worked out we'll likely want to use that!
|
# worked out we'll likely want to use that!
|
||||||
|
@ -336,23 +353,9 @@ class Actor:
|
||||||
func = getattr(self._mods[ns], funcname)
|
func = getattr(self._mods[ns], funcname)
|
||||||
|
|
||||||
# spin up a task for the requested function
|
# spin up a task for the requested function
|
||||||
sig = inspect.signature(func)
|
|
||||||
treat_as_gen = False
|
|
||||||
if 'chan' in sig.parameters:
|
|
||||||
assert 'cid' in sig.parameters, \
|
|
||||||
f"{func} must accept a `cid` (caller id) kwarg"
|
|
||||||
kwargs['chan'] = chan
|
|
||||||
kwargs['cid'] = cid
|
|
||||||
# TODO: eventually we want to be more stringent
|
|
||||||
# about what is considered a far-end async-generator.
|
|
||||||
# Right now both actual async gens and any async
|
|
||||||
# function which declares a `chan` kwarg in its
|
|
||||||
# signature will be treated as one.
|
|
||||||
treat_as_gen = True
|
|
||||||
|
|
||||||
log.debug(f"Spawning task for {func}")
|
log.debug(f"Spawning task for {func}")
|
||||||
cs = await self._root_nursery.start(
|
cs = await self._root_nursery.start(
|
||||||
_invoke, self, cid, chan, func, kwargs, treat_as_gen,
|
_invoke, self, cid, chan, func, kwargs,
|
||||||
name=funcname
|
name=funcname
|
||||||
)
|
)
|
||||||
# never allow cancelling cancel requests (results in
|
# never allow cancelling cancel requests (results in
|
||||||
|
@ -421,7 +424,7 @@ class Actor:
|
||||||
self._root_nursery = nursery
|
self._root_nursery = nursery
|
||||||
|
|
||||||
# load allowed RPC module
|
# load allowed RPC module
|
||||||
self.load_namespaces()
|
self.load_modules()
|
||||||
|
|
||||||
# Startup up channel server
|
# Startup up channel server
|
||||||
host, port = accept_addr
|
host, port = accept_addr
|
||||||
|
|
Loading…
Reference in New Issue