From 2cc03965d8000edcd806170ea2e70c31bbd0acb3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 5 Jul 2018 21:32:42 -0400 Subject: [PATCH 01/16] Add trio plugin for testing --- requirements-test.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements-test.txt b/requirements-test.txt index be31bc5..285c77f 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,2 +1,3 @@ pytest +pytest-trio pdbpp From 36fd75e217a64b801ad473694e2164695c632e6c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 6 Jul 2018 02:36:21 -0400 Subject: [PATCH 02/16] Fix some bugs to get tests working Fix quite a few little bugs: - async gen func detection in `_invoke()` - always cancel channel server on main task exit - wait for remaining channel peers after unsub from arbiter - return result from main task(s) all the way up to `tractor.run()` Also add a `Portal.result()` for getting the final result(s) from the actor's main task and fix up a bunch of docs. --- tractor/__init__.py | 121 ++++++++++++++++++++++++++------------------ 1 file changed, 72 insertions(+), 49 deletions(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index cb7e121..6c0fde7 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -58,7 +58,10 @@ async def _invoke( if isinstance(func, partial): is_async_partial = inspect.iscoroutinefunction(func.func) - if not inspect.iscoroutinefunction(func) and not is_async_partial: + if ( + not inspect.iscoroutinefunction(func) and not is_async_partial + and not inspect.isasyncgenfunction(func) + ): await chan.send({'return': func(**kwargs), 'cid': cid}) else: coro = func(**kwargs) @@ -276,9 +279,6 @@ class Actor: cid = msg.get('cid') if cid: # deliver response to local caller/waiter self._push_result(chan.uid, cid, msg) - if 'error' in msg: - # TODO: need something better then this slop - raise RemoteActorError(msg['error']) log.debug(f"Waiting on next msg for {chan}") continue else: @@ -343,7 +343,8 @@ class Actor: parent_addr=None, nursery=None ): - """Start the channel server and main task. + """Start the channel server, maybe connect back to the parent, and + start the main task. A "root-most" (or "top-level") nursery for this actor is opened here and when cancelled effectively cancels the actor. 
@@ -387,28 +388,24 @@ class Actor: name=self.name, sockaddr=self.accept_addr) if self.main: - if self._parent_chan: - log.debug(f"Starting main task `{self.main}`") - # start "main" routine in a task - await nursery.start( - _invoke, 'main', self._parent_chan, self.main, {}, - False, True # treat_as_gen, raise_errs params - ) - else: - # run directly - log.debug(f"Running `{self.main}` directly") - result = await self.main() + try: + if self._parent_chan: + log.debug(f"Starting main task `{self.main}`") + # start "main" routine in a task + await nursery.start( + _invoke, 'main', self._parent_chan, self.main, {}, + False, True # treat_as_gen, raise_errs params + ) + else: + # run directly + log.debug(f"Running `{self.main}` directly") + result = await self.main() - # terminate local in-proc once its main completes - log.debug( - f"Waiting for remaining peers {self._peers} to clear") - await self._no_more_peers.wait() - log.debug(f"All peer channels are complete") - - # tear down channel server - if not self._outlive_main: - log.debug(f"Shutting down channel server") - self.cancel_server() + finally: + # tear down channel server + if not self._outlive_main: + log.debug(f"Shutting down channel server") + self.cancel_server() # blocks here as expected if no nursery was provided until # the channel server is killed (i.e. this actor is @@ -431,6 +428,12 @@ class Actor: except OSError: log.warn(f"Unable to unregister {self.name} from arbiter") + # terminate local in-proc once its main completes + log.debug( + f"Waiting for remaining peers {self._peers} to clear") + await self._no_more_peers.wait() + log.debug(f"All peer channels are complete") + return result async def _serve_forever( @@ -441,9 +444,10 @@ class Actor: accept_port=0, task_status=trio.TASK_STATUS_IGNORED ): - """Main coroutine: connect back to the parent, spawn main task, begin - listening for new messages. + """Start the channel server, begin listening for new connections. + This will cause an actor to continue living (blocking) until + ``cancel_server()`` is called. """ async with trio.open_nursery() as nursery: self._server_nursery = nursery @@ -454,6 +458,8 @@ class Actor: partial( trio.serve_tcp, self._stream_handler, + # new connections will stay alive even if this server + # is cancelled handler_nursery=self._root_nursery, port=accept_port, host=accept_host, ) @@ -543,13 +549,13 @@ class Portal: # TODO: not this needs some serious work and thinking about how # to make async-generators the fundamental IPC API over channels! 
# (think `yield from`, `gen.send()`, and functional reactive stuff) - chan = self.channel - # ship a function call request to the remote actor actor = current_actor() + # ship a function call request to the remote actor + cid, q = await actor.send_cmd(self.channel, ns, func, kwargs) + # wait on first response msg and handle + return await self._return_from_resptype(cid, *(await result_from_q(q))) - cid, q = await actor.send_cmd(chan, ns, func, kwargs) - # wait on first response msg - resptype, first_msg, q = await result_from_q(q) + async def _return_from_resptype(self, cid, resptype, first_msg, q): if resptype == 'yield': @@ -562,7 +568,9 @@ class Portal: except KeyError: raise RemoteActorError(msg['error']) except GeneratorExit: - log.debug(f"Cancelling async gen call {cid} to {chan.uid}") + log.debug( + f"Cancelling async gen call {cid} to " + "{self.channel.uid}") return yield_from_q() @@ -571,12 +579,22 @@ class Portal: else: raise ValueError(f"Unknown msg response type: {first_msg}") + async def result(self): + """Return the result(s) from the remote actor's "main" task. + """ + q = current_actor().get_waitq(self.channel.uid, 'main') + first_msg = (await result_from_q(q)) + val = await self._return_from_resptype( + 'main', *first_msg) + await q.put(first_msg) # for next consumer (e.g. nursery) + return val + @asynccontextmanager async def open_portal(channel, nursery=None): """Open a ``Portal`` through the provided ``channel``. - Spawns a background task to handle rpc message processing. + Spawns a background task to handle message processing. """ actor = current_actor() assert actor @@ -643,12 +661,12 @@ class ActorNursery: statespace=None, rpc_module_paths=None, outlive_main=False, # sub-actors die when their main task completes - loglevel=None, # set console logging per subactor + loglevel=None, # set log level per subactor ): actor = Actor( name, # modules allowed to invoked funcs from - rpc_module_paths=rpc_module_paths, + rpc_module_paths=rpc_module_paths or [], statespace=statespace, # global proc state vars main=main, # main coroutine to be invoked outlive_main=outlive_main, @@ -783,7 +801,7 @@ async def start_actor(actor, host, port, arbiter_addr, nursery=None): # NOTE: this won't block since we provide the nursery log.info(f"Starting local {actor} @ {host}:{port}") - await actor._async_main( + result = await actor._async_main( accept_addr=(host, port), parent_addr=None, arbiter_addr=arbiter_addr, @@ -799,11 +817,14 @@ async def start_actor(actor, host, port, arbiter_addr, nursery=None): _current_actor = None log.info("Completed async main") + return result + @asynccontextmanager async def _connect_chan(host, port): """Attempt to connect to an arbiter's channel server. - Return the channel on success or None on failure. + + Return the channel on success or ``None`` on failure. """ chan = Channel((host, port)) await chan.connect() @@ -837,8 +858,8 @@ async def find_actor( ): """Ask the arbiter to find actor(s) by name. - Returns a sequence of unconnected portals for each matching actor - known to the arbiter (client code is expected to connect the portals). + Returns a connected portal to the last registered matching actor + known to the arbiter. 
""" actor = current_actor() if not actor: @@ -847,14 +868,14 @@ async def find_actor( async with get_arbiter(*arbiter_sockaddr) as arb_portal: sockaddrs = await arb_portal.run('self', 'find_actor', name=name) # TODO: return portals to all available actors - for now just - # the first one we find + # the last one that registered if sockaddrs: sockaddr = sockaddrs[-1] async with _connect_chan(*sockaddr) as chan: async with open_portal(chan) as portal: yield portal else: - yield + yield None async def _main(async_fn, args, kwargs, name, arbiter_addr): @@ -871,9 +892,9 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr): except OSError: log.warn(f"No actor could be found @ {host}:{port}") + # create a local actor and start up its main routine/task if arbiter_found: # we were able to connect to an arbiter log.info(f"Arbiter seems to exist @ {host}:{port}") - # create a local actor and start up its main routine/task actor = Actor( name or 'anonymous', main=main, @@ -882,13 +903,15 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr): host, port = (_default_arbiter_host, 0) else: # start this local actor as the arbiter + # this should eventually get passed `outlive_main=True`? actor = Arbiter(name or 'arbiter', main=main, **kwargs) - await start_actor(actor, host, port, arbiter_addr=arbiter_addr) - # Creates an internal nursery which shouldn't be cancelled even if - # the one opened below is (this is desirable because the arbiter should - # stay up until a re-election process has taken place - which is not - # implemented yet FYI). + # ``Actor._async_main()`` creates an internal nursery if one is not + # provided and thus blocks here until it's main task completes. + # Note that if the current actor is the arbiter it is desirable + # for it to stay up indefinitely until a re-election process has + # taken place - which is not implemented yet FYI). 
+ return await start_actor(actor, host, port, arbiter_addr=arbiter_addr) def run(async_fn, *args, name=None, arbiter_addr=None, **kwargs): From 10417303aa37a632a469981e41b679c1b69639c2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 6 Jul 2018 02:45:26 -0400 Subject: [PATCH 03/16] Get tests working again Remove all the `piker` stuff and add some further checks including: - main task result is returned correctly - remote errors are raised locally - remote async generator yields values locally --- tests/test_tractor.py | 91 +++++++++++++++++++++++++++---------------- 1 file changed, 58 insertions(+), 33 deletions(-) diff --git a/tests/test_tractor.py b/tests/test_tractor.py index 410144e..dcb49d5 100644 --- a/tests/test_tractor.py +++ b/tests/test_tractor.py @@ -52,7 +52,7 @@ def test_local_actor_async_func(): # interal pickling infra of the forkserver to work async def spawn(is_arbiter): statespace = {'doggy': 10, 'kitty': 4} - namespaces = ['piker.brokers.core'] + namespaces = [__name__] await trio.sleep(0.1) actor = tractor.current_actor() @@ -72,22 +72,33 @@ async def spawn(is_arbiter): ) assert len(nursery._children) == 1 assert portal.channel.uid in tractor.current_actor()._peers + # be sure we can still get the result + result = await portal.result() + assert result == 10 + return result else: return 10 def test_local_arbiter_subactor_global_state(): statespace = {'doggy': 10, 'kitty': 4} - tractor.run( + result = tractor.run( spawn, True, name='arbiter', statespace=statespace, ) + assert result == 10 -async def rx_price_quotes_from_brokerd(us_symbols): - """Verify we can spawn a daemon actor and retrieve streamed price data. +async def stream_seq(sequence): + for i in sequence: + yield i + await trio.sleep(0.1) + + +async def stream_from_single_subactor(): + """Verify we can spawn a daemon actor and retrieve streamed data. """ async with tractor.find_actor('brokerd') as portals: if not portals: @@ -95,33 +106,25 @@ async def rx_price_quotes_from_brokerd(us_symbols): async with tractor.open_nursery() as nursery: # no brokerd actor found portal = await nursery.start_actor( - 'brokerd', - rpc_module_paths=['piker.brokers.core'], - statespace={ - 'brokers2tickersubs': {}, - 'clients': {}, - 'dtasks': set() - }, - main=None, # don't start a main func - use rpc + 'streamerd', + rpc_module_paths=[__name__], + statespace={'global_dict': {}}, + # don't start a main func - use rpc + # currently the same as outlive_main=False + main=None, ) - # gotta expose in a broker agnostic way... - # retrieve initial symbol data - # sd = await portal.run( - # 'piker.brokers.core', 'symbol_data', symbols=us_symbols) - # assert list(sd.keys()) == us_symbols + seq = range(10) gen = await portal.run( - 'piker.brokers.core', - '_test_price_stream', - broker='robinhood', - symbols=us_symbols, + __name__, + 'stream_seq', # the func above + sequence=list(seq), # has to be msgpack serializable ) # it'd sure be nice to have an asyncitertools here... 
- async for quotes in gen: - assert quotes - for key in quotes: - assert key in us_symbols + iseq = iter(seq) + async for val in gen: + assert val == next(iseq) break # terminate far-end async-gen # await gen.asend(None) @@ -130,14 +133,36 @@ async def rx_price_quotes_from_brokerd(us_symbols): # stop all spawned subactors await nursery.cancel() - # arbitter is cancelled here due to `find_actors()` internals - # (which internally uses `get_arbiter` which kills its channel - # server scope on exit) - -def test_rx_price_quotes_from_brokerd(us_symbols): +def test_stream_from_single_subactor(us_symbols): + """Verify streaming from a spawned async generator. + """ tractor.run( - rx_price_quotes_from_brokerd, - us_symbols, - name='arbiter', + stream_from_single_subactor, + name='client', ) + + +async def assert_err(): + assert 0 + + +def test_remote_error(): + """Verify an error raises in a subactor is propagated to the parent. + """ + async def main(): + async with tractor.open_nursery() as nursery: + + portal = await nursery.start_actor('errorer', main=assert_err) + + # get result(s) from main task + try: + return await portal.result() + except tractor.RemoteActorError: + raise + except Exception: + pass + assert 0, "Remote error was not raised?" + + with pytest.raises(tractor.RemoteActorError): + tractor.run(main) From 77e34049b8658ef12728bbeaacb2166bacce3a93 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 7 Jul 2018 16:50:59 -0400 Subject: [PATCH 04/16] More fixes after unit testing - Allow passing in a program-wide `loglevel` - Add detailed debug logging particularly to do with channel msg processing and connection handling - Don't daemonize subprocesses for now as it prevents use of sub-sub-actors (need to solve #6 first) - Add a `Portal.close()` which just tells the remote actor to tear down the channel (for now) - Add a message to signal the remote `StopAsyncIteration` from an async gen such that the client side terminates properly as well - Make `Actor.cancel()` cancel the channel server first - Actors *must* complete the arbiter registeration steps before moving on with their main taks and rpc handling - When delivering rpc responses (using the local per caller queue) use the blocking interface (`trio.Queue.put()`) to get backpressure - Properly detect an `partial` wrapped async generators in `_invoke` --- tractor/__init__.py | 160 ++++++++++++++++++++++++++++++-------------- 1 file changed, 108 insertions(+), 52 deletions(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index 6c0fde7..8da16f0 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -23,6 +23,7 @@ log = get_logger('tractor') _current_actor = None _default_arbiter_host = '127.0.0.1' _default_arbiter_port = 1616 +_default_loglevel = None class ActorFailure(Exception): @@ -55,12 +56,16 @@ async def _invoke( """ try: is_async_partial = False + is_async_gen_partial = False if isinstance(func, partial): is_async_partial = inspect.iscoroutinefunction(func.func) + is_async_gen_partial = inspect.isasyncgenfunction(func.func) if ( - not inspect.iscoroutinefunction(func) and not is_async_partial - and not inspect.isasyncgenfunction(func) + not inspect.iscoroutinefunction(func) and + not inspect.isasyncgenfunction(func) and + not is_async_partial and + not is_async_gen_partial ): await chan.send({'return': func(**kwargs), 'cid': cid}) else: @@ -76,6 +81,12 @@ async def _invoke( # if to_send is not None: # to_yield = await coro.asend(to_send) await chan.send({'yield': item, 'cid': cid}) + + log.warn(f"Finished 
iterating {coro}") + # TODO: we should really support a proper + # `StopAsyncIteration` system here for returning a final + # value if desired + await chan.send({'stop': None, 'cid': cid}) else: if treat_as_gen: # XXX: the async-func may spawn further tasks which push @@ -139,6 +150,7 @@ class Actor: uid: str = None, allow_rpc: bool = True, outlive_main: bool = False, + loglevel: str = None, ): self.name = name self.uid = (name, uid or str(uuid.uuid1())) @@ -150,6 +162,7 @@ class Actor: self.statespace = statespace self._allow_rpc = allow_rpc self._outlive_main = outlive_main + self.loglevel = loglevel # filled in by `_async_main` after fork self._peers = defaultdict(list) @@ -226,6 +239,10 @@ class Actor: log.debug(f"Releasing channel {chan}") chans = self._peers.get(chan.uid) chans.remove(chan) + if chan.connected(): + log.debug(f"Disconnecting channel {chan}") + await chan.send(None) + await chan.aclose() if not chans: log.debug(f"No more channels for {chan.uid}") self._peers.pop(chan.uid, None) @@ -233,15 +250,12 @@ class Actor: self._no_more_peers.set() log.debug(f"No more peer channels") - def _push_result(self, actorid, cid, msg): + async def _push_result(self, actorid, cid, msg): assert actorid, f"`actorid` can't be {actorid}" q = self.get_waitq(actorid, cid) log.debug(f"Delivering {msg} from {actorid} to caller {cid}") - waiters = q.statistics().tasks_waiting_get - if not waiters: - log.warn( - f"No tasks are currently waiting for results from call {cid}?") - q.put_nowait(msg) + # maintain backpressure + await q.put(msg) def get_waitq(self, actorid, cid): log.debug(f"Registering for callid {cid} queue results from {actorid}") @@ -266,20 +280,23 @@ class Actor: """ # TODO: once https://github.com/python-trio/trio/issues/467 gets # worked out we'll likely want to use that! 
- log.debug(f"Entering msg loop for {chan}") + log.debug(f"Entering msg loop for {chan} from {chan.uid}") async with trio.open_nursery() as nursery: try: async for msg in chan.aiter_recv(): if msg is None: # terminate sentinel - log.debug(f"Cancelling all tasks for {chan}") + log.debug( + f"Cancelling all tasks for {chan} from {chan.uid}") nursery.cancel_scope.cancel() - log.debug(f"Terminating msg loop for {chan}") + log.debug( + f"Terminating msg loop for {chan} from {chan.uid}") break - log.debug(f"Received msg {msg}") + log.debug(f"Received msg {msg} from {chan.uid}") cid = msg.get('cid') if cid: # deliver response to local caller/waiter - self._push_result(chan.uid, cid, msg) - log.debug(f"Waiting on next msg for {chan}") + await self._push_result(chan.uid, cid, msg) + log.debug( + f"Waiting on next msg for {chan} from {chan.uid}") continue else: ns, funcname, kwargs, actorid, cid = msg['cmd'] @@ -312,22 +329,23 @@ class Actor: _invoke, cid, chan, func, kwargs, treat_as_gen, name=funcname ) - log.debug(f"Waiting on next msg for {chan}") + log.debug( + f"Waiting on next msg for {chan} from {chan.uid}") else: # channel disconnect - log.debug(f"{chan} disconnected") + log.debug(f"{chan} from {chan.uid} disconnected") except trio.ClosedStreamError: - log.error(f"{chan} broke") + log.error(f"{chan} form {chan.uid} broke") - log.debug(f"Exiting msg loop for {chan}") + log.debug(f"Exiting msg loop for {chan} from {chan.uid}") - def _fork_main(self, accept_addr, parent_addr=None, loglevel=None): + def _fork_main(self, accept_addr, parent_addr=None): # after fork routine which invokes a fresh ``trio.run`` + if self.loglevel: + get_console_log(self.loglevel) log.info( f"Started new {ctx.current_process()} for actor {self.uid}") global _current_actor _current_actor = self - if loglevel: - get_console_log(loglevel) log.debug(f"parent_addr is {parent_addr}") try: trio.run(partial( @@ -372,6 +390,13 @@ class Actor: # initial handshake, report who we are, who they are await _do_handshake(self, chan) + # register with the arbiter if we're told its addr + log.debug(f"Registering {self} for role `{self.name}`") + async with get_arbiter(*arbiter_addr) as arb_portal: + await arb_portal.run( + 'self', 'register_actor', + name=self.name, sockaddr=self.accept_addr) + # handle new connection back to parent optionally # begin responding to RPC if self._allow_rpc: @@ -380,20 +405,14 @@ class Actor: nursery.start_soon( self._process_messages, self._parent_chan) - # register with the arbiter if we're told its addr - log.debug(f"Registering {self} for role `{self.name}`") - async with get_arbiter(*arbiter_addr) as arb_portal: - await arb_portal.run( - 'self', 'register_actor', - name=self.name, sockaddr=self.accept_addr) - if self.main: try: if self._parent_chan: log.debug(f"Starting main task `{self.main}`") # start "main" routine in a task await nursery.start( - _invoke, 'main', self._parent_chan, self.main, {}, + _invoke, 'main', + self._parent_chan, self.main, {}, False, True # treat_as_gen, raise_errs params ) else: @@ -412,9 +431,14 @@ class Actor: # cancelled or signalled by the parent actor) except Exception: if self._parent_chan: + try: + await self._parent_chan.send( + {'error': traceback.format_exc(), 'cid': 'main'}) + except trio.ClosedStreamError: + log.error( + f"Failed to ship error to parent " + f"{self._parent_chan.uid}, channel was closed") log.exception("Actor errored:") - await self._parent_chan.send( - {'error': traceback.format_exc(), 'cid': 'main'}) else: raise finally: @@ -423,7 +447,7 @@ 
class Actor: if arbiter_addr is not None: async with get_arbiter(*arbiter_addr) as arb_portal: await arb_portal.run( - 'self', 'register_actor', + 'self', 'unregister_actor', name=self.name, sockaddr=self.accept_addr) except OSError: log.warn(f"Unable to unregister {self.name} from arbiter") @@ -434,6 +458,11 @@ class Actor: await self._no_more_peers.wait() log.debug(f"All peer channels are complete") + # tear down channel server + if not self._outlive_main: + log.debug(f"Shutting down channel server") + self.cancel_server() + return result async def _serve_forever( @@ -473,6 +502,7 @@ class Actor: """This cancels the internal root-most nursery thereby gracefully cancelling (for all intents and purposes) this actor. """ + self.cancel_server() self._root_nursery.cancel_scope.cancel() def cancel_server(self): @@ -535,6 +565,7 @@ class Portal: """ def __init__(self, channel): self.channel = channel + self._result = None async def aclose(self): log.debug(f"Closing {self}") @@ -566,11 +597,14 @@ class Portal: try: yield msg['yield'] except KeyError: - raise RemoteActorError(msg['error']) + if 'stop' in msg: + break # far end async gen terminated + else: + raise RemoteActorError(msg['error']) except GeneratorExit: log.debug( f"Cancelling async gen call {cid} to " - "{self.channel.uid}") + f"{self.channel.uid}") return yield_from_q() @@ -582,12 +616,19 @@ class Portal: async def result(self): """Return the result(s) from the remote actor's "main" task. """ - q = current_actor().get_waitq(self.channel.uid, 'main') - first_msg = (await result_from_q(q)) - val = await self._return_from_resptype( - 'main', *first_msg) - await q.put(first_msg) # for next consumer (e.g. nursery) - return val + if self._result is None: + q = current_actor().get_waitq(self.channel.uid, 'main') + resptype, first_msg, q = (await result_from_q(q)) + self._result = await self._return_from_resptype( + 'main', resptype, first_msg, q) + # await q.put(first_msg) # for next consumer (e.g. nursery) + return self._result + + async def close(self): + # trigger remote msg loop `break` + chan = self.channel + log.debug(f"Closing portal for {chan} to {chan.uid}") + await self.channel.send(None) @asynccontextmanager @@ -614,7 +655,12 @@ async def open_portal(channel, nursery=None): actor._peers[channel.uid].append(channel) nursery.start_soon(actor._process_messages, channel) - yield Portal(channel) + portal = Portal(channel) + yield portal + + # cancel remote channel-msg loop + if channel.connected(): + await portal.close() # cancel background msg loop task nursery.cancel_scope.cancel() @@ -644,7 +690,7 @@ class ActorNursery: """Spawn scoped subprocess actors. 
""" def __init__(self, actor, supervisor=None): - self.supervisor = supervisor + self.supervisor = supervisor # TODO self._actor = actor # We'll likely want some way to cancel all sub-actors eventually # self.cancel_scope = cancel_scope @@ -661,7 +707,7 @@ class ActorNursery: statespace=None, rpc_module_paths=None, outlive_main=False, # sub-actors die when their main task completes - loglevel=None, # set log level per subactor + loglevel=_default_loglevel, # set log level per subactor ): actor = Actor( name, @@ -670,19 +716,21 @@ class ActorNursery: statespace=statespace, # global proc state vars main=main, # main coroutine to be invoked outlive_main=outlive_main, + loglevel=loglevel, ) parent_addr = self._actor.accept_addr assert parent_addr proc = ctx.Process( target=actor._fork_main, - args=(bind_addr, parent_addr, loglevel), - daemon=True, + args=(bind_addr, parent_addr), + # daemon=True, name=name, ) proc.start() if not proc.is_alive(): raise ActorFailure("Couldn't start sub-actor?") + log.info(f"Started {proc}") # wait for actor to spawn and connect back to us # channel should have handshake completed by the # local actor by the time we get a ref to it @@ -709,6 +757,12 @@ class ActorNursery: nursery.start_soon(wait_for_proc, proc) async def cancel(self, hard_kill=False): + """Cancel this nursery by instructing each subactor to cancel + iteslf and wait for all subprocesses to terminate. + + If ``hard_killl`` is set to ``True`` then kill the processes + directly without any far end graceful ``trio`` cancellation. + """ log.debug(f"Cancelling nursery") for subactor, proc, main_q in self._children.values(): if proc is mp.current_process(): @@ -718,8 +772,8 @@ class ActorNursery: if hard_kill: log.warn(f"Hard killing subactors {self._children}") proc.terminate() - # send KeyBoardInterrupt (trio abort signal) to underlying - # sub-actors + # XXX: doesn't seem to work? + # send KeyBoardInterrupt (trio abort signal) to sub-actors # os.kill(proc.pid, signal.SIGINT) else: # send cancel cmd - likely no response from subactor @@ -750,11 +804,11 @@ class ActorNursery: await chan.send(None) if etype is not None: - log.warn(f"{current_actor().uid} errored with {etype}, " - "cancelling actor nursery") + log.exception(f"{current_actor().uid} errored with {etype}, " + "cancelling actor nursery") await self.cancel() else: - log.debug(f"Waiting on subactors to complete") + log.debug(f"Waiting on subactors {self._children} to complete") async with trio.open_nursery() as nursery: for subactor, proc, main_q in self._children.values(): nursery.start_soon(wait_for_actor, subactor, proc, main_q) @@ -786,7 +840,7 @@ class NoArbiterFound(Exception): "Couldn't find the arbiter?" -async def start_actor(actor, host, port, arbiter_addr, nursery=None): +async def _start_actor(actor, host, port, arbiter_addr, nursery=None): """Spawn a local actor by starting a task to execute it's main async function. 
@@ -884,6 +938,8 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr): main = partial(async_fn, *args) if async_fn else None arbiter_addr = (host, port) = arbiter_addr or ( _default_arbiter_host, _default_arbiter_port) + get_console_log(kwargs.get('loglevel', _default_loglevel)) + # make a temporary connection to see if an arbiter exists arbiter_found = False try: @@ -911,7 +967,7 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr): # Note that if the current actor is the arbiter it is desirable # for it to stay up indefinitely until a re-election process has # taken place - which is not implemented yet FYI). - return await start_actor(actor, host, port, arbiter_addr=arbiter_addr) + return await _start_actor(actor, host, port, arbiter_addr=arbiter_addr) def run(async_fn, *args, name=None, arbiter_addr=None, **kwargs): From d94be22ef26eb9cc74c50dee1fce94b9caa86a90 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 7 Jul 2018 22:22:05 -0400 Subject: [PATCH 05/16] Add a "show me the code" test from the readme --- tests/test_tractor.py | 143 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 128 insertions(+), 15 deletions(-) diff --git a/tests/test_tractor.py b/tests/test_tractor.py index dcb49d5..653170b 100644 --- a/tests/test_tractor.py +++ b/tests/test_tractor.py @@ -9,11 +9,6 @@ import trio import tractor -@pytest.fixture -def us_symbols(): - return ['TSLA', 'AAPL', 'CGC', 'CRON'] - - @pytest.mark.trio async def test_no_arbitter(): """An arbitter must be established before any nurseries @@ -72,7 +67,7 @@ async def spawn(is_arbiter): ) assert len(nursery._children) == 1 assert portal.channel.uid in tractor.current_actor()._peers - # be sure we can still get the result + # be sure we can still get the result result = await portal.result() assert result == 10 return result @@ -116,31 +111,31 @@ async def stream_from_single_subactor(): seq = range(10) - gen = await portal.run( + agen = await portal.run( __name__, 'stream_seq', # the func above sequence=list(seq), # has to be msgpack serializable ) # it'd sure be nice to have an asyncitertools here... iseq = iter(seq) - async for val in gen: + async for val in agen: assert val == next(iseq) - break + # TODO: test breaking the loop (should it kill the + # far end?) + # break # terminate far-end async-gen # await gen.asend(None) # break # stop all spawned subactors - await nursery.cancel() + await portal.cancel_actor() + # await nursery.cancel() -def test_stream_from_single_subactor(us_symbols): +def test_stream_from_single_subactor(): """Verify streaming from a spawned async generator. """ - tractor.run( - stream_from_single_subactor, - name='client', - ) + tractor.run(stream_from_single_subactor) async def assert_err(): @@ -159,10 +154,128 @@ def test_remote_error(): try: return await portal.result() except tractor.RemoteActorError: + print("Look Maa that actor failed hard, hehh") raise except Exception: pass assert 0, "Remote error was not raised?" with pytest.raises(tractor.RemoteActorError): + # also raises tractor.run(main) + + +def do_nothing(): + pass + + +def test_cancel_single_subactor(): + + async def main(): + + async with tractor.open_nursery() as nursery: + + portal = await nursery.start_actor( + 'nothin', rpc_module_paths=[__name__], + ) + assert not await portal.run(__name__, 'do_nothing') + + # would hang otherwise + await nursery.cancel() + + tractor.run(main) + + +async def stream_data(seed): + for i in range(seed): + yield i + # await trio.sleep(1/10000.) 
# trigger scheduler + await trio.sleep(0) # trigger scheduler + + +async def aggregate(seed): + """Ensure that the two streams we receive match but only stream + a single set of values to the parent. + """ + async with tractor.open_nursery() as nursery: + portals = [] + for i in range(1, 3): + # fork point + portal = await nursery.start_actor( + name=f'streamer_{i}', + rpc_module_paths=[__name__], + outlive_main=True, # daemonize these actors + ) + + portals.append(portal) + + q = trio.Queue(int(1e3)) + + async def push_to_q(portal): + async for value in await portal.run( + __name__, 'stream_data', seed=seed + ): + await q.put(value) + + await q.put(None) + print(f"FINISHED ITERATING {portal.channel.uid}") + + # spawn 2 trio tasks to collect streams and push to a local queue + async with trio.open_nursery() as n: + for portal in portals: + n.start_soon(push_to_q, portal) + + unique_vals = set() + async for value in q: + if value not in unique_vals: + unique_vals.add(value) + # yield upwards to the spawning parent actor + yield value + continue + + assert value in unique_vals + if value is None: + break + + print("FINISHED ITERATING in aggregator") + + await nursery.cancel() + print("WAITING on `ActorNursery` to finish") + print("AGGREGATOR COMPLETE!") + + +async def main(): + # a nursery which spawns "actors" + async with tractor.open_nursery() as nursery: + + seed = int(10) + import time + pre_start = time.time() + portal = await nursery.start_actor( + name='aggregator', + # executed in the actor's "main task" immediately + main=partial(aggregate, seed), + ) + + start = time.time() + # the portal call returns exactly what you'd expect + # as if the remote "main" function was called locally + result_stream = [] + async for value in await portal.result(): + result_stream.append(value) + + print(f"STREAM TIME = {time.time() - start}") + print(f"STREAM + SPAWN TIME = {time.time() - pre_start}") + assert result_stream == list(range(seed)) + [None] + + +def test_show_me_the_code(): + """Verify the *show me the code* readme example works. + """ + tractor.run(main, arbiter_addr=('127.0.0.1', 1616)) + + +def test_cancel_smtc(): + """Verify we can cancel midway through the smtc example gracefully. + """ + pass From 49573c9a03920c825ef8e637c20a383f5cc22ade Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 10 Jul 2018 15:06:42 -0400 Subject: [PATCH 06/16] More fixes to do cancellation correctly Here is a bunch of code tightening to make sure cancellation works even if recently spawned actors haven't fully started up and the parent is cancelled. 
The fixes include: - passing the arbiter socket address to each actor - ensure all spawned actors respect the spawner's log level - handle process versus final `portal.result()` teardown in multiple tasks such that if a proc dies before shipping a result we don't wait - more detailed debug logging in teardown code paths - don't store peer connected events in the same `dict` as the peer channels - if necessary fake main task results on peer channel disconnect - warn when a `trio.Cancelled` is what causes a nursery to bail otherwise error - store the subactor portal in the nursery for teardown purposes - add dedicated `Portal.cancel_actor()` which acts as a "hard cancel" and never blocks (indefinitely) - add `Arbiter.unregister_actor()` it's more explicit what's being requested --- tractor/__init__.py | 305 +++++++++++++++++++++++++++----------------- 1 file changed, 190 insertions(+), 115 deletions(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index 8da16f0..22fe303 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -1,5 +1,6 @@ """ -tracor: An actor model micro-framework. +tractor: An actor model micro-framework built on + ``trio`` and ``multiprocessing``. """ from collections import defaultdict from functools import partial @@ -26,6 +27,10 @@ _default_arbiter_port = 1616 _default_loglevel = None +def get_loglevel(): + return _default_loglevel + + class ActorFailure(Exception): "General actor failure" @@ -82,7 +87,7 @@ async def _invoke( # to_yield = await coro.asend(to_send) await chan.send({'yield': item, 'cid': cid}) - log.warn(f"Finished iterating {coro}") + log.debug(f"Finished iterating {coro}") # TODO: we should really support a proper # `StopAsyncIteration` system here for returning a final # value if desired @@ -101,11 +106,12 @@ async def _invoke( except Exception: if not raise_errs: await chan.send({'error': traceback.format_exc(), 'cid': cid}) + log.exception("Actor errored:") else: raise -async def result_from_q(q): +async def result_from_q(q, chan): """Process a msg from a remote actor. """ first_msg = await q.get() @@ -114,7 +120,7 @@ async def result_from_q(q): elif 'yield' in first_msg: return 'yield', first_msg, q elif 'error' in first_msg: - raise RemoteActorError(first_msg['error']) + raise RemoteActorError(f"{chan.uid}\n" + first_msg['error']) else: raise ValueError(f"{first_msg} is an invalid response packet?") @@ -151,6 +157,7 @@ class Actor: allow_rpc: bool = True, outlive_main: bool = False, loglevel: str = None, + arbiter_addr: (str, int) = None, ): self.name = name self.uid = (name, uid or str(uuid.uuid1())) @@ -163,9 +170,11 @@ class Actor: self._allow_rpc = allow_rpc self._outlive_main = outlive_main self.loglevel = loglevel + self._arb_addr = arbiter_addr # filled in by `_async_main` after fork self._peers = defaultdict(list) + self._peer_connected = {} self._no_more_peers = trio.Event() self._no_more_peers.set() self._actors2calls = {} # map {uids -> {callids -> waiter queues}} @@ -178,7 +187,7 @@ class Actor: ``uid``. 
""" log.debug(f"Waiting for peer {uid} to connect") - event = self._peers.setdefault(uid, trio.Event()) + event = self._peer_connected.setdefault(uid, trio.Event()) await event.wait() log.debug(f"{uid} successfully connected back to us") return event, self._peers[uid][-1] @@ -210,23 +219,22 @@ class Actor: return # channel tracking - event_or_chans = self._peers.pop(uid, None) - if isinstance(event_or_chans, trio.Event): + event = self._peer_connected.pop(uid, None) + if event: # Instructing connection: this is likely a new channel to # a recently spawned actor which we'd like to control via # async-rpc calls. - log.debug(f"Waking channel waiters {event_or_chans.statistics()}") + log.debug(f"Waking channel waiters {event.statistics()}") # Alert any task waiting on this connection to come up - event_or_chans.set() - event_or_chans.clear() # consumer can wait on channel to close - elif isinstance(event_or_chans, list): - log.warn( - f"already have channel(s) for {uid}:{event_or_chans}?" - ) - # append new channel - self._peers[uid].extend(event_or_chans) + event.set() + chans = self._peers[uid] + if chans: + log.warn( + f"already have channel(s) for {uid}:{chans}?" + ) log.debug(f"Registered {chan} for {uid}") + # append new channel self._peers[uid].append(chan) # Begin channel management - respond to remote requests and @@ -235,20 +243,31 @@ class Actor: await self._process_messages(chan) finally: # Drop ref to channel so it can be gc-ed and disconnected - if chan is not self._parent_chan: - log.debug(f"Releasing channel {chan}") - chans = self._peers.get(chan.uid) - chans.remove(chan) - if chan.connected(): - log.debug(f"Disconnecting channel {chan}") - await chan.send(None) - await chan.aclose() - if not chans: - log.debug(f"No more channels for {chan.uid}") - self._peers.pop(chan.uid, None) - if not self._peers: # no more channels connected - self._no_more_peers.set() - log.debug(f"No more peer channels") + # if chan is not self._parent_chan: + log.debug(f"Releasing channel {chan} from {chan.uid}") + chans = self._peers.get(chan.uid) + chans.remove(chan) + if not chans: + log.debug(f"No more channels for {chan.uid}") + self._peers.pop(chan.uid, None) + if not self._actors2calls.get(chan.uid, {}).get('main'): + # fake a "main task" result for any waiting + # nurseries/portals + log.debug(f"Faking result for {chan} from {chan.uid}") + q = self.get_waitq(chan.uid, 'main') + q.put_nowait({'return': None, 'cid': 'main'}) + + log.debug(f"Peers is {self._peers}") + + if not self._peers: # no more channels connected + self._no_more_peers.set() + log.debug(f"Signalling no more peer channels") + + # XXX: is this necessary? 
+ if chan.connected(): + log.debug(f"Disconnecting channel {chan}") + await chan.send(None) + await chan.aclose() async def _push_result(self, actorid, cid, msg): assert actorid, f"`actorid` can't be {actorid}" @@ -258,7 +277,7 @@ class Actor: await q.put(msg) def get_waitq(self, actorid, cid): - log.debug(f"Registering for callid {cid} queue results from {actorid}") + log.debug(f"Getting result queue for {actorid} cid {cid}") cids2qs = self._actors2calls.setdefault(actorid, {}) return cids2qs.setdefault(cid, trio.Queue(1000)) @@ -289,7 +308,8 @@ class Actor: f"Cancelling all tasks for {chan} from {chan.uid}") nursery.cancel_scope.cancel() log.debug( - f"Terminating msg loop for {chan} from {chan.uid}") + f"Msg loop signalled to terminate for" + f" {chan} from {chan.uid}") break log.debug(f"Received msg {msg} from {chan.uid}") cid = msg.get('cid') @@ -340,7 +360,8 @@ class Actor: def _fork_main(self, accept_addr, parent_addr=None): # after fork routine which invokes a fresh ``trio.run`` - if self.loglevel: + # log.warn("Log level after fork is {self.loglevel}") + if self.loglevel is not None: get_console_log(self.loglevel) log.info( f"Started new {ctx.current_process()} for actor {self.uid}") @@ -357,7 +378,7 @@ class Actor: async def _async_main( self, accept_addr, - arbiter_addr=(_default_arbiter_host, _default_arbiter_port), + arbiter_addr=None, parent_addr=None, nursery=None ): @@ -368,6 +389,8 @@ class Actor: and when cancelled effectively cancels the actor. """ result = None + arbiter_addr = arbiter_addr or self._arb_addr + registered_with_arbiter = False try: async with maybe_open_nursery(nursery) as nursery: self._root_nursery = nursery @@ -378,17 +401,30 @@ class Actor: self._serve_forever, accept_host=host, accept_port=port) ) + # XXX: I wonder if a better name is maybe "requester" + # since I don't think the notion of a "parent" actor + # necessarily sticks given that eventually we want + # ``'MainProcess'`` (the actor who initially starts the + # forkserver) to eventually be the only one who is + # allowed to spawn new processes per Python program. 
if parent_addr is not None: - # Connect back to the parent actor and conduct initial - # handshake (From this point on if we error ship the - # exception back to the parent actor) - chan = self._parent_chan = Channel( - destaddr=parent_addr, - on_reconnect=self.main - ) - await chan.connect() - # initial handshake, report who we are, who they are - await _do_handshake(self, chan) + try: + # Connect back to the parent actor and conduct initial + # handshake (From this point on if we error ship the + # exception back to the parent actor) + chan = self._parent_chan = Channel( + destaddr=parent_addr, + on_reconnect=self.main + ) + await chan.connect() + # initial handshake, report who we are, who they are + await _do_handshake(self, chan) + except OSError: # failed to connect + log.warn( + f"Failed to connect to parent @ {parent_addr}," + " closing server") + self.cancel_server() + self._parent_chan = None # register with the arbiter if we're told its addr log.debug(f"Registering {self} for role `{self.name}`") @@ -396,6 +432,7 @@ class Actor: await arb_portal.run( 'self', 'register_actor', name=self.name, sockaddr=self.accept_addr) + registered_with_arbiter = True # handle new connection back to parent optionally # begin responding to RPC @@ -409,23 +446,26 @@ class Actor: try: if self._parent_chan: log.debug(f"Starting main task `{self.main}`") - # start "main" routine in a task + # spawned subactor so deliver "main" task result(s) + # back to parent await nursery.start( _invoke, 'main', self._parent_chan, self.main, {}, False, True # treat_as_gen, raise_errs params ) else: - # run directly + # run directly - we are an "unspawned actor" log.debug(f"Running `{self.main}` directly") result = await self.main() finally: - # tear down channel server + # tear down channel server in order to ensure + # we exit normally when the main task is done if not self._outlive_main: log.debug(f"Shutting down channel server") self.cancel_server() + log.debug("Waiting on root nursery to complete") # blocks here as expected if no nursery was provided until # the channel server is killed (i.e. 
this actor is # cancelled or signalled by the parent actor) @@ -438,30 +478,27 @@ class Actor: log.error( f"Failed to ship error to parent " f"{self._parent_chan.uid}, channel was closed") - log.exception("Actor errored:") + log.exception("Actor errored:") + + if not registered_with_arbiter: + log.exception( + f"Failed to register with arbiter @ {arbiter_addr}") else: raise finally: - # UNregister actor from the arbiter - try: - if arbiter_addr is not None: - async with get_arbiter(*arbiter_addr) as arb_portal: - await arb_portal.run( - 'self', 'unregister_actor', - name=self.name, sockaddr=self.accept_addr) - except OSError: - log.warn(f"Unable to unregister {self.name} from arbiter") - - # terminate local in-proc once its main completes - log.debug( - f"Waiting for remaining peers {self._peers} to clear") - await self._no_more_peers.wait() + await self._do_unreg(arbiter_addr) + # terminate actor once all it's peers (actors that connected + # to it as clients) have disappeared + if not self._no_more_peers.is_set(): + log.debug( + f"Waiting for remaining peers {self._peers} to clear") + await self._no_more_peers.wait() log.debug(f"All peer channels are complete") - # tear down channel server - if not self._outlive_main: - log.debug(f"Shutting down channel server") - self.cancel_server() + # tear down channel server no matter what since we errored + # or completed + log.debug(f"Shutting down channel server") + self.cancel_server() return result @@ -498,7 +535,17 @@ class Actor: self._listeners.extend(listeners) task_status.started() - def cancel(self): + async def _do_unreg(self, arbiter_addr): + # UNregister actor from the arbiter + try: + if arbiter_addr is not None: + async with get_arbiter(*arbiter_addr) as arb_portal: + await arb_portal.run( + 'self', 'unregister_actor', name=self.name) + except OSError: + log.warn(f"Unable to unregister {self.name} from arbiter") + + async def cancel(self): """This cancels the internal root-most nursery thereby gracefully cancelling (for all intents and purposes) this actor. """ @@ -545,13 +592,8 @@ class Arbiter(Actor): def register_actor(self, name, sockaddr): self._registry[name].append(sockaddr) - def unregister_actor(self, name, sockaddr): - sockaddrs = self._registry.get(name) - if sockaddrs: - try: - sockaddrs.remove(sockaddr) - except ValueError: - pass + def unregister_actor(self, name): + self._registry.pop(name, None) class Portal: @@ -584,7 +626,8 @@ class Portal: # ship a function call request to the remote actor cid, q = await actor.send_cmd(self.channel, ns, func, kwargs) # wait on first response msg and handle - return await self._return_from_resptype(cid, *(await result_from_q(q))) + return await self._return_from_resptype( + cid, *(await result_from_q(q, self.channel))) async def _return_from_resptype(self, cid, resptype, first_msg, q): @@ -618,7 +661,7 @@ class Portal: """ if self._result is None: q = current_actor().get_waitq(self.channel.uid, 'main') - resptype, first_msg, q = (await result_from_q(q)) + resptype, first_msg, q = (await result_from_q(q, self.channel)) self._result = await self._return_from_resptype( 'main', resptype, first_msg, q) # await q.put(first_msg) # for next consumer (e.g. nursery) @@ -630,6 +673,21 @@ class Portal: log.debug(f"Closing portal for {chan} to {chan.uid}") await self.channel.send(None) + async def cancel_actor(self): + """Cancel the actor on the other end of this portal. 
+ """ + log.warn( + f"Sending cancel request to {self.channel.uid} on " + f"{self.channel}") + try: + with trio.move_on_after(0.1) as cancel_scope: + cancel_scope.shield = True + # send cancel cmd - might not get response + await self.run('self', 'cancel') + except trio.ClosedStreamError: + log.warn( + f"{self.channel} for {self.channel.uid} was alreaday closed?") + @asynccontextmanager async def open_portal(channel, nursery=None): @@ -650,10 +708,6 @@ async def open_portal(channel, nursery=None): if channel.uid is None: await _do_handshake(actor, channel) - if not actor.get_chans(channel.uid): - # actor is not currently managing this channel - actor._peers[channel.uid].append(channel) - nursery.start_soon(actor._process_messages, channel) portal = Portal(channel) yield portal @@ -665,7 +719,6 @@ async def open_portal(channel, nursery=None): # cancel background msg loop task nursery.cancel_scope.cancel() if was_connected: - actor._peers[channel.uid].remove(channel) await channel.aclose() @@ -707,8 +760,9 @@ class ActorNursery: statespace=None, rpc_module_paths=None, outlive_main=False, # sub-actors die when their main task completes - loglevel=_default_loglevel, # set log level per subactor + loglevel=None, # set log level per subactor ): + loglevel = loglevel or self._actor.loglevel or get_loglevel() actor = Actor( name, # modules allowed to invoked funcs from @@ -717,6 +771,7 @@ class ActorNursery: main=main, # main coroutine to be invoked outlive_main=outlive_main, loglevel=loglevel, + arbiter_addr=current_actor()._arb_addr, ) parent_addr = self._actor.accept_addr assert parent_addr @@ -735,26 +790,40 @@ class ActorNursery: # channel should have handshake completed by the # local actor by the time we get a ref to it event, chan = await self._actor.wait_for_peer(actor.uid) - # channel is up, get queue which delivers result from main routine - main_q = self._actor.get_waitq(actor.uid, 'main') - self._children[(name, proc.pid)] = (actor, proc, main_q) - - return Portal(chan) + portal = Portal(chan) + self._children[(name, proc.pid)] = (actor, proc, portal) + return portal async def wait(self): - async def wait_for_proc(proc): + async def wait_for_proc(proc, actor, portal): # TODO: timeout block here? if proc.is_alive(): await trio.hazmat.wait_readable(proc.sentinel) # please god don't hang proc.join() log.debug(f"Joined {proc}") + event = self._actor._peers.get(actor.uid) + if isinstance(event, trio.Event): + event.set() + log.warn( + f"Cancelled `wait_for_peer()` call since {actor.uid}" + f" is already dead!") + if not portal._result: + log.debug(f"Faking result for {actor.uid}") + q = self._actor.get_waitq(actor.uid, 'main') + q.put_nowait({'return': None, 'cid': 'main'}) + + async def wait_for_result(portal): + if portal.channel.connected(): + log.debug(f"Waiting on final result from {subactor.uid}") + await portal.result() # unblocks when all waiter tasks have completed async with trio.open_nursery() as nursery: - for subactor, proc, main_q in self._children.values(): - nursery.start_soon(wait_for_proc, proc) + for subactor, proc, portal in self._children.values(): + nursery.start_soon(wait_for_proc, proc, subactor, portal) + nursery.start_soon(wait_for_result, portal) async def cancel(self, hard_kill=False): """Cancel this nursery by instructing each subactor to cancel @@ -764,7 +833,7 @@ class ActorNursery: directly without any far end graceful ``trio`` cancellation. 
""" log.debug(f"Cancelling nursery") - for subactor, proc, main_q in self._children.values(): + for subactor, proc, portal in self._children.values(): if proc is mp.current_process(): # XXX: does this even make sense? await subactor.cancel() @@ -776,15 +845,8 @@ class ActorNursery: # send KeyBoardInterrupt (trio abort signal) to sub-actors # os.kill(proc.pid, signal.SIGINT) else: - # send cancel cmd - likely no response from subactor - actor = self._actor - chans = actor.get_chans(subactor.uid) - if chans: - for chan in chans: - await actor.send_cmd(chan, 'self', 'cancel', {}) - else: - log.warn( - f"Channel for {subactor.uid} is already down?") + await portal.cancel_actor() + log.debug(f"Waiting on all subactors to complete") await self.wait() log.debug(f"All subactors for {self} have terminated") @@ -792,10 +854,10 @@ class ActorNursery: async def __aexit__(self, etype, value, tb): """Wait on all subactor's main routines to complete. """ - async def wait_for_actor(actor, proc, q): + async def wait_for_actor(actor, proc, portal): if proc.is_alive(): - ret_type, msg, q = await result_from_q(q) - log.info(f"{actor.uid} main task completed with {msg}") + res = await portal.result() + log.info(f"{actor.uid} main task completed with {res}") if not actor._outlive_main: # trigger msg loop to break chans = self._actor.get_chans(actor.uid) @@ -804,14 +866,20 @@ class ActorNursery: await chan.send(None) if etype is not None: - log.exception(f"{current_actor().uid} errored with {etype}, " - "cancelling actor nursery") - await self.cancel() + if etype is trio.Cancelled: + log.warn(f"{current_actor().uid} was cancelled with {etype}, " + "cancelling actor nursery") + with trio.open_cancel_scope(shield=True): + await self.cancel() + else: + log.exception(f"{current_actor().uid} errored with {etype}, " + "cancelling actor nursery") + await self.cancel() else: log.debug(f"Waiting on subactors {self._children} to complete") async with trio.open_nursery() as nursery: - for subactor, proc, main_q in self._children.values(): - nursery.start_soon(wait_for_actor, subactor, proc, main_q) + for subactor, proc, portal in self._children.values(): + nursery.start_soon(wait_for_actor, subactor, proc, portal) await self.wait() log.debug(f"Nursery teardown complete") @@ -824,7 +892,7 @@ def current_actor() -> Actor: @asynccontextmanager -async def open_nursery(supervisor=None, loglevel='WARNING'): +async def open_nursery(supervisor=None): """Create and yield a new ``ActorNursery``. """ actor = current_actor() @@ -908,7 +976,7 @@ async def get_arbiter(host, port): @asynccontextmanager async def find_actor( name, - arbiter_sockaddr=(_default_arbiter_host, _default_arbiter_port) + arbiter_sockaddr=None, ): """Ask the arbiter to find actor(s) by name. 
@@ -919,7 +987,7 @@ async def find_actor( if not actor: raise RuntimeError("No actor instance has been defined yet?") - async with get_arbiter(*arbiter_sockaddr) as arb_portal: + async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: sockaddrs = await arb_portal.run('self', 'find_actor', name=name) # TODO: return portals to all available actors - for now just # the last one that registered @@ -938,7 +1006,7 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr): main = partial(async_fn, *args) if async_fn else None arbiter_addr = (host, port) = arbiter_addr or ( _default_arbiter_host, _default_arbiter_port) - get_console_log(kwargs.get('loglevel', _default_loglevel)) + get_console_log(kwargs.get('loglevel', get_loglevel())) # make a temporary connection to see if an arbiter exists arbiter_found = False @@ -956,11 +1024,12 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr): main=main, **kwargs ) - host, port = (_default_arbiter_host, 0) + host, port = (host, 0) else: # start this local actor as the arbiter # this should eventually get passed `outlive_main=True`? - actor = Arbiter(name or 'arbiter', main=main, **kwargs) + actor = Arbiter( + name or 'arbiter', main=main, arbiter_addr=arbiter_addr, **kwargs) # ``Actor._async_main()`` creates an internal nursery if one is not # provided and thus blocks here until it's main task completes. @@ -970,7 +1039,13 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr): return await _start_actor(actor, host, port, arbiter_addr=arbiter_addr) -def run(async_fn, *args, name=None, arbiter_addr=None, **kwargs): +def run( + async_fn, + *args, + name=None, + arbiter_addr=(_default_arbiter_host, _default_arbiter_port), + **kwargs +): """Run a trio-actor async function in process. This is tractor's main entry and the start point for any async actor. From 1854471992a07d311f09ea6ff7ac256489af8c1f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 10 Jul 2018 17:19:54 -0400 Subject: [PATCH 07/16] Add tests which verify the readme is correct - steal from `trio` and add a `tractor_test` decorator - use a random arbiter port to avoid conflicts with locally running systems - add all the (obviously) hilarious readme tests - add a complex cancellation test which works with `trio.move_on_after()` --- tests/test_tractor.py | 150 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 133 insertions(+), 17 deletions(-) diff --git a/tests/test_tractor.py b/tests/test_tractor.py index 653170b..8492664 100644 --- a/tests/test_tractor.py +++ b/tests/test_tractor.py @@ -2,13 +2,34 @@ Actor model API testing """ import time -from functools import partial +from functools import partial, wraps +import random import pytest import trio import tractor +_arb_addr = '127.0.0.1', random.randint(1000, 9999) + + +def tractor_test(fn): + """ + Use: + + @tractor_test + async def test_whatever(): + await ... 
+ """ + @wraps(fn) + def wrapper(*args, **kwargs): + __tracebackhide__ = True + return tractor.run( + partial(fn, *args), arbiter_addr=_arb_addr, **kwargs) + + return wrapper + + @pytest.mark.trio async def test_no_arbitter(): """An arbitter must be established before any nurseries @@ -36,7 +57,7 @@ def test_local_actor_async_func(): await trio.sleep(0.1) start = time.time() - tractor.run(print_loop) + tractor.run(print_loop, arbiter_addr=_arb_addr) # ensure the sleeps were actually awaited assert time.time() - start >= 1 @@ -82,6 +103,7 @@ def test_local_arbiter_subactor_global_state(): True, name='arbiter', statespace=statespace, + arbiter_addr=_arb_addr, ) assert result == 10 @@ -135,7 +157,7 @@ async def stream_from_single_subactor(): def test_stream_from_single_subactor(): """Verify streaming from a spawned async generator. """ - tractor.run(stream_from_single_subactor) + tractor.run(stream_from_single_subactor, arbiter_addr=_arb_addr) async def assert_err(): @@ -162,7 +184,92 @@ def test_remote_error(): with pytest.raises(tractor.RemoteActorError): # also raises - tractor.run(main) + tractor.run(main, arbiter_addr=_arb_addr) + + +the_line = 'Hi my name is {}' + + +async def hi(): + return the_line.format(tractor.current_actor().name) + + +async def say_hello(other_actor): + await trio.sleep(0.4) # wait for other actor to spawn + async with tractor.find_actor(other_actor) as portal: + return await portal.run(__name__, 'hi') + + +@tractor_test +async def test_trynamic_trio(): + """Main tractor entry point, the "master" process (for now + acts as the "director"). + """ + async with tractor.open_nursery() as n: + print("Alright... Action!") + + donny = await n.start_actor( + 'donny', + main=partial(say_hello, 'gretchen'), + rpc_module_paths=[__name__], + outlive_main=True + ) + gretchen = await n.start_actor( + 'gretchen', + main=partial(say_hello, 'donny'), + rpc_module_paths=[__name__], + # outlive_main=True + ) + print(await gretchen.result()) + print(await donny.result()) + await donny.cancel_actor() + print("CUTTTT CUUTT CUT!!?! Donny!! You're supposed to say...") + + +def movie_theatre_question(): + """A question asked in a dark theatre, in a tangent + (errr, I mean different) process. + """ + return 'have you ever seen a portal?' + + +@tractor_test +async def test_movie_theatre_convo(): + """The main ``tractor`` routine. + """ + async with tractor.open_nursery() as n: + portal = await n.start_actor( + 'frank', + # enable the actor to run funcs from this current module + rpc_module_paths=[__name__], + outlive_main=True, + ) + + print(await portal.run(__name__, 'movie_theatre_question')) + # calls the subactor a 2nd time + print(await portal.run(__name__, 'movie_theatre_question')) + + # the async with will block here indefinitely waiting + # for our actor "frank" to complete, but since it's an + # "outlive_main" actor it will never until cancelled + await portal.cancel_actor() + + +def cellar_door(): + return "Dang that's beautiful" + + +@tractor_test +async def test_most_beautiful_word(): + """The main ``tractor`` routine. + """ + async with tractor.open_nursery() as n: + portal = await n.start_actor('some_linguist', main=cellar_door) + + # The ``async with`` will unblock here since the 'some_linguist' + # actor has completed its main task ``cellar_door``. 
+ + print(await portal.result()) def do_nothing(): @@ -178,18 +285,17 @@ def test_cancel_single_subactor(): portal = await nursery.start_actor( 'nothin', rpc_module_paths=[__name__], ) - assert not await portal.run(__name__, 'do_nothing') + assert (await portal.run(__name__, 'do_nothing')) is None # would hang otherwise await nursery.cancel() - tractor.run(main) + tractor.run(main, arbiter_addr=_arb_addr) async def stream_data(seed): for i in range(seed): yield i - # await trio.sleep(1/10000.) # trigger scheduler await trio.sleep(0) # trigger scheduler @@ -209,7 +315,7 @@ async def aggregate(seed): portals.append(portal) - q = trio.Queue(int(1e3)) + q = trio.Queue(500) async def push_to_q(portal): async for value in await portal.run( @@ -231,11 +337,11 @@ async def aggregate(seed): unique_vals.add(value) # yield upwards to the spawning parent actor yield value - continue + + if value is None: + break assert value in unique_vals - if value is None: - break print("FINISHED ITERATING in aggregator") @@ -244,13 +350,15 @@ async def aggregate(seed): print("AGGREGATOR COMPLETE!") -async def main(): +# @tractor_test +async def a_quadruple_example(): # a nursery which spawns "actors" async with tractor.open_nursery() as nursery: seed = int(10) import time pre_start = time.time() + portal = await nursery.start_actor( name='aggregator', # executed in the actor's "main task" immediately @@ -269,13 +377,21 @@ async def main(): assert result_stream == list(range(seed)) + [None] -def test_show_me_the_code(): +async def cancel_after(wait): + with trio.move_on_after(wait): + await a_quadruple_example() + + +def test_a_quadruple_example(): """Verify the *show me the code* readme example works. """ - tractor.run(main, arbiter_addr=('127.0.0.1', 1616)) + tractor.run(cancel_after, 2, arbiter_addr=_arb_addr) -def test_cancel_smtc(): - """Verify we can cancel midway through the smtc example gracefully. +def test_not_fast_enough_quad(): + """Verify we can cancel midway through the quad example and all actors + cancel gracefully. + + This also serves as a kind of "we'd like to eventually be this fast test". """ - pass + tractor.run(cancel_after, 1, arbiter_addr=_arb_addr) From 209a6a209680d7e3c2aa64b4f3de3938c6b0f8fd Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 11 Jul 2018 00:20:50 -0400 Subject: [PATCH 08/16] Add a separate cancel scope for the main task Cancellation requires that each actor cancel it's spawned subactors before cancelling its own root (nursery's) cancel scope to avoid breaking channel connections before kill commands (`Actor.cancel()`) have been sent off to peers. To solve this, ensure each main task is cancelled to completion first (which will guarantee that all actor nurseries have completed their cancellation steps) before cancelling the actor's "core" tasks under the "root" scope. 
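
Roughly, the intended ordering can be sketched with bare trio like so
(illustrative only; the `pretend_*` helpers and the `state` dict are
made-up stand-ins for this sketch, not tractor code, and it reuses the
same `trio.open_cancel_scope()` / nursery APIs this patch itself uses):

    import trio

    async def pretend_main(state):
        # the actor's "main" runs under its own, separate cancel scope
        with trio.open_cancel_scope() as scope:
            state['main_scope'] = scope
            await trio.sleep_forever()
        # only reached once the main task has fully unwound
        state['main_complete'].set()

    async def pretend_core():
        # stand-in for the msg loop / channel server under the root nursery
        await trio.sleep_forever()

    async def cancel(state, root_nursery):
        # 1. (real code would first send cancel cmds to spawned subactors
        #    here, while the channels to them are still connected)
        # 2. cancel the main task and wait for it to complete ...
        state['main_scope'].cancel()
        await state['main_complete'].wait()
        # 3. ... and only then tear down the "core" tasks under the root scope
        root_nursery.cancel_scope.cancel()

    async def actor():
        state = {'main_complete': trio.Event()}
        async with trio.open_nursery() as root_nursery:
            root_nursery.start_soon(pretend_core)
            root_nursery.start_soon(pretend_main, state)
            await trio.sleep(0.1)  # crude: let both tasks start up
            await cancel(state, root_nursery)

    trio.run(actor)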
--- tractor/__init__.py | 60 ++++++++++++++++++++++++++++----------------- 1 file changed, 37 insertions(+), 23 deletions(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index 22fe303..2342a0a 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -176,6 +176,8 @@ class Actor: self._peers = defaultdict(list) self._peer_connected = {} self._no_more_peers = trio.Event() + self._main_complete = trio.Event() + self._main_scope = None self._no_more_peers.set() self._actors2calls = {} # map {uids -> {callids -> waiter queues}} self._listeners = [] @@ -443,28 +445,32 @@ class Actor: self._process_messages, self._parent_chan) if self.main: - try: - if self._parent_chan: - log.debug(f"Starting main task `{self.main}`") - # spawned subactor so deliver "main" task result(s) - # back to parent - await nursery.start( - _invoke, 'main', - self._parent_chan, self.main, {}, - False, True # treat_as_gen, raise_errs params - ) - else: - # run directly - we are an "unspawned actor" - log.debug(f"Running `{self.main}` directly") - result = await self.main() - - finally: - # tear down channel server in order to ensure - # we exit normally when the main task is done - if not self._outlive_main: - log.debug(f"Shutting down channel server") - self.cancel_server() + with trio.open_cancel_scope() as main_scope: + self._main_scope = main_scope + try: + if self._parent_chan: + log.debug(f"Starting main task `{self.main}`") + # spawned subactor so deliver "main" task result(s) + # back to parent + await nursery.start( + _invoke, 'main', + self._parent_chan, self.main, {}, + False, True # treat_as_gen, raise_errs params + ) + else: + # run directly - we are an "unspawned actor" + log.debug(f"Running `{self.main}` directly") + result = await self.main() + finally: + # tear down channel server in order to ensure + # we exit normally when the main task is done + if not self._outlive_main: + log.debug(f"Shutting down channel server") + self.cancel_server() + if main_scope.cancelled_caught: + log.debug("Main task was cancelled sucessfully") + self._main_complete.set() log.debug("Waiting on root nursery to complete") # blocks here as expected if no nursery was provided until # the channel server is killed (i.e. this actor is @@ -550,6 +556,10 @@ class Actor: cancelling (for all intents and purposes) this actor. """ self.cancel_server() + if self._main_scope: + self._main_scope.cancel() + log.debug("Waiting on main task to complete") + await self._main_complete.wait() self._root_nursery.cancel_scope.cancel() def cancel_server(self): @@ -684,9 +694,11 @@ class Portal: cancel_scope.shield = True # send cancel cmd - might not get response await self.run('self', 'cancel') + return True except trio.ClosedStreamError: log.warn( - f"{self.channel} for {self.channel.uid} was alreaday closed?") + f"{self.channel} for {self.channel.uid} was already closed?") + return False @asynccontextmanager @@ -795,7 +807,8 @@ class ActorNursery: return portal async def wait(self): - + """Wait for all subactors to complete. + """ async def wait_for_proc(proc, actor, portal): # TODO: timeout block here? 
if proc.is_alive(): @@ -1022,6 +1035,7 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr): actor = Actor( name or 'anonymous', main=main, + arbiter_addr=arbiter_addr, **kwargs ) host, port = (host, 0) From bb293905b99de52e3c07969c1dae35ca9d7690a7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 11 Jul 2018 00:32:03 -0400 Subject: [PATCH 09/16] Verify expected non-result under cancellation --- tests/test_tractor.py | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/tests/test_tractor.py b/tests/test_tractor.py index 8492664..6375ca8 100644 --- a/tests/test_tractor.py +++ b/tests/test_tractor.py @@ -255,6 +255,17 @@ async def test_movie_theatre_convo(): await portal.cancel_actor() +@tractor_test +async def test_movie_theatre_convo_main_task(): + async with tractor.open_nursery() as n: + portal = await n.start_actor('some_linguist', main=cellar_door) + + # The ``async with`` will unblock here since the 'some_linguist' + # actor has completed its main task ``cellar_door``. + + print(await portal.result()) + + def cellar_door(): return "Dang that's beautiful" @@ -355,7 +366,7 @@ async def a_quadruple_example(): # a nursery which spawns "actors" async with tractor.open_nursery() as nursery: - seed = int(10) + seed = int(1e3) import time pre_start = time.time() @@ -375,17 +386,19 @@ async def a_quadruple_example(): print(f"STREAM TIME = {time.time() - start}") print(f"STREAM + SPAWN TIME = {time.time() - pre_start}") assert result_stream == list(range(seed)) + [None] + return result_stream async def cancel_after(wait): with trio.move_on_after(wait): - await a_quadruple_example() + return await a_quadruple_example() def test_a_quadruple_example(): """Verify the *show me the code* readme example works. """ - tractor.run(cancel_after, 2, arbiter_addr=_arb_addr) + results = tractor.run(cancel_after, 2, arbiter_addr=_arb_addr) + assert results def test_not_fast_enough_quad(): @@ -394,4 +407,5 @@ def test_not_fast_enough_quad(): This also serves as a kind of "we'd like to eventually be this fast test". """ - tractor.run(cancel_after, 1, arbiter_addr=_arb_addr) + results = tractor.run(cancel_after, 1, arbiter_addr=_arb_addr) + assert results is None From bb9309bdf5a8cfd36e2f70c80137ba0c1bf6bc54 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 11 Jul 2018 16:56:22 -0400 Subject: [PATCH 10/16] Add a cancellation strategy test --- tests/test_tractor.py | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/tests/test_tractor.py b/tests/test_tractor.py index 6375ca8..6cb6a10 100644 --- a/tests/test_tractor.py +++ b/tests/test_tractor.py @@ -187,6 +187,35 @@ def test_remote_error(): tractor.run(main, arbiter_addr=_arb_addr) +@tractor_test +async def test_one_cancels_all(): + """Verify one failed actor causes all others in the nursery + to be cancelled just like in trio. + + This is the first and only supervisory strategy at the moment. 
+ """ + try: + async with tractor.open_nursery() as n: + real_actors = [] + for i in range(3): + real_actors.append(await n.start_actor( + f'actor_{i}', + rpc_module_paths=[__name__], + outlive_main=True + )) + + # start one actor that will fail immediately + await n.start_actor('extra', main=assert_err) + + # should error here with a ``RemoteActorError`` containing + # an ``AssertionError` + + except tractor.RemoteActorError: + assert n.cancelled is True + else: + pytest.fail("Should have gotten a remote assertion error?") + + the_line = 'Hi my name is {}' @@ -251,7 +280,7 @@ async def test_movie_theatre_convo(): # the async with will block here indefinitely waiting # for our actor "frank" to complete, but since it's an - # "outlive_main" actor it will never until cancelled + # "outlive_main" actor it will never end until cancelled await portal.cancel_actor() @@ -367,7 +396,6 @@ async def a_quadruple_example(): async with tractor.open_nursery() as nursery: seed = int(1e3) - import time pre_start = time.time() portal = await nursery.start_actor( From 25852794a81ea23b5220432167c3282fa366e908 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 11 Jul 2018 18:08:57 -0400 Subject: [PATCH 11/16] Move chan connect helper to ipc mod --- tractor/ipc.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tractor/ipc.py b/tractor/ipc.py index 7bc647c..56c09db 100644 --- a/tractor/ipc.py +++ b/tractor/ipc.py @@ -5,6 +5,7 @@ from typing import Coroutine, Tuple import msgpack import trio +from async_generator import asynccontextmanager from .log import get_logger log = get_logger('ipc') @@ -189,3 +190,14 @@ class Channel: def connected(self): return self.squeue.connected() if self.squeue else False + + +@asynccontextmanager +async def _connect_chan(host, port): + """Create and connect a channel with disconnect on + context manager teardown. 
+ """ + chan = Channel((host, port)) + await chan.connect() + yield chan + await chan.aclose() From 1ade5c5fbb17843c67563a2e95a44b74e419a7c9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 11 Jul 2018 18:09:38 -0400 Subject: [PATCH 12/16] Add onc-cancels-all strategy to actor nursery --- tractor/__init__.py | 58 +++++++++++++++++---------------------------- 1 file changed, 22 insertions(+), 36 deletions(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index 2342a0a..9658b80 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -14,7 +14,7 @@ import uuid import trio from async_generator import asynccontextmanager -from .ipc import Channel +from .ipc import Channel, _connect_chan from .log import get_console_log, get_logger ctx = mp.get_context("forkserver") @@ -245,7 +245,6 @@ class Actor: await self._process_messages(chan) finally: # Drop ref to channel so it can be gc-ed and disconnected - # if chan is not self._parent_chan: log.debug(f"Releasing channel {chan} from {chan.uid}") chans = self._peers.get(chan.uid) chans.remove(chan) @@ -450,27 +449,29 @@ class Actor: try: if self._parent_chan: log.debug(f"Starting main task `{self.main}`") - # spawned subactor so deliver "main" task result(s) - # back to parent + # spawned subactor so deliver "main" + # task result(s) back to parent await nursery.start( _invoke, 'main', self._parent_chan, self.main, {}, - False, True # treat_as_gen, raise_errs params + # treat_as_gen, raise_errs params + False, True ) else: # run directly - we are an "unspawned actor" log.debug(f"Running `{self.main}` directly") result = await self.main() - finally: + self._main_complete.set() # tear down channel server in order to ensure # we exit normally when the main task is done if not self._outlive_main: log.debug(f"Shutting down channel server") self.cancel_server() + log.debug(f"Shutting down root nursery") + nursery.cancel_scope.cancel() if main_scope.cancelled_caught: log.debug("Main task was cancelled sucessfully") - self._main_complete.set() log.debug("Waiting on root nursery to complete") # blocks here as expected if no nursery was provided until # the channel server is killed (i.e. this actor is @@ -674,6 +675,9 @@ class Portal: resptype, first_msg, q = (await result_from_q(q, self.channel)) self._result = await self._return_from_resptype( 'main', resptype, first_msg, q) + log.warn( + f"Retrieved first result `{self._result}` " + f"for {self.channel.uid}") # await q.put(first_msg) # for next consumer (e.g. nursery) return self._result @@ -760,6 +764,7 @@ class ActorNursery: # We'll likely want some way to cancel all sub-actors eventually # self.cancel_scope = cancel_scope self._children = {} + self.cancelled = False async def __aenter__(self): return self @@ -867,17 +872,6 @@ class ActorNursery: async def __aexit__(self, etype, value, tb): """Wait on all subactor's main routines to complete. 
""" - async def wait_for_actor(actor, proc, portal): - if proc.is_alive(): - res = await portal.result() - log.info(f"{actor.uid} main task completed with {res}") - if not actor._outlive_main: - # trigger msg loop to break - chans = self._actor.get_chans(actor.uid) - for chan in chans: - log.info(f"Signalling msg loop exit for {actor.uid}") - await chan.send(None) - if etype is not None: if etype is trio.Cancelled: log.warn(f"{current_actor().uid} was cancelled with {etype}, " @@ -889,13 +883,17 @@ class ActorNursery: "cancelling actor nursery") await self.cancel() else: + # XXX: this is effectively the lone cancellation/supervisor + # strategy which exactly mimicks trio's behaviour log.debug(f"Waiting on subactors {self._children} to complete") - async with trio.open_nursery() as nursery: - for subactor, proc, portal in self._children.values(): - nursery.start_soon(wait_for_actor, subactor, proc, portal) - - await self.wait() - log.debug(f"Nursery teardown complete") + try: + await self.wait() + except Exception as err: + log.warn(f"Nursery caught {err}, cancelling") + await self.cancel() + self.cancelled = True + raise + log.debug(f"Nursery teardown complete") def current_actor() -> Actor: @@ -955,18 +953,6 @@ async def _start_actor(actor, host, port, arbiter_addr, nursery=None): return result -@asynccontextmanager -async def _connect_chan(host, port): - """Attempt to connect to an arbiter's channel server. - - Return the channel on success or ``None`` on failure. - """ - chan = Channel((host, port)) - await chan.connect() - yield chan - await chan.aclose() - - @asynccontextmanager async def get_arbiter(host, port): """Return a portal instance connected to a local or remote From d9aa6119e1676f47bed911e4c09b26ae1a5619d5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 11 Jul 2018 19:24:08 -0400 Subject: [PATCH 13/16] Set cancelled state in cancel method --- tests/test_tractor.py | 2 +- tractor/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_tractor.py b/tests/test_tractor.py index 6cb6a10..246cdf2 100644 --- a/tests/test_tractor.py +++ b/tests/test_tractor.py @@ -247,7 +247,6 @@ async def test_trynamic_trio(): 'gretchen', main=partial(say_hello, 'donny'), rpc_module_paths=[__name__], - # outlive_main=True ) print(await gretchen.result()) print(await donny.result()) @@ -361,6 +360,7 @@ async def aggregate(seed): async for value in await portal.run( __name__, 'stream_data', seed=seed ): + # leverage trio's built-in backpressure await q.put(value) await q.put(None) diff --git a/tractor/__init__.py b/tractor/__init__.py index 9658b80..938a404 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -867,6 +867,7 @@ class ActorNursery: log.debug(f"Waiting on all subactors to complete") await self.wait() + self.cancelled = True log.debug(f"All subactors for {self} have terminated") async def __aexit__(self, etype, value, tb): @@ -891,7 +892,6 @@ class ActorNursery: except Exception as err: log.warn(f"Nursery caught {err}, cancelling") await self.cancel() - self.cancelled = True raise log.debug(f"Nursery teardown complete") From 590267ded2e8d305fcf7850e11a8fc7e5b93a779 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 11 Jul 2018 19:24:37 -0400 Subject: [PATCH 14/16] Add a simpler cancel test --- tests/test_tractor.py | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/tests/test_tractor.py b/tests/test_tractor.py index 246cdf2..0a416ef 100644 --- a/tests/test_tractor.py +++ 
b/tests/test_tractor.py @@ -3,6 +3,7 @@ Actor model API testing """ import time from functools import partial, wraps +from itertools import repeat import random import pytest @@ -64,10 +65,12 @@ def test_local_actor_async_func(): assert nums == list(range(10)) +statespace = {'doggy': 10, 'kitty': 4} + + # NOTE: this func must be defined at module level in order for the # interal pickling infra of the forkserver to work async def spawn(is_arbiter): - statespace = {'doggy': 10, 'kitty': 4} namespaces = [__name__] await trio.sleep(0.1) @@ -97,7 +100,6 @@ async def spawn(is_arbiter): def test_local_arbiter_subactor_global_state(): - statespace = {'doggy': 10, 'kitty': 4} result = tractor.run( spawn, True, @@ -187,6 +189,30 @@ def test_remote_error(): tractor.run(main, arbiter_addr=_arb_addr) +async def stream_forever(): + for i in repeat("I can see these little future bubble things"): + yield i + await trio.sleep(0.01) + + +@tractor_test +async def test_cancel_infinite_streamer(): + + # stream for at most 5 seconds + with trio.move_on_after(1) as cancel_scope: + async with tractor.open_nursery() as n: + portal = await n.start_actor( + f'donny', + rpc_module_paths=[__name__], + outlive_main=True + ) + async for letter in await portal.run(__name__, 'stream_forever'): + print(letter) + + assert cancel_scope.cancelled_caught + assert n.cancelled + + @tractor_test async def test_one_cancels_all(): """Verify one failed actor causes all others in the nursery From a26d6f831f08969976ccd5428f1575a85c704aa2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 11 Jul 2018 19:25:30 -0400 Subject: [PATCH 15/16] Add loglevel setting to test suite --- tests/conftest.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 tests/conftest.py diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..e7b63da --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,18 @@ +""" +``tractor`` testing!! +""" +import pytest +import tractor + + +def pytest_addoption(parser): + parser.addoption("--ll", action="store", dest='loglevel', + default=None, help="logging level to set when testing") + + +@pytest.fixture(scope='session', autouse=True) +def loglevel(request): + orig = tractor._default_loglevel + level = tractor._default_loglevel = request.config.option.loglevel + yield level + tractor._default_loglevel = orig From 1b41b7b6b76e6f42c3698163509f6a196147d74b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 11 Jul 2018 22:28:05 -0400 Subject: [PATCH 16/16] Add initial travis file --- .travis.yml | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 .travis.yml diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..e569571 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,13 @@ +language: python +python: + - '3.6' + # setup.py reading README breaks this? + # - pypy + # - nightly + +install: + - cd $TRAVIS_BUILD_DIR + - pip install . -r requirements-test.txt + +script: + - pytest tests/