diff --git a/tests/test_ringbuf.py b/tests/test_ringbuf.py index 2befeafc..3e6a5734 100644 --- a/tests/test_ringbuf.py +++ b/tests/test_ringbuf.py @@ -7,6 +7,7 @@ import pytest import tractor from tractor.ipc._ringbuf import ( open_ringbuf, + open_ringbuf_pair, attach_to_ringbuf_receiver, attach_to_ringbuf_sender, attach_to_ringbuf_channel, @@ -68,7 +69,7 @@ async def child_write_shm( msg_amount: int, rand_min: int, rand_max: int, - token: RBToken, + buf_size: int ) -> None: ''' Sub-actor used in `test_ringbuf` @@ -85,9 +86,12 @@ async def child_write_shm( rand_min=rand_min, rand_max=rand_max, ) - await ctx.started() - print('writer started') - async with attach_to_ringbuf_sender(token, cleanup=False) as sender: + async with ( + open_ringbuf('test_ringbuf', buf_size=buf_size) as token, + attach_to_ringbuf_sender(token) as sender + ): + await ctx.started(token) + print('writer started') for msg in rng: await sender.send(msg) @@ -133,55 +137,53 @@ def test_ringbuf( ''' async def main(): - with open_ringbuf( - 'test_ringbuf', - buf_size=buf_size - ) as token: - proc_kwargs = {'pass_fds': token.fds} + async with tractor.open_nursery() as an: + send_p = await an.start_actor( + 'ring_sender', + enable_modules=[ + __name__, + 'tractor.linux._fdshare' + ], + ) + recv_p = await an.start_actor( + 'ring_receiver', + enable_modules=[ + __name__, + 'tractor.linux._fdshare' + ], + ) + async with ( + send_p.open_context( + child_write_shm, + msg_amount=msg_amount, + rand_min=rand_min, + rand_max=rand_max, + buf_size=buf_size + ) as (sctx, token), - async with tractor.open_nursery() as an: - send_p = await an.start_actor( - 'ring_sender', - enable_modules=[__name__], - proc_kwargs=proc_kwargs - ) - recv_p = await an.start_actor( - 'ring_receiver', - enable_modules=[__name__], - proc_kwargs=proc_kwargs - ) - async with ( - send_p.open_context( - child_write_shm, - token=token, - msg_amount=msg_amount, - rand_min=rand_min, - rand_max=rand_max, - ) as (sctx, _), + recv_p.open_context( + child_read_shm, + token=token, + ) as (rctx, _), + ): + sent_hash = await sctx.result() + recvd_hash = await rctx.result() - recv_p.open_context( - child_read_shm, - token=token, - ) as (rctx, _), - ): - sent_hash = await sctx.result() - recvd_hash = await rctx.result() + assert sent_hash == recvd_hash - assert sent_hash == recvd_hash - - await send_p.cancel_actor() - await recv_p.cancel_actor() + await an.cancel() trio.run(main) @tractor.context -async def child_blocked_receiver( - ctx: tractor.Context, - token: RBToken -): - async with attach_to_ringbuf_receiver(token) as receiver: - await ctx.started() +async def child_blocked_receiver(ctx: tractor.Context): + async with ( + open_ringbuf('test_ring_cancel_reader') as token, + + attach_to_ringbuf_receiver(token) as receiver + ): + await ctx.started(token) await receiver.receive_some() @@ -192,26 +194,23 @@ def test_reader_cancel(): ''' async def main(): - with open_ringbuf('test_ring_cancel_reader') as token: + async with tractor.open_nursery() as an: + recv_p = await an.start_actor( + 'ring_blocked_receiver', + enable_modules=[ + __name__, + 'tractor.linux._fdshare' + ], + ) async with ( - tractor.open_nursery() as an, - attach_to_ringbuf_sender(token) as _sender, + recv_p.open_context( + child_blocked_receiver, + ) as (sctx, token), + + attach_to_ringbuf_sender(token), ): - recv_p = await an.start_actor( - 'ring_blocked_receiver', - enable_modules=[__name__], - proc_kwargs={ - 'pass_fds': token.fds - } - ) - async with ( - recv_p.open_context( - child_blocked_receiver, - token=token - ) as (sctx, _sent), - ): - await trio.sleep(1) - await an.cancel() + await trio.sleep(.1) + await an.cancel() with pytest.raises(tractor._exceptions.ContextCancelled): @@ -219,12 +218,16 @@ def test_reader_cancel(): @tractor.context -async def child_blocked_sender( - ctx: tractor.Context, - token: RBToken -): - async with attach_to_ringbuf_sender(token) as sender: - await ctx.started() +async def child_blocked_sender(ctx: tractor.Context): + async with ( + open_ringbuf( + 'test_ring_cancel_sender', + buf_size=1 + ) as token, + + attach_to_ringbuf_sender(token) as sender + ): + await ctx.started(token) await sender.send_all(b'this will wrap') @@ -235,26 +238,23 @@ def test_sender_cancel(): ''' async def main(): - with open_ringbuf( - 'test_ring_cancel_sender', - buf_size=1 - ) as token: - async with tractor.open_nursery() as an: - recv_p = await an.start_actor( - 'ring_blocked_sender', - enable_modules=[__name__], - proc_kwargs={ - 'pass_fds': token.fds - } - ) - async with ( - recv_p.open_context( - child_blocked_sender, - token=token - ) as (sctx, _sent), - ): - await trio.sleep(1) - await an.cancel() + async with tractor.open_nursery() as an: + recv_p = await an.start_actor( + 'ring_blocked_sender', + enable_modules=[ + __name__, + 'tractor.linux._fdshare' + ], + ) + async with ( + recv_p.open_context( + child_blocked_sender, + ) as (sctx, token), + + attach_to_ringbuf_receiver(token) + ): + await trio.sleep(.1) + await an.cancel() with pytest.raises(tractor._exceptions.ContextCancelled): @@ -274,24 +274,28 @@ def test_receiver_max_bytes(): msgs = [] async def main(): - with open_ringbuf( - 'test_ringbuf_max_bytes', - buf_size=10 - ) as token: - async with ( - trio.open_nursery() as n, - attach_to_ringbuf_sender(token, cleanup=False) as sender, - attach_to_ringbuf_receiver(token, cleanup=False) as receiver - ): - async def _send_and_close(): - await sender.send_all(msg) - await sender.aclose() + async with ( + tractor.open_nursery(), + open_ringbuf( + 'test_ringbuf_max_bytes', + buf_size=10 + ) as token, - n.start_soon(_send_and_close) - while len(msgs) < len(msg): - msg_part = await receiver.receive_some(max_bytes=1) - assert len(msg_part) == 1 - msgs.append(msg_part) + trio.open_nursery() as n, + + attach_to_ringbuf_sender(token, cleanup=False) as sender, + + attach_to_ringbuf_receiver(token, cleanup=False) as receiver + ): + async def _send_and_close(): + await sender.send_all(msg) + await sender.aclose() + + n.start_soon(_send_and_close) + while len(msgs) < len(msg): + msg_part = await receiver.receive_some(max_bytes=1) + assert len(msg_part) == 1 + msgs.append(msg_part) trio.run(main) assert msg == b''.join(msgs) @@ -329,42 +333,46 @@ def test_channel(): msg_amount_min = 100 msg_amount_max = 1000 + mods = [ + __name__, + 'tractor.linux._fdshare' + ] + async def main(): - with tractor.ipc.open_ringbuf_pair( - 'test_ringbuf_transport' - ) as (send_token, recv_token): + async with ( + tractor.open_nursery(enable_modules=mods) as an, + + open_ringbuf_pair( + 'test_ringbuf_transport' + ) as (send_token, recv_token), + + attach_to_ringbuf_channel(send_token, recv_token) as chan, + ): + sender = await an.start_actor( + 'test_ringbuf_transport_sender', + enable_modules=mods, + ) async with ( - attach_to_ringbuf_channel(send_token, recv_token) as chan, - tractor.open_nursery() as an + sender.open_context( + child_channel_sender, + msg_amount_min=msg_amount_min, + msg_amount_max=msg_amount_max, + token_in=recv_token, + token_out=send_token + ) as (ctx, _), ): - sender = await an.start_actor( - 'test_ringbuf_transport_sender', - enable_modules=[__name__], - proc_kwargs={ - 'pass_fds': send_token.fds + recv_token.fds - } - ) - async with ( - sender.open_context( - child_channel_sender, - msg_amount_min=msg_amount_min, - msg_amount_max=msg_amount_max, - token_in=recv_token, - token_out=send_token - ) as (ctx, _), - ): - recvd_hash = hashlib.sha256() - async for msg in chan: - if msg == b'bye': - await chan.send(b'bye') - break + recvd_hash = hashlib.sha256() + async for msg in chan: + if msg == b'bye': + await chan.send(b'bye') + break - recvd_hash.update(msg) + recvd_hash.update(msg) - sent_hash = await ctx.result() + sent_hash = await ctx.result() - assert recvd_hash.hexdigest() == sent_hash + assert recvd_hash.hexdigest() == sent_hash - await an.cancel() + await an.cancel() trio.run(main) diff --git a/tractor/ipc/__init__.py b/tractor/ipc/__init__.py index f2f42839..37c3c8ed 100644 --- a/tractor/ipc/__init__.py +++ b/tractor/ipc/__init__.py @@ -17,26 +17,7 @@ A modular IPC layer supporting the power of cross-process SC! ''' -import platform - from ._chan import ( _connect_chan as _connect_chan, Channel as Channel ) - -if platform.system() == 'Linux': - from ._ringbuf import ( - RBToken as RBToken, - - open_ringbuf as open_ringbuf, - open_ringbuf_pair as open_ringbuf_pair, - - RingBufferSendChannel as RingBufferSendChannel, - attach_to_ringbuf_sender as attach_to_ringbuf_sender, - - RingBufferReceiveChannel as RingBufferReceiveChannel, - attach_to_ringbuf_receiver as attach_to_ringbuf_receiver, - - RingBufferChannel as RingBufferChannel, - attach_to_ringbuf_channel as attach_to_ringbuf_channel, - ) diff --git a/tractor/ipc/_ringbuf/__init__.py b/tractor/ipc/_ringbuf/__init__.py index 2c74b88e..0d96d1a3 100644 --- a/tractor/ipc/_ringbuf/__init__.py +++ b/tractor/ipc/_ringbuf/__init__.py @@ -35,16 +35,22 @@ from msgspec import ( to_builtins ) -from ...log import get_logger -from ..._exceptions import ( +from tractor.log import get_logger +from tractor._exceptions import ( InternalError ) -from .._mp_bs import disable_mantracker -from ...linux.eventfd import ( +from tractor.ipc._mp_bs import disable_mantracker +from tractor.linux._fdshare import ( + share_fds, + unshare_fds, + request_fds_from +) +from tractor.linux.eventfd import ( open_eventfd, EFDReadCancelled, EventFD ) +from tractor._state import current_actor log = get_logger(__name__) @@ -57,17 +63,19 @@ _DEFAULT_RB_SIZE = 10 * 1024 class RBToken(Struct, frozen=True): ''' - RingBuffer token contains necesary info to open the three - eventfds and the shared memory + RingBuffer token contains necesary info to open resources of a ringbuf, + even in the case that ringbuf was not allocated by current actor. ''' + owner: str # if owner != `current_actor().name` we must use fdshare + shm_name: str write_eventfd: int # used to signal writer ptr advance wrap_eventfd: int # used to signal reader ready after wrap around eof_eventfd: int # used to signal writer closed - buf_size: int + buf_size: int # size in bytes of underlying shared memory buffer def as_msg(self): return to_builtins(self) @@ -81,10 +89,6 @@ class RBToken(Struct, frozen=True): @property def fds(self) -> tuple[int, int, int]: - ''' - Useful for `pass_fds` params - - ''' return ( self.write_eventfd, self.wrap_eventfd, @@ -92,38 +96,137 @@ class RBToken(Struct, frozen=True): ) -@cm -def open_ringbuf( +def alloc_ringbuf( shm_name: str, buf_size: int = _DEFAULT_RB_SIZE, -) -> ContextManager[RBToken]: +) -> tuple[SharedMemory, RBToken]: ''' - Handle resources for a ringbuf (shm, eventfd), yield `RBToken` to - be used with `attach_to_ringbuf_sender` and `attach_to_ringbuf_receiver` - + Allocate OS resources for a ringbuf. ''' shm = SharedMemory( name=shm_name, size=buf_size, create=True ) - try: - token = RBToken( - shm_name=shm_name, - write_eventfd=open_eventfd(), - wrap_eventfd=open_eventfd(), - eof_eventfd=open_eventfd(), - buf_size=buf_size - ) - yield token - - finally: - shm.unlink() + token = RBToken( + owner=current_actor().name, + shm_name=shm_name, + write_eventfd=open_eventfd(), + wrap_eventfd=open_eventfd(), + eof_eventfd=open_eventfd(), + buf_size=buf_size + ) + # register fds for sharing + share_fds( + shm_name, + token.fds, + ) + return shm, token @cm -def open_ringbuf_pair( - name: str, +def open_ringbuf_sync( + shm_name: str, + buf_size: int = _DEFAULT_RB_SIZE, +) -> ContextManager[RBToken]: + ''' + Handle resources for a ringbuf (shm, eventfd), yield `RBToken` to + be used with `attach_to_ringbuf_sender` and `attach_to_ringbuf_receiver`, + post yield maybe unshare fds and unlink shared memory + + ''' + shm: SharedMemory | None = None + token: RBToken | None = None + try: + shm, token = alloc_ringbuf(shm_name, buf_size=buf_size) + yield token + + finally: + if token: + unshare_fds(shm_name) + + if shm: + shm.unlink() + +@acm +async def open_ringbuf( + shm_name: str, + buf_size: int = _DEFAULT_RB_SIZE, +) -> AsyncContextManager[RBToken]: + ''' + Helper to use `open_ringbuf_sync` inside an async with block. + + ''' + with open_ringbuf_sync( + shm_name, + buf_size=buf_size + ) as token: + yield token + + +@cm +def open_ringbufs_sync( + shm_names: list[str], + buf_sizes: int | list[str] = _DEFAULT_RB_SIZE, +) -> ContextManager[tuple[RBToken]]: + ''' + Handle resources for multiple ringbufs at once. + + ''' + # maybe convert single int into list + if isinstance(buf_sizes, int): + buf_size = [buf_sizes] * len(shm_names) + + # ensure len(shm_names) == len(buf_sizes) + if ( + isinstance(buf_sizes, list) + and + len(buf_sizes) != len(shm_names) + ): + raise ValueError( + 'Expected buf_size list to be same length as shm_names' + ) + + # allocate resources + rings: list[tuple[SharedMemory, RBToken]] = [ + alloc_ringbuf(shm_name, buf_size=buf_size) + for shm_name, buf_size in zip(shm_names, buf_size) + ] + + try: + yield tuple([token for _, token in rings]) + + finally: + # attempt fd unshare and shm unlink for each + for shm, token in rings: + try: + unshare_fds(token.shm_name) + + except RuntimeError: + log.exception(f'while unsharing fds of {token}') + + shm.unlink() + + +@acm +async def open_ringbufs( + shm_names: list[str], + buf_sizes: int | list[str] = _DEFAULT_RB_SIZE, +) -> AsyncContextManager[tuple[RBToken]]: + ''' + Helper to use `open_ringbufs_sync` inside an async with block. + + ''' + with open_ringbufs_sync( + shm_names, + buf_sizes=buf_sizes + ) as tokens: + yield tokens + + +@cm +def open_ringbuf_pair_sync( + shm_name: str, buf_size: int = _DEFAULT_RB_SIZE ) -> ContextManager[tuple(RBToken, RBToken)]: ''' @@ -131,18 +234,30 @@ def open_ringbuf_pair( bidirectional messaging. ''' - with ( - open_ringbuf( - name + '.send', - buf_size=buf_size - ) as send_token, + with open_ringbufs_sync( + [ + f'{shm_name}.send', + f'{shm_name}.recv' + ], + buf_sizes=buf_size + ) as tokens: + yield tokens - open_ringbuf( - name + '.recv', - buf_size=buf_size - ) as recv_token - ): - yield send_token, recv_token + +@acm +async def open_ringbuf_pair( + shm_name: str, + buf_size: int = _DEFAULT_RB_SIZE +) -> AsyncContextManager[tuple[RBToken, RBToken]]: + ''' + Helper to use `open_ringbuf_pair_sync` inside an async with block. + + ''' + with open_ringbuf_pair_sync( + shm_name, + buf_size=buf_size + ) as tokens: + yield tokens Buffer = bytes | bytearray | memoryview @@ -640,6 +755,29 @@ class RingBufferReceiveChannel(trio.abc.ReceiveChannel[bytes]): return self +async def _maybe_obtain_shared_resources(token: RBToken): + token = RBToken.from_msg(token) + + # maybe token wasn't allocated by current actor + if token.owner != current_actor().name: + # use fdshare module to retrieve a copy of the FDs + fds = await request_fds_from( + token.owner, + token.shm_name + ) + write, wrap, eof = fds + # rebuild token using FDs copies + token = RBToken( + owner=token.owner, + shm_name=token.shm_name, + write_eventfd=write, + wrap_eventfd=wrap, + eof_eventfd=eof, + buf_size=token.buf_size + ) + + return token + @acm async def attach_to_ringbuf_receiver( @@ -651,8 +789,13 @@ async def attach_to_ringbuf_receiver( Attach a RingBufferReceiveChannel from a previously opened RBToken. + Requires tractor runtime to be up in order to support opening a ringbuf + originally allocated by a different actor. + Launches `receiver._eof_monitor_task` in a `trio.Nursery`. ''' + token = await _maybe_obtain_shared_resources(token) + async with ( trio.open_nursery(strict_exception_groups=False) as n, RingBufferReceiveChannel( @@ -676,7 +819,12 @@ async def attach_to_ringbuf_sender( Attach a RingBufferSendChannel from a previously opened RBToken. + Requires tractor runtime to be up in order to support opening a ringbuf + originally allocated by a different actor. + ''' + token = await _maybe_obtain_shared_resources(token) + async with RingBufferSendChannel( token, batch_size=batch_size, diff --git a/tractor/linux/__init__.py b/tractor/linux/__init__.py index 211a0040..33526d14 100644 --- a/tractor/linux/__init__.py +++ b/tractor/linux/__init__.py @@ -13,7 +13,3 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from ._fdshare import ( - send_fds as send_fds, - recv_fds as recv_fds -) diff --git a/tractor/linux/_fdshare.py b/tractor/linux/_fdshare.py index 16a748b8..bc2385ef 100644 --- a/tractor/linux/_fdshare.py +++ b/tractor/linux/_fdshare.py @@ -21,13 +21,19 @@ https://github.com/python/cpython/blob/275056a7fdcbe36aaac494b4183ae59943a338eb/ ''' import os import array +import tempfile +from pathlib import Path from typing import AsyncContextManager from contextlib import asynccontextmanager as acm import trio +import tractor from trio import socket +log = tractor.log.get_logger(__name__) + + class FDSharingError(Exception): ... @@ -157,3 +163,126 @@ async def recv_fds(sock_path: str, amount: int) -> tuple: ) return tuple(a) + + +''' +Share FD actor module + +Add "tractor.linux._fdshare" to enabled modules on actors to allow sharing of +FDs with other actors. + +Use `share_fds` function to register a set of fds with a name, then other +actors can use `request_fds_from` function to retrieve the fds. + +Use `unshare_fds` to disable sharing of a set of FDs. + +''' + +FDType = tuple[int] + +_fds: dict[str, FDType] = {} + + +def maybe_get_fds(name: str) -> FDType | None: + ''' + Get registered FDs with a given name or return None + + ''' + return _fds.get(name, None) + + +def get_fds(name: str) -> FDType: + ''' + Get registered FDs with a given name or raise + ''' + fds = maybe_get_fds(name) + + if not fds: + raise RuntimeError(f'No FDs with name {name} found!') + + return fds + + +def share_fds( + name: str, + fds: tuple[int], +) -> None: + ''' + Register a set of fds to be shared under a given name. + + ''' + maybe_fds = maybe_get_fds(name) + if maybe_fds: + raise RuntimeError(f'share FDs: {maybe_fds} already tied to name {name}') + + _fds[name] = fds + + +def unshare_fds(name: str) -> None: + ''' + Unregister a set of fds to disable sharing them. + + ''' + get_fds(name) # raise if not exists + + del _fds[name] + + +@tractor.context +async def _pass_fds( + ctx: tractor.Context, + name: str, + sock_path: str +) -> None: + ''' + Endpoint to request a set of FDs from current actor, will use `ctx.started` + to send original FDs, then `send_fds` will block until remote side finishes + the `recv_fds` call. + + ''' + # get fds or raise error + fds = get_fds(name) + + # start fd passing context using socket on `sock_path` + async with send_fds(fds, sock_path): + # send original fds through ctx.started + await ctx.started(fds) + + +async def request_fds_from( + actor_name: str, + fds_name: str +) -> FDType: + ''' + Use this function to retreive shared FDs from `actor_name`. + + ''' + this_actor = tractor.current_actor() + + # create a temporary path for the UDS sock + sock_path = str( + Path(tempfile.gettempdir()) + / + f'{fds_name}-from-{actor_name}-to-{this_actor.name}.sock' + ) + + async with ( + tractor.find_actor(actor_name) as portal, + + portal.open_context( + _pass_fds, + name=fds_name, + sock_path=sock_path + ) as (ctx, fds_info), + ): + # get original FDs + og_fds = fds_info + + # retrieve copies of FDs + fds = await recv_fds(sock_path, len(og_fds)) + + log.info( + f'{this_actor.name} received fds: {og_fds} -> {fds}' + ) + + return fds