Add direct ctx managers for RB channels

Guillermo Rodriguez 2025-03-18 13:47:41 -03:00
parent 2901049b5b
commit 5902ad2ffb
No known key found for this signature in database
GPG Key ID: 002CC5F1E6BDA53E
2 changed files with 36 additions and 7 deletions

View File

@ -55,5 +55,7 @@ if platform.system() == 'Linux':
RingBuffBytesSender as RingBuffBytesSender, RingBuffBytesSender as RingBuffBytesSender,
RingBuffBytesReceiver as RingBuffBytesReceiver, RingBuffBytesReceiver as RingBuffBytesReceiver,
RingBuffChannel as RingBuffChannel, RingBuffChannel as RingBuffChannel,
attach_to_ringbuf_channel as attach_to_ringbuf_channel attach_to_ringbuf_schannel as attach_to_ringbuf_schannel,
attach_to_ringbuf_rchannel as attach_to_ringbuf_rchannel,
attach_to_ringbuf_channel as attach_to_ringbuf_channel,
) )

View File

@ -550,6 +550,36 @@ class RingBuffBytesReceiver(trio.abc.ReceiveChannel[bytes]):
await self._receiver.aclose() await self._receiver.aclose()
@acm
async def attach_to_ringbuf_rchannel(
token: RBToken,
cleanup: bool = True
):
'''
Attach a RingBuffBytesReceiver from a previously opened
RBToken.
'''
async with attach_to_ringbuf_receiver(
token, cleanup=cleanup
) as receiver:
yield RingBuffBytesReceiver(receiver)
@acm
async def attach_to_ringbuf_schannel(
token: RBToken,
cleanup: bool = True
):
'''
Attach a RingBuffBytesSender from a previously opened
RBToken.
'''
async with attach_to_ringbuf_sender(
token, cleanup=cleanup
) as sender:
yield RingBuffBytesSender(sender)
class RingBuffChannel(trio.abc.Channel[bytes]): class RingBuffChannel(trio.abc.Channel[bytes]):
''' '''
Combine `RingBuffBytesSender` and `RingBuffBytesReceiver` Combine `RingBuffBytesSender` and `RingBuffBytesReceiver`
@ -588,16 +618,13 @@ async def attach_to_ringbuf_channel(
''' '''
async with ( async with (
attach_to_ringbuf_receiver( attach_to_ringbuf_rchannel(
token_in, token_in,
cleanup=cleanup_in cleanup=cleanup_in
) as receiver, ) as receiver,
attach_to_ringbuf_sender( attach_to_ringbuf_schannel(
token_out, token_out,
cleanup=cleanup_out cleanup=cleanup_out
) as sender, ) as sender,
): ):
yield RingBuffChannel( yield RingBuffChannel(sender, receiver)
RingBuffBytesSender(sender),
RingBuffBytesReceiver(receiver)
)