forked from goodboy/tractor
				
			Change trace to transport level
							parent
							
								
									2dd7c064d3
								
							
						
					
					
						commit
						57de0d6b7b
					
				|  | @ -463,7 +463,8 @@ class Actor: | ||||||
|             log.runtime( |             log.runtime( | ||||||
|                 f"already have channel(s) for {uid}:{chans}?" |                 f"already have channel(s) for {uid}:{chans}?" | ||||||
|             ) |             ) | ||||||
|         log.trace(f"Registered {chan} for {uid}")  # type: ignore | 
 | ||||||
|  |         log.runtime(f"Registered {chan} for {uid}")  # type: ignore | ||||||
|         # append new channel |         # append new channel | ||||||
|         self._peers[uid].append(chan) |         self._peers[uid].append(chan) | ||||||
| 
 | 
 | ||||||
|  | @ -636,7 +637,7 @@ class Actor: | ||||||
| 
 | 
 | ||||||
|                         break |                         break | ||||||
| 
 | 
 | ||||||
|                     log.trace(   # type: ignore |                     log.transport(   # type: ignore | ||||||
|                         f"Received msg {msg} from {chan.uid}") |                         f"Received msg {msg} from {chan.uid}") | ||||||
| 
 | 
 | ||||||
|                     cid = msg.get('cid') |                     cid = msg.get('cid') | ||||||
|  |  | ||||||
|  | @ -110,7 +110,7 @@ class PdbwTeardown(pdbpp.Pdb): | ||||||
| 
 | 
 | ||||||
| #     async with aclosing(async_stdin): | #     async with aclosing(async_stdin): | ||||||
| #         async for msg in async_stdin: | #         async for msg in async_stdin: | ||||||
| #             log.trace(f"Stdin input:\n{msg}") | #             log.runtime(f"Stdin input:\n{msg}") | ||||||
| #             # encode to bytes | #             # encode to bytes | ||||||
| #             bmsg = str.encode(msg) | #             bmsg = str.encode(msg) | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -1,5 +1,6 @@ | ||||||
| """ | """ | ||||||
| Inter-process comms abstractions | Inter-process comms abstractions | ||||||
|  | 
 | ||||||
| """ | """ | ||||||
| import platform | import platform | ||||||
| import typing | import typing | ||||||
|  | @ -61,7 +62,6 @@ class MsgpackTCPStream: | ||||||
|             use_list=False, |             use_list=False, | ||||||
|         ) |         ) | ||||||
|         while True: |         while True: | ||||||
| 
 |  | ||||||
|             try: |             try: | ||||||
|                 data = await self.stream.receive_some(2**10) |                 data = await self.stream.receive_some(2**10) | ||||||
| 
 | 
 | ||||||
|  | @ -88,7 +88,7 @@ class MsgpackTCPStream: | ||||||
|                 else: |                 else: | ||||||
|                     raise |                     raise | ||||||
| 
 | 
 | ||||||
|             log.trace(f"received {data}")  # type: ignore |             log.transport(f"received {data}")  # type: ignore | ||||||
| 
 | 
 | ||||||
|             if data == b'': |             if data == b'': | ||||||
|                 raise TransportClosed( |                 raise TransportClosed( | ||||||
|  | @ -169,6 +169,7 @@ class Channel: | ||||||
|         return self.msgstream.raddr if self.msgstream else None |         return self.msgstream.raddr if self.msgstream else None | ||||||
| 
 | 
 | ||||||
|     async def connect( |     async def connect( | ||||||
|  | 
 | ||||||
|         self, |         self, | ||||||
|         destaddr: Tuple[Any, ...] = None, |         destaddr: Tuple[Any, ...] = None, | ||||||
|         **kwargs |         **kwargs | ||||||
|  | @ -180,13 +181,21 @@ class Channel: | ||||||
| 
 | 
 | ||||||
|         destaddr = destaddr or self._destaddr |         destaddr = destaddr or self._destaddr | ||||||
|         assert isinstance(destaddr, tuple) |         assert isinstance(destaddr, tuple) | ||||||
|         stream = await trio.open_tcp_stream(*destaddr, **kwargs) | 
 | ||||||
|  |         stream = await trio.open_tcp_stream( | ||||||
|  |             *destaddr, | ||||||
|  |             **kwargs | ||||||
|  |         ) | ||||||
|         self.msgstream = MsgpackTCPStream(stream) |         self.msgstream = MsgpackTCPStream(stream) | ||||||
|  | 
 | ||||||
|  |         log.transport( | ||||||
|  |             f'Opened channel to peer {self.laddr} -> {self.raddr}' | ||||||
|  |         ) | ||||||
|         return stream |         return stream | ||||||
| 
 | 
 | ||||||
|     async def send(self, item: Any) -> None: |     async def send(self, item: Any) -> None: | ||||||
| 
 | 
 | ||||||
|         log.trace(f"send `{item}`")  # type: ignore |         log.transport(f"send `{item}`")  # type: ignore | ||||||
|         assert self.msgstream |         assert self.msgstream | ||||||
| 
 | 
 | ||||||
|         await self.msgstream.send(item) |         await self.msgstream.send(item) | ||||||
|  | @ -205,7 +214,8 @@ class Channel: | ||||||
|             raise |             raise | ||||||
| 
 | 
 | ||||||
|     async def aclose(self) -> None: |     async def aclose(self) -> None: | ||||||
|         log.debug( | 
 | ||||||
|  |         log.transport( | ||||||
|             f'Closing channel to {self.uid} ' |             f'Closing channel to {self.uid} ' | ||||||
|             f'{self.laddr} -> {self.raddr}' |             f'{self.laddr} -> {self.raddr}' | ||||||
|         ) |         ) | ||||||
|  | @ -234,11 +244,11 @@ class Channel: | ||||||
|                     await self.connect() |                     await self.connect() | ||||||
|                 cancelled = cancel_scope.cancelled_caught |                 cancelled = cancel_scope.cancelled_caught | ||||||
|                 if cancelled: |                 if cancelled: | ||||||
|                     log.warning( |                     log.transport( | ||||||
|                         "Reconnect timed out after 3 seconds, retrying...") |                         "Reconnect timed out after 3 seconds, retrying...") | ||||||
|                     continue |                     continue | ||||||
|                 else: |                 else: | ||||||
|                     log.warning("Stream connection re-established!") |                     log.transport("Stream connection re-established!") | ||||||
|                     # run any reconnection sequence |                     # run any reconnection sequence | ||||||
|                     on_recon = self._recon_seq |                     on_recon = self._recon_seq | ||||||
|                     if on_recon: |                     if on_recon: | ||||||
|  | @ -247,7 +257,7 @@ class Channel: | ||||||
|             except (OSError, ConnectionRefusedError): |             except (OSError, ConnectionRefusedError): | ||||||
|                 if not down: |                 if not down: | ||||||
|                     down = True |                     down = True | ||||||
|                     log.warning( |                     log.transport( | ||||||
|                         f"Connection to {self.raddr} went down, waiting" |                         f"Connection to {self.raddr} went down, waiting" | ||||||
|                         " for re-establishment") |                         " for re-establishment") | ||||||
|                 await trio.sleep(1) |                 await trio.sleep(1) | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue