diff --git a/piker/tractor.py b/piker/tractor.py index 0023fba..361e9a0 100644 --- a/piker/tractor.py +++ b/piker/tractor.py @@ -21,6 +21,8 @@ log = get_logger('tractor') # set at startup and after forks _current_actor = None +_default_arbiter_host = '127.0.0.1' +_default_arbiter_port = 1616 class ActorFailure(Exception): @@ -103,12 +105,25 @@ async def result_from_q(q): raise ValueError(f"{first_msg} is an invalid response packet?") +async def _do_handshake(actor, chan): + await chan.send(actor.uid) + uid = await chan.recv() + + if not isinstance(uid, tuple): + raise ValueError(f"{uid} is not a valid uid?!") + + chan.uid = uid + log.info(f"Handshake with actor {uid}@{chan.raddr} complete") + return uid + + class Actor: """The fundamental concurrency primitive. - An actor is the combination of a ``multiprocessing.Process`` - executing a ``trio`` task tree, communicating with other actors - through "portals" which provide a native async API around "channels". + An *actor* is the combination of a regular Python or + ``multiprocessing.Process`` executing a ``trio`` task tree, communicating + with other actors through "portals" which provide a native async API + around "channels". """ is_arbiter = False @@ -122,6 +137,7 @@ class Actor: allow_rpc: bool = True, outlive_main: bool = False, ): + self.name = name self.uid = (name, uid or str(uuid.uuid1())) self.rpc_module_paths = rpc_module_paths self._mods = {} @@ -133,7 +149,7 @@ class Actor: self._outlive_main = outlive_main # filled in by `_async_main` after fork - self._peers = {} + self._peers = defaultdict(list) self._no_more_peers = trio.Event() self._no_more_peers.set() self._actors2calls = {} # map {uids -> {callids -> waiter queues}} @@ -148,7 +164,8 @@ class Actor: log.debug(f"Waiting for peer {uid} to connect") event = self._peers.setdefault(uid, trio.Event()) await event.wait() - return event, self._peers[uid] + log.debug(f"{uid} successfully connected back to us") + return event, self._peers[uid][-1] def load_namespaces(self): # We load namespaces after fork since this actor may @@ -168,26 +185,33 @@ class Actor: self._no_more_peers.clear() chan = Channel(stream=stream) log.info(f"New connection to us {chan}") + # send/receive initial handshake response - await chan.send(self.uid) - uid = await chan.recv() - chan.uid = uid - log.info(f"Handshake with actor {uid}@{chan.raddr} complete") + try: + uid = await _do_handshake(self, chan) + except StopAsyncIteration: + log.warn(f"Channel {chan} failed to handshake") + return # channel tracking - event = self._peers.pop(uid, None) - chan.event = event - self._peers[uid] = chan - log.debug(f"Registered {chan} for {uid}") - - # Instructing connection: this is likely a new channel to - # a recently spawned actor which we'd like to control via - # async-rpc calls. - if event and getattr(event, 'set', None): - log.debug(f"Waking channel waiters {event.statistics()}") + event_or_chans = self._peers.pop(uid, None) + if isinstance(event_or_chans, trio.Event): + # Instructing connection: this is likely a new channel to + # a recently spawned actor which we'd like to control via + # async-rpc calls. + log.debug(f"Waking channel waiters {event_or_chans.statistics()}") # Alert any task waiting on this connection to come up - event.set() - event.clear() # now consumer can wait on this channel to close + event_or_chans.set() + event_or_chans.clear() # consumer can wait on channel to close + elif isinstance(event_or_chans, list): + log.warn( + f"already have channel(s) for {uid}:{event_or_chans}?" + ) + # append new channel + self._peers[uid].extend(event_or_chans) + + log.debug(f"Registered {chan} for {uid}") + self._peers[uid].append(chan) # Begin channel management - respond to remote requests and # process received reponses. @@ -197,23 +221,24 @@ class Actor: # Drop ref to channel so it can be gc-ed and disconnected if chan is not self._parent_chan: log.debug(f"Releasing channel {chan}") - self._peers.pop(chan.uid, None) - chan.event.set() # signal teardown/disconnection + chans = self._peers.get(chan.uid) + chans.remove(chan) + if not chans: + log.debug(f"No more channels for {chan.uid}") + self._peers.pop(chan.uid, None) if not self._peers: # no more channels connected self._no_more_peers.set() log.debug(f"No more peer channels") def _push_result(self, actorid, cid, msg): + assert actorid, f"`actorid` can't be {actorid}" q = self.get_waitq(actorid, cid) log.debug(f"Delivering {msg} from {actorid} to caller {cid}") q.put_nowait(msg) def get_waitq(self, actorid, cid): - if actorid not in self._actors2calls: - log.debug(f"Registering for results from {actorid}") + log.debug(f"Registering for callid {cid} queue results from {actorid}") cids2qs = self._actors2calls.setdefault(actorid, {}) - if cid not in cids2qs: - log.debug(f"Registering for result from call id {cid}") return cids2qs.setdefault(cid, trio.Queue(1000)) async def send_cmd(self, chan, ns, func, kwargs): @@ -232,58 +257,66 @@ class Actor: Process rpc requests and deliver retrieved responses from channels. """ - log.debug(f"Entering async-rpc loop for {chan}") + # TODO: once https://github.com/python-trio/trio/issues/467 gets + # worked out we'll likely want to use that! + log.debug(f"Entering msg loop for {chan}") async with trio.open_nursery() as nursery: - async for msg in chan.aiter_recv(): - if msg is None: # terminate sentinel - log.debug(f"Terminating msg loop for {chan}") - break - log.debug(f"Received msg {msg}") - cid = msg.get('cid') - if cid: # deliver response to local caller/waiter - self._push_result(chan.uid, cid, msg) - if 'error' in msg: - # TODO: need something better then this slop - raise RemoteActorError(msg['error']) - continue - else: - ns, funcname, kwargs, actorid, cid = msg['cmd'] + try: + async for msg in chan.aiter_recv(): + if msg is None: # terminate sentinel + log.debug(f"Terminating msg loop for {chan}") + break + log.debug(f"Received msg {msg}") + cid = msg.get('cid') + if cid: # deliver response to local caller/waiter + self._push_result(chan.uid, cid, msg) + if 'error' in msg: + # TODO: need something better then this slop + raise RemoteActorError(msg['error']) + log.debug(f"Waiting on next msg for {chan}") + continue + else: + ns, funcname, kwargs, actorid, cid = msg['cmd'] - log.debug( - f"Processing request from {actorid}\n" - f"{ns}.{funcname}({kwargs})") - # TODO: accept a sentinel which cancels this task tree? - if ns == 'self': - func = getattr(self, funcname) - else: - func = getattr(self._mods[ns], funcname) + log.debug( + f"Processing request from {actorid}\n" + f"{ns}.{funcname}({kwargs})") + if ns == 'self': + func = getattr(self, funcname) + else: + func = getattr(self._mods[ns], funcname) - # spin up a task for the requested function - sig = inspect.signature(func) - treat_as_gen = False - if 'chan' in sig.parameters: - assert 'cid' in sig.parameters, \ - f"{func} must accept a `cid` (caller id) kwarg" - kwargs['chan'] = chan - kwargs['cid'] = cid - # TODO: eventually we want to be more stringent - # about what is considered a far-end async-generator. - # Right now both actual async gens and any async - # function which declares a `chan` kwarg in its - # signature will be treated as one. - treat_as_gen = True + # spin up a task for the requested function + sig = inspect.signature(func) + treat_as_gen = False + if 'chan' in sig.parameters: + assert 'cid' in sig.parameters, \ + f"{func} must accept a `cid` (caller id) kwarg" + kwargs['chan'] = chan + kwargs['cid'] = cid + # TODO: eventually we want to be more stringent + # about what is considered a far-end async-generator. + # Right now both actual async gens and any async + # function which declares a `chan` kwarg in its + # signature will be treated as one. + treat_as_gen = True - nursery.start_soon( - _invoke, cid, chan, func, kwargs, treat_as_gen, - name=funcname - ) - else: # channel disconnect - log.warn(f"Cancelling all tasks for {chan}") - nursery.cancel_scope.cancel() + log.debug(f"Spawning task for {func}") + nursery.start_soon( + _invoke, cid, chan, func, kwargs, treat_as_gen, + name=funcname + ) + log.debug(f"Waiting on next msg for {chan}") + else: # channel disconnect + log.debug(f"{chan} disconnected") + except trio.ClosedStreamError: + log.error(f"{chan} broke") + log.debug(f"Cancelling all tasks for {chan}") + nursery.cancel_scope.cancel() log.debug(f"Exiting msg loop for {chan}") - def _fork_main(self, accept_addr, parent_addr=None, loglevel='debug'): + def _fork_main(self, accept_addr, parent_addr=None, loglevel=None): # after fork routine which invokes a fresh ``trio.run`` log.info( f"Started new {ctx.current_process()} for actor {self.uid}") @@ -299,7 +332,13 @@ class Actor: pass # handle it the same way trio does? log.debug(f"Actor {self.uid} terminated") - async def _async_main(self, accept_addr, parent_addr=None, nursery=None): + async def _async_main( + self, + accept_addr, + arbiter_addr=(_default_arbiter_host, _default_arbiter_port), + parent_addr=None, + nursery=None + ): """Start the channel server and main task. A "root-most" (or "top-level") nursery for this actor is opened here @@ -325,20 +364,23 @@ class Actor: on_reconnect=self.main ) await chan.connect() - # initial handshake, report who we are, who they are - await chan.send(self.uid) - uid = await chan.recv() - if uid in self._peers: - log.warn( - f"already have channel for {uid} registered?" - ) + await _do_handshake(self, chan) - # handle new connection back to parent optionally - # begin responding to RPC - if self._allow_rpc: - self.load_namespaces() - nursery.start_soon(self._process_messages, chan) + # handle new connection back to parent optionally + # begin responding to RPC + if self._allow_rpc: + self.load_namespaces() + if self._parent_chan: + nursery.start_soon( + self._process_messages, self._parent_chan) + + # register with the arbiter if we're told its addr + log.debug(f"Registering {self} for role `{self.name}`") + async with get_arbiter(*arbiter_addr) as arb_portal: + await arb_portal.run( + 'self', 'register_actor', + name=self.name, sockaddr=self.accept_addr) if self.main: if self._parent_chan: @@ -374,6 +416,16 @@ class Actor: {'error': traceback.format_exc(), 'cid': 'main'}) else: raise + finally: + # UNregister actor from the arbiter + try: + if arbiter_addr is not None: + async with get_arbiter(*arbiter_addr) as arb_portal: + await arb_portal.run( + 'self', 'register_actor', + name=self.name, sockaddr=self.accept_addr) + except OSError: + log.warn(f"Unable to unregister {self.name} from arbiter") return result @@ -389,7 +441,6 @@ class Actor: listening for new messages. """ - log.debug(f"Starting tcp server on {accept_host}:{accept_port}") async with trio.open_nursery() as nursery: self._server_nursery = nursery # TODO: might want to consider having a separate nursery @@ -403,8 +454,9 @@ class Actor: port=accept_port, host=accept_host, ) ) + log.debug( + f"Started tcp server(s) on {[l.socket for l in listeners]}") self._listeners.extend(listeners) - log.debug(f"Spawned {listeners}") task_status.started() def cancel(self): @@ -431,8 +483,8 @@ class Actor: def get_parent(self): return Portal(self._parent_chan) - def get_chan(self, actorid): - return self._peers.get(actorid) + def get_chans(self, actorid): + return self._peers[actorid] class Arbiter(Actor): @@ -453,6 +505,14 @@ class Arbiter(Actor): def register_actor(self, name, sockaddr): self._registry[name].append(sockaddr) + def unregister_actor(self, name, sockaddr): + sockaddrs = self._registry.get(name) + if sockaddrs: + try: + sockaddrs.remove(sockaddr) + except ValueError: + pass + class Portal: """A 'portal' to a(n) (remote) ``Actor``. @@ -463,28 +523,14 @@ class Portal: Think of this like an native async IPC API. """ - def __init__(self, channel, event=None): + def __init__(self, channel): self.channel = channel - self._uid = channel.uid - self._event = event - - async def __aenter__(self): - await self.channel.connect() - # do the handshake - await self.channel.send(_current_actor.uid) - self._uid = uid = await self.channel.recv() - _current_actor._peers[uid] = self.channel - return self async def aclose(self): + log.debug(f"Closing {self}") + # XXX: won't work until https://github.com/python-trio/trio/pull/460 + # gets in! await self.channel.aclose() - if self._event: - # alert the _stream_handler task that we are done with the channel - # so it can terminate / move on - self._event.set() - - async def __aexit__(self, etype, value, tb): - await self.aclose() async def run(self, ns, func, **kwargs): """Submit a function to be scheduled and run by actor, return its @@ -512,7 +558,7 @@ class Portal: except KeyError: raise RemoteActorError(msg['error']) except GeneratorExit: - log.warn(f"Cancelling async gen call {cid} to {chan.uid}") + log.debug(f"Cancelling async gen call {cid} to {chan.uid}") return yield_from_q() @@ -522,6 +568,39 @@ class Portal: raise ValueError(f"Unknown msg response type: {first_msg}") +@asynccontextmanager +async def open_portal(channel, nursery=None): + """Open a ``Portal`` through the provided ``channel``. + + Spawns a background task to handle rpc message processing. + """ + actor = current_actor() + assert actor + was_connected = False + + async with maybe_open_nursery(nursery) as nursery: + + if not channel.connected(): + await channel.connect() + was_connected = True + + if channel.uid is None: + await _do_handshake(actor, channel) + + if not actor.get_chans(channel.uid): + # actor is not currently managing this channel + actor._peers[channel.uid].append(channel) + + nursery.start_soon(actor._process_messages, channel) + yield Portal(channel) + + # cancel background msg loop task + nursery.cancel_scope.cancel() + if was_connected: + actor._peers[channel.uid].remove(channel) + await channel.aclose() + + class LocalPortal: """A 'portal' to a local ``Actor``. @@ -590,7 +669,7 @@ class ActorNursery: main_q = self._actor.get_waitq(actor.uid, 'main') self._children[(name, proc.pid)] = (actor, proc, main_q) - return Portal(chan, event=event) + return Portal(chan) async def wait(self): @@ -623,9 +702,10 @@ class ActorNursery: else: # send cancel cmd - likely no response from subactor actor = self._actor - chan = actor.get_chan(subactor.uid) - if chan: - await actor.send_cmd(chan, 'self', 'cancel', {}) + chans = actor.get_chans(subactor.uid) + if chans: + for chan in chans: + await actor.send_cmd(chan, 'self', 'cancel', {}) else: log.warn( f"Channel for {subactor.uid} is already down?") @@ -642,8 +722,10 @@ class ActorNursery: log.info(f"{actor.uid} main task completed with {msg}") if not actor._outlive_main: # trigger msg loop to break - log.info(f"Signalling msg loop exit for {actor.uid}") - await self._actor.get_chan(actor.uid).send(None) + chans = self._actor.get_chans(actor.uid) + for chan in chans: + log.info(f"Signalling msg loop exit for {actor.uid}") + await chan.send(None) if etype is not None: log.warn(f"{current_actor().uid} errored with {etype}, " @@ -655,6 +737,7 @@ class ActorNursery: for subactor, proc, main_q in self._children.values(): nursery.start_soon(wait_for_actor, subactor, proc, main_q) + await self.wait() log.debug(f"Nursery teardown complete") @@ -677,117 +760,136 @@ async def open_nursery(supervisor=None, loglevel='WARNING'): yield nursery -async def serve_local_actor(actor, nursery=None, accept_addr=(None, 0)): +class NoArbiterFound(Exception): + "Couldn't find the arbiter?" + + +async def start_actor(actor, host, port, arbiter_addr, nursery=None): """Spawn a local actor by starting a task to execute it's main async function. Blocks if no nursery is provided, in which case it is expected the nursery provider is responsible for waiting on the task to complete. """ + # assign process-local actor + global _current_actor + _current_actor = actor + + # start local channel-server and fake the portal API + # NOTE: this won't block since we provide the nursery + log.info(f"Starting local {actor} @ {host}:{port}") + await actor._async_main( - accept_addr=accept_addr, + accept_addr=(host, port), parent_addr=None, + arbiter_addr=arbiter_addr, nursery=nursery, ) - return actor + # XXX: If spawned locally, the actor is cancelled when this + # context is complete given that there are no more active + # peer channels connected to it. + if not actor._outlive_main: + actor.cancel_server() - -class NoArbiterFound: - "Couldn't find the arbiter?" + # unset module state + _current_actor = None + log.info("Completed async main") @asynccontextmanager -async def get_arbiter(host='127.0.0.1', port=1617, main=None, **kwargs): +async def _connect_chan(host, port): + """Attempt to connect to an arbiter's channel server. + Return the channel on success or None on failure. + """ + chan = Channel((host, port)) + await chan.connect() + yield chan + await chan.aclose() + + +@asynccontextmanager +async def get_arbiter(host, port): + """Return a portal instance connected to a local or remote + arbiter. + """ actor = current_actor() - if actor and not actor.is_arbiter: - try: - # If an arbiter is already running on this host connect to it - async with Portal(Channel((host, port))) as portal: - yield portal - except OSError as err: - raise NoArbiterFound(err) + if not actor: + raise RuntimeError("No actor instance has been defined yet?") + + if actor.is_arbiter: + # we're already the arbiter + # (likely a re-entrant call from the arbiter actor) + yield LocalPortal(actor) else: - if actor and actor.is_arbiter: - # we're already the arbiter - # (likely a re-entrant call from the arbiter actor) - yield LocalPortal(actor) - else: - arbiter = Arbiter('arbiter', main=main, **kwargs) - # assign process-local actor - global _current_actor - _current_actor = arbiter - - # start the arbiter in process in a new task - async with trio.open_nursery() as nursery: - - # start local channel-server and fake the portal API - # NOTE: this won't block since we provide the nursery - await serve_local_actor( - arbiter, nursery=nursery, accept_addr=(host, port)) - - yield LocalPortal(arbiter) - - # XXX: If spawned locally, the arbiter is cancelled when this - # context is complete given that there are no more active - # peer channels connected to it. - if not arbiter._outlive_main: - arbiter.cancel_server() + async with _connect_chan(host, port) as chan: + async with open_portal(chan) as arb_portal: + yield arb_portal @asynccontextmanager -async def find_actor(name): +async def find_actor( + name, + arbiter_sockaddr=(_default_arbiter_host, _default_arbiter_port) +): """Ask the arbiter to find actor(s) by name. Returns a sequence of unconnected portals for each matching actor known to the arbiter (client code is expected to connect the portals). """ - async with get_arbiter() as portal: - sockaddrs = await portal.run('self', 'find_actor', name=name) - portals = [] + actor = current_actor() + if not actor: + raise RuntimeError("No actor instance has been defined yet?") + + async with get_arbiter(*arbiter_sockaddr) as arb_portal: + sockaddrs = await arb_portal.run('self', 'find_actor', name=name) + # TODO: return portals to all available actors - for now just + # the first one we find if sockaddrs: - for sockaddr in sockaddrs: - portals.append(Portal(Channel(sockaddr))) - - yield portals # XXX: these are "unconnected" portals + sockaddr = sockaddrs[-1] + async with _connect_chan(*sockaddr) as chan: + async with open_portal(chan) as portal: + yield portal + else: + yield -async def _main(async_fn, args, kwargs, name): +async def _main(async_fn, args, kwargs, name, arbiter_addr): + """Async entry point for ``tractor``. + """ main = partial(async_fn, *args) if async_fn else None + arbiter_addr = (host, port) = arbiter_addr or ( + _default_arbiter_host, _default_arbiter_port) + # make a temporary connection to see if an arbiter exists + arbiter_found = False + try: + async with _connect_chan(host, port): + arbiter_found = True + except OSError: + log.warn(f"No actor could be found @ {host}:{port}") + + if arbiter_found: # we were able to connect to an arbiter + log.info(f"Arbiter seems to exist @ {host}:{port}") + # create a local actor and start up its main routine/task + actor = Actor( + name or 'anonymous', + main=main, + **kwargs + ) + host, port = (_default_arbiter_host, 0) + else: + # start this local actor as the arbiter + actor = Arbiter(name or 'arbiter', main=main, **kwargs) + + await start_actor(actor, host, port, arbiter_addr=arbiter_addr) # Creates an internal nursery which shouldn't be cancelled even if - # the one opened below is (this is desirable because the arbitter should + # the one opened below is (this is desirable because the arbiter should # stay up until a re-election process has taken place - which is not # implemented yet FYI). - async with get_arbiter( - host=kwargs.pop('arbiter_host', '127.0.0.1'), - port=kwargs.pop('arbiter_port', 1617), - main=main, - **kwargs, - ): - if not current_actor().is_arbiter: - # create a local actor and start it up its main routine - actor = Actor( - name or 'anonymous', - main=main, # main coroutine to be invoked - **kwargs - ) - # this will block and yield control to the `trio` run loop - await serve_local_actor( - actor, accept_addr=kwargs.pop('accept_addr', (None, 0))) - log.info("Completed async main") - # TODO: when the local actor's main has completed we cancel? - # actor.cancel() - else: - # block waiting for the arbiter main task to complete - pass - - # unset module state - global _current_actor - _current_actor = None -def run(async_fn, *args, arbiter_host=None, name='anonymous', **kwargs): +def run(async_fn, *args, name=None, arbiter_addr=None, **kwargs): """Run a trio-actor async function in process. This is tractor's main entry and the start point for any async actor. """ - return trio.run(_main, async_fn, args, kwargs, name) + return trio.run(_main, async_fn, args, kwargs, name, arbiter_addr)