Add a big boi `Channel.pformat()/__repr__()`
Much like how `Context` has been implemented, try to give tons of high level details on all the lower level encapsulated primitives, namely the `.msgstream/.transport` and any useful runtime state. B) Impl deats, - adjust `.from_addr()` to only call `._addr.wrap_address()` when we detect `addr` is unwrapped. - add another `log.runtime()` using the new `.__repr__()` in `Channel.from_addr()`. - change to `UnwrappedAddress` as in prior commits.ns_aware
parent
6a5ccc2425
commit
e904af679b
|
@ -24,6 +24,7 @@ from contextlib import (
|
||||||
asynccontextmanager as acm,
|
asynccontextmanager as acm,
|
||||||
contextmanager as cm,
|
contextmanager as cm,
|
||||||
)
|
)
|
||||||
|
import os
|
||||||
import platform
|
import platform
|
||||||
from pprint import pformat
|
from pprint import pformat
|
||||||
import typing
|
import typing
|
||||||
|
@ -39,9 +40,10 @@ from tractor.ipc._types import (
|
||||||
transport_from_stream,
|
transport_from_stream,
|
||||||
)
|
)
|
||||||
from tractor._addr import (
|
from tractor._addr import (
|
||||||
|
is_wrapped_addr,
|
||||||
wrap_address,
|
wrap_address,
|
||||||
Address,
|
Address,
|
||||||
AddressTypes
|
UnwrappedAddress,
|
||||||
)
|
)
|
||||||
from tractor.log import get_logger
|
from tractor.log import get_logger
|
||||||
from tractor._exceptions import (
|
from tractor._exceptions import (
|
||||||
|
@ -88,7 +90,8 @@ class Channel:
|
||||||
self.uid: tuple[str, str]|None = None
|
self.uid: tuple[str, str]|None = None
|
||||||
|
|
||||||
self._aiter_msgs = self._iter_msgs()
|
self._aiter_msgs = self._iter_msgs()
|
||||||
self._exc: Exception|None = None # set if far end actor errors
|
self._exc: Exception|None = None
|
||||||
|
# ^XXX! ONLY set if a remote actor sends an `Error`-msg
|
||||||
self._closed: bool = False
|
self._closed: bool = False
|
||||||
|
|
||||||
# flag set by ``Portal.cancel_actor()`` indicating remote
|
# flag set by ``Portal.cancel_actor()`` indicating remote
|
||||||
|
@ -124,17 +127,26 @@ class Channel:
|
||||||
@classmethod
|
@classmethod
|
||||||
async def from_addr(
|
async def from_addr(
|
||||||
cls,
|
cls,
|
||||||
addr: AddressTypes,
|
addr: UnwrappedAddress,
|
||||||
**kwargs
|
**kwargs
|
||||||
) -> Channel:
|
) -> Channel:
|
||||||
addr: Address = wrap_address(addr)
|
|
||||||
transport_cls = transport_from_addr(addr)
|
|
||||||
transport = await transport_cls.connect_to(addr, **kwargs)
|
|
||||||
|
|
||||||
log.transport(
|
if not is_wrapped_addr(addr):
|
||||||
f'Opened channel[{type(transport)}]: {transport.laddr} -> {transport.raddr}'
|
addr: Address = wrap_address(addr)
|
||||||
|
|
||||||
|
transport_cls = transport_from_addr(addr)
|
||||||
|
transport = await transport_cls.connect_to(
|
||||||
|
addr,
|
||||||
|
**kwargs,
|
||||||
)
|
)
|
||||||
return Channel(transport=transport)
|
assert transport.raddr == addr
|
||||||
|
chan = Channel(transport=transport)
|
||||||
|
log.runtime(
|
||||||
|
f'Connected channel IPC transport\n'
|
||||||
|
f'[>\n'
|
||||||
|
f' |_{chan}\n'
|
||||||
|
)
|
||||||
|
return chan
|
||||||
|
|
||||||
@cm
|
@cm
|
||||||
def apply_codec(
|
def apply_codec(
|
||||||
|
@ -154,16 +166,50 @@ class Channel:
|
||||||
self._transport.codec = orig
|
self._transport.codec = orig
|
||||||
|
|
||||||
# TODO: do a .src/.dst: str for maddrs?
|
# TODO: do a .src/.dst: str for maddrs?
|
||||||
def __repr__(self) -> str:
|
def pformat(self) -> str:
|
||||||
if not self._transport:
|
if not self._transport:
|
||||||
return '<Channel with inactive transport?>'
|
return '<Channel with inactive transport?>'
|
||||||
|
|
||||||
return repr(
|
tpt: MsgTransport = self._transport
|
||||||
self._transport
|
tpt_name: str = type(tpt).__name__
|
||||||
).replace( # type: ignore
|
tpt_status: str = (
|
||||||
"socket.socket",
|
'connected' if self.connected()
|
||||||
"Channel",
|
else 'closed'
|
||||||
)
|
)
|
||||||
|
return (
|
||||||
|
f'<Channel(\n'
|
||||||
|
f' |_status: {tpt_status!r}\n'
|
||||||
|
f' _closed={self._closed}\n'
|
||||||
|
f' _cancel_called={self._cancel_called}\n'
|
||||||
|
f'\n'
|
||||||
|
f' |_runtime: Actor\n'
|
||||||
|
f' pid={os.getpid()}\n'
|
||||||
|
f' uid={self.uid}\n'
|
||||||
|
f'\n'
|
||||||
|
f' |_msgstream: {tpt_name}\n'
|
||||||
|
f' proto={tpt.laddr.name_key!r}\n'
|
||||||
|
f' layer={tpt.layer_key!r}\n'
|
||||||
|
f' laddr={tpt.laddr}\n'
|
||||||
|
f' raddr={tpt.raddr}\n'
|
||||||
|
f' codec={tpt.codec_key!r}\n'
|
||||||
|
f' stream={tpt.stream}\n'
|
||||||
|
f' maddr={tpt.maddr!r}\n'
|
||||||
|
f' drained={tpt.drained}\n'
|
||||||
|
f' _send_lock={tpt._send_lock.statistics()}\n'
|
||||||
|
f')>\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
# NOTE: making this return a value that can be passed to
|
||||||
|
# `eval()` is entirely **optional** FYI!
|
||||||
|
# https://docs.python.org/3/library/functions.html#repr
|
||||||
|
# https://docs.python.org/3/reference/datamodel.html#object.__repr__
|
||||||
|
#
|
||||||
|
# Currently we target **readability** from a (console)
|
||||||
|
# logging perspective over `eval()`-ability since we do NOT
|
||||||
|
# target serializing non-struct instances!
|
||||||
|
# def __repr__(self) -> str:
|
||||||
|
__str__ = pformat
|
||||||
|
__repr__ = pformat
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def laddr(self) -> Address|None:
|
def laddr(self) -> Address|None:
|
||||||
|
@ -338,7 +384,7 @@ class Channel:
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def _connect_chan(
|
async def _connect_chan(
|
||||||
addr: AddressTypes
|
addr: UnwrappedAddress
|
||||||
) -> typing.AsyncGenerator[Channel, None]:
|
) -> typing.AsyncGenerator[Channel, None]:
|
||||||
'''
|
'''
|
||||||
Create and connect a channel with disconnect on context manager
|
Create and connect a channel with disconnect on context manager
|
||||||
|
|
Loading…
Reference in New Issue