# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
'''
typing.Protocol based generic msg API, implement this class to add
backends for tractor.ipc.Channel

'''
from __future__ import annotations
from collections.abc import (
    AsyncGenerator,
    AsyncIterator,
)
import struct
from typing import (
    Any,
    Callable,
    ClassVar,
    Protocol,
    Type,
    TypeVar,
    runtime_checkable,
)

import trio
import msgspec
from tricycle import BufferedReceiveStream

from tractor.log import get_logger
from tractor._exceptions import (
    MsgTypeError,
    TransportClosed,
    _mk_send_mte,
    _mk_recv_mte,
)
from tractor.msg import (
    _ctxvar_MsgCodec,
    # _codec,  XXX see `self._codec` sanity/debug checks
    MsgCodec,
    types as msgtypes,
    pretty_struct,
)

log = get_logger(__name__)


# from tractor.msg.types import MsgType
# ?TODO? this should be our `Union[*msgtypes.__spec__]` alias now right..?
# => BLEH, except can't bc prots must inherit typevar or param-spec
#   vars..
AddressType = TypeVar('AddressType')
MsgType = TypeVar('MsgType')


@runtime_checkable
class MsgTransport(Protocol[AddressType, MsgType]):
    '''
    Structural interface every IPC msg transport backend provides.

    Generic over the address type used to identify each end of the
    link (`AddressType`) and the type of msg moved across it
    (`MsgType`).

    '''
    #
    # ^-TODO-^ consider using a generic def and indexing with our
    # eventual msg definition/types?
    # - https://docs.python.org/3/library/typing.html#typing.Protocol

    # underlying `trio` stream the transport reads from / writes to
    stream: trio.SocketStream
    # msgs collected by `.drain()`
    drained: list[MsgType]
    # the address type this transport backend works with
    address_type: ClassVar[Type[AddressType]]

    # XXX: should this instead be called `.sendall()`?
    async def send(self, msg: MsgType) -> None:
        '''
        Deliver a single `msg` to the peer.

        '''
        ...

    async def recv(self) -> MsgType:
        '''
        Wait for and return the next msg from the peer.

        '''
        ...

    def __aiter__(self) -> AsyncIterator[MsgType]:
        '''
        Return the async iterator of received msgs.

        '''
        ...

    def connected(self) -> bool:
        '''
        Report whether the transport is currently connected.

        '''
        ...

    # defining this sync otherwise it causes a mypy error because it
    # can't figure out it's a generator i guess?..?
    def drain(self) -> AsyncIterator[dict]:
        '''
        Async-iterate the msgs still pending from the far end.

        '''
        ...

    @property
    def laddr(self) -> AddressType:
        '''
        Local side address.

        '''
        ...

    @property
    def raddr(self) -> AddressType:
        '''
        Remote side address.

        '''
        ...

    @classmethod
    async def connect_to(
        cls,
        destaddr: AddressType,
        **kwargs
    ) -> MsgTransport:
        '''
        Open a transport to `destaddr`, forwarding any extra
        `kwargs` to the backend, and return it.

        '''
        ...

    @classmethod
    def get_stream_addrs(
        cls,
        stream: trio.abc.Stream
    ) -> tuple[
        AddressType,  # local
        AddressType  # remote
    ]:
        '''
        Return the `trio` streaming transport prot's addrs for both
        the local and remote sides as a pair.

        '''
        ...


class MsgpackTransport(MsgTransport):
    '''
    `MsgTransport` backend registered under the `'msgpack'` codec
    key which moves msgs over a `trio` stream.

    '''
    # TODO: better naming for this?
    # -[ ] check how libp2p does naming for such things?
    codec_key: str = 'msgpack'

    def __init__(
        self,
        stream: trio.abc.Stream,
        prefix_size: int = 4,

        # XXX optionally provided codec pair for `msgspec`:
        # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
        #
        # TODO: define this as a `Codec` struct which can be
        # overriden dynamically by the application/runtime?
        codec: MsgCodec|None = None,

    ) -> None:
        '''
        Wrap `stream` and allocate the per-transport rx/tx state.

        Parameters:
        - `stream`: the connected `trio` stream to wrap; its local
          and remote addrs are looked up via `.get_stream_addrs()`.
        - `prefix_size`: stored as `.prefix_size`.
          NOTE(review): the read loop visible in this file requests
          a literal 4 byte header, so this value does not appear to
          drive header parsing - confirm.
        - `codec`: accepted but NOT stored by this method; the only
          assignment using it is the commented-out debug code that
          follows the body.

        Calls `trio.lowlevel.current_task()`, so presumably this
        must run inside a `trio` task - confirm against callers.

        '''
        self.stream = stream
        self._laddr, self._raddr = self.get_stream_addrs(stream)

        # create read loop instance; this only allocates the
        # generator object that `.recv()` / `.__aiter__()` hand out.
        self._aiter_pkts = self._iter_packets()
        # serializes concurrent `.send()` calls
        self._send_lock = trio.StrictFIFOLock()

        # public i guess?
        self.drained: list[dict] = []

        # buffered reader wrapping the raw stream
        self.recv_stream = BufferedReceiveStream(
            transport_stream=stream
        )
        self.prefix_size = prefix_size

        # allow for custom IPC msg interchange format
        # dynamic override Bo
        self._task = trio.lowlevel.current_task()

        # XXX for ctxvar debug only!
# self._codec: MsgCodec = ( # codec # or # _codec._ctxvar_MsgCodec.get() # ) async def _iter_packets(self) -> AsyncGenerator[dict, None]: ''' Yield `bytes`-blob decoded packets from the underlying TCP stream using the current task's `MsgCodec`. This is a streaming routine implemented as an async generator func (which was the original design, but could be changed?) and is allocated by a `.__call__()` inside `.__init__()` where it is assigned to the `._aiter_pkts` attr. ''' decodes_failed: int = 0 while True: try: header: bytes = await self.recv_stream.receive_exactly(4) except ( ValueError, ConnectionResetError, # not sure entirely why we need this but without it we # seem to be getting racy failures here on # arbiter/registry name subs.. trio.BrokenResourceError, ) as trans_err: loglevel = 'transport' match trans_err: # case ( # ConnectionResetError() # ): # loglevel = 'transport' # peer actor (graceful??) TCP EOF but `tricycle` # seems to raise a 0-bytes-read? case ValueError() if ( 'unclean EOF' in trans_err.args[0] ): pass # peer actor (task) prolly shutdown quickly due # to cancellation case trio.BrokenResourceError() if ( 'Connection reset by peer' in trans_err.args[0] ): pass # unless the disconnect condition falls under "a # normal operation breakage" we usualy console warn # about it. case _: loglevel: str = 'warning' raise TransportClosed( message=( f'IPC transport already closed by peer\n' f'x)> {type(trans_err)}\n' f' |_{self}\n' ), loglevel=loglevel, ) from trans_err # XXX definitely can happen if transport is closed # manually by another `trio.lowlevel.Task` in the # same actor; we use this in some simulated fault # testing for ex, but generally should never happen # under normal operation! # # NOTE: as such we always re-raise this error from the # RPC msg loop! 
except trio.ClosedResourceError as closure_err: raise TransportClosed( message=( f'IPC transport already manually closed locally?\n' f'x)> {type(closure_err)} \n' f' |_{self}\n' ), loglevel='error', raise_on_report=( closure_err.args[0] == 'another task closed this fd' or closure_err.args[0] in ['another task closed this fd'] ), ) from closure_err # graceful TCP EOF disconnect if header == b'': raise TransportClosed( message=( f'IPC transport already gracefully closed\n' f')>\n' f'|_{self}\n' ), loglevel='transport', # cause=??? # handy or no? ) size: int size, = struct.unpack(" None: ''' Send a msgpack encoded py-object-blob-as-msg over TCP. If `strict_types == True` then a `MsgTypeError` will be raised on any invalid msg type ''' __tracebackhide__: bool = hide_tb # XXX see `trio._sync.AsyncContextManagerMixin` for details # on the `.acquire()`/`.release()` sequencing.. async with self._send_lock: # NOTE: lookup the `trio.Task.context`'s var for # the current `MsgCodec`. codec: MsgCodec = _ctxvar_MsgCodec.get() # XXX for ctxvar debug only! 
# if self._codec.pld_spec != codec.pld_spec: # self._codec = codec # log.runtime( # f'Using new codec in {self}.send()\n' # f'codec: {self._codec}\n\n' # f'msg: {msg}\n' # ) if type(msg) not in msgtypes.__msg_types__: if strict_types: raise _mk_send_mte( msg, codec=codec, ) else: log.warning( 'Sending non-`Msg`-spec msg?\n\n' f'{msg}\n' ) try: bytes_data: bytes = codec.encode(msg) except TypeError as _err: typerr = _err msgtyperr: MsgTypeError = _mk_send_mte( msg, codec=codec, message=( f'IPC-msg-spec violation in\n\n' f'{pretty_struct.Struct.pformat(msg)}' ), src_type_error=typerr, ) raise msgtyperr from typerr # supposedly the fastest says, # https://stackoverflow.com/a/54027962 size: bytes = struct.pack(" # except BaseException as _err: # err = _err # if not isinstance(err, MsgTypeError): # __tracebackhide__: bool = False # raise async def recv(self) -> Any: return await self._aiter_pkts.asend(None) async def drain(self) -> AsyncIterator[dict]: ''' Drain the stream's remaining messages sent from the far end until the connection is closed by the peer. ''' try: async for msg in self._iter_packets(): self.drained.append(msg) except TransportClosed: for msg in self.drained: yield msg def __aiter__(self): return self._aiter_pkts @property def laddr(self) -> AddressType: return self._laddr @property def raddr(self) -> AddressType: return self._raddr