Add buf_size to RBToken and add sender cancel test, move disable_mantracker to its own _mp_bs module
							parent
							
								
									8adde29e39
								
							
						
					
					
						commit
						de45ad8829
					
				|  | @ -17,16 +17,12 @@ async def child_read_shm( | |||
|     ctx: tractor.Context, | ||||
|     msg_amount: int, | ||||
|     token: RBToken, | ||||
|     buf_size: int, | ||||
|     total_bytes: int, | ||||
| ) -> None: | ||||
|     recvd_bytes = 0 | ||||
|     await ctx.started() | ||||
|     start_ts = time.time() | ||||
|     async with RingBuffReceiver( | ||||
|         token, | ||||
|         buf_size=buf_size, | ||||
|     ) as receiver: | ||||
|     async with RingBuffReceiver(token) as receiver: | ||||
|         while recvd_bytes < total_bytes: | ||||
|             msg = await receiver.receive_some() | ||||
|             recvd_bytes += len(msg) | ||||
|  | @ -51,7 +47,6 @@ async def child_write_shm( | |||
|     rand_min: int, | ||||
|     rand_max: int, | ||||
|     token: RBToken, | ||||
|     buf_size: int, | ||||
| ) -> None: | ||||
|     msgs, total_bytes = generate_sample_messages( | ||||
|         msg_amount, | ||||
|  | @ -59,10 +54,7 @@ async def child_write_shm( | |||
|         rand_max=rand_max, | ||||
|     ) | ||||
|     await ctx.started(total_bytes) | ||||
|     async with RingBuffSender( | ||||
|         token, | ||||
|         buf_size=buf_size | ||||
|     ) as sender: | ||||
|     async with RingBuffSender(token) as sender: | ||||
|         for msg in msgs: | ||||
|             await sender.send_all(msg) | ||||
| 
 | ||||
|  | @ -103,7 +95,6 @@ def test_ringbuf( | |||
|             common_kwargs = { | ||||
|                 'msg_amount': msg_amount, | ||||
|                 'token': token, | ||||
|                 'buf_size': buf_size | ||||
|             } | ||||
|             async with tractor.open_nursery() as an: | ||||
|                 send_p = await an.start_actor( | ||||
|  | @ -150,7 +141,7 @@ async def child_blocked_receiver( | |||
| 
 | ||||
| def test_ring_reader_cancel(): | ||||
|     async def main(): | ||||
|         with open_ringbuf('test_ring_cancel') as token: | ||||
|         with open_ringbuf('test_ring_cancel_reader') as token: | ||||
|             async with ( | ||||
|                 tractor.open_nursery() as an, | ||||
|                 RingBuffSender(token) as _sender, | ||||
|  | @ -174,3 +165,41 @@ def test_ring_reader_cancel(): | |||
| 
 | ||||
|     with pytest.raises(tractor._exceptions.ContextCancelled): | ||||
|         trio.run(main) | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def child_blocked_sender( | ||||
|     ctx: tractor.Context, | ||||
|     token: RBToken | ||||
| ): | ||||
|     async with RingBuffSender(token) as sender: | ||||
|         await ctx.started() | ||||
|         await sender.send_all(b'this will wrap') | ||||
| 
 | ||||
| 
 | ||||
| def test_ring_sender_cancel(): | ||||
|     async def main(): | ||||
|         with open_ringbuf( | ||||
|             'test_ring_cancel_sender', | ||||
|             buf_size=1 | ||||
|         ) as token: | ||||
|             async with tractor.open_nursery() as an: | ||||
|                 recv_p = await an.start_actor( | ||||
|                     'ring_blocked_sender', | ||||
|                     enable_modules=[__name__], | ||||
|                     proc_kwargs={ | ||||
|                         'pass_fds': (token.write_eventfd, token.wrap_eventfd) | ||||
|                     } | ||||
|                 ) | ||||
|                 async with ( | ||||
|                     recv_p.open_context( | ||||
|                         child_blocked_sender, | ||||
|                         token=token | ||||
|                     ) as (sctx, _sent), | ||||
|                 ): | ||||
|                     await trio.sleep(1) | ||||
|                     await an.cancel() | ||||
| 
 | ||||
| 
 | ||||
|     with pytest.raises(tractor._exceptions.ContextCancelled): | ||||
|         trio.run(main) | ||||
|  |  | |||
|  | @ -0,0 +1,45 @@ | |||
| # tractor: structured concurrent "actors". | ||||
| # Copyright 2018-eternity Tyler Goodlet. | ||||
| 
 | ||||
| # This program is free software: you can redistribute it and/or modify | ||||
| # it under the terms of the GNU Affero General Public License as published by | ||||
| # the Free Software Foundation, either version 3 of the License, or | ||||
| # (at your option) any later version. | ||||
| 
 | ||||
| # This program is distributed in the hope that it will be useful, | ||||
| # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||
| # GNU Affero General Public License for more details. | ||||
| 
 | ||||
| # You should have received a copy of the GNU Affero General Public License | ||||
| # along with this program.  If not, see <https://www.gnu.org/licenses/>. | ||||
| ''' | ||||
| Utils to tame mp non-SC madeness | ||||
| 
 | ||||
| ''' | ||||
| def disable_mantracker(): | ||||
|     ''' | ||||
|     Disable all ``multiprocessing``` "resource tracking" machinery since | ||||
|     it's an absolute multi-threaded mess of non-SC madness. | ||||
| 
 | ||||
|     ''' | ||||
|     from multiprocessing import resource_tracker as mantracker | ||||
| 
 | ||||
|     # Tell the "resource tracker" thing to fuck off. | ||||
|     class ManTracker(mantracker.ResourceTracker): | ||||
|         def register(self, name, rtype): | ||||
|             pass | ||||
| 
 | ||||
|         def unregister(self, name, rtype): | ||||
|             pass | ||||
| 
 | ||||
|         def ensure_running(self): | ||||
|             pass | ||||
| 
 | ||||
|     # "know your land and know your prey" | ||||
|     # https://www.dailymotion.com/video/x6ozzco | ||||
|     mantracker._resource_tracker = ManTracker() | ||||
|     mantracker.register = mantracker._resource_tracker.register | ||||
|     mantracker.ensure_running = mantracker._resource_tracker.ensure_running | ||||
|     mantracker.unregister = mantracker._resource_tracker.unregister | ||||
|     mantracker.getfd = mantracker._resource_tracker.getfd | ||||
|  | @ -32,6 +32,10 @@ from ._linux import ( | |||
|     open_eventfd, | ||||
|     EventFD | ||||
| ) | ||||
| from ._mp_bs import disable_mantracker | ||||
| 
 | ||||
| 
 | ||||
| disable_mantracker() | ||||
| 
 | ||||
| 
 | ||||
| class RBToken(Struct, frozen=True): | ||||
|  | @ -43,6 +47,7 @@ class RBToken(Struct, frozen=True): | |||
|     shm_name: str | ||||
|     write_eventfd: int | ||||
|     wrap_eventfd: int | ||||
|     buf_size: int | ||||
| 
 | ||||
|     def as_msg(self): | ||||
|         return to_builtins(self) | ||||
|  | @ -67,13 +72,17 @@ def open_ringbuf( | |||
|         size=buf_size, | ||||
|         create=True | ||||
|     ) | ||||
|     token = RBToken( | ||||
|         shm_name=shm_name, | ||||
|         write_eventfd=open_eventfd(flags=write_efd_flags), | ||||
|         wrap_eventfd=open_eventfd(flags=wrap_efd_flags) | ||||
|     ) | ||||
|     yield token | ||||
|     shm.close() | ||||
|     try: | ||||
|         token = RBToken( | ||||
|             shm_name=shm_name, | ||||
|             write_eventfd=open_eventfd(flags=write_efd_flags), | ||||
|             wrap_eventfd=open_eventfd(flags=wrap_efd_flags), | ||||
|             buf_size=buf_size | ||||
|         ) | ||||
|         yield token | ||||
| 
 | ||||
|     finally: | ||||
|         shm.unlink() | ||||
| 
 | ||||
| 
 | ||||
| class RingBuffSender(trio.abc.SendStream): | ||||
|  | @ -88,12 +97,11 @@ class RingBuffSender(trio.abc.SendStream): | |||
|         self, | ||||
|         token: RBToken, | ||||
|         start_ptr: int = 0, | ||||
|         buf_size: int = 10 * 1024, | ||||
|     ): | ||||
|         token = RBToken.from_msg(token) | ||||
|         self._shm = SharedMemory( | ||||
|             name=token.shm_name, | ||||
|             size=buf_size, | ||||
|             size=token.buf_size, | ||||
|             create=False | ||||
|         ) | ||||
|         self._write_event = EventFD(token.write_eventfd, 'w') | ||||
|  | @ -167,13 +175,12 @@ class RingBuffReceiver(trio.abc.ReceiveStream): | |||
|         self, | ||||
|         token: RBToken, | ||||
|         start_ptr: int = 0, | ||||
|         buf_size: int = 10 * 1024, | ||||
|         flags: int = 0 | ||||
|     ): | ||||
|         token = RBToken.from_msg(token) | ||||
|         self._shm = SharedMemory( | ||||
|             name=token.shm_name, | ||||
|             size=buf_size, | ||||
|             size=token.buf_size, | ||||
|             create=False | ||||
|         ) | ||||
|         self._write_event = EventFD(token.write_eventfd, 'w') | ||||
|  |  | |||
|  | @ -38,6 +38,7 @@ from msgspec import ( | |||
| ) | ||||
| import tractor | ||||
| 
 | ||||
| from tractor.ipc._mp_bs import disable_mantracker | ||||
| from tractor.log import get_logger | ||||
| 
 | ||||
| 
 | ||||
|  | @ -57,34 +58,6 @@ except ImportError: | |||
| log = get_logger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| def disable_mantracker(): | ||||
|     ''' | ||||
|     Disable all ``multiprocessing``` "resource tracking" machinery since | ||||
|     it's an absolute multi-threaded mess of non-SC madness. | ||||
| 
 | ||||
|     ''' | ||||
|     from multiprocessing import resource_tracker as mantracker | ||||
| 
 | ||||
|     # Tell the "resource tracker" thing to fuck off. | ||||
|     class ManTracker(mantracker.ResourceTracker): | ||||
|         def register(self, name, rtype): | ||||
|             pass | ||||
| 
 | ||||
|         def unregister(self, name, rtype): | ||||
|             pass | ||||
| 
 | ||||
|         def ensure_running(self): | ||||
|             pass | ||||
| 
 | ||||
|     # "know your land and know your prey" | ||||
|     # https://www.dailymotion.com/video/x6ozzco | ||||
|     mantracker._resource_tracker = ManTracker() | ||||
|     mantracker.register = mantracker._resource_tracker.register | ||||
|     mantracker.ensure_running = mantracker._resource_tracker.ensure_running | ||||
|     mantracker.unregister = mantracker._resource_tracker.unregister | ||||
|     mantracker.getfd = mantracker._resource_tracker.getfd | ||||
| 
 | ||||
| 
 | ||||
| disable_mantracker() | ||||
| 
 | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue