Add remote actor error handling and parent re-raising
Command requests are sent out and responses are handled in a "message loop" where each command is associated with a "caller id" and multiple cmds and results are multiplexed on the came inter-actor channel. When a cmd result arrives it is pushed into a local queue and delivered to the appropriate calling actor's task. Errors from a remote actor are always shipped in an "error" packet back to their spawning-parent actor such that any error in a subactor is always raised directly in the parent. Based on the first response to a cmd (either a 'return' or 'yield' packet) the caller side portal will retrieve values by wrapping the local response queue in either of an async function or generator as appropriate.kivy_mainline_and_py3.8
							parent
							
								
									75996fed0d
								
							
						
					
					
						commit
						ef90d7f106
					
				
							
								
								
									
										391
									
								
								piker/tractor.py
								
								
								
								
							
							
						
						
									
										391
									
								
								piker/tractor.py
								
								
								
								
							|  | @ -1,13 +1,14 @@ | |||
| """ | ||||
| tracor: An actor model micro-framework. | ||||
| """ | ||||
| import uuid | ||||
| import inspect | ||||
| import importlib | ||||
| from functools import partial | ||||
| import multiprocessing as mp | ||||
| from typing import Coroutine | ||||
| from collections import defaultdict | ||||
| from functools import partial | ||||
| from typing import Coroutine | ||||
| import importlib | ||||
| import inspect | ||||
| import multiprocessing as mp | ||||
| import traceback | ||||
| import uuid | ||||
| 
 | ||||
| import trio | ||||
| from async_generator import asynccontextmanager | ||||
|  | @ -19,6 +20,9 @@ from .log import get_console_log, get_logger | |||
| ctx = mp.get_context("forkserver") | ||||
| log = get_logger('tractor') | ||||
| 
 | ||||
| # set at startup and after forks | ||||
| _current_actor = None | ||||
| 
 | ||||
| # for debugging | ||||
| log = get_console_log('debug') | ||||
| 
 | ||||
|  | @ -27,8 +31,8 @@ class ActorFailure(Exception): | |||
|     "General actor failure" | ||||
| 
 | ||||
| 
 | ||||
| # set at startup and after forks | ||||
| _current_actor = None | ||||
| class RemoteActorError(ActorFailure): | ||||
|     "Remote actor exception bundled locally" | ||||
| 
 | ||||
| 
 | ||||
| @asynccontextmanager | ||||
|  | @ -44,6 +48,58 @@ async def maybe_open_nursery(nursery=None): | |||
|             yield nursery | ||||
| 
 | ||||
| 
 | ||||
| async def _invoke( | ||||
|     cid, chan, func, kwargs, | ||||
|     treat_as_gen=False, raise_errs=False): | ||||
|     """Invoke local func and return results over provided channel. | ||||
|     """ | ||||
|     try: | ||||
|         is_async_func = False | ||||
|         if isinstance(func, partial): | ||||
|             is_async_func = inspect.iscoroutinefunction(func.func) | ||||
| 
 | ||||
|         if not inspect.iscoroutinefunction(func) and not is_async_func: | ||||
|             await chan.send({'return': func(**kwargs), 'cid': cid}) | ||||
|         else: | ||||
|             coro = func(**kwargs) | ||||
| 
 | ||||
|             if inspect.isasyncgen(coro): | ||||
|                 # await chan.send('gen') | ||||
|                 async for item in coro: | ||||
|                     # TODO: can we send values back in here? | ||||
|                     # How do we do it, spawn another task? | ||||
|                     # to_send = await chan.recv() | ||||
|                     # if to_send is not None: | ||||
|                     #     await coro.send(to_send) | ||||
|                     await chan.send({'yield': item, 'cid': cid}) | ||||
|             else: | ||||
|                 if treat_as_gen: | ||||
|                     # XXX: the async-func may spawn further tasks which push | ||||
|                     # back values like an async-generator would but must | ||||
|                     # manualy construct the response dict-packet-responses as above | ||||
|                     await coro | ||||
|                 else: | ||||
|                     await chan.send({'return': await coro, 'cid': cid}) | ||||
|     except Exception: | ||||
|         if not raise_errs: | ||||
|             await chan.send({'error': traceback.format_exc(), 'cid': cid}) | ||||
|         else: | ||||
|             raise | ||||
| 
 | ||||
| async def get_result(q): | ||||
|     """Process a msg from a remote actor. | ||||
|     """ | ||||
|     first_msg = await q.get() | ||||
|     if 'return' in first_msg: | ||||
|         return 'return', first_msg, q | ||||
|     elif 'yield' in first_msg: | ||||
|         return 'yield', first_msg, q | ||||
|     elif 'error' in first_msg: | ||||
|         raise RemoteActorError(first_msg['error']) | ||||
|     else: | ||||
|         raise ValueError(f"{first_msg} is an invalid response packet?") | ||||
| 
 | ||||
| 
 | ||||
| class Actor: | ||||
|     """The fundamental concurrency primitive. | ||||
| 
 | ||||
|  | @ -56,14 +112,14 @@ class Actor: | |||
|     def __init__( | ||||
|         self, | ||||
|         name: str, | ||||
|         namespaces: [str], | ||||
|         main: Coroutine, | ||||
|         statespace: dict, | ||||
|         main: Coroutine = None, | ||||
|         rpc_module_paths: [str] = [], | ||||
|         statespace: dict = {}, | ||||
|         uid: str = None, | ||||
|         allow_rpc: bool = True, | ||||
|     ): | ||||
|         self.uid = (name, uid or str(uuid.uuid1())) | ||||
|         self.namespaces = namespaces | ||||
|         self.rpc_module_paths = rpc_module_paths | ||||
|         self._mods = {} | ||||
|         self.main = main | ||||
|         # TODO: consider making this a dynamically defined | ||||
|  | @ -73,6 +129,7 @@ class Actor: | |||
| 
 | ||||
|         # filled in by `_async_main` after fork | ||||
|         self._peers = {} | ||||
|         self._actors2calls = {}  # map {uids -> {callids -> waiter queues}} | ||||
|         self._listeners = [] | ||||
|         self._parent_chan = None | ||||
|         self._accept_host = None | ||||
|  | @ -91,33 +148,29 @@ class Actor: | |||
|         # be spawned on a different machine from the original nursery | ||||
|         # and we need to try and load the local module code (if it | ||||
|         # exists) | ||||
|         for path in self.namespaces: | ||||
|         for path in self.rpc_module_paths: | ||||
|             self._mods[path] = importlib.import_module(path) | ||||
| 
 | ||||
|     async def _stream_handler( | ||||
|         self, | ||||
|         stream: trio.SocketStream, | ||||
|     ): | ||||
|         """Receive requests and deliver responses spinning up new | ||||
|         channels where necessary. | ||||
| 
 | ||||
|         Basically RPC with an async twist ;) | ||||
|         """ | ||||
|         Entry point for new inbound connections to the channel server. | ||||
|         """ | ||||
|         chan = Channel(stream=stream) | ||||
|         log.info(f"New {chan} connected to us") | ||||
|         # send/receive initial handshake response | ||||
|         await chan.send(self.uid) | ||||
|         uid = await chan.recv() | ||||
|         chan.uid = uid | ||||
|         log.info(f"Handshake with actor {uid}@{chan.raddr} complete") | ||||
| 
 | ||||
|         # XXX WTF!?!! THIS BLOCKS RANDOMLY? | ||||
|         # assert tuple(raddr) == chan.laddr | ||||
| 
 | ||||
|         # execute main coroutine provided by spawner | ||||
|         if self.main: | ||||
|             await self.main(actor=self) | ||||
| 
 | ||||
|         event = self._peers.pop(uid, None) | ||||
|         chan.event = event | ||||
|         self._peers[uid] = chan | ||||
|         log.info(f"Registered {chan} for {uid}") | ||||
|         log.debug(f"Retrieved event {event}") | ||||
|  | @ -126,59 +179,63 @@ class Actor: | |||
|         # a recently spawned actor which we'd like to control via | ||||
|         # async-rpc calls. | ||||
|         if event and getattr(event, 'set', None): | ||||
| 
 | ||||
|             log.info(f"Waking waiters of {event.statistics()}") | ||||
|             # Alert any task waiting on this connection to come up | ||||
|             # and don't manage channel messages as some external task is | ||||
|             # waiting to use the channel | ||||
|             # (usually an actor nursery) | ||||
|             event.set() | ||||
|             event.clear() | ||||
|             event.clear()  # now consumer can wait on this channel to close | ||||
| 
 | ||||
|             # wait for channel consumer (usually a portal) to be | ||||
|             # done with the channel | ||||
|             await event.wait() | ||||
| 
 | ||||
|             # Drop ref to channel so it can be gc-ed | ||||
|             self._peers.pop(self._uid, None) | ||||
| 
 | ||||
|         # Remote controlled connection, we are likely a subactor | ||||
|         # being told what to do so manage the channel with async-rpc | ||||
|         else: | ||||
|         # Begin channel management - respond to remote requests and | ||||
|         # process received reponses. | ||||
|         try: | ||||
|             await self._process_messages(chan) | ||||
|         finally: | ||||
|             # Drop ref to channel so it can be gc-ed | ||||
|             self._peers.pop(chan.uid, None) | ||||
|             chan.event.set() | ||||
|             log.debug(f"Releasing channel {chan}") | ||||
| 
 | ||||
|     def _push_result(self, actorid, cid, msg): | ||||
|         q = self.get_waitq(actorid, cid) | ||||
|         log.debug(f"Delivering {msg} from {actorid} to caller {cid}") | ||||
|         q.put_nowait(msg) | ||||
| 
 | ||||
|     def get_waitq(self, actorid, cid): | ||||
|         if actorid not in self._actors2calls: | ||||
|             log.warn(f"Caller id {cid} is not yet registered?") | ||||
|         cids2qs = self._actors2calls.setdefault(actorid, {}) | ||||
|         if cid not in cids2qs: | ||||
|             log.warn(f"Caller id {cid} is not yet registered?") | ||||
|         return cids2qs.setdefault(cid, trio.Queue(1000)) | ||||
| 
 | ||||
|     async def invoke_cmd(self, chan, ns, func, kwargs): | ||||
|         """Invoke a remote command by sending a `cmd` message and waiting | ||||
|         on the msg processing loop for its response(s). | ||||
|         """ | ||||
|         cid = uuid.uuid1() | ||||
|         q = self.get_waitq(chan.uid, cid) | ||||
|         await chan.send((ns, func, kwargs, self.uid, cid)) | ||||
|         return await get_result(q) | ||||
| 
 | ||||
|     async def _process_messages(self, chan, treat_as_gen=False): | ||||
|         """Process inbound messages async-RPC style. | ||||
|         """Process messages async-RPC style. | ||||
| 
 | ||||
|         Process rpc requests and deliver retrieved responses from channels. | ||||
|         """ | ||||
|         async def invoke(func, kwargs): | ||||
|             if not inspect.iscoroutinefunction(func): | ||||
|                 await chan.send('func') | ||||
|                 await chan.send(func(**kwargs)) | ||||
|             else: | ||||
|                 coro = func(**kwargs) | ||||
| 
 | ||||
|                 if inspect.isasyncgen(coro): | ||||
|                     await chan.send('gen') | ||||
|                     async for item in coro: | ||||
|                         # TODO: can we send values back in here? | ||||
|                         # How do we do it, spawn another task? | ||||
|                         # to_send = await chan.recv() | ||||
|                         # if to_send is not None: | ||||
|                         #     await coro.send(to_send) | ||||
|                         await chan.send(item) | ||||
|                 else: | ||||
|                     if treat_as_gen: | ||||
|                         await chan.send('gen') | ||||
|                     else: | ||||
|                         await chan.send('func') | ||||
| 
 | ||||
|                     # XXX: the async-func may spawn further tasks which push | ||||
|                     # back values like an async-generator would | ||||
|                     await chan.send(await coro) | ||||
| 
 | ||||
|         log.debug(f"Entering async-rpc loop for {chan.laddr}->{chan.raddr}") | ||||
|         log.debug(f"Entering async-rpc loop for {chan}") | ||||
|         async with trio.open_nursery() as nursery: | ||||
|             async for ns, funcname, kwargs, actorid in chan.aiter_recv(): | ||||
|             async for msg in chan.aiter_recv(): | ||||
|                 log.debug(f"Received msg {msg}") | ||||
|                 # try: | ||||
|                 cid = msg.get('cid') | ||||
|                 if cid:  # deliver response to local caller/waiter | ||||
|                     self._push_result(chan.uid, cid, msg) | ||||
|                     continue | ||||
|                 else: | ||||
|                     ns, funcname, kwargs, actorid, cid = msg['cmd'] | ||||
|                 # except Exception: | ||||
|                 #     await chan.send({'error': traceback.format_exc()}) | ||||
|                 #     break | ||||
| 
 | ||||
|                 log.debug( | ||||
|                     f"Processing request from {actorid}\n" | ||||
|                     f"{ns}.{funcname}({kwargs})") | ||||
|  | @ -200,7 +257,11 @@ class Actor: | |||
|                     # signature will be treated as one. | ||||
|                     treat_as_gen = True | ||||
| 
 | ||||
|                 nursery.start_soon(invoke, func, kwargs, name=funcname) | ||||
|                 nursery.start_soon( | ||||
|                     _invoke, cid, chan, func, kwargs, treat_as_gen, | ||||
|                     name=funcname | ||||
|                 ) | ||||
|         log.debug(f"Exiting msg loop for {chan}") | ||||
| 
 | ||||
|     def _fork_main(self, accept_addr, parent_addr=None): | ||||
|         # after fork routine which invokes a fresh ``trio.run`` | ||||
|  | @ -220,23 +281,67 @@ class Actor: | |||
|         A "root-most" (or "top-level") nursery for this actor is opened here | ||||
|         and when cancelled effectively cancels the actor. | ||||
|         """ | ||||
|         async with maybe_open_nursery(nursery) as nursery: | ||||
|             self._root_nursery = nursery | ||||
|         result = None | ||||
|         try: | ||||
|             async with maybe_open_nursery(nursery) as nursery: | ||||
|                 self._root_nursery = nursery | ||||
| 
 | ||||
|             # Startup up channel server, optionally begin serving RPC | ||||
|             # requests from the parent. | ||||
|             host, port = accept_addr | ||||
|             await self._serve_forever( | ||||
|                 nursery, accept_host=host, accept_port=port, | ||||
|                 parent_addr=parent_addr | ||||
|             ) | ||||
|                 # Startup up channel server, optionally begin serving RPC | ||||
|                 # requests from the parent. | ||||
|                 host, port = accept_addr | ||||
|                 await self._serve_forever( | ||||
|                     nursery, accept_host=host, accept_port=port, | ||||
|                 ) | ||||
| 
 | ||||
|             # start "main" routine in a task | ||||
|             if self.main: | ||||
|                 await self.main(self) | ||||
|                 if parent_addr is not None: | ||||
|                     # Connect back to the parent actor and conduct initial | ||||
|                     # handshake (From this point on if we error ship the | ||||
|                     # exception back to the parent actor) | ||||
|                     chan = self._parent_chan = Channel( | ||||
|                         destaddr=parent_addr, | ||||
|                         on_reconnect=self.main | ||||
|                     ) | ||||
|                     await chan.connect() | ||||
| 
 | ||||
|             # blocks here as expected if no nursery was provided until | ||||
|             # the channel server is killed | ||||
|                     # initial handshake, report who we are, figure out who they are | ||||
|                     await chan.send(self.uid) | ||||
|                     uid = await chan.recv() | ||||
|                     if uid in self._peers: | ||||
|                         log.warn( | ||||
|                             f"already have channel for {uid} registered?" | ||||
|                         ) | ||||
|                     else: | ||||
|                         self._peers[uid] = chan | ||||
| 
 | ||||
|                     # handle new connection back to parent | ||||
|                     if self._allow_rpc: | ||||
|                         self.load_namespaces() | ||||
|                         nursery.start_soon(self._process_messages, chan) | ||||
| 
 | ||||
|                 if self.main: | ||||
|                     log.debug(f"Starting main task `{self.main}`") | ||||
|                     if self._parent_chan: | ||||
|                         # start "main" routine in a task | ||||
|                         nursery.start_soon( | ||||
|                             _invoke, 'main', self._parent_chan, self.main, {}, | ||||
|                             False, True  # treat_as_gen, raise_errs params | ||||
|                         ) | ||||
|                     else: | ||||
|                         # run directly | ||||
|                         result = await self.main() | ||||
| 
 | ||||
|                 # blocks here as expected if no nursery was provided until | ||||
|                 # the channel server is killed (i.e. this actor is | ||||
|                 # cancelled or signalled by the parent actor) | ||||
|         except Exception: | ||||
|             if self._parent_chan: | ||||
|                 log.exception("Actor errored:") | ||||
|                 await self._parent_chan.send( | ||||
|                     {'error': traceback.format_exc(), 'cid': 'main'}) | ||||
|             else: | ||||
|                 raise | ||||
| 
 | ||||
|         return result | ||||
| 
 | ||||
|     async def _serve_forever( | ||||
|         self, | ||||
|  | @ -245,7 +350,6 @@ class Actor: | |||
|         # (host, port) to bind for channel server | ||||
|         accept_host=None, | ||||
|         accept_port=0, | ||||
|         parent_addr=None, | ||||
|         task_status=trio.TASK_STATUS_IGNORED | ||||
|     ): | ||||
|         """Main coroutine: connect back to the parent, spawn main task, begin | ||||
|  | @ -264,31 +368,6 @@ class Actor: | |||
|         self._listeners.extend(listeners) | ||||
|         log.debug(f"Spawned {listeners}") | ||||
| 
 | ||||
|         if parent_addr is not None: | ||||
|             # Connect back to the parent actor and conduct initial | ||||
|             # handshake (From this point on if we error ship the | ||||
|             # exception back to the parent actor) | ||||
|             chan = self._parent_chan = Channel( | ||||
|                 destaddr=parent_addr, | ||||
|                 on_reconnect=self.main | ||||
|             ) | ||||
|             await chan.connect() | ||||
| 
 | ||||
|             # initial handshake, report who we are, figure out who they are | ||||
|             await chan.send(self.uid) | ||||
|             uid = await chan.recv() | ||||
|             if uid in self._peers: | ||||
|                 log.warn( | ||||
|                     f"already have channel for {uid} registered?" | ||||
|                 ) | ||||
|             else: | ||||
|                 self._peers[uid] = chan | ||||
| 
 | ||||
|             # handle new connection back to parent | ||||
|             if self._allow_rpc: | ||||
|                 self.load_namespaces() | ||||
|                 nursery.start_soon(self._process_messages, chan) | ||||
| 
 | ||||
|         # when launched in-process, trigger awaiter's completion | ||||
|         task_status.started() | ||||
| 
 | ||||
|  | @ -339,7 +418,7 @@ class Portal: | |||
|     """ | ||||
|     def __init__(self, channel, event=None): | ||||
|         self.channel = channel | ||||
|         self._uid = None | ||||
|         self._uid = channel.uid | ||||
|         self._event = event | ||||
| 
 | ||||
|     async def __aenter__(self): | ||||
|  | @ -369,13 +448,26 @@ class Portal: | |||
|         # (think `yield from`, `gen.send()`, and functional reactive stuff) | ||||
|         chan = self.channel | ||||
|         # ship a function call request to the remote actor | ||||
|         await chan.send((ns, func, kwargs, _current_actor.uid)) | ||||
|         # get expected response type | ||||
|         functype = await chan.recv() | ||||
|         if functype == 'gen': | ||||
|             return chan.aiter_recv() | ||||
|         actor = current_actor() | ||||
| 
 | ||||
|         resptype, first_msg, q = await actor.invoke_cmd(chan, ns, func, kwargs) | ||||
| 
 | ||||
|         if resptype == 'yield': | ||||
| 
 | ||||
|             async def yield_from_q(): | ||||
|                 yield first | ||||
|                 for msg in q: | ||||
|                     try: | ||||
|                         yield msg['yield'] | ||||
|                     except KeyError: | ||||
|                         raise RemoteActorError(msg['error']) | ||||
| 
 | ||||
|             return yield_from_q() | ||||
| 
 | ||||
|         elif resptype == 'return': | ||||
|             return first_msg['return'] | ||||
|         else: | ||||
|             return await chan.recv() | ||||
|             raise ValueError(f"Unknown msg response type: {first_msg}") | ||||
| 
 | ||||
| 
 | ||||
| class LocalPortal: | ||||
|  | @ -398,9 +490,9 @@ class LocalPortal: | |||
| class ActorNursery: | ||||
|     """Spawn scoped subprocess actors. | ||||
|     """ | ||||
|     def __init__(self, parent_actor, supervisor=None): | ||||
|     def __init__(self, actor, supervisor=None): | ||||
|         self.supervisor = supervisor | ||||
|         self._parent_actor = parent_actor | ||||
|         self._actor = actor | ||||
|         # We'll likely want some way to cancel all sub-actors eventually | ||||
|         # self.cancel_scope = cancel_scope | ||||
|         self._children = {} | ||||
|  | @ -409,38 +501,43 @@ class ActorNursery: | |||
|         return self | ||||
| 
 | ||||
|     async def start_actor( | ||||
|         self, name, module_paths, | ||||
|         self, name, | ||||
|         bind_addr=('127.0.0.1', 0), | ||||
|         statespace=None, | ||||
|         rpc_module_paths=None, | ||||
|         main=None, | ||||
|     ): | ||||
|         actor = Actor( | ||||
|             name, | ||||
|             module_paths,  # modules allowed to invoked funcs from | ||||
|             # modules allowed to invoked funcs from | ||||
|             rpc_module_paths=rpc_module_paths, | ||||
|             statespace=statespace,  # global proc state vars | ||||
|             main=main,  # main coroutine to be invoked | ||||
|         ) | ||||
|         parent_addr = self._parent_actor.accept_addr | ||||
|         parent_addr = self._actor.accept_addr | ||||
|         proc = ctx.Process( | ||||
|             target=actor._fork_main, | ||||
|             args=(bind_addr, parent_addr), | ||||
|             daemon=True, | ||||
|             name=name, | ||||
|         ) | ||||
|         self._children[(name, proc.pid)] = (actor, proc) | ||||
|         proc.start() | ||||
| 
 | ||||
|         if not proc.is_alive(): | ||||
|             raise ActorFailure("Couldn't start sub-actor?") | ||||
| 
 | ||||
|         # wait for actor to spawn and connect back to us | ||||
|         # channel should have handshake completed by the | ||||
|         # local actor by the time we get a ref to it | ||||
|         if proc.is_alive(): | ||||
|             event, chan = await self._parent_actor.wait_for_peer(actor.uid) | ||||
|         else: | ||||
|             raise ActorFailure("Couldn't start sub-actor?") | ||||
|         event, chan = await self._actor.wait_for_peer(actor.uid) | ||||
|         # channel is up, get queue which delivers result from main routine | ||||
|         main_q = self._actor.get_waitq(actor.uid, 'main') | ||||
|         self._children[(name, proc.pid)] = (actor, proc, main_q) | ||||
| 
 | ||||
|         return Portal(chan) | ||||
|         return Portal(chan, event=event) | ||||
| 
 | ||||
|     async def wait(self): | ||||
| 
 | ||||
|     async def cancel(self): | ||||
|         async def wait_for_proc(proc): | ||||
|             # TODO: timeout block here? | ||||
|             if proc.is_alive(): | ||||
|  | @ -452,17 +549,26 @@ class ActorNursery: | |||
|         # unblocks when all waiter tasks have completed | ||||
|         async with trio.open_nursery() as nursery: | ||||
|             for actor, proc in self._children.values(): | ||||
|                 if proc is mp.current_process(): | ||||
|                     actor.cancel() | ||||
|                 else: | ||||
|                     # send KeyBoardInterrupt (trio abort signal) to underlying | ||||
|                     # sub-actors | ||||
|                     proc.terminate() | ||||
|                     # os.kill(proc.pid, signal.SIGINT) | ||||
|                     nursery.start_soon(wait_for_proc, proc) | ||||
|                 nursery.start_soon(wait_for_proc, proc) | ||||
| 
 | ||||
|     async def cancel(self): | ||||
|         for actor, proc in self._children.values(): | ||||
|             if proc is mp.current_process(): | ||||
|                 actor.cancel() | ||||
|             else: | ||||
|                 # send KeyBoardInterrupt (trio abort signal) to underlying | ||||
|                 # sub-actors | ||||
|                 proc.terminate() | ||||
|                 # os.kill(proc.pid, signal.SIGINT) | ||||
| 
 | ||||
|         await self.wait() | ||||
| 
 | ||||
|     async def __aexit__(self, etype, value, tb): | ||||
|         await self.cancel() | ||||
|         """Wait on all subactor's main routines to complete. | ||||
|         """ | ||||
|         async with trio.open_nursery() as nursery: | ||||
|             for subactor, proc, q in self._children.values(): | ||||
|                 nursery.start_soon(get_result, q) | ||||
| 
 | ||||
| 
 | ||||
| def current_actor() -> Actor: | ||||
|  | @ -517,7 +623,7 @@ async def get_arbiter(host='127.0.0.1', port=1616, main=None): | |||
|         # no arbiter found on this host so start one in-process | ||||
|         arbiter = Arbiter( | ||||
|             'arbiter', | ||||
|             namespaces=[],  # the arbiter doesn't allow module rpc | ||||
|             rpc_module_paths=[],  # the arbiter doesn't allow module rpc | ||||
|             statespace={},  # global proc state vars | ||||
|             main=main,  # main coroutine to be invoked | ||||
|         ) | ||||
|  | @ -538,7 +644,7 @@ async def get_arbiter(host='127.0.0.1', port=1616, main=None): | |||
| 
 | ||||
|             # If spawned locally, the arbiter is cancelled when this context | ||||
|             # is complete (i.e the underlying context manager block completes) | ||||
|             nursery.cancel_scope.cancel() | ||||
|             # nursery.cancel_scope.cancel() | ||||
| 
 | ||||
| 
 | ||||
| @asynccontextmanager | ||||
|  | @ -559,27 +665,26 @@ async def find_actor(name): | |||
| 
 | ||||
| 
 | ||||
| async def _main(async_fn, args, kwargs, name): | ||||
|     main = partial(async_fn, *args) | ||||
|     # Creates an internal nursery which shouldn't be cancelled even if | ||||
|     # the one opened below is (this is desirable because the arbitter should | ||||
|     # stay up until a re-election process has taken place - which is not | ||||
|     # implemented yet FYI). | ||||
|     async with get_arbiter( | ||||
|         host=kwargs.get('arbiter_host', '127.0.0.1'), | ||||
|         port=kwargs.get('arbiter_port', 1616), | ||||
|         main=partial(async_fn, *args, **kwargs) | ||||
|         host=kwargs.pop('arbiter_host', '127.0.0.1'), | ||||
|         port=kwargs.pop('arbiter_port', 1616), | ||||
|         main=main, | ||||
|     ) as portal: | ||||
|         if not current_actor().is_arbiter: | ||||
|             # create a local actor and start it up its main routine | ||||
|             actor = Actor( | ||||
|                 name or 'anonymous', | ||||
|                 # namespaces=kwargs.get('namespaces'), | ||||
|                 # statespace=kwargs.get('statespace'), | ||||
|                 # main=async_fn,  # main coroutine to be invoked | ||||
|                 main=main,  # main coroutine to be invoked | ||||
|                 **kwargs | ||||
|             ) | ||||
|             # this will block and yield control to the `trio` run loop | ||||
|             await serve_local_actor( | ||||
|                 actor, accept_addr=kwargs.get('accept_addr', (None, 0))) | ||||
|                 actor, accept_addr=kwargs.pop('accept_addr', (None, 0))) | ||||
|             log.info("Completed async main") | ||||
|         else: | ||||
|             # block waiting for the arbiter main task to complete | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue