import time
from itertools import cycle

import pytest
import trio
import tractor
from tractor.experimental import msgpub

from conftest import tractor_test


def test_type_checks():
    """Check that ``@msgpub`` rejects functions it is expected to refuse.

    Two cases are covered: an async generator function without a
    ``get_topics`` parameter, and a plain synchronous function. Both
    decorations are expected to raise ``TypeError`` with a telling message.
    """
    # async generator function lacking a `get_topics` parameter
    with pytest.raises(TypeError) as excinfo:
        @msgpub
        async def no_get_topics(yo):
            yield

    message = str(excinfo.value)
    assert "must define a `get_topics`" in message

    # regular (non-async-generator) function
    with pytest.raises(TypeError) as excinfo:
        @msgpub
        def not_async_gen(yo):
            pass

    message = str(excinfo.value)
    assert "must be an async generator function" in message


def is_even(i):
    """Return ``True`` when ``i`` leaves no remainder on division by 2."""
    remainder = i % 2
    return not remainder


# placeholder for topics getter: `pubber` stores the `get_topics`
# callable it receives here so that a test running in the same process
# can inspect the currently subscribed topics
_get_topics = None


@msgpub
async def pubber(get_topics, seed=10):
    """Publish the integers of ``range(seed)`` in an endless cycle.

    Each yielded packet is a single-entry dict mapping the topic name
    (``'even'`` or ``'odd'``) to the integer, with a short sleep between
    packets.
    """
    # stash the getter at module level so tests can verify that topic
    # subscriptions are as expected
    global _get_topics
    _get_topics = get_topics

    for i in cycle(range(seed)):
        topic = 'even' if is_even(i) else 'odd'
        yield {topic: i}

        await trio.sleep(0.1)


async def subs(
    which,
    pub_actor_name,
    seed=10,
    task_status=trio.TASK_STATUS_IGNORED,
):
    """Subscribe to ``pubber`` in the actor named ``pub_actor_name``.

    A first stream is opened for the topics in ``which``; it is reported
    through ``task_status.started()``, its first packet is discarded and
    packets are then checked against a topic-specific predicate until 10
    of them have been counted. A second stream, always subscribed to
    ``'odd'``, is then opened and read the same way without validating
    the values.
    """
    # choose the check applied to every value received on the first stream
    if len(which) != 1:
        def pred(i):
            return isinstance(i, int)
    elif which[0] == 'even':
        pred = is_even
    else:
        def pred(i):
            return not is_even(i)

    # TODO: https://github.com/goodboy/tractor/issues/207
    async with tractor.wait_for_actor(pub_actor_name) as portal:
        assert portal

        async with portal.open_stream_from(
            pubber,
            topics=which,
            seed=seed,
        ) as stream:
            task_status.started(stream)
            times = 10

            # drop the first packet before starting to count
            await stream.__anext__()

            received = 0
            async for pkt in stream:
                for value in pkt.values():
                    assert pred(value)

                received += 1
                if received >= times:
                    break

            await stream.aclose()

        async with portal.open_stream_from(
            pubber,
            topics=['odd'],
            seed=seed,
        ) as stream:
            # drop the first packet here as well
            await stream.__anext__()

            received = 0
            try:
                async for pkt in stream:
                    # no predicate check: this stream is subscribed to
                    # 'odd' regardless of `which`
                    for _ in pkt.items():
                        pass

                    received += 1
                    if received >= times:
                        break
            finally:
                await stream.aclose()


@msgpub(tasks=['one', 'two'])
async def multilock_pubber(get_topics):
    """Publisher declared with two task names that emits a single packet."""
    packet = {'doggy': 10}
    yield packet


@pytest.mark.parametrize(
    'callwith_expecterror',
    [
        (pubber, {}, TypeError),
        # missing a `topics`
        (multilock_pubber, {'ctx': None}, TypeError),
        # missing a `task_name`
        (multilock_pubber, {'ctx': None, 'topics': ['topic1']}, TypeError),
        # should work
        (multilock_pubber,
         {'ctx': None, 'topics': ['doggy'], 'task_name': 'one'},
         None),
    ],
)
@tractor_test
async def test_required_args(callwith_expecterror):
    """Check the keyword arguments a ``@msgpub`` publisher requires.

    Each parameter is a ``(func, kwargs, expected_error)`` triple. When
    an error type is given, awaiting ``func(**kwargs)`` must raise it;
    otherwise the publisher is streamed from a freshly started actor and
    every received value is compared to the expected packet.
    """
    func, kwargs, err = callwith_expecterror

    # error cases: calling the publisher directly must raise
    if err is not None:
        with pytest.raises(err):
            await func(**kwargs)
        return

    # success case: stream from the publisher hosted in a sub-actor
    async with tractor.open_nursery() as an:

        pub_portal = await an.start_actor(
            name='pubber',
            enable_modules=[__name__],
        )

        async with tractor.wait_for_actor('pubber'):
            pass

        await trio.sleep(0.5)

        async with pub_portal.open_stream_from(
            multilock_pubber,
            **kwargs
        ) as stream:
            async for received in stream:
                assert received == {'doggy': 10}

        await pub_portal.cancel_actor()


@pytest.mark.parametrize(
    'pub_actor',
    ['streamer', 'arbiter']
)
def test_multi_actor_subs_arbiter_pub(
    loglevel,
    reg_addr,
    pub_actor,
):
    """Try out the neato @msgpub decorator system.

    Two subscriber actors (``evens`` and ``odds``) consume ``pubber``,
    which is hosted either in a dedicated ``streamer`` daemon actor or,
    in the ``arbiter`` case, in the actor named ``root``. In the
    ``arbiter`` case the publisher's topics getter is read from the
    module-level ``_get_topics`` and used to check that subscriptions
    appear and disappear as the subscriber actors come and go.
    """
    global _get_topics

    async def main():

        async with tractor.open_nursery(
            registry_addrs=[reg_addr],
            enable_modules=[__name__],
        ) as n:

            name = 'root'

            if pub_actor == 'streamer':
                # start the publisher as a daemon
                master_portal = await n.start_actor(
                    'streamer',
                    enable_modules=[__name__],
                )
                name = 'streamer'

            even_portal = await n.run_in_actor(
                subs,
                which=['even'],
                name='evens',
                pub_actor_name=name
            )
            odd_portal = await n.run_in_actor(
                subs,
                which=['odd'],
                name='odds',
                pub_actor_name=name
            )

            async with tractor.wait_for_actor('evens'):
                # block until the `evens` actor is initialized
                pass

            if pub_actor == 'arbiter':

                # wait for publisher task to be spawned in a local RPC task
                while _get_topics is None:
                    await trio.sleep(0.1)

                get_topics = _get_topics

                assert 'even' in get_topics()

            async with tractor.wait_for_actor('odds'):
                # block until the `odds` actor is initialized
                pass

            if pub_actor == 'arbiter':
                # monotonic clock: elapsed-time checks must not be
                # affected by wall-clock adjustments
                start = time.monotonic()
                while 'odd' not in get_topics():
                    await trio.sleep(0.1)
                    if time.monotonic() - start > 1:
                        pytest.fail("odds subscription never arrived?")

            # TODO: how to make this work when the arbiter gets
            # a portal to itself? Currently this causes a hang
            # when the channel server is torn down due to a lingering
            # loopback channel
            #     with trio.move_on_after(1):
            #         await subs(['even', 'odd'])

            # XXX: this would cause infinite
            # blocking due to actor never terminating loop
            # await even_portal.result()

            await trio.sleep(0.5)
            await even_portal.cancel_actor()
            await trio.sleep(1)

            if pub_actor == 'arbiter':
                assert 'even' not in get_topics()

            await odd_portal.cancel_actor()

            if pub_actor == 'arbiter':
                # restart the clock: the previous `start` predates the
                # sleeps and cancellations above, which would otherwise
                # eat most of the 2 second budget before we begin waiting
                start = time.monotonic()
                while get_topics():
                    await trio.sleep(0.1)
                    if time.monotonic() - start > 2:
                        pytest.fail("odds subscription never dropped?")
            else:
                await master_portal.cancel_actor()

    trio.run(main)


def test_single_subactor_pub_multitask_subs(
    loglevel,
    reg_addr,
):
    """Run several ``subs`` tasks locally against one ``streamer`` actor.

    The first subscriber is started with ``nursery.start()`` so that the
    stream it reports can be closed from this task while further
    subscriber tasks are being spawned.
    """
    async def main():

        async with tractor.open_nursery(
            registry_addrs=[reg_addr],
            enable_modules=[__name__],
        ) as actor_nursery:

            streamer_portal = await actor_nursery.start_actor(
                'streamer',
                enable_modules=[__name__],
            )
            async with tractor.wait_for_actor('streamer'):
                # block until 2nd actor is initialized
                pass

            async with trio.open_nursery() as task_nursery:
                first_stream = await task_nursery.start(
                    subs, ['even'], 'streamer'
                )

                await trio.sleep(0.1)
                task_nursery.start_soon(subs, ['even'], 'streamer')

                # XXX this will trigger the python bug:
                # https://bugs.python.org/issue32526
                # if using async generators to wrap tractor channels
                await first_stream.aclose()

                # two more subscribers, staggered by a short pause
                for _ in range(2):
                    await trio.sleep(0.1)
                    task_nursery.start_soon(subs, ['even'], 'streamer')

            await streamer_portal.cancel_actor()

    trio.run(main)