diff --git a/tractor/_actor.py b/tractor/_actor.py index dc8dc5d..389d955 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -384,7 +384,8 @@ class Actor: log.warning( f"already have channel(s) for {uid}:{chans}?" ) - log.trace(f"Registered {chan} for {uid}") # type: ignore + + log.runtime(f"Registered {chan} for {uid}") # type: ignore # append new channel self._peers[uid].append(chan) @@ -517,7 +518,7 @@ class Actor: f" {chan} from {chan.uid}") break - log.trace( # type: ignore + log.transport( # type: ignore f"Received msg {msg} from {chan.uid}") cid = msg.get('cid') diff --git a/tractor/_debug.py b/tractor/_debug.py index 53d1f44..3570dc6 100644 --- a/tractor/_debug.py +++ b/tractor/_debug.py @@ -110,7 +110,7 @@ class PdbwTeardown(pdbpp.Pdb): # async with aclosing(async_stdin): # async for msg in async_stdin: -# log.trace(f"Stdin input:\n{msg}") +# log.runtime(f"Stdin input:\n{msg}") # # encode to bytes # bmsg = str.encode(msg) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index efe388e..08057e9 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -1,5 +1,6 @@ """ Inter-process comms abstractions + """ import platform import typing @@ -61,7 +62,6 @@ class MsgpackTCPStream: use_list=False, ) while True: - try: data = await self.stream.receive_some(2**10) @@ -88,7 +88,7 @@ class MsgpackTCPStream: else: raise - log.trace(f"received {data}") # type: ignore + log.transport(f"received {data}") # type: ignore if data == b'': raise TransportClosed( @@ -169,6 +169,7 @@ class Channel: return self.msgstream.raddr if self.msgstream else None async def connect( + self, destaddr: Tuple[Any, ...] = None, **kwargs @@ -180,13 +181,21 @@ class Channel: destaddr = destaddr or self._destaddr assert isinstance(destaddr, tuple) - stream = await trio.open_tcp_stream(*destaddr, **kwargs) + + stream = await trio.open_tcp_stream( + *destaddr, + **kwargs + ) self.msgstream = MsgpackTCPStream(stream) + + log.transport( + f'Opened channel to peer {self.laddr} -> {self.raddr}' + ) return stream async def send(self, item: Any) -> None: - log.trace(f"send `{item}`") # type: ignore + log.transport(f"send `{item}`") # type: ignore assert self.msgstream await self.msgstream.send(item) @@ -205,7 +214,8 @@ class Channel: raise async def aclose(self) -> None: - log.debug( + + log.transport( f'Closing channel to {self.uid} ' f'{self.laddr} -> {self.raddr}' ) @@ -234,11 +244,11 @@ class Channel: await self.connect() cancelled = cancel_scope.cancelled_caught if cancelled: - log.warning( + log.transport( "Reconnect timed out after 3 seconds, retrying...") continue else: - log.warning("Stream connection re-established!") + log.transport("Stream connection re-established!") # run any reconnection sequence on_recon = self._recon_seq if on_recon: @@ -247,7 +257,7 @@ class Channel: except (OSError, ConnectionRefusedError): if not down: down = True - log.warning( + log.transport( f"Connection to {self.raddr} went down, waiting" " for re-establishment") await trio.sleep(1)