More fixes to do cancellation correctly

Here is a bunch of code tightening to make sure cancellation works even
if recently spawned actors haven't fully started up and the parent is
cancelled.

The fixes include:
- passing the arbiter socket address to each actor
- ensure all spawned actors respect the spawner's log level
- handle process versus final `portal.result()` teardown in multiple
  tasks such that if a proc dies before shipping a result we don't wait
- more detailed debug logging in teardown code paths
- don't store peer connected events in the same `dict` as the peer channels
- if necessary fake main task results on peer channel disconnect
- warn when a `trio.Cancelled` is what causes a nursery to bail;
  otherwise log an error
- store the subactor portal in the nursery for teardown purposes
- add dedicated `Portal.cancel_actor()` which acts as a "hard cancel"
  and never blocks (indefinitely)
- add `Arbiter.unregister_actor()`; it's more explicit about what's
  being requested
asyncgen_closing_fix
Tyler Goodlet 2018-07-10 15:06:42 -04:00
parent d94be22ef2
commit 49573c9a03
1 changed file with 190 additions and 115 deletions

View File

@ -1,5 +1,6 @@
""" """
tracor: An actor model micro-framework. tractor: An actor model micro-framework built on
``trio`` and ``multiprocessing``.
""" """
from collections import defaultdict from collections import defaultdict
from functools import partial from functools import partial
@ -26,6 +27,10 @@ _default_arbiter_port = 1616
_default_loglevel = None _default_loglevel = None
def get_loglevel():
return _default_loglevel
class ActorFailure(Exception): class ActorFailure(Exception):
"General actor failure" "General actor failure"
@ -82,7 +87,7 @@ async def _invoke(
# to_yield = await coro.asend(to_send) # to_yield = await coro.asend(to_send)
await chan.send({'yield': item, 'cid': cid}) await chan.send({'yield': item, 'cid': cid})
log.warn(f"Finished iterating {coro}") log.debug(f"Finished iterating {coro}")
# TODO: we should really support a proper # TODO: we should really support a proper
# `StopAsyncIteration` system here for returning a final # `StopAsyncIteration` system here for returning a final
# value if desired # value if desired
@ -101,11 +106,12 @@ async def _invoke(
except Exception: except Exception:
if not raise_errs: if not raise_errs:
await chan.send({'error': traceback.format_exc(), 'cid': cid}) await chan.send({'error': traceback.format_exc(), 'cid': cid})
log.exception("Actor errored:")
else: else:
raise raise
async def result_from_q(q): async def result_from_q(q, chan):
"""Process a msg from a remote actor. """Process a msg from a remote actor.
""" """
first_msg = await q.get() first_msg = await q.get()
@ -114,7 +120,7 @@ async def result_from_q(q):
elif 'yield' in first_msg: elif 'yield' in first_msg:
return 'yield', first_msg, q return 'yield', first_msg, q
elif 'error' in first_msg: elif 'error' in first_msg:
raise RemoteActorError(first_msg['error']) raise RemoteActorError(f"{chan.uid}\n" + first_msg['error'])
else: else:
raise ValueError(f"{first_msg} is an invalid response packet?") raise ValueError(f"{first_msg} is an invalid response packet?")
@ -151,6 +157,7 @@ class Actor:
allow_rpc: bool = True, allow_rpc: bool = True,
outlive_main: bool = False, outlive_main: bool = False,
loglevel: str = None, loglevel: str = None,
arbiter_addr: (str, int) = None,
): ):
self.name = name self.name = name
self.uid = (name, uid or str(uuid.uuid1())) self.uid = (name, uid or str(uuid.uuid1()))
@ -163,9 +170,11 @@ class Actor:
self._allow_rpc = allow_rpc self._allow_rpc = allow_rpc
self._outlive_main = outlive_main self._outlive_main = outlive_main
self.loglevel = loglevel self.loglevel = loglevel
self._arb_addr = arbiter_addr
# filled in by `_async_main` after fork # filled in by `_async_main` after fork
self._peers = defaultdict(list) self._peers = defaultdict(list)
self._peer_connected = {}
self._no_more_peers = trio.Event() self._no_more_peers = trio.Event()
self._no_more_peers.set() self._no_more_peers.set()
self._actors2calls = {} # map {uids -> {callids -> waiter queues}} self._actors2calls = {} # map {uids -> {callids -> waiter queues}}
@ -178,7 +187,7 @@ class Actor:
``uid``. ``uid``.
""" """
log.debug(f"Waiting for peer {uid} to connect") log.debug(f"Waiting for peer {uid} to connect")
event = self._peers.setdefault(uid, trio.Event()) event = self._peer_connected.setdefault(uid, trio.Event())
await event.wait() await event.wait()
log.debug(f"{uid} successfully connected back to us") log.debug(f"{uid} successfully connected back to us")
return event, self._peers[uid][-1] return event, self._peers[uid][-1]
@ -210,23 +219,22 @@ class Actor:
return return
# channel tracking # channel tracking
event_or_chans = self._peers.pop(uid, None) event = self._peer_connected.pop(uid, None)
if isinstance(event_or_chans, trio.Event): if event:
# Instructing connection: this is likely a new channel to # Instructing connection: this is likely a new channel to
# a recently spawned actor which we'd like to control via # a recently spawned actor which we'd like to control via
# async-rpc calls. # async-rpc calls.
log.debug(f"Waking channel waiters {event_or_chans.statistics()}") log.debug(f"Waking channel waiters {event.statistics()}")
# Alert any task waiting on this connection to come up # Alert any task waiting on this connection to come up
event_or_chans.set() event.set()
event_or_chans.clear() # consumer can wait on channel to close
elif isinstance(event_or_chans, list):
log.warn(
f"already have channel(s) for {uid}:{event_or_chans}?"
)
# append new channel
self._peers[uid].extend(event_or_chans)
chans = self._peers[uid]
if chans:
log.warn(
f"already have channel(s) for {uid}:{chans}?"
)
log.debug(f"Registered {chan} for {uid}") log.debug(f"Registered {chan} for {uid}")
# append new channel
self._peers[uid].append(chan) self._peers[uid].append(chan)
# Begin channel management - respond to remote requests and # Begin channel management - respond to remote requests and
@ -235,20 +243,31 @@ class Actor:
await self._process_messages(chan) await self._process_messages(chan)
finally: finally:
# Drop ref to channel so it can be gc-ed and disconnected # Drop ref to channel so it can be gc-ed and disconnected
if chan is not self._parent_chan: # if chan is not self._parent_chan:
log.debug(f"Releasing channel {chan}") log.debug(f"Releasing channel {chan} from {chan.uid}")
chans = self._peers.get(chan.uid) chans = self._peers.get(chan.uid)
chans.remove(chan) chans.remove(chan)
if chan.connected(): if not chans:
log.debug(f"Disconnecting channel {chan}") log.debug(f"No more channels for {chan.uid}")
await chan.send(None) self._peers.pop(chan.uid, None)
await chan.aclose() if not self._actors2calls.get(chan.uid, {}).get('main'):
if not chans: # fake a "main task" result for any waiting
log.debug(f"No more channels for {chan.uid}") # nurseries/portals
self._peers.pop(chan.uid, None) log.debug(f"Faking result for {chan} from {chan.uid}")
if not self._peers: # no more channels connected q = self.get_waitq(chan.uid, 'main')
self._no_more_peers.set() q.put_nowait({'return': None, 'cid': 'main'})
log.debug(f"No more peer channels")
log.debug(f"Peers is {self._peers}")
if not self._peers: # no more channels connected
self._no_more_peers.set()
log.debug(f"Signalling no more peer channels")
# XXX: is this necessary?
if chan.connected():
log.debug(f"Disconnecting channel {chan}")
await chan.send(None)
await chan.aclose()
async def _push_result(self, actorid, cid, msg): async def _push_result(self, actorid, cid, msg):
assert actorid, f"`actorid` can't be {actorid}" assert actorid, f"`actorid` can't be {actorid}"
@ -258,7 +277,7 @@ class Actor:
await q.put(msg) await q.put(msg)
def get_waitq(self, actorid, cid): def get_waitq(self, actorid, cid):
log.debug(f"Registering for callid {cid} queue results from {actorid}") log.debug(f"Getting result queue for {actorid} cid {cid}")
cids2qs = self._actors2calls.setdefault(actorid, {}) cids2qs = self._actors2calls.setdefault(actorid, {})
return cids2qs.setdefault(cid, trio.Queue(1000)) return cids2qs.setdefault(cid, trio.Queue(1000))
@ -289,7 +308,8 @@ class Actor:
f"Cancelling all tasks for {chan} from {chan.uid}") f"Cancelling all tasks for {chan} from {chan.uid}")
nursery.cancel_scope.cancel() nursery.cancel_scope.cancel()
log.debug( log.debug(
f"Terminating msg loop for {chan} from {chan.uid}") f"Msg loop signalled to terminate for"
f" {chan} from {chan.uid}")
break break
log.debug(f"Received msg {msg} from {chan.uid}") log.debug(f"Received msg {msg} from {chan.uid}")
cid = msg.get('cid') cid = msg.get('cid')
@ -340,7 +360,8 @@ class Actor:
def _fork_main(self, accept_addr, parent_addr=None): def _fork_main(self, accept_addr, parent_addr=None):
# after fork routine which invokes a fresh ``trio.run`` # after fork routine which invokes a fresh ``trio.run``
if self.loglevel: # log.warn("Log level after fork is {self.loglevel}")
if self.loglevel is not None:
get_console_log(self.loglevel) get_console_log(self.loglevel)
log.info( log.info(
f"Started new {ctx.current_process()} for actor {self.uid}") f"Started new {ctx.current_process()} for actor {self.uid}")
@ -357,7 +378,7 @@ class Actor:
async def _async_main( async def _async_main(
self, self,
accept_addr, accept_addr,
arbiter_addr=(_default_arbiter_host, _default_arbiter_port), arbiter_addr=None,
parent_addr=None, parent_addr=None,
nursery=None nursery=None
): ):
@ -368,6 +389,8 @@ class Actor:
and when cancelled effectively cancels the actor. and when cancelled effectively cancels the actor.
""" """
result = None result = None
arbiter_addr = arbiter_addr or self._arb_addr
registered_with_arbiter = False
try: try:
async with maybe_open_nursery(nursery) as nursery: async with maybe_open_nursery(nursery) as nursery:
self._root_nursery = nursery self._root_nursery = nursery
@ -378,17 +401,30 @@ class Actor:
self._serve_forever, accept_host=host, accept_port=port) self._serve_forever, accept_host=host, accept_port=port)
) )
# XXX: I wonder if a better name is maybe "requester"
# since I don't think the notion of a "parent" actor
# necessarily sticks given that eventually we want
# ``'MainProcess'`` (the actor who initially starts the
# forkserver) to eventually be the only one who is
# allowed to spawn new processes per Python program.
if parent_addr is not None: if parent_addr is not None:
# Connect back to the parent actor and conduct initial try:
# handshake (From this point on if we error ship the # Connect back to the parent actor and conduct initial
# exception back to the parent actor) # handshake (From this point on if we error ship the
chan = self._parent_chan = Channel( # exception back to the parent actor)
destaddr=parent_addr, chan = self._parent_chan = Channel(
on_reconnect=self.main destaddr=parent_addr,
) on_reconnect=self.main
await chan.connect() )
# initial handshake, report who we are, who they are await chan.connect()
await _do_handshake(self, chan) # initial handshake, report who we are, who they are
await _do_handshake(self, chan)
except OSError: # failed to connect
log.warn(
f"Failed to connect to parent @ {parent_addr},"
" closing server")
self.cancel_server()
self._parent_chan = None
# register with the arbiter if we're told its addr # register with the arbiter if we're told its addr
log.debug(f"Registering {self} for role `{self.name}`") log.debug(f"Registering {self} for role `{self.name}`")
@ -396,6 +432,7 @@ class Actor:
await arb_portal.run( await arb_portal.run(
'self', 'register_actor', 'self', 'register_actor',
name=self.name, sockaddr=self.accept_addr) name=self.name, sockaddr=self.accept_addr)
registered_with_arbiter = True
# handle new connection back to parent optionally # handle new connection back to parent optionally
# begin responding to RPC # begin responding to RPC
@ -409,23 +446,26 @@ class Actor:
try: try:
if self._parent_chan: if self._parent_chan:
log.debug(f"Starting main task `{self.main}`") log.debug(f"Starting main task `{self.main}`")
# start "main" routine in a task # spawned subactor so deliver "main" task result(s)
# back to parent
await nursery.start( await nursery.start(
_invoke, 'main', _invoke, 'main',
self._parent_chan, self.main, {}, self._parent_chan, self.main, {},
False, True # treat_as_gen, raise_errs params False, True # treat_as_gen, raise_errs params
) )
else: else:
# run directly # run directly - we are an "unspawned actor"
log.debug(f"Running `{self.main}` directly") log.debug(f"Running `{self.main}` directly")
result = await self.main() result = await self.main()
finally: finally:
# tear down channel server # tear down channel server in order to ensure
# we exit normally when the main task is done
if not self._outlive_main: if not self._outlive_main:
log.debug(f"Shutting down channel server") log.debug(f"Shutting down channel server")
self.cancel_server() self.cancel_server()
log.debug("Waiting on root nursery to complete")
# blocks here as expected if no nursery was provided until # blocks here as expected if no nursery was provided until
# the channel server is killed (i.e. this actor is # the channel server is killed (i.e. this actor is
# cancelled or signalled by the parent actor) # cancelled or signalled by the parent actor)
@ -438,30 +478,27 @@ class Actor:
log.error( log.error(
f"Failed to ship error to parent " f"Failed to ship error to parent "
f"{self._parent_chan.uid}, channel was closed") f"{self._parent_chan.uid}, channel was closed")
log.exception("Actor errored:") log.exception("Actor errored:")
if not registered_with_arbiter:
log.exception(
f"Failed to register with arbiter @ {arbiter_addr}")
else: else:
raise raise
finally: finally:
# UNregister actor from the arbiter await self._do_unreg(arbiter_addr)
try: # terminate actor once all it's peers (actors that connected
if arbiter_addr is not None: # to it as clients) have disappeared
async with get_arbiter(*arbiter_addr) as arb_portal: if not self._no_more_peers.is_set():
await arb_portal.run( log.debug(
'self', 'unregister_actor', f"Waiting for remaining peers {self._peers} to clear")
name=self.name, sockaddr=self.accept_addr) await self._no_more_peers.wait()
except OSError:
log.warn(f"Unable to unregister {self.name} from arbiter")
# terminate local in-proc once its main completes
log.debug(
f"Waiting for remaining peers {self._peers} to clear")
await self._no_more_peers.wait()
log.debug(f"All peer channels are complete") log.debug(f"All peer channels are complete")
# tear down channel server # tear down channel server no matter what since we errored
if not self._outlive_main: # or completed
log.debug(f"Shutting down channel server") log.debug(f"Shutting down channel server")
self.cancel_server() self.cancel_server()
return result return result
@ -498,7 +535,17 @@ class Actor:
self._listeners.extend(listeners) self._listeners.extend(listeners)
task_status.started() task_status.started()
def cancel(self): async def _do_unreg(self, arbiter_addr):
# UNregister actor from the arbiter
try:
if arbiter_addr is not None:
async with get_arbiter(*arbiter_addr) as arb_portal:
await arb_portal.run(
'self', 'unregister_actor', name=self.name)
except OSError:
log.warn(f"Unable to unregister {self.name} from arbiter")
async def cancel(self):
"""This cancels the internal root-most nursery thereby gracefully """This cancels the internal root-most nursery thereby gracefully
cancelling (for all intents and purposes) this actor. cancelling (for all intents and purposes) this actor.
""" """
@ -545,13 +592,8 @@ class Arbiter(Actor):
def register_actor(self, name, sockaddr): def register_actor(self, name, sockaddr):
self._registry[name].append(sockaddr) self._registry[name].append(sockaddr)
def unregister_actor(self, name, sockaddr): def unregister_actor(self, name):
sockaddrs = self._registry.get(name) self._registry.pop(name, None)
if sockaddrs:
try:
sockaddrs.remove(sockaddr)
except ValueError:
pass
class Portal: class Portal:
@ -584,7 +626,8 @@ class Portal:
# ship a function call request to the remote actor # ship a function call request to the remote actor
cid, q = await actor.send_cmd(self.channel, ns, func, kwargs) cid, q = await actor.send_cmd(self.channel, ns, func, kwargs)
# wait on first response msg and handle # wait on first response msg and handle
return await self._return_from_resptype(cid, *(await result_from_q(q))) return await self._return_from_resptype(
cid, *(await result_from_q(q, self.channel)))
async def _return_from_resptype(self, cid, resptype, first_msg, q): async def _return_from_resptype(self, cid, resptype, first_msg, q):
@ -618,7 +661,7 @@ class Portal:
""" """
if self._result is None: if self._result is None:
q = current_actor().get_waitq(self.channel.uid, 'main') q = current_actor().get_waitq(self.channel.uid, 'main')
resptype, first_msg, q = (await result_from_q(q)) resptype, first_msg, q = (await result_from_q(q, self.channel))
self._result = await self._return_from_resptype( self._result = await self._return_from_resptype(
'main', resptype, first_msg, q) 'main', resptype, first_msg, q)
# await q.put(first_msg) # for next consumer (e.g. nursery) # await q.put(first_msg) # for next consumer (e.g. nursery)
@ -630,6 +673,21 @@ class Portal:
log.debug(f"Closing portal for {chan} to {chan.uid}") log.debug(f"Closing portal for {chan} to {chan.uid}")
await self.channel.send(None) await self.channel.send(None)
async def cancel_actor(self):
"""Cancel the actor on the other end of this portal.
"""
log.warn(
f"Sending cancel request to {self.channel.uid} on "
f"{self.channel}")
try:
with trio.move_on_after(0.1) as cancel_scope:
cancel_scope.shield = True
# send cancel cmd - might not get response
await self.run('self', 'cancel')
except trio.ClosedStreamError:
log.warn(
f"{self.channel} for {self.channel.uid} was alreaday closed?")
@asynccontextmanager @asynccontextmanager
async def open_portal(channel, nursery=None): async def open_portal(channel, nursery=None):
@ -650,10 +708,6 @@ async def open_portal(channel, nursery=None):
if channel.uid is None: if channel.uid is None:
await _do_handshake(actor, channel) await _do_handshake(actor, channel)
if not actor.get_chans(channel.uid):
# actor is not currently managing this channel
actor._peers[channel.uid].append(channel)
nursery.start_soon(actor._process_messages, channel) nursery.start_soon(actor._process_messages, channel)
portal = Portal(channel) portal = Portal(channel)
yield portal yield portal
@ -665,7 +719,6 @@ async def open_portal(channel, nursery=None):
# cancel background msg loop task # cancel background msg loop task
nursery.cancel_scope.cancel() nursery.cancel_scope.cancel()
if was_connected: if was_connected:
actor._peers[channel.uid].remove(channel)
await channel.aclose() await channel.aclose()
@ -707,8 +760,9 @@ class ActorNursery:
statespace=None, statespace=None,
rpc_module_paths=None, rpc_module_paths=None,
outlive_main=False, # sub-actors die when their main task completes outlive_main=False, # sub-actors die when their main task completes
loglevel=_default_loglevel, # set log level per subactor loglevel=None, # set log level per subactor
): ):
loglevel = loglevel or self._actor.loglevel or get_loglevel()
actor = Actor( actor = Actor(
name, name,
# modules allowed to invoked funcs from # modules allowed to invoked funcs from
@ -717,6 +771,7 @@ class ActorNursery:
main=main, # main coroutine to be invoked main=main, # main coroutine to be invoked
outlive_main=outlive_main, outlive_main=outlive_main,
loglevel=loglevel, loglevel=loglevel,
arbiter_addr=current_actor()._arb_addr,
) )
parent_addr = self._actor.accept_addr parent_addr = self._actor.accept_addr
assert parent_addr assert parent_addr
@ -735,26 +790,40 @@ class ActorNursery:
# channel should have handshake completed by the # channel should have handshake completed by the
# local actor by the time we get a ref to it # local actor by the time we get a ref to it
event, chan = await self._actor.wait_for_peer(actor.uid) event, chan = await self._actor.wait_for_peer(actor.uid)
# channel is up, get queue which delivers result from main routine portal = Portal(chan)
main_q = self._actor.get_waitq(actor.uid, 'main') self._children[(name, proc.pid)] = (actor, proc, portal)
self._children[(name, proc.pid)] = (actor, proc, main_q) return portal
return Portal(chan)
async def wait(self): async def wait(self):
async def wait_for_proc(proc): async def wait_for_proc(proc, actor, portal):
# TODO: timeout block here? # TODO: timeout block here?
if proc.is_alive(): if proc.is_alive():
await trio.hazmat.wait_readable(proc.sentinel) await trio.hazmat.wait_readable(proc.sentinel)
# please god don't hang # please god don't hang
proc.join() proc.join()
log.debug(f"Joined {proc}") log.debug(f"Joined {proc}")
event = self._actor._peers.get(actor.uid)
if isinstance(event, trio.Event):
event.set()
log.warn(
f"Cancelled `wait_for_peer()` call since {actor.uid}"
f" is already dead!")
if not portal._result:
log.debug(f"Faking result for {actor.uid}")
q = self._actor.get_waitq(actor.uid, 'main')
q.put_nowait({'return': None, 'cid': 'main'})
async def wait_for_result(portal):
if portal.channel.connected():
log.debug(f"Waiting on final result from {subactor.uid}")
await portal.result()
# unblocks when all waiter tasks have completed # unblocks when all waiter tasks have completed
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
for subactor, proc, main_q in self._children.values(): for subactor, proc, portal in self._children.values():
nursery.start_soon(wait_for_proc, proc) nursery.start_soon(wait_for_proc, proc, subactor, portal)
nursery.start_soon(wait_for_result, portal)
async def cancel(self, hard_kill=False): async def cancel(self, hard_kill=False):
"""Cancel this nursery by instructing each subactor to cancel """Cancel this nursery by instructing each subactor to cancel
@ -764,7 +833,7 @@ class ActorNursery:
directly without any far end graceful ``trio`` cancellation. directly without any far end graceful ``trio`` cancellation.
""" """
log.debug(f"Cancelling nursery") log.debug(f"Cancelling nursery")
for subactor, proc, main_q in self._children.values(): for subactor, proc, portal in self._children.values():
if proc is mp.current_process(): if proc is mp.current_process():
# XXX: does this even make sense? # XXX: does this even make sense?
await subactor.cancel() await subactor.cancel()
@ -776,15 +845,8 @@ class ActorNursery:
# send KeyBoardInterrupt (trio abort signal) to sub-actors # send KeyBoardInterrupt (trio abort signal) to sub-actors
# os.kill(proc.pid, signal.SIGINT) # os.kill(proc.pid, signal.SIGINT)
else: else:
# send cancel cmd - likely no response from subactor await portal.cancel_actor()
actor = self._actor
chans = actor.get_chans(subactor.uid)
if chans:
for chan in chans:
await actor.send_cmd(chan, 'self', 'cancel', {})
else:
log.warn(
f"Channel for {subactor.uid} is already down?")
log.debug(f"Waiting on all subactors to complete") log.debug(f"Waiting on all subactors to complete")
await self.wait() await self.wait()
log.debug(f"All subactors for {self} have terminated") log.debug(f"All subactors for {self} have terminated")
@ -792,10 +854,10 @@ class ActorNursery:
async def __aexit__(self, etype, value, tb): async def __aexit__(self, etype, value, tb):
"""Wait on all subactor's main routines to complete. """Wait on all subactor's main routines to complete.
""" """
async def wait_for_actor(actor, proc, q): async def wait_for_actor(actor, proc, portal):
if proc.is_alive(): if proc.is_alive():
ret_type, msg, q = await result_from_q(q) res = await portal.result()
log.info(f"{actor.uid} main task completed with {msg}") log.info(f"{actor.uid} main task completed with {res}")
if not actor._outlive_main: if not actor._outlive_main:
# trigger msg loop to break # trigger msg loop to break
chans = self._actor.get_chans(actor.uid) chans = self._actor.get_chans(actor.uid)
@ -804,14 +866,20 @@ class ActorNursery:
await chan.send(None) await chan.send(None)
if etype is not None: if etype is not None:
log.exception(f"{current_actor().uid} errored with {etype}, " if etype is trio.Cancelled:
"cancelling actor nursery") log.warn(f"{current_actor().uid} was cancelled with {etype}, "
await self.cancel() "cancelling actor nursery")
with trio.open_cancel_scope(shield=True):
await self.cancel()
else:
log.exception(f"{current_actor().uid} errored with {etype}, "
"cancelling actor nursery")
await self.cancel()
else: else:
log.debug(f"Waiting on subactors {self._children} to complete") log.debug(f"Waiting on subactors {self._children} to complete")
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
for subactor, proc, main_q in self._children.values(): for subactor, proc, portal in self._children.values():
nursery.start_soon(wait_for_actor, subactor, proc, main_q) nursery.start_soon(wait_for_actor, subactor, proc, portal)
await self.wait() await self.wait()
log.debug(f"Nursery teardown complete") log.debug(f"Nursery teardown complete")
@ -824,7 +892,7 @@ def current_actor() -> Actor:
@asynccontextmanager @asynccontextmanager
async def open_nursery(supervisor=None, loglevel='WARNING'): async def open_nursery(supervisor=None):
"""Create and yield a new ``ActorNursery``. """Create and yield a new ``ActorNursery``.
""" """
actor = current_actor() actor = current_actor()
@ -908,7 +976,7 @@ async def get_arbiter(host, port):
@asynccontextmanager @asynccontextmanager
async def find_actor( async def find_actor(
name, name,
arbiter_sockaddr=(_default_arbiter_host, _default_arbiter_port) arbiter_sockaddr=None,
): ):
"""Ask the arbiter to find actor(s) by name. """Ask the arbiter to find actor(s) by name.
@ -919,7 +987,7 @@ async def find_actor(
if not actor: if not actor:
raise RuntimeError("No actor instance has been defined yet?") raise RuntimeError("No actor instance has been defined yet?")
async with get_arbiter(*arbiter_sockaddr) as arb_portal: async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal:
sockaddrs = await arb_portal.run('self', 'find_actor', name=name) sockaddrs = await arb_portal.run('self', 'find_actor', name=name)
# TODO: return portals to all available actors - for now just # TODO: return portals to all available actors - for now just
# the last one that registered # the last one that registered
@ -938,7 +1006,7 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr):
main = partial(async_fn, *args) if async_fn else None main = partial(async_fn, *args) if async_fn else None
arbiter_addr = (host, port) = arbiter_addr or ( arbiter_addr = (host, port) = arbiter_addr or (
_default_arbiter_host, _default_arbiter_port) _default_arbiter_host, _default_arbiter_port)
get_console_log(kwargs.get('loglevel', _default_loglevel)) get_console_log(kwargs.get('loglevel', get_loglevel()))
# make a temporary connection to see if an arbiter exists # make a temporary connection to see if an arbiter exists
arbiter_found = False arbiter_found = False
@ -956,11 +1024,12 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr):
main=main, main=main,
**kwargs **kwargs
) )
host, port = (_default_arbiter_host, 0) host, port = (host, 0)
else: else:
# start this local actor as the arbiter # start this local actor as the arbiter
# this should eventually get passed `outlive_main=True`? # this should eventually get passed `outlive_main=True`?
actor = Arbiter(name or 'arbiter', main=main, **kwargs) actor = Arbiter(
name or 'arbiter', main=main, arbiter_addr=arbiter_addr, **kwargs)
# ``Actor._async_main()`` creates an internal nursery if one is not # ``Actor._async_main()`` creates an internal nursery if one is not
# provided and thus blocks here until it's main task completes. # provided and thus blocks here until it's main task completes.
@ -970,7 +1039,13 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr):
return await _start_actor(actor, host, port, arbiter_addr=arbiter_addr) return await _start_actor(actor, host, port, arbiter_addr=arbiter_addr)
def run(async_fn, *args, name=None, arbiter_addr=None, **kwargs): def run(
async_fn,
*args,
name=None,
arbiter_addr=(_default_arbiter_host, _default_arbiter_port),
**kwargs
):
"""Run a trio-actor async function in process. """Run a trio-actor async function in process.
This is tractor's main entry and the start point for any async actor. This is tractor's main entry and the start point for any async actor.