Seriously cover all overrun cases
This actually caught further runtime bugs so it's gud i tried.. Add overrun-ignore enabled / disabled cases and error catching for all of them. More or less this should cover every possible outcome when it comes to setting `allow_overruns: bool` i hope XDctx_cancel_semantics_and_overruns_XPS_GH_BACKUP
							parent
							
								
									63adf73b4b
								
							
						
					
					
						commit
						f9911c22a4
					
				| 
						 | 
				
			
			@ -722,7 +722,9 @@ async def echo_back_sequence(
 | 
			
		|||
    ctx:  tractor.Context,
 | 
			
		||||
    seq: list[int],
 | 
			
		||||
    wait_for_cancel: bool,
 | 
			
		||||
    msg_buffer_size: int | None = None,
 | 
			
		||||
    allow_overruns_side: str,
 | 
			
		||||
    be_slow: bool = False,
 | 
			
		||||
    msg_buffer_size: int = 1,
 | 
			
		||||
 | 
			
		||||
) -> None:
 | 
			
		||||
    '''
 | 
			
		||||
| 
						 | 
				
			
			@ -737,12 +739,22 @@ async def echo_back_sequence(
 | 
			
		|||
    total_batches: int = 1000 if wait_for_cancel else 6
 | 
			
		||||
 | 
			
		||||
    await ctx.started()
 | 
			
		||||
    # await tractor.breakpoint()
 | 
			
		||||
    async with ctx.open_stream(
 | 
			
		||||
        msg_buffer_size=msg_buffer_size,
 | 
			
		||||
        allow_overruns=True,
 | 
			
		||||
 | 
			
		||||
        # literally the point of this test XD
 | 
			
		||||
        allow_overruns=(allow_overruns_side in {'child', 'both'}),
 | 
			
		||||
    ) as stream:
 | 
			
		||||
 | 
			
		||||
        seq = list(seq)  # bleh, `msgpack`...
 | 
			
		||||
        # ensure mem chan settings are correct
 | 
			
		||||
        assert (
 | 
			
		||||
            ctx._send_chan._state.max_buffer_size
 | 
			
		||||
            ==
 | 
			
		||||
            msg_buffer_size
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        seq = list(seq)  # bleh, msgpack sometimes ain't decoded right
 | 
			
		||||
        for _ in range(total_batches):
 | 
			
		||||
            batch = []
 | 
			
		||||
            async for msg in stream:
 | 
			
		||||
| 
						 | 
				
			
			@ -750,6 +762,9 @@ async def echo_back_sequence(
 | 
			
		|||
                if batch == seq:
 | 
			
		||||
                    break
 | 
			
		||||
 | 
			
		||||
                if be_slow:
 | 
			
		||||
                    await trio.sleep(0.05)
 | 
			
		||||
 | 
			
		||||
                print('callee waiting on next')
 | 
			
		||||
 | 
			
		||||
            for msg in batch:
 | 
			
		||||
| 
						 | 
				
			
			@ -763,13 +778,29 @@ async def echo_back_sequence(
 | 
			
		|||
    return 'yo'
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.parametrize(
 | 
			
		||||
    # aka the side that will / should raise
 | 
			
		||||
    # and overrun under normal conditions.
 | 
			
		||||
    'allow_overruns_side',
 | 
			
		||||
    ['parent', 'child', 'none', 'both'],
 | 
			
		||||
    ids=lambda item: f'allow_overruns_side={item}'
 | 
			
		||||
)
 | 
			
		||||
@pytest.mark.parametrize(
 | 
			
		||||
    # aka the side that will / should raise
 | 
			
		||||
    # and overrun under normal conditions.
 | 
			
		||||
    'slow_side',
 | 
			
		||||
    ['parent', 'child'],
 | 
			
		||||
    ids=lambda item: f'slow_side={item}'
 | 
			
		||||
)
 | 
			
		||||
@pytest.mark.parametrize(
 | 
			
		||||
    'cancel_ctx',
 | 
			
		||||
    [True, False],
 | 
			
		||||
    ids=lambda item: f'cancel_ctx={item}'
 | 
			
		||||
)
 | 
			
		||||
def test_allow_overruns_stream(
 | 
			
		||||
def test_maybe_allow_overruns_stream(
 | 
			
		||||
    cancel_ctx: bool,
 | 
			
		||||
    slow_side: str,
 | 
			
		||||
    allow_overruns_side: str,
 | 
			
		||||
    loglevel: str,
 | 
			
		||||
):
 | 
			
		||||
    '''
 | 
			
		||||
| 
						 | 
				
			
			@ -794,26 +825,35 @@ def test_allow_overruns_stream(
 | 
			
		|||
                'callee_sends_forever',
 | 
			
		||||
                enable_modules=[__name__],
 | 
			
		||||
                loglevel=loglevel,
 | 
			
		||||
 | 
			
		||||
                # debug_mode=True,
 | 
			
		||||
            )
 | 
			
		||||
            seq = list(range(3))
 | 
			
		||||
            seq = list(range(10))
 | 
			
		||||
            async with portal.open_context(
 | 
			
		||||
                echo_back_sequence,
 | 
			
		||||
                seq=seq,
 | 
			
		||||
                wait_for_cancel=cancel_ctx,
 | 
			
		||||
                be_slow=(slow_side == 'child'),
 | 
			
		||||
                allow_overruns_side=allow_overruns_side,
 | 
			
		||||
            ) as (ctx, sent):
 | 
			
		||||
 | 
			
		||||
                assert sent is None
 | 
			
		||||
 | 
			
		||||
                async with ctx.open_stream(
 | 
			
		||||
                    msg_buffer_size=1,
 | 
			
		||||
                    allow_overruns=True,
 | 
			
		||||
                    msg_buffer_size=1 if slow_side == 'parent' else None,
 | 
			
		||||
                    allow_overruns=(allow_overruns_side in {'parent', 'both'}),
 | 
			
		||||
                ) as stream:
 | 
			
		||||
                    count = 0
 | 
			
		||||
                    while count < 3:
 | 
			
		||||
 | 
			
		||||
                    total_batches: int = 2
 | 
			
		||||
                    for _ in range(total_batches):
 | 
			
		||||
                        for msg in seq:
 | 
			
		||||
                            print(f'root tx {msg}')
 | 
			
		||||
                            # print(f'root tx {msg}')
 | 
			
		||||
                            await stream.send(msg)
 | 
			
		||||
                            await trio.sleep(0.1)
 | 
			
		||||
                            if slow_side == 'parent':
 | 
			
		||||
                                # NOTE: we make the parent slightly
 | 
			
		||||
                                # slower, when it is slow, to make sure
 | 
			
		||||
                                # that in the overruns everywhere case
 | 
			
		||||
                                await trio.sleep(0.16)
 | 
			
		||||
 | 
			
		||||
                        batch = []
 | 
			
		||||
                        async for msg in stream:
 | 
			
		||||
| 
						 | 
				
			
			@ -822,8 +862,6 @@ def test_allow_overruns_stream(
 | 
			
		|||
                            if batch == seq:
 | 
			
		||||
                                break
 | 
			
		||||
 | 
			
		||||
                        count += 1
 | 
			
		||||
 | 
			
		||||
                if cancel_ctx:
 | 
			
		||||
                    # cancel the remote task
 | 
			
		||||
                    print('sending root side cancel')
 | 
			
		||||
| 
						 | 
				
			
			@ -842,7 +880,48 @@ def test_allow_overruns_stream(
 | 
			
		|||
            # cancel the daemon
 | 
			
		||||
            await portal.cancel_actor()
 | 
			
		||||
 | 
			
		||||
    trio.run(main)
 | 
			
		||||
    if (
 | 
			
		||||
        allow_overruns_side == 'both'
 | 
			
		||||
        or slow_side == allow_overruns_side
 | 
			
		||||
    ):
 | 
			
		||||
        trio.run(main)
 | 
			
		||||
 | 
			
		||||
    elif (
 | 
			
		||||
        slow_side != allow_overruns_side
 | 
			
		||||
    ):
 | 
			
		||||
 | 
			
		||||
        with pytest.raises(tractor.RemoteActorError) as excinfo:
 | 
			
		||||
            trio.run(main)
 | 
			
		||||
 | 
			
		||||
        err = excinfo.value
 | 
			
		||||
 | 
			
		||||
        if (
 | 
			
		||||
            allow_overruns_side == 'none'
 | 
			
		||||
        ):
 | 
			
		||||
            # depends on timing is is racy which side will
 | 
			
		||||
            # overrun first :sadkitty:
 | 
			
		||||
 | 
			
		||||
            # NOTE: i tried to isolate to a deterministic case here
 | 
			
		||||
            # based on timeing, but i was kinda wasted, and i don't
 | 
			
		||||
            # think it's sane to catch them..
 | 
			
		||||
            assert err.type in (
 | 
			
		||||
                tractor.RemoteActorError,
 | 
			
		||||
                StreamOverrun,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        elif (
 | 
			
		||||
            slow_side == 'child'
 | 
			
		||||
        ):
 | 
			
		||||
            assert err.type == StreamOverrun
 | 
			
		||||
 | 
			
		||||
        elif slow_side == 'parent':
 | 
			
		||||
            assert err.type == tractor.RemoteActorError
 | 
			
		||||
            assert 'StreamOverrun' in err.msgdata['tb_str']
 | 
			
		||||
 | 
			
		||||
    else:
 | 
			
		||||
        # if this hits the logic blocks from above are not
 | 
			
		||||
        # exhaustive..
 | 
			
		||||
        pytest.fail('PARAMETRIZED CASE GEN PROBLEM YO')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@tractor.context
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -426,6 +426,10 @@ class Context:
 | 
			
		|||
        if remote_uid:
 | 
			
		||||
            return tuple(remote_uid)
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def cancelled_caught(self) -> bool:
 | 
			
		||||
        return self._scope.cancelled_caught
 | 
			
		||||
 | 
			
		||||
    # init and streaming state
 | 
			
		||||
    _started_called: bool = False
 | 
			
		||||
    _started_received: bool = False
 | 
			
		||||
| 
						 | 
				
			
			@ -743,7 +747,7 @@ class Context:
 | 
			
		|||
        ):
 | 
			
		||||
            return err
 | 
			
		||||
 | 
			
		||||
        raise err from None
 | 
			
		||||
        raise err  # from None
 | 
			
		||||
 | 
			
		||||
    async def result(self) -> Any | Exception:
 | 
			
		||||
        '''
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue