forked from goodboy/tractor
commit
21e60554cc
|
@ -3,6 +3,7 @@ name: CI
|
|||
on: push
|
||||
|
||||
jobs:
|
||||
|
||||
mypy:
|
||||
name: 'MyPy'
|
||||
runs-on: ubuntu-latest
|
||||
|
@ -23,23 +24,59 @@ jobs:
|
|||
run: mypy tractor/ --ignore-missing-imports
|
||||
|
||||
testing:
|
||||
|
||||
name: '${{ matrix.os }} Python ${{ matrix.python }} - ${{ matrix.spawn_backend }}'
|
||||
timeout-minutes: 9
|
||||
runs-on: ${{ matrix.os }}
|
||||
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
os: [ubuntu-latest, windows-latest]
|
||||
python: ['3.8', '3.9']
|
||||
spawn_backend: ['trio', 'mp']
|
||||
|
||||
steps:
|
||||
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Setup python
|
||||
uses: actions/setup-python@v2
|
||||
with:
|
||||
python-version: '${{ matrix.python }}'
|
||||
|
||||
- name: Install dependencies
|
||||
run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager
|
||||
|
||||
- name: Run tests
|
||||
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs
|
||||
|
||||
testing-msgspec:
|
||||
# runs py3.9 jobs on all OS's but with optional `msgspec` dep installed
|
||||
name: '${{ matrix.os }} Python ${{ matrix.python }} - ${{ matrix.spawn_backend }} - msgspec'
|
||||
timeout-minutes: 10
|
||||
runs-on: ${{ matrix.os }}
|
||||
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
os: [ubuntu-latest, windows-latest]
|
||||
python: ['3.9']
|
||||
spawn_backend: ['trio', 'mp']
|
||||
|
||||
steps:
|
||||
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Setup python
|
||||
uses: actions/setup-python@v2
|
||||
with:
|
||||
python-version: '${{ matrix.python }}'
|
||||
|
||||
- name: Install dependencies
|
||||
run: pip install -U .[msgspec] -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager
|
||||
|
||||
- name: Run tests
|
||||
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs
|
||||
|
|
|
@ -24,8 +24,9 @@ Features
|
|||
- Builtin IPC streaming APIs with task fan-out broadcasting
|
||||
- A (first ever?) "native" multi-core debugger UX for Python using `pdb++`_
|
||||
- Support for a swappable, OS specific, process spawning layer
|
||||
- A modular transport stack, allowing for custom serialization,
|
||||
communications protocols, and environment specific IPC primitives
|
||||
- A modular transport stack, allowing for custom serialization (eg.
|
||||
`msgspec`_), communications protocols, and environment specific IPC
|
||||
primitives
|
||||
- `structured concurrency`_ from the ground up
|
||||
|
||||
|
||||
|
@ -322,6 +323,12 @@ From PyPi::
|
|||
pip install tractor
|
||||
|
||||
|
||||
To try out the (optionally) faster `msgspec`_ codec instead of the
|
||||
default ``msgpack`` lib::
|
||||
|
||||
pip install tractor[msgspec]
|
||||
|
||||
|
||||
From git::
|
||||
|
||||
pip install git+git://github.com/goodboy/tractor.git
|
||||
|
@ -394,7 +401,8 @@ Help us push toward the future.
|
|||
|
||||
- (Soon to land) ``asyncio`` support allowing for "infected" actors where
|
||||
`trio` drives the `asyncio` scheduler via the astounding "`guest mode`_"
|
||||
- Typed messaging protocols (ex. via ``msgspec``)
|
||||
- Typed messaging protocols (ex. via ``msgspec``, see `#36
|
||||
<https://github.com/goodboy/tractor/issues/36>`_)
|
||||
- Erlang-style supervisors via composed context managers
|
||||
|
||||
|
||||
|
@ -427,6 +435,7 @@ channel`_!
|
|||
.. _unrequirements: https://en.wikipedia.org/wiki/Actor_model#Direct_communication_and_asynchrony
|
||||
.. _async generators: https://www.python.org/dev/peps/pep-0525/
|
||||
.. _trio-parallel: https://github.com/richardsheridan/trio-parallel
|
||||
.. _msgspec: https://jcristharif.com/msgspec/
|
||||
|
||||
|
||||
.. |gh_actions| image:: https://img.shields.io/endpoint.svg?url=https%3A%2F%2Factions-badge.atrox.dev%2Fgoodboy%2Ftractor%2Fbadge&style=popout-square
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
Add optional `msgspec <https://jcristharif.com/msgspec/>`_ support over
|
||||
TCP streams as an alernative, faster MessagePack codec.
|
||||
|
||||
This get's us moving toward typed messaging/IPC protocols. Further,
|
||||
``msgspec`` structs may be a valid tool to start for formalizing our "SC
|
||||
dialog un-protocol" messages as described in `#36
|
||||
<https://github.com/goodboy/tractor/issues/36>`_`.
|
||||
|
||||
|
10
setup.py
10
setup.py
|
@ -44,6 +44,10 @@ setup(
|
|||
'async_generator',
|
||||
'trio_typing',
|
||||
|
||||
# tooling
|
||||
'tricycle',
|
||||
'trio_typing',
|
||||
|
||||
# tooling
|
||||
'colorlog',
|
||||
'wrapt',
|
||||
|
@ -53,6 +57,12 @@ setup(
|
|||
'msgpack',
|
||||
|
||||
],
|
||||
extras_require={
|
||||
|
||||
# serialization
|
||||
'msgspec': ["msgspec >= 0.3.2'; python_version >= '3.9'"],
|
||||
|
||||
},
|
||||
tests_require=['pytest'],
|
||||
python_requires=">=3.8",
|
||||
keywords=[
|
||||
|
|
|
@ -42,7 +42,7 @@ async def test_reg_then_unreg(arb_addr):
|
|||
|
||||
await trio.sleep(0.1)
|
||||
assert uid not in aportal.actor._registry
|
||||
sockaddrs = actor._registry[uid]
|
||||
sockaddrs = actor._registry.get(uid)
|
||||
assert not sockaddrs
|
||||
|
||||
|
||||
|
@ -136,7 +136,7 @@ async def spawn_and_check_registry(
|
|||
if actor.is_arbiter:
|
||||
|
||||
async def get_reg():
|
||||
return actor._registry
|
||||
return await actor.get_registry()
|
||||
|
||||
extra = 1 # arbiter is local root actor
|
||||
else:
|
||||
|
@ -187,7 +187,6 @@ async def spawn_and_check_registry(
|
|||
await cancel(use_signal)
|
||||
|
||||
finally:
|
||||
with trio.CancelScope(shield=True):
|
||||
await trio.sleep(0.5)
|
||||
|
||||
# all subactors should have de-registered
|
||||
|
@ -277,7 +276,9 @@ async def close_chans_before_nursery(
|
|||
|
||||
# TODO: compact this back as was in last commit once
|
||||
# 3.9+, see https://github.com/goodboy/tractor/issues/207
|
||||
async with portal1.open_stream_from(stream_forever) as agen1:
|
||||
async with portal1.open_stream_from(
|
||||
stream_forever
|
||||
) as agen1:
|
||||
async with portal2.open_stream_from(
|
||||
stream_forever
|
||||
) as agen2:
|
||||
|
@ -293,8 +294,9 @@ async def close_chans_before_nursery(
|
|||
# reliably triggered by an external SIGINT.
|
||||
# tractor.current_actor()._root_nursery.cancel_scope.cancel()
|
||||
|
||||
# XXX: THIS IS THE KEY THING that happens
|
||||
# **before** exiting the actor nursery block
|
||||
# XXX: THIS IS THE KEY THING that
|
||||
# happens **before** exiting the
|
||||
# actor nursery block
|
||||
|
||||
# also kill off channels cuz why not
|
||||
await agen1.aclose()
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
"""
|
||||
Spawning basics
|
||||
"""
|
||||
from typing import Dict, Tuple
|
||||
|
||||
import pytest
|
||||
import trio
|
||||
|
@ -11,7 +12,11 @@ from conftest import tractor_test
|
|||
data_to_pass_down = {'doggy': 10, 'kitty': 4}
|
||||
|
||||
|
||||
async def spawn(is_arbiter, data, arb_addr):
|
||||
async def spawn(
|
||||
is_arbiter: bool,
|
||||
data: Dict,
|
||||
arb_addr: Tuple[str, int],
|
||||
):
|
||||
namespaces = [__name__]
|
||||
|
||||
await trio.sleep(0.1)
|
||||
|
|
|
@ -317,7 +317,8 @@ class Actor:
|
|||
# TODO: consider making this a dynamically defined
|
||||
# @dataclass once we get py3.7
|
||||
self.loglevel = loglevel
|
||||
self._arb_addr = arbiter_addr
|
||||
|
||||
self._arb_addr = (str(arbiter_addr[0]), int(arbiter_addr[1])) if arbiter_addr else None
|
||||
|
||||
# marked by the process spawning backend at startup
|
||||
# will be None for the parent most process started manually
|
||||
|
@ -421,7 +422,7 @@ class Actor:
|
|||
"""
|
||||
self._no_more_peers = trio.Event() # unset
|
||||
|
||||
chan = Channel(stream=stream)
|
||||
chan = Channel.from_stream(stream)
|
||||
log.runtime(f"New connection to us {chan}")
|
||||
|
||||
# send/receive initial handshake response
|
||||
|
@ -429,7 +430,10 @@ class Actor:
|
|||
uid = await self._do_handshake(chan)
|
||||
|
||||
except (
|
||||
# we need this for ``msgspec`` for some reason?
|
||||
# for now, it's been put in the stream backend.
|
||||
# trio.BrokenResourceError,
|
||||
|
||||
# trio.ClosedResourceError,
|
||||
TransportClosed,
|
||||
):
|
||||
|
@ -615,6 +619,7 @@ class Actor:
|
|||
# ``scope = Nursery.start()``
|
||||
task_status.started(loop_cs)
|
||||
async for msg in chan:
|
||||
|
||||
if msg is None: # loop terminate sentinel
|
||||
|
||||
log.debug(
|
||||
|
@ -775,6 +780,7 @@ class Actor:
|
|||
|
||||
if self._spawn_method == "trio":
|
||||
# Receive runtime state from our parent
|
||||
parent_data: dict[str, Any]
|
||||
parent_data = await chan.recv()
|
||||
log.debug(
|
||||
"Received state from parent:\n"
|
||||
|
@ -790,6 +796,15 @@ class Actor:
|
|||
_state._runtime_vars.update(rvs)
|
||||
|
||||
for attr, value in parent_data.items():
|
||||
|
||||
if attr == '_arb_addr':
|
||||
# XXX: ``msgspec`` doesn't support serializing tuples
|
||||
# so just cash manually here since it's what our
|
||||
# internals expect.
|
||||
value = tuple(value) if value else None
|
||||
self._arb_addr = value
|
||||
|
||||
else:
|
||||
setattr(self, attr, value)
|
||||
|
||||
return chan, accept_addr
|
||||
|
@ -804,21 +819,25 @@ class Actor:
|
|||
async def _async_main(
|
||||
self,
|
||||
accept_addr: Optional[Tuple[str, int]] = None,
|
||||
|
||||
# XXX: currently ``parent_addr`` is only needed for the
|
||||
# ``multiprocessing`` backend (which pickles state sent to
|
||||
# the child instead of relaying it over the connect-back
|
||||
# channel). Once that backend is removed we can likely just
|
||||
# change this so a simple ``is_subactor: bool`` which will
|
||||
# change this to a simple ``is_subactor: bool`` which will
|
||||
# be False when running as root actor and True when as
|
||||
# a subactor.
|
||||
parent_addr: Optional[Tuple[str, int]] = None,
|
||||
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
|
||||
|
||||
) -> None:
|
||||
"""Start the channel server, maybe connect back to the parent, and
|
||||
"""
|
||||
Start the channel server, maybe connect back to the parent, and
|
||||
start the main task.
|
||||
|
||||
A "root-most" (or "top-level") nursery for this actor is opened here
|
||||
and when cancelled effectively cancels the actor.
|
||||
|
||||
"""
|
||||
registered_with_arbiter = False
|
||||
try:
|
||||
|
@ -1162,6 +1181,7 @@ class Actor:
|
|||
async def _do_handshake(
|
||||
self,
|
||||
chan: Channel
|
||||
|
||||
) -> Tuple[str, str]:
|
||||
"""Exchange (name, UUIDs) identifiers as the first communication step.
|
||||
|
||||
|
@ -1169,12 +1189,13 @@ class Actor:
|
|||
parlance.
|
||||
"""
|
||||
await chan.send(self.uid)
|
||||
uid: Tuple[str, str] = await chan.recv()
|
||||
value = await chan.recv()
|
||||
uid: Tuple[str, str] = (str(value[0]), str(value[1]))
|
||||
|
||||
if not isinstance(uid, tuple):
|
||||
raise ValueError(f"{uid} is not a valid uid?!")
|
||||
|
||||
chan.uid = uid
|
||||
chan.uid = str(uid[0]), str(uid[1])
|
||||
log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete")
|
||||
return uid
|
||||
|
||||
|
@ -1191,8 +1212,13 @@ class Arbiter(Actor):
|
|||
is_arbiter = True
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self._registry = defaultdict(list)
|
||||
|
||||
self._registry: Dict[
|
||||
Tuple[str, str],
|
||||
Tuple[str, int],
|
||||
] = {}
|
||||
self._waiters = {}
|
||||
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
async def find_actor(self, name: str) -> Optional[Tuple[str, int]]:
|
||||
|
@ -1204,9 +1230,11 @@ class Arbiter(Actor):
|
|||
|
||||
async def get_registry(
|
||||
self
|
||||
) -> Dict[str, Tuple[str, str]]:
|
||||
"""Return current name registry.
|
||||
"""
|
||||
) -> Dict[Tuple[str, str], Tuple[str, int]]:
|
||||
'''Return current name registry.
|
||||
|
||||
This method is async to allow for cross-actor invocation.
|
||||
'''
|
||||
# NOTE: requires ``strict_map_key=False`` to the msgpack
|
||||
# unpacker since we have tuples as keys (not this makes the
|
||||
# arbiter suscetible to hashdos):
|
||||
|
@ -1214,13 +1242,14 @@ class Arbiter(Actor):
|
|||
return self._registry
|
||||
|
||||
async def wait_for_actor(
|
||||
self, name: str
|
||||
self,
|
||||
name: str,
|
||||
) -> List[Tuple[str, int]]:
|
||||
"""Wait for a particular actor to register.
|
||||
'''Wait for a particular actor to register.
|
||||
|
||||
This is a blocking call if no actor by the provided name is currently
|
||||
registered.
|
||||
"""
|
||||
'''
|
||||
sockaddrs = []
|
||||
|
||||
for (aname, _), sockaddr in self._registry.items():
|
||||
|
@ -1237,10 +1266,13 @@ class Arbiter(Actor):
|
|||
return sockaddrs
|
||||
|
||||
async def register_actor(
|
||||
self, uid: Tuple[str, str], sockaddr: Tuple[str, int]
|
||||
self,
|
||||
uid: Tuple[str, str],
|
||||
sockaddr: Tuple[str, int]
|
||||
|
||||
) -> None:
|
||||
name, uuid = uid
|
||||
self._registry[uid] = sockaddr
|
||||
uid = name, uuid = (str(uid[0]), str(uid[1]))
|
||||
self._registry[uid] = (str(sockaddr[0]), int(sockaddr[1]))
|
||||
|
||||
# pop and signal all waiter events
|
||||
events = self._waiters.pop(name, ())
|
||||
|
@ -1249,5 +1281,9 @@ class Arbiter(Actor):
|
|||
if isinstance(event, trio.Event):
|
||||
event.set()
|
||||
|
||||
async def unregister_actor(self, uid: Tuple[str, str]) -> None:
|
||||
async def unregister_actor(
|
||||
self,
|
||||
uid: Tuple[str, str]
|
||||
) -> None:
|
||||
uid = (str(uid[0]), str(uid[1]))
|
||||
self._registry.pop(uid)
|
||||
|
|
351
tractor/_ipc.py
351
tractor/_ipc.py
|
@ -2,11 +2,16 @@
|
|||
Inter-process comms abstractions
|
||||
|
||||
"""
|
||||
from __future__ import annotations
|
||||
import platform
|
||||
import struct
|
||||
import typing
|
||||
from typing import Any, Tuple, Optional
|
||||
from functools import partial
|
||||
from typing import (
|
||||
Any, Tuple, Optional,
|
||||
Type, Protocol, TypeVar
|
||||
)
|
||||
|
||||
from tricycle import BufferedReceiveStream
|
||||
import msgpack
|
||||
import trio
|
||||
from async_generator import asynccontextmanager
|
||||
|
@ -17,14 +22,54 @@ log = get_logger(__name__)
|
|||
|
||||
|
||||
_is_windows = platform.system() == 'Windows'
|
||||
log = get_logger(__name__)
|
||||
|
||||
# :eyeroll:
|
||||
try:
|
||||
import msgpack_numpy
|
||||
Unpacker = msgpack_numpy.Unpacker
|
||||
except ImportError:
|
||||
# just plain ``msgpack`` requires tweaking key settings
|
||||
Unpacker = partial(msgpack.Unpacker, strict_map_key=False)
|
||||
|
||||
def get_stream_addrs(stream: trio.SocketStream) -> Tuple:
|
||||
# should both be IP sockets
|
||||
lsockname = stream.socket.getsockname()
|
||||
rsockname = stream.socket.getpeername()
|
||||
return (
|
||||
tuple(lsockname[:2]),
|
||||
tuple(rsockname[:2]),
|
||||
)
|
||||
|
||||
|
||||
MsgType = TypeVar("MsgType")
|
||||
|
||||
# TODO: consider using a generic def and indexing with our eventual
|
||||
# msg definition/types?
|
||||
# - https://docs.python.org/3/library/typing.html#typing.Protocol
|
||||
# - https://jcristharif.com/msgspec/usage.html#structs
|
||||
|
||||
|
||||
class MsgTransport(Protocol[MsgType]):
|
||||
|
||||
stream: trio.SocketStream
|
||||
|
||||
def __init__(self, stream: trio.SocketStream) -> None:
|
||||
...
|
||||
|
||||
# XXX: should this instead be called `.sendall()`?
|
||||
async def send(self, msg: MsgType) -> None:
|
||||
...
|
||||
|
||||
async def recv(self) -> MsgType:
|
||||
...
|
||||
|
||||
def __aiter__(self) -> MsgType:
|
||||
...
|
||||
|
||||
def connected(self) -> bool:
|
||||
...
|
||||
|
||||
@property
|
||||
def laddr(self) -> Tuple[str, int]:
|
||||
...
|
||||
|
||||
@property
|
||||
def raddr(self) -> Tuple[str, int]:
|
||||
...
|
||||
|
||||
|
||||
class MsgpackTCPStream:
|
||||
|
@ -40,26 +85,21 @@ class MsgpackTCPStream:
|
|||
|
||||
self.stream = stream
|
||||
assert self.stream.socket
|
||||
|
||||
# should both be IP sockets
|
||||
lsockname = stream.socket.getsockname()
|
||||
assert isinstance(lsockname, tuple)
|
||||
self._laddr = lsockname[:2]
|
||||
rsockname = stream.socket.getpeername()
|
||||
assert isinstance(rsockname, tuple)
|
||||
self._raddr = rsockname[:2]
|
||||
self._laddr, self._raddr = get_stream_addrs(stream)
|
||||
|
||||
# start and seed first entry to read loop
|
||||
# create read loop instance
|
||||
self._agen = self._iter_packets()
|
||||
# self._agen.asend(None) is None
|
||||
|
||||
self._send_lock = trio.StrictFIFOLock()
|
||||
|
||||
async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
|
||||
"""Yield packets from the underlying stream.
|
||||
"""
|
||||
unpacker = Unpacker(
|
||||
unpacker = msgpack.Unpacker(
|
||||
raw=False,
|
||||
use_list=False,
|
||||
strict_map_key=False
|
||||
)
|
||||
while True:
|
||||
try:
|
||||
|
@ -107,11 +147,11 @@ class MsgpackTCPStream:
|
|||
def raddr(self) -> Tuple[Any, ...]:
|
||||
return self._raddr
|
||||
|
||||
# XXX: should this instead be called `.sendall()`?
|
||||
async def send(self, data: Any) -> None:
|
||||
async def send(self, msg: Any) -> None:
|
||||
async with self._send_lock:
|
||||
return await self.stream.send_all(
|
||||
msgpack.dumps(data, use_bin_type=True))
|
||||
msgpack.dumps(msg, use_bin_type=True)
|
||||
)
|
||||
|
||||
async def recv(self) -> Any:
|
||||
return await self._agen.asend(None)
|
||||
|
@ -123,36 +163,173 @@ class MsgpackTCPStream:
|
|||
return self.stream.socket.fileno() != -1
|
||||
|
||||
|
||||
class Channel:
|
||||
"""An inter-process channel for communication between (remote) actors.
|
||||
class MsgspecTCPStream(MsgpackTCPStream):
|
||||
'''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
|
||||
using ``msgspec``.
|
||||
|
||||
Currently the only supported transport is a ``trio.SocketStream``.
|
||||
"""
|
||||
'''
|
||||
def __init__(
|
||||
self,
|
||||
destaddr: Optional[Tuple[str, int]] = None,
|
||||
on_reconnect: typing.Callable[..., typing.Awaitable] = None,
|
||||
auto_reconnect: bool = False,
|
||||
stream: trio.SocketStream = None, # expected to be active
|
||||
stream: trio.SocketStream,
|
||||
prefix_size: int = 4,
|
||||
|
||||
) -> None:
|
||||
self._recon_seq = on_reconnect
|
||||
self._autorecon = auto_reconnect
|
||||
self.msgstream: Optional[MsgpackTCPStream] = MsgpackTCPStream(
|
||||
stream) if stream else None
|
||||
if self.msgstream and destaddr:
|
||||
raise ValueError(
|
||||
f"A stream was provided with local addr {self.laddr}"
|
||||
import msgspec
|
||||
|
||||
super().__init__(stream)
|
||||
self.recv_stream = BufferedReceiveStream(transport_stream=stream)
|
||||
self.prefix_size = prefix_size
|
||||
|
||||
# TODO: struct aware messaging coders
|
||||
self.encode = msgspec.Encoder().encode
|
||||
self.decode = msgspec.Decoder().decode # dict[str, Any])
|
||||
|
||||
async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
|
||||
'''Yield packets from the underlying stream.
|
||||
|
||||
'''
|
||||
import msgspec # noqa
|
||||
last_decode_failed: bool = False
|
||||
|
||||
while True:
|
||||
try:
|
||||
header = await self.recv_stream.receive_exactly(4)
|
||||
|
||||
except (
|
||||
ValueError,
|
||||
|
||||
# not sure entirely why we need this but without it we
|
||||
# seem to be getting racy failures here on
|
||||
# arbiter/registry name subs..
|
||||
trio.BrokenResourceError,
|
||||
):
|
||||
raise TransportClosed(
|
||||
f'transport {self} was already closed prior ro read'
|
||||
)
|
||||
self._destaddr = self.msgstream.raddr if self.msgstream else destaddr
|
||||
|
||||
if header == b'':
|
||||
raise TransportClosed(
|
||||
f'transport {self} was already closed prior ro read'
|
||||
)
|
||||
|
||||
size, = struct.unpack("<I", header)
|
||||
|
||||
log.transport(f'received header {size}') # type: ignore
|
||||
|
||||
msg_bytes = await self.recv_stream.receive_exactly(size)
|
||||
|
||||
log.transport(f"received {msg_bytes}") # type: ignore
|
||||
try:
|
||||
yield self.decode(msg_bytes)
|
||||
except (
|
||||
msgspec.DecodingError,
|
||||
UnicodeDecodeError,
|
||||
):
|
||||
if not last_decode_failed:
|
||||
# ignore decoding errors for now and assume they have to
|
||||
# do with a channel drop - hope that receiving from the
|
||||
# channel will raise an expected error and bubble up.
|
||||
log.error('`msgspec` failed to decode!?')
|
||||
last_decode_failed = True
|
||||
else:
|
||||
raise
|
||||
|
||||
async def send(self, msg: Any) -> None:
|
||||
async with self._send_lock:
|
||||
|
||||
bytes_data: bytes = self.encode(msg)
|
||||
|
||||
# supposedly the fastest says,
|
||||
# https://stackoverflow.com/a/54027962
|
||||
size: bytes = struct.pack("<I", len(bytes_data))
|
||||
|
||||
return await self.stream.send_all(size + bytes_data)
|
||||
|
||||
|
||||
def get_msg_transport(
|
||||
|
||||
key: Tuple[str, str],
|
||||
|
||||
) -> Type[MsgTransport]:
|
||||
|
||||
return {
|
||||
('msgpack', 'tcp'): MsgpackTCPStream,
|
||||
('msgspec', 'tcp'): MsgspecTCPStream,
|
||||
}[key]
|
||||
|
||||
|
||||
class Channel:
|
||||
'''An inter-process channel for communication between (remote) actors.
|
||||
|
||||
Currently the only supported transport is a ``trio.SocketStream``.
|
||||
|
||||
'''
|
||||
def __init__(
|
||||
|
||||
self,
|
||||
destaddr: Optional[Tuple[str, int]],
|
||||
|
||||
msg_transport_type_key: Tuple[str, str] = ('msgpack', 'tcp'),
|
||||
|
||||
# TODO: optional reconnection support?
|
||||
# auto_reconnect: bool = False,
|
||||
# on_reconnect: typing.Callable[..., typing.Awaitable] = None,
|
||||
|
||||
) -> None:
|
||||
|
||||
# self._recon_seq = on_reconnect
|
||||
# self._autorecon = auto_reconnect
|
||||
|
||||
# TODO: maybe expose this through the nursery api?
|
||||
try:
|
||||
# if installed load the msgspec transport since it's faster
|
||||
import msgspec # noqa
|
||||
msg_transport_type_key = ('msgspec', 'tcp')
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
self._destaddr = destaddr
|
||||
self._transport_key = msg_transport_type_key
|
||||
|
||||
# Either created in ``.connect()`` or passed in by
|
||||
# user in ``.from_stream()``.
|
||||
self._stream: Optional[trio.SocketStream] = None
|
||||
self.msgstream: Optional[MsgTransport] = None
|
||||
|
||||
# set after handshake - always uid of far end
|
||||
self.uid: Optional[Tuple[str, str]] = None
|
||||
|
||||
# set if far end actor errors internally
|
||||
self._exc: Optional[Exception] = None
|
||||
self._agen = self._aiter_recv()
|
||||
|
||||
self._closed: bool = False
|
||||
|
||||
@classmethod
|
||||
def from_stream(
|
||||
cls,
|
||||
stream: trio.SocketStream,
|
||||
**kwargs,
|
||||
|
||||
) -> Channel:
|
||||
|
||||
src, dst = get_stream_addrs(stream)
|
||||
chan = Channel(destaddr=dst, **kwargs)
|
||||
|
||||
# set immediately here from provided instance
|
||||
chan._stream = stream
|
||||
chan.set_msg_transport(stream)
|
||||
return chan
|
||||
|
||||
def set_msg_transport(
|
||||
self,
|
||||
stream: trio.SocketStream,
|
||||
type_key: Optional[Tuple[str, str]] = None,
|
||||
|
||||
) -> MsgTransport:
|
||||
type_key = type_key or self._transport_key
|
||||
self.msgstream = get_msg_transport(type_key)(stream)
|
||||
return self.msgstream
|
||||
|
||||
def __repr__(self) -> str:
|
||||
if self.msgstream:
|
||||
return repr(
|
||||
|
@ -161,20 +338,19 @@ class Channel:
|
|||
return object.__repr__(self)
|
||||
|
||||
@property
|
||||
def laddr(self) -> Optional[Tuple[Any, ...]]:
|
||||
def laddr(self) -> Optional[Tuple[str, int]]:
|
||||
return self.msgstream.laddr if self.msgstream else None
|
||||
|
||||
@property
|
||||
def raddr(self) -> Optional[Tuple[Any, ...]]:
|
||||
def raddr(self) -> Optional[Tuple[str, int]]:
|
||||
return self.msgstream.raddr if self.msgstream else None
|
||||
|
||||
async def connect(
|
||||
|
||||
self,
|
||||
destaddr: Tuple[Any, ...] = None,
|
||||
**kwargs
|
||||
|
||||
) -> trio.SocketStream:
|
||||
) -> MsgTransport:
|
||||
|
||||
if self.connected():
|
||||
raise RuntimeError("channel is already connected?")
|
||||
|
@ -186,12 +362,12 @@ class Channel:
|
|||
*destaddr,
|
||||
**kwargs
|
||||
)
|
||||
self.msgstream = MsgpackTCPStream(stream)
|
||||
msgstream = self.set_msg_transport(stream)
|
||||
|
||||
log.transport(
|
||||
f'Opened channel to peer {self.laddr} -> {self.raddr}'
|
||||
f'Opened channel[{type(msgstream)}]: {self.laddr} -> {self.raddr}'
|
||||
)
|
||||
return stream
|
||||
return msgstream
|
||||
|
||||
async def send(self, item: Any) -> None:
|
||||
|
||||
|
@ -202,16 +378,15 @@ class Channel:
|
|||
|
||||
async def recv(self) -> Any:
|
||||
assert self.msgstream
|
||||
|
||||
try:
|
||||
return await self.msgstream.recv()
|
||||
|
||||
except trio.BrokenResourceError:
|
||||
if self._autorecon:
|
||||
await self._reconnect()
|
||||
return await self.recv()
|
||||
|
||||
raise
|
||||
# try:
|
||||
# return await self.msgstream.recv()
|
||||
# except trio.BrokenResourceError:
|
||||
# if self._autorecon:
|
||||
# await self._reconnect()
|
||||
# return await self.recv()
|
||||
# raise
|
||||
|
||||
async def aclose(self) -> None:
|
||||
|
||||
|
@ -233,34 +408,36 @@ class Channel:
|
|||
def __aiter__(self):
|
||||
return self._agen
|
||||
|
||||
async def _reconnect(self) -> None:
|
||||
"""Handle connection failures by polling until a reconnect can be
|
||||
established.
|
||||
"""
|
||||
down = False
|
||||
while True:
|
||||
try:
|
||||
with trio.move_on_after(3) as cancel_scope:
|
||||
await self.connect()
|
||||
cancelled = cancel_scope.cancelled_caught
|
||||
if cancelled:
|
||||
log.transport(
|
||||
"Reconnect timed out after 3 seconds, retrying...")
|
||||
continue
|
||||
else:
|
||||
log.transport("Stream connection re-established!")
|
||||
# run any reconnection sequence
|
||||
on_recon = self._recon_seq
|
||||
if on_recon:
|
||||
await on_recon(self)
|
||||
break
|
||||
except (OSError, ConnectionRefusedError):
|
||||
if not down:
|
||||
down = True
|
||||
log.transport(
|
||||
f"Connection to {self.raddr} went down, waiting"
|
||||
" for re-establishment")
|
||||
await trio.sleep(1)
|
||||
# async def _reconnect(self) -> None:
|
||||
# """Handle connection failures by polling until a reconnect can be
|
||||
# established.
|
||||
# """
|
||||
# down = False
|
||||
# while True:
|
||||
# try:
|
||||
# with trio.move_on_after(3) as cancel_scope:
|
||||
# await self.connect()
|
||||
# cancelled = cancel_scope.cancelled_caught
|
||||
# if cancelled:
|
||||
# log.transport(
|
||||
# "Reconnect timed out after 3 seconds, retrying...")
|
||||
# continue
|
||||
# else:
|
||||
# log.transport("Stream connection re-established!")
|
||||
|
||||
# # TODO: run any reconnection sequence
|
||||
# # on_recon = self._recon_seq
|
||||
# # if on_recon:
|
||||
# # await on_recon(self)
|
||||
|
||||
# break
|
||||
# except (OSError, ConnectionRefusedError):
|
||||
# if not down:
|
||||
# down = True
|
||||
# log.transport(
|
||||
# f"Connection to {self.raddr} went down, waiting"
|
||||
# " for re-establishment")
|
||||
# await trio.sleep(1)
|
||||
|
||||
async def _aiter_recv(
|
||||
self
|
||||
|
@ -279,16 +456,14 @@ class Channel:
|
|||
# await self.msgstream.send(sent)
|
||||
except trio.BrokenResourceError:
|
||||
|
||||
if not self._autorecon:
|
||||
# if not self._autorecon:
|
||||
raise
|
||||
|
||||
await self.aclose()
|
||||
|
||||
if self._autorecon: # attempt reconnect
|
||||
await self._reconnect()
|
||||
continue
|
||||
else:
|
||||
return
|
||||
# if self._autorecon: # attempt reconnect
|
||||
# await self._reconnect()
|
||||
# continue
|
||||
|
||||
def connected(self) -> bool:
|
||||
return self.msgstream.connected() if self.msgstream else False
|
||||
|
|
|
@ -21,8 +21,8 @@ from ._exceptions import is_multi_cancelled
|
|||
|
||||
|
||||
# set at startup and after forks
|
||||
_default_arbiter_host = '127.0.0.1'
|
||||
_default_arbiter_port = 1616
|
||||
_default_arbiter_host: str = '127.0.0.1'
|
||||
_default_arbiter_port: int = 1616
|
||||
|
||||
|
||||
logger = log.get_logger('tractor')
|
||||
|
@ -32,7 +32,7 @@ logger = log.get_logger('tractor')
|
|||
async def open_root_actor(
|
||||
|
||||
# defaults are above
|
||||
arbiter_addr: Tuple[str, int] = (
|
||||
arbiter_addr: Optional[Tuple[str, int]] = (
|
||||
_default_arbiter_host,
|
||||
_default_arbiter_port,
|
||||
),
|
||||
|
@ -97,7 +97,7 @@ async def open_root_actor(
|
|||
|
||||
arbiter_addr = (host, port) = arbiter_addr or (
|
||||
_default_arbiter_host,
|
||||
_default_arbiter_port
|
||||
_default_arbiter_port,
|
||||
)
|
||||
|
||||
loglevel = loglevel or log.get_loglevel()
|
||||
|
|
Loading…
Reference in New Issue