diff --git a/tests/test_tractor.py b/tests/test_tractor.py index 410144e..dcb49d5 100644 --- a/tests/test_tractor.py +++ b/tests/test_tractor.py @@ -52,7 +52,7 @@ def test_local_actor_async_func(): # interal pickling infra of the forkserver to work async def spawn(is_arbiter): statespace = {'doggy': 10, 'kitty': 4} - namespaces = ['piker.brokers.core'] + namespaces = [__name__] await trio.sleep(0.1) actor = tractor.current_actor() @@ -72,22 +72,33 @@ async def spawn(is_arbiter): ) assert len(nursery._children) == 1 assert portal.channel.uid in tractor.current_actor()._peers + # be sure we can still get the result + result = await portal.result() + assert result == 10 + return result else: return 10 def test_local_arbiter_subactor_global_state(): statespace = {'doggy': 10, 'kitty': 4} - tractor.run( + result = tractor.run( spawn, True, name='arbiter', statespace=statespace, ) + assert result == 10 -async def rx_price_quotes_from_brokerd(us_symbols): - """Verify we can spawn a daemon actor and retrieve streamed price data. +async def stream_seq(sequence): + for i in sequence: + yield i + await trio.sleep(0.1) + + +async def stream_from_single_subactor(): + """Verify we can spawn a daemon actor and retrieve streamed data. """ async with tractor.find_actor('brokerd') as portals: if not portals: @@ -95,33 +106,25 @@ async def rx_price_quotes_from_brokerd(us_symbols): async with tractor.open_nursery() as nursery: # no brokerd actor found portal = await nursery.start_actor( - 'brokerd', - rpc_module_paths=['piker.brokers.core'], - statespace={ - 'brokers2tickersubs': {}, - 'clients': {}, - 'dtasks': set() - }, - main=None, # don't start a main func - use rpc + 'streamerd', + rpc_module_paths=[__name__], + statespace={'global_dict': {}}, + # don't start a main func - use rpc + # currently the same as outlive_main=False + main=None, ) - # gotta expose in a broker agnostic way... - # retrieve initial symbol data - # sd = await portal.run( - # 'piker.brokers.core', 'symbol_data', symbols=us_symbols) - # assert list(sd.keys()) == us_symbols + seq = range(10) gen = await portal.run( - 'piker.brokers.core', - '_test_price_stream', - broker='robinhood', - symbols=us_symbols, + __name__, + 'stream_seq', # the func above + sequence=list(seq), # has to be msgpack serializable ) # it'd sure be nice to have an asyncitertools here... - async for quotes in gen: - assert quotes - for key in quotes: - assert key in us_symbols + iseq = iter(seq) + async for val in gen: + assert val == next(iseq) break # terminate far-end async-gen # await gen.asend(None) @@ -130,14 +133,36 @@ async def rx_price_quotes_from_brokerd(us_symbols): # stop all spawned subactors await nursery.cancel() - # arbitter is cancelled here due to `find_actors()` internals - # (which internally uses `get_arbiter` which kills its channel - # server scope on exit) - -def test_rx_price_quotes_from_brokerd(us_symbols): +def test_stream_from_single_subactor(us_symbols): + """Verify streaming from a spawned async generator. + """ tractor.run( - rx_price_quotes_from_brokerd, - us_symbols, - name='arbiter', + stream_from_single_subactor, + name='client', ) + + +async def assert_err(): + assert 0 + + +def test_remote_error(): + """Verify an error raises in a subactor is propagated to the parent. + """ + async def main(): + async with tractor.open_nursery() as nursery: + + portal = await nursery.start_actor('errorer', main=assert_err) + + # get result(s) from main task + try: + return await portal.result() + except tractor.RemoteActorError: + raise + except Exception: + pass + assert 0, "Remote error was not raised?" + + with pytest.raises(tractor.RemoteActorError): + tractor.run(main)