forked from goodboy/tractor
Compare commits
11 Commits
master
...
msgspec_no
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 1e49066b16 | |
Tyler Goodlet | b1de90b175 | |
Tyler Goodlet | 2bd6bbc1b7 | |
Tyler Goodlet | 700f09ce9b | |
Tyler Goodlet | d8dcee3713 | |
Tyler Goodlet | 6463aa1559 | |
Tyler Goodlet | 39453e43e0 | |
Tyler Goodlet | d89e632a16 | |
Tyler Goodlet | b38b4fe188 | |
Tyler Goodlet | e6aecf2ae5 | |
Tyler Goodlet | b44652c5d5 |
18
setup.py
18
setup.py
|
@ -38,14 +38,28 @@ setup(
|
|||
'tractor.testing',
|
||||
],
|
||||
install_requires=[
|
||||
|
||||
# trio related
|
||||
'trio>0.8',
|
||||
'msgpack',
|
||||
'async_generator',
|
||||
'tricycle',
|
||||
'trio_typing',
|
||||
|
||||
# tooling
|
||||
'colorlog',
|
||||
'wrapt',
|
||||
'trio_typing',
|
||||
'pdbpp',
|
||||
|
||||
# serialization
|
||||
'msgpack',
|
||||
|
||||
],
|
||||
extras_require={
|
||||
|
||||
# serialization
|
||||
'msgspec': ["msgspec; python_version >= '3.9'"],
|
||||
|
||||
},
|
||||
tests_require=['pytest'],
|
||||
python_requires=">=3.7",
|
||||
keywords=[
|
||||
|
|
|
@ -123,8 +123,15 @@ def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
|
|||
|
||||
assert exc_info.type == tractor.MultiError
|
||||
err = exc_info.value
|
||||
assert len(err.exceptions) == num_subactors
|
||||
for exc in err.exceptions:
|
||||
exceptions = err.exceptions
|
||||
|
||||
if len(exceptions) == 2:
|
||||
# sometimes oddly now there's an embedded BrokenResourceError ?
|
||||
exceptions = exceptions[1].exceptions
|
||||
|
||||
assert len(exceptions) == num_subactors
|
||||
|
||||
for exc in exceptions:
|
||||
assert isinstance(exc, tractor.RemoteActorError)
|
||||
assert exc.type == AssertionError
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@ from ._exceptions import (
|
|||
unpack_error,
|
||||
ModuleNotExposed,
|
||||
is_multi_cancelled,
|
||||
TransportClosed,
|
||||
)
|
||||
from . import _debug
|
||||
from ._discovery import get_arbiter
|
||||
|
@ -202,7 +203,7 @@ class Actor:
|
|||
enable_modules: List[str] = [],
|
||||
uid: str = None,
|
||||
loglevel: str = None,
|
||||
arbiter_addr: Optional[Tuple[str, int]] = None,
|
||||
arbiter_addr: Optional[Tuple[str, int]] = (None, None),
|
||||
spawn_method: Optional[str] = None
|
||||
) -> None:
|
||||
"""This constructor is called in the parent actor **before** the spawning
|
||||
|
@ -232,7 +233,7 @@ class Actor:
|
|||
# TODO: consider making this a dynamically defined
|
||||
# @dataclass once we get py3.7
|
||||
self.loglevel = loglevel
|
||||
self._arb_addr = arbiter_addr
|
||||
self._arb_addr = tuple(arbiter_addr)
|
||||
|
||||
# marked by the process spawning backend at startup
|
||||
# will be None for the parent most process started manually
|
||||
|
@ -338,7 +339,18 @@ class Actor:
|
|||
# send/receive initial handshake response
|
||||
try:
|
||||
uid = await self._do_handshake(chan)
|
||||
except StopAsyncIteration:
|
||||
|
||||
except (
|
||||
trio.BrokenResourceError,
|
||||
trio.ClosedResourceError,
|
||||
TransportClosed,
|
||||
):
|
||||
# XXX: This may propagate up from ``Channel._aiter_recv()``
|
||||
# and ``MsgpackStream._inter_packets()`` on a read from the
|
||||
# stream particularly when the runtime is first starting up
|
||||
# inside ``open_root_actor()`` where there is a check for
|
||||
# a bound listener on the "arbiter" addr. the reset will be
|
||||
# because the handshake was never meant took place.
|
||||
log.warning(f"Channel {chan} failed to handshake")
|
||||
return
|
||||
|
||||
|
@ -476,6 +488,7 @@ class Actor:
|
|||
# ``scope = Nursery.start()``
|
||||
task_status.started(loop_cs)
|
||||
async for msg in chan:
|
||||
|
||||
if msg is None: # loop terminate sentinel
|
||||
log.debug(
|
||||
f"Cancelling all tasks for {chan} from {chan.uid}")
|
||||
|
@ -578,22 +591,39 @@ class Actor:
|
|||
)
|
||||
await self.cancel_rpc_tasks(chan)
|
||||
|
||||
except (
|
||||
TransportClosed,
|
||||
trio.BrokenResourceError,
|
||||
trio.ClosedResourceError
|
||||
):
|
||||
# channels "breaking" is ok since we don't have a teardown
|
||||
# handshake for them (yet) and instead we simply bail out
|
||||
# of the message loop and expect the teardown sequence
|
||||
# to clean up.
|
||||
log.error(f"{chan} form {chan.uid} closed abruptly")
|
||||
# raise
|
||||
|
||||
except trio.ClosedResourceError:
|
||||
log.error(f"{chan} form {chan.uid} broke")
|
||||
|
||||
except (Exception, trio.MultiError) as err:
|
||||
# ship any "internal" exception (i.e. one from internal machinery
|
||||
# not from an rpc task) to parent
|
||||
log.exception("Actor errored:")
|
||||
if self._parent_chan:
|
||||
await self._parent_chan.send(pack_error(err))
|
||||
raise
|
||||
|
||||
# if this is the `MainProcess` we expect the error broadcasting
|
||||
# above to trigger an error at consuming portal "checkpoints"
|
||||
raise
|
||||
|
||||
except trio.Cancelled:
|
||||
# debugging only
|
||||
log.debug(f"Msg loop was cancelled for {chan}")
|
||||
raise
|
||||
|
||||
finally:
|
||||
# msg debugging for when he machinery is brokey
|
||||
log.debug(
|
||||
f"Exiting msg loop for {chan} from {chan.uid} "
|
||||
f"with last msg:\n{msg}")
|
||||
|
@ -633,6 +663,14 @@ class Actor:
|
|||
_state._runtime_vars.update(rvs)
|
||||
|
||||
for attr, value in parent_data.items():
|
||||
|
||||
if attr == '_arb_addr':
|
||||
# XXX: msgspec doesn't support serializing tuples
|
||||
# so just cash manually here since it's what our
|
||||
# internals expect.
|
||||
self._arb_addr = tuple(value)
|
||||
|
||||
else:
|
||||
setattr(self, attr, value)
|
||||
|
||||
return chan, accept_addr
|
||||
|
@ -1012,10 +1050,10 @@ class Actor:
|
|||
parlance.
|
||||
"""
|
||||
await chan.send(self.uid)
|
||||
uid: Tuple[str, str] = await chan.recv()
|
||||
uid: Tuple[str, str] = tuple(await chan.recv())
|
||||
|
||||
if not isinstance(uid, tuple):
|
||||
raise ValueError(f"{uid} is not a valid uid?!")
|
||||
# if not isinstance(uid, tuple):
|
||||
# raise ValueError(f"{uid} is not a valid uid?!")
|
||||
|
||||
chan.uid = uid
|
||||
log.info(f"Handshake with actor {uid}@{chan.raddr} complete")
|
||||
|
@ -1082,8 +1120,9 @@ class Arbiter(Actor):
|
|||
async def register_actor(
|
||||
self, uid: Tuple[str, str], sockaddr: Tuple[str, int]
|
||||
) -> None:
|
||||
uid = tuple(uid)
|
||||
name, uuid = uid
|
||||
self._registry[uid] = sockaddr
|
||||
self._registry[uid] = tuple(sockaddr)
|
||||
|
||||
# pop and signal all waiter events
|
||||
events = self._waiters.pop(name, ())
|
||||
|
@ -1093,4 +1132,4 @@ class Arbiter(Actor):
|
|||
event.set()
|
||||
|
||||
async def unregister_actor(self, uid: Tuple[str, str]) -> None:
|
||||
self._registry.pop(uid)
|
||||
self._registry.pop(tuple(uid))
|
||||
|
|
|
@ -38,6 +38,10 @@ class InternalActorError(RemoteActorError):
|
|||
"""
|
||||
|
||||
|
||||
class TransportClosed(trio.ClosedResourceError):
|
||||
"Underlying channel transport was closed prior to use"
|
||||
|
||||
|
||||
class NoResult(RuntimeError):
|
||||
"No final result is expected for this actor"
|
||||
|
||||
|
@ -63,12 +67,15 @@ def pack_error(exc: BaseException) -> Dict[str, Any]:
|
|||
|
||||
|
||||
def unpack_error(
|
||||
|
||||
msg: Dict[str, Any],
|
||||
chan=None,
|
||||
err_type=RemoteActorError
|
||||
|
||||
) -> Exception:
|
||||
"""Unpack an 'error' message from the wire
|
||||
into a local ``RemoteActorError``.
|
||||
|
||||
"""
|
||||
tb_str = msg['error'].get('tb_str', '')
|
||||
return err_type(
|
||||
|
|
139
tractor/_ipc.py
139
tractor/_ipc.py
|
@ -1,16 +1,19 @@
|
|||
"""
|
||||
Inter-process comms abstractions
|
||||
"""
|
||||
from functools import partial
|
||||
import struct
|
||||
import typing
|
||||
from typing import Any, Tuple, Optional
|
||||
from functools import partial
|
||||
|
||||
from tricycle import BufferedReceiveStream
|
||||
import msgpack
|
||||
import trio
|
||||
from async_generator import asynccontextmanager
|
||||
|
||||
from .log import get_logger
|
||||
log = get_logger('ipc')
|
||||
from ._exceptions import TransportClosed
|
||||
log = get_logger(__name__)
|
||||
|
||||
# :eyeroll:
|
||||
try:
|
||||
|
@ -21,21 +24,32 @@ except ImportError:
|
|||
Unpacker = partial(msgpack.Unpacker, strict_map_key=False)
|
||||
|
||||
|
||||
class MsgpackStream:
|
||||
"""A ``trio.SocketStream`` delivering ``msgpack`` formatted data.
|
||||
"""
|
||||
def __init__(self, stream: trio.SocketStream) -> None:
|
||||
class MsgpackTCPStream:
|
||||
'''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
|
||||
using ``msgpack-python``.
|
||||
|
||||
'''
|
||||
def __init__(
|
||||
self,
|
||||
stream: trio.SocketStream,
|
||||
|
||||
) -> None:
|
||||
|
||||
self.stream = stream
|
||||
assert self.stream.socket
|
||||
|
||||
# should both be IP sockets
|
||||
lsockname = stream.socket.getsockname()
|
||||
assert isinstance(lsockname, tuple)
|
||||
self._laddr = lsockname[:2]
|
||||
|
||||
rsockname = stream.socket.getpeername()
|
||||
assert isinstance(rsockname, tuple)
|
||||
self._raddr = rsockname[:2]
|
||||
|
||||
# start first entry to read loop
|
||||
self._agen = self._iter_packets()
|
||||
|
||||
self._send_lock = trio.StrictFIFOLock()
|
||||
|
||||
async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
|
||||
|
@ -46,16 +60,13 @@ class MsgpackStream:
|
|||
use_list=False,
|
||||
)
|
||||
while True:
|
||||
try:
|
||||
data = await self.stream.receive_some(2**10)
|
||||
log.trace(f"received {data}") # type: ignore
|
||||
except trio.BrokenResourceError:
|
||||
log.warning(f"Stream connection {self.raddr} broke")
|
||||
return
|
||||
|
||||
if data == b'':
|
||||
log.debug(f"Stream connection {self.raddr} was closed")
|
||||
return
|
||||
raise TransportClosed(
|
||||
f'transport {self} was already closed prior ro read'
|
||||
)
|
||||
|
||||
unpacker.feed(data)
|
||||
for packet in unpacker:
|
||||
|
@ -73,7 +84,8 @@ class MsgpackStream:
|
|||
async def send(self, data: Any) -> None:
|
||||
async with self._send_lock:
|
||||
return await self.stream.send_all(
|
||||
msgpack.dumps(data, use_bin_type=True))
|
||||
msgpack.dumps(data, use_bin_type=True)
|
||||
)
|
||||
|
||||
async def recv(self) -> Any:
|
||||
return await self._agen.asend(None)
|
||||
|
@ -85,26 +97,101 @@ class MsgpackStream:
|
|||
return self.stream.socket.fileno() != -1
|
||||
|
||||
|
||||
class MsgspecTCPStream(MsgpackTCPStream):
|
||||
'''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
|
||||
using ``msgspec``.
|
||||
|
||||
'''
|
||||
def __init__(
|
||||
self,
|
||||
stream: trio.SocketStream,
|
||||
prefix_size: int = 4,
|
||||
|
||||
) -> None:
|
||||
super().__init__(stream)
|
||||
self.recv_stream = BufferedReceiveStream(transport_stream=stream)
|
||||
self.prefix_size = prefix_size
|
||||
|
||||
import msgspec
|
||||
|
||||
# TODO: struct aware messaging coders
|
||||
self.encode = msgspec.Encoder().encode
|
||||
self.decode = msgspec.Decoder().decode # dict[str, Any])
|
||||
|
||||
async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
|
||||
"""Yield packets from the underlying stream.
|
||||
"""
|
||||
|
||||
while True:
|
||||
try:
|
||||
header = await self.recv_stream.receive_exactly(4)
|
||||
|
||||
except (ValueError):
|
||||
raise TransportClosed(
|
||||
f'transport {self} was already closed prior ro read'
|
||||
)
|
||||
|
||||
if header == b'':
|
||||
raise TransportClosed(
|
||||
f'transport {self} was already closed prior ro read'
|
||||
)
|
||||
|
||||
size, = struct.unpack("<I", header)
|
||||
|
||||
log.trace(f'received header {size}')
|
||||
|
||||
msg_bytes = await self.recv_stream.receive_exactly(size)
|
||||
|
||||
log.trace(f"received {msg_bytes}") # type: ignore
|
||||
yield self.decode(msg_bytes)
|
||||
|
||||
async def send(self, data: Any) -> None:
|
||||
async with self._send_lock:
|
||||
|
||||
bytes_data = self.encode(data)
|
||||
|
||||
# supposedly the fastest says,
|
||||
# https://stackoverflow.com/a/54027962
|
||||
size: int = struct.pack("<I", len(bytes_data))
|
||||
|
||||
return await self.stream.send_all(size + bytes_data)
|
||||
|
||||
|
||||
class Channel:
|
||||
"""An inter-process channel for communication between (remote) actors.
|
||||
|
||||
Currently the only supported transport is a ``trio.SocketStream``.
|
||||
"""
|
||||
def __init__(
|
||||
|
||||
self,
|
||||
destaddr: Optional[Tuple[str, int]] = None,
|
||||
on_reconnect: typing.Callable[..., typing.Awaitable] = None,
|
||||
auto_reconnect: bool = False,
|
||||
stream: trio.SocketStream = None, # expected to be active
|
||||
|
||||
) -> None:
|
||||
|
||||
self._recon_seq = on_reconnect
|
||||
self._autorecon = auto_reconnect
|
||||
self.msgstream: Optional[MsgpackStream] = MsgpackStream(
|
||||
|
||||
try:
|
||||
# if installed load the msgspec transport since it's faster
|
||||
import msgspec # noqa
|
||||
stream_serializer_type: type = MsgspecTCPStream
|
||||
|
||||
except ImportError:
|
||||
stream_serializer_type: type = MsgpackTCPStream
|
||||
|
||||
self.stream_serializer_type = stream_serializer_type
|
||||
self.msgstream: Optional[type] = stream_serializer_type(
|
||||
stream) if stream else None
|
||||
|
||||
if self.msgstream and destaddr:
|
||||
raise ValueError(
|
||||
f"A stream was provided with local addr {self.laddr}"
|
||||
)
|
||||
|
||||
self._destaddr = self.msgstream.raddr if self.msgstream else destaddr
|
||||
# set after handshake - always uid of far end
|
||||
self.uid: Optional[Tuple[str, str]] = None
|
||||
|
@ -112,6 +199,8 @@ class Channel:
|
|||
self._exc: Optional[Exception] = None
|
||||
self._agen = self._aiter_recv()
|
||||
|
||||
self._closed: bool = False
|
||||
|
||||
def __repr__(self) -> str:
|
||||
if self.msgstream:
|
||||
return repr(
|
||||
|
@ -128,35 +217,51 @@ class Channel:
|
|||
return self.msgstream.raddr if self.msgstream else None
|
||||
|
||||
async def connect(
|
||||
self, destaddr: Tuple[Any, ...] = None,
|
||||
self,
|
||||
destaddr: Tuple[Any, ...] = None,
|
||||
**kwargs
|
||||
|
||||
) -> trio.SocketStream:
|
||||
|
||||
if self.connected():
|
||||
raise RuntimeError("channel is already connected?")
|
||||
|
||||
destaddr = destaddr or self._destaddr
|
||||
assert isinstance(destaddr, tuple)
|
||||
stream = await trio.open_tcp_stream(*destaddr, **kwargs)
|
||||
self.msgstream = MsgpackStream(stream)
|
||||
|
||||
stream = await trio.open_tcp_stream(
|
||||
*destaddr,
|
||||
**kwargs
|
||||
)
|
||||
self.msgstream = self.stream_serializer_type(stream)
|
||||
return stream
|
||||
|
||||
async def send(self, item: Any) -> None:
|
||||
|
||||
log.trace(f"send `{item}`") # type: ignore
|
||||
assert self.msgstream
|
||||
|
||||
await self.msgstream.send(item)
|
||||
|
||||
async def recv(self) -> Any:
|
||||
assert self.msgstream
|
||||
|
||||
try:
|
||||
return await self.msgstream.recv()
|
||||
|
||||
except trio.BrokenResourceError:
|
||||
if self._autorecon:
|
||||
await self._reconnect()
|
||||
return await self.recv()
|
||||
|
||||
raise
|
||||
|
||||
async def aclose(self) -> None:
|
||||
log.debug(f"Closing {self}")
|
||||
assert self.msgstream
|
||||
await self.msgstream.stream.aclose()
|
||||
self._closed = True
|
||||
log.error(f'CLOSING CHAN {self}')
|
||||
|
||||
async def __aenter__(self):
|
||||
await self.connect()
|
||||
|
|
Loading…
Reference in New Issue