Add buf_size to RBToken and add sender cancel test, move disable_mantracker to its own _mp_bs module

Guillermo Rodriguez 2025-03-14 00:25:10 -03:00
parent dd17aa4205
commit ea27934eb9
No known key found for this signature in database
GPG Key ID: 002CC5F1E6BDA53E
4 changed files with 105 additions and 51 deletions

View File

@ -17,16 +17,12 @@ async def child_read_shm(
ctx: tractor.Context, ctx: tractor.Context,
msg_amount: int, msg_amount: int,
token: RBToken, token: RBToken,
buf_size: int,
total_bytes: int, total_bytes: int,
) -> None: ) -> None:
recvd_bytes = 0 recvd_bytes = 0
await ctx.started() await ctx.started()
start_ts = time.time() start_ts = time.time()
async with RingBuffReceiver( async with RingBuffReceiver(token) as receiver:
token,
buf_size=buf_size,
) as receiver:
while recvd_bytes < total_bytes: while recvd_bytes < total_bytes:
msg = await receiver.receive_some() msg = await receiver.receive_some()
recvd_bytes += len(msg) recvd_bytes += len(msg)
@ -51,7 +47,6 @@ async def child_write_shm(
rand_min: int, rand_min: int,
rand_max: int, rand_max: int,
token: RBToken, token: RBToken,
buf_size: int,
) -> None: ) -> None:
msgs, total_bytes = generate_sample_messages( msgs, total_bytes = generate_sample_messages(
msg_amount, msg_amount,
@ -59,10 +54,7 @@ async def child_write_shm(
rand_max=rand_max, rand_max=rand_max,
) )
await ctx.started(total_bytes) await ctx.started(total_bytes)
async with RingBuffSender( async with RingBuffSender(token) as sender:
token,
buf_size=buf_size
) as sender:
for msg in msgs: for msg in msgs:
await sender.send_all(msg) await sender.send_all(msg)
@ -103,7 +95,6 @@ def test_ringbuf(
common_kwargs = { common_kwargs = {
'msg_amount': msg_amount, 'msg_amount': msg_amount,
'token': token, 'token': token,
'buf_size': buf_size
} }
async with tractor.open_nursery() as an: async with tractor.open_nursery() as an:
send_p = await an.start_actor( send_p = await an.start_actor(
@ -150,7 +141,7 @@ async def child_blocked_receiver(
def test_ring_reader_cancel(): def test_ring_reader_cancel():
async def main(): async def main():
with open_ringbuf('test_ring_cancel') as token: with open_ringbuf('test_ring_cancel_reader') as token:
async with ( async with (
tractor.open_nursery() as an, tractor.open_nursery() as an,
RingBuffSender(token) as _sender, RingBuffSender(token) as _sender,
@ -174,3 +165,41 @@ def test_ring_reader_cancel():
with pytest.raises(tractor._exceptions.ContextCancelled): with pytest.raises(tractor._exceptions.ContextCancelled):
trio.run(main) trio.run(main)
@tractor.context
async def child_blocked_sender(
ctx: tractor.Context,
token: RBToken
):
async with RingBuffSender(token) as sender:
await ctx.started()
await sender.send_all(b'this will wrap')
def test_ring_sender_cancel():
async def main():
with open_ringbuf(
'test_ring_cancel_sender',
buf_size=1
) as token:
async with tractor.open_nursery() as an:
recv_p = await an.start_actor(
'ring_blocked_sender',
enable_modules=[__name__],
proc_kwargs={
'pass_fds': (token.write_eventfd, token.wrap_eventfd)
}
)
async with (
recv_p.open_context(
child_blocked_sender,
token=token
) as (sctx, _sent),
):
await trio.sleep(1)
await an.cancel()
with pytest.raises(tractor._exceptions.ContextCancelled):
trio.run(main)

View File

@ -0,0 +1,45 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Utils to tame mp non-SC madeness
'''
def disable_mantracker():
'''
Disable all ``multiprocessing``` "resource tracking" machinery since
it's an absolute multi-threaded mess of non-SC madness.
'''
from multiprocessing import resource_tracker as mantracker
# Tell the "resource tracker" thing to fuck off.
class ManTracker(mantracker.ResourceTracker):
def register(self, name, rtype):
pass
def unregister(self, name, rtype):
pass
def ensure_running(self):
pass
# "know your land and know your prey"
# https://www.dailymotion.com/video/x6ozzco
mantracker._resource_tracker = ManTracker()
mantracker.register = mantracker._resource_tracker.register
mantracker.ensure_running = mantracker._resource_tracker.ensure_running
mantracker.unregister = mantracker._resource_tracker.unregister
mantracker.getfd = mantracker._resource_tracker.getfd

View File

@ -32,6 +32,10 @@ from ._linux import (
open_eventfd, open_eventfd,
EventFD EventFD
) )
from ._mp_bs import disable_mantracker
disable_mantracker()
class RBToken(Struct, frozen=True): class RBToken(Struct, frozen=True):
@ -43,6 +47,7 @@ class RBToken(Struct, frozen=True):
shm_name: str shm_name: str
write_eventfd: int write_eventfd: int
wrap_eventfd: int wrap_eventfd: int
buf_size: int
def as_msg(self): def as_msg(self):
return to_builtins(self) return to_builtins(self)
@ -67,13 +72,17 @@ def open_ringbuf(
size=buf_size, size=buf_size,
create=True create=True
) )
token = RBToken( try:
shm_name=shm_name, token = RBToken(
write_eventfd=open_eventfd(flags=write_efd_flags), shm_name=shm_name,
wrap_eventfd=open_eventfd(flags=wrap_efd_flags) write_eventfd=open_eventfd(flags=write_efd_flags),
) wrap_eventfd=open_eventfd(flags=wrap_efd_flags),
yield token buf_size=buf_size
shm.close() )
yield token
finally:
shm.unlink()
class RingBuffSender(trio.abc.SendStream): class RingBuffSender(trio.abc.SendStream):
@ -88,12 +97,11 @@ class RingBuffSender(trio.abc.SendStream):
self, self,
token: RBToken, token: RBToken,
start_ptr: int = 0, start_ptr: int = 0,
buf_size: int = 10 * 1024,
): ):
token = RBToken.from_msg(token) token = RBToken.from_msg(token)
self._shm = SharedMemory( self._shm = SharedMemory(
name=token.shm_name, name=token.shm_name,
size=buf_size, size=token.buf_size,
create=False create=False
) )
self._write_event = EventFD(token.write_eventfd, 'w') self._write_event = EventFD(token.write_eventfd, 'w')
@ -167,13 +175,12 @@ class RingBuffReceiver(trio.abc.ReceiveStream):
self, self,
token: RBToken, token: RBToken,
start_ptr: int = 0, start_ptr: int = 0,
buf_size: int = 10 * 1024,
flags: int = 0 flags: int = 0
): ):
token = RBToken.from_msg(token) token = RBToken.from_msg(token)
self._shm = SharedMemory( self._shm = SharedMemory(
name=token.shm_name, name=token.shm_name,
size=buf_size, size=token.buf_size,
create=False create=False
) )
self._write_event = EventFD(token.write_eventfd, 'w') self._write_event = EventFD(token.write_eventfd, 'w')

View File

@ -38,6 +38,7 @@ from msgspec import (
) )
import tractor import tractor
from tractor.ipc._mp_bs import disable_mantracker
from tractor.log import get_logger from tractor.log import get_logger
@ -57,34 +58,6 @@ except ImportError:
log = get_logger(__name__) log = get_logger(__name__)
def disable_mantracker():
'''
Disable all ``multiprocessing``` "resource tracking" machinery since
it's an absolute multi-threaded mess of non-SC madness.
'''
from multiprocessing import resource_tracker as mantracker
# Tell the "resource tracker" thing to fuck off.
class ManTracker(mantracker.ResourceTracker):
def register(self, name, rtype):
pass
def unregister(self, name, rtype):
pass
def ensure_running(self):
pass
# "know your land and know your prey"
# https://www.dailymotion.com/video/x6ozzco
mantracker._resource_tracker = ManTracker()
mantracker.register = mantracker._resource_tracker.register
mantracker.ensure_running = mantracker._resource_tracker.ensure_running
mantracker.unregister = mantracker._resource_tracker.unregister
mantracker.getfd = mantracker._resource_tracker.getfd
disable_mantracker() disable_mantracker()