Add ringd test, which also tests fd share
							parent
							
								
									d01ad12b3b
								
							
						
					
					
						commit
						739e728065
					
				| 
						 | 
				
			
			@ -0,0 +1,81 @@
 | 
			
		|||
import trio
 | 
			
		||||
import tractor
 | 
			
		||||
 | 
			
		||||
from tractor.ipc import (
 | 
			
		||||
    attach_to_ringbuf_rchannel,
 | 
			
		||||
    attach_to_ringbuf_schannel
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
import tractor.ipc._ringbuf._ringd as ringd
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
log = tractor.log.get_console_log(level='info')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@tractor.context
 | 
			
		||||
async def recv_child(
 | 
			
		||||
    ctx: tractor.Context,
 | 
			
		||||
    ring_name: str
 | 
			
		||||
):
 | 
			
		||||
    async with (
 | 
			
		||||
        ringd.open_ringbuf(ring_name) as token,
 | 
			
		||||
 | 
			
		||||
        attach_to_ringbuf_rchannel(token) as chan,
 | 
			
		||||
    ):
 | 
			
		||||
        await ctx.started()
 | 
			
		||||
        async for msg in chan:
 | 
			
		||||
            log.info(f'received {int.from_bytes(msg)}')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@tractor.context
 | 
			
		||||
async def send_child(
 | 
			
		||||
    ctx: tractor.Context,
 | 
			
		||||
    ring_name: str
 | 
			
		||||
):
 | 
			
		||||
    async with (
 | 
			
		||||
        ringd.open_ringbuf(ring_name) as token,
 | 
			
		||||
 | 
			
		||||
        attach_to_ringbuf_schannel(token) as chan,
 | 
			
		||||
    ):
 | 
			
		||||
        await ctx.started()
 | 
			
		||||
        for i in range(100):
 | 
			
		||||
            await chan.send(i.to_bytes(4))
 | 
			
		||||
            log.info(f'sent {i}')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_ringd():
 | 
			
		||||
    async def main():
 | 
			
		||||
        async with (
 | 
			
		||||
            tractor.open_nursery() as an,
 | 
			
		||||
 | 
			
		||||
            ringd.open_ringd(
 | 
			
		||||
                loglevel='info'
 | 
			
		||||
            )
 | 
			
		||||
        ):
 | 
			
		||||
            recv_portal = await an.start_actor(
 | 
			
		||||
                'recv',
 | 
			
		||||
                enable_modules=[__name__]
 | 
			
		||||
            )
 | 
			
		||||
            send_portal = await an.start_actor(
 | 
			
		||||
                'send',
 | 
			
		||||
                enable_modules=[__name__]
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            async with (
 | 
			
		||||
                recv_portal.open_context(
 | 
			
		||||
                    recv_child,
 | 
			
		||||
                    ring_name='ring'
 | 
			
		||||
                ) as (rctx, _),
 | 
			
		||||
 | 
			
		||||
                send_portal.open_context(
 | 
			
		||||
                    send_child,
 | 
			
		||||
                    ring_name='ring'
 | 
			
		||||
                ) as (sctx, _),
 | 
			
		||||
            ):
 | 
			
		||||
                await rctx.wait_for_result()
 | 
			
		||||
                await sctx.wait_for_result()
 | 
			
		||||
 | 
			
		||||
            await an.cancel()
 | 
			
		||||
 | 
			
		||||
    trio.run(main)
 | 
			
		||||
		Loading…
	
		Reference in New Issue