Adopt `Context` in the RPC core

Instead of chan/cid, whenever a remote function defines a `ctx` argument
name deliver a `Context` instance to the function. This allows remote
funcs to provide async generator like streaming replies (and maybe more
later).

Additionally,
- load actor modules *after* establishing a connection to the spawning
  parent to avoid crashing before the error can be reported upwards
- fix a bug to do with unpacking and raising local internal actor errors
  from received messages
contexts
Tyler Goodlet 2019-01-12 15:27:38 -05:00
parent 87a6165430
commit 41f2096e86
1 changed files with 20 additions and 17 deletions

View File

@ -13,9 +13,14 @@ from typing import Dict, List, Tuple, Any, Optional, Union
import trio # type: ignore import trio # type: ignore
from async_generator import asynccontextmanager, aclosing from async_generator import asynccontextmanager, aclosing
from ._ipc import Channel, _connect_chan from ._ipc import Channel, _connect_chan, Context
from .log import get_console_log, get_logger from .log import get_console_log, get_logger
from ._exceptions import pack_error, InternalActorError, ModuleNotExposed from ._exceptions import (
pack_error,
unpack_error,
InternalActorError,
ModuleNotExposed
)
from ._portal import ( from ._portal import (
Portal, Portal,
open_portal, open_portal,
@ -46,15 +51,13 @@ async def _invoke(
sig = inspect.signature(func) sig = inspect.signature(func)
treat_as_gen = False treat_as_gen = False
cs = None cs = None
if 'chan' in sig.parameters: ctx = Context(chan, cid)
assert 'cid' in sig.parameters, \ if 'ctx' in sig.parameters:
f"{func} must accept a `cid` (caller id) kwarg" kwargs['ctx'] = ctx
kwargs['chan'] = chan
kwargs['cid'] = cid
# TODO: eventually we want to be more stringent # TODO: eventually we want to be more stringent
# about what is considered a far-end async-generator. # about what is considered a far-end async-generator.
# Right now both actual async gens and any async # Right now both actual async gens and any async
# function which declares a `chan` kwarg in its # function which declares a `ctx` kwarg in its
# signature will be treated as one. # signature will be treated as one.
treat_as_gen = True treat_as_gen = True
try: try:
@ -304,7 +307,8 @@ class Actor:
await chan.send(None) await chan.send(None)
await chan.aclose() await chan.aclose()
except trio.BrokenResourceError: except trio.BrokenResourceError:
log.exception(f"Channel for {chan.uid} was already zonked..") log.exception(
f"Channel for {chan.uid} was already zonked..")
async def _push_result(self, actorid, cid: str, msg: dict) -> None: async def _push_result(self, actorid, cid: str, msg: dict) -> None:
"""Push an RPC result to the local consumer's queue. """Push an RPC result to the local consumer's queue.
@ -389,12 +393,9 @@ class Actor:
# (i.e. no cid was provided in the msg - see above). # (i.e. no cid was provided in the msg - see above).
# Push this error to all local channel consumers # Push this error to all local channel consumers
# (normally portals) by marking the channel as errored # (normally portals) by marking the channel as errored
tb_str = msg.get('tb_str')
assert chan.uid assert chan.uid
exc = InternalActorError( exc = unpack_error(
f"{chan.uid}\n" + tb_str, msg, chan=chan, err_type=InternalActorError)
**msg,
)
chan._exc = exc chan._exc = exc
raise exc raise exc
@ -493,9 +494,6 @@ class Actor:
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
self._root_nursery = nursery self._root_nursery = nursery
# load allowed RPC module
self.load_modules()
# Startup up channel server # Startup up channel server
host, port = accept_addr host, port = accept_addr
await nursery.start(partial( await nursery.start(partial(
@ -524,6 +522,11 @@ class Actor:
nursery.start_soon( nursery.start_soon(
self._process_messages, self._parent_chan) self._process_messages, self._parent_chan)
# load exposed/allowed RPC modules
# XXX: do this **after** establishing connection to parent
# so that import errors are properly propagated upwards
self.load_modules()
# register with the arbiter if we're told its addr # register with the arbiter if we're told its addr
log.debug(f"Registering {self} for role `{self.name}`") log.debug(f"Registering {self} for role `{self.name}`")
async with get_arbiter(*arbiter_addr) as arb_portal: async with get_arbiter(*arbiter_addr) as arb_portal: