diff --git a/tests/test_ringbuf.py b/tests/test_ringbuf.py
index c061a893..2bae7be1 100644
--- a/tests/test_ringbuf.py
+++ b/tests/test_ringbuf.py
@@ -4,8 +4,9 @@ import hashlib
import trio
import pytest
import tractor
-from tractor.ipc import (
+from tractor.ipc._ringbuf import (
open_ringbuf,
+ open_ringbuf_pair,
attach_to_ringbuf_receiver,
attach_to_ringbuf_sender,
attach_to_ringbuf_channel,
@@ -64,7 +65,7 @@ async def child_write_shm(
msg_amount: int,
rand_min: int,
rand_max: int,
- token: RBToken,
+ buf_size: int
) -> None:
'''
Sub-actor used in `test_ringbuf`
@@ -81,9 +82,12 @@ async def child_write_shm(
rand_min=rand_min,
rand_max=rand_max,
)
- await ctx.started()
- print('writer started')
- async with attach_to_ringbuf_sender(token, cleanup=False) as sender:
+ async with (
+ open_ringbuf('test_ringbuf', buf_size=buf_size) as token,
+ attach_to_ringbuf_sender(token) as sender
+ ):
+ await ctx.started(token)
+ print('writer started')
for msg in rng:
await sender.send(msg)
@@ -129,55 +133,53 @@ def test_ringbuf(
'''
async def main():
- with open_ringbuf(
- 'test_ringbuf',
- buf_size=buf_size
- ) as token:
- proc_kwargs = {'pass_fds': token.fds}
+ async with tractor.open_nursery() as an:
+ send_p = await an.start_actor(
+ 'ring_sender',
+ enable_modules=[
+ __name__,
+ 'tractor.linux._fdshare'
+ ],
+ )
+ recv_p = await an.start_actor(
+ 'ring_receiver',
+ enable_modules=[
+ __name__,
+ 'tractor.linux._fdshare'
+ ],
+ )
+ async with (
+ send_p.open_context(
+ child_write_shm,
+ msg_amount=msg_amount,
+ rand_min=rand_min,
+ rand_max=rand_max,
+ buf_size=buf_size
+ ) as (sctx, token),
- async with tractor.open_nursery() as an:
- send_p = await an.start_actor(
- 'ring_sender',
- enable_modules=[__name__],
- proc_kwargs=proc_kwargs
- )
- recv_p = await an.start_actor(
- 'ring_receiver',
- enable_modules=[__name__],
- proc_kwargs=proc_kwargs
- )
- async with (
- send_p.open_context(
- child_write_shm,
- token=token,
- msg_amount=msg_amount,
- rand_min=rand_min,
- rand_max=rand_max,
- ) as (sctx, _),
+ recv_p.open_context(
+ child_read_shm,
+ token=token,
+ ) as (rctx, _),
+ ):
+ sent_hash = await sctx.result()
+ recvd_hash = await rctx.result()
- recv_p.open_context(
- child_read_shm,
- token=token,
- ) as (rctx, _),
- ):
- sent_hash = await sctx.result()
- recvd_hash = await rctx.result()
+ assert sent_hash == recvd_hash
- assert sent_hash == recvd_hash
-
- await send_p.cancel_actor()
- await recv_p.cancel_actor()
+ await an.cancel()
trio.run(main)
@tractor.context
-async def child_blocked_receiver(
- ctx: tractor.Context,
- token: RBToken
-):
- async with attach_to_ringbuf_receiver(token) as receiver:
- await ctx.started()
+async def child_blocked_receiver(ctx: tractor.Context):
+ async with (
+ open_ringbuf('test_ring_cancel_reader') as token,
+
+ attach_to_ringbuf_receiver(token) as receiver
+ ):
+ await ctx.started(token)
await receiver.receive_some()
@@ -188,26 +190,23 @@ def test_reader_cancel():
'''
async def main():
- with open_ringbuf('test_ring_cancel_reader') as token:
+ async with tractor.open_nursery() as an:
+ recv_p = await an.start_actor(
+ 'ring_blocked_receiver',
+ enable_modules=[
+ __name__,
+ 'tractor.linux._fdshare'
+ ],
+ )
async with (
- tractor.open_nursery() as an,
- attach_to_ringbuf_sender(token) as _sender,
+ recv_p.open_context(
+ child_blocked_receiver,
+ ) as (sctx, token),
+
+ attach_to_ringbuf_sender(token),
):
- recv_p = await an.start_actor(
- 'ring_blocked_receiver',
- enable_modules=[__name__],
- proc_kwargs={
- 'pass_fds': token.fds
- }
- )
- async with (
- recv_p.open_context(
- child_blocked_receiver,
- token=token
- ) as (sctx, _sent),
- ):
- await trio.sleep(1)
- await an.cancel()
+ await trio.sleep(.1)
+ await an.cancel()
with pytest.raises(tractor._exceptions.ContextCancelled):
@@ -215,12 +214,16 @@ def test_reader_cancel():
@tractor.context
-async def child_blocked_sender(
- ctx: tractor.Context,
- token: RBToken
-):
- async with attach_to_ringbuf_sender(token) as sender:
- await ctx.started()
+async def child_blocked_sender(ctx: tractor.Context):
+ async with (
+ open_ringbuf(
+ 'test_ring_cancel_sender',
+ buf_size=1
+ ) as token,
+
+ attach_to_ringbuf_sender(token) as sender
+ ):
+ await ctx.started(token)
await sender.send_all(b'this will wrap')
@@ -231,26 +234,23 @@ def test_sender_cancel():
'''
async def main():
- with open_ringbuf(
- 'test_ring_cancel_sender',
- buf_size=1
- ) as token:
- async with tractor.open_nursery() as an:
- recv_p = await an.start_actor(
- 'ring_blocked_sender',
- enable_modules=[__name__],
- proc_kwargs={
- 'pass_fds': token.fds
- }
- )
- async with (
- recv_p.open_context(
- child_blocked_sender,
- token=token
- ) as (sctx, _sent),
- ):
- await trio.sleep(1)
- await an.cancel()
+ async with tractor.open_nursery() as an:
+ recv_p = await an.start_actor(
+ 'ring_blocked_sender',
+ enable_modules=[
+ __name__,
+ 'tractor.linux._fdshare'
+ ],
+ )
+ async with (
+ recv_p.open_context(
+ child_blocked_sender,
+ ) as (sctx, token),
+
+ attach_to_ringbuf_receiver(token)
+ ):
+ await trio.sleep(.1)
+ await an.cancel()
with pytest.raises(tractor._exceptions.ContextCancelled):
@@ -270,24 +270,28 @@ def test_receiver_max_bytes():
msgs = []
async def main():
- with open_ringbuf(
- 'test_ringbuf_max_bytes',
- buf_size=10
- ) as token:
- async with (
- trio.open_nursery() as n,
- attach_to_ringbuf_sender(token, cleanup=False) as sender,
- attach_to_ringbuf_receiver(token, cleanup=False) as receiver
- ):
- async def _send_and_close():
- await sender.send_all(msg)
- await sender.aclose()
+ async with (
+ tractor.open_nursery(),
+ open_ringbuf(
+ 'test_ringbuf_max_bytes',
+ buf_size=10
+ ) as token,
- n.start_soon(_send_and_close)
- while len(msgs) < len(msg):
- msg_part = await receiver.receive_some(max_bytes=1)
- assert len(msg_part) == 1
- msgs.append(msg_part)
+ trio.open_nursery() as n,
+
+ attach_to_ringbuf_sender(token, cleanup=False) as sender,
+
+ attach_to_ringbuf_receiver(token, cleanup=False) as receiver
+ ):
+ async def _send_and_close():
+ await sender.send_all(msg)
+ await sender.aclose()
+
+ n.start_soon(_send_and_close)
+ while len(msgs) < len(msg):
+ msg_part = await receiver.receive_some(max_bytes=1)
+ assert len(msg_part) == 1
+ msgs.append(msg_part)
trio.run(main)
assert msg == b''.join(msgs)
@@ -325,42 +329,46 @@ def test_channel():
msg_amount_min = 100
msg_amount_max = 1000
+ mods = [
+ __name__,
+ 'tractor.linux._fdshare'
+ ]
+
async def main():
- with tractor.ipc.open_ringbuf_pair(
- 'test_ringbuf_transport'
- ) as (send_token, recv_token):
+ async with (
+ tractor.open_nursery(enable_modules=mods) as an,
+
+ open_ringbuf_pair(
+ 'test_ringbuf_transport'
+ ) as (send_token, recv_token),
+
+ attach_to_ringbuf_channel(send_token, recv_token) as chan,
+ ):
+ sender = await an.start_actor(
+ 'test_ringbuf_transport_sender',
+ enable_modules=mods,
+ )
async with (
- attach_to_ringbuf_channel(send_token, recv_token) as chan,
- tractor.open_nursery() as an
+ sender.open_context(
+ child_channel_sender,
+ msg_amount_min=msg_amount_min,
+ msg_amount_max=msg_amount_max,
+ token_in=recv_token,
+ token_out=send_token
+ ) as (ctx, _),
):
- sender = await an.start_actor(
- 'test_ringbuf_transport_sender',
- enable_modules=[__name__],
- proc_kwargs={
- 'pass_fds': send_token.fds + recv_token.fds
- }
- )
- async with (
- sender.open_context(
- child_channel_sender,
- msg_amount_min=msg_amount_min,
- msg_amount_max=msg_amount_max,
- token_in=recv_token,
- token_out=send_token
- ) as (ctx, _),
- ):
- recvd_hash = hashlib.sha256()
- async for msg in chan:
- if msg == b'bye':
- await chan.send(b'bye')
- break
+ recvd_hash = hashlib.sha256()
+ async for msg in chan:
+ if msg == b'bye':
+ await chan.send(b'bye')
+ break
- recvd_hash.update(msg)
+ recvd_hash.update(msg)
- sent_hash = await ctx.result()
+ sent_hash = await ctx.result()
- assert recvd_hash.hexdigest() == sent_hash
+ assert recvd_hash.hexdigest() == sent_hash
- await an.cancel()
+ await an.cancel()
trio.run(main)
diff --git a/tractor/ipc/__init__.py b/tractor/ipc/__init__.py
index f2e75563..54ea81f3 100644
--- a/tractor/ipc/__init__.py
+++ b/tractor/ipc/__init__.py
@@ -13,8 +13,6 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
-import platform
-
from ._transport import MsgTransport as MsgTransport
from ._tcp import (
@@ -27,20 +25,3 @@ from ._chan import (
get_msg_transport as get_msg_transport,
Channel as Channel
)
-
-if platform.system() == 'Linux':
- from ._ringbuf import (
- RBToken as RBToken,
-
- open_ringbuf as open_ringbuf,
- open_ringbuf_pair as open_ringbuf_pair,
-
- RingBufferSendChannel as RingBufferSendChannel,
- attach_to_ringbuf_sender as attach_to_ringbuf_sender,
-
- RingBufferReceiveChannel as RingBufferReceiveChannel,
- attach_to_ringbuf_receiver as attach_to_ringbuf_receiver,
-
- RingBufferChannel as RingBufferChannel,
- attach_to_ringbuf_channel as attach_to_ringbuf_channel,
- )
diff --git a/tractor/ipc/_ringbuf/__init__.py b/tractor/ipc/_ringbuf/__init__.py
index 2c74b88e..0d96d1a3 100644
--- a/tractor/ipc/_ringbuf/__init__.py
+++ b/tractor/ipc/_ringbuf/__init__.py
@@ -35,16 +35,22 @@ from msgspec import (
to_builtins
)
-from ...log import get_logger
-from ..._exceptions import (
+from tractor.log import get_logger
+from tractor._exceptions import (
InternalError
)
-from .._mp_bs import disable_mantracker
-from ...linux.eventfd import (
+from tractor.ipc._mp_bs import disable_mantracker
+from tractor.linux._fdshare import (
+ share_fds,
+ unshare_fds,
+ request_fds_from
+)
+from tractor.linux.eventfd import (
open_eventfd,
EFDReadCancelled,
EventFD
)
+from tractor._state import current_actor
log = get_logger(__name__)
@@ -57,17 +63,19 @@ _DEFAULT_RB_SIZE = 10 * 1024
class RBToken(Struct, frozen=True):
'''
- RingBuffer token contains necesary info to open the three
- eventfds and the shared memory
+ RingBuffer token contains necesary info to open resources of a ringbuf,
+ even in the case that ringbuf was not allocated by current actor.
'''
+ owner: str # if owner != `current_actor().name` we must use fdshare
+
shm_name: str
write_eventfd: int # used to signal writer ptr advance
wrap_eventfd: int # used to signal reader ready after wrap around
eof_eventfd: int # used to signal writer closed
- buf_size: int
+ buf_size: int # size in bytes of underlying shared memory buffer
def as_msg(self):
return to_builtins(self)
@@ -81,10 +89,6 @@ class RBToken(Struct, frozen=True):
@property
def fds(self) -> tuple[int, int, int]:
- '''
- Useful for `pass_fds` params
-
- '''
return (
self.write_eventfd,
self.wrap_eventfd,
@@ -92,38 +96,137 @@ class RBToken(Struct, frozen=True):
)
-@cm
-def open_ringbuf(
+def alloc_ringbuf(
shm_name: str,
buf_size: int = _DEFAULT_RB_SIZE,
-) -> ContextManager[RBToken]:
+) -> tuple[SharedMemory, RBToken]:
'''
- Handle resources for a ringbuf (shm, eventfd), yield `RBToken` to
- be used with `attach_to_ringbuf_sender` and `attach_to_ringbuf_receiver`
-
+ Allocate OS resources for a ringbuf.
'''
shm = SharedMemory(
name=shm_name,
size=buf_size,
create=True
)
- try:
- token = RBToken(
- shm_name=shm_name,
- write_eventfd=open_eventfd(),
- wrap_eventfd=open_eventfd(),
- eof_eventfd=open_eventfd(),
- buf_size=buf_size
- )
- yield token
-
- finally:
- shm.unlink()
+ token = RBToken(
+ owner=current_actor().name,
+ shm_name=shm_name,
+ write_eventfd=open_eventfd(),
+ wrap_eventfd=open_eventfd(),
+ eof_eventfd=open_eventfd(),
+ buf_size=buf_size
+ )
+ # register fds for sharing
+ share_fds(
+ shm_name,
+ token.fds,
+ )
+ return shm, token
@cm
-def open_ringbuf_pair(
- name: str,
+def open_ringbuf_sync(
+ shm_name: str,
+ buf_size: int = _DEFAULT_RB_SIZE,
+) -> ContextManager[RBToken]:
+ '''
+ Handle resources for a ringbuf (shm, eventfd), yield `RBToken` to
+ be used with `attach_to_ringbuf_sender` and `attach_to_ringbuf_receiver`,
+ post yield maybe unshare fds and unlink shared memory
+
+ '''
+ shm: SharedMemory | None = None
+ token: RBToken | None = None
+ try:
+ shm, token = alloc_ringbuf(shm_name, buf_size=buf_size)
+ yield token
+
+ finally:
+ if token:
+ unshare_fds(shm_name)
+
+ if shm:
+ shm.unlink()
+
+@acm
+async def open_ringbuf(
+ shm_name: str,
+ buf_size: int = _DEFAULT_RB_SIZE,
+) -> AsyncContextManager[RBToken]:
+ '''
+ Helper to use `open_ringbuf_sync` inside an async with block.
+
+ '''
+ with open_ringbuf_sync(
+ shm_name,
+ buf_size=buf_size
+ ) as token:
+ yield token
+
+
+@cm
+def open_ringbufs_sync(
+ shm_names: list[str],
+ buf_sizes: int | list[str] = _DEFAULT_RB_SIZE,
+) -> ContextManager[tuple[RBToken]]:
+ '''
+ Handle resources for multiple ringbufs at once.
+
+ '''
+ # maybe convert single int into list
+ if isinstance(buf_sizes, int):
+ buf_size = [buf_sizes] * len(shm_names)
+
+ # ensure len(shm_names) == len(buf_sizes)
+ if (
+ isinstance(buf_sizes, list)
+ and
+ len(buf_sizes) != len(shm_names)
+ ):
+ raise ValueError(
+ 'Expected buf_size list to be same length as shm_names'
+ )
+
+ # allocate resources
+ rings: list[tuple[SharedMemory, RBToken]] = [
+ alloc_ringbuf(shm_name, buf_size=buf_size)
+ for shm_name, buf_size in zip(shm_names, buf_size)
+ ]
+
+ try:
+ yield tuple([token for _, token in rings])
+
+ finally:
+ # attempt fd unshare and shm unlink for each
+ for shm, token in rings:
+ try:
+ unshare_fds(token.shm_name)
+
+ except RuntimeError:
+ log.exception(f'while unsharing fds of {token}')
+
+ shm.unlink()
+
+
+@acm
+async def open_ringbufs(
+ shm_names: list[str],
+ buf_sizes: int | list[str] = _DEFAULT_RB_SIZE,
+) -> AsyncContextManager[tuple[RBToken]]:
+ '''
+ Helper to use `open_ringbufs_sync` inside an async with block.
+
+ '''
+ with open_ringbufs_sync(
+ shm_names,
+ buf_sizes=buf_sizes
+ ) as tokens:
+ yield tokens
+
+
+@cm
+def open_ringbuf_pair_sync(
+ shm_name: str,
buf_size: int = _DEFAULT_RB_SIZE
) -> ContextManager[tuple(RBToken, RBToken)]:
'''
@@ -131,18 +234,30 @@ def open_ringbuf_pair(
bidirectional messaging.
'''
- with (
- open_ringbuf(
- name + '.send',
- buf_size=buf_size
- ) as send_token,
+ with open_ringbufs_sync(
+ [
+ f'{shm_name}.send',
+ f'{shm_name}.recv'
+ ],
+ buf_sizes=buf_size
+ ) as tokens:
+ yield tokens
- open_ringbuf(
- name + '.recv',
- buf_size=buf_size
- ) as recv_token
- ):
- yield send_token, recv_token
+
+@acm
+async def open_ringbuf_pair(
+ shm_name: str,
+ buf_size: int = _DEFAULT_RB_SIZE
+) -> AsyncContextManager[tuple[RBToken, RBToken]]:
+ '''
+ Helper to use `open_ringbuf_pair_sync` inside an async with block.
+
+ '''
+ with open_ringbuf_pair_sync(
+ shm_name,
+ buf_size=buf_size
+ ) as tokens:
+ yield tokens
Buffer = bytes | bytearray | memoryview
@@ -640,6 +755,29 @@ class RingBufferReceiveChannel(trio.abc.ReceiveChannel[bytes]):
return self
+async def _maybe_obtain_shared_resources(token: RBToken):
+ token = RBToken.from_msg(token)
+
+ # maybe token wasn't allocated by current actor
+ if token.owner != current_actor().name:
+ # use fdshare module to retrieve a copy of the FDs
+ fds = await request_fds_from(
+ token.owner,
+ token.shm_name
+ )
+ write, wrap, eof = fds
+ # rebuild token using FDs copies
+ token = RBToken(
+ owner=token.owner,
+ shm_name=token.shm_name,
+ write_eventfd=write,
+ wrap_eventfd=wrap,
+ eof_eventfd=eof,
+ buf_size=token.buf_size
+ )
+
+ return token
+
@acm
async def attach_to_ringbuf_receiver(
@@ -651,8 +789,13 @@ async def attach_to_ringbuf_receiver(
Attach a RingBufferReceiveChannel from a previously opened
RBToken.
+ Requires tractor runtime to be up in order to support opening a ringbuf
+ originally allocated by a different actor.
+
Launches `receiver._eof_monitor_task` in a `trio.Nursery`.
'''
+ token = await _maybe_obtain_shared_resources(token)
+
async with (
trio.open_nursery(strict_exception_groups=False) as n,
RingBufferReceiveChannel(
@@ -676,7 +819,12 @@ async def attach_to_ringbuf_sender(
Attach a RingBufferSendChannel from a previously opened
RBToken.
+ Requires tractor runtime to be up in order to support opening a ringbuf
+ originally allocated by a different actor.
+
'''
+ token = await _maybe_obtain_shared_resources(token)
+
async with RingBufferSendChannel(
token,
batch_size=batch_size,
diff --git a/tractor/linux/__init__.py b/tractor/linux/__init__.py
index 211a0040..33526d14 100644
--- a/tractor/linux/__init__.py
+++ b/tractor/linux/__init__.py
@@ -13,7 +13,3 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
-from ._fdshare import (
- send_fds as send_fds,
- recv_fds as recv_fds
-)
diff --git a/tractor/linux/_fdshare.py b/tractor/linux/_fdshare.py
index 16a748b8..bc2385ef 100644
--- a/tractor/linux/_fdshare.py
+++ b/tractor/linux/_fdshare.py
@@ -21,13 +21,19 @@ https://github.com/python/cpython/blob/275056a7fdcbe36aaac494b4183ae59943a338eb/
'''
import os
import array
+import tempfile
+from pathlib import Path
from typing import AsyncContextManager
from contextlib import asynccontextmanager as acm
import trio
+import tractor
from trio import socket
+log = tractor.log.get_logger(__name__)
+
+
class FDSharingError(Exception):
...
@@ -157,3 +163,126 @@ async def recv_fds(sock_path: str, amount: int) -> tuple:
)
return tuple(a)
+
+
+'''
+Share FD actor module
+
+Add "tractor.linux._fdshare" to enabled modules on actors to allow sharing of
+FDs with other actors.
+
+Use `share_fds` function to register a set of fds with a name, then other
+actors can use `request_fds_from` function to retrieve the fds.
+
+Use `unshare_fds` to disable sharing of a set of FDs.
+
+'''
+
+FDType = tuple[int]
+
+_fds: dict[str, FDType] = {}
+
+
+def maybe_get_fds(name: str) -> FDType | None:
+ '''
+ Get registered FDs with a given name or return None
+
+ '''
+ return _fds.get(name, None)
+
+
+def get_fds(name: str) -> FDType:
+ '''
+ Get registered FDs with a given name or raise
+ '''
+ fds = maybe_get_fds(name)
+
+ if not fds:
+ raise RuntimeError(f'No FDs with name {name} found!')
+
+ return fds
+
+
+def share_fds(
+ name: str,
+ fds: tuple[int],
+) -> None:
+ '''
+ Register a set of fds to be shared under a given name.
+
+ '''
+ maybe_fds = maybe_get_fds(name)
+ if maybe_fds:
+ raise RuntimeError(f'share FDs: {maybe_fds} already tied to name {name}')
+
+ _fds[name] = fds
+
+
+def unshare_fds(name: str) -> None:
+ '''
+ Unregister a set of fds to disable sharing them.
+
+ '''
+ get_fds(name) # raise if not exists
+
+ del _fds[name]
+
+
+@tractor.context
+async def _pass_fds(
+ ctx: tractor.Context,
+ name: str,
+ sock_path: str
+) -> None:
+ '''
+ Endpoint to request a set of FDs from current actor, will use `ctx.started`
+ to send original FDs, then `send_fds` will block until remote side finishes
+ the `recv_fds` call.
+
+ '''
+ # get fds or raise error
+ fds = get_fds(name)
+
+ # start fd passing context using socket on `sock_path`
+ async with send_fds(fds, sock_path):
+ # send original fds through ctx.started
+ await ctx.started(fds)
+
+
+async def request_fds_from(
+ actor_name: str,
+ fds_name: str
+) -> FDType:
+ '''
+ Use this function to retreive shared FDs from `actor_name`.
+
+ '''
+ this_actor = tractor.current_actor()
+
+ # create a temporary path for the UDS sock
+ sock_path = str(
+ Path(tempfile.gettempdir())
+ /
+ f'{fds_name}-from-{actor_name}-to-{this_actor.name}.sock'
+ )
+
+ async with (
+ tractor.find_actor(actor_name) as portal,
+
+ portal.open_context(
+ _pass_fds,
+ name=fds_name,
+ sock_path=sock_path
+ ) as (ctx, fds_info),
+ ):
+ # get original FDs
+ og_fds = fds_info
+
+ # retrieve copies of FDs
+ fds = await recv_fds(sock_path, len(og_fds))
+
+ log.info(
+ f'{this_actor.name} received fds: {og_fds} -> {fds}'
+ )
+
+ return fds