forked from goodboy/tractor
				
			Get tests working again
Remove all the `piker` stuff and add some further checks including: - main task result is returned correctly - remote errors are raised locally - remote async generator yields values locallyasyncgen_closing_fix
							parent
							
								
									36fd75e217
								
							
						
					
					
						commit
						10417303aa
					
				| 
						 | 
				
			
			@ -52,7 +52,7 @@ def test_local_actor_async_func():
 | 
			
		|||
# interal pickling infra of the forkserver to work
 | 
			
		||||
async def spawn(is_arbiter):
 | 
			
		||||
    statespace = {'doggy': 10, 'kitty': 4}
 | 
			
		||||
    namespaces = ['piker.brokers.core']
 | 
			
		||||
    namespaces = [__name__]
 | 
			
		||||
 | 
			
		||||
    await trio.sleep(0.1)
 | 
			
		||||
    actor = tractor.current_actor()
 | 
			
		||||
| 
						 | 
				
			
			@ -72,22 +72,33 @@ async def spawn(is_arbiter):
 | 
			
		|||
            )
 | 
			
		||||
            assert len(nursery._children) == 1
 | 
			
		||||
            assert portal.channel.uid in tractor.current_actor()._peers
 | 
			
		||||
            # be sure we can still get the result 
 | 
			
		||||
            result = await portal.result()
 | 
			
		||||
            assert result == 10
 | 
			
		||||
            return result
 | 
			
		||||
    else:
 | 
			
		||||
        return 10
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_local_arbiter_subactor_global_state():
 | 
			
		||||
    statespace = {'doggy': 10, 'kitty': 4}
 | 
			
		||||
    tractor.run(
 | 
			
		||||
    result = tractor.run(
 | 
			
		||||
        spawn,
 | 
			
		||||
        True,
 | 
			
		||||
        name='arbiter',
 | 
			
		||||
        statespace=statespace,
 | 
			
		||||
    )
 | 
			
		||||
    assert result == 10
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def rx_price_quotes_from_brokerd(us_symbols):
 | 
			
		||||
    """Verify we can spawn a daemon actor and retrieve streamed price data.
 | 
			
		||||
async def stream_seq(sequence):
 | 
			
		||||
    for i in sequence:
 | 
			
		||||
        yield i
 | 
			
		||||
        await trio.sleep(0.1)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def stream_from_single_subactor():
 | 
			
		||||
    """Verify we can spawn a daemon actor and retrieve streamed data.
 | 
			
		||||
    """
 | 
			
		||||
    async with tractor.find_actor('brokerd') as portals:
 | 
			
		||||
        if not portals:
 | 
			
		||||
| 
						 | 
				
			
			@ -95,33 +106,25 @@ async def rx_price_quotes_from_brokerd(us_symbols):
 | 
			
		|||
            async with tractor.open_nursery() as nursery:
 | 
			
		||||
                # no brokerd actor found
 | 
			
		||||
                portal = await nursery.start_actor(
 | 
			
		||||
                    'brokerd',
 | 
			
		||||
                    rpc_module_paths=['piker.brokers.core'],
 | 
			
		||||
                    statespace={
 | 
			
		||||
                        'brokers2tickersubs': {},
 | 
			
		||||
                        'clients': {},
 | 
			
		||||
                        'dtasks': set()
 | 
			
		||||
                    },
 | 
			
		||||
                    main=None,  # don't start a main func - use rpc
 | 
			
		||||
                    'streamerd',
 | 
			
		||||
                    rpc_module_paths=[__name__],
 | 
			
		||||
                    statespace={'global_dict': {}},
 | 
			
		||||
                    # don't start a main func - use rpc
 | 
			
		||||
                    # currently the same as outlive_main=False
 | 
			
		||||
                    main=None,
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
                # gotta expose in a broker agnostic way...
 | 
			
		||||
                # retrieve initial symbol data
 | 
			
		||||
                # sd = await portal.run(
 | 
			
		||||
                #     'piker.brokers.core', 'symbol_data', symbols=us_symbols)
 | 
			
		||||
                # assert list(sd.keys()) == us_symbols
 | 
			
		||||
                seq = range(10)
 | 
			
		||||
 | 
			
		||||
                gen = await portal.run(
 | 
			
		||||
                    'piker.brokers.core',
 | 
			
		||||
                    '_test_price_stream',
 | 
			
		||||
                    broker='robinhood',
 | 
			
		||||
                    symbols=us_symbols,
 | 
			
		||||
                    __name__,
 | 
			
		||||
                    'stream_seq',  # the func above
 | 
			
		||||
                    sequence=list(seq),  # has to be msgpack serializable
 | 
			
		||||
                )
 | 
			
		||||
                # it'd sure be nice to have an asyncitertools here...
 | 
			
		||||
                async for quotes in gen:
 | 
			
		||||
                    assert quotes
 | 
			
		||||
                    for key in quotes:
 | 
			
		||||
                        assert key in us_symbols
 | 
			
		||||
                iseq = iter(seq)
 | 
			
		||||
                async for val in gen:
 | 
			
		||||
                    assert val == next(iseq)
 | 
			
		||||
                    break
 | 
			
		||||
                    # terminate far-end async-gen
 | 
			
		||||
                    # await gen.asend(None)
 | 
			
		||||
| 
						 | 
				
			
			@ -130,14 +133,36 @@ async def rx_price_quotes_from_brokerd(us_symbols):
 | 
			
		|||
                # stop all spawned subactors
 | 
			
		||||
                await nursery.cancel()
 | 
			
		||||
 | 
			
		||||
    # arbitter is cancelled here due to `find_actors()` internals
 | 
			
		||||
    # (which internally uses `get_arbiter` which kills its channel
 | 
			
		||||
    # server scope on exit)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_rx_price_quotes_from_brokerd(us_symbols):
 | 
			
		||||
def test_stream_from_single_subactor(us_symbols):
 | 
			
		||||
    """Verify streaming from a spawned async generator.
 | 
			
		||||
    """
 | 
			
		||||
    tractor.run(
 | 
			
		||||
        rx_price_quotes_from_brokerd,
 | 
			
		||||
        us_symbols,
 | 
			
		||||
        name='arbiter',
 | 
			
		||||
        stream_from_single_subactor,
 | 
			
		||||
        name='client',
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def assert_err():
 | 
			
		||||
    assert 0
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_remote_error():
 | 
			
		||||
    """Verify an error raises in a subactor is propagated to the parent.
 | 
			
		||||
    """
 | 
			
		||||
    async def main():
 | 
			
		||||
        async with tractor.open_nursery() as nursery:
 | 
			
		||||
 | 
			
		||||
            portal = await nursery.start_actor('errorer', main=assert_err)
 | 
			
		||||
 | 
			
		||||
            # get result(s) from main task
 | 
			
		||||
            try:
 | 
			
		||||
                return await portal.result()
 | 
			
		||||
            except tractor.RemoteActorError:
 | 
			
		||||
                raise
 | 
			
		||||
            except Exception:
 | 
			
		||||
                pass
 | 
			
		||||
            assert 0, "Remote error was not raised?"
 | 
			
		||||
 | 
			
		||||
    with pytest.raises(tractor.RemoteActorError):
 | 
			
		||||
        tractor.run(main)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue