diff --git a/default.nix b/default.nix new file mode 100644 index 00000000..31615def --- /dev/null +++ b/default.nix @@ -0,0 +1,18 @@ +{ pkgs ? import {} }: +let + nativeBuildInputs = with pkgs; [ + stdenv.cc.cc.lib + uv + ]; + +in +pkgs.mkShell { + inherit nativeBuildInputs; + + LD_LIBRARY_PATH = pkgs.lib.makeLibraryPath nativeBuildInputs; + + shellHook = '' + set -e + uv venv .venv --python=3.11 + ''; +} diff --git a/pyproject.toml b/pyproject.toml index b3e9e100..fd67bff2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,6 +45,7 @@ dependencies = [ "pdbp>=1.6,<2", # windows only (from `pdbp`) # typed IPC msging "msgspec>=0.19.0", + "cffi>=1.17.1", ] # ------ project ------ diff --git a/tests/test_eventfd.py b/tests/test_eventfd.py new file mode 100644 index 00000000..3d757169 --- /dev/null +++ b/tests/test_eventfd.py @@ -0,0 +1,32 @@ +import trio +import pytest +from tractor.ipc import ( + open_eventfd, + EFDReadCancelled, + EventFD +) + + +def test_eventfd_read_cancellation(): + ''' + Ensure EventFD.read raises EFDReadCancelled if EventFD.close() + is called. + + ''' + fd = open_eventfd() + + async def _read(event: EventFD): + with pytest.raises(EFDReadCancelled): + await event.read() + + async def main(): + async with trio.open_nursery() as n: + with ( + EventFD(fd, 'w') as event, + trio.fail_after(3) + ): + n.start_soon(_read, event) + await trio.sleep(0.2) + event.close() + + trio.run(main) diff --git a/tests/test_ringbuf.py b/tests/test_ringbuf.py new file mode 100644 index 00000000..8858215e --- /dev/null +++ b/tests/test_ringbuf.py @@ -0,0 +1,423 @@ +import time +import hashlib + +import trio +import pytest +import tractor +from tractor.ipc import ( + open_ringbuf, + attach_to_ringbuf_receiver, + attach_to_ringbuf_sender, + attach_to_ringbuf_stream, + attach_to_ringbuf_channel, + RBToken, +) +from tractor._testing.samples import ( + generate_single_byte_msgs, + generate_sample_messages +) + + +@tractor.context +async def child_read_shm( + ctx: tractor.Context, + msg_amount: int, + token: RBToken, +) -> str: + ''' + Sub-actor used in `test_ringbuf`. + + Attach to a ringbuf and receive all messages until end of stream. + Keep track of how many bytes received and also calculate + sha256 of the whole byte stream. + + Calculate and print performance stats, finally return calculated + hash. + + ''' + await ctx.started() + print('reader started') + recvd_bytes = 0 + recvd_hash = hashlib.sha256() + start_ts = time.time() + async with attach_to_ringbuf_receiver(token) as receiver: + async for msg in receiver: + recvd_hash.update(msg) + recvd_bytes += len(msg) + + end_ts = time.time() + elapsed = end_ts - start_ts + elapsed_ms = int(elapsed * 1000) + + print(f'\n\telapsed ms: {elapsed_ms}') + print(f'\tmsg/sec: {int(msg_amount / elapsed):,}') + print(f'\tbytes/sec: {int(recvd_bytes / elapsed):,}') + print(f'\treceived bytes: {recvd_bytes:,}') + + return recvd_hash.hexdigest() + + +@tractor.context +async def child_write_shm( + ctx: tractor.Context, + msg_amount: int, + rand_min: int, + rand_max: int, + token: RBToken, +) -> None: + ''' + Sub-actor used in `test_ringbuf` + + Generate `msg_amount` payloads with + `random.randint(rand_min, rand_max)` random bytes at the end, + Calculate sha256 hash and send it to parent on `ctx.started`. + + Attach to ringbuf and send all generated messages. + + ''' + msgs, _total_bytes = generate_sample_messages( + msg_amount, + rand_min=rand_min, + rand_max=rand_max, + ) + print('writer hashing payload...') + sent_hash = hashlib.sha256(b''.join(msgs)).hexdigest() + print('writer done hashing.') + await ctx.started(sent_hash) + print('writer started') + async with attach_to_ringbuf_sender(token, cleanup=False) as sender: + for msg in msgs: + await sender.send_all(msg) + + print('writer exit') + + +@pytest.mark.parametrize( + 'msg_amount,rand_min,rand_max,buf_size', + [ + # simple case, fixed payloads, large buffer + (100_000, 0, 0, 10 * 1024), + + # guaranteed wrap around on every write + (100, 10 * 1024, 20 * 1024, 10 * 1024), + + # large payload size, but large buffer + (10_000, 256 * 1024, 512 * 1024, 10 * 1024 * 1024) + ], + ids=[ + 'fixed_payloads_large_buffer', + 'wrap_around_every_write', + 'large_payloads_large_buffer', + ] +) +def test_ringbuf( + msg_amount: int, + rand_min: int, + rand_max: int, + buf_size: int +): + ''' + - Open a new ring buf on root actor + - Open `child_write_shm` ctx in sub-actor which will generate a + random payload and send its hash on `ctx.started`, finally sending + the payload through the stream. + - Open `child_read_shm` ctx in sub-actor which will receive the + payload, calculate perf stats and return the hash. + - Compare both hashes + + ''' + async def main(): + with open_ringbuf( + 'test_ringbuf', + buf_size=buf_size + ) as token: + proc_kwargs = {'pass_fds': token.fds} + + async with tractor.open_nursery() as an: + send_p = await an.start_actor( + 'ring_sender', + enable_modules=[__name__], + proc_kwargs=proc_kwargs + ) + recv_p = await an.start_actor( + 'ring_receiver', + enable_modules=[__name__], + proc_kwargs=proc_kwargs + ) + async with ( + send_p.open_context( + child_write_shm, + token=token, + msg_amount=msg_amount, + rand_min=rand_min, + rand_max=rand_max, + ) as (_sctx, sent_hash), + recv_p.open_context( + child_read_shm, + token=token, + msg_amount=msg_amount + ) as (rctx, _sent), + ): + recvd_hash = await rctx.result() + + assert sent_hash == recvd_hash + + await send_p.cancel_actor() + await recv_p.cancel_actor() + + + trio.run(main) + + +@tractor.context +async def child_blocked_receiver( + ctx: tractor.Context, + token: RBToken +): + async with attach_to_ringbuf_receiver(token) as receiver: + await ctx.started() + await receiver.receive_some() + + +def test_reader_cancel(): + ''' + Test that a receiver blocked on eventfd(2) read responds to + cancellation. + + ''' + async def main(): + with open_ringbuf('test_ring_cancel_reader') as token: + async with ( + tractor.open_nursery() as an, + attach_to_ringbuf_sender(token) as _sender, + ): + recv_p = await an.start_actor( + 'ring_blocked_receiver', + enable_modules=[__name__], + proc_kwargs={ + 'pass_fds': token.fds + } + ) + async with ( + recv_p.open_context( + child_blocked_receiver, + token=token + ) as (sctx, _sent), + ): + await trio.sleep(1) + await an.cancel() + + + with pytest.raises(tractor._exceptions.ContextCancelled): + trio.run(main) + + +@tractor.context +async def child_blocked_sender( + ctx: tractor.Context, + token: RBToken +): + async with attach_to_ringbuf_sender(token) as sender: + await ctx.started() + await sender.send_all(b'this will wrap') + + +def test_sender_cancel(): + ''' + Test that a sender blocked on eventfd(2) read responds to + cancellation. + + ''' + async def main(): + with open_ringbuf( + 'test_ring_cancel_sender', + buf_size=1 + ) as token: + async with tractor.open_nursery() as an: + recv_p = await an.start_actor( + 'ring_blocked_sender', + enable_modules=[__name__], + proc_kwargs={ + 'pass_fds': token.fds + } + ) + async with ( + recv_p.open_context( + child_blocked_sender, + token=token + ) as (sctx, _sent), + ): + await trio.sleep(1) + await an.cancel() + + + with pytest.raises(tractor._exceptions.ContextCancelled): + trio.run(main) + + +def test_receiver_max_bytes(): + ''' + Test that RingBuffReceiver.receive_some's max_bytes optional + argument works correctly, send a msg of size 100, then + force receive of messages with max_bytes == 1, wait until + 100 of these messages are received, then compare join of + msgs with original message + + ''' + msg = generate_single_byte_msgs(100) + msgs = [] + + async def main(): + with open_ringbuf( + 'test_ringbuf_max_bytes', + buf_size=10 + ) as token: + async with ( + trio.open_nursery() as n, + attach_to_ringbuf_sender(token, cleanup=False) as sender, + attach_to_ringbuf_receiver(token, cleanup=False) as receiver + ): + async def _send_and_close(): + await sender.send_all(msg) + await sender.aclose() + + n.start_soon(_send_and_close) + while len(msgs) < len(msg): + msg_part = await receiver.receive_some(max_bytes=1) + assert len(msg_part) == 1 + msgs.append(msg_part) + + trio.run(main) + assert msg == b''.join(msgs) + + +def test_stapled_ringbuf(): + ''' + Open two ringbufs and give tokens to tasks (swap them such that in/out tokens + are inversed on each task) which will open the streams and use trio.StapledStream + to have a single bidirectional stream. + + Then take turns to send and receive messages. + + ''' + msg = generate_single_byte_msgs(100) + pair_0_msgs = [] + pair_1_msgs = [] + + pair_0_done = trio.Event() + pair_1_done = trio.Event() + + async def pair_0(token_in: RBToken, token_out: RBToken): + async with attach_to_ringbuf_stream( + token_in, + token_out, + cleanup_in=False, + cleanup_out=False + ) as stream: + # first turn to send + await stream.send_all(msg) + + # second turn to receive + while len(pair_0_msgs) != len(msg): + _msg = await stream.receive_some(max_bytes=1) + pair_0_msgs.append(_msg) + + pair_0_done.set() + await pair_1_done.wait() + + + async def pair_1(token_in: RBToken, token_out: RBToken): + async with attach_to_ringbuf_stream( + token_in, + token_out, + cleanup_in=False, + cleanup_out=False + ) as stream: + # first turn to receive + while len(pair_1_msgs) != len(msg): + _msg = await stream.receive_some(max_bytes=1) + pair_1_msgs.append(_msg) + + # second turn to send + await stream.send_all(msg) + + pair_1_done.set() + await pair_0_done.wait() + + + async def main(): + with tractor.ipc.open_ringbuf_pair( + 'test_stapled_ringbuf' + ) as (token_0, token_1): + async with trio.open_nursery() as n: + n.start_soon(pair_0, token_0, token_1) + n.start_soon(pair_1, token_1, token_0) + + + trio.run(main) + + assert msg == b''.join(pair_0_msgs) + assert msg == b''.join(pair_1_msgs) + + +@tractor.context +async def child_channel_sender( + ctx: tractor.Context, + msg_amount_min: int, + msg_amount_max: int, + token_in: RBToken, + token_out: RBToken +): + import random + msgs, _total_bytes = generate_sample_messages( + random.randint(msg_amount_min, msg_amount_max), + rand_min=256, + rand_max=1024, + ) + async with attach_to_ringbuf_channel( + token_in, + token_out + ) as chan: + await ctx.started(msgs) + + for msg in msgs: + await chan.send(msg) + + +def test_channel(): + + msg_amount_min = 100 + msg_amount_max = 1000 + + async def main(): + with tractor.ipc.open_ringbuf_pair( + 'test_ringbuf_transport' + ) as (token_0, token_1): + async with ( + attach_to_ringbuf_channel(token_0, token_1) as chan, + tractor.open_nursery() as an + ): + recv_p = await an.start_actor( + 'test_ringbuf_transport_sender', + enable_modules=[__name__], + proc_kwargs={ + 'pass_fds': token_0.fds + token_1.fds + } + ) + async with ( + recv_p.open_context( + child_channel_sender, + msg_amount_min=msg_amount_min, + msg_amount_max=msg_amount_max, + token_in=token_1, + token_out=token_0 + ) as (ctx, msgs), + ): + recv_msgs = [] + async for msg in chan: + recv_msgs.append(msg) + + await recv_p.cancel_actor() + assert recv_msgs == msgs + + trio.run(main) diff --git a/tests/test_shm.py b/tests/test_shm.py index 2b7a382f..ddeb67aa 100644 --- a/tests/test_shm.py +++ b/tests/test_shm.py @@ -8,7 +8,7 @@ import uuid import pytest import trio import tractor -from tractor._shm import ( +from tractor.ipc._shm import ( open_shm_list, attach_shm_list, ) diff --git a/tractor/__init__.py b/tractor/__init__.py index 0c011a22..6fac747f 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -64,7 +64,7 @@ from ._root import ( run_daemon as run_daemon, open_root_actor as open_root_actor, ) -from ._ipc import Channel as Channel +from .ipc import Channel as Channel from ._portal import Portal as Portal from ._runtime import Actor as Actor # from . import hilevel as hilevel diff --git a/tractor/_context.py b/tractor/_context.py index 201e920a..d93d7759 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -89,7 +89,7 @@ from .msg import ( pretty_struct, _ops as msgops, ) -from ._ipc import ( +from .ipc import ( Channel, ) from ._streaming import ( @@ -105,7 +105,7 @@ from ._state import ( if TYPE_CHECKING: from ._portal import Portal from ._runtime import Actor - from ._ipc import MsgTransport + from .ipc import MsgTransport from .devx._frame_stack import ( CallerInfo, ) diff --git a/tractor/_discovery.py b/tractor/_discovery.py index a681c63b..1c3cbff0 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -29,7 +29,7 @@ from contextlib import asynccontextmanager as acm from tractor.log import get_logger from .trionics import gather_contexts -from ._ipc import _connect_chan, Channel +from .ipc import _connect_chan, Channel from ._portal import ( Portal, open_portal, diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index f9e18e18..8442ecfd 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -65,7 +65,7 @@ if TYPE_CHECKING: from ._context import Context from .log import StackLevelAdapter from ._stream import MsgStream - from ._ipc import Channel + from .ipc import Channel log = get_logger('tractor') diff --git a/tractor/_portal.py b/tractor/_portal.py index cee10c47..c8a781a7 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -43,7 +43,7 @@ from .trionics import maybe_open_nursery from ._state import ( current_actor, ) -from ._ipc import Channel +from .ipc import Channel from .log import get_logger from .msg import ( # Error, diff --git a/tractor/_root.py b/tractor/_root.py index 2a9beaa3..35639c15 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -43,7 +43,7 @@ from .devx import _debug from . import _spawn from . import _state from . import log -from ._ipc import _connect_chan +from .ipc import _connect_chan from ._exceptions import is_multi_cancelled diff --git a/tractor/_rpc.py b/tractor/_rpc.py index c5daed9e..6dfecd38 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -42,7 +42,7 @@ from trio import ( TaskStatus, ) -from ._ipc import Channel +from .ipc import Channel from ._context import ( Context, ) diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 890a690a..2c8dbbd9 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -73,7 +73,7 @@ from tractor.msg import ( pretty_struct, types as msgtypes, ) -from ._ipc import Channel +from .ipc import Channel from ._context import ( mk_context, Context, diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 3159508d..dc2429d9 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -399,7 +399,8 @@ async def new_proc( *, infect_asyncio: bool = False, - task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED + task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED, + proc_kwargs: dict[str, any] = {} ) -> None: @@ -419,6 +420,7 @@ async def new_proc( _runtime_vars, # run time vars infect_asyncio=infect_asyncio, task_status=task_status, + proc_kwargs=proc_kwargs ) @@ -434,7 +436,8 @@ async def trio_proc( _runtime_vars: dict[str, Any], # serialized and sent to _child *, infect_asyncio: bool = False, - task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED + task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED, + proc_kwargs: dict[str, any] = {} ) -> None: ''' @@ -475,7 +478,7 @@ async def trio_proc( proc: trio.Process|None = None try: try: - proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd) + proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd, **proc_kwargs) log.runtime( 'Started new child\n' f'|_{proc}\n' @@ -640,7 +643,8 @@ async def mp_proc( _runtime_vars: dict[str, Any], # serialized and sent to _child *, infect_asyncio: bool = False, - task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED + task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED, + proc_kwargs: dict[str, any] = {} ) -> None: diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 2ff2d41c..21e59214 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -56,7 +56,7 @@ from tractor.msg import ( if TYPE_CHECKING: from ._runtime import Actor from ._context import Context - from ._ipc import Channel + from .ipc import Channel log = get_logger(__name__) diff --git a/tractor/_supervise.py b/tractor/_supervise.py index bc6bc983..052a5f4c 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -141,6 +141,7 @@ class ActorNursery: # a `._ria_nursery` since the dependent APIs have been # removed! nursery: trio.Nursery|None = None, + proc_kwargs: dict[str, any] = {} ) -> Portal: ''' @@ -204,6 +205,7 @@ class ActorNursery: parent_addr, _rtv, # run time vars infect_asyncio=infect_asyncio, + proc_kwargs=proc_kwargs ) ) @@ -227,6 +229,7 @@ class ActorNursery: enable_modules: list[str] | None = None, loglevel: str | None = None, # set log level per subactor infect_asyncio: bool = False, + proc_kwargs: dict[str, any] = {}, **kwargs, # explicit args to ``fn`` @@ -257,6 +260,7 @@ class ActorNursery: # use the run_in_actor nursery nursery=self._ria_nursery, infect_asyncio=infect_asyncio, + proc_kwargs=proc_kwargs ) # XXX: don't allow stream funcs diff --git a/tractor/_testing/samples.py b/tractor/_testing/samples.py new file mode 100644 index 00000000..4249bae9 --- /dev/null +++ b/tractor/_testing/samples.py @@ -0,0 +1,81 @@ +import os +import random + + +def generate_single_byte_msgs(amount: int) -> bytes: + ''' + Generate a byte instance of len `amount` with: + + ``` + byte_at_index(i) = (i % 10).encode() + ``` + + this results in constantly repeating sequences of: + + b'0123456789' + + ''' + return b''.join(str(i % 10).encode() for i in range(amount)) + + +def generate_sample_messages( + amount: int, + rand_min: int = 0, + rand_max: int = 0, + silent: bool = False, +) -> tuple[list[bytes], int]: + ''' + Generate bytes msgs for tests. + + Messages will have the following format: + + ``` + b'[{i:08}]' + os.urandom(random.randint(rand_min, rand_max)) + ``` + + so for message index 25: + + b'[00000025]' + random_bytes + + ''' + msgs = [] + size = 0 + + log_interval = None + if not silent: + print(f'\ngenerating {amount} messages...') + + # calculate an apropiate log interval based on + # max message size + max_msg_size = 10 + rand_max + + if max_msg_size <= 32 * 1024: + log_interval = 10_000 + + else: + log_interval = 1000 + + for i in range(amount): + msg = f'[{i:08}]'.encode('utf-8') + + if rand_max > 0: + msg += os.urandom( + random.randint(rand_min, rand_max)) + + size += len(msg) + + msgs.append(msg) + + if ( + not silent + and + i > 0 + and + i % log_interval == 0 + ): + print(f'{i} generated') + + if not silent: + print(f'done, {size:,} bytes in total') + + return msgs, size diff --git a/tractor/devx/_debug.py b/tractor/devx/_debug.py index c6ca1d89..b95640dc 100644 --- a/tractor/devx/_debug.py +++ b/tractor/devx/_debug.py @@ -91,7 +91,7 @@ from tractor._state import ( if TYPE_CHECKING: from trio.lowlevel import Task from threading import Thread - from tractor._ipc import Channel + from tractor.ipc import Channel from tractor._runtime import ( Actor, ) diff --git a/tractor/ipc/__init__.py b/tractor/ipc/__init__.py new file mode 100644 index 00000000..4f0cd2b4 --- /dev/null +++ b/tractor/ipc/__init__.py @@ -0,0 +1,61 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + + +import platform + +from ._transport import MsgTransport as MsgTransport + +from ._tcp import ( + get_stream_addrs as get_stream_addrs, + MsgpackTCPStream as MsgpackTCPStream +) + +from ._chan import ( + _connect_chan as _connect_chan, + get_msg_transport as get_msg_transport, + Channel as Channel +) + +if platform.system() == 'Linux': + from ._linux import ( + EFD_SEMAPHORE as EFD_SEMAPHORE, + EFD_CLOEXEC as EFD_CLOEXEC, + EFD_NONBLOCK as EFD_NONBLOCK, + open_eventfd as open_eventfd, + write_eventfd as write_eventfd, + read_eventfd as read_eventfd, + close_eventfd as close_eventfd, + EFDReadCancelled as EFDReadCancelled, + EventFD as EventFD, + ) + + from ._ringbuf import ( + RBToken as RBToken, + open_ringbuf as open_ringbuf, + RingBuffSender as RingBuffSender, + RingBuffReceiver as RingBuffReceiver, + open_ringbuf_pair as open_ringbuf_pair, + attach_to_ringbuf_receiver as attach_to_ringbuf_receiver, + attach_to_ringbuf_sender as attach_to_ringbuf_sender, + attach_to_ringbuf_stream as attach_to_ringbuf_stream, + RingBuffBytesSender as RingBuffBytesSender, + RingBuffBytesReceiver as RingBuffBytesReceiver, + RingBuffChannel as RingBuffChannel, + attach_to_ringbuf_schannel as attach_to_ringbuf_schannel, + attach_to_ringbuf_rchannel as attach_to_ringbuf_rchannel, + attach_to_ringbuf_channel as attach_to_ringbuf_channel, + ) diff --git a/tractor/ipc/_chan.py b/tractor/ipc/_chan.py new file mode 100644 index 00000000..1b6ba29f --- /dev/null +++ b/tractor/ipc/_chan.py @@ -0,0 +1,404 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +Inter-process comms abstractions + +""" +from __future__ import annotations +from collections.abc import AsyncGenerator +from contextlib import ( + asynccontextmanager as acm, + contextmanager as cm, +) +import platform +from pprint import pformat +import typing +from typing import ( + Any, + Type +) + +import trio + +from tractor.ipc._transport import MsgTransport +from tractor.ipc._tcp import ( + MsgpackTCPStream, + get_stream_addrs +) +from tractor.log import get_logger +from tractor._exceptions import ( + MsgTypeError, + pack_from_raise, +) +from tractor.msg import MsgCodec + + +log = get_logger(__name__) + +_is_windows = platform.system() == 'Windows' + + +def get_msg_transport( + + key: tuple[str, str], + +) -> Type[MsgTransport]: + + return { + ('msgpack', 'tcp'): MsgpackTCPStream, + }[key] + + +class Channel: + ''' + An inter-process channel for communication between (remote) actors. + + Wraps a ``MsgStream``: transport + encoding IPC connection. + + Currently we only support ``trio.SocketStream`` for transport + (aka TCP) and the ``msgpack`` interchange format via the ``msgspec`` + codec libary. + + ''' + def __init__( + + self, + destaddr: tuple[str, int]|None, + + msg_transport_type_key: tuple[str, str] = ('msgpack', 'tcp'), + + # TODO: optional reconnection support? + # auto_reconnect: bool = False, + # on_reconnect: typing.Callable[..., typing.Awaitable] = None, + + ) -> None: + + # self._recon_seq = on_reconnect + # self._autorecon = auto_reconnect + + self._destaddr = destaddr + self._transport_key = msg_transport_type_key + + # Either created in ``.connect()`` or passed in by + # user in ``.from_stream()``. + self._stream: trio.SocketStream|None = None + self._transport: MsgTransport|None = None + + # set after handshake - always uid of far end + self.uid: tuple[str, str]|None = None + + self._aiter_msgs = self._iter_msgs() + self._exc: Exception|None = None # set if far end actor errors + self._closed: bool = False + + # flag set by ``Portal.cancel_actor()`` indicating remote + # (possibly peer) cancellation of the far end actor + # runtime. + self._cancel_called: bool = False + + @property + def msgstream(self) -> MsgTransport: + log.info( + '`Channel.msgstream` is an old name, use `._transport`' + ) + return self._transport + + @property + def transport(self) -> MsgTransport: + return self._transport + + @classmethod + def from_stream( + cls, + stream: trio.SocketStream, + **kwargs, + + ) -> Channel: + + src, dst = get_stream_addrs(stream) + chan = Channel( + destaddr=dst, + **kwargs, + ) + + # set immediately here from provided instance + chan._stream: trio.SocketStream = stream + chan.set_msg_transport(stream) + return chan + + def set_msg_transport( + self, + stream: trio.SocketStream, + type_key: tuple[str, str]|None = None, + + # XXX optionally provided codec pair for `msgspec`: + # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types + codec: MsgCodec|None = None, + + ) -> MsgTransport: + type_key = ( + type_key + or + self._transport_key + ) + # get transport type, then + self._transport = get_msg_transport( + type_key + # instantiate an instance of the msg-transport + )( + stream, + codec=codec, + ) + return self._transport + + @cm + def apply_codec( + self, + codec: MsgCodec, + + ) -> None: + ''' + Temporarily override the underlying IPC msg codec for + dynamic enforcement of messaging schema. + + ''' + orig: MsgCodec = self._transport.codec + try: + self._transport.codec = codec + yield + finally: + self._transport.codec = orig + + # TODO: do a .src/.dst: str for maddrs? + def __repr__(self) -> str: + if not self._transport: + return '' + + return repr( + self._transport.stream.socket._sock + ).replace( # type: ignore + "socket.socket", + "Channel", + ) + + @property + def laddr(self) -> tuple[str, int]|None: + return self._transport.laddr if self._transport else None + + @property + def raddr(self) -> tuple[str, int]|None: + return self._transport.raddr if self._transport else None + + async def connect( + self, + destaddr: tuple[Any, ...] | None = None, + **kwargs + + ) -> MsgTransport: + + if self.connected(): + raise RuntimeError("channel is already connected?") + + destaddr = destaddr or self._destaddr + assert isinstance(destaddr, tuple) + + stream = await trio.open_tcp_stream( + *destaddr, + **kwargs + ) + transport = self.set_msg_transport(stream) + + log.transport( + f'Opened channel[{type(transport)}]: {self.laddr} -> {self.raddr}' + ) + return transport + + # TODO: something like, + # `pdbp.hideframe_on(errors=[MsgTypeError])` + # instead of the `try/except` hack we have rn.. + # seems like a pretty useful thing to have in general + # along with being able to filter certain stack frame(s / sets) + # possibly based on the current log-level? + async def send( + self, + payload: Any, + + hide_tb: bool = False, + + ) -> None: + ''' + Send a coded msg-blob over the transport. + + ''' + __tracebackhide__: bool = hide_tb + try: + log.transport( + '=> send IPC msg:\n\n' + f'{pformat(payload)}\n' + ) + # assert self._transport # but why typing? + await self._transport.send( + payload, + hide_tb=hide_tb, + ) + except BaseException as _err: + err = _err # bind for introspection + if not isinstance(_err, MsgTypeError): + # assert err + __tracebackhide__: bool = False + else: + assert err.cid + + raise + + async def recv(self) -> Any: + assert self._transport + return await self._transport.recv() + + # TODO: auto-reconnect features like 0mq/nanomsg? + # -[ ] implement it manually with nods to SC prot + # possibly on multiple transport backends? + # -> seems like that might be re-inventing scalability + # prots tho no? + # try: + # return await self._transport.recv() + # except trio.BrokenResourceError: + # if self._autorecon: + # await self._reconnect() + # return await self.recv() + # raise + + async def aclose(self) -> None: + + log.transport( + f'Closing channel to {self.uid} ' + f'{self.laddr} -> {self.raddr}' + ) + assert self._transport + await self._transport.stream.aclose() + self._closed = True + + async def __aenter__(self): + await self.connect() + return self + + async def __aexit__(self, *args): + await self.aclose(*args) + + def __aiter__(self): + return self._aiter_msgs + + # ?TODO? run any reconnection sequence? + # -[ ] prolly should be impl-ed as deco-API? + # + # async def _reconnect(self) -> None: + # """Handle connection failures by polling until a reconnect can be + # established. + # """ + # down = False + # while True: + # try: + # with trio.move_on_after(3) as cancel_scope: + # await self.connect() + # cancelled = cancel_scope.cancelled_caught + # if cancelled: + # log.transport( + # "Reconnect timed out after 3 seconds, retrying...") + # continue + # else: + # log.transport("Stream connection re-established!") + + # # on_recon = self._recon_seq + # # if on_recon: + # # await on_recon(self) + + # break + # except (OSError, ConnectionRefusedError): + # if not down: + # down = True + # log.transport( + # f"Connection to {self.raddr} went down, waiting" + # " for re-establishment") + # await trio.sleep(1) + + async def _iter_msgs( + self + ) -> AsyncGenerator[Any, None]: + ''' + Yield `MsgType` IPC msgs decoded and deliverd from + an underlying `MsgTransport` protocol. + + This is a streaming routine alo implemented as an async-gen + func (same a `MsgTransport._iter_pkts()`) gets allocated by + a `.__call__()` inside `.__init__()` where it is assigned to + the `._aiter_msgs` attr. + + ''' + assert self._transport + while True: + try: + async for msg in self._transport: + match msg: + # NOTE: if transport/interchange delivers + # a type error, we pack it with the far + # end peer `Actor.uid` and relay the + # `Error`-msg upward to the `._rpc` stack + # for normal RAE handling. + case MsgTypeError(): + yield pack_from_raise( + local_err=msg, + cid=msg.cid, + + # XXX we pack it here bc lower + # layers have no notion of an + # actor-id ;) + src_uid=self.uid, + ) + case _: + yield msg + + except trio.BrokenResourceError: + + # if not self._autorecon: + raise + + await self.aclose() + + # if self._autorecon: # attempt reconnect + # await self._reconnect() + # continue + + def connected(self) -> bool: + return self._transport.connected() if self._transport else False + + +@acm +async def _connect_chan( + host: str, + port: int + +) -> typing.AsyncGenerator[Channel, None]: + ''' + Create and connect a channel with disconnect on context manager + teardown. + + ''' + chan = Channel((host, port)) + await chan.connect() + yield chan + with trio.CancelScope(shield=True): + await chan.aclose() diff --git a/tractor/ipc/_linux.py b/tractor/ipc/_linux.py new file mode 100644 index 00000000..0c05260e --- /dev/null +++ b/tractor/ipc/_linux.py @@ -0,0 +1,187 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +''' +Linux specifics, for now we are only exposing EventFD + +''' +import os +import errno + +import cffi +import trio + +ffi = cffi.FFI() + +# Declare the C functions and types we plan to use. +# - eventfd: for creating the event file descriptor +# - write: for writing to the file descriptor +# - read: for reading from the file descriptor +# - close: for closing the file descriptor +ffi.cdef( + ''' + int eventfd(unsigned int initval, int flags); + + ssize_t write(int fd, const void *buf, size_t count); + ssize_t read(int fd, void *buf, size_t count); + + int close(int fd); + ''' +) + + +# Open the default dynamic library (essentially 'libc' in most cases) +C = ffi.dlopen(None) + + +# Constants from , if needed. +EFD_SEMAPHORE = 1 +EFD_CLOEXEC = 0o2000000 +EFD_NONBLOCK = 0o4000 + + +def open_eventfd(initval: int = 0, flags: int = 0) -> int: + ''' + Open an eventfd with the given initial value and flags. + Returns the file descriptor on success, otherwise raises OSError. + + ''' + fd = C.eventfd(initval, flags) + if fd < 0: + raise OSError(errno.errorcode[ffi.errno], 'eventfd failed') + return fd + + +def write_eventfd(fd: int, value: int) -> int: + ''' + Write a 64-bit integer (uint64_t) to the eventfd's counter. + + ''' + # Create a uint64_t* in C, store `value` + data_ptr = ffi.new('uint64_t *', value) + + # Call write(fd, data_ptr, 8) + # We expect to write exactly 8 bytes (sizeof(uint64_t)) + ret = C.write(fd, data_ptr, 8) + if ret < 0: + raise OSError(errno.errorcode[ffi.errno], 'write to eventfd failed') + return ret + + +def read_eventfd(fd: int) -> int: + ''' + Read a 64-bit integer (uint64_t) from the eventfd, returning the value. + Reading resets the counter to 0 (unless using EFD_SEMAPHORE). + + ''' + # Allocate an 8-byte buffer in C for reading + buf = ffi.new('char[]', 8) + + ret = C.read(fd, buf, 8) + if ret < 0: + raise OSError(errno.errorcode[ffi.errno], 'read from eventfd failed') + # Convert the 8 bytes we read into a Python integer + data_bytes = ffi.unpack(buf, 8) # returns a Python bytes object of length 8 + value = int.from_bytes(data_bytes, byteorder='little', signed=False) + return value + + +def close_eventfd(fd: int) -> int: + ''' + Close the eventfd. + + ''' + ret = C.close(fd) + if ret < 0: + raise OSError(errno.errorcode[ffi.errno], 'close failed') + + +class EFDReadCancelled(Exception): + ... + + +class EventFD: + ''' + Use a previously opened eventfd(2), meant to be used in + sub-actors after root actor opens the eventfds then passes + them through pass_fds + + ''' + + def __init__( + self, + fd: int, + omode: str + ): + self._fd: int = fd + self._omode: str = omode + self._fobj = None + self._cscope: trio.CancelScope | None = None + + @property + def fd(self) -> int | None: + return self._fd + + def write(self, value: int) -> int: + return write_eventfd(self._fd, value) + + async def read(self) -> int: + ''' + Async wrapper for `read_eventfd(self.fd)` + + `trio.to_thread.run_sync` is used, need to use a `trio.CancelScope` + in order to make it cancellable when `self.close()` is called. + + ''' + self._cscope = trio.CancelScope() + with self._cscope: + return await trio.to_thread.run_sync( + read_eventfd, self._fd, + abandon_on_cancel=True + ) + + if self._cscope.cancelled_caught: + raise EFDReadCancelled + + self._cscope = None + + def read_direct(self) -> int: + ''' + Direct call to `read_eventfd(self.fd)`, unless `eventfd` was + opened with `EFD_NONBLOCK` its gonna block the thread. + + ''' + return read_eventfd(self._fd) + + def open(self): + self._fobj = os.fdopen(self._fd, self._omode) + + def close(self): + if self._fobj: + try: + self._fobj.close() + + except OSError: + ... + + if self._cscope: + self._cscope.cancel() + + def __enter__(self): + self.open() + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() diff --git a/tractor/ipc/_mp_bs.py b/tractor/ipc/_mp_bs.py new file mode 100644 index 00000000..e51aa9ae --- /dev/null +++ b/tractor/ipc/_mp_bs.py @@ -0,0 +1,45 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +''' +Utils to tame mp non-SC madeness + +''' +def disable_mantracker(): + ''' + Disable all ``multiprocessing``` "resource tracking" machinery since + it's an absolute multi-threaded mess of non-SC madness. + + ''' + from multiprocessing import resource_tracker as mantracker + + # Tell the "resource tracker" thing to fuck off. + class ManTracker(mantracker.ResourceTracker): + def register(self, name, rtype): + pass + + def unregister(self, name, rtype): + pass + + def ensure_running(self): + pass + + # "know your land and know your prey" + # https://www.dailymotion.com/video/x6ozzco + mantracker._resource_tracker = ManTracker() + mantracker.register = mantracker._resource_tracker.register + mantracker.ensure_running = mantracker._resource_tracker.ensure_running + mantracker.unregister = mantracker._resource_tracker.unregister + mantracker.getfd = mantracker._resource_tracker.getfd diff --git a/tractor/ipc/_ringbuf.py b/tractor/ipc/_ringbuf.py new file mode 100644 index 00000000..10975b7a --- /dev/null +++ b/tractor/ipc/_ringbuf.py @@ -0,0 +1,651 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +''' +IPC Reliable RingBuffer implementation + +''' +from __future__ import annotations +import struct +from typing import ( + ContextManager, + AsyncContextManager +) +from contextlib import ( + contextmanager as cm, + asynccontextmanager as acm +) +from multiprocessing.shared_memory import SharedMemory + +import trio +from msgspec import ( + Struct, + to_builtins +) + +from ._linux import ( + open_eventfd, + EFDReadCancelled, + EventFD +) +from ._mp_bs import disable_mantracker +from tractor.log import get_logger +from tractor._exceptions import ( + InternalError +) + + +log = get_logger(__name__) + + +disable_mantracker() + +_DEFAULT_RB_SIZE = 10 * 1024 + + +class RBToken(Struct, frozen=True): + ''' + RingBuffer token contains necesary info to open the three + eventfds and the shared memory + + ''' + shm_name: str + + write_eventfd: int # used to signal writer ptr advance + wrap_eventfd: int # used to signal reader ready after wrap around + eof_eventfd: int # used to signal writer closed + + buf_size: int + + def as_msg(self): + return to_builtins(self) + + @classmethod + def from_msg(cls, msg: dict) -> RBToken: + if isinstance(msg, RBToken): + return msg + + return RBToken(**msg) + + @property + def fds(self) -> tuple[int, int, int]: + ''' + Useful for `pass_fds` params + + ''' + return ( + self.write_eventfd, + self.wrap_eventfd, + self.eof_eventfd + ) + + +@cm +def open_ringbuf( + shm_name: str, + buf_size: int = _DEFAULT_RB_SIZE, +) -> ContextManager[RBToken]: + ''' + Handle resources for a ringbuf (shm, eventfd), yield `RBToken` to + be used with `attach_to_ringbuf_sender` and `attach_to_ringbuf_receiver` + + ''' + shm = SharedMemory( + name=shm_name, + size=buf_size, + create=True + ) + try: + with ( + EventFD(open_eventfd(), 'r') as write_event, + EventFD(open_eventfd(), 'r') as wrap_event, + EventFD(open_eventfd(), 'r') as eof_event, + ): + token = RBToken( + shm_name=shm_name, + write_eventfd=write_event.fd, + wrap_eventfd=wrap_event.fd, + eof_eventfd=eof_event.fd, + buf_size=buf_size + ) + yield token + + finally: + shm.unlink() + + +Buffer = bytes | bytearray | memoryview + + +''' +IPC Reliable Ring Buffer + +`eventfd(2)` is used for wrap around sync, to signal writes to +the reader and end of stream. + +''' + + +class RingBuffSender(trio.abc.SendStream): + ''' + Ring Buffer sender side implementation + + Do not use directly! manage with `attach_to_ringbuf_sender` + after having opened a ringbuf context with `open_ringbuf`. + + ''' + def __init__( + self, + token: RBToken, + cleanup: bool = False + ): + self._token = RBToken.from_msg(token) + self._shm: SharedMemory | None = None + self._write_event = EventFD(self._token.write_eventfd, 'w') + self._wrap_event = EventFD(self._token.wrap_eventfd, 'r') + self._eof_event = EventFD(self._token.eof_eventfd, 'w') + self._ptr = 0 + + self._cleanup = cleanup + self._send_lock = trio.StrictFIFOLock() + + @property + def name(self) -> str: + if not self._shm: + raise ValueError('shared memory not initialized yet!') + return self._shm.name + + @property + def size(self) -> int: + return self._token.buf_size + + @property + def ptr(self) -> int: + return self._ptr + + @property + def write_fd(self) -> int: + return self._write_event.fd + + @property + def wrap_fd(self) -> int: + return self._wrap_event.fd + + async def send_all(self, data: Buffer): + async with self._send_lock: + # while data is larger than the remaining buf + target_ptr = self.ptr + len(data) + while target_ptr > self.size: + # write all bytes that fit + remaining = self.size - self.ptr + self._shm.buf[self.ptr:] = data[:remaining] + # signal write and wait for reader wrap around + self._write_event.write(remaining) + await self._wrap_event.read() + + # wrap around and trim already written bytes + self._ptr = 0 + data = data[remaining:] + target_ptr = self._ptr + len(data) + + # remaining data fits on buffer + self._shm.buf[self.ptr:target_ptr] = data + self._write_event.write(len(data)) + self._ptr = target_ptr + + async def wait_send_all_might_not_block(self): + raise NotImplementedError + + def open(self): + self._shm = SharedMemory( + name=self._token.shm_name, + size=self._token.buf_size, + create=False + ) + self._write_event.open() + self._wrap_event.open() + self._eof_event.open() + + def close(self): + self._eof_event.write( + self._ptr if self._ptr > 0 else self.size + ) + + if self._cleanup: + self._write_event.close() + self._wrap_event.close() + self._eof_event.close() + self._shm.close() + + async def aclose(self): + async with self._send_lock: + self.close() + + async def __aenter__(self): + self.open() + return self + + +class RingBuffReceiver(trio.abc.ReceiveStream): + ''' + Ring Buffer receiver side implementation + + Do not use directly! manage with `attach_to_ringbuf_receiver` + after having opened a ringbuf context with `open_ringbuf`. + + ''' + def __init__( + self, + token: RBToken, + cleanup: bool = True, + ): + self._token = RBToken.from_msg(token) + self._shm: SharedMemory | None = None + self._write_event = EventFD(self._token.write_eventfd, 'w') + self._wrap_event = EventFD(self._token.wrap_eventfd, 'r') + self._eof_event = EventFD(self._token.eof_eventfd, 'r') + self._ptr: int = 0 + self._write_ptr: int = 0 + self._end_ptr: int = -1 + + self._cleanup: bool = cleanup + + @property + def name(self) -> str: + if not self._shm: + raise ValueError('shared memory not initialized yet!') + return self._shm.name + + @property + def size(self) -> int: + return self._token.buf_size + + @property + def ptr(self) -> int: + return self._ptr + + @property + def write_fd(self) -> int: + return self._write_event.fd + + @property + def wrap_fd(self) -> int: + return self._wrap_event.fd + + async def _eof_monitor_task(self): + ''' + Long running EOF event monitor, automatically run in bg by + `attach_to_ringbuf_receiver` context manager, if EOF event + is set its value will be the end pointer (highest valid + index to be read from buf, after setting the `self._end_ptr` + we close the write event which should cancel any blocked + `self._write_event.read()`s on it. + + ''' + try: + self._end_ptr = await self._eof_event.read() + self._write_event.close() + + except EFDReadCancelled: + ... + + except trio.Cancelled: + ... + + async def receive_some(self, max_bytes: int | None = None) -> bytes: + ''' + Receive up to `max_bytes`, if no `max_bytes` is provided + a reasonable default is used. + + ''' + if max_bytes is None: + max_bytes: int = _DEFAULT_RB_SIZE + + if max_bytes < 1: + raise ValueError("max_bytes must be >= 1") + + # delta is remaining bytes we havent read + delta = self._write_ptr - self._ptr + if delta == 0: + # we have read all we can, see if new data is available + if self._end_ptr < 0: + # if we havent been signaled about EOF yet + try: + delta = await self._write_event.read() + self._write_ptr += delta + + except EFDReadCancelled: + # while waiting for new data `self._write_event` was closed + # this means writer signaled EOF + if self._end_ptr > 0: + # final self._write_ptr modification and recalculate delta + self._write_ptr = self._end_ptr + delta = self._end_ptr - self._ptr + + else: + # shouldnt happen cause self._eof_monitor_task always sets + # self._end_ptr before closing self._write_event + raise InternalError( + 'self._write_event.read cancelled but self._end_ptr is not set' + ) + + else: + # no more bytes to read and self._end_ptr set, EOF reached + return b'' + + # dont overflow caller + delta = min(delta, max_bytes) + + target_ptr = self._ptr + delta + + # fetch next segment and advance ptr + segment = bytes(self._shm.buf[self._ptr:target_ptr]) + self._ptr = target_ptr + + if self._ptr == self.size: + # reached the end, signal wrap around + self._ptr = 0 + self._write_ptr = 0 + self._wrap_event.write(1) + + return segment + + def open(self): + self._shm = SharedMemory( + name=self._token.shm_name, + size=self._token.buf_size, + create=False + ) + self._write_event.open() + self._wrap_event.open() + self._eof_event.open() + + def close(self): + if self._cleanup: + self._write_event.close() + self._wrap_event.close() + self._eof_event.close() + self._shm.close() + + async def aclose(self): + self.close() + + async def __aenter__(self): + self.open() + return self + + +@acm +async def attach_to_ringbuf_receiver( + token: RBToken, + cleanup: bool = True +) -> AsyncContextManager[RingBuffReceiver]: + ''' + Attach a RingBuffReceiver from a previously opened + RBToken. + + Launches `receiver._eof_monitor_task` in a `trio.Nursery`. + ''' + async with ( + trio.open_nursery() as n, + RingBuffReceiver( + token, + cleanup=cleanup + ) as receiver + ): + n.start_soon(receiver._eof_monitor_task) + yield receiver + + +@acm +async def attach_to_ringbuf_sender( + token: RBToken, + cleanup: bool = True +) -> AsyncContextManager[RingBuffSender]: + ''' + Attach a RingBuffSender from a previously opened + RBToken. + + ''' + async with RingBuffSender( + token, + cleanup=cleanup + ) as sender: + yield sender + + +@cm +def open_ringbuf_pair( + name: str, + buf_size: int = _DEFAULT_RB_SIZE +) -> ContextManager[tuple(RBToken, RBToken)]: + ''' + Handle resources for a ringbuf pair to be used for + bidirectional messaging. + + ''' + with ( + open_ringbuf( + name + '.pair0', + buf_size=buf_size + ) as token_0, + + open_ringbuf( + name + '.pair1', + buf_size=buf_size + ) as token_1 + ): + yield token_0, token_1 + + +@acm +async def attach_to_ringbuf_stream( + token_in: RBToken, + token_out: RBToken, + cleanup_in: bool = True, + cleanup_out: bool = True +) -> AsyncContextManager[trio.StapledStream]: + ''' + Attach a trio.StapledStream from a previously opened + ringbuf pair. + + ''' + async with ( + attach_to_ringbuf_receiver( + token_in, + cleanup=cleanup_in + ) as receiver, + attach_to_ringbuf_sender( + token_out, + cleanup=cleanup_out + ) as sender, + ): + yield trio.StapledStream(sender, receiver) + + + +class RingBuffBytesSender(trio.abc.SendChannel[bytes]): + ''' + In order to guarantee full messages are received, all bytes + sent by `RingBuffBytesSender` are preceded with a 4 byte header + which decodes into a uint32 indicating the actual size of the + next payload. + + Optional batch mode: + + If `batch_size` > 1 messages wont get sent immediately but will be + stored until `batch_size` messages are pending, then it will send + them all at once. + + `batch_size` can be changed dynamically but always call, `flush()` + right before. + + ''' + def __init__( + self, + sender: RingBuffSender, + batch_size: int = 1 + ): + self._sender = sender + self.batch_size = batch_size + self._batch_msg_len = 0 + self._batch: bytes = b'' + + async def flush(self) -> None: + await self._sender.send_all(self._batch) + self._batch = b'' + self._batch_msg_len = 0 + + async def send(self, value: bytes) -> None: + msg: bytes = struct.pack(" None: + await self._sender.aclose() + + +class RingBuffBytesReceiver(trio.abc.ReceiveChannel[bytes]): + ''' + See `RingBuffBytesSender` docstring. + + A `tricycle.BufferedReceiveStream` is used for the + `receive_exactly` API. + ''' + def __init__( + self, + receiver: RingBuffReceiver + ): + self._receiver = receiver + + async def _receive_exactly(self, num_bytes: int) -> bytes: + ''' + Fetch bytes from receiver until we read exactly `num_bytes` + or end of stream is signaled. + + ''' + payload = b'' + while len(payload) < num_bytes: + remaining = num_bytes - len(payload) + + new_bytes = await self._receiver.receive_some( + max_bytes=remaining + ) + + if new_bytes == b'': + raise trio.EndOfChannel + + payload += new_bytes + + return payload + + async def receive(self) -> bytes: + header: bytes = await self._receive_exactly(4) + size: int + size, = struct.unpack(" None: + await self._receiver.aclose() + + +@acm +async def attach_to_ringbuf_rchannel( + token: RBToken, + cleanup: bool = True +) -> AsyncContextManager[RingBuffBytesReceiver]: + ''' + Attach a RingBuffBytesReceiver from a previously opened + RBToken. + ''' + async with attach_to_ringbuf_receiver( + token, cleanup=cleanup + ) as receiver: + yield RingBuffBytesReceiver(receiver) + + +@acm +async def attach_to_ringbuf_schannel( + token: RBToken, + cleanup: bool = True, + batch_size: int = 1, +) -> AsyncContextManager[RingBuffBytesSender]: + ''' + Attach a RingBuffBytesSender from a previously opened + RBToken. + ''' + async with attach_to_ringbuf_sender( + token, cleanup=cleanup + ) as sender: + yield RingBuffBytesSender(sender, batch_size=batch_size) + + +class RingBuffChannel(trio.abc.Channel[bytes]): + ''' + Combine `RingBuffBytesSender` and `RingBuffBytesReceiver` + in order to expose the bidirectional `trio.abc.Channel` API. + + ''' + def __init__( + self, + sender: RingBuffBytesSender, + receiver: RingBuffBytesReceiver + ): + self._sender = sender + self._receiver = receiver + + async def send(self, value: bytes): + await self._sender.send(value) + + async def receive(self) -> bytes: + return await self._receiver.receive() + + async def aclose(self): + await self._receiver.aclose() + await self._sender.aclose() + + +@acm +async def attach_to_ringbuf_channel( + token_in: RBToken, + token_out: RBToken, + cleanup_in: bool = True, + cleanup_out: bool = True +) -> AsyncContextManager[RingBuffChannel]: + ''' + Attach to an already opened ringbuf pair and return + a `RingBuffChannel`. + + ''' + async with ( + attach_to_ringbuf_rchannel( + token_in, + cleanup=cleanup_in + ) as receiver, + attach_to_ringbuf_schannel( + token_out, + cleanup=cleanup_out + ) as sender, + ): + yield RingBuffChannel(sender, receiver) diff --git a/tractor/_shm.py b/tractor/ipc/_shm.py similarity index 95% rename from tractor/_shm.py rename to tractor/ipc/_shm.py index f8295105..9868ac73 100644 --- a/tractor/_shm.py +++ b/tractor/ipc/_shm.py @@ -32,10 +32,14 @@ from multiprocessing.shared_memory import ( ShareableList, ) -from msgspec import Struct +from msgspec import ( + Struct, + to_builtins +) import tractor -from .log import get_logger +from tractor.ipc._mp_bs import disable_mantracker +from tractor.log import get_logger _USE_POSIX = getattr(shm, '_USE_POSIX', False) @@ -54,34 +58,6 @@ except ImportError: log = get_logger(__name__) -def disable_mantracker(): - ''' - Disable all ``multiprocessing``` "resource tracking" machinery since - it's an absolute multi-threaded mess of non-SC madness. - - ''' - from multiprocessing import resource_tracker as mantracker - - # Tell the "resource tracker" thing to fuck off. - class ManTracker(mantracker.ResourceTracker): - def register(self, name, rtype): - pass - - def unregister(self, name, rtype): - pass - - def ensure_running(self): - pass - - # "know your land and know your prey" - # https://www.dailymotion.com/video/x6ozzco - mantracker._resource_tracker = ManTracker() - mantracker.register = mantracker._resource_tracker.register - mantracker.ensure_running = mantracker._resource_tracker.ensure_running - mantracker.unregister = mantracker._resource_tracker.unregister - mantracker.getfd = mantracker._resource_tracker.getfd - - disable_mantracker() @@ -142,7 +118,7 @@ class NDToken(Struct, frozen=True): ).descr def as_msg(self): - return self.to_dict() + return to_builtins(self) @classmethod def from_msg(cls, msg: dict) -> NDToken: diff --git a/tractor/_ipc.py b/tractor/ipc/_tcp.py similarity index 52% rename from tractor/_ipc.py rename to tractor/ipc/_tcp.py index 83186147..3ce0b4ea 100644 --- a/tractor/_ipc.py +++ b/tractor/ipc/_tcp.py @@ -13,31 +13,19 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . +''' +TCP implementation of tractor.ipc._transport.MsgTransport protocol -""" -Inter-process comms abstractions - -""" +''' from __future__ import annotations from collections.abc import ( AsyncGenerator, AsyncIterator, ) -from contextlib import ( - asynccontextmanager as acm, - contextmanager as cm, -) -import platform -from pprint import pformat import struct -import typing from typing import ( Any, Callable, - runtime_checkable, - Protocol, - Type, - TypeVar, ) import msgspec @@ -47,7 +35,6 @@ import trio from tractor.log import get_logger from tractor._exceptions import ( MsgTypeError, - pack_from_raise, TransportClosed, _mk_send_mte, _mk_recv_mte, @@ -59,11 +46,11 @@ from tractor.msg import ( types as msgtypes, pretty_struct, ) +from tractor.ipc import MsgTransport + log = get_logger(__name__) -_is_windows = platform.system() == 'Windows' - def get_stream_addrs( stream: trio.SocketStream @@ -85,56 +72,6 @@ def get_stream_addrs( ) -# from tractor.msg.types import MsgType -# ?TODO? this should be our `Union[*msgtypes.__spec__]` alias now right..? -# => BLEH, except can't bc prots must inherit typevar or param-spec -# vars.. -MsgType = TypeVar('MsgType') - - -# TODO: break up this mod into a subpkg so we can start adding new -# backends and move this type stuff into a dedicated file.. Bo -# -@runtime_checkable -class MsgTransport(Protocol[MsgType]): -# -# ^-TODO-^ consider using a generic def and indexing with our -# eventual msg definition/types? -# - https://docs.python.org/3/library/typing.html#typing.Protocol - - stream: trio.SocketStream - drained: list[MsgType] - - def __init__(self, stream: trio.SocketStream) -> None: - ... - - # XXX: should this instead be called `.sendall()`? - async def send(self, msg: MsgType) -> None: - ... - - async def recv(self) -> MsgType: - ... - - def __aiter__(self) -> MsgType: - ... - - def connected(self) -> bool: - ... - - # defining this sync otherwise it causes a mypy error because it - # can't figure out it's a generator i guess?..? - def drain(self) -> AsyncIterator[dict]: - ... - - @property - def laddr(self) -> tuple[str, int]: - ... - - @property - def raddr(self) -> tuple[str, int]: - ... - - # TODO: typing oddity.. not sure why we have to inherit here, but it # seems to be an issue with `get_msg_transport()` returning # a `Type[Protocol]`; probably should make a `mypy` issue? @@ -255,8 +192,8 @@ class MsgpackTCPStream(MsgTransport): raise TransportClosed( message=( f'IPC transport already closed by peer\n' - f'x]> {type(trans_err)}\n' - f' |_{self}\n' + f'x)> {type(trans_err)}\n' + f' |_{self}\n' ), loglevel=loglevel, ) from trans_err @@ -273,8 +210,8 @@ class MsgpackTCPStream(MsgTransport): raise TransportClosed( message=( f'IPC transport already manually closed locally?\n' - f'x]> {type(closure_err)} \n' - f' |_{self}\n' + f'x)> {type(closure_err)} \n' + f' |_{self}\n' ), loglevel='error', raise_on_report=( @@ -289,8 +226,8 @@ class MsgpackTCPStream(MsgTransport): raise TransportClosed( message=( f'IPC transport already gracefully closed\n' - f']>\n' - f' |_{self}\n' + f')>\n' + f'|_{self}\n' ), loglevel='transport', # cause=??? # handy or no? @@ -466,355 +403,3 @@ class MsgpackTCPStream(MsgTransport): def connected(self) -> bool: return self.stream.socket.fileno() != -1 - - -def get_msg_transport( - - key: tuple[str, str], - -) -> Type[MsgTransport]: - - return { - ('msgpack', 'tcp'): MsgpackTCPStream, - }[key] - - -class Channel: - ''' - An inter-process channel for communication between (remote) actors. - - Wraps a ``MsgStream``: transport + encoding IPC connection. - - Currently we only support ``trio.SocketStream`` for transport - (aka TCP) and the ``msgpack`` interchange format via the ``msgspec`` - codec libary. - - ''' - def __init__( - - self, - destaddr: tuple[str, int]|None, - - msg_transport_type_key: tuple[str, str] = ('msgpack', 'tcp'), - - # TODO: optional reconnection support? - # auto_reconnect: bool = False, - # on_reconnect: typing.Callable[..., typing.Awaitable] = None, - - ) -> None: - - # self._recon_seq = on_reconnect - # self._autorecon = auto_reconnect - - self._destaddr = destaddr - self._transport_key = msg_transport_type_key - - # Either created in ``.connect()`` or passed in by - # user in ``.from_stream()``. - self._stream: trio.SocketStream|None = None - self._transport: MsgTransport|None = None - - # set after handshake - always uid of far end - self.uid: tuple[str, str]|None = None - - self._aiter_msgs = self._iter_msgs() - self._exc: Exception|None = None # set if far end actor errors - self._closed: bool = False - - # flag set by ``Portal.cancel_actor()`` indicating remote - # (possibly peer) cancellation of the far end actor - # runtime. - self._cancel_called: bool = False - - @property - def msgstream(self) -> MsgTransport: - log.info( - '`Channel.msgstream` is an old name, use `._transport`' - ) - return self._transport - - @property - def transport(self) -> MsgTransport: - return self._transport - - @classmethod - def from_stream( - cls, - stream: trio.SocketStream, - **kwargs, - - ) -> Channel: - - src, dst = get_stream_addrs(stream) - chan = Channel( - destaddr=dst, - **kwargs, - ) - - # set immediately here from provided instance - chan._stream: trio.SocketStream = stream - chan.set_msg_transport(stream) - return chan - - def set_msg_transport( - self, - stream: trio.SocketStream, - type_key: tuple[str, str]|None = None, - - # XXX optionally provided codec pair for `msgspec`: - # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types - codec: MsgCodec|None = None, - - ) -> MsgTransport: - type_key = ( - type_key - or - self._transport_key - ) - # get transport type, then - self._transport = get_msg_transport( - type_key - # instantiate an instance of the msg-transport - )( - stream, - codec=codec, - ) - return self._transport - - @cm - def apply_codec( - self, - codec: MsgCodec, - - ) -> None: - ''' - Temporarily override the underlying IPC msg codec for - dynamic enforcement of messaging schema. - - ''' - orig: MsgCodec = self._transport.codec - try: - self._transport.codec = codec - yield - finally: - self._transport.codec = orig - - # TODO: do a .src/.dst: str for maddrs? - def __repr__(self) -> str: - if not self._transport: - return '' - - return repr( - self._transport.stream.socket._sock - ).replace( # type: ignore - "socket.socket", - "Channel", - ) - - @property - def laddr(self) -> tuple[str, int]|None: - return self._transport.laddr if self._transport else None - - @property - def raddr(self) -> tuple[str, int]|None: - return self._transport.raddr if self._transport else None - - async def connect( - self, - destaddr: tuple[Any, ...] | None = None, - **kwargs - - ) -> MsgTransport: - - if self.connected(): - raise RuntimeError("channel is already connected?") - - destaddr = destaddr or self._destaddr - assert isinstance(destaddr, tuple) - - stream = await trio.open_tcp_stream( - *destaddr, - **kwargs - ) - transport = self.set_msg_transport(stream) - - log.transport( - f'Opened channel[{type(transport)}]: {self.laddr} -> {self.raddr}' - ) - return transport - - # TODO: something like, - # `pdbp.hideframe_on(errors=[MsgTypeError])` - # instead of the `try/except` hack we have rn.. - # seems like a pretty useful thing to have in general - # along with being able to filter certain stack frame(s / sets) - # possibly based on the current log-level? - async def send( - self, - payload: Any, - - hide_tb: bool = False, - - ) -> None: - ''' - Send a coded msg-blob over the transport. - - ''' - __tracebackhide__: bool = hide_tb - try: - log.transport( - '=> send IPC msg:\n\n' - f'{pformat(payload)}\n' - ) - # assert self._transport # but why typing? - await self._transport.send( - payload, - hide_tb=hide_tb, - ) - except BaseException as _err: - err = _err # bind for introspection - if not isinstance(_err, MsgTypeError): - # assert err - __tracebackhide__: bool = False - else: - assert err.cid - - raise - - async def recv(self) -> Any: - assert self._transport - return await self._transport.recv() - - # TODO: auto-reconnect features like 0mq/nanomsg? - # -[ ] implement it manually with nods to SC prot - # possibly on multiple transport backends? - # -> seems like that might be re-inventing scalability - # prots tho no? - # try: - # return await self._transport.recv() - # except trio.BrokenResourceError: - # if self._autorecon: - # await self._reconnect() - # return await self.recv() - # raise - - async def aclose(self) -> None: - - log.transport( - f'Closing channel to {self.uid} ' - f'{self.laddr} -> {self.raddr}' - ) - assert self._transport - await self._transport.stream.aclose() - self._closed = True - - async def __aenter__(self): - await self.connect() - return self - - async def __aexit__(self, *args): - await self.aclose(*args) - - def __aiter__(self): - return self._aiter_msgs - - # ?TODO? run any reconnection sequence? - # -[ ] prolly should be impl-ed as deco-API? - # - # async def _reconnect(self) -> None: - # """Handle connection failures by polling until a reconnect can be - # established. - # """ - # down = False - # while True: - # try: - # with trio.move_on_after(3) as cancel_scope: - # await self.connect() - # cancelled = cancel_scope.cancelled_caught - # if cancelled: - # log.transport( - # "Reconnect timed out after 3 seconds, retrying...") - # continue - # else: - # log.transport("Stream connection re-established!") - - # # on_recon = self._recon_seq - # # if on_recon: - # # await on_recon(self) - - # break - # except (OSError, ConnectionRefusedError): - # if not down: - # down = True - # log.transport( - # f"Connection to {self.raddr} went down, waiting" - # " for re-establishment") - # await trio.sleep(1) - - async def _iter_msgs( - self - ) -> AsyncGenerator[Any, None]: - ''' - Yield `MsgType` IPC msgs decoded and deliverd from - an underlying `MsgTransport` protocol. - - This is a streaming routine alo implemented as an async-gen - func (same a `MsgTransport._iter_pkts()`) gets allocated by - a `.__call__()` inside `.__init__()` where it is assigned to - the `._aiter_msgs` attr. - - ''' - assert self._transport - while True: - try: - async for msg in self._transport: - match msg: - # NOTE: if transport/interchange delivers - # a type error, we pack it with the far - # end peer `Actor.uid` and relay the - # `Error`-msg upward to the `._rpc` stack - # for normal RAE handling. - case MsgTypeError(): - yield pack_from_raise( - local_err=msg, - cid=msg.cid, - - # XXX we pack it here bc lower - # layers have no notion of an - # actor-id ;) - src_uid=self.uid, - ) - case _: - yield msg - - except trio.BrokenResourceError: - - # if not self._autorecon: - raise - - await self.aclose() - - # if self._autorecon: # attempt reconnect - # await self._reconnect() - # continue - - def connected(self) -> bool: - return self._transport.connected() if self._transport else False - - -@acm -async def _connect_chan( - host: str, - port: int - -) -> typing.AsyncGenerator[Channel, None]: - ''' - Create and connect a channel with disconnect on context manager - teardown. - - ''' - chan = Channel((host, port)) - await chan.connect() - yield chan - with trio.CancelScope(shield=True): - await chan.aclose() diff --git a/tractor/ipc/_transport.py b/tractor/ipc/_transport.py new file mode 100644 index 00000000..64453c89 --- /dev/null +++ b/tractor/ipc/_transport.py @@ -0,0 +1,74 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +''' +typing.Protocol based generic msg API, implement this class to add backends for +tractor.ipc.Channel + +''' +import trio +from typing import ( + runtime_checkable, + Protocol, + TypeVar, +) +from collections.abc import AsyncIterator + + +# from tractor.msg.types import MsgType +# ?TODO? this should be our `Union[*msgtypes.__spec__]` alias now right..? +# => BLEH, except can't bc prots must inherit typevar or param-spec +# vars.. +MsgType = TypeVar('MsgType') + + +@runtime_checkable +class MsgTransport(Protocol[MsgType]): +# +# ^-TODO-^ consider using a generic def and indexing with our +# eventual msg definition/types? +# - https://docs.python.org/3/library/typing.html#typing.Protocol + + stream: trio.abc.Stream + drained: list[MsgType] + + def __init__(self, stream: trio.abc.Stream) -> None: + ... + + # XXX: should this instead be called `.sendall()`? + async def send(self, msg: MsgType) -> None: + ... + + async def recv(self) -> MsgType: + ... + + def __aiter__(self) -> MsgType: + ... + + def connected(self) -> bool: + ... + + # defining this sync otherwise it causes a mypy error because it + # can't figure out it's a generator i guess?..? + def drain(self) -> AsyncIterator[dict]: + ... + + @property + def laddr(self) -> tuple[str, int]: + ... + + @property + def raddr(self) -> tuple[str, int]: + ... diff --git a/tractor/log.py b/tractor/log.py index 74e0321b..48b5cbd4 100644 --- a/tractor/log.py +++ b/tractor/log.py @@ -92,7 +92,7 @@ class StackLevelAdapter(LoggerAdapter): ) -> None: ''' IPC transport level msg IO; generally anything below - `._ipc.Channel` and friends. + `.ipc.Channel` and friends. ''' return self.log(5, msg) @@ -285,7 +285,7 @@ def get_logger( # NOTE: for handling for modules that use ``get_logger(__name__)`` # we make the following stylistic choice: # - always avoid duplicate project-package token - # in msg output: i.e. tractor.tractor _ipc.py in header + # in msg output: i.e. tractor.tractor.ipc._chan.py in header # looks ridiculous XD # - never show the leaf module name in the {name} part # since in python the {filename} is always this same diff --git a/uv.lock b/uv.lock index e1c409f5..76b22243 100644 --- a/uv.lock +++ b/uv.lock @@ -20,10 +20,38 @@ dependencies = [ ] sdist = { url = "https://files.pythonhosted.org/packages/fc/97/c783634659c2920c3fc70419e3af40972dbaf758daa229a7d6ea6135c90d/cffi-1.17.1.tar.gz", hash = "sha256:1c39c6016c32bc48dd54561950ebd6836e1670f2ae46128f67cf49e789c52824", size = 516621 } wheels = [ + { url = "https://files.pythonhosted.org/packages/6b/f4/927e3a8899e52a27fa57a48607ff7dc91a9ebe97399b357b85a0c7892e00/cffi-1.17.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:a45e3c6913c5b87b3ff120dcdc03f6131fa0065027d0ed7ee6190736a74cd401", size = 182264 }, + { url = "https://files.pythonhosted.org/packages/6c/f5/6c3a8efe5f503175aaddcbea6ad0d2c96dad6f5abb205750d1b3df44ef29/cffi-1.17.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:30c5e0cb5ae493c04c8b42916e52ca38079f1b235c2f8ae5f4527b963c401caf", size = 178651 }, + { url = "https://files.pythonhosted.org/packages/94/dd/a3f0118e688d1b1a57553da23b16bdade96d2f9bcda4d32e7d2838047ff7/cffi-1.17.1-cp311-cp311-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f75c7ab1f9e4aca5414ed4d8e5c0e303a34f4421f8a0d47a4d019ceff0ab6af4", size = 445259 }, + { url = "https://files.pythonhosted.org/packages/2e/ea/70ce63780f096e16ce8588efe039d3c4f91deb1dc01e9c73a287939c79a6/cffi-1.17.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a1ed2dd2972641495a3ec98445e09766f077aee98a1c896dcb4ad0d303628e41", size = 469200 }, + { url = "https://files.pythonhosted.org/packages/1c/a0/a4fa9f4f781bda074c3ddd57a572b060fa0df7655d2a4247bbe277200146/cffi-1.17.1-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:46bf43160c1a35f7ec506d254e5c890f3c03648a4dbac12d624e4490a7046cd1", size = 477235 }, + { url = "https://files.pythonhosted.org/packages/62/12/ce8710b5b8affbcdd5c6e367217c242524ad17a02fe5beec3ee339f69f85/cffi-1.17.1-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:a24ed04c8ffd54b0729c07cee15a81d964e6fee0e3d4d342a27b020d22959dc6", size = 459721 }, + { url = "https://files.pythonhosted.org/packages/ff/6b/d45873c5e0242196f042d555526f92aa9e0c32355a1be1ff8c27f077fd37/cffi-1.17.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:610faea79c43e44c71e1ec53a554553fa22321b65fae24889706c0a84d4ad86d", size = 467242 }, + { url = "https://files.pythonhosted.org/packages/1a/52/d9a0e523a572fbccf2955f5abe883cfa8bcc570d7faeee06336fbd50c9fc/cffi-1.17.1-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:a9b15d491f3ad5d692e11f6b71f7857e7835eb677955c00cc0aefcd0669adaf6", size = 477999 }, + { url = "https://files.pythonhosted.org/packages/44/74/f2a2460684a1a2d00ca799ad880d54652841a780c4c97b87754f660c7603/cffi-1.17.1-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:de2ea4b5833625383e464549fec1bc395c1bdeeb5f25c4a3a82b5a8c756ec22f", size = 454242 }, + { url = "https://files.pythonhosted.org/packages/f8/4a/34599cac7dfcd888ff54e801afe06a19c17787dfd94495ab0c8d35fe99fb/cffi-1.17.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:fc48c783f9c87e60831201f2cce7f3b2e4846bf4d8728eabe54d60700b318a0b", size = 478604 }, { url = "https://files.pythonhosted.org/packages/34/33/e1b8a1ba29025adbdcda5fb3a36f94c03d771c1b7b12f726ff7fef2ebe36/cffi-1.17.1-cp311-cp311-win32.whl", hash = "sha256:85a950a4ac9c359340d5963966e3e0a94a676bd6245a4b55bc43949eee26a655", size = 171727 }, { url = "https://files.pythonhosted.org/packages/3d/97/50228be003bb2802627d28ec0627837ac0bf35c90cf769812056f235b2d1/cffi-1.17.1-cp311-cp311-win_amd64.whl", hash = "sha256:caaf0640ef5f5517f49bc275eca1406b0ffa6aa184892812030f04c2abf589a0", size = 181400 }, + { url = "https://files.pythonhosted.org/packages/5a/84/e94227139ee5fb4d600a7a4927f322e1d4aea6fdc50bd3fca8493caba23f/cffi-1.17.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:805b4371bf7197c329fcb3ead37e710d1bca9da5d583f5073b799d5c5bd1eee4", size = 183178 }, + { url = "https://files.pythonhosted.org/packages/da/ee/fb72c2b48656111c4ef27f0f91da355e130a923473bf5ee75c5643d00cca/cffi-1.17.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:733e99bc2df47476e3848417c5a4540522f234dfd4ef3ab7fafdf555b082ec0c", size = 178840 }, + { url = "https://files.pythonhosted.org/packages/cc/b6/db007700f67d151abadf508cbfd6a1884f57eab90b1bb985c4c8c02b0f28/cffi-1.17.1-cp312-cp312-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1257bdabf294dceb59f5e70c64a3e2f462c30c7ad68092d01bbbfb1c16b1ba36", size = 454803 }, + { url = "https://files.pythonhosted.org/packages/1a/df/f8d151540d8c200eb1c6fba8cd0dfd40904f1b0682ea705c36e6c2e97ab3/cffi-1.17.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:da95af8214998d77a98cc14e3a3bd00aa191526343078b530ceb0bd710fb48a5", size = 478850 }, + { url = "https://files.pythonhosted.org/packages/28/c0/b31116332a547fd2677ae5b78a2ef662dfc8023d67f41b2a83f7c2aa78b1/cffi-1.17.1-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:d63afe322132c194cf832bfec0dc69a99fb9bb6bbd550f161a49e9e855cc78ff", size = 485729 }, + { url = "https://files.pythonhosted.org/packages/91/2b/9a1ddfa5c7f13cab007a2c9cc295b70fbbda7cb10a286aa6810338e60ea1/cffi-1.17.1-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:f79fc4fc25f1c8698ff97788206bb3c2598949bfe0fef03d299eb1b5356ada99", size = 471256 }, + { url = "https://files.pythonhosted.org/packages/b2/d5/da47df7004cb17e4955df6a43d14b3b4ae77737dff8bf7f8f333196717bf/cffi-1.17.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b62ce867176a75d03a665bad002af8e6d54644fad99a3c70905c543130e39d93", size = 479424 }, + { url = "https://files.pythonhosted.org/packages/0b/ac/2a28bcf513e93a219c8a4e8e125534f4f6db03e3179ba1c45e949b76212c/cffi-1.17.1-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:386c8bf53c502fff58903061338ce4f4950cbdcb23e2902d86c0f722b786bbe3", size = 484568 }, + { url = "https://files.pythonhosted.org/packages/d4/38/ca8a4f639065f14ae0f1d9751e70447a261f1a30fa7547a828ae08142465/cffi-1.17.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:4ceb10419a9adf4460ea14cfd6bc43d08701f0835e979bf821052f1805850fe8", size = 488736 }, { url = "https://files.pythonhosted.org/packages/86/c5/28b2d6f799ec0bdecf44dced2ec5ed43e0eb63097b0f58c293583b406582/cffi-1.17.1-cp312-cp312-win32.whl", hash = "sha256:a08d7e755f8ed21095a310a693525137cfe756ce62d066e53f502a83dc550f65", size = 172448 }, { url = "https://files.pythonhosted.org/packages/50/b9/db34c4755a7bd1cb2d1603ac3863f22bcecbd1ba29e5ee841a4bc510b294/cffi-1.17.1-cp312-cp312-win_amd64.whl", hash = "sha256:51392eae71afec0d0c8fb1a53b204dbb3bcabcb3c9b807eedf3e1e6ccf2de903", size = 181976 }, + { url = "https://files.pythonhosted.org/packages/8d/f8/dd6c246b148639254dad4d6803eb6a54e8c85c6e11ec9df2cffa87571dbe/cffi-1.17.1-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:f3a2b4222ce6b60e2e8b337bb9596923045681d71e5a082783484d845390938e", size = 182989 }, + { url = "https://files.pythonhosted.org/packages/8b/f1/672d303ddf17c24fc83afd712316fda78dc6fce1cd53011b839483e1ecc8/cffi-1.17.1-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:0984a4925a435b1da406122d4d7968dd861c1385afe3b45ba82b750f229811e2", size = 178802 }, + { url = "https://files.pythonhosted.org/packages/0e/2d/eab2e858a91fdff70533cab61dcff4a1f55ec60425832ddfdc9cd36bc8af/cffi-1.17.1-cp313-cp313-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:d01b12eeeb4427d3110de311e1774046ad344f5b1a7403101878976ecd7a10f3", size = 454792 }, + { url = "https://files.pythonhosted.org/packages/75/b2/fbaec7c4455c604e29388d55599b99ebcc250a60050610fadde58932b7ee/cffi-1.17.1-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:706510fe141c86a69c8ddc029c7910003a17353970cff3b904ff0686a5927683", size = 478893 }, + { url = "https://files.pythonhosted.org/packages/4f/b7/6e4a2162178bf1935c336d4da8a9352cccab4d3a5d7914065490f08c0690/cffi-1.17.1-cp313-cp313-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:de55b766c7aa2e2a3092c51e0483d700341182f08e67c63630d5b6f200bb28e5", size = 485810 }, + { url = "https://files.pythonhosted.org/packages/c7/8a/1d0e4a9c26e54746dc08c2c6c037889124d4f59dffd853a659fa545f1b40/cffi-1.17.1-cp313-cp313-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:c59d6e989d07460165cc5ad3c61f9fd8f1b4796eacbd81cee78957842b834af4", size = 471200 }, + { url = "https://files.pythonhosted.org/packages/26/9f/1aab65a6c0db35f43c4d1b4f580e8df53914310afc10ae0397d29d697af4/cffi-1.17.1-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:dd398dbc6773384a17fe0d3e7eeb8d1a21c2200473ee6806bb5e6a8e62bb73dd", size = 479447 }, + { url = "https://files.pythonhosted.org/packages/5f/e4/fb8b3dd8dc0e98edf1135ff067ae070bb32ef9d509d6cb0f538cd6f7483f/cffi-1.17.1-cp313-cp313-musllinux_1_1_aarch64.whl", hash = "sha256:3edc8d958eb099c634dace3c7e16560ae474aa3803a5df240542b305d14e14ed", size = 484358 }, + { url = "https://files.pythonhosted.org/packages/f1/47/d7145bf2dc04684935d57d67dff9d6d795b2ba2796806bb109864be3a151/cffi-1.17.1-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:72e72408cad3d5419375fc87d289076ee319835bdfa2caad331e377589aebba9", size = 488469 }, { url = "https://files.pythonhosted.org/packages/bf/ee/f94057fa6426481d663b88637a9a10e859e492c73d0384514a17d78ee205/cffi-1.17.1-cp313-cp313-win32.whl", hash = "sha256:e03eab0a8677fa80d646b5ddece1cbeaf556c313dcfac435ba11f107ba117b5d", size = 172475 }, { url = "https://files.pythonhosted.org/packages/7c/fc/6a8cb64e5f0324877d503c854da15d76c1e50eb722e320b15345c4d0c6de/cffi-1.17.1-cp313-cp313-win_amd64.whl", hash = "sha256:f6a16c31041f09ead72d69f583767292f750d24913dadacf5756b966aacb3f1a", size = 182009 }, ] @@ -321,6 +349,7 @@ name = "tractor" version = "0.1.0a6.dev0" source = { editable = "." } dependencies = [ + { name = "cffi" }, { name = "colorlog" }, { name = "msgspec" }, { name = "pdbp" }, @@ -342,6 +371,7 @@ dev = [ [package.metadata] requires-dist = [ + { name = "cffi", specifier = ">=1.17.1" }, { name = "colorlog", specifier = ">=6.8.2,<7" }, { name = "msgspec", specifier = ">=0.19.0" }, { name = "pdbp", specifier = ">=1.6,<2" },