Compare commits


10 Commits

Author SHA1 Message Date
Tyler Goodlet 134a84e39e Accept transport closed error during handshake and msg loop 2021-07-01 07:44:03 -04:00
Tyler Goodlet 1c4d418f81 Drop happy eyeballs inf delay 2021-06-30 21:31:19 -04:00
Tyler Goodlet 1a068add5d Add our own "transport closed" signal
This changes some super old (and bad) code from the project's very early
days. For some ridiculous reason I must have thought that masking `trio`'s
internal stream / transport errors and a TCP EOF as `StopAsyncIteration`
was somehow a good idea. The reality is you probably want to know the
difference between an unexpected transport error and a simple EOF. This
begins to resolve that by adding our own special `TransportClosed` error
to signal the "graceful" termination of a channel's underlying transport.
Oh, and this builds on the `msgspec` integration which helped shed light
on the core issues here.
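
In short, the read loop now raises a dedicated exception on EOF instead of masking it; a minimal sketch of the idea using the names introduced in the diff below (the real read loop and handshake handling are elided):

    import trio


    class TransportClosed(trio.ClosedResourceError):
        "Underlying channel transport was closed prior to use"


    async def _iter_packets(stream: trio.SocketStream):
        # minimal read loop: an empty read means the remote did an orderly
        # shutdown, so signal that explicitly instead of masking it as a
        # generic ``StopAsyncIteration``
        while True:
            data = await stream.receive_some(2**10)
            if data == b'':
                raise TransportClosed(
                    f'transport {stream} was closed prior to read'
                )
            yield data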
2021-06-30 19:46:31 -04:00
Tyler Goodlet 97f44e2e27 Add streaming decode support for `msgspec`
Add a `tractor._ipc.MsgspecStream` type which can be swapped in for
`msgspec` serialization transparently. A small msg-length-prefix framing
is implemented as part of the type and we use
`tricycle.BufferedReceiveStream` to handle buffering logic for the
underlying transport.

Notes:
- had to force cast a few more list -> tuple spots due to no native
  `tuple` decode-by-default in `msgspec`: https://github.com/jcrist/msgspec/issues/30
- the framing can be understood by this protobuf walkthrough:
  https://eli.thegreenplace.net/2011/08/02/length-prefix-framing-for-protocol-buffers
- `tricycle` becomes a new dependency
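
The framing itself is just a 4-byte little-endian length prefix per message; a rough sketch of both halves as standalone helpers (not the actual `MsgspecTCPStream` methods, and using the top-level `msgspec.Encoder`/`Decoder` API as it existed at the time of this changeset):

    import struct

    import msgspec
    import trio
    from tricycle import BufferedReceiveStream

    encode = msgspec.Encoder().encode
    decode = msgspec.Decoder().decode


    async def send_framed(stream: trio.SocketStream, obj) -> None:
        payload = encode(obj)
        # 4-byte little-endian length header first, then the payload
        await stream.send_all(struct.pack("<I", len(payload)) + payload)


    async def recv_framed(bstream: BufferedReceiveStream):
        # ``BufferedReceiveStream`` buffers the transport so we can read
        # exactly the header and then exactly the body
        header = await bstream.receive_exactly(4)
        size, = struct.unpack("<I", header)
        return decode(await bstream.receive_exactly(size))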
2021-06-30 19:43:38 -04:00
Tyler Goodlet 240351a294 Always cast arbiter addr to tuple 2021-06-30 19:43:38 -04:00
Tyler Goodlet 12aa2fe562 Add `tricycle` and `msgspec` deps 2021-06-30 19:43:38 -04:00
Tyler Goodlet 9da35dc8bd Try out `msgspec` in our msgpack stream channel
Can only really use the encoder for now since there is no streaming API
in `msgspec` yet. See jcrist/msgspec#27.

Not sure if any encoding speedups are noticeable yet, especially
without any validation going on.

First experiments toward #196
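
Since both libraries speak the MessagePack wire format, the encode side can be swapped out while decoding stays on `msgpack-python`'s streaming `Unpacker`; a hypothetical snippet illustrating the mix (not the actual channel code, and again using the older top-level `msgspec.Encoder` API):

    import msgpack
    import msgspec

    encode = msgspec.Encoder().encode                       # fast encode side
    unpacker = msgpack.Unpacker(raw=False, use_list=False)  # decode side unchanged

    unpacker.feed(encode({'cmd': 'ping', 'uid': ('name', 'uuid')}))
    for msg in unpacker:
        # ``use_list=False`` hands back tuples; msgspec's own decoder
        # would return lists instead (see the tuple-cast notes above)
        print(msg)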
2021-06-30 19:43:38 -04:00
Tyler Goodlet aef8f0d18c Cast to tuples for all uids explicitly 2021-06-30 19:43:38 -04:00
Tyler Goodlet 498cd7e3a2 Move debugger wait inside OCA nursery 2021-06-30 19:42:49 -04:00
Tyler Goodlet d2b5d13b0a Don't shield debugger status wait; it causes hangs 2021-06-30 19:42:49 -04:00
5 changed files with 207 additions and 52 deletions


@@ -38,13 +38,20 @@ setup(
'tractor.testing',
],
install_requires=[
# trio related
'trio>0.8',
'msgpack',
'async_generator',
'tricycle',
'trio_typing',
'colorlog',
'wrapt',
'trio_typing',
'pdbpp',
# serialization
'msgpack',
'msgspec',
],
tests_require=['pytest'],
python_requires=">=3.7",


@@ -31,6 +31,7 @@ from ._exceptions import (
ModuleNotExposed,
is_multi_cancelled,
ContextCancelled,
TransportClosed,
)
from . import _debug
from ._discovery import get_arbiter
@@ -249,7 +250,7 @@ class Actor:
enable_modules: List[str] = [],
uid: str = None,
loglevel: str = None,
arbiter_addr: Optional[Tuple[str, int]] = None,
arbiter_addr: Optional[Tuple[str, int]] = (None, None),
spawn_method: Optional[str] = None
) -> None:
"""This constructor is called in the parent actor **before** the spawning
@@ -279,7 +280,7 @@
# TODO: consider making this a dynamically defined
# @dataclass once we get py3.7
self.loglevel = loglevel
self._arb_addr = arbiter_addr
self._arb_addr = tuple(arbiter_addr)
# marked by the process spawning backend at startup
# will be None for the parent most process started manually
@@ -385,7 +386,18 @@
# send/receive initial handshake response
try:
uid = await self._do_handshake(chan)
except StopAsyncIteration:
except (
trio.BrokenResourceError,
trio.ClosedResourceError,
TransportClosed,
):
# XXX: This may propagate up from ``Channel._aiter_recv()``
# and ``MsgpackTCPStream._iter_packets()`` on a read from the
# stream, particularly when the runtime is first starting up
# inside ``open_root_actor()`` where there is a check for
# a bound listener on the "arbiter" addr. The reset happens
# because the handshake was never meant to take place.
log.warning(f"Channel {chan} failed to handshake")
return
@@ -529,6 +541,7 @@
# ``scope = Nursery.start()``
task_status.started(loop_cs)
async for msg in chan:
if msg is None: # loop terminate sentinel
log.debug(
@@ -635,22 +648,39 @@
)
await self.cancel_rpc_tasks(chan)
except (
TransportClosed,
trio.BrokenResourceError,
trio.ClosedResourceError
):
# channels "breaking" is ok since we don't have a teardown
# handshake for them (yet) and instead we simply bail out
# of the message loop and expect the teardown sequence
# to clean up.
log.error(f"{chan} form {chan.uid} closed abruptly")
# raise
except trio.ClosedResourceError:
log.error(f"{chan} form {chan.uid} broke")
except (Exception, trio.MultiError) as err:
# ship any "internal" exception (i.e. one from internal machinery
# not from an rpc task) to parent
log.exception("Actor errored:")
if self._parent_chan:
await self._parent_chan.send(pack_error(err))
raise
# if this is the `MainProcess` we expect the error broadcasting
# above to trigger an error at consuming portal "checkpoints"
raise
except trio.Cancelled:
# debugging only
log.debug(f"Msg loop was cancelled for {chan}")
raise
finally:
# msg debugging for when the machinery is broken
log.debug(
f"Exiting msg loop for {chan} from {chan.uid} "
f"with last msg:\n{msg}")
@@ -690,7 +720,15 @@
_state._runtime_vars.update(rvs)
for attr, value in parent_data.items():
setattr(self, attr, value)
if attr == '_arb_addr':
# XXX: msgspec doesn't support serializing tuples
# so just cast manually here since it's what our
# internals expect.
self._arb_addr = tuple(value)
else:
setattr(self, attr, value)
# Disable sigint handling in children if NOT running in
# debug mode; we shouldn't need it thanks to our
@@ -1075,10 +1113,10 @@ class Actor:
parlance.
"""
await chan.send(self.uid)
uid: Tuple[str, str] = await chan.recv()
uid: Tuple[str, str] = tuple(await chan.recv())
if not isinstance(uid, tuple):
raise ValueError(f"{uid} is not a valid uid?!")
# if not isinstance(uid, tuple):
# raise ValueError(f"{uid} is not a valid uid?!")
chan.uid = uid
log.info(f"Handshake with actor {uid}@{chan.raddr} complete")
@@ -1145,8 +1183,9 @@ class Arbiter(Actor):
async def register_actor(
self, uid: Tuple[str, str], sockaddr: Tuple[str, int]
) -> None:
uid = tuple(uid)
name, uuid = uid
self._registry[uid] = sockaddr
self._registry[uid] = tuple(sockaddr)
# pop and signal all waiter events
events = self._waiters.pop(name, ())
@@ -1156,4 +1195,4 @@ class Arbiter(Actor):
event.set()
async def unregister_actor(self, uid: Tuple[str, str]) -> None:
self._registry.pop(uid)
self._registry.pop(tuple(uid))
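
The repeated ``tuple(...)`` casts in this hunk exist because ``msgspec`` decodes MessagePack arrays back as Python lists; a quick illustration, assuming the same top-level ``Encoder``/``Decoder`` API used elsewhere in this changeset:

    import msgspec

    enc = msgspec.Encoder()
    dec = msgspec.Decoder()

    uid = ('some_actor', 'deadbeef')                     # hypothetical actor uid
    roundtripped = dec.decode(enc.encode(uid))
    assert roundtripped == ['some_actor', 'deadbeef']    # tuple comes back as a list
    assert tuple(roundtripped) == uid                    # hence the explicit casts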


@@ -41,6 +41,10 @@ class ContextCancelled(RemoteActorError):
"Inter-actor task context cancelled itself on the callee side."
class TransportClosed(trio.ClosedResourceError):
"Underlying channel transport was closed prior to use"
class NoResult(RuntimeError):
"No final result is expected for this actor"
@@ -66,12 +70,15 @@ def pack_error(exc: BaseException) -> Dict[str, Any]:
def unpack_error(
msg: Dict[str, Any],
chan=None,
err_type=RemoteActorError
) -> Exception:
"""Unpack an 'error' message from the wire
into a local ``RemoteActorError``.
"""
error = msg['error']


@@ -1,16 +1,20 @@
"""
Inter-process comms abstractions
"""
from functools import partial
import struct
import typing
from typing import Any, Tuple, Optional
from functools import partial
from tricycle import BufferedReceiveStream
import msgpack
import msgspec
import trio
from async_generator import asynccontextmanager
from .log import get_logger
log = get_logger('ipc')
from ._exceptions import TransportClosed
log = get_logger(__name__)
# :eyeroll:
try:
@@ -21,21 +25,32 @@ except ImportError:
Unpacker = partial(msgpack.Unpacker, strict_map_key=False)
class MsgpackStream:
"""A ``trio.SocketStream`` delivering ``msgpack`` formatted data.
"""
def __init__(self, stream: trio.SocketStream) -> None:
class MsgpackTCPStream:
'''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
using ``msgpack-python``.
'''
def __init__(
self,
stream: trio.SocketStream,
) -> None:
self.stream = stream
assert self.stream.socket
# should both be IP sockets
lsockname = stream.socket.getsockname()
assert isinstance(lsockname, tuple)
self._laddr = lsockname[:2]
rsockname = stream.socket.getpeername()
assert isinstance(rsockname, tuple)
self._raddr = rsockname[:2]
# start first entry to read loop
self._agen = self._iter_packets()
self._send_lock = trio.StrictFIFOLock()
async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
@@ -46,16 +61,13 @@ class MsgpackStream:
use_list=False,
)
while True:
try:
data = await self.stream.receive_some(2**10)
log.trace(f"received {data}") # type: ignore
except trio.BrokenResourceError:
log.warning(f"Stream connection {self.raddr} broke")
return
data = await self.stream.receive_some(2**10)
log.trace(f"received {data}") # type: ignore
if data == b'':
log.debug(f"Stream connection {self.raddr} was closed")
return
raise TransportClosed(
f'transport {self} was already closed prior to read'
)
unpacker.feed(data)
for packet in unpacker:
@@ -73,7 +85,8 @@ class MsgpackStream:
async def send(self, data: Any) -> None:
async with self._send_lock:
return await self.stream.send_all(
msgpack.dumps(data, use_bin_type=True))
msgpack.dumps(data, use_bin_type=True)
)
async def recv(self) -> Any:
return await self._agen.asend(None)
@@ -85,26 +98,92 @@ class MsgpackStream:
return self.stream.socket.fileno() != -1
class MsgspecTCPStream(MsgpackTCPStream):
'''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
using ``msgspec``.
'''
ms_encode = msgspec.Encoder().encode
def __init__(
self,
stream: trio.SocketStream,
prefix_size: int = 4,
) -> None:
super().__init__(stream)
self.recv_stream = BufferedReceiveStream(transport_stream=stream)
self.prefix_size = prefix_size
async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
"""Yield packets from the underlying stream.
"""
decoder = msgspec.Decoder()  # dict[str, Any]
while True:
try:
header = await self.recv_stream.receive_exactly(4)
except ValueError:
raise TransportClosed(
f'transport {self} was already closed prior to read'
)
if header == b'':
raise TransportClosed(
f'transport {self} was already closed prior to read'
)
size, = struct.unpack("<I", header)
log.trace(f'received header {size}')
msg_bytes = await self.recv_stream.receive_exactly(size)
log.trace(f"received {msg_bytes}") # type: ignore
yield decoder.decode(msg_bytes)
async def send(self, data: Any) -> None:
async with self._send_lock:
bytes_data = self.ms_encode(data)
# supposedly the fastest approach, see:
# https://stackoverflow.com/a/54027962
size: bytes = struct.pack("<I", len(bytes_data))
return await self.stream.send_all(size + bytes_data)
class Channel:
"""An inter-process channel for communication between (remote) actors.
Currently the only supported transport is a ``trio.SocketStream``.
"""
def __init__(
self,
destaddr: Optional[Tuple[str, int]] = None,
on_reconnect: typing.Callable[..., typing.Awaitable] = None,
auto_reconnect: bool = False,
stream: trio.SocketStream = None, # expected to be active
# stream_serializer_type: type = MsgspecTCPStream,
stream_serializer_type: type = MsgpackTCPStream,
) -> None:
self._recon_seq = on_reconnect
self._autorecon = auto_reconnect
self.msgstream: Optional[MsgpackStream] = MsgpackStream(
self.stream_serializer_type = stream_serializer_type
self.msgstream: Optional[type] = stream_serializer_type(
stream) if stream else None
if self.msgstream and destaddr:
raise ValueError(
f"A stream was provided with local addr {self.laddr}"
)
self._destaddr = self.msgstream.raddr if self.msgstream else destaddr
# set after handshake - always uid of far end
self.uid: Optional[Tuple[str, str]] = None
@@ -112,6 +191,8 @@ class Channel:
self._exc: Optional[Exception] = None
self._agen = self._aiter_recv()
self._closed: bool = False
def __repr__(self) -> str:
if self.msgstream:
return repr(
@@ -128,35 +209,51 @@ class Channel:
return self.msgstream.raddr if self.msgstream else None
async def connect(
self, destaddr: Tuple[Any, ...] = None,
self,
destaddr: Tuple[Any, ...] = None,
**kwargs
) -> trio.SocketStream:
if self.connected():
raise RuntimeError("channel is already connected?")
destaddr = destaddr or self._destaddr
assert isinstance(destaddr, tuple)
stream = await trio.open_tcp_stream(*destaddr, **kwargs)
self.msgstream = MsgpackStream(stream)
stream = await trio.open_tcp_stream(
*destaddr,
**kwargs
)
self.msgstream = self.stream_serializer_type(stream)
return stream
async def send(self, item: Any) -> None:
log.trace(f"send `{item}`") # type: ignore
assert self.msgstream
await self.msgstream.send(item)
async def recv(self) -> Any:
assert self.msgstream
try:
return await self.msgstream.recv()
except trio.BrokenResourceError:
if self._autorecon:
await self._reconnect()
return await self.recv()
raise
async def aclose(self) -> None:
log.debug(f"Closing {self}")
assert self.msgstream
await self.msgstream.stream.aclose()
self._closed = True
log.error(f'CLOSING CHAN {self}')
async def __aenter__(self):
await self.connect()
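
With the serializer type parametrized on the ``Channel`` constructor, swapping the wire codec is a single argument; a hypothetical usage sketch built only from the methods shown above (the peer address and the ``tractor._ipc`` import path are assumptions):

    import trio

    # assumed import path for the classes added in this changeset
    from tractor._ipc import Channel, MsgpackTCPStream, MsgspecTCPStream


    async def main() -> None:
        chan = Channel(
            destaddr=('127.0.0.1', 1616),             # hypothetical peer addr
            stream_serializer_type=MsgspecTCPStream,  # or MsgpackTCPStream (default)
        )
        await chan.connect()      # opens a ``trio.SocketStream`` to the peer
        await chan.send({'cmd': 'ping'})
        print(await chan.recv())
        await chan.aclose()


    trio.run(main)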


@@ -12,7 +12,7 @@ import trio
from async_generator import asynccontextmanager
from . import _debug
from ._state import current_actor, is_main_process
from ._state import current_actor, is_main_process, is_root_process
from .log import get_logger, get_loglevel
from ._actor import Actor
from ._portal import Portal
@@ -254,6 +254,26 @@ async def _open_and_supervise_one_cancels_all_nursery(
"to complete"
)
except BaseException as err:
if is_root_process() and (
type(err) in {
Exception, trio.MultiError, trio.Cancelled
}
):
# if we error in the root but the debugger is
# engaged we don't want to prematurely kill (and
# thus clobber access to) the local tty streams.
# instead try to wait for pdb to be released before
# tearing down.
debug_complete = _debug._pdb_complete
if debug_complete and not debug_complete.is_set():
log.warning(
"Root has errored but pdb is active..waiting "
"on debug lock")
await _debug._pdb_complete.wait()
# raise
# if the caller's scope errored then we activate our
# one-cancels-all supervisor strategy (don't
# worry more are coming).
@@ -368,26 +388,11 @@ async def open_nursery(
async with open_root_actor(**kwargs) as actor:
assert actor is current_actor()
try:
async with _open_and_supervise_one_cancels_all_nursery(
actor
) as anursery:
yield anursery
except (Exception, trio.MultiError, trio.Cancelled):
# if we error in the root but the debugger is
# engaged we don't want to prematurely kill (and
# thus clobber access to) the local tty streams.
# instead try to wait for pdb to be released before
# tearing down.
if not _debug._pdb_complete.is_set():
log.warning(
"Root has errored but pdb is active..waiting "
"on debug lock")
with trio.CancelScope(shield=True):
await _debug._pdb_complete.wait()
raise
# try:
async with _open_and_supervise_one_cancels_all_nursery(
actor
) as anursery:
yield anursery
else: # sub-nursery case