Compare commits
1 Commits
3127db8502
...
dd17aa4205
Author | SHA1 | Date |
---|---|---|
|
dd17aa4205 |
|
@ -92,8 +92,10 @@ def test_ringbuf(
|
||||||
buf_size: int
|
buf_size: int
|
||||||
):
|
):
|
||||||
async def main():
|
async def main():
|
||||||
|
with open_ringbuf(
|
||||||
with open_ringbuf('test_ringbuf') as token:
|
'test_ringbuf',
|
||||||
|
buf_size=buf_size
|
||||||
|
) as token:
|
||||||
proc_kwargs = {
|
proc_kwargs = {
|
||||||
'pass_fds': (token.write_eventfd, token.wrap_eventfd)
|
'pass_fds': (token.write_eventfd, token.wrap_eventfd)
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,15 +58,22 @@ class RBToken(Struct, frozen=True):
|
||||||
@cm
|
@cm
|
||||||
def open_ringbuf(
|
def open_ringbuf(
|
||||||
shm_name: str,
|
shm_name: str,
|
||||||
|
buf_size: int = 10 * 1024,
|
||||||
write_efd_flags: int = 0,
|
write_efd_flags: int = 0,
|
||||||
wrap_efd_flags: int = 0
|
wrap_efd_flags: int = 0
|
||||||
) -> RBToken:
|
) -> RBToken:
|
||||||
|
shm = SharedMemory(
|
||||||
|
name=shm_name,
|
||||||
|
size=buf_size,
|
||||||
|
create=True
|
||||||
|
)
|
||||||
token = RBToken(
|
token = RBToken(
|
||||||
shm_name=shm_name,
|
shm_name=shm_name,
|
||||||
write_eventfd=open_eventfd(flags=write_efd_flags),
|
write_eventfd=open_eventfd(flags=write_efd_flags),
|
||||||
wrap_eventfd=open_eventfd(flags=wrap_efd_flags)
|
wrap_eventfd=open_eventfd(flags=wrap_efd_flags)
|
||||||
)
|
)
|
||||||
yield token
|
yield token
|
||||||
|
shm.close()
|
||||||
|
|
||||||
|
|
||||||
class RingBuffSender(trio.abc.SendStream):
|
class RingBuffSender(trio.abc.SendStream):
|
||||||
|
@ -82,18 +89,16 @@ class RingBuffSender(trio.abc.SendStream):
|
||||||
token: RBToken,
|
token: RBToken,
|
||||||
start_ptr: int = 0,
|
start_ptr: int = 0,
|
||||||
buf_size: int = 10 * 1024,
|
buf_size: int = 10 * 1024,
|
||||||
unlink_on_exit: bool = False
|
|
||||||
):
|
):
|
||||||
token = RBToken.from_msg(token)
|
token = RBToken.from_msg(token)
|
||||||
self._shm = SharedMemory(
|
self._shm = SharedMemory(
|
||||||
name=token.shm_name,
|
name=token.shm_name,
|
||||||
size=buf_size,
|
size=buf_size,
|
||||||
create=True
|
create=False
|
||||||
)
|
)
|
||||||
self._write_event = EventFD(token.write_eventfd, 'w')
|
self._write_event = EventFD(token.write_eventfd, 'w')
|
||||||
self._wrap_event = EventFD(token.wrap_eventfd, 'r')
|
self._wrap_event = EventFD(token.wrap_eventfd, 'r')
|
||||||
self._ptr = start_ptr
|
self._ptr = start_ptr
|
||||||
self.unlink_on_exit = unlink_on_exit
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def key(self) -> str:
|
def key(self) -> str:
|
||||||
|
@ -142,11 +147,7 @@ class RingBuffSender(trio.abc.SendStream):
|
||||||
async def aclose(self):
|
async def aclose(self):
|
||||||
self._write_event.close()
|
self._write_event.close()
|
||||||
self._wrap_event.close()
|
self._wrap_event.close()
|
||||||
if self.unlink_on_exit:
|
self._shm.close()
|
||||||
self._shm.unlink()
|
|
||||||
|
|
||||||
else:
|
|
||||||
self._shm.close()
|
|
||||||
|
|
||||||
async def __aenter__(self):
|
async def __aenter__(self):
|
||||||
self._write_event.open()
|
self._write_event.open()
|
||||||
|
|
Loading…
Reference in New Issue