Add owner semantics to RBToken

Stop exporting `_ringbuf` on `tractor.ipc`
Use absolute imports on `_ringbuf` module
Add more comments and acm helpers for ringbuf allocation functions
Create generic FD sharing actor module in `tractor.linux._fdshare`
Include original allocator actor name as `owner` in RBToken
Auto share FDs of allocated ringbufs
On `attach_ringbuf_*` functions request fds from owner
Adapt all ringbuf tests to new system
one_ring_to_rule_them_all
Guillermo Rodriguez 2025-04-13 13:31:44 -03:00
parent 5d6fa643ba
commit 39dccbdde7
No known key found for this signature in database
GPG Key ID: 002CC5F1E6BDA53E
5 changed files with 463 additions and 201 deletions

View File

@ -7,6 +7,7 @@ import pytest
import tractor
from tractor.ipc._ringbuf import (
open_ringbuf,
open_ringbuf_pair,
attach_to_ringbuf_receiver,
attach_to_ringbuf_sender,
attach_to_ringbuf_channel,
@ -68,7 +69,7 @@ async def child_write_shm(
msg_amount: int,
rand_min: int,
rand_max: int,
token: RBToken,
buf_size: int
) -> None:
'''
Sub-actor used in `test_ringbuf`
@ -85,9 +86,12 @@ async def child_write_shm(
rand_min=rand_min,
rand_max=rand_max,
)
await ctx.started()
print('writer started')
async with attach_to_ringbuf_sender(token, cleanup=False) as sender:
async with (
open_ringbuf('test_ringbuf', buf_size=buf_size) as token,
attach_to_ringbuf_sender(token) as sender
):
await ctx.started(token)
print('writer started')
for msg in rng:
await sender.send(msg)
@ -133,55 +137,53 @@ def test_ringbuf(
'''
async def main():
with open_ringbuf(
'test_ringbuf',
buf_size=buf_size
) as token:
proc_kwargs = {'pass_fds': token.fds}
async with tractor.open_nursery() as an:
send_p = await an.start_actor(
'ring_sender',
enable_modules=[
__name__,
'tractor.linux._fdshare'
],
)
recv_p = await an.start_actor(
'ring_receiver',
enable_modules=[
__name__,
'tractor.linux._fdshare'
],
)
async with (
send_p.open_context(
child_write_shm,
msg_amount=msg_amount,
rand_min=rand_min,
rand_max=rand_max,
buf_size=buf_size
) as (sctx, token),
async with tractor.open_nursery() as an:
send_p = await an.start_actor(
'ring_sender',
enable_modules=[__name__],
proc_kwargs=proc_kwargs
)
recv_p = await an.start_actor(
'ring_receiver',
enable_modules=[__name__],
proc_kwargs=proc_kwargs
)
async with (
send_p.open_context(
child_write_shm,
token=token,
msg_amount=msg_amount,
rand_min=rand_min,
rand_max=rand_max,
) as (sctx, _),
recv_p.open_context(
child_read_shm,
token=token,
) as (rctx, _),
):
sent_hash = await sctx.result()
recvd_hash = await rctx.result()
recv_p.open_context(
child_read_shm,
token=token,
) as (rctx, _),
):
sent_hash = await sctx.result()
recvd_hash = await rctx.result()
assert sent_hash == recvd_hash
assert sent_hash == recvd_hash
await send_p.cancel_actor()
await recv_p.cancel_actor()
await an.cancel()
trio.run(main)
@tractor.context
async def child_blocked_receiver(
ctx: tractor.Context,
token: RBToken
):
async with attach_to_ringbuf_receiver(token) as receiver:
await ctx.started()
async def child_blocked_receiver(ctx: tractor.Context):
async with (
open_ringbuf('test_ring_cancel_reader') as token,
attach_to_ringbuf_receiver(token) as receiver
):
await ctx.started(token)
await receiver.receive_some()
@ -192,26 +194,23 @@ def test_reader_cancel():
'''
async def main():
with open_ringbuf('test_ring_cancel_reader') as token:
async with tractor.open_nursery() as an:
recv_p = await an.start_actor(
'ring_blocked_receiver',
enable_modules=[
__name__,
'tractor.linux._fdshare'
],
)
async with (
tractor.open_nursery() as an,
attach_to_ringbuf_sender(token) as _sender,
recv_p.open_context(
child_blocked_receiver,
) as (sctx, token),
attach_to_ringbuf_sender(token),
):
recv_p = await an.start_actor(
'ring_blocked_receiver',
enable_modules=[__name__],
proc_kwargs={
'pass_fds': token.fds
}
)
async with (
recv_p.open_context(
child_blocked_receiver,
token=token
) as (sctx, _sent),
):
await trio.sleep(1)
await an.cancel()
await trio.sleep(.1)
await an.cancel()
with pytest.raises(tractor._exceptions.ContextCancelled):
@ -219,12 +218,16 @@ def test_reader_cancel():
@tractor.context
async def child_blocked_sender(
ctx: tractor.Context,
token: RBToken
):
async with attach_to_ringbuf_sender(token) as sender:
await ctx.started()
async def child_blocked_sender(ctx: tractor.Context):
async with (
open_ringbuf(
'test_ring_cancel_sender',
buf_size=1
) as token,
attach_to_ringbuf_sender(token) as sender
):
await ctx.started(token)
await sender.send_all(b'this will wrap')
@ -235,26 +238,23 @@ def test_sender_cancel():
'''
async def main():
with open_ringbuf(
'test_ring_cancel_sender',
buf_size=1
) as token:
async with tractor.open_nursery() as an:
recv_p = await an.start_actor(
'ring_blocked_sender',
enable_modules=[__name__],
proc_kwargs={
'pass_fds': token.fds
}
)
async with (
recv_p.open_context(
child_blocked_sender,
token=token
) as (sctx, _sent),
):
await trio.sleep(1)
await an.cancel()
async with tractor.open_nursery() as an:
recv_p = await an.start_actor(
'ring_blocked_sender',
enable_modules=[
__name__,
'tractor.linux._fdshare'
],
)
async with (
recv_p.open_context(
child_blocked_sender,
) as (sctx, token),
attach_to_ringbuf_receiver(token)
):
await trio.sleep(.1)
await an.cancel()
with pytest.raises(tractor._exceptions.ContextCancelled):
@ -274,24 +274,28 @@ def test_receiver_max_bytes():
msgs = []
async def main():
with open_ringbuf(
'test_ringbuf_max_bytes',
buf_size=10
) as token:
async with (
trio.open_nursery() as n,
attach_to_ringbuf_sender(token, cleanup=False) as sender,
attach_to_ringbuf_receiver(token, cleanup=False) as receiver
):
async def _send_and_close():
await sender.send_all(msg)
await sender.aclose()
async with (
tractor.open_nursery(),
open_ringbuf(
'test_ringbuf_max_bytes',
buf_size=10
) as token,
n.start_soon(_send_and_close)
while len(msgs) < len(msg):
msg_part = await receiver.receive_some(max_bytes=1)
assert len(msg_part) == 1
msgs.append(msg_part)
trio.open_nursery() as n,
attach_to_ringbuf_sender(token, cleanup=False) as sender,
attach_to_ringbuf_receiver(token, cleanup=False) as receiver
):
async def _send_and_close():
await sender.send_all(msg)
await sender.aclose()
n.start_soon(_send_and_close)
while len(msgs) < len(msg):
msg_part = await receiver.receive_some(max_bytes=1)
assert len(msg_part) == 1
msgs.append(msg_part)
trio.run(main)
assert msg == b''.join(msgs)
@ -329,42 +333,46 @@ def test_channel():
msg_amount_min = 100
msg_amount_max = 1000
mods = [
__name__,
'tractor.linux._fdshare'
]
async def main():
with tractor.ipc.open_ringbuf_pair(
'test_ringbuf_transport'
) as (send_token, recv_token):
async with (
tractor.open_nursery(enable_modules=mods) as an,
open_ringbuf_pair(
'test_ringbuf_transport'
) as (send_token, recv_token),
attach_to_ringbuf_channel(send_token, recv_token) as chan,
):
sender = await an.start_actor(
'test_ringbuf_transport_sender',
enable_modules=mods,
)
async with (
attach_to_ringbuf_channel(send_token, recv_token) as chan,
tractor.open_nursery() as an
sender.open_context(
child_channel_sender,
msg_amount_min=msg_amount_min,
msg_amount_max=msg_amount_max,
token_in=recv_token,
token_out=send_token
) as (ctx, _),
):
sender = await an.start_actor(
'test_ringbuf_transport_sender',
enable_modules=[__name__],
proc_kwargs={
'pass_fds': send_token.fds + recv_token.fds
}
)
async with (
sender.open_context(
child_channel_sender,
msg_amount_min=msg_amount_min,
msg_amount_max=msg_amount_max,
token_in=recv_token,
token_out=send_token
) as (ctx, _),
):
recvd_hash = hashlib.sha256()
async for msg in chan:
if msg == b'bye':
await chan.send(b'bye')
break
recvd_hash = hashlib.sha256()
async for msg in chan:
if msg == b'bye':
await chan.send(b'bye')
break
recvd_hash.update(msg)
recvd_hash.update(msg)
sent_hash = await ctx.result()
sent_hash = await ctx.result()
assert recvd_hash.hexdigest() == sent_hash
assert recvd_hash.hexdigest() == sent_hash
await an.cancel()
await an.cancel()
trio.run(main)

View File

@ -17,26 +17,7 @@
A modular IPC layer supporting the power of cross-process SC!
'''
import platform
from ._chan import (
_connect_chan as _connect_chan,
Channel as Channel
)
if platform.system() == 'Linux':
from ._ringbuf import (
RBToken as RBToken,
open_ringbuf as open_ringbuf,
open_ringbuf_pair as open_ringbuf_pair,
RingBufferSendChannel as RingBufferSendChannel,
attach_to_ringbuf_sender as attach_to_ringbuf_sender,
RingBufferReceiveChannel as RingBufferReceiveChannel,
attach_to_ringbuf_receiver as attach_to_ringbuf_receiver,
RingBufferChannel as RingBufferChannel,
attach_to_ringbuf_channel as attach_to_ringbuf_channel,
)

View File

@ -35,16 +35,22 @@ from msgspec import (
to_builtins
)
from ...log import get_logger
from ..._exceptions import (
from tractor.log import get_logger
from tractor._exceptions import (
InternalError
)
from .._mp_bs import disable_mantracker
from ...linux.eventfd import (
from tractor.ipc._mp_bs import disable_mantracker
from tractor.linux._fdshare import (
share_fds,
unshare_fds,
request_fds_from
)
from tractor.linux.eventfd import (
open_eventfd,
EFDReadCancelled,
EventFD
)
from tractor._state import current_actor
log = get_logger(__name__)
@ -57,17 +63,19 @@ _DEFAULT_RB_SIZE = 10 * 1024
class RBToken(Struct, frozen=True):
'''
RingBuffer token contains necesary info to open the three
eventfds and the shared memory
RingBuffer token contains necesary info to open resources of a ringbuf,
even in the case that ringbuf was not allocated by current actor.
'''
owner: str # if owner != `current_actor().name` we must use fdshare
shm_name: str
write_eventfd: int # used to signal writer ptr advance
wrap_eventfd: int # used to signal reader ready after wrap around
eof_eventfd: int # used to signal writer closed
buf_size: int
buf_size: int # size in bytes of underlying shared memory buffer
def as_msg(self):
return to_builtins(self)
@ -81,10 +89,6 @@ class RBToken(Struct, frozen=True):
@property
def fds(self) -> tuple[int, int, int]:
'''
Useful for `pass_fds` params
'''
return (
self.write_eventfd,
self.wrap_eventfd,
@ -92,38 +96,137 @@ class RBToken(Struct, frozen=True):
)
@cm
def open_ringbuf(
def alloc_ringbuf(
shm_name: str,
buf_size: int = _DEFAULT_RB_SIZE,
) -> ContextManager[RBToken]:
) -> tuple[SharedMemory, RBToken]:
'''
Handle resources for a ringbuf (shm, eventfd), yield `RBToken` to
be used with `attach_to_ringbuf_sender` and `attach_to_ringbuf_receiver`
Allocate OS resources for a ringbuf.
'''
shm = SharedMemory(
name=shm_name,
size=buf_size,
create=True
)
try:
token = RBToken(
shm_name=shm_name,
write_eventfd=open_eventfd(),
wrap_eventfd=open_eventfd(),
eof_eventfd=open_eventfd(),
buf_size=buf_size
)
yield token
finally:
shm.unlink()
token = RBToken(
owner=current_actor().name,
shm_name=shm_name,
write_eventfd=open_eventfd(),
wrap_eventfd=open_eventfd(),
eof_eventfd=open_eventfd(),
buf_size=buf_size
)
# register fds for sharing
share_fds(
shm_name,
token.fds,
)
return shm, token
@cm
def open_ringbuf_pair(
name: str,
def open_ringbuf_sync(
shm_name: str,
buf_size: int = _DEFAULT_RB_SIZE,
) -> ContextManager[RBToken]:
'''
Handle resources for a ringbuf (shm, eventfd), yield `RBToken` to
be used with `attach_to_ringbuf_sender` and `attach_to_ringbuf_receiver`,
post yield maybe unshare fds and unlink shared memory
'''
shm: SharedMemory | None = None
token: RBToken | None = None
try:
shm, token = alloc_ringbuf(shm_name, buf_size=buf_size)
yield token
finally:
if token:
unshare_fds(shm_name)
if shm:
shm.unlink()
@acm
async def open_ringbuf(
shm_name: str,
buf_size: int = _DEFAULT_RB_SIZE,
) -> AsyncContextManager[RBToken]:
'''
Helper to use `open_ringbuf_sync` inside an async with block.
'''
with open_ringbuf_sync(
shm_name,
buf_size=buf_size
) as token:
yield token
@cm
def open_ringbufs_sync(
shm_names: list[str],
buf_sizes: int | list[str] = _DEFAULT_RB_SIZE,
) -> ContextManager[tuple[RBToken]]:
'''
Handle resources for multiple ringbufs at once.
'''
# maybe convert single int into list
if isinstance(buf_sizes, int):
buf_size = [buf_sizes] * len(shm_names)
# ensure len(shm_names) == len(buf_sizes)
if (
isinstance(buf_sizes, list)
and
len(buf_sizes) != len(shm_names)
):
raise ValueError(
'Expected buf_size list to be same length as shm_names'
)
# allocate resources
rings: list[tuple[SharedMemory, RBToken]] = [
alloc_ringbuf(shm_name, buf_size=buf_size)
for shm_name, buf_size in zip(shm_names, buf_size)
]
try:
yield tuple([token for _, token in rings])
finally:
# attempt fd unshare and shm unlink for each
for shm, token in rings:
try:
unshare_fds(token.shm_name)
except RuntimeError:
log.exception(f'while unsharing fds of {token}')
shm.unlink()
@acm
async def open_ringbufs(
shm_names: list[str],
buf_sizes: int | list[str] = _DEFAULT_RB_SIZE,
) -> AsyncContextManager[tuple[RBToken]]:
'''
Helper to use `open_ringbufs_sync` inside an async with block.
'''
with open_ringbufs_sync(
shm_names,
buf_sizes=buf_sizes
) as tokens:
yield tokens
@cm
def open_ringbuf_pair_sync(
shm_name: str,
buf_size: int = _DEFAULT_RB_SIZE
) -> ContextManager[tuple(RBToken, RBToken)]:
'''
@ -131,18 +234,30 @@ def open_ringbuf_pair(
bidirectional messaging.
'''
with (
open_ringbuf(
name + '.send',
buf_size=buf_size
) as send_token,
with open_ringbufs_sync(
[
f'{shm_name}.send',
f'{shm_name}.recv'
],
buf_sizes=buf_size
) as tokens:
yield tokens
open_ringbuf(
name + '.recv',
buf_size=buf_size
) as recv_token
):
yield send_token, recv_token
@acm
async def open_ringbuf_pair(
shm_name: str,
buf_size: int = _DEFAULT_RB_SIZE
) -> AsyncContextManager[tuple[RBToken, RBToken]]:
'''
Helper to use `open_ringbuf_pair_sync` inside an async with block.
'''
with open_ringbuf_pair_sync(
shm_name,
buf_size=buf_size
) as tokens:
yield tokens
Buffer = bytes | bytearray | memoryview
@ -640,6 +755,29 @@ class RingBufferReceiveChannel(trio.abc.ReceiveChannel[bytes]):
return self
async def _maybe_obtain_shared_resources(token: RBToken):
token = RBToken.from_msg(token)
# maybe token wasn't allocated by current actor
if token.owner != current_actor().name:
# use fdshare module to retrieve a copy of the FDs
fds = await request_fds_from(
token.owner,
token.shm_name
)
write, wrap, eof = fds
# rebuild token using FDs copies
token = RBToken(
owner=token.owner,
shm_name=token.shm_name,
write_eventfd=write,
wrap_eventfd=wrap,
eof_eventfd=eof,
buf_size=token.buf_size
)
return token
@acm
async def attach_to_ringbuf_receiver(
@ -651,8 +789,13 @@ async def attach_to_ringbuf_receiver(
Attach a RingBufferReceiveChannel from a previously opened
RBToken.
Requires tractor runtime to be up in order to support opening a ringbuf
originally allocated by a different actor.
Launches `receiver._eof_monitor_task` in a `trio.Nursery`.
'''
token = await _maybe_obtain_shared_resources(token)
async with (
trio.open_nursery(strict_exception_groups=False) as n,
RingBufferReceiveChannel(
@ -676,7 +819,12 @@ async def attach_to_ringbuf_sender(
Attach a RingBufferSendChannel from a previously opened
RBToken.
Requires tractor runtime to be up in order to support opening a ringbuf
originally allocated by a different actor.
'''
token = await _maybe_obtain_shared_resources(token)
async with RingBufferSendChannel(
token,
batch_size=batch_size,

View File

@ -13,7 +13,3 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from ._fdshare import (
send_fds as send_fds,
recv_fds as recv_fds
)

View File

@ -21,13 +21,19 @@ https://github.com/python/cpython/blob/275056a7fdcbe36aaac494b4183ae59943a338eb/
'''
import os
import array
import tempfile
from pathlib import Path
from typing import AsyncContextManager
from contextlib import asynccontextmanager as acm
import trio
import tractor
from trio import socket
log = tractor.log.get_logger(__name__)
class FDSharingError(Exception):
...
@ -157,3 +163,126 @@ async def recv_fds(sock_path: str, amount: int) -> tuple:
)
return tuple(a)
'''
Share FD actor module
Add "tractor.linux._fdshare" to enabled modules on actors to allow sharing of
FDs with other actors.
Use `share_fds` function to register a set of fds with a name, then other
actors can use `request_fds_from` function to retrieve the fds.
Use `unshare_fds` to disable sharing of a set of FDs.
'''
FDType = tuple[int]
_fds: dict[str, FDType] = {}
def maybe_get_fds(name: str) -> FDType | None:
'''
Get registered FDs with a given name or return None
'''
return _fds.get(name, None)
def get_fds(name: str) -> FDType:
'''
Get registered FDs with a given name or raise
'''
fds = maybe_get_fds(name)
if not fds:
raise RuntimeError(f'No FDs with name {name} found!')
return fds
def share_fds(
name: str,
fds: tuple[int],
) -> None:
'''
Register a set of fds to be shared under a given name.
'''
maybe_fds = maybe_get_fds(name)
if maybe_fds:
raise RuntimeError(f'share FDs: {maybe_fds} already tied to name {name}')
_fds[name] = fds
def unshare_fds(name: str) -> None:
'''
Unregister a set of fds to disable sharing them.
'''
get_fds(name) # raise if not exists
del _fds[name]
@tractor.context
async def _pass_fds(
ctx: tractor.Context,
name: str,
sock_path: str
) -> None:
'''
Endpoint to request a set of FDs from current actor, will use `ctx.started`
to send original FDs, then `send_fds` will block until remote side finishes
the `recv_fds` call.
'''
# get fds or raise error
fds = get_fds(name)
# start fd passing context using socket on `sock_path`
async with send_fds(fds, sock_path):
# send original fds through ctx.started
await ctx.started(fds)
async def request_fds_from(
actor_name: str,
fds_name: str
) -> FDType:
'''
Use this function to retreive shared FDs from `actor_name`.
'''
this_actor = tractor.current_actor()
# create a temporary path for the UDS sock
sock_path = str(
Path(tempfile.gettempdir())
/
f'{fds_name}-from-{actor_name}-to-{this_actor.name}.sock'
)
async with (
tractor.find_actor(actor_name) as portal,
portal.open_context(
_pass_fds,
name=fds_name,
sock_path=sock_path
) as (ctx, fds_info),
):
# get original FDs
og_fds = fds_info
# retrieve copies of FDs
fds = await recv_fds(sock_path, len(og_fds))
log.info(
f'{this_actor.name} received fds: {og_fds} -> {fds}'
)
return fds