diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 5e286c1..b108c90 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -30,7 +30,6 @@ import typing from typing import ( Any, runtime_checkable, - Optional, Protocol, Type, TypeVar, @@ -113,6 +112,13 @@ class MsgpackTCPStream(MsgTransport): using the ``msgspec`` codec lib. ''' + layer_key: int = 4 + name_key: str = 'tcp' + + # TODO: better naming for this? + # -[ ] check how libp2p does naming for such things? + codec_key: str = 'msgpack' + def __init__( self, stream: trio.SocketStream, @@ -268,7 +274,7 @@ class Channel: def __init__( self, - destaddr: Optional[tuple[str, int]], + destaddr: tuple[str, int]|None, msg_transport_type_key: tuple[str, str] = ('msgpack', 'tcp'), @@ -286,14 +292,14 @@ class Channel: # Either created in ``.connect()`` or passed in by # user in ``.from_stream()``. - self._stream: Optional[trio.SocketStream] = None - self.msgstream: Optional[MsgTransport] = None + self._stream: trio.SocketStream|None = None + self._transport: MsgTransport|None = None # set after handshake - always uid of far end - self.uid: Optional[tuple[str, str]] = None + self.uid: tuple[str, str]|None = None self._agen = self._aiter_recv() - self._exc: Optional[Exception] = None # set if far end actor errors + self._exc: Exception|None = None # set if far end actor errors self._closed: bool = False # flag set by ``Portal.cancel_actor()`` indicating remote @@ -301,6 +307,15 @@ class Channel: # runtime. self._cancel_called: bool = False + @property + def msgstream(self) -> MsgTransport: + log.info('`Channel.msgstream` is an old name, use `._transport`') + return self._transport + + @property + def transport(self) -> MsgTransport: + return self._transport + @classmethod def from_stream( cls, @@ -310,40 +325,44 @@ class Channel: ) -> Channel: src, dst = get_stream_addrs(stream) - chan = Channel(destaddr=dst, **kwargs) + chan = Channel( + destaddr=dst, + **kwargs, + ) # set immediately here from provided instance - chan._stream = stream + chan._stream: trio.SocketStream = stream chan.set_msg_transport(stream) return chan def set_msg_transport( self, stream: trio.SocketStream, - type_key: Optional[tuple[str, str]] = None, + type_key: tuple[str, str]|None = None, ) -> MsgTransport: type_key = type_key or self._transport_key - self.msgstream = get_msg_transport(type_key)(stream) - return self.msgstream + self._transport = get_msg_transport(type_key)(stream) + return self._transport def __repr__(self) -> str: - if self.msgstream: - return repr( - self.msgstream.stream.socket._sock - ).replace( # type: ignore - "socket.socket", - "Channel", - ) - return object.__repr__(self) + if not self._transport: + return '' + + return repr( + self._transport.stream.socket._sock + ).replace( # type: ignore + "socket.socket", + "Channel", + ) @property - def laddr(self) -> Optional[tuple[str, int]]: - return self.msgstream.laddr if self.msgstream else None + def laddr(self) -> tuple[str, int]|None: + return self._transport.laddr if self._transport else None @property - def raddr(self) -> Optional[tuple[str, int]]: - return self.msgstream.raddr if self.msgstream else None + def raddr(self) -> tuple[str, int]|None: + return self._transport.raddr if self._transport else None async def connect( self, @@ -362,12 +381,12 @@ class Channel: *destaddr, **kwargs ) - msgstream = self.set_msg_transport(stream) + transport = self.set_msg_transport(stream) log.transport( - f'Opened channel[{type(msgstream)}]: {self.laddr} -> {self.raddr}' + f'Opened channel[{type(transport)}]: {self.laddr} -> {self.raddr}' ) - return msgstream + return transport async def send(self, item: Any) -> None: @@ -375,16 +394,16 @@ class Channel: '=> send IPC msg:\n\n' f'{pformat(item)}\n' ) # type: ignore - assert self.msgstream + assert self._transport - await self.msgstream.send(item) + await self._transport.send(item) async def recv(self) -> Any: - assert self.msgstream - return await self.msgstream.recv() + assert self._transport + return await self._transport.recv() # try: - # return await self.msgstream.recv() + # return await self._transport.recv() # except trio.BrokenResourceError: # if self._autorecon: # await self._reconnect() @@ -397,8 +416,8 @@ class Channel: f'Closing channel to {self.uid} ' f'{self.laddr} -> {self.raddr}' ) - assert self.msgstream - await self.msgstream.stream.aclose() + assert self._transport + await self._transport.stream.aclose() self._closed = True async def __aenter__(self): @@ -449,16 +468,16 @@ class Channel: Async iterate items from underlying stream. ''' - assert self.msgstream + assert self._transport while True: try: - async for item in self.msgstream: + async for item in self._transport: yield item # sent = yield item # if sent is not None: # # optimization, passing None through all the # # time is pointless - # await self.msgstream.send(sent) + # await self._transport.send(sent) except trio.BrokenResourceError: # if not self._autorecon: @@ -471,7 +490,7 @@ class Channel: # continue def connected(self) -> bool: - return self.msgstream.connected() if self.msgstream else False + return self._transport.connected() if self._transport else False @asynccontextmanager