Adapt ringbuf pubsub to new RBToken owner system

one_ring_to_rule_them_all
Guillermo Rodriguez 2025-04-13 13:36:02 -03:00
parent 39dccbdde7
commit f5513ba005
No known key found for this signature in database
GPG Key ID: 002CC5F1E6BDA53E
4 changed files with 336 additions and 849 deletions

View File

@ -0,0 +1,182 @@
from typing import AsyncContextManager
from contextlib import asynccontextmanager as acm
import trio
import pytest
import tractor
from tractor.trionics import gather_contexts
from tractor.ipc._ringbuf import open_ringbufs
from tractor.ipc._ringbuf._pubsub import (
open_ringbuf_publisher,
open_ringbuf_subscriber,
get_publisher,
get_subscriber,
open_pub_channel_at,
open_sub_channel_at
)
log = tractor.log.get_console_log(level='info')
@tractor.context
async def publish_range(
ctx: tractor.Context,
size: int
):
pub = get_publisher()
await ctx.started()
for i in range(size):
await pub.send(i.to_bytes(4))
log.info(f'sent {i}')
await pub.flush()
log.info('range done')
@tractor.context
async def subscribe_range(
ctx: tractor.Context,
size: int
):
sub = get_subscriber()
await ctx.started()
for i in range(size):
recv = int.from_bytes(await sub.receive())
if recv != i:
raise AssertionError(
f'received: {recv} expected: {i}'
)
log.info(f'received: {recv}')
log.info('range done')
@tractor.context
async def subscriber_child(ctx: tractor.Context):
try:
async with open_ringbuf_subscriber(guarantee_order=True):
await ctx.started()
await trio.sleep_forever()
finally:
log.info('subscriber exit')
@tractor.context
async def publisher_child(
ctx: tractor.Context,
batch_size: int
):
try:
async with open_ringbuf_publisher(
guarantee_order=True,
batch_size=batch_size
):
await ctx.started()
await trio.sleep_forever()
finally:
log.info('publisher exit')
@acm
async def open_pubsub_test_actors(
ring_names: list[str],
size: int,
batch_size: int
) -> AsyncContextManager[tuple[tractor.Portal, tractor.Portal]]:
with trio.fail_after(5):
async with tractor.open_nursery(
enable_modules=[
'tractor.linux._fdshare'
]
) as an:
modules = [
__name__,
'tractor.linux._fdshare',
'tractor.ipc._ringbuf._pubsub'
]
sub_portal = await an.start_actor(
'sub',
enable_modules=modules
)
pub_portal = await an.start_actor(
'pub',
enable_modules=modules
)
async with (
sub_portal.open_context(subscriber_child) as (long_rctx, _),
pub_portal.open_context(
publisher_child,
batch_size=batch_size
) as (long_sctx, _),
):
with open_ringbufs(ring_names) as tokens:
async with (
gather_contexts([
open_sub_channel_at('sub', ring)
for ring in tokens
]),
gather_contexts([
open_pub_channel_at('pub', ring)
for ring in tokens
]),
sub_portal.open_context(subscribe_range, size=size) as (rctx, _),
pub_portal.open_context(publish_range, size=size) as (sctx, _)
):
yield
await long_sctx.cancel()
await long_rctx.cancel()
await an.cancel()
@pytest.mark.parametrize(
('ring_names', 'size', 'batch_size'),
[
(
['ring-first'],
100,
1
),
(
['ring-first'],
69,
1
),
(
[f'multi-ring-{i}' for i in range(3)],
1000,
100
),
],
ids=[
'simple',
'redo-simple',
'multi-ring',
]
)
def test_pubsub(
request,
ring_names: list[str],
size: int,
batch_size: int
):
async def main():
async with open_pubsub_test_actors(
ring_names, size, batch_size
):
...
trio.run(main)

View File

@ -1,272 +0,0 @@
import trio
import tractor
import msgspec
from tractor.ipc import (
attach_to_ringbuf_receiver,
attach_to_ringbuf_sender
)
from tractor.ipc._ringbuf._pubsub import (
open_ringbuf_publisher,
open_ringbuf_subscriber
)
import tractor.ipc._ringbuf._ringd as ringd
log = tractor.log.get_console_log(level='info')
@tractor.context
async def recv_child(
ctx: tractor.Context,
ring_name: str
):
async with (
ringd.open_ringbuf(ring_name) as token,
attach_to_ringbuf_receiver(token) as chan,
):
await ctx.started()
async for msg in chan:
log.info(f'received {int.from_bytes(msg)}')
@tractor.context
async def send_child(
ctx: tractor.Context,
ring_name: str
):
async with (
ringd.attach_ringbuf(ring_name) as token,
attach_to_ringbuf_sender(token) as chan,
):
await ctx.started()
for i in range(100):
await chan.send(i.to_bytes(4))
log.info(f'sent {i}')
def test_ringd():
'''
Spawn ringd actor and two childs that access same ringbuf through ringd.
Both will use `ringd.open_ringbuf` to allocate the ringbuf, then attach to
them as sender and receiver.
'''
async def main():
async with (
tractor.open_nursery() as an,
ringd.open_ringd()
):
recv_portal = await an.start_actor(
'recv',
enable_modules=[__name__]
)
send_portal = await an.start_actor(
'send',
enable_modules=[__name__]
)
async with (
recv_portal.open_context(
recv_child,
ring_name='ring'
) as (rctx, _),
send_portal.open_context(
send_child,
ring_name='ring'
) as (sctx, _),
):
...
await an.cancel()
trio.run(main)
class Struct(msgspec.Struct):
def encode(self) -> bytes:
return msgspec.msgpack.encode(self)
class AddChannelMsg(Struct, frozen=True, tag=True):
name: str
class RemoveChannelMsg(Struct, frozen=True, tag=True):
name: str
class RangeMsg(Struct, frozen=True, tag=True):
size: int
ControlMessages = AddChannelMsg | RemoveChannelMsg | RangeMsg
@tractor.context
async def subscriber_child(ctx: tractor.Context):
await ctx.started()
async with (
open_ringbuf_subscriber(guarantee_order=True) as subs,
trio.open_nursery() as n,
ctx.open_stream() as stream
):
range_msg = None
range_event = trio.Event()
range_scope = trio.CancelScope()
async def _control_listen_task():
nonlocal range_msg, range_event
async for msg in stream:
msg = msgspec.msgpack.decode(msg, type=ControlMessages)
match msg:
case AddChannelMsg():
await subs.add_channel(msg.name)
case RemoveChannelMsg():
subs.remove_channel(msg.name)
case RangeMsg():
range_msg = msg
range_event.set()
await stream.send(b'ack')
range_scope.cancel()
n.start_soon(_control_listen_task)
with range_scope:
while True:
await range_event.wait()
range_event = trio.Event()
for i in range(range_msg.size):
recv = int.from_bytes(await subs.receive())
if recv != i:
raise AssertionError(
f'received: {recv} expected: {i}'
)
log.info(f'received: {recv}')
await stream.send(b'valid range')
log.info('finished range')
log.info('subscriber exit')
@tractor.context
async def publisher_child(ctx: tractor.Context):
await ctx.started()
async with (
open_ringbuf_publisher(guarantee_order=True) as pub,
ctx.open_stream() as stream
):
async for msg in stream:
msg = msgspec.msgpack.decode(msg, type=ControlMessages)
match msg:
case AddChannelMsg():
await pub.add_channel(msg.name, must_exist=True)
case RemoveChannelMsg():
pub.remove_channel(msg.name)
case RangeMsg():
for i in range(msg.size):
await pub.send(i.to_bytes(4))
log.info(f'sent {i}')
await stream.send(b'ack')
log.info('publisher exit')
def test_pubsub():
'''
Spawn to childs a publisher and a subscriber, use context streams
to dynamically test different scenarios with different channel
configurations between them.
'''
async def main():
async with (
tractor.open_nursery(
loglevel='info',
# debug_mode=True,
# enable_stack_on_sig=True
) as an,
ringd.open_ringd()
):
recv_portal = await an.start_actor(
'recv',
enable_modules=[__name__]
)
send_portal = await an.start_actor(
'send',
enable_modules=[__name__]
)
async with (
recv_portal.open_context(subscriber_child) as (rctx, _),
rctx.open_stream() as recv_stream,
send_portal.open_context(publisher_child) as (sctx, _),
sctx.open_stream() as send_stream,
):
async def send_wait_ack(msg: bytes):
await recv_stream.send(msg)
ack = await recv_stream.receive()
assert ack == b'ack'
await send_stream.send(msg)
ack = await send_stream.receive()
assert ack == b'ack'
async def add_channel(name: str):
await send_wait_ack(AddChannelMsg(name=name).encode())
async def remove_channel(name: str):
await send_wait_ack(RemoveChannelMsg(name=name).encode())
async def send_range(size: int):
await send_wait_ack(RangeMsg(size=size).encode())
range_ack = await recv_stream.receive()
assert range_ack == b'valid range'
# simple test, open one channel and send 0..100 range
ring_name = 'ring-first'
await add_channel(ring_name)
await send_range(100)
await remove_channel(ring_name)
# redo
ring_name = 'ring-redo'
await add_channel(ring_name)
await send_range(100)
await remove_channel(ring_name)
# multi chan test
ring_names = []
for i in range(3):
ring_names.append(f'multi-ring-{i}')
for name in ring_names:
await add_channel(name)
await send_range(1000)
for name in ring_names:
await remove_channel(name)
await an.cancel()
trio.run(main)

View File

@ -31,7 +31,8 @@ from dataclasses import dataclass
import trio
import tractor
from tractor.ipc import (
from tractor.ipc._ringbuf import (
RBToken,
RingBufferSendChannel,
RingBufferReceiveChannel,
attach_to_ringbuf_sender,
@ -42,7 +43,8 @@ from tractor.trionics import (
order_send_channel,
order_receive_channel
)
import tractor.ipc._ringbuf._ringd as ringd
import tractor.linux._fdshare as fdshare
log = tractor.log.get_logger(__name__)
@ -53,9 +55,10 @@ ChannelType = TypeVar('ChannelType')
@dataclass
class ChannelInfo:
name: str
token: RBToken
channel: ChannelType
cancel_scope: trio.CancelScope
teardown: trio.Event
class ChannelManager(Generic[ChannelType]):
@ -88,8 +91,6 @@ class ChannelManager(Generic[ChannelType]):
self._is_closed: bool = True
self._teardown = trio.Event()
@property
def closed(self) -> bool:
return self._is_closed
@ -100,9 +101,9 @@ class ChannelManager(Generic[ChannelType]):
async def _channel_handler_task(
self,
name: str,
must_exist: bool = False,
token: RBToken,
task_status=trio.TASK_STATUS_IGNORED,
**kwargs
):
'''
Open channel resources, add to internal data structures, signal channel
@ -114,12 +115,16 @@ class ChannelManager(Generic[ChannelType]):
kwargs are proxied to `self._open_channel` acm.
'''
async with self._open_channel(name, must_exist=must_exist) as chan:
async with self._open_channel(
token,
**kwargs
) as chan:
cancel_scope = trio.CancelScope()
info = ChannelInfo(
name=name,
token=token,
channel=chan,
cancel_scope=cancel_scope
cancel_scope=cancel_scope,
teardown=trio.Event()
)
self._channels.append(info)
@ -131,10 +136,7 @@ class ChannelManager(Generic[ChannelType]):
with cancel_scope:
await self._channel_task(info)
self._maybe_destroy_channel(name)
if len(self) == 0:
self._teardown.set()
self._maybe_destroy_channel(token.shm_name)
def _find_channel(self, name: str) -> tuple[int, ChannelInfo] | None:
'''
@ -145,7 +147,7 @@ class ChannelManager(Generic[ChannelType]):
'''
for entry in enumerate(self._channels):
i, info = entry
if info.name == name:
if info.token.shm_name == name:
return entry
return None
@ -161,9 +163,14 @@ class ChannelManager(Generic[ChannelType]):
if maybe_entry:
i, info = maybe_entry
info.cancel_scope.cancel()
info.teardown.set()
del self._channels[i]
async def add_channel(self, name: str, must_exist: bool = False):
async def add_channel(
self,
token: RBToken,
**kwargs
):
'''
Add a new channel to be handled
@ -173,11 +180,11 @@ class ChannelManager(Generic[ChannelType]):
await self._n.start(partial(
self._channel_handler_task,
name,
must_exist=must_exist
RBToken.from_msg(token),
**kwargs
))
def remove_channel(self, name: str):
async def remove_channel(self, name: str):
'''
Remove a channel and stop its handling
@ -185,8 +192,18 @@ class ChannelManager(Generic[ChannelType]):
if self.closed:
raise trio.ClosedResourceError
maybe_entry = self._find_channel(name)
if not maybe_entry:
# return
raise RuntimeError(
f'tried to remove channel {name} but if does not exist'
)
i, info = maybe_entry
self._maybe_destroy_channel(name)
await info.teardown.wait()
# if that was last channel reset connect event
if len(self) == 0:
self._connect_event = trio.Event()
@ -225,15 +242,7 @@ class ChannelManager(Generic[ChannelType]):
if info.channel.closed:
continue
self.remove_channel(info.name)
try:
await self._teardown.wait()
except trio.Cancelled:
# log.exception('close was cancelled')
raise
await self.remove_channel(info.name)
self._is_closed = True
@ -257,16 +266,12 @@ class RingBufferPublisher(trio.abc.SendChannel[bytes]):
self,
n: trio.Nursery,
# new ringbufs created will have this buf_size
buf_size: int = 10 * 1024,
# amount of msgs to each ring before switching turns
msgs_per_turn: int = 1,
# global batch size for all channels
batch_size: int = 1
):
self._buf_size = buf_size
self._batch_size: int = batch_size
self.msgs_per_turn = msgs_per_turn
@ -331,63 +336,32 @@ class RingBufferPublisher(trio.abc.SendChannel[bytes]):
async def add_channel(
self,
name: str,
must_exist: bool = False
token: RBToken,
):
await self._chanmngr.add_channel(name, must_exist=must_exist)
await self._chanmngr.add_channel(token)
def remove_channel(self, name: str):
self._chanmngr.remove_channel(name)
async def remove_channel(self, name: str):
await self._chanmngr.remove_channel(name)
@acm
async def _open_channel(
self,
name: str,
must_exist: bool = False
token: RBToken
) -> AsyncContextManager[RingBufferSendChannel]:
'''
Open a ringbuf through `ringd` and attach as send side
'''
if must_exist:
ringd_fn = ringd.attach_ringbuf
kwargs = {}
else:
ringd_fn = ringd.open_ringbuf
kwargs = {'buf_size': self._buf_size}
async with (
ringd_fn(
name=name,
**kwargs
) as token,
attach_to_ringbuf_sender(
token,
batch_size=self._batch_size
) as ring,
):
async with attach_to_ringbuf_sender(
token,
batch_size=self._batch_size
) as ring:
yield ring
# try:
# # ensure all messages are sent
# await ring.flush()
# except Exception as e:
# e.add_note(f'while closing ringbuf send channel {name}')
# log.exception(e)
async def _channel_task(self, info: ChannelInfo) -> None:
'''
Wait forever until channel cancellation
'''
try:
await trio.sleep_forever()
except trio.Cancelled:
...
await trio.sleep_forever()
async def send(self, msg: bytes):
'''
@ -441,8 +415,7 @@ class RingBufferPublisher(trio.abc.SendChannel[bytes]):
log.warning('tried to close RingBufferPublisher but its already closed...')
return
with trio.CancelScope(shield=True):
await self._chanmngr.close()
await self._chanmngr.close()
self._is_closed = True
@ -467,15 +440,10 @@ class RingBufferSubscriber(trio.abc.ReceiveChannel[bytes]):
self,
n: trio.Nursery,
# new ringbufs created will have this buf_size
buf_size: int = 10 * 1024,
# if connecting to a publisher that has already sent messages set
# to the next expected payload index this subscriber will receive
start_index: int = 0
):
self._buf_size = buf_size
self._chanmngr = ChannelManager[RingBufferReceiveChannel](
n,
self._open_channel,
@ -499,40 +467,24 @@ class RingBufferSubscriber(trio.abc.ReceiveChannel[bytes]):
def get_channel(self, name: str):
return self._chanmngr[name]
async def add_channel(self, name: str, must_exist: bool = False):
await self._chanmngr.add_channel(name, must_exist=must_exist)
async def add_channel(
self,
token: RBToken
):
await self._chanmngr.add_channel(token)
def remove_channel(self, name: str):
self._chanmngr.remove_channel(name)
async def remove_channel(self, name: str):
await self._chanmngr.remove_channel(name)
@acm
async def _open_channel(
self,
name: str,
must_exist: bool = False
token: RBToken
) -> AsyncContextManager[RingBufferReceiveChannel]:
'''
Open a ringbuf through `ringd` and attach as receiver side
'''
if must_exist:
ringd_fn = ringd.attach_ringbuf
kwargs = {}
else:
ringd_fn = ringd.open_ringbuf
kwargs = {'buf_size': self._buf_size}
async with (
ringd_fn(
name=name,
**kwargs
) as token,
attach_to_ringbuf_receiver(token) as chan
):
yield chan
) -> AsyncContextManager[RingBufferSendChannel]:
async with attach_to_ringbuf_receiver(token) as ring:
yield ring
async def _channel_task(self, info: ChannelInfo) -> None:
'''
@ -582,6 +534,7 @@ class RingBufferSubscriber(trio.abc.ReceiveChannel[bytes]):
await self._chanmngr.close()
await self._schan.aclose()
await self._rchan.aclose()
self._is_closed = True
@ -641,89 +594,128 @@ def get_subscriber() -> RingBufferSubscriber:
@tractor.context
async def open_pub_channel(
async def _add_pub_channel(
ctx: tractor.Context,
ring_name: str,
must_exist: bool = False
token: RBToken
):
publisher = get_publisher()
await publisher.add_channel(
ring_name,
must_exist=must_exist
)
await ctx.started()
await publisher.add_channel(token)
try:
await trio.sleep_forever()
finally:
try:
publisher.remove_channel(ring_name)
except trio.ClosedResourceError:
...
@tractor.context
async def _remove_pub_channel(
ctx: tractor.Context,
ring_name: str
):
publisher = get_publisher()
await ctx.started()
maybe_token = fdshare.maybe_get_fds(ring_name)
if maybe_token:
await publisher.remove_channel(ring_name)
@acm
async def open_pub_channel_at(
actor_name: str,
ring_name: str,
must_exist: bool = False
token: RBToken,
cleanup: bool = True,
):
async with (
tractor.find_actor(actor_name) as portal,
portal.open_context(
open_pub_channel,
ring_name=ring_name,
must_exist=must_exist
_add_pub_channel,
token=token
) as (ctx, _)
):
...
try:
yield
await ctx.cancel()
except trio.Cancelled:
log.exception(
'open_pub_channel_at got cancelled!\n'
f'\tactor_name = {actor_name}\n'
f'\ttoken = {token}\n'
)
raise
finally:
if not cleanup:
return
async with tractor.find_actor(actor_name) as portal:
if portal:
async with portal.open_context(
_remove_pub_channel,
ring_name=token.shm_name
) as (ctx, _):
...
@tractor.context
async def open_sub_channel(
async def _add_sub_channel(
ctx: tractor.Context,
ring_name: str,
must_exist: bool = False
token: RBToken
):
subscriber = get_subscriber()
await subscriber.add_channel(
ring_name,
must_exist=must_exist
)
await ctx.started()
await subscriber.add_channel(token)
try:
await trio.sleep_forever()
finally:
try:
subscriber.remove_channel(ring_name)
except trio.ClosedResourceError:
...
@tractor.context
async def _remove_sub_channel(
ctx: tractor.Context,
ring_name: str
):
subscriber = get_subscriber()
await ctx.started()
maybe_token = fdshare.maybe_get_fds(ring_name)
if maybe_token:
await subscriber.remove_channel(ring_name)
@acm
async def open_sub_channel_at(
actor_name: str,
ring_name: str,
must_exist: bool = False
token: RBToken,
cleanup: bool = True,
):
async with (
tractor.find_actor(actor_name) as portal,
portal.open_context(
open_sub_channel,
ring_name=ring_name,
must_exist=must_exist
_add_sub_channel,
token=token
) as (ctx, _)
):
...
try:
yield
await ctx.cancel()
except trio.Cancelled:
log.exception(
'open_sub_channel_at got cancelled!\n'
f'\tactor_name = {actor_name}\n'
f'\ttoken = {token}\n'
)
raise
finally:
if not cleanup:
return
async with tractor.find_actor(actor_name) as portal:
if portal:
async with portal.open_context(
_remove_sub_channel,
ring_name=token.shm_name
) as (ctx, _):
...
'''
@ -733,9 +725,6 @@ High level helpers to open publisher & subscriber
@acm
async def open_ringbuf_publisher(
# buf size for created rings
buf_size: int = 10 * 1024,
# global batch size for channels
batch_size: int = 1,
@ -747,9 +736,6 @@ async def open_ringbuf_publisher(
# index
guarantee_order: bool = False,
# explicit nursery cancel call on cleanup
force_cancel: bool = False,
# on creation, set the `_publisher` global in order to use the provided
# tractor.context & helper utils for adding and removing new channels from
# remote actors
@ -764,7 +750,6 @@ async def open_ringbuf_publisher(
trio.open_nursery(strict_exception_groups=False) as n,
RingBufferPublisher(
n,
buf_size=buf_size,
batch_size=batch_size
) as publisher
):
@ -777,23 +762,17 @@ async def open_ringbuf_publisher(
try:
yield publisher
finally:
if force_cancel:
# implicitly cancel any running channel handler task
n.cancel_scope.cancel()
except trio.Cancelled:
with trio.CancelScope(shield=True):
await publisher.aclose()
raise
@acm
async def open_ringbuf_subscriber(
# buf size for created rings
buf_size: int = 10 * 1024,
# expect indexed payloads and unwrap them in order
guarantee_order: bool = False,
# explicit nursery cancel call on cleanup
force_cancel: bool = False,
# on creation, set the `_subscriber` global in order to use the provided
# tractor.context & helper utils for adding and removing new channels from
# remote actors
@ -805,10 +784,7 @@ async def open_ringbuf_subscriber(
'''
async with (
trio.open_nursery(strict_exception_groups=False) as n,
RingBufferSubscriber(
n,
buf_size=buf_size
) as subscriber
RingBufferSubscriber(n) as subscriber
):
# maybe monkey patch `.receive` to use indexed payloads
if guarantee_order:
@ -819,8 +795,10 @@ async def open_ringbuf_subscriber(
global _subscriber
set_subscriber(subscriber)
yield subscriber
try:
yield subscriber
if force_cancel:
# implicitly cancel any running channel handler task
n.cancel_scope.cancel()
except trio.Cancelled:
with trio.CancelScope(shield=True):
await subscriber.aclose()
raise

View File

@ -1,401 +0,0 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Actor to broker ringbuf resources, creates and allocates
the resources, then automatically does fd passing.
call open_ringd in your root actor
then on actors that need a ringbuf resource use
open_ringbuf acm, will automatically contact ringd.
'''
import os
import tempfile
from typing import AsyncContextManager
from pathlib import Path
from contextlib import (
asynccontextmanager as acm
)
from dataclasses import dataclass
import trio
import tractor
from tractor.linux import (
send_fds,
recv_fds,
)
import tractor.ipc._ringbuf as ringbuf
from tractor.ipc._ringbuf import RBToken
log = tractor.log.get_logger(__name__)
'''
Daemon implementation
'''
_ringd_actor_name: str = 'ringd'
_root_name: str = f'{_ringd_actor_name}-{os.getpid()}'
def _make_ring_name(name: str) -> str:
'''
User provided ring names will be prefixed by the ringd actor name and pid.
'''
return f'{_root_name}.{name}'
@dataclass
class RingInfo:
token: RBToken
creator: str
_rings: dict[str, RingInfo] = {}
_ring_lock = trio.StrictFIFOLock()
def _maybe_get_ring(name: str) -> RingInfo | None:
'''
Maybe return RingInfo for a given name str
'''
# if full name was passed, strip root name
if _root_name in name:
name = name.replace(f'{_root_name}.', '')
return _rings.get(name, None)
def _get_ring(name: str) -> RingInfo:
'''
Return a RingInfo for a given name or raise
'''
info = _maybe_get_ring(name)
if not info:
raise RuntimeError(f'Ring \"{name}\" not found!')
return info
def _insert_ring(name: str, info: RingInfo):
'''
Add a new ring
'''
if name in _rings:
raise RuntimeError(f'A ring with name {name} already exists!')
_rings[name] = info
def _destroy_ring(name: str):
'''
Delete information about a ring
'''
if name not in _rings:
raise RuntimeError(f'Tried to delete non existant {name} ring!')
del _rings[name]
@tractor.context
async def _pass_fds(
ctx: tractor.Context,
name: str,
sock_path: str
):
'''
Ringd endpoint to request passing fds of a ring.
Supports passing fullname or not (ringd actor name and pid before ring
name).
See `_attach_to_ring` function for usage.
'''
async with _ring_lock:
# get ring fds or raise error
token = _get_ring(name).token
# start fd passing context using socket on `sock_path`
async with send_fds(token.fds, sock_path):
log.info(f'connected to {sock_path} for fd passing')
# use started to signal socket is ready and send token in order for
# client to get extra info like buf_size
await ctx.started(token)
# send_fds will block until receive side acks
log.info(f'ring {name} fds: {token.fds}, sent')
@tractor.context
async def _open_ringbuf(
ctx: tractor.Context,
caller: str,
name: str,
buf_size: int = 10 * 1024
):
'''
Ringd endpoint to create and allocate resources for a new ring.
'''
await _ring_lock.acquire()
maybe_info = _maybe_get_ring(name)
if maybe_info:
raise RuntimeError(
f'Tried to create ringbuf but it already exists: {name}'
)
fullname = _make_ring_name(name)
with ringbuf.open_ringbuf(
fullname,
buf_size=buf_size
) as token:
_insert_ring(
name,
RingInfo(
token=token,
creator=caller,
)
)
_ring_lock.release()
# yield full ring name to rebuild token after fd passing
await ctx.started(fullname)
# await ctx cancel to remove ring from tracking and cleanup
try:
log.info(f'ring {name} created by {caller}')
await trio.sleep_forever()
finally:
_destroy_ring(name)
log.info(f'ring {name} destroyed by {caller}')
@tractor.context
async def _attach_ringbuf(
ctx: tractor.Context,
caller: str,
name: str
) -> str:
'''
Ringd endpoint to "attach" to an existing ring, this just ensures ring
actually exists and returns its full name.
'''
async with _ring_lock:
info = _maybe_get_ring(name)
if not info:
raise RuntimeError(
f'{caller} tried to open_ringbuf but it doesn\'t exist: {name}'
)
await ctx.started()
# return full ring name to rebuild token after fd passing
return info.token.shm_name
@tractor.context
async def _maybe_open_ringbuf(
ctx: tractor.Context,
caller: str,
name: str,
buf_size: int = 10 * 1024,
):
'''
If ring already exists attach, if not create it.
'''
maybe_info = _maybe_get_ring(name)
if maybe_info:
return await _attach_ringbuf(ctx, caller, name)
return await _open_ringbuf(ctx, caller, name, buf_size=buf_size)
'''
Ringd client side helpers
'''
@acm
async def open_ringd(**kwargs) -> tractor.Portal:
'''
Spawn new ringd actor.
'''
async with tractor.open_nursery(**kwargs) as an:
portal = await an.start_actor(
_ringd_actor_name,
enable_modules=[__name__]
)
yield portal
await an.cancel()
@acm
async def wait_for_ringd() -> tractor.Portal:
'''
Wait for ringd actor to be up.
'''
async with tractor.wait_for_actor(
_ringd_actor_name
) as portal:
yield portal
async def _request_ring_fds(
fullname: str
) -> RBToken:
'''
Private helper to fetch ring fds from ringd actor.
'''
actor = tractor.current_actor()
fd_amount = 3
sock_path = str(
Path(tempfile.gettempdir())
/
f'{fullname}-to-{actor.name}.sock'
)
log.info(f'trying to attach to {fullname}...')
async with (
tractor.find_actor(_ringd_actor_name) as ringd,
ringd.open_context(
_pass_fds,
name=fullname,
sock_path=sock_path
) as (ctx, token),
):
fds = await recv_fds(sock_path, fd_amount)
write, wrap, eof = fds
log.info(
f'received fds, write: {write}, wrap: {wrap}, eof: {eof}'
)
token = RBToken.from_msg(token)
return RBToken(
shm_name=fullname,
write_eventfd=write,
wrap_eventfd=wrap,
eof_eventfd=eof,
buf_size=token.buf_size
)
@acm
async def open_ringbuf(
name: str,
buf_size: int = 10 * 1024,
) -> AsyncContextManager[RBToken]:
'''
Create a new ring and retrieve its fds.
'''
actor = tractor.current_actor()
async with (
wait_for_ringd() as ringd,
ringd.open_context(
_open_ringbuf,
caller=actor.name,
name=name,
buf_size=buf_size,
) as (ctx, fullname),
):
token = await _request_ring_fds(fullname)
log.info(f'{actor.name} opened {token}')
try:
yield token
finally:
with trio.CancelScope(shield=True):
await ctx.cancel()
@acm
async def attach_ringbuf(
name: str,
) -> AsyncContextManager[RBToken]:
'''
Attach to an existing ring and retreive its fds.
'''
actor = tractor.current_actor()
async with (
wait_for_ringd() as ringd,
ringd.open_context(
_attach_ringbuf,
caller=actor.name,
name=name,
) as (ctx, _),
):
fullname = await ctx.wait_for_result()
token = await _request_ring_fds(fullname)
log.info(f'{actor.name} attached {token}')
yield token
@acm
async def maybe_open_ringbuf(
name: str,
buf_size: int = 10 * 1024,
) -> AsyncContextManager[RBToken]:
'''
Attach or create a ring and retreive its fds.
'''
actor = tractor.current_actor()
async with (
wait_for_ringd() as ringd,
ringd.open_context(
_maybe_open_ringbuf,
caller=actor.name,
name=name,
buf_size=buf_size,
) as (ctx, fullname),
):
token = await _request_ring_fds(fullname)
log.info(f'{actor.name} opened {token}')
try:
yield token
finally:
with trio.CancelScope(shield=True):
await ctx.cancel()