forked from goodboy/tractor
				
			Add subscribe after close test
							parent
							
								
									aad6cf9070
								
							
						
					
					
						commit
						29e0b8f67d
					
				| 
						 | 
					@ -100,6 +100,29 @@ def test_stream_fan_out_to_local_subscriptions(
 | 
				
			||||||
    trio.run(main)
 | 
					    trio.run(main)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def test_subscribe_errors_after_close():
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    async def main():
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        size = 1
 | 
				
			||||||
 | 
					        tx, rx = trio.open_memory_channel(size)
 | 
				
			||||||
 | 
					        async with broadcast_receiver(rx, size) as brx:
 | 
				
			||||||
 | 
					            pass
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        try:
 | 
				
			||||||
 | 
					            # open and close
 | 
				
			||||||
 | 
					            async with brx.subscribe():
 | 
				
			||||||
 | 
					                pass
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        except trio.ClosedResourceError:
 | 
				
			||||||
 | 
					            assert brx.key not in brx._state.subs
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        else:
 | 
				
			||||||
 | 
					            assert 0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    trio.run(main)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def test_ensure_slow_consumers_lag_out(
 | 
					def test_ensure_slow_consumers_lag_out(
 | 
				
			||||||
    arb_addr,
 | 
					    arb_addr,
 | 
				
			||||||
    start_method,
 | 
					    start_method,
 | 
				
			||||||
| 
						 | 
					@ -129,7 +152,6 @@ def test_ensure_slow_consumers_lag_out(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                async with brx.subscribe() as lbrx:
 | 
					                async with brx.subscribe() as lbrx:
 | 
				
			||||||
                    while True:
 | 
					                    while True:
 | 
				
			||||||
                        #     await tractor.breakpoint()
 | 
					 | 
				
			||||||
                        print(f'{task.name}: starting consume loop')
 | 
					                        print(f'{task.name}: starting consume loop')
 | 
				
			||||||
                        try:
 | 
					                        try:
 | 
				
			||||||
                            async for value in lbrx:
 | 
					                            async for value in lbrx:
 | 
				
			||||||
| 
						 | 
					@ -156,7 +178,7 @@ def test_ensure_slow_consumers_lag_out(
 | 
				
			||||||
                            lag_time = time.time() - start
 | 
					                            lag_time = time.time() - start
 | 
				
			||||||
                            lags = laggers[task.name]
 | 
					                            lags = laggers[task.name]
 | 
				
			||||||
                            print(
 | 
					                            print(
 | 
				
			||||||
                                f'restarting slow ass {task.name} '
 | 
					                                f'restarting slow task {task.name} '
 | 
				
			||||||
                                f'that bailed out on {lags}:{value} '
 | 
					                                f'that bailed out on {lags}:{value} '
 | 
				
			||||||
                                f'after {lag_time:.3f}')
 | 
					                                f'after {lag_time:.3f}')
 | 
				
			||||||
                            if lags <= retries:
 | 
					                            if lags <= retries:
 | 
				
			||||||
| 
						 | 
					@ -207,7 +229,8 @@ def test_ensure_slow_consumers_lag_out(
 | 
				
			||||||
                    await brx.receive()
 | 
					                    await brx.receive()
 | 
				
			||||||
                except Lagged:
 | 
					                except Lagged:
 | 
				
			||||||
                    # expect tokio style index truncation
 | 
					                    # expect tokio style index truncation
 | 
				
			||||||
                    assert brx._state.subs[brx.key] == len(brx._state.queue) - 1
 | 
					                    seq = brx._state.subs[brx.key]
 | 
				
			||||||
 | 
					                    assert seq == len(brx._state.queue) - 1
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                # all backpressured entries in the underlying
 | 
					                # all backpressured entries in the underlying
 | 
				
			||||||
                # channel should have been copied into the caster
 | 
					                # channel should have been copied into the caster
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue