Compare commits
	
		
			3 Commits 
		
	
	
		
			67f673bf36
			...
			f2ce4a3469
		
	
	| Author | SHA1 | Date | 
|---|---|---|
| 
							
							
								 | 
						f2ce4a3469 | |
| 
							
							
								 | 
						3aa964315a | |
| 
							
							
								 | 
						f3ca8608d5 | 
| 
						 | 
					@ -6,6 +6,7 @@ sync-opening a ``tractor.Context`` beforehand.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
'''
 | 
					'''
 | 
				
			||||||
from itertools import count
 | 
					from itertools import count
 | 
				
			||||||
 | 
					import math
 | 
				
			||||||
import platform
 | 
					import platform
 | 
				
			||||||
from pprint import pformat
 | 
					from pprint import pformat
 | 
				
			||||||
from typing import (
 | 
					from typing import (
 | 
				
			||||||
| 
						 | 
					@ -872,7 +873,7 @@ def test_one_end_stream_not_opened(
 | 
				
			||||||
                enable_modules=[__name__],
 | 
					                enable_modules=[__name__],
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            with trio.fail_after(1):
 | 
					            with trio.fail_after(0.8):
 | 
				
			||||||
                async with portal.open_context(
 | 
					                async with portal.open_context(
 | 
				
			||||||
                    entrypoint,
 | 
					                    entrypoint,
 | 
				
			||||||
                ) as (ctx, sent):
 | 
					                ) as (ctx, sent):
 | 
				
			||||||
| 
						 | 
					@ -1059,54 +1060,63 @@ def test_maybe_allow_overruns_stream(
 | 
				
			||||||
                loglevel=loglevel,
 | 
					                loglevel=loglevel,
 | 
				
			||||||
                debug_mode=debug_mode,
 | 
					                debug_mode=debug_mode,
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
            seq = list(range(10))
 | 
					 | 
				
			||||||
            async with portal.open_context(
 | 
					 | 
				
			||||||
                echo_back_sequence,
 | 
					 | 
				
			||||||
                seq=seq,
 | 
					 | 
				
			||||||
                wait_for_cancel=cancel_ctx,
 | 
					 | 
				
			||||||
                be_slow=(slow_side == 'child'),
 | 
					 | 
				
			||||||
                allow_overruns_side=allow_overruns_side,
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
            ) as (ctx, sent):
 | 
					            # stream-sequence batch info with send delay to determine
 | 
				
			||||||
                assert sent is None
 | 
					            # approx timeout determining whether test has hung.
 | 
				
			||||||
 | 
					            total_batches: int = 2
 | 
				
			||||||
 | 
					            num_items: int = 10
 | 
				
			||||||
 | 
					            seq = list(range(num_items))
 | 
				
			||||||
 | 
					            parent_send_delay: float = 0.16
 | 
				
			||||||
 | 
					            timeout: float = math.ceil(
 | 
				
			||||||
 | 
					                total_batches * num_items * parent_send_delay
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					            with trio.fail_after(timeout):
 | 
				
			||||||
 | 
					                async with portal.open_context(
 | 
				
			||||||
 | 
					                    echo_back_sequence,
 | 
				
			||||||
 | 
					                    seq=seq,
 | 
				
			||||||
 | 
					                    wait_for_cancel=cancel_ctx,
 | 
				
			||||||
 | 
					                    be_slow=(slow_side == 'child'),
 | 
				
			||||||
 | 
					                    allow_overruns_side=allow_overruns_side,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                async with ctx.open_stream(
 | 
					                ) as (ctx, sent):
 | 
				
			||||||
                    msg_buffer_size=1 if slow_side == 'parent' else None,
 | 
					                    assert sent is None
 | 
				
			||||||
                    allow_overruns=(allow_overruns_side in {'parent', 'both'}),
 | 
					 | 
				
			||||||
                ) as stream:
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    total_batches: int = 2
 | 
					                    async with ctx.open_stream(
 | 
				
			||||||
                    for _ in range(total_batches):
 | 
					                        msg_buffer_size=1 if slow_side == 'parent' else None,
 | 
				
			||||||
                        for msg in seq:
 | 
					                        allow_overruns=(allow_overruns_side in {'parent', 'both'}),
 | 
				
			||||||
                            # print(f'root tx {msg}')
 | 
					                    ) as stream:
 | 
				
			||||||
                            await stream.send(msg)
 | 
					 | 
				
			||||||
                            if slow_side == 'parent':
 | 
					 | 
				
			||||||
                                # NOTE: we make the parent slightly
 | 
					 | 
				
			||||||
                                # slower, when it is slow, to make sure
 | 
					 | 
				
			||||||
                                # that in the overruns everywhere case
 | 
					 | 
				
			||||||
                                await trio.sleep(0.16)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                        batch = []
 | 
					                        for _ in range(total_batches):
 | 
				
			||||||
                        async for msg in stream:
 | 
					                            for msg in seq:
 | 
				
			||||||
                            print(f'root rx {msg}')
 | 
					                                # print(f'root tx {msg}')
 | 
				
			||||||
                            batch.append(msg)
 | 
					                                await stream.send(msg)
 | 
				
			||||||
                            if batch == seq:
 | 
					                                if slow_side == 'parent':
 | 
				
			||||||
                                break
 | 
					                                    # NOTE: we make the parent slightly
 | 
				
			||||||
 | 
					                                    # slower, when it is slow, to make sure
 | 
				
			||||||
 | 
					                                    # that in the overruns everywhere case
 | 
				
			||||||
 | 
					                                    await trio.sleep(parent_send_delay)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                            batch = []
 | 
				
			||||||
 | 
					                            async for msg in stream:
 | 
				
			||||||
 | 
					                                print(f'root rx {msg}')
 | 
				
			||||||
 | 
					                                batch.append(msg)
 | 
				
			||||||
 | 
					                                if batch == seq:
 | 
				
			||||||
 | 
					                                    break
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                    if cancel_ctx:
 | 
				
			||||||
 | 
					                        # cancel the remote task
 | 
				
			||||||
 | 
					                        print('Requesting `ctx.cancel()` in parent!')
 | 
				
			||||||
 | 
					                        await ctx.cancel()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                res: str|ContextCancelled = await ctx.result()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                if cancel_ctx:
 | 
					                if cancel_ctx:
 | 
				
			||||||
                    # cancel the remote task
 | 
					                    assert isinstance(res, ContextCancelled)
 | 
				
			||||||
                    print('Requesting `ctx.cancel()` in parent!')
 | 
					                    assert tuple(res.canceller) == current_actor().uid
 | 
				
			||||||
                    await ctx.cancel()
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
            res: str|ContextCancelled = await ctx.result()
 | 
					                else:
 | 
				
			||||||
 | 
					                    print(f'RX ROOT SIDE RESULT {res}')
 | 
				
			||||||
            if cancel_ctx:
 | 
					                    assert res == 'yo'
 | 
				
			||||||
                assert isinstance(res, ContextCancelled)
 | 
					 | 
				
			||||||
                assert tuple(res.canceller) == current_actor().uid
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            else:
 | 
					 | 
				
			||||||
                print(f'RX ROOT SIDE RESULT {res}')
 | 
					 | 
				
			||||||
                assert res == 'yo'
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # cancel the daemon
 | 
					            # cancel the daemon
 | 
				
			||||||
            await portal.cancel_actor()
 | 
					            await portal.cancel_actor()
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -438,8 +438,8 @@ _ctxvar_MsgCodec: MsgCodec = RunVar(
 | 
				
			||||||
    'msgspec_codec',
 | 
					    'msgspec_codec',
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # TODO: move this to our new `Msg`-spec!
 | 
					    # TODO: move this to our new `Msg`-spec!
 | 
				
			||||||
    # default=_def_msgspec_codec,
 | 
					    default=_def_msgspec_codec,
 | 
				
			||||||
    default=_def_tractor_codec,
 | 
					    # default=_def_tractor_codec,
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue