From 8c927d708dfc0c5e1d4617f1e4b5814056b3e22a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 30 Jun 2021 13:47:07 -0400 Subject: [PATCH] Change trace to transport level --- tractor/_actor.py | 5 +++-- tractor/_debug.py | 2 +- tractor/_ipc.py | 26 ++++++++++++++++++-------- 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index fd7d231..4359e13 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -463,7 +463,8 @@ class Actor: log.runtime( f"already have channel(s) for {uid}:{chans}?" ) - log.trace(f"Registered {chan} for {uid}") # type: ignore + + log.runtime(f"Registered {chan} for {uid}") # type: ignore # append new channel self._peers[uid].append(chan) @@ -631,7 +632,7 @@ class Actor: break - log.trace( # type: ignore + log.transport( # type: ignore f"Received msg {msg} from {chan.uid}") cid = msg.get('cid') diff --git a/tractor/_debug.py b/tractor/_debug.py index 75e502a..c1b1832 100644 --- a/tractor/_debug.py +++ b/tractor/_debug.py @@ -102,7 +102,7 @@ class PdbwTeardown(pdbpp.Pdb): # async with aclosing(async_stdin): # async for msg in async_stdin: -# log.trace(f"Stdin input:\n{msg}") +# log.runtime(f"Stdin input:\n{msg}") # # encode to bytes # bmsg = str.encode(msg) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index efe388e..08057e9 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -1,5 +1,6 @@ """ Inter-process comms abstractions + """ import platform import typing @@ -61,7 +62,6 @@ class MsgpackTCPStream: use_list=False, ) while True: - try: data = await self.stream.receive_some(2**10) @@ -88,7 +88,7 @@ class MsgpackTCPStream: else: raise - log.trace(f"received {data}") # type: ignore + log.transport(f"received {data}") # type: ignore if data == b'': raise TransportClosed( @@ -169,6 +169,7 @@ class Channel: return self.msgstream.raddr if self.msgstream else None async def connect( + self, destaddr: Tuple[Any, ...] = None, **kwargs @@ -180,13 +181,21 @@ class Channel: destaddr = destaddr or self._destaddr assert isinstance(destaddr, tuple) - stream = await trio.open_tcp_stream(*destaddr, **kwargs) + + stream = await trio.open_tcp_stream( + *destaddr, + **kwargs + ) self.msgstream = MsgpackTCPStream(stream) + + log.transport( + f'Opened channel to peer {self.laddr} -> {self.raddr}' + ) return stream async def send(self, item: Any) -> None: - log.trace(f"send `{item}`") # type: ignore + log.transport(f"send `{item}`") # type: ignore assert self.msgstream await self.msgstream.send(item) @@ -205,7 +214,8 @@ class Channel: raise async def aclose(self) -> None: - log.debug( + + log.transport( f'Closing channel to {self.uid} ' f'{self.laddr} -> {self.raddr}' ) @@ -234,11 +244,11 @@ class Channel: await self.connect() cancelled = cancel_scope.cancelled_caught if cancelled: - log.warning( + log.transport( "Reconnect timed out after 3 seconds, retrying...") continue else: - log.warning("Stream connection re-established!") + log.transport("Stream connection re-established!") # run any reconnection sequence on_recon = self._recon_seq if on_recon: @@ -247,7 +257,7 @@ class Channel: except (OSError, ConnectionRefusedError): if not down: down = True - log.warning( + log.transport( f"Connection to {self.raddr} went down, waiting" " for re-establishment") await trio.sleep(1)