Compare commits
1 Commits
3127db8502
...
dd17aa4205
Author | SHA1 | Date |
---|---|---|
|
dd17aa4205 |
|
@ -92,8 +92,10 @@ def test_ringbuf(
|
|||
buf_size: int
|
||||
):
|
||||
async def main():
|
||||
|
||||
with open_ringbuf('test_ringbuf') as token:
|
||||
with open_ringbuf(
|
||||
'test_ringbuf',
|
||||
buf_size=buf_size
|
||||
) as token:
|
||||
proc_kwargs = {
|
||||
'pass_fds': (token.write_eventfd, token.wrap_eventfd)
|
||||
}
|
||||
|
|
|
@ -58,15 +58,22 @@ class RBToken(Struct, frozen=True):
|
|||
@cm
|
||||
def open_ringbuf(
|
||||
shm_name: str,
|
||||
buf_size: int = 10 * 1024,
|
||||
write_efd_flags: int = 0,
|
||||
wrap_efd_flags: int = 0
|
||||
) -> RBToken:
|
||||
shm = SharedMemory(
|
||||
name=shm_name,
|
||||
size=buf_size,
|
||||
create=True
|
||||
)
|
||||
token = RBToken(
|
||||
shm_name=shm_name,
|
||||
write_eventfd=open_eventfd(flags=write_efd_flags),
|
||||
wrap_eventfd=open_eventfd(flags=wrap_efd_flags)
|
||||
)
|
||||
yield token
|
||||
shm.close()
|
||||
|
||||
|
||||
class RingBuffSender(trio.abc.SendStream):
|
||||
|
@ -82,18 +89,16 @@ class RingBuffSender(trio.abc.SendStream):
|
|||
token: RBToken,
|
||||
start_ptr: int = 0,
|
||||
buf_size: int = 10 * 1024,
|
||||
unlink_on_exit: bool = False
|
||||
):
|
||||
token = RBToken.from_msg(token)
|
||||
self._shm = SharedMemory(
|
||||
name=token.shm_name,
|
||||
size=buf_size,
|
||||
create=True
|
||||
create=False
|
||||
)
|
||||
self._write_event = EventFD(token.write_eventfd, 'w')
|
||||
self._wrap_event = EventFD(token.wrap_eventfd, 'r')
|
||||
self._ptr = start_ptr
|
||||
self.unlink_on_exit = unlink_on_exit
|
||||
|
||||
@property
|
||||
def key(self) -> str:
|
||||
|
@ -142,11 +147,7 @@ class RingBuffSender(trio.abc.SendStream):
|
|||
async def aclose(self):
|
||||
self._write_event.close()
|
||||
self._wrap_event.close()
|
||||
if self.unlink_on_exit:
|
||||
self._shm.unlink()
|
||||
|
||||
else:
|
||||
self._shm.close()
|
||||
self._shm.close()
|
||||
|
||||
async def __aenter__(self):
|
||||
self._write_event.open()
|
||||
|
|
Loading…
Reference in New Issue