diff --git a/tests/test_ringbuf.py b/tests/test_ringbuf.py index 64fb37e9..28af7b83 100644 --- a/tests/test_ringbuf.py +++ b/tests/test_ringbuf.py @@ -17,16 +17,12 @@ async def child_read_shm( ctx: tractor.Context, msg_amount: int, token: RBToken, - buf_size: int, total_bytes: int, ) -> None: recvd_bytes = 0 await ctx.started() start_ts = time.time() - async with RingBuffReceiver( - token, - buf_size=buf_size, - ) as receiver: + async with RingBuffReceiver(token) as receiver: while recvd_bytes < total_bytes: msg = await receiver.receive_some() recvd_bytes += len(msg) @@ -51,7 +47,6 @@ async def child_write_shm( rand_min: int, rand_max: int, token: RBToken, - buf_size: int, ) -> None: msgs, total_bytes = generate_sample_messages( msg_amount, @@ -59,10 +54,7 @@ async def child_write_shm( rand_max=rand_max, ) await ctx.started(total_bytes) - async with RingBuffSender( - token, - buf_size=buf_size - ) as sender: + async with RingBuffSender(token) as sender: for msg in msgs: await sender.send_all(msg) @@ -103,7 +95,6 @@ def test_ringbuf( common_kwargs = { 'msg_amount': msg_amount, 'token': token, - 'buf_size': buf_size } async with tractor.open_nursery() as an: send_p = await an.start_actor( @@ -150,7 +141,7 @@ async def child_blocked_receiver( def test_ring_reader_cancel(): async def main(): - with open_ringbuf('test_ring_cancel') as token: + with open_ringbuf('test_ring_cancel_reader') as token: async with ( tractor.open_nursery() as an, RingBuffSender(token) as _sender, @@ -174,3 +165,41 @@ def test_ring_reader_cancel(): with pytest.raises(tractor._exceptions.ContextCancelled): trio.run(main) + + +@tractor.context +async def child_blocked_sender( + ctx: tractor.Context, + token: RBToken +): + async with RingBuffSender(token) as sender: + await ctx.started() + await sender.send_all(b'this will wrap') + + +def test_ring_sender_cancel(): + async def main(): + with open_ringbuf( + 'test_ring_cancel_sender', + buf_size=1 + ) as token: + async with tractor.open_nursery() as an: + recv_p = await an.start_actor( + 'ring_blocked_sender', + enable_modules=[__name__], + proc_kwargs={ + 'pass_fds': (token.write_eventfd, token.wrap_eventfd) + } + ) + async with ( + recv_p.open_context( + child_blocked_sender, + token=token + ) as (sctx, _sent), + ): + await trio.sleep(1) + await an.cancel() + + + with pytest.raises(tractor._exceptions.ContextCancelled): + trio.run(main) diff --git a/tractor/ipc/_mp_bs.py b/tractor/ipc/_mp_bs.py new file mode 100644 index 00000000..e51aa9ae --- /dev/null +++ b/tractor/ipc/_mp_bs.py @@ -0,0 +1,45 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +''' +Utils to tame mp non-SC madeness + +''' +def disable_mantracker(): + ''' + Disable all ``multiprocessing``` "resource tracking" machinery since + it's an absolute multi-threaded mess of non-SC madness. + + ''' + from multiprocessing import resource_tracker as mantracker + + # Tell the "resource tracker" thing to fuck off. + class ManTracker(mantracker.ResourceTracker): + def register(self, name, rtype): + pass + + def unregister(self, name, rtype): + pass + + def ensure_running(self): + pass + + # "know your land and know your prey" + # https://www.dailymotion.com/video/x6ozzco + mantracker._resource_tracker = ManTracker() + mantracker.register = mantracker._resource_tracker.register + mantracker.ensure_running = mantracker._resource_tracker.ensure_running + mantracker.unregister = mantracker._resource_tracker.unregister + mantracker.getfd = mantracker._resource_tracker.getfd diff --git a/tractor/ipc/_ringbuf.py b/tractor/ipc/_ringbuf.py index c590e8e2..6337eea1 100644 --- a/tractor/ipc/_ringbuf.py +++ b/tractor/ipc/_ringbuf.py @@ -32,6 +32,10 @@ from ._linux import ( open_eventfd, EventFD ) +from ._mp_bs import disable_mantracker + + +disable_mantracker() class RBToken(Struct, frozen=True): @@ -43,6 +47,7 @@ class RBToken(Struct, frozen=True): shm_name: str write_eventfd: int wrap_eventfd: int + buf_size: int def as_msg(self): return to_builtins(self) @@ -67,13 +72,17 @@ def open_ringbuf( size=buf_size, create=True ) - token = RBToken( - shm_name=shm_name, - write_eventfd=open_eventfd(flags=write_efd_flags), - wrap_eventfd=open_eventfd(flags=wrap_efd_flags) - ) - yield token - shm.close() + try: + token = RBToken( + shm_name=shm_name, + write_eventfd=open_eventfd(flags=write_efd_flags), + wrap_eventfd=open_eventfd(flags=wrap_efd_flags), + buf_size=buf_size + ) + yield token + + finally: + shm.unlink() class RingBuffSender(trio.abc.SendStream): @@ -88,12 +97,11 @@ class RingBuffSender(trio.abc.SendStream): self, token: RBToken, start_ptr: int = 0, - buf_size: int = 10 * 1024, ): token = RBToken.from_msg(token) self._shm = SharedMemory( name=token.shm_name, - size=buf_size, + size=token.buf_size, create=False ) self._write_event = EventFD(token.write_eventfd, 'w') @@ -167,13 +175,12 @@ class RingBuffReceiver(trio.abc.ReceiveStream): self, token: RBToken, start_ptr: int = 0, - buf_size: int = 10 * 1024, flags: int = 0 ): token = RBToken.from_msg(token) self._shm = SharedMemory( name=token.shm_name, - size=buf_size, + size=token.buf_size, create=False ) self._write_event = EventFD(token.write_eventfd, 'w') diff --git a/tractor/ipc/_shm.py b/tractor/ipc/_shm.py index 2e14f4da..7d2897d6 100644 --- a/tractor/ipc/_shm.py +++ b/tractor/ipc/_shm.py @@ -38,6 +38,7 @@ from msgspec import ( ) import tractor +from tractor.ipc._mp_bs import disable_mantracker from tractor.log import get_logger @@ -57,34 +58,6 @@ except ImportError: log = get_logger(__name__) -def disable_mantracker(): - ''' - Disable all ``multiprocessing``` "resource tracking" machinery since - it's an absolute multi-threaded mess of non-SC madness. - - ''' - from multiprocessing import resource_tracker as mantracker - - # Tell the "resource tracker" thing to fuck off. - class ManTracker(mantracker.ResourceTracker): - def register(self, name, rtype): - pass - - def unregister(self, name, rtype): - pass - - def ensure_running(self): - pass - - # "know your land and know your prey" - # https://www.dailymotion.com/video/x6ozzco - mantracker._resource_tracker = ManTracker() - mantracker.register = mantracker._resource_tracker.register - mantracker.ensure_running = mantracker._resource_tracker.ensure_running - mantracker.unregister = mantracker._resource_tracker.unregister - mantracker.getfd = mantracker._resource_tracker.getfd - - disable_mantracker()