forked from goodboy/tractor
				
			Drop run and rpc_module_paths from streaming tests
							parent
							
								
									9e64161538
								
							
						
					
					
						commit
						247483ee93
					
				| 
						 | 
				
			
			@ -50,14 +50,24 @@ async def context_stream(ctx, sequence):
 | 
			
		|||
    assert cs.cancelled_caught
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def stream_from_single_subactor(stream_func):
 | 
			
		||||
async def stream_from_single_subactor(
 | 
			
		||||
    arb_addr,
 | 
			
		||||
    start_method,
 | 
			
		||||
    stream_func,
 | 
			
		||||
):
 | 
			
		||||
    """Verify we can spawn a daemon actor and retrieve streamed data.
 | 
			
		||||
    """
 | 
			
		||||
    async with tractor.find_actor('streamerd') as portals:
 | 
			
		||||
    # only one per host address, spawns an actor if None
 | 
			
		||||
 | 
			
		||||
    async with tractor.open_nursery(
 | 
			
		||||
        arbiter_addr=arb_addr,
 | 
			
		||||
        start_method=start_method,
 | 
			
		||||
    ) as nursery:
 | 
			
		||||
 | 
			
		||||
        async with tractor.find_actor('streamerd') as portals:
 | 
			
		||||
 | 
			
		||||
            if not portals:
 | 
			
		||||
 | 
			
		||||
        if not portals:
 | 
			
		||||
            # only one per host address, spawns an actor if None
 | 
			
		||||
            async with tractor.open_nursery() as nursery:
 | 
			
		||||
                # no brokerd actor found
 | 
			
		||||
                portal = await nursery.start_actor(
 | 
			
		||||
                    'streamerd',
 | 
			
		||||
| 
						 | 
				
			
			@ -101,13 +111,13 @@ async def stream_from_single_subactor(stream_func):
 | 
			
		|||
def test_stream_from_single_subactor(arb_addr, start_method, stream_func):
 | 
			
		||||
    """Verify streaming from a spawned async generator.
 | 
			
		||||
    """
 | 
			
		||||
    tractor.run(
 | 
			
		||||
    trio.run(
 | 
			
		||||
        partial(
 | 
			
		||||
            stream_from_single_subactor,
 | 
			
		||||
            arb_addr,
 | 
			
		||||
            start_method,
 | 
			
		||||
            stream_func=stream_func,
 | 
			
		||||
        ),
 | 
			
		||||
        arbiter_addr=arb_addr,
 | 
			
		||||
        start_method=start_method,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -208,9 +218,10 @@ async def a_quadruple_example():
 | 
			
		|||
        return result_stream
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def cancel_after(wait):
 | 
			
		||||
    with trio.move_on_after(wait):
 | 
			
		||||
        return await a_quadruple_example()
 | 
			
		||||
async def cancel_after(wait, arb_addr):
 | 
			
		||||
    async with tractor.open_root_actor(arbiter_addr=arb_addr):
 | 
			
		||||
        with trio.move_on_after(wait):
 | 
			
		||||
            return await a_quadruple_example()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.fixture(scope='module')
 | 
			
		||||
| 
						 | 
				
			
			@ -222,7 +233,7 @@ def time_quad_ex(arb_addr, ci_env, spawn_backend):
 | 
			
		|||
 | 
			
		||||
    timeout = 7 if platform.system() in ('Windows', 'Darwin') else 4
 | 
			
		||||
    start = time.time()
 | 
			
		||||
    results = tractor.run(cancel_after, timeout, arbiter_addr=arb_addr)
 | 
			
		||||
    results = trio.run(cancel_after, timeout, arb_addr)
 | 
			
		||||
    diff = time.time() - start
 | 
			
		||||
    assert results
 | 
			
		||||
    return results, diff
 | 
			
		||||
| 
						 | 
				
			
			@ -249,7 +260,7 @@ def test_not_fast_enough_quad(
 | 
			
		|||
    """
 | 
			
		||||
    results, diff = time_quad_ex
 | 
			
		||||
    delay = max(diff - cancel_delay, 0)
 | 
			
		||||
    results = tractor.run(cancel_after, delay, arbiter_addr=arb_addr)
 | 
			
		||||
    results = trio.run(cancel_after, delay, arb_addr)
 | 
			
		||||
    system = platform.system()
 | 
			
		||||
    if system in ('Windows', 'Darwin') and results is not None:
 | 
			
		||||
        # In CI envoirments it seems later runs are quicker then the first
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue