From 57de0d6b7bf25fcdb214edacd9e43e33f066e0b0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 30 Jun 2021 13:47:07 -0400 Subject: [PATCH] Change trace to transport level --- tractor/_actor.py | 5 +++-- tractor/_debug.py | 2 +- tractor/_ipc.py | 26 ++++++++++++++++++-------- 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 20e93bd..5ecc0d2 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -463,7 +463,8 @@ class Actor: log.runtime( f"already have channel(s) for {uid}:{chans}?" ) - log.trace(f"Registered {chan} for {uid}") # type: ignore + + log.runtime(f"Registered {chan} for {uid}") # type: ignore # append new channel self._peers[uid].append(chan) @@ -636,7 +637,7 @@ class Actor: break - log.trace( # type: ignore + log.transport( # type: ignore f"Received msg {msg} from {chan.uid}") cid = msg.get('cid') diff --git a/tractor/_debug.py b/tractor/_debug.py index 53d1f44..3570dc6 100644 --- a/tractor/_debug.py +++ b/tractor/_debug.py @@ -110,7 +110,7 @@ class PdbwTeardown(pdbpp.Pdb): # async with aclosing(async_stdin): # async for msg in async_stdin: -# log.trace(f"Stdin input:\n{msg}") +# log.runtime(f"Stdin input:\n{msg}") # # encode to bytes # bmsg = str.encode(msg) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index efe388e..08057e9 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -1,5 +1,6 @@ """ Inter-process comms abstractions + """ import platform import typing @@ -61,7 +62,6 @@ class MsgpackTCPStream: use_list=False, ) while True: - try: data = await self.stream.receive_some(2**10) @@ -88,7 +88,7 @@ class MsgpackTCPStream: else: raise - log.trace(f"received {data}") # type: ignore + log.transport(f"received {data}") # type: ignore if data == b'': raise TransportClosed( @@ -169,6 +169,7 @@ class Channel: return self.msgstream.raddr if self.msgstream else None async def connect( + self, destaddr: Tuple[Any, ...] = None, **kwargs @@ -180,13 +181,21 @@ class Channel: destaddr = destaddr or self._destaddr assert isinstance(destaddr, tuple) - stream = await trio.open_tcp_stream(*destaddr, **kwargs) + + stream = await trio.open_tcp_stream( + *destaddr, + **kwargs + ) self.msgstream = MsgpackTCPStream(stream) + + log.transport( + f'Opened channel to peer {self.laddr} -> {self.raddr}' + ) return stream async def send(self, item: Any) -> None: - log.trace(f"send `{item}`") # type: ignore + log.transport(f"send `{item}`") # type: ignore assert self.msgstream await self.msgstream.send(item) @@ -205,7 +214,8 @@ class Channel: raise async def aclose(self) -> None: - log.debug( + + log.transport( f'Closing channel to {self.uid} ' f'{self.laddr} -> {self.raddr}' ) @@ -234,11 +244,11 @@ class Channel: await self.connect() cancelled = cancel_scope.cancelled_caught if cancelled: - log.warning( + log.transport( "Reconnect timed out after 3 seconds, retrying...") continue else: - log.warning("Stream connection re-established!") + log.transport("Stream connection re-established!") # run any reconnection sequence on_recon = self._recon_seq if on_recon: @@ -247,7 +257,7 @@ class Channel: except (OSError, ConnectionRefusedError): if not down: down = True - log.warning( + log.transport( f"Connection to {self.raddr} went down, waiting" " for re-establishment") await trio.sleep(1)