Compare commits

...

11 Commits

Author SHA1 Message Date
Tyler Goodlet 1e49066b16 Not sure exactly, but we're getting a top level multierr now? 2021-07-01 10:10:56 -04:00
Tyler Goodlet b1de90b175 Fix py version classifier 2021-07-01 09:54:59 -04:00
Tyler Goodlet 2bd6bbc1b7 Pkg `msgpec` as optional dep, load transport type if importable 2021-07-01 09:41:23 -04:00
Tyler Goodlet 700f09ce9b Accept transport closed error during handshake and msg loop 2021-07-01 09:00:46 -04:00
Tyler Goodlet d8dcee3713 Drop happy eyeballs inf delay 2021-07-01 09:00:19 -04:00
Tyler Goodlet 6463aa1559 Add our own "transport closed" signal
This change some super old (and bad) code from the project's very early
days. For some redic reason i must have thought masking `trio`'s
internal stream / transport errors and a TCP EOF as `StopAsyncIteration`
somehow a good idea. The reality is you probably
want to know the difference between an unexpected transport error
and a simple EOF lol. This begins to resolve that by adding our own
special `TransportClosed` error to signal the "graceful" termination of
a channel's underlying transport. Oh, and this builds on the `msgspec`
integration which helped shed light on the core issues here B)
2021-07-01 09:00:14 -04:00
Tyler Goodlet 39453e43e0 Add streaming decode support for `msgspec`
Add a `tractor._ipc.MsgspecStream` type which can be swapped in for
`msgspec` serialization transparently. A small msg-length-prefix framing
is implemented as part of the type and we use
`tricycle.BufferedReceieveStream` to handle buffering logic for the
underlying transport.

Notes:
- had to force cast a few more list  -> tuple spots due to no native
  `tuple`decode-by-default in `msgspec`: https://github.com/jcrist/msgspec/issues/30
- the framing can be understood by this protobuf walkthrough:
  https://eli.thegreenplace.net/2011/08/02/length-prefix-framing-for-protocol-buffers
- `tricycle` becomes a new dependency
2021-07-01 08:57:15 -04:00
Tyler Goodlet d89e632a16 Always cast arbiter addr to tuple 2021-07-01 08:57:15 -04:00
Tyler Goodlet b38b4fe188 Add `tricycle` and `msgspec` deps 2021-07-01 08:57:15 -04:00
Tyler Goodlet e6aecf2ae5 Try out `msgspec` in our msgpack stream channel
Can only really use an encoder currently since there is no streaming api
in `msgspec` as of currently. See jcrist/msgspec#27.

Not sure if any encoding speedups are currently noticeable especially
without any validation going on yet XD.

First experiments toward #196
2021-07-01 08:57:15 -04:00
Tyler Goodlet b44652c5d5 Cast to tuples for all uids explicitly 2021-07-01 08:57:15 -04:00
5 changed files with 205 additions and 33 deletions

View File

@ -38,14 +38,28 @@ setup(
'tractor.testing', 'tractor.testing',
], ],
install_requires=[ install_requires=[
# trio related
'trio>0.8', 'trio>0.8',
'msgpack',
'async_generator', 'async_generator',
'tricycle',
'trio_typing',
# tooling
'colorlog', 'colorlog',
'wrapt', 'wrapt',
'trio_typing',
'pdbpp', 'pdbpp',
# serialization
'msgpack',
], ],
extras_require={
# serialization
'msgspec': ["msgspec; python_version >= '3.9'"],
},
tests_require=['pytest'], tests_require=['pytest'],
python_requires=">=3.7", python_requires=">=3.7",
keywords=[ keywords=[

View File

@ -123,8 +123,15 @@ def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
assert exc_info.type == tractor.MultiError assert exc_info.type == tractor.MultiError
err = exc_info.value err = exc_info.value
assert len(err.exceptions) == num_subactors exceptions = err.exceptions
for exc in err.exceptions:
if len(exceptions) == 2:
# sometimes oddly now there's an embedded BrokenResourceError ?
exceptions = exceptions[1].exceptions
assert len(exceptions) == num_subactors
for exc in exceptions:
assert isinstance(exc, tractor.RemoteActorError) assert isinstance(exc, tractor.RemoteActorError)
assert exc.type == AssertionError assert exc.type == AssertionError

View File

@ -27,6 +27,7 @@ from ._exceptions import (
unpack_error, unpack_error,
ModuleNotExposed, ModuleNotExposed,
is_multi_cancelled, is_multi_cancelled,
TransportClosed,
) )
from . import _debug from . import _debug
from ._discovery import get_arbiter from ._discovery import get_arbiter
@ -202,7 +203,7 @@ class Actor:
enable_modules: List[str] = [], enable_modules: List[str] = [],
uid: str = None, uid: str = None,
loglevel: str = None, loglevel: str = None,
arbiter_addr: Optional[Tuple[str, int]] = None, arbiter_addr: Optional[Tuple[str, int]] = (None, None),
spawn_method: Optional[str] = None spawn_method: Optional[str] = None
) -> None: ) -> None:
"""This constructor is called in the parent actor **before** the spawning """This constructor is called in the parent actor **before** the spawning
@ -232,7 +233,7 @@ class Actor:
# TODO: consider making this a dynamically defined # TODO: consider making this a dynamically defined
# @dataclass once we get py3.7 # @dataclass once we get py3.7
self.loglevel = loglevel self.loglevel = loglevel
self._arb_addr = arbiter_addr self._arb_addr = tuple(arbiter_addr)
# marked by the process spawning backend at startup # marked by the process spawning backend at startup
# will be None for the parent most process started manually # will be None for the parent most process started manually
@ -338,7 +339,18 @@ class Actor:
# send/receive initial handshake response # send/receive initial handshake response
try: try:
uid = await self._do_handshake(chan) uid = await self._do_handshake(chan)
except StopAsyncIteration:
except (
trio.BrokenResourceError,
trio.ClosedResourceError,
TransportClosed,
):
# XXX: This may propagate up from ``Channel._aiter_recv()``
# and ``MsgpackStream._inter_packets()`` on a read from the
# stream particularly when the runtime is first starting up
# inside ``open_root_actor()`` where there is a check for
# a bound listener on the "arbiter" addr. the reset will be
# because the handshake was never meant took place.
log.warning(f"Channel {chan} failed to handshake") log.warning(f"Channel {chan} failed to handshake")
return return
@ -476,6 +488,7 @@ class Actor:
# ``scope = Nursery.start()`` # ``scope = Nursery.start()``
task_status.started(loop_cs) task_status.started(loop_cs)
async for msg in chan: async for msg in chan:
if msg is None: # loop terminate sentinel if msg is None: # loop terminate sentinel
log.debug( log.debug(
f"Cancelling all tasks for {chan} from {chan.uid}") f"Cancelling all tasks for {chan} from {chan.uid}")
@ -578,22 +591,39 @@ class Actor:
) )
await self.cancel_rpc_tasks(chan) await self.cancel_rpc_tasks(chan)
except (
TransportClosed,
trio.BrokenResourceError,
trio.ClosedResourceError
):
# channels "breaking" is ok since we don't have a teardown
# handshake for them (yet) and instead we simply bail out
# of the message loop and expect the teardown sequence
# to clean up.
log.error(f"{chan} form {chan.uid} closed abruptly")
# raise
except trio.ClosedResourceError: except trio.ClosedResourceError:
log.error(f"{chan} form {chan.uid} broke") log.error(f"{chan} form {chan.uid} broke")
except (Exception, trio.MultiError) as err: except (Exception, trio.MultiError) as err:
# ship any "internal" exception (i.e. one from internal machinery # ship any "internal" exception (i.e. one from internal machinery
# not from an rpc task) to parent # not from an rpc task) to parent
log.exception("Actor errored:") log.exception("Actor errored:")
if self._parent_chan: if self._parent_chan:
await self._parent_chan.send(pack_error(err)) await self._parent_chan.send(pack_error(err))
raise
# if this is the `MainProcess` we expect the error broadcasting # if this is the `MainProcess` we expect the error broadcasting
# above to trigger an error at consuming portal "checkpoints" # above to trigger an error at consuming portal "checkpoints"
raise
except trio.Cancelled: except trio.Cancelled:
# debugging only # debugging only
log.debug(f"Msg loop was cancelled for {chan}") log.debug(f"Msg loop was cancelled for {chan}")
raise raise
finally: finally:
# msg debugging for when he machinery is brokey
log.debug( log.debug(
f"Exiting msg loop for {chan} from {chan.uid} " f"Exiting msg loop for {chan} from {chan.uid} "
f"with last msg:\n{msg}") f"with last msg:\n{msg}")
@ -633,7 +663,15 @@ class Actor:
_state._runtime_vars.update(rvs) _state._runtime_vars.update(rvs)
for attr, value in parent_data.items(): for attr, value in parent_data.items():
setattr(self, attr, value)
if attr == '_arb_addr':
# XXX: msgspec doesn't support serializing tuples
# so just cash manually here since it's what our
# internals expect.
self._arb_addr = tuple(value)
else:
setattr(self, attr, value)
return chan, accept_addr return chan, accept_addr
@ -1012,10 +1050,10 @@ class Actor:
parlance. parlance.
""" """
await chan.send(self.uid) await chan.send(self.uid)
uid: Tuple[str, str] = await chan.recv() uid: Tuple[str, str] = tuple(await chan.recv())
if not isinstance(uid, tuple): # if not isinstance(uid, tuple):
raise ValueError(f"{uid} is not a valid uid?!") # raise ValueError(f"{uid} is not a valid uid?!")
chan.uid = uid chan.uid = uid
log.info(f"Handshake with actor {uid}@{chan.raddr} complete") log.info(f"Handshake with actor {uid}@{chan.raddr} complete")
@ -1082,8 +1120,9 @@ class Arbiter(Actor):
async def register_actor( async def register_actor(
self, uid: Tuple[str, str], sockaddr: Tuple[str, int] self, uid: Tuple[str, str], sockaddr: Tuple[str, int]
) -> None: ) -> None:
uid = tuple(uid)
name, uuid = uid name, uuid = uid
self._registry[uid] = sockaddr self._registry[uid] = tuple(sockaddr)
# pop and signal all waiter events # pop and signal all waiter events
events = self._waiters.pop(name, ()) events = self._waiters.pop(name, ())
@ -1093,4 +1132,4 @@ class Arbiter(Actor):
event.set() event.set()
async def unregister_actor(self, uid: Tuple[str, str]) -> None: async def unregister_actor(self, uid: Tuple[str, str]) -> None:
self._registry.pop(uid) self._registry.pop(tuple(uid))

View File

@ -38,6 +38,10 @@ class InternalActorError(RemoteActorError):
""" """
class TransportClosed(trio.ClosedResourceError):
"Underlying channel transport was closed prior to use"
class NoResult(RuntimeError): class NoResult(RuntimeError):
"No final result is expected for this actor" "No final result is expected for this actor"
@ -63,12 +67,15 @@ def pack_error(exc: BaseException) -> Dict[str, Any]:
def unpack_error( def unpack_error(
msg: Dict[str, Any], msg: Dict[str, Any],
chan=None, chan=None,
err_type=RemoteActorError err_type=RemoteActorError
) -> Exception: ) -> Exception:
"""Unpack an 'error' message from the wire """Unpack an 'error' message from the wire
into a local ``RemoteActorError``. into a local ``RemoteActorError``.
""" """
tb_str = msg['error'].get('tb_str', '') tb_str = msg['error'].get('tb_str', '')
return err_type( return err_type(

View File

@ -1,16 +1,19 @@
""" """
Inter-process comms abstractions Inter-process comms abstractions
""" """
from functools import partial
import struct
import typing import typing
from typing import Any, Tuple, Optional from typing import Any, Tuple, Optional
from functools import partial
from tricycle import BufferedReceiveStream
import msgpack import msgpack
import trio import trio
from async_generator import asynccontextmanager from async_generator import asynccontextmanager
from .log import get_logger from .log import get_logger
log = get_logger('ipc') from ._exceptions import TransportClosed
log = get_logger(__name__)
# :eyeroll: # :eyeroll:
try: try:
@ -21,21 +24,32 @@ except ImportError:
Unpacker = partial(msgpack.Unpacker, strict_map_key=False) Unpacker = partial(msgpack.Unpacker, strict_map_key=False)
class MsgpackStream: class MsgpackTCPStream:
"""A ``trio.SocketStream`` delivering ``msgpack`` formatted data. '''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
""" using ``msgpack-python``.
def __init__(self, stream: trio.SocketStream) -> None:
'''
def __init__(
self,
stream: trio.SocketStream,
) -> None:
self.stream = stream self.stream = stream
assert self.stream.socket assert self.stream.socket
# should both be IP sockets # should both be IP sockets
lsockname = stream.socket.getsockname() lsockname = stream.socket.getsockname()
assert isinstance(lsockname, tuple) assert isinstance(lsockname, tuple)
self._laddr = lsockname[:2] self._laddr = lsockname[:2]
rsockname = stream.socket.getpeername() rsockname = stream.socket.getpeername()
assert isinstance(rsockname, tuple) assert isinstance(rsockname, tuple)
self._raddr = rsockname[:2] self._raddr = rsockname[:2]
# start first entry to read loop
self._agen = self._iter_packets() self._agen = self._iter_packets()
self._send_lock = trio.StrictFIFOLock() self._send_lock = trio.StrictFIFOLock()
async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
@ -46,16 +60,13 @@ class MsgpackStream:
use_list=False, use_list=False,
) )
while True: while True:
try: data = await self.stream.receive_some(2**10)
data = await self.stream.receive_some(2**10) log.trace(f"received {data}") # type: ignore
log.trace(f"received {data}") # type: ignore
except trio.BrokenResourceError:
log.warning(f"Stream connection {self.raddr} broke")
return
if data == b'': if data == b'':
log.debug(f"Stream connection {self.raddr} was closed") raise TransportClosed(
return f'transport {self} was already closed prior ro read'
)
unpacker.feed(data) unpacker.feed(data)
for packet in unpacker: for packet in unpacker:
@ -73,7 +84,8 @@ class MsgpackStream:
async def send(self, data: Any) -> None: async def send(self, data: Any) -> None:
async with self._send_lock: async with self._send_lock:
return await self.stream.send_all( return await self.stream.send_all(
msgpack.dumps(data, use_bin_type=True)) msgpack.dumps(data, use_bin_type=True)
)
async def recv(self) -> Any: async def recv(self) -> Any:
return await self._agen.asend(None) return await self._agen.asend(None)
@ -85,26 +97,101 @@ class MsgpackStream:
return self.stream.socket.fileno() != -1 return self.stream.socket.fileno() != -1
class MsgspecTCPStream(MsgpackTCPStream):
'''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
using ``msgspec``.
'''
def __init__(
self,
stream: trio.SocketStream,
prefix_size: int = 4,
) -> None:
super().__init__(stream)
self.recv_stream = BufferedReceiveStream(transport_stream=stream)
self.prefix_size = prefix_size
import msgspec
# TODO: struct aware messaging coders
self.encode = msgspec.Encoder().encode
self.decode = msgspec.Decoder().decode # dict[str, Any])
async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
"""Yield packets from the underlying stream.
"""
while True:
try:
header = await self.recv_stream.receive_exactly(4)
except (ValueError):
raise TransportClosed(
f'transport {self} was already closed prior ro read'
)
if header == b'':
raise TransportClosed(
f'transport {self} was already closed prior ro read'
)
size, = struct.unpack("<I", header)
log.trace(f'received header {size}')
msg_bytes = await self.recv_stream.receive_exactly(size)
log.trace(f"received {msg_bytes}") # type: ignore
yield self.decode(msg_bytes)
async def send(self, data: Any) -> None:
async with self._send_lock:
bytes_data = self.encode(data)
# supposedly the fastest says,
# https://stackoverflow.com/a/54027962
size: int = struct.pack("<I", len(bytes_data))
return await self.stream.send_all(size + bytes_data)
class Channel: class Channel:
"""An inter-process channel for communication between (remote) actors. """An inter-process channel for communication between (remote) actors.
Currently the only supported transport is a ``trio.SocketStream``. Currently the only supported transport is a ``trio.SocketStream``.
""" """
def __init__( def __init__(
self, self,
destaddr: Optional[Tuple[str, int]] = None, destaddr: Optional[Tuple[str, int]] = None,
on_reconnect: typing.Callable[..., typing.Awaitable] = None, on_reconnect: typing.Callable[..., typing.Awaitable] = None,
auto_reconnect: bool = False, auto_reconnect: bool = False,
stream: trio.SocketStream = None, # expected to be active stream: trio.SocketStream = None, # expected to be active
) -> None: ) -> None:
self._recon_seq = on_reconnect self._recon_seq = on_reconnect
self._autorecon = auto_reconnect self._autorecon = auto_reconnect
self.msgstream: Optional[MsgpackStream] = MsgpackStream(
try:
# if installed load the msgspec transport since it's faster
import msgspec # noqa
stream_serializer_type: type = MsgspecTCPStream
except ImportError:
stream_serializer_type: type = MsgpackTCPStream
self.stream_serializer_type = stream_serializer_type
self.msgstream: Optional[type] = stream_serializer_type(
stream) if stream else None stream) if stream else None
if self.msgstream and destaddr: if self.msgstream and destaddr:
raise ValueError( raise ValueError(
f"A stream was provided with local addr {self.laddr}" f"A stream was provided with local addr {self.laddr}"
) )
self._destaddr = self.msgstream.raddr if self.msgstream else destaddr self._destaddr = self.msgstream.raddr if self.msgstream else destaddr
# set after handshake - always uid of far end # set after handshake - always uid of far end
self.uid: Optional[Tuple[str, str]] = None self.uid: Optional[Tuple[str, str]] = None
@ -112,6 +199,8 @@ class Channel:
self._exc: Optional[Exception] = None self._exc: Optional[Exception] = None
self._agen = self._aiter_recv() self._agen = self._aiter_recv()
self._closed: bool = False
def __repr__(self) -> str: def __repr__(self) -> str:
if self.msgstream: if self.msgstream:
return repr( return repr(
@ -128,35 +217,51 @@ class Channel:
return self.msgstream.raddr if self.msgstream else None return self.msgstream.raddr if self.msgstream else None
async def connect( async def connect(
self, destaddr: Tuple[Any, ...] = None, self,
destaddr: Tuple[Any, ...] = None,
**kwargs **kwargs
) -> trio.SocketStream: ) -> trio.SocketStream:
if self.connected(): if self.connected():
raise RuntimeError("channel is already connected?") raise RuntimeError("channel is already connected?")
destaddr = destaddr or self._destaddr destaddr = destaddr or self._destaddr
assert isinstance(destaddr, tuple) assert isinstance(destaddr, tuple)
stream = await trio.open_tcp_stream(*destaddr, **kwargs)
self.msgstream = MsgpackStream(stream) stream = await trio.open_tcp_stream(
*destaddr,
**kwargs
)
self.msgstream = self.stream_serializer_type(stream)
return stream return stream
async def send(self, item: Any) -> None: async def send(self, item: Any) -> None:
log.trace(f"send `{item}`") # type: ignore log.trace(f"send `{item}`") # type: ignore
assert self.msgstream assert self.msgstream
await self.msgstream.send(item) await self.msgstream.send(item)
async def recv(self) -> Any: async def recv(self) -> Any:
assert self.msgstream assert self.msgstream
try: try:
return await self.msgstream.recv() return await self.msgstream.recv()
except trio.BrokenResourceError: except trio.BrokenResourceError:
if self._autorecon: if self._autorecon:
await self._reconnect() await self._reconnect()
return await self.recv() return await self.recv()
raise
async def aclose(self) -> None: async def aclose(self) -> None:
log.debug(f"Closing {self}") log.debug(f"Closing {self}")
assert self.msgstream assert self.msgstream
await self.msgstream.stream.aclose() await self.msgstream.stream.aclose()
self._closed = True
log.error(f'CLOSING CHAN {self}')
async def __aenter__(self): async def __aenter__(self):
await self.connect() await self.connect()