forked from goodboy/tractor
1
0
Fork 0

Change trace to transport level

CI_increment_for_windows_bidirstreaming
Tyler Goodlet 2021-06-30 13:47:07 -04:00
parent 0d599e213c
commit 5d74490f1e
3 changed files with 22 additions and 11 deletions

View File

@ -463,7 +463,8 @@ class Actor:
log.runtime(
f"already have channel(s) for {uid}:{chans}?"
)
log.trace(f"Registered {chan} for {uid}") # type: ignore
log.runtime(f"Registered {chan} for {uid}") # type: ignore
# append new channel
self._peers[uid].append(chan)
@ -631,7 +632,7 @@ class Actor:
break
log.trace( # type: ignore
log.transport( # type: ignore
f"Received msg {msg} from {chan.uid}")
cid = msg.get('cid')

View File

@ -110,7 +110,7 @@ class PdbwTeardown(pdbpp.Pdb):
# async with aclosing(async_stdin):
# async for msg in async_stdin:
# log.trace(f"Stdin input:\n{msg}")
# log.runtime(f"Stdin input:\n{msg}")
# # encode to bytes
# bmsg = str.encode(msg)

View File

@ -1,5 +1,6 @@
"""
Inter-process comms abstractions
"""
import platform
import typing
@ -61,7 +62,6 @@ class MsgpackTCPStream:
use_list=False,
)
while True:
try:
data = await self.stream.receive_some(2**10)
@ -88,7 +88,7 @@ class MsgpackTCPStream:
else:
raise
log.trace(f"received {data}") # type: ignore
log.transport(f"received {data}") # type: ignore
if data == b'':
raise TransportClosed(
@ -169,6 +169,7 @@ class Channel:
return self.msgstream.raddr if self.msgstream else None
async def connect(
self,
destaddr: Tuple[Any, ...] = None,
**kwargs
@ -180,13 +181,21 @@ class Channel:
destaddr = destaddr or self._destaddr
assert isinstance(destaddr, tuple)
stream = await trio.open_tcp_stream(*destaddr, **kwargs)
stream = await trio.open_tcp_stream(
*destaddr,
**kwargs
)
self.msgstream = MsgpackTCPStream(stream)
log.transport(
f'Opened channel to peer {self.laddr} -> {self.raddr}'
)
return stream
async def send(self, item: Any) -> None:
log.trace(f"send `{item}`") # type: ignore
log.transport(f"send `{item}`") # type: ignore
assert self.msgstream
await self.msgstream.send(item)
@ -205,7 +214,8 @@ class Channel:
raise
async def aclose(self) -> None:
log.debug(
log.transport(
f'Closing channel to {self.uid} '
f'{self.laddr} -> {self.raddr}'
)
@ -234,11 +244,11 @@ class Channel:
await self.connect()
cancelled = cancel_scope.cancelled_caught
if cancelled:
log.warning(
log.transport(
"Reconnect timed out after 3 seconds, retrying...")
continue
else:
log.warning("Stream connection re-established!")
log.transport("Stream connection re-established!")
# run any reconnection sequence
on_recon = self._recon_seq
if on_recon:
@ -247,7 +257,7 @@ class Channel:
except (OSError, ConnectionRefusedError):
if not down:
down = True
log.warning(
log.transport(
f"Connection to {self.raddr} went down, waiting"
" for re-establishment")
await trio.sleep(1)