forked from goodboy/tractor
1
0
Fork 0

Change trace to transport level

db_backup
Tyler Goodlet 2021-06-30 13:47:07 -04:00
parent 62a81a7e73
commit aace4eae5f
3 changed files with 24 additions and 11 deletions

View File

@ -427,7 +427,8 @@ class Actor:
log.warning(
f"already have channel(s) for {uid}:{chans}?"
)
log.trace(f"Registered {chan} for {uid}") # type: ignore
log.runtime(f"Registered {chan} for {uid}") # type: ignore
# append new channel
self._peers[uid].append(chan)
@ -566,7 +567,7 @@ class Actor:
f" {chan} from {chan.uid}")
break
log.trace( # type: ignore
log.transport( # type: ignore
f"Received msg {msg} from {chan.uid}")
cid = msg.get('cid')

View File

@ -110,7 +110,7 @@ class PdbwTeardown(pdbpp.Pdb):
# async with aclosing(async_stdin):
# async for msg in async_stdin:
# log.trace(f"Stdin input:\n{msg}")
# log.runtime(f"Stdin input:\n{msg}")
# # encode to bytes
# bmsg = str.encode(msg)

View File

@ -1,5 +1,6 @@
"""
Inter-process comms abstractions
"""
import typing
from typing import Any, Tuple, Optional
@ -58,7 +59,7 @@ class MsgpackTCPStream:
)
while True:
data = await self.stream.receive_some(2**10)
log.trace(f"received {data}") # type: ignore
log.transport(f"received {data}") # type: ignore
if data == b'':
raise TransportClosed(
@ -139,6 +140,7 @@ class Channel:
return self.msgstream.raddr if self.msgstream else None
async def connect(
self,
destaddr: Tuple[Any, ...] = None,
**kwargs
@ -150,13 +152,21 @@ class Channel:
destaddr = destaddr or self._destaddr
assert isinstance(destaddr, tuple)
stream = await trio.open_tcp_stream(*destaddr, **kwargs)
stream = await trio.open_tcp_stream(
*destaddr,
**kwargs
)
self.msgstream = MsgpackTCPStream(stream)
log.transport(
f'Opened channel to peer {self.laddr} -> {self.raddr}'
)
return stream
async def send(self, item: Any) -> None:
log.trace(f"send `{item}`") # type: ignore
log.transport(f"send `{item}`") # type: ignore
assert self.msgstream
await self.msgstream.send(item)
@ -175,11 +185,13 @@ class Channel:
raise
async def aclose(self) -> None:
log.debug(f"Closing {self}")
log.transport(
f'Closing channel to {self.uid} '
f'{self.laddr} -> {self.raddr}'
)
assert self.msgstream
await self.msgstream.stream.aclose()
self._closed = True
log.error(f'CLOSING CHAN {self}')
async def __aenter__(self):
await self.connect()
@ -202,11 +214,11 @@ class Channel:
await self.connect()
cancelled = cancel_scope.cancelled_caught
if cancelled:
log.warning(
log.transport(
"Reconnect timed out after 3 seconds, retrying...")
continue
else:
log.warning("Stream connection re-established!")
log.transport("Stream connection re-established!")
# run any reconnection sequence
on_recon = self._recon_seq
if on_recon:
@ -215,7 +227,7 @@ class Channel:
except (OSError, ConnectionRefusedError):
if not down:
down = True
log.warning(
log.transport(
f"Connection to {self.raddr} went down, waiting"
" for re-establishment")
await trio.sleep(1)