Compare commits

..

1 Commits

Author SHA1 Message Date
Guillermo Rodriguez dd17aa4205
Make ring buf api use pickle-able RBToken 2025-03-13 23:41:53 -03:00
2 changed files with 13 additions and 10 deletions

View File

@ -92,8 +92,10 @@ def test_ringbuf(
buf_size: int
):
async def main():
with open_ringbuf('test_ringbuf') as token:
with open_ringbuf(
'test_ringbuf',
buf_size=buf_size
) as token:
proc_kwargs = {
'pass_fds': (token.write_eventfd, token.wrap_eventfd)
}

View File

@ -58,15 +58,22 @@ class RBToken(Struct, frozen=True):
@cm
def open_ringbuf(
shm_name: str,
buf_size: int = 10 * 1024,
write_efd_flags: int = 0,
wrap_efd_flags: int = 0
) -> RBToken:
shm = SharedMemory(
name=shm_name,
size=buf_size,
create=True
)
token = RBToken(
shm_name=shm_name,
write_eventfd=open_eventfd(flags=write_efd_flags),
wrap_eventfd=open_eventfd(flags=wrap_efd_flags)
)
yield token
shm.close()
class RingBuffSender(trio.abc.SendStream):
@ -82,18 +89,16 @@ class RingBuffSender(trio.abc.SendStream):
token: RBToken,
start_ptr: int = 0,
buf_size: int = 10 * 1024,
unlink_on_exit: bool = False
):
token = RBToken.from_msg(token)
self._shm = SharedMemory(
name=token.shm_name,
size=buf_size,
create=True
create=False
)
self._write_event = EventFD(token.write_eventfd, 'w')
self._wrap_event = EventFD(token.wrap_eventfd, 'r')
self._ptr = start_ptr
self.unlink_on_exit = unlink_on_exit
@property
def key(self) -> str:
@ -142,11 +147,7 @@ class RingBuffSender(trio.abc.SendStream):
async def aclose(self):
self._write_event.close()
self._wrap_event.close()
if self.unlink_on_exit:
self._shm.unlink()
else:
self._shm.close()
self._shm.close()
async def __aenter__(self):
self._write_event.open()