forked from goodboy/tractor
				
			Change trace to transport level
							parent
							
								
									31590e82a3
								
							
						
					
					
						commit
						8c927d708d
					
				| 
						 | 
				
			
			@ -463,7 +463,8 @@ class Actor:
 | 
			
		|||
            log.runtime(
 | 
			
		||||
                f"already have channel(s) for {uid}:{chans}?"
 | 
			
		||||
            )
 | 
			
		||||
        log.trace(f"Registered {chan} for {uid}")  # type: ignore
 | 
			
		||||
 | 
			
		||||
        log.runtime(f"Registered {chan} for {uid}")  # type: ignore
 | 
			
		||||
        # append new channel
 | 
			
		||||
        self._peers[uid].append(chan)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -631,7 +632,7 @@ class Actor:
 | 
			
		|||
 | 
			
		||||
                        break
 | 
			
		||||
 | 
			
		||||
                    log.trace(   # type: ignore
 | 
			
		||||
                    log.transport(   # type: ignore
 | 
			
		||||
                        f"Received msg {msg} from {chan.uid}")
 | 
			
		||||
 | 
			
		||||
                    cid = msg.get('cid')
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -102,7 +102,7 @@ class PdbwTeardown(pdbpp.Pdb):
 | 
			
		|||
 | 
			
		||||
#     async with aclosing(async_stdin):
 | 
			
		||||
#         async for msg in async_stdin:
 | 
			
		||||
#             log.trace(f"Stdin input:\n{msg}")
 | 
			
		||||
#             log.runtime(f"Stdin input:\n{msg}")
 | 
			
		||||
#             # encode to bytes
 | 
			
		||||
#             bmsg = str.encode(msg)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,5 +1,6 @@
 | 
			
		|||
"""
 | 
			
		||||
Inter-process comms abstractions
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
import platform
 | 
			
		||||
import typing
 | 
			
		||||
| 
						 | 
				
			
			@ -61,7 +62,6 @@ class MsgpackTCPStream:
 | 
			
		|||
            use_list=False,
 | 
			
		||||
        )
 | 
			
		||||
        while True:
 | 
			
		||||
 | 
			
		||||
            try:
 | 
			
		||||
                data = await self.stream.receive_some(2**10)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -88,7 +88,7 @@ class MsgpackTCPStream:
 | 
			
		|||
                else:
 | 
			
		||||
                    raise
 | 
			
		||||
 | 
			
		||||
            log.trace(f"received {data}")  # type: ignore
 | 
			
		||||
            log.transport(f"received {data}")  # type: ignore
 | 
			
		||||
 | 
			
		||||
            if data == b'':
 | 
			
		||||
                raise TransportClosed(
 | 
			
		||||
| 
						 | 
				
			
			@ -169,6 +169,7 @@ class Channel:
 | 
			
		|||
        return self.msgstream.raddr if self.msgstream else None
 | 
			
		||||
 | 
			
		||||
    async def connect(
 | 
			
		||||
 | 
			
		||||
        self,
 | 
			
		||||
        destaddr: Tuple[Any, ...] = None,
 | 
			
		||||
        **kwargs
 | 
			
		||||
| 
						 | 
				
			
			@ -180,13 +181,21 @@ class Channel:
 | 
			
		|||
 | 
			
		||||
        destaddr = destaddr or self._destaddr
 | 
			
		||||
        assert isinstance(destaddr, tuple)
 | 
			
		||||
        stream = await trio.open_tcp_stream(*destaddr, **kwargs)
 | 
			
		||||
 | 
			
		||||
        stream = await trio.open_tcp_stream(
 | 
			
		||||
            *destaddr,
 | 
			
		||||
            **kwargs
 | 
			
		||||
        )
 | 
			
		||||
        self.msgstream = MsgpackTCPStream(stream)
 | 
			
		||||
 | 
			
		||||
        log.transport(
 | 
			
		||||
            f'Opened channel to peer {self.laddr} -> {self.raddr}'
 | 
			
		||||
        )
 | 
			
		||||
        return stream
 | 
			
		||||
 | 
			
		||||
    async def send(self, item: Any) -> None:
 | 
			
		||||
 | 
			
		||||
        log.trace(f"send `{item}`")  # type: ignore
 | 
			
		||||
        log.transport(f"send `{item}`")  # type: ignore
 | 
			
		||||
        assert self.msgstream
 | 
			
		||||
 | 
			
		||||
        await self.msgstream.send(item)
 | 
			
		||||
| 
						 | 
				
			
			@ -205,7 +214,8 @@ class Channel:
 | 
			
		|||
            raise
 | 
			
		||||
 | 
			
		||||
    async def aclose(self) -> None:
 | 
			
		||||
        log.debug(
 | 
			
		||||
 | 
			
		||||
        log.transport(
 | 
			
		||||
            f'Closing channel to {self.uid} '
 | 
			
		||||
            f'{self.laddr} -> {self.raddr}'
 | 
			
		||||
        )
 | 
			
		||||
| 
						 | 
				
			
			@ -234,11 +244,11 @@ class Channel:
 | 
			
		|||
                    await self.connect()
 | 
			
		||||
                cancelled = cancel_scope.cancelled_caught
 | 
			
		||||
                if cancelled:
 | 
			
		||||
                    log.warning(
 | 
			
		||||
                    log.transport(
 | 
			
		||||
                        "Reconnect timed out after 3 seconds, retrying...")
 | 
			
		||||
                    continue
 | 
			
		||||
                else:
 | 
			
		||||
                    log.warning("Stream connection re-established!")
 | 
			
		||||
                    log.transport("Stream connection re-established!")
 | 
			
		||||
                    # run any reconnection sequence
 | 
			
		||||
                    on_recon = self._recon_seq
 | 
			
		||||
                    if on_recon:
 | 
			
		||||
| 
						 | 
				
			
			@ -247,7 +257,7 @@ class Channel:
 | 
			
		|||
            except (OSError, ConnectionRefusedError):
 | 
			
		||||
                if not down:
 | 
			
		||||
                    down = True
 | 
			
		||||
                    log.warning(
 | 
			
		||||
                    log.transport(
 | 
			
		||||
                        f"Connection to {self.raddr} went down, waiting"
 | 
			
		||||
                        " for re-establishment")
 | 
			
		||||
                await trio.sleep(1)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue