'''
``async with ():`` inlined context-stream cancellation testing.

Verify the we raise errors when streams are opened prior to
sync-opening a ``tractor.Context`` beforehand.

'''
from contextlib import asynccontextmanager as acm
from itertools import count
import platform
from pprint import pformat
from typing import (
    Callable,
)

import pytest
import trio
import tractor
from tractor import (
    Actor,
    Context,
    current_actor,
)
from tractor._exceptions import (
    StreamOverrun,
    ContextCancelled,
)

from conftest import tractor_test

# ``Context`` semantics are as follows,
#  ------------------------------------

# - standard setup/teardown:
#   ``Portal.open_context()`` starts a new
#   remote task context in another actor. The target actor's task must
#   call ``Context.started()`` to unblock this entry on the caller side.
#   the callee task executes until complete and returns a final value
#   which is delivered to the caller side and retreived via
#   ``Context.result()``.

# - cancel termination:
#   context can be cancelled on either side where either end's task can
#   call ``Context.cancel()`` which raises a local ``trio.Cancelled``
#   and sends a task cancel request to the remote task which in turn
#   raises a ``trio.Cancelled`` in that scope, catches it, and re-raises
#   as ``ContextCancelled``. This is then caught by
#   ``Portal.open_context()``'s exit and we get a graceful termination
#   of the linked tasks.

# - error termination:
#   error is caught after all context-cancel-scope tasks are cancelled
#   via regular ``trio`` cancel scope semantics, error is sent to other
#   side and unpacked as a `RemoteActorError`.


# ``Context.open_stream() as stream: MsgStream:`` msg semantics are:
#  -----------------------------------------------------------------

# - either side can ``.send()`` which emits a 'yield' msgs and delivers
#   a value to the a ``MsgStream.receive()`` call.

# - stream closure: one end relays a 'stop' message which terminates an
#   ongoing ``MsgStream`` iteration.

# - cancel/error termination: as per the context semantics above but
#   with implicit stream closure on the cancelling end.


_state: bool = False


@tractor.context
async def too_many_starteds(
    ctx: Context,
) -> None:
    '''
    Call ``Context.started()`` more then once (an error).

    '''
    await ctx.started()
    try:
        await ctx.started()
    except RuntimeError:
        raise


@tractor.context
async def not_started_but_stream_opened(
    ctx: Context,
) -> None:
    '''
    Enter ``Context.open_stream()`` without calling ``.started()``.

    '''
    try:
        async with ctx.open_stream():
            assert 0
    except RuntimeError:
        raise


@pytest.mark.parametrize(
    'target',
    [
        too_many_starteds,
        not_started_but_stream_opened,
    ],
    ids='misuse_type={}'.format,
)
def test_started_misuse(
    target: Callable,
    debug_mode: bool,
):
    async def main():
        async with tractor.open_nursery(
            debug_mode=debug_mode,
        ) as an:
            portal = await an.start_actor(
                target.__name__,
                enable_modules=[__name__],
            )

            async with portal.open_context(target) as (ctx, sent):
                await trio.sleep(1)

    with pytest.raises(tractor.RemoteActorError):
        trio.run(main)


@tractor.context
async def simple_setup_teardown(

    ctx: Context,
    data: int,
    block_forever: bool = False,

) -> None:

    # startup phase
    global _state
    _state = True

    # signal to parent that we're up
    await ctx.started(data + 1)

    try:
        if block_forever:
            # block until cancelled
            await trio.sleep_forever()
        else:
            return 'yo'
    finally:
        _state = False


async def assert_state(value: bool):
    global _state
    assert _state == value


@pytest.mark.parametrize(
    'error_parent',
    [False, ValueError, KeyboardInterrupt],
)
@pytest.mark.parametrize(
    'callee_blocks_forever',
    [False, True],
    ids=lambda item: f'callee_blocks_forever={item}'
)
@pytest.mark.parametrize(
    'pointlessly_open_stream',
    [False, True],
    ids=lambda item: f'open_stream={item}'
)
def test_simple_context(
    error_parent,
    callee_blocks_forever,
    pointlessly_open_stream,
    debug_mode: bool,
):

    timeout = 1.5 if not platform.system() == 'Windows' else 4

    async def main():

        with trio.fail_after(timeout):
            async with tractor.open_nursery(
                debug_mode=debug_mode,
            ) as an:
                portal = await an.start_actor(
                    'simple_context',
                    enable_modules=[__name__],
                )

                try:
                    async with portal.open_context(
                        simple_setup_teardown,
                        data=10,
                        block_forever=callee_blocks_forever,
                    ) as (ctx, sent):

                        assert sent == 11

                        if callee_blocks_forever:
                            await portal.run(assert_state, value=True)
                        else:
                            assert await ctx.result() == 'yo'

                        if pointlessly_open_stream:
                            async with ctx.open_stream():
                                if error_parent:
                                    raise error_parent

                                if callee_blocks_forever:
                                    await ctx.cancel()
                                else:
                                    # in this case the stream will send a
                                    # 'stop' msg to the far end which needs
                                    # to be ignored
                                    pass

                        else:
                            if error_parent:
                                raise error_parent

                            # cancel AFTER we open a stream
                            # to avoid a cancel raised inside
                            # `.open_stream()`
                            await ctx.cancel()
                finally:

                    # after cancellation
                    if not error_parent:
                        await portal.run(assert_state, value=False)

                    # shut down daemon
                    await portal.cancel_actor()

    if error_parent:
        try:
            trio.run(main)
        except error_parent:
            pass
        except trio.MultiError as me:
            # XXX: on windows it seems we may have to expect the group error
            from tractor._exceptions import is_multi_cancelled
            assert is_multi_cancelled(me)
    else:
        trio.run(main)


@acm
async def expect_ctxc(yay: bool) -> None:
    if yay:
        try:
            yield
        except ContextCancelled:
            return
    else:
        yield


@pytest.mark.parametrize(
    'callee_returns_early',
    [True, False],
    ids=lambda item: f'callee_returns_early={item}'
)
@pytest.mark.parametrize(
    'cancel_method',
    ['ctx', 'portal'],
    ids=lambda item: f'cancel_method={item}'
)
@pytest.mark.parametrize(
    'chk_ctx_result_before_exit',
    [True, False],
    ids=lambda item: f'chk_ctx_result_before_exit={item}'
)
def test_caller_cancels(
    cancel_method: str,
    chk_ctx_result_before_exit: bool,
    callee_returns_early: bool,
    debug_mode: bool,
):
    '''
    Verify that when the opening side of a context (aka the caller)
    cancels that context, the ctx does not raise a cancelled when
    either calling `.result()` or on context exit.

    '''
    async def check_canceller(
        ctx: Context,
    ) -> None:
        actor: Actor = current_actor()
        uid: tuple = actor.uid

        if (
            cancel_method == 'portal'
            and not callee_returns_early
        ):
            try:
                res = await ctx.result()
                assert 0, 'Portal cancel should raise!'

            except ContextCancelled as ctxc:
                assert ctx.chan._cancel_called
                assert ctxc.canceller == uid
                assert ctxc is ctx.maybe_error

        # NOTE: should not ever raise even in the `ctx`
        # case since self-cancellation should swallow the ctxc
        # silently!
        else:
            res = await ctx.result()

        # we actually get a result
        if callee_returns_early:
            assert res == 'yo'
            assert ctx.outcome is res
            assert ctx.maybe_error is None

        else:
            err: Exception = ctx.outcome
            assert isinstance(err, ContextCancelled)
            assert (
                tuple(err.canceller)
                ==
                uid
            )
            assert (
                err
                is ctx.maybe_error
                is ctx._remote_error
            )
            if le := ctx._local_error:
                assert err is le

            # else:
                # TODO: what should this be then?
                # not defined until block closes right?
                #
                # await tractor.pause()
                # assert ctx._local_error is None


    async def main():

        async with tractor.open_nursery(
            debug_mode=debug_mode,
        ) as an:
            portal = await an.start_actor(
                'simple_context',
                enable_modules=[__name__],
            )
            timeout = 0.5 if not callee_returns_early else 2
            with trio.fail_after(timeout):
                async with (

                    expect_ctxc(yay=cancel_method == 'portal'),

                    portal.open_context(
                        simple_setup_teardown,
                        data=10,
                        block_forever=not callee_returns_early,
                    ) as (ctx, sent),
                ):

                    if callee_returns_early:
                        # ensure we block long enough before sending
                        # a cancel such that the callee has already
                        # returned it's result.
                        await trio.sleep(0.5)

                    if cancel_method == 'ctx':
                        await ctx.cancel()
                    else:
                        await portal.cancel_actor()

                    if chk_ctx_result_before_exit:
                        await check_canceller(ctx)

            if not chk_ctx_result_before_exit:
                await check_canceller(ctx)

            if cancel_method != 'portal':
                await portal.cancel_actor()

            # since the `.cancel_actor()` call just above
            # will cause the `.open_context().__aexit__()` raise
            # a ctxc which should in turn cause `ctx._scope` to
            # catch any cancellation?
            if (
                not callee_returns_early
                and cancel_method == 'portal'
            ):
                assert ctx._scope.cancelled_caught

    trio.run(main)


# basic stream terminations:
# - callee context closes without using stream
# - caller context closes without using stream
# - caller context calls `Context.cancel()` while streaming
#   is ongoing resulting in callee being cancelled
# - callee calls `Context.cancel()` while streaming and caller
#   sees stream terminated in `RemoteActorError`

# TODO: future possible features
# - restart request: far end raises `ContextRestart`


@tractor.context
async def close_ctx_immediately(

    ctx: Context,

) -> None:

    await ctx.started()
    global _state

    async with ctx.open_stream():
        pass


@tractor_test
async def test_callee_closes_ctx_after_stream_open(
    debug_mode: bool,
):
    '''
    callee context closes without using stream.

    This should result in a msg sequence
    |_<root>_
             |_<fast_stream_closer>

             <= {'started': <Any>, 'cid': <str>}
             <= {'stop': True, 'cid': <str>}
             <= {'result': Any, ..}

     (ignored by child)
    => {'stop': True, 'cid': <str>}

    '''
    async with tractor.open_nursery(
        debug_mode=debug_mode,
    ) as an:
        portal = await an.start_actor(
            'fast_stream_closer',
            enable_modules=[__name__],
        )

        with trio.fail_after(0.5):
            async with portal.open_context(
                close_ctx_immediately,

                # flag to avoid waiting the final result
                # cancel_on_exit=True,

            ) as (ctx, sent):
                assert sent is None

                with trio.fail_after(0.4):
                    async with ctx.open_stream() as stream:

                        # should fall through since ``StopAsyncIteration``
                        # should be raised through translation of
                        # a ``trio.EndOfChannel`` by
                        # ``trio.abc.ReceiveChannel.__anext__()``
                        async for _ in stream:
                            # trigger failure if we DO NOT
                            # get an EOC!
                            assert 0
                        else:

                            # verify stream is now closed
                            try:
                                with trio.fail_after(0.3):
                                    await stream.receive()
                            except trio.EndOfChannel:
                                pass

                # TODO: should be just raise the closed resource err
                # directly here to enforce not allowing a re-open
                # of a stream to the context (at least until a time of
                # if/when we decide that's a good idea?)
                try:
                    with trio.fail_after(0.5):
                        async with ctx.open_stream() as stream:
                            pass
                except trio.ClosedResourceError:
                    pass

        await portal.cancel_actor()


@tractor.context
async def expect_cancelled(
    ctx: Context,

) -> None:
    global _state
    _state = True

    await ctx.started()

    try:
        async with ctx.open_stream() as stream:
            async for msg in stream:
                await stream.send(msg)  # echo server

    except trio.Cancelled:
        # expected case
        _state = False
        raise

    else:
        assert 0, "callee wasn't cancelled !?"


@pytest.mark.parametrize(
    'use_ctx_cancel_method',
    [False, True],
)
@tractor_test
async def test_caller_closes_ctx_after_callee_opens_stream(
    use_ctx_cancel_method: bool,
    debug_mode: bool,
):
    '''
    caller context closes without using/opening stream

    '''
    async with tractor.open_nursery(
        debug_mode=debug_mode,
    ) as an:

        root: Actor = current_actor()
        portal = await an.start_actor(
            'ctx_cancelled',
            enable_modules=[__name__],
        )

        async with portal.open_context(
            expect_cancelled,
        ) as (ctx, sent):
            assert sent is None

            await portal.run(assert_state, value=True)

            # call `ctx.cancel()` explicitly
            if use_ctx_cancel_method:
                await ctx.cancel()

                # NOTE: means the local side `ctx._scope` will
                # have been cancelled by an ctxc ack and thus
                # `._scope.cancelled_caught` should be set.
                try:
                    async with ctx.open_stream() as stream:
                        async for msg in stream:
                            pass

                except tractor.ContextCancelled as ctxc:
                    # XXX: the cause is US since we call
                    # `Context.cancel()` just above!
                    assert (
                        ctxc.canceller
                        ==
                        current_actor().uid
                        ==
                        root.uid
                    )

                    # XXX: must be propagated to __aexit__
                    # and should be silently absorbed there
                    # since we called `.cancel()` just above ;)
                    raise

                else:
                    assert 0, "Should have context cancelled?"

                # channel should still be up
                assert portal.channel.connected()

                # ctx is closed here
                await portal.run(
                    assert_state,
                    value=False,
                )

            else:
                try:
                    with trio.fail_after(0.2):
                        await ctx.result()
                        assert 0, "Callee should have blocked!?"

                except trio.TooSlowError:
                    # NO-OP -> since already called above
                    await ctx.cancel()

        # NOTE: local scope should have absorbed the cancellation since
        # in this case we call `ctx.cancel()` and the local
        # `._scope` gets `.cancel_called` on the ctxc ack.
        if use_ctx_cancel_method:
            assert ctx._scope.cancelled_caught

        # rxed ctxc response from far end
        assert ctx.cancel_acked
        assert (
            ctx._remote_error
            is ctx._local_error
            is ctx.maybe_error
            is ctx.outcome
        )

        try:
            async with ctx.open_stream() as stream:
                async for msg in stream:
                    pass
        except tractor.ContextCancelled:
            pass
        else:
            assert 0, "Should have received closed resource error?"

        # ctx is closed here
        await portal.run(assert_state, value=False)

        # channel should not have been destroyed yet, only the
        # inter-actor-task context
        assert portal.channel.connected()

        # teardown the actor
        await portal.cancel_actor()


@tractor_test
async def test_multitask_caller_cancels_from_nonroot_task(
    debug_mode: bool,
):
    async with tractor.open_nursery(
        debug_mode=debug_mode,
    ) as an:
        portal = await an.start_actor(
            'ctx_cancelled',
            enable_modules=[__name__],
        )

        async with portal.open_context(
            expect_cancelled,
        ) as (ctx, sent):

            await portal.run(assert_state, value=True)
            assert sent is None

            async with ctx.open_stream() as stream:

                async def send_msg_then_cancel():
                    await stream.send('yo')
                    await portal.run(assert_state, value=True)
                    await ctx.cancel()
                    await portal.run(assert_state, value=False)

                async with trio.open_nursery() as n:
                    n.start_soon(send_msg_then_cancel)

                    try:
                        async for msg in stream:
                            assert msg == 'yo'

                    except tractor.ContextCancelled:
                        raise  # XXX: must be propagated to __aexit__

                # channel should still be up
                assert portal.channel.connected()

                # ctx is closed here
                await portal.run(assert_state, value=False)

        # channel should not have been destroyed yet, only the
        # inter-actor-task context
        assert portal.channel.connected()

        # teardown the actor
        await portal.cancel_actor()


@tractor.context
async def cancel_self(

    ctx: Context,

) -> None:
    global _state
    _state = True

    # since we call this the below `.open_stream()` should always
    # error!
    await ctx.cancel()

    # should inline raise immediately
    try:
        async with ctx.open_stream():
            pass
    # except tractor.ContextCancelled:
    except RuntimeError:
        # suppress for now so we can do checkpoint tests below
        print('Got expected runtime error for stream-after-cancel')

    else:
        raise RuntimeError('Context didnt cancel itself?!')

    # check that``trio.Cancelled`` is now raised on any further
    # checkpoints since the self cancel above will have cancelled
    # the `Context._scope.cancel_scope: trio.CancelScope`
    try:
        with trio.fail_after(0.1):
            await trio.sleep_forever()
    except trio.Cancelled:
        raise

    except trio.TooSlowError:
        # should never get here
        assert 0

    raise RuntimeError('Context didnt cancel itself?!')


@tractor_test
async def test_callee_cancels_before_started(
    debug_mode: bool,
):
    '''
    Callee calls `Context.cancel()` while streaming and caller
    sees stream terminated in `ContextCancelled`.

    '''
    async with tractor.open_nursery(
        debug_mode=debug_mode,
    ) as an:
        portal = await an.start_actor(
            'cancels_self',
            enable_modules=[__name__],
        )
        try:

            async with portal.open_context(
                cancel_self,
            ) as (ctx, sent):
                async with ctx.open_stream():
                    await trio.sleep_forever()

        # raises a special cancel signal
        except tractor.ContextCancelled as ce:
            ce.type == trio.Cancelled

            # the traceback should be informative
            assert 'itself' in ce.msgdata['tb_str']

        # teardown the actor
        await portal.cancel_actor()


@tractor.context
async def never_open_stream(

    ctx:  Context,

) -> None:
    '''
    Context which never opens a stream and blocks.

    '''
    await ctx.started()
    await trio.sleep_forever()


@tractor.context
async def keep_sending_from_callee(

    ctx:  Context,
    msg_buffer_size: int|None = None,

) -> None:
    '''
    Send endlessly on the calleee stream.

    '''
    await ctx.started()
    async with ctx.open_stream(
        msg_buffer_size=msg_buffer_size,
    ) as stream:
        for msg in count():
            print(f'callee sending {msg}')
            await stream.send(msg)
            await trio.sleep(0.01)


@pytest.mark.parametrize(
    'overrun_by',
    [
        ('caller', 1, never_open_stream),
        ('callee', 0, keep_sending_from_callee),
    ],
    ids='overrun_condition={}'.format,
)
def test_one_end_stream_not_opened(
    overrun_by: tuple[str, int, Callable],
    debug_mode: bool,
):
    '''
    This should exemplify the bug from:
    https://github.com/goodboy/tractor/issues/265

    '''
    overrunner, buf_size_increase, entrypoint = overrun_by
    from tractor._runtime import Actor
    buf_size = buf_size_increase + Actor.msg_buffer_size

    async def main():
        async with tractor.open_nursery(
            debug_mode=debug_mode,
        ) as an:
            portal = await an.start_actor(
                entrypoint.__name__,
                enable_modules=[__name__],
            )

            async with portal.open_context(
                entrypoint,
            ) as (ctx, sent):
                assert sent is None

                if 'caller' in overrunner:

                    async with ctx.open_stream() as stream:

                        # itersend +1 msg more then the buffer size
                        # to cause the most basic overrun.
                        for i in range(buf_size):
                            print(f'sending {i}')
                            await stream.send(i)

                        else:
                            # expect overrun error to be relayed back
                            # and this sleep interrupted
                            await trio.sleep_forever()

                else:
                    # callee overruns caller case so we do nothing here
                    await trio.sleep_forever()

            await portal.cancel_actor()

    # 2 overrun cases and the no overrun case (which pushes right up to
    # the msg limit)
    if (
        overrunner == 'caller'
    ):
        with pytest.raises(tractor.RemoteActorError) as excinfo:
            trio.run(main)

        assert excinfo.value.type == StreamOverrun

    elif overrunner == 'callee':
        with pytest.raises(tractor.RemoteActorError) as excinfo:
            trio.run(main)

        # TODO: embedded remote errors so that we can verify the source
        # error? the callee delivers an error which is an overrun
        # wrapped in a remote actor error.
        assert excinfo.value.type == tractor.RemoteActorError

    else:
        trio.run(main)


@tractor.context
async def echo_back_sequence(

    ctx:  Context,
    seq: list[int],
    wait_for_cancel: bool,
    allow_overruns_side: str,
    be_slow: bool = False,
    msg_buffer_size: int = 1,

) -> None:
    '''
    Send endlessly on the calleee stream using a small buffer size
    setting on the contex to simulate backlogging that would normally
    cause overruns.

    '''
    # NOTE: ensure that if the caller is expecting to cancel this task
    # that we stay echoing much longer then they are so we don't
    # return early instead of receive the cancel msg.
    total_batches: int = (
        1000 if wait_for_cancel
        else 6
    )

    await ctx.started()
    # await tractor.breakpoint()
    async with ctx.open_stream(
        msg_buffer_size=msg_buffer_size,

        # literally the point of this test XD
        allow_overruns=(allow_overruns_side in {'child', 'both'}),
    ) as stream:

        # ensure mem chan settings are correct
        assert (
            ctx._send_chan._state.max_buffer_size
            ==
            msg_buffer_size
        )

        seq = list(seq)  # bleh, msgpack sometimes ain't decoded right
        for i in range(total_batches):
            print(f'starting new stream batch {i} iter in child')
            batch = []

            # EoC case, delay a little instead of hot
            # iter-stopping (since apparently py3.11+ can do that
            # faster then a ctxc can be sent) on the async for
            # loop when child was requested to ctxc.
            if (
                stream.closed
                or
                ctx.cancel_called
            ):
                print('child stream already closed!?!')
                await trio.sleep(0.05)
                continue

            async for msg in stream:
                batch.append(msg)
                if batch == seq:
                    break

                if be_slow:
                    await trio.sleep(0.05)

                print('callee waiting on next')

            print(f'callee echoing back latest batch\n{batch}')
            for msg in batch:
                print(f'callee sending msg\n{msg}')
                await stream.send(msg)

    try:
        return 'yo'
    finally:
        print(
            'exiting callee with context:\n'
            f'{pformat(ctx)}\n'
        )


@pytest.mark.parametrize(
    # aka the side that will / should raise
    # and overrun under normal conditions.
    'allow_overruns_side',
    ['parent', 'child', 'none', 'both'],
    ids=lambda item: f'allow_overruns_side={item}'
)
@pytest.mark.parametrize(
    # aka the side that will / should raise
    # and overrun under normal conditions.
    'slow_side',
    ['parent', 'child'],
    ids=lambda item: f'slow_side={item}'
)
@pytest.mark.parametrize(
    'cancel_ctx',
    [True, False],
    ids=lambda item: f'cancel_ctx={item}'
)
def test_maybe_allow_overruns_stream(
    cancel_ctx: bool,
    slow_side: str,
    allow_overruns_side: str,
    loglevel: str,
    debug_mode: bool,
):
    '''
    Demonstrate small overruns of each task back and forth
    on a stream not raising any errors by default by setting
    the ``allow_overruns=True``.

    The original idea here was to show that if you set the feeder mem
    chan to a size smaller then the # of msgs sent you could could not
    get a `StreamOverrun` crash plus maybe get all the msgs that were
    sent. The problem with the "real backpressure" case is that due to
    the current arch it can result in the msg loop being blocked and thus
    blocking cancellation - which is like super bad. So instead this test
    had to be adjusted to more or less just "not send overrun errors" so
    as to handle the case where the sender just moreso cares about not getting
    errored out when it send to fast..

    '''
    async def main():
        async with tractor.open_nursery(
            debug_mode=debug_mode,
        ) as an:
            portal = await an.start_actor(
                'callee_sends_forever',
                enable_modules=[__name__],
                loglevel=loglevel,
                debug_mode=debug_mode,
            )
            seq = list(range(10))
            async with portal.open_context(
                echo_back_sequence,
                seq=seq,
                wait_for_cancel=cancel_ctx,
                be_slow=(slow_side == 'child'),
                allow_overruns_side=allow_overruns_side,

            ) as (ctx, sent):
                assert sent is None

                async with ctx.open_stream(
                    msg_buffer_size=1 if slow_side == 'parent' else None,
                    allow_overruns=(allow_overruns_side in {'parent', 'both'}),
                ) as stream:

                    total_batches: int = 2
                    for _ in range(total_batches):
                        for msg in seq:
                            # print(f'root tx {msg}')
                            await stream.send(msg)
                            if slow_side == 'parent':
                                # NOTE: we make the parent slightly
                                # slower, when it is slow, to make sure
                                # that in the overruns everywhere case
                                await trio.sleep(0.16)

                        batch = []
                        async for msg in stream:
                            print(f'root rx {msg}')
                            batch.append(msg)
                            if batch == seq:
                                break

                if cancel_ctx:
                    # cancel the remote task
                    print('Requesting `ctx.cancel()` in parent!')
                    await ctx.cancel()

            res: str|ContextCancelled = await ctx.result()

            if cancel_ctx:
                assert isinstance(res, ContextCancelled)
                assert tuple(res.canceller) == current_actor().uid

            else:
                print(f'RX ROOT SIDE RESULT {res}')
                assert res == 'yo'

            # cancel the daemon
            await portal.cancel_actor()

    if (
        allow_overruns_side == 'both'
        or slow_side == allow_overruns_side
    ):
        trio.run(main)

    elif (
        slow_side != allow_overruns_side
    ):

        with pytest.raises(tractor.RemoteActorError) as excinfo:
            trio.run(main)

        err = excinfo.value

        if (
            allow_overruns_side == 'none'
        ):
            # depends on timing is is racy which side will
            # overrun first :sadkitty:

            # NOTE: i tried to isolate to a deterministic case here
            # based on timeing, but i was kinda wasted, and i don't
            # think it's sane to catch them..
            assert err.type in (
                tractor.RemoteActorError,
                StreamOverrun,
            )

        elif (
            slow_side == 'child'
        ):
            assert err.type == StreamOverrun

        elif slow_side == 'parent':
            assert err.type == tractor.RemoteActorError
            assert 'StreamOverrun' in err.msgdata['tb_str']

    else:
        # if this hits the logic blocks from above are not
        # exhaustive..
        pytest.fail('PARAMETRIZED CASE GEN PROBLEM YO')