From aace4eae5f469d4271f6d40bb3e009c12dd00066 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 30 Jun 2021 13:47:07 -0400 Subject: [PATCH] Change trace to transport level --- tractor/_actor.py | 5 +++-- tractor/_debug.py | 2 +- tractor/_ipc.py | 28 ++++++++++++++++++++-------- 3 files changed, 24 insertions(+), 11 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index deb8aec..1573def 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -427,7 +427,8 @@ class Actor: log.warning( f"already have channel(s) for {uid}:{chans}?" ) - log.trace(f"Registered {chan} for {uid}") # type: ignore + + log.runtime(f"Registered {chan} for {uid}") # type: ignore # append new channel self._peers[uid].append(chan) @@ -566,7 +567,7 @@ class Actor: f" {chan} from {chan.uid}") break - log.trace( # type: ignore + log.transport( # type: ignore f"Received msg {msg} from {chan.uid}") cid = msg.get('cid') diff --git a/tractor/_debug.py b/tractor/_debug.py index e0cba7a..77fac0f 100644 --- a/tractor/_debug.py +++ b/tractor/_debug.py @@ -110,7 +110,7 @@ class PdbwTeardown(pdbpp.Pdb): # async with aclosing(async_stdin): # async for msg in async_stdin: -# log.trace(f"Stdin input:\n{msg}") +# log.runtime(f"Stdin input:\n{msg}") # # encode to bytes # bmsg = str.encode(msg) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 5dc1a2a..437194a 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -1,5 +1,6 @@ """ Inter-process comms abstractions + """ import typing from typing import Any, Tuple, Optional @@ -58,7 +59,7 @@ class MsgpackTCPStream: ) while True: data = await self.stream.receive_some(2**10) - log.trace(f"received {data}") # type: ignore + log.transport(f"received {data}") # type: ignore if data == b'': raise TransportClosed( @@ -139,6 +140,7 @@ class Channel: return self.msgstream.raddr if self.msgstream else None async def connect( + self, destaddr: Tuple[Any, ...] = None, **kwargs @@ -150,13 +152,21 @@ class Channel: destaddr = destaddr or self._destaddr assert isinstance(destaddr, tuple) - stream = await trio.open_tcp_stream(*destaddr, **kwargs) + + stream = await trio.open_tcp_stream( + *destaddr, + **kwargs + ) self.msgstream = MsgpackTCPStream(stream) + + log.transport( + f'Opened channel to peer {self.laddr} -> {self.raddr}' + ) return stream async def send(self, item: Any) -> None: - log.trace(f"send `{item}`") # type: ignore + log.transport(f"send `{item}`") # type: ignore assert self.msgstream await self.msgstream.send(item) @@ -175,11 +185,13 @@ class Channel: raise async def aclose(self) -> None: - log.debug(f"Closing {self}") + log.transport( + f'Closing channel to {self.uid} ' + f'{self.laddr} -> {self.raddr}' + ) assert self.msgstream await self.msgstream.stream.aclose() self._closed = True - log.error(f'CLOSING CHAN {self}') async def __aenter__(self): await self.connect() @@ -202,11 +214,11 @@ class Channel: await self.connect() cancelled = cancel_scope.cancelled_caught if cancelled: - log.warning( + log.transport( "Reconnect timed out after 3 seconds, retrying...") continue else: - log.warning("Stream connection re-established!") + log.transport("Stream connection re-established!") # run any reconnection sequence on_recon = self._recon_seq if on_recon: @@ -215,7 +227,7 @@ class Channel: except (OSError, ConnectionRefusedError): if not down: down = True - log.warning( + log.transport( f"Connection to {self.raddr} went down, waiting" " for re-establishment") await trio.sleep(1)