forked from goodboy/tractor
1
0
Fork 0
tractor/tractor/_ipc.py

481 lines
13 KiB
Python
Raw Normal View History

Re-license code base for distribution under AGPL This commit obviously denotes a re-license of all applicable parts of the code base. Acknowledgement of this change was completed in #274 by the majority of the current set of contributors. From here henceforth all changes will be AGPL licensed and distributed. This is purely an effort to maintain the same copy-left policy whilst closing the (perceived) SaaS loophole the GPL allows for. It is merely for this loophole: to avoid code hiding by any potential "network providers" who are attempting to use the project to make a profit without either compensating the authors or re-distributing their changes. I thought quite a bit about this change and can't see a reason not to close the SaaS loophole in our current license. We still are (hard) copy-left and I plan to keep the code base this way for a couple reasons: - The code base produces income/profit through parent projects and is demonstrably of high value. - I believe firms should not get free lunch for the sake of "contributions from their employees" or "usage as a service" which I have found to be a dubious argument at best. - If a firm who intends to profit from the code base wants to use it they can propose a secondary commercial license to purchase with the proceeds going to the project's authors under some form of well defined contract. - Many successful projects like Qt use this model; I see no reason it can't work in this case until such a time as the authors feel it should be loosened. There has been detailed discussion in #103 on licensing alternatives. The main point of this AGPL change is to protect the code base for the time being from exploitation while it grows and as we move into the next phase of development which will include extension into the multi-host distributed software space.
2021-12-13 18:08:32 +00:00
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
2018-05-30 16:36:23 +00:00
"""
Inter-process comms abstractions
2021-06-30 17:47:07 +00:00
2018-05-30 16:36:23 +00:00
"""
from __future__ import annotations
import platform
import struct
import typing
2022-07-12 15:22:30 +00:00
from collections.abc import (
AsyncGenerator,
AsyncIterator,
)
from typing import (
2022-07-12 15:22:30 +00:00
Any,
runtime_checkable,
Optional,
Protocol,
Type,
TypeVar,
)
2018-05-30 16:36:23 +00:00
from tricycle import BufferedReceiveStream
import msgspec
2018-05-30 16:36:23 +00:00
import trio
2018-07-11 22:08:57 +00:00
from async_generator import asynccontextmanager
2018-05-30 16:36:23 +00:00
from .log import get_logger
from ._exceptions import TransportClosed
# Module-level logger shared by every transport and channel in this file.
log = get_logger(__name__)

# True when the interpreter reports it is running on Windows.
_is_windows = platform.system() == 'Windows'
2022-07-12 15:22:30 +00:00
def get_stream_addrs(stream: trio.SocketStream) -> tuple:
    '''
    Return the ``(local, remote)`` address pair of ``stream``'s socket.

    Each address is whatever the socket reports, cut down to its first
    two fields and converted to a ``tuple``.

    '''
    sock = stream.socket

    # should both be IP sockets
    local, remote = sock.getsockname(), sock.getpeername()

    return tuple(local[:2]), tuple(remote[:2])
MsgType = TypeVar("MsgType")

# TODO: consider using a generic def and indexing with our eventual
# msg definition/types?
# - https://docs.python.org/3/library/typing.html#typing.Protocol
# - https://jcristharif.com/msgspec/usage.html#structs


@runtime_checkable
class MsgTransport(Protocol[MsgType]):
    '''
    Structural interface for a message-oriented transport wrapping
    a ``trio.SocketStream``.

    Implementations send and receive whole messages of type ``MsgType``
    and expose the local/remote addresses of the wrapped stream.

    '''
    # the wrapped byte stream
    stream: trio.SocketStream

    # messages collected by ``.drain()``
    drained: list[MsgType]

    def __init__(self, stream: trio.SocketStream) -> None:
        ...

    # XXX: should this instead be called `.sendall()`?
    async def send(self, msg: MsgType) -> None:
        ...

    async def recv(self) -> MsgType:
        ...

    # iterating a transport produces messages, so the return type is an
    # async iterator of ``MsgType`` rather than a single message.
    def __aiter__(self) -> AsyncIterator[MsgType]:
        ...

    def connected(self) -> bool:
        ...

    # defining this sync otherwise it causes a mypy error because it
    # can't figure out it's a generator i guess?..?
    def drain(self) -> AsyncIterator[dict]:
        ...

    @property
    def laddr(self) -> tuple[str, int]:
        ...

    @property
    def raddr(self) -> tuple[str, int]:
        ...
2022-07-12 15:22:30 +00:00
# TODO: not sure why we have to inherit here, but it seems to be an
# issue with ``get_msg_transport()`` returning a ``Type[Protocol]``;
# probably should make a `mypy` issue?
class MsgpackTCPStream(MsgTransport):
    '''
    A ``trio.SocketStream`` delivering ``msgpack`` formatted data
    using the ``msgspec`` codec lib.

    Every message on the wire is a 4-byte little-endian unsigned length
    header followed by that many bytes of encoded payload.

    '''
    def __init__(
        self,
        stream: trio.SocketStream,
        prefix_size: int = 4,

    ) -> None:

        self.stream = stream
        assert self.stream.socket

        # should both be IP sockets
        self._laddr, self._raddr = get_stream_addrs(stream)

        # create read loop instance
        self._agen = self._iter_packets()

        # serializes concurrent ``.send()`` calls so that a header and
        # its payload are never interleaved with another message.
        self._send_lock = trio.StrictFIFOLock()

        # public i guess?
        self.drained: list[dict] = []

        self.recv_stream = BufferedReceiveStream(transport_stream=stream)

        # NOTE(review): stored but never read by the framing code in
        # this class, which hard-codes a 4-byte ("<I") header - confirm
        # before passing any other value.
        self.prefix_size = prefix_size

        # TODO: struct aware messaging coders
        self.encode = msgspec.msgpack.Encoder().encode
        self.decode = msgspec.msgpack.Decoder().decode  # dict[str, Any])

    async def _iter_packets(self) -> AsyncGenerator[dict, None]:
        '''
        Yield decoded packets from the underlying stream.

        Raises ``TransportClosed`` when reading the length header fails
        with one of the handled connection errors or returns no bytes.
        The first 4 payloads that fail to decode are logged and skipped;
        the next decode failure is re-raised.

        '''
        decodes_failed: int = 0

        while True:
            try:
                header = await self.recv_stream.receive_exactly(4)

            except (
                ValueError,
                ConnectionResetError,

                # not sure entirely why we need this but without it we
                # seem to be getting racy failures here on
                # arbiter/registry name subs..
                trio.BrokenResourceError,
            ):
                raise TransportClosed(
                    f'transport {self} was already closed prior to read'
                )

            if header == b'':
                raise TransportClosed(
                    f'transport {self} was already closed prior to read'
                )

            size, = struct.unpack("<I", header)

            log.transport(f'received header {size}')  # type: ignore

            msg_bytes = await self.recv_stream.receive_exactly(size)

            log.transport(f"received {msg_bytes}")  # type: ignore
            try:
                yield self.decode(msg_bytes)
            except (
                msgspec.DecodeError,
                UnicodeDecodeError,
            ):
                if decodes_failed < 4:
                    # ignore decoding errors for now and assume they have to
                    # do with a channel drop - hope that receiving from the
                    # channel will raise an expected error and bubble up.
                    try:
                        msg_str: str | bytes = msg_bytes.decode()
                    except UnicodeDecodeError:
                        msg_str = msg_bytes

                    log.error(
                        '`msgspec` failed to decode!?\n'
                        'dumping bytes:\n'
                        f'{msg_str!r}'
                    )
                    decodes_failed += 1
                else:
                    raise

    async def send(self, msg: Any) -> None:
        '''
        Encode ``msg`` and write it, length-prefixed, to the stream.

        '''
        async with self._send_lock:

            bytes_data: bytes = self.encode(msg)

            # supposedly the fastest says,
            # https://stackoverflow.com/a/54027962
            size: bytes = struct.pack("<I", len(bytes_data))

            return await self.stream.send_all(size + bytes_data)

    @property
    def laddr(self) -> tuple[str, int]:
        '''Local address captured from the socket at construction.'''
        return self._laddr

    @property
    def raddr(self) -> tuple[str, int]:
        '''Remote (peer) address captured from the socket at construction.'''
        return self._raddr

    async def recv(self) -> Any:
        '''
        Receive the next decoded message from the shared read loop.

        '''
        return await self._agen.asend(None)

    async def drain(self) -> AsyncIterator[dict]:
        '''
        Drain the stream's remaining messages sent from
        the far end until the connection is closed by
        the peer.

        Messages are buffered in ``.drained`` and only yielded once
        ``TransportClosed`` is raised by the read loop.

        '''
        try:
            # uses a fresh packet iterator rather than ``._agen``
            async for msg in self._iter_packets():
                self.drained.append(msg)
        except TransportClosed:
            for msg in self.drained:
                yield msg

    def __aiter__(self):
        return self._agen

    def connected(self) -> bool:
        '''
        Return ``True`` while the underlying socket still reports
        a file descriptor other than ``-1``.

        '''
        return self.stream.socket.fileno() != -1
def get_msg_transport(

    key: tuple[str, str],

) -> Type[MsgTransport]:
    '''
    Look up the transport class registered under ``key``,
    e.g. ``('msgpack', 'tcp')``.

    Raises ``KeyError`` for any key without a registered transport.

    '''
    transports: dict[tuple[str, str], Type[MsgTransport]] = {
        ('msgpack', 'tcp'): MsgpackTCPStream,
    }
    return transports[key]
2021-09-08 01:07:33 +00:00
class Channel:
    '''
    An inter-process channel for communication between (remote) actors.

    Wraps a ``MsgStream``: transport + encoding IPC connection.

    Currently we only support ``trio.SocketStream`` for transport
    (aka TCP) and the ``msgpack`` interchange format via the ``msgspec``
    codec library.

    '''
    def __init__(

        self,
        destaddr: Optional[tuple[str, int]],

        msg_transport_type_key: tuple[str, str] = ('msgpack', 'tcp'),

        # TODO: optional reconnection support?
        # auto_reconnect: bool = False,
        # on_reconnect: typing.Callable[..., typing.Awaitable] = None,

    ) -> None:

        # self._recon_seq = on_reconnect
        # self._autorecon = auto_reconnect

        self._destaddr = destaddr
        self._transport_key = msg_transport_type_key

        # Passed in by user in ``.from_stream()``.
        # NOTE(review): ``.connect()`` does not assign this attribute;
        # the stream it opens is only reachable via ``.msgstream.stream``.
        self._stream: Optional[trio.SocketStream] = None
        self.msgstream: Optional[MsgTransport] = None

        # set after handshake - always uid of far end
        self.uid: Optional[tuple[str, str]] = None

        # created eagerly but not started; nothing runs until the
        # channel is first iterated.
        self._agen = self._aiter_recv()
        self._exc: Optional[Exception] = None  # set if far end actor errors
        self._closed: bool = False
        # flag set on ``Portal.cancel_actor()`` indicating
        # remote (peer) cancellation of the far end actor runtime.
        self._cancel_called: bool = False  # set on ``Portal.cancel_actor()``

    @classmethod
    def from_stream(
        cls,
        stream: trio.SocketStream,
        **kwargs,

    ) -> Channel:
        '''
        Build a channel around an already open ``stream``, using the
        stream's peer address as the destination address.

        '''
        src, dst = get_stream_addrs(stream)
        chan = Channel(destaddr=dst, **kwargs)

        # set immediately here from provided instance
        chan._stream = stream
        chan.set_msg_transport(stream)
        return chan

    def set_msg_transport(
        self,
        stream: trio.SocketStream,
        type_key: Optional[tuple[str, str]] = None,

    ) -> MsgTransport:
        '''
        Wrap ``stream`` in the transport type selected by ``type_key``
        (falling back to the key given at construction), store it as
        ``.msgstream`` and return it.

        '''
        type_key = type_key or self._transport_key
        self.msgstream = get_msg_transport(type_key)(stream)
        return self.msgstream

    def __repr__(self) -> str:
        if self.msgstream:
            return repr(
                self.msgstream.stream.socket._sock).replace(  # type: ignore
                        "socket.socket", "Channel")
        return object.__repr__(self)

    @property
    def laddr(self) -> Optional[tuple[str, int]]:
        '''Local address of the transport, or ``None`` when there is none.'''
        return self.msgstream.laddr if self.msgstream else None

    @property
    def raddr(self) -> Optional[tuple[str, int]]:
        '''Remote address of the transport, or ``None`` when there is none.'''
        return self.msgstream.raddr if self.msgstream else None

    async def connect(
        self,
        destaddr: tuple[Any, ...] | None = None,
        **kwargs

    ) -> MsgTransport:
        '''
        Open a TCP stream to ``destaddr`` (or the address given at
        construction) and wrap it in a message transport.

        Extra ``kwargs`` are forwarded to ``trio.open_tcp_stream()``.

        Raises ``RuntimeError`` if the channel is already connected.

        '''
        if self.connected():
            raise RuntimeError("channel is already connected?")

        destaddr = destaddr or self._destaddr
        assert isinstance(destaddr, tuple)

        stream = await trio.open_tcp_stream(
            *destaddr,
            **kwargs
        )
        msgstream = self.set_msg_transport(stream)

        log.transport(
            f'Opened channel[{type(msgstream)}]: {self.laddr} -> {self.raddr}'
        )
        return msgstream

    async def send(self, item: Any) -> None:
        '''
        Send ``item`` over the message transport.

        '''
        log.transport(f"send `{item}`")  # type: ignore
        assert self.msgstream

        await self.msgstream.send(item)

    async def recv(self) -> Any:
        '''
        Receive the next message from the message transport.

        '''
        assert self.msgstream
        return await self.msgstream.recv()

        # try:
        #     return await self.msgstream.recv()
        # except trio.BrokenResourceError:
        #     if self._autorecon:
        #         await self._reconnect()
        #         return await self.recv()
        #     raise

    async def aclose(self) -> None:
        '''
        Close the underlying stream and mark the channel as closed.

        '''
        log.transport(
            f'Closing channel to {self.uid} '
            f'{self.laddr} -> {self.raddr}'
        )
        assert self.msgstream
        await self.msgstream.stream.aclose()
        self._closed = True

    async def __aenter__(self):
        await self.connect()
        return self

    async def __aexit__(self, *args):
        # ``aclose()`` accepts no arguments, so the exception triple
        # passed in by the ``async with`` machinery must not be
        # forwarded to it.
        await self.aclose()

    def __aiter__(self):
        return self._agen

    # async def _reconnect(self) -> None:
    #     """Handle connection failures by polling until a reconnect can be
    #     established.
    #     """
    #     down = False
    #     while True:
    #         try:
    #             with trio.move_on_after(3) as cancel_scope:
    #                 await self.connect()
    #             cancelled = cancel_scope.cancelled_caught
    #             if cancelled:
    #                 log.transport(
    #                     "Reconnect timed out after 3 seconds, retrying...")
    #                 continue
    #             else:
    #                 log.transport("Stream connection re-established!")

    #                 # TODO: run any reconnection sequence
    #                 # on_recon = self._recon_seq
    #                 # if on_recon:
    #                 #     await on_recon(self)

    #                 break
    #         except (OSError, ConnectionRefusedError):
    #             if not down:
    #                 down = True
    #                 log.transport(
    #                     f"Connection to {self.raddr} went down, waiting"
    #                     " for re-establishment")
    #             await trio.sleep(1)

    async def _aiter_recv(
        self
    ) -> AsyncGenerator[Any, None]:
        '''
        Async iterate items from underlying stream.

        '''
        assert self.msgstream
        while True:
            try:
                async for item in self.msgstream:
                    yield item
                    # sent = yield item
                    # if sent is not None:
                    #     # optimization, passing None through all the
                    #     # time is pointless
                    #     await self.msgstream.send(sent)
            except trio.BrokenResourceError:

                # if not self._autorecon:
                raise

            # NOTE(review): reached only when the transport iterator
            # finishes without raising; the stream is closed and the
            # loop then iterates the transport again - confirm this is
            # intended while reconnection stays disabled.
            await self.aclose()

            # if self._autorecon:  # attempt reconnect
            #     await self._reconnect()
            #     continue

    def connected(self) -> bool:
        '''
        Return whether a message transport exists and reports itself
        as connected.

        '''
        return self.msgstream.connected() if self.msgstream else False
2018-07-11 22:08:57 +00:00
@asynccontextmanager
async def _connect_chan(
    host: str, port: int
) -> typing.AsyncGenerator[Channel, None]:
    '''
    Create and connect a channel with disconnect on context manager
    teardown.

    The channel is closed on exit whether the managed block finishes
    normally or raises.

    '''
    chan = Channel((host, port))
    await chan.connect()
    try:
        yield chan
    finally:
        # without the ``finally`` an error in the managed block would
        # skip the close and leave the connection open.
        await chan.aclose()