diff --git a/tractor/ipc/_chan.py b/tractor/ipc/_chan.py index 64973fd4..9d109f3f 100644 --- a/tractor/ipc/_chan.py +++ b/tractor/ipc/_chan.py @@ -307,7 +307,12 @@ class Channel: ) -> None: ''' - Send a coded msg-blob over the transport. + Send a coded msg-blob over the underlying IPC transport. + + This fn raises `TransportClosed` on comms failures and is + normally handled by higher level runtime machinery for the + expected-graceful cases, normally ephemercal + (re/dis)connects. ''' __tracebackhide__: bool = hide_tb @@ -334,9 +339,10 @@ class Channel: except KeyError: raise err case TransportClosed(): + src_exc_str: str = err.repr_src_exc() log.transport( - f'Transport stream closed due to\n' - f'{err.repr_src_exc()}\n' + f'Transport stream closed due to,\n' + f'{src_exc_str}' ) case _: @@ -345,6 +351,11 @@ class Channel: raise async def recv(self) -> Any: + ''' + Receive the latest (queued) msg-blob from the underlying IPC + transport. + + ''' assert self._transport return await self._transport.recv() @@ -418,16 +429,18 @@ class Channel: self ) -> AsyncGenerator[Any, None]: ''' - Yield `MsgType` IPC msgs decoded and deliverd from - an underlying `MsgTransport` protocol. + Yield `MsgType` IPC msgs decoded and deliverd from an + underlying `MsgTransport` protocol. - This is a streaming routine alo implemented as an async-gen - func (same a `MsgTransport._iter_pkts()`) gets allocated by - a `.__call__()` inside `.__init__()` where it is assigned to - the `._aiter_msgs` attr. + This is a streaming routine alo implemented as an + async-generator func (same a `MsgTransport._iter_pkts()`) + gets allocated by a `.__call__()` inside `.__init__()` where + it is assigned to the `._aiter_msgs` attr. ''' - assert self._transport + if not self._transport: + raise RuntimeError('No IPC transport initialized!?') + while True: try: async for msg in self._transport: @@ -462,7 +475,15 @@ class Channel: # continue def connected(self) -> bool: - return self._transport.connected() if self._transport else False + ''' + Predicate whether underlying IPC tpt is connected. + + ''' + return ( + self._transport.connected() + if self._transport + else False + ) async def _do_handshake( self, @@ -493,8 +514,11 @@ async def _connect_chan( addr: UnwrappedAddress ) -> typing.AsyncGenerator[Channel, None]: ''' - Create and connect a channel with disconnect on context manager - teardown. + Create and connect a `Channel` to the provided `addr`, disconnect + it on cm exit. + + NOTE, this is a lowlevel, normally internal-only iface. You + should likely use `.open_portal()` instead. ''' chan = await Channel.from_addr(addr)