Add timeouts around some context test bodies
Since with my in-index runtime-port to our native msg-spec it seems these ones are hanging B( - `test_one_end_stream_not_opened()` - `test_maybe_allow_overruns_stream()` Tossing in some `trio.fail_after()`s seems to at least gnab them as failures B)multihost_exs
							parent
							
								
									b5bdd20eb5
								
							
						
					
					
						commit
						afabef166e
					
				|  | @ -6,6 +6,7 @@ sync-opening a ``tractor.Context`` beforehand. | ||||||
| 
 | 
 | ||||||
| ''' | ''' | ||||||
| from itertools import count | from itertools import count | ||||||
|  | import math | ||||||
| import platform | import platform | ||||||
| from pprint import pformat | from pprint import pformat | ||||||
| from typing import ( | from typing import ( | ||||||
|  | @ -845,7 +846,10 @@ async def keep_sending_from_callee( | ||||||
|         ('caller', 1, never_open_stream), |         ('caller', 1, never_open_stream), | ||||||
|         ('callee', 0, keep_sending_from_callee), |         ('callee', 0, keep_sending_from_callee), | ||||||
|     ], |     ], | ||||||
|     ids='overrun_condition={}'.format, |     ids=[ | ||||||
|  |          ('caller_1buf_never_open_stream'), | ||||||
|  |          ('callee_0buf_keep_sending_from_callee'), | ||||||
|  |     ] | ||||||
| ) | ) | ||||||
| def test_one_end_stream_not_opened( | def test_one_end_stream_not_opened( | ||||||
|     overrun_by: tuple[str, int, Callable], |     overrun_by: tuple[str, int, Callable], | ||||||
|  | @ -869,29 +873,30 @@ def test_one_end_stream_not_opened( | ||||||
|                 enable_modules=[__name__], |                 enable_modules=[__name__], | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|             async with portal.open_context( |             with trio.fail_after(0.8): | ||||||
|                 entrypoint, |                 async with portal.open_context( | ||||||
|             ) as (ctx, sent): |                     entrypoint, | ||||||
|                 assert sent is None |                 ) as (ctx, sent): | ||||||
|  |                     assert sent is None | ||||||
| 
 | 
 | ||||||
|                 if 'caller' in overrunner: |                     if 'caller' in overrunner: | ||||||
| 
 | 
 | ||||||
|                     async with ctx.open_stream() as stream: |                         async with ctx.open_stream() as stream: | ||||||
| 
 | 
 | ||||||
|                         # itersend +1 msg more then the buffer size |                             # itersend +1 msg more then the buffer size | ||||||
|                         # to cause the most basic overrun. |                             # to cause the most basic overrun. | ||||||
|                         for i in range(buf_size): |                             for i in range(buf_size): | ||||||
|                             print(f'sending {i}') |                                 print(f'sending {i}') | ||||||
|                             await stream.send(i) |                                 await stream.send(i) | ||||||
| 
 | 
 | ||||||
|                         else: |                             else: | ||||||
|                             # expect overrun error to be relayed back |                                 # expect overrun error to be relayed back | ||||||
|                             # and this sleep interrupted |                                 # and this sleep interrupted | ||||||
|                             await trio.sleep_forever() |                                 await trio.sleep_forever() | ||||||
| 
 | 
 | ||||||
|                 else: |                     else: | ||||||
|                     # callee overruns caller case so we do nothing here |                         # callee overruns caller case so we do nothing here | ||||||
|                     await trio.sleep_forever() |                         await trio.sleep_forever() | ||||||
| 
 | 
 | ||||||
|             await portal.cancel_actor() |             await portal.cancel_actor() | ||||||
| 
 | 
 | ||||||
|  | @ -1055,54 +1060,63 @@ def test_maybe_allow_overruns_stream( | ||||||
|                 loglevel=loglevel, |                 loglevel=loglevel, | ||||||
|                 debug_mode=debug_mode, |                 debug_mode=debug_mode, | ||||||
|             ) |             ) | ||||||
|             seq = list(range(10)) |  | ||||||
|             async with portal.open_context( |  | ||||||
|                 echo_back_sequence, |  | ||||||
|                 seq=seq, |  | ||||||
|                 wait_for_cancel=cancel_ctx, |  | ||||||
|                 be_slow=(slow_side == 'child'), |  | ||||||
|                 allow_overruns_side=allow_overruns_side, |  | ||||||
| 
 | 
 | ||||||
|             ) as (ctx, sent): |             # stream-sequence batch info with send delay to determine | ||||||
|                 assert sent is None |             # approx timeout determining whether test has hung. | ||||||
|  |             total_batches: int = 2 | ||||||
|  |             num_items: int = 10 | ||||||
|  |             seq = list(range(num_items)) | ||||||
|  |             parent_send_delay: float = 0.16 | ||||||
|  |             timeout: float = math.ceil( | ||||||
|  |                 total_batches * num_items * parent_send_delay | ||||||
|  |             ) | ||||||
|  |             with trio.fail_after(timeout): | ||||||
|  |                 async with portal.open_context( | ||||||
|  |                     echo_back_sequence, | ||||||
|  |                     seq=seq, | ||||||
|  |                     wait_for_cancel=cancel_ctx, | ||||||
|  |                     be_slow=(slow_side == 'child'), | ||||||
|  |                     allow_overruns_side=allow_overruns_side, | ||||||
| 
 | 
 | ||||||
|                 async with ctx.open_stream( |                 ) as (ctx, sent): | ||||||
|                     msg_buffer_size=1 if slow_side == 'parent' else None, |                     assert sent is None | ||||||
|                     allow_overruns=(allow_overruns_side in {'parent', 'both'}), |  | ||||||
|                 ) as stream: |  | ||||||
| 
 | 
 | ||||||
|                     total_batches: int = 2 |                     async with ctx.open_stream( | ||||||
|                     for _ in range(total_batches): |                         msg_buffer_size=1 if slow_side == 'parent' else None, | ||||||
|                         for msg in seq: |                         allow_overruns=(allow_overruns_side in {'parent', 'both'}), | ||||||
|                             # print(f'root tx {msg}') |                     ) as stream: | ||||||
|                             await stream.send(msg) |  | ||||||
|                             if slow_side == 'parent': |  | ||||||
|                                 # NOTE: we make the parent slightly |  | ||||||
|                                 # slower, when it is slow, to make sure |  | ||||||
|                                 # that in the overruns everywhere case |  | ||||||
|                                 await trio.sleep(0.16) |  | ||||||
| 
 | 
 | ||||||
|                         batch = [] |                         for _ in range(total_batches): | ||||||
|                         async for msg in stream: |                             for msg in seq: | ||||||
|                             print(f'root rx {msg}') |                                 # print(f'root tx {msg}') | ||||||
|                             batch.append(msg) |                                 await stream.send(msg) | ||||||
|                             if batch == seq: |                                 if slow_side == 'parent': | ||||||
|                                 break |                                     # NOTE: we make the parent slightly | ||||||
|  |                                     # slower, when it is slow, to make sure | ||||||
|  |                                     # that in the overruns everywhere case | ||||||
|  |                                     await trio.sleep(parent_send_delay) | ||||||
|  | 
 | ||||||
|  |                             batch = [] | ||||||
|  |                             async for msg in stream: | ||||||
|  |                                 print(f'root rx {msg}') | ||||||
|  |                                 batch.append(msg) | ||||||
|  |                                 if batch == seq: | ||||||
|  |                                     break | ||||||
|  | 
 | ||||||
|  |                     if cancel_ctx: | ||||||
|  |                         # cancel the remote task | ||||||
|  |                         print('Requesting `ctx.cancel()` in parent!') | ||||||
|  |                         await ctx.cancel() | ||||||
|  | 
 | ||||||
|  |                 res: str|ContextCancelled = await ctx.result() | ||||||
| 
 | 
 | ||||||
|                 if cancel_ctx: |                 if cancel_ctx: | ||||||
|                     # cancel the remote task |                     assert isinstance(res, ContextCancelled) | ||||||
|                     print('Requesting `ctx.cancel()` in parent!') |                     assert tuple(res.canceller) == current_actor().uid | ||||||
|                     await ctx.cancel() |  | ||||||
| 
 | 
 | ||||||
|             res: str|ContextCancelled = await ctx.result() |                 else: | ||||||
| 
 |                     print(f'RX ROOT SIDE RESULT {res}') | ||||||
|             if cancel_ctx: |                     assert res == 'yo' | ||||||
|                 assert isinstance(res, ContextCancelled) |  | ||||||
|                 assert tuple(res.canceller) == current_actor().uid |  | ||||||
| 
 |  | ||||||
|             else: |  | ||||||
|                 print(f'RX ROOT SIDE RESULT {res}') |  | ||||||
|                 assert res == 'yo' |  | ||||||
| 
 | 
 | ||||||
|             # cancel the daemon |             # cancel the daemon | ||||||
|             await portal.cancel_actor() |             await portal.cancel_actor() | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue