import os
import time

import trio
import pytest
import tractor
from tractor.ipc import (
    open_eventfd,
    RingBuffSender,
    RingBuffReceiver
)
from tractor._testing.samples import generate_sample_messages


@tractor.context
async def child_read_shm(
    ctx: tractor.Context,
    msg_amount: int,
    shm_key: str,
    write_eventfd: int,
    wrap_eventfd: int,
    buf_size: int,
    total_bytes: int,
    flags: int = 0,
) -> None:
    '''
    Read from a `RingBuffReceiver` until `total_bytes` have been
    received, then print simple throughput stats.

    `ctx.started()` is called before the receiver is opened, so the
    timed interval covers receiver setup, the read loop and receiver
    teardown.

    The printed "msg/sec" figure is derived from `msg_amount` (the
    number of messages the writer was asked to produce), not from the
    number of `receive_some()` calls made here.

    '''
    recvd_bytes = 0
    await ctx.started()

    # Interval timing wants a monotonic, high resolution clock;
    # wall-clock `time.time()` can jump when the system clock is
    # adjusted and would skew (or even negate) the elapsed value.
    start_ts = time.perf_counter()
    async with RingBuffReceiver(
        shm_key,
        write_eventfd,
        wrap_eventfd,
        buf_size=buf_size,
        flags=flags
    ) as receiver:
        while recvd_bytes < total_bytes:
            msg = await receiver.receive_some()
            recvd_bytes += len(msg)

        # make sure we dont hold any memoryviews
        # before the ctx manager aclose()
        msg = None

    elapsed = time.perf_counter() - start_ts
    elapsed_ms = int(elapsed * 1000)

    print(f'\n\telapsed ms: {elapsed_ms}')
    print(f'\tmsg/sec: {int(msg_amount / elapsed):,}')
    print(f'\tbytes/sec: {int(recvd_bytes / elapsed):,}')


@tractor.context
async def child_write_shm(
    ctx: tractor.Context,
    msg_amount: int,
    rand_min: int,
    rand_max: int,
    shm_key: str,
    write_eventfd: int,
    wrap_eventfd: int,
    buf_size: int,
) -> None:
    '''
    Generate `msg_amount` sample messages and send every one of them
    through a `RingBuffSender`.

    The byte total reported by `generate_sample_messages()` is passed
    to `ctx.started()` before the sender is opened, which is how the
    parent side obtains the `total_bytes` value.

    '''
    payloads, payload_bytes = generate_sample_messages(
        msg_amount,
        rand_min=rand_min,
        rand_max=rand_max,
    )

    # report the expected byte count first, then start writing
    await ctx.started(payload_bytes)

    ring_sender = RingBuffSender(
        shm_key,
        write_eventfd,
        wrap_eventfd,
        buf_size=buf_size,
    )
    async with ring_sender as tx:
        for payload in payloads:
            await tx.send_all(payload)


@pytest.mark.parametrize(
    'msg_amount,rand_min,rand_max,buf_size',
    [
        # simple case, fixed payloads, large buffer
        (100_000, 0, 0, 10 * 1024),

        # guaranteed wrap around on every write
        (100, 10 * 1024, 20 * 1024, 10 * 1024),

        # large payload size, but large buffer
        (10_000, 256 * 1024, 512 * 1024, 10 * 1024 * 1024)
    ],
    ids=[
        'fixed_payloads_large_buffer',
        'wrap_around_every_write',
        'large_payloads_large_buffer',
    ]
)
def test_ringbuf(
    msg_amount: int,
    rand_min: int,
    rand_max: int,
    buf_size: int
):
    '''
    Spawn a sender and a receiver actor which share one ring buffer
    and push `msg_amount` generated messages from one to the other.

    Two eventfds are opened in this (parent) process and handed to
    both children through `pass_fds`. The sender context reports its
    total payload size via `started()`, and that value is forwarded to
    the receiver so it knows how many bytes to read.

    The eventfds opened here are closed again once the run finishes
    (or fails) so the test process does not leak descriptors across
    parametrized cases.

    '''
    opened_fds: list[int] = []
    try:
        write_eventfd = open_eventfd()
        opened_fds.append(write_eventfd)
        wrap_eventfd = open_eventfd()
        opened_fds.append(wrap_eventfd)

        proc_kwargs = {
            'pass_fds': (write_eventfd, wrap_eventfd)
        }

        shm_key = 'test_ring_buff'

        # kwargs shared by both child endpoints
        common_kwargs = {
            'msg_amount': msg_amount,
            'shm_key': shm_key,
            'write_eventfd': write_eventfd,
            'wrap_eventfd': wrap_eventfd,
            'buf_size': buf_size
        }

        async def main():
            async with tractor.open_nursery() as an:
                send_p = await an.start_actor(
                    'ring_sender',
                    enable_modules=[__name__],
                    proc_kwargs=proc_kwargs
                )
                recv_p = await an.start_actor(
                    'ring_receiver',
                    enable_modules=[__name__],
                    proc_kwargs=proc_kwargs
                )
                async with (
                    send_p.open_context(
                        child_write_shm,
                        rand_min=rand_min,
                        rand_max=rand_max,
                        **common_kwargs
                    ) as (sctx, total_bytes),
                    # `total_bytes` comes from the sender's `started()`
                    # value bound just above.
                    recv_p.open_context(
                        child_read_shm,
                        **common_kwargs,
                        total_bytes=total_bytes,
                    ) as (rctx, _sent),
                ):
                    # await the receiver portal's result before the
                    # contexts are torn down
                    await recv_p.result()

                await send_p.cancel_actor()
                await recv_p.cancel_actor()

        trio.run(main)

    finally:
        # nothing in this process wraps these fds, so release them
        # explicitly; the children hold their own inherited copies.
        for fd in opened_fds:
            os.close(fd)


@tractor.context
async def child_blocked_receiver(
    ctx: tractor.Context,
    shm_key: str,
    write_eventfd: int,
    wrap_eventfd: int,
    flags: int = 0
):
    '''
    Open a `RingBuffReceiver`, signal `started()` and then wait on
    a single `receive_some()` call.

    The only caller visible in this module, `test_ring_reader_cancel`,
    never writes to its sender, so this read is presumably expected to
    stay pending until the actor is cancelled.

    '''
    ring_receiver = RingBuffReceiver(
        shm_key,
        write_eventfd,
        wrap_eventfd,
        flags=flags,
    )
    async with ring_receiver as rx:
        # only report readiness once the receiver has been entered
        await ctx.started()
        await rx.receive_some()


def test_ring_reader_cancel():
    '''
    Ensure a receiver actor waiting on `receive_some()` can be
    cancelled.

    A `RingBuffSender` is opened in this process but never written
    to, a child running `child_blocked_receiver` is started, and after
    a one second sleep the whole actor nursery is cancelled. The run
    is expected to raise `ContextCancelled`.

    '''
    shm_key = 'test_ring_cancel'

    write_eventfd = open_eventfd()
    wrap_eventfd = open_eventfd()
    proc_kwargs = {'pass_fds': (write_eventfd, wrap_eventfd)}

    async def main():
        async with (
            tractor.open_nursery() as an,
            # held open for the duration but intentionally never used
            # to send anything
            RingBuffSender(shm_key, write_eventfd, wrap_eventfd),
        ):
            recv_p = await an.start_actor(
                'ring_blocked_receiver',
                enable_modules=[__name__],
                proc_kwargs=proc_kwargs,
            )
            async with recv_p.open_context(
                child_blocked_receiver,
                write_eventfd=write_eventfd,
                wrap_eventfd=wrap_eventfd,
                shm_key=shm_key,
            ) as (_ctx, _first):
                # give the child time to reach its pending read
                await trio.sleep(1)
                await an.cancel()

    with pytest.raises(tractor._exceptions.ContextCancelled):
        trio.run(main)