forked from goodboy/tractor
1
0
Fork 0

Change trace to transport level

pre_bad_close
Tyler Goodlet 2021-06-30 13:47:07 -04:00
parent adbde4a2c1
commit ff0226cd22
3 changed files with 23 additions and 11 deletions

View File

@ -466,7 +466,7 @@ class Actor:
f"already have channel(s) for {uid}:{chans}?" f"already have channel(s) for {uid}:{chans}?"
) )
log.trace(f"Registered {chan} for {uid}") # type: ignore log.runtime(f"Registered {chan} for {uid}") # type: ignore
# append new channel # append new channel
self._peers[uid].append(chan) self._peers[uid].append(chan)
@ -639,7 +639,7 @@ class Actor:
break break
log.trace( # type: ignore log.transport( # type: ignore
f"Received msg {msg} from {chan.uid}") f"Received msg {msg} from {chan.uid}")
cid = msg.get('cid') cid = msg.get('cid')

View File

@ -110,7 +110,7 @@ class PdbwTeardown(pdbpp.Pdb):
# async with aclosing(async_stdin): # async with aclosing(async_stdin):
# async for msg in async_stdin: # async for msg in async_stdin:
# log.trace(f"Stdin input:\n{msg}") # log.runtime(f"Stdin input:\n{msg}")
# # encode to bytes # # encode to bytes
# bmsg = str.encode(msg) # bmsg = str.encode(msg)

View File

@ -1,5 +1,6 @@
""" """
Inter-process comms abstractions Inter-process comms abstractions
""" """
import typing import typing
from typing import Any, Tuple, Optional from typing import Any, Tuple, Optional
@ -58,7 +59,7 @@ class MsgpackTCPStream:
) )
while True: while True:
data = await self.stream.receive_some(2**10) data = await self.stream.receive_some(2**10)
log.trace(f"received {data}") # type: ignore log.transport(f"received {data}") # type: ignore
if data == b'': if data == b'':
raise TransportClosed( raise TransportClosed(
@ -139,6 +140,7 @@ class Channel:
return self.msgstream.raddr if self.msgstream else None return self.msgstream.raddr if self.msgstream else None
async def connect( async def connect(
self, self,
destaddr: Tuple[Any, ...] = None, destaddr: Tuple[Any, ...] = None,
**kwargs **kwargs
@ -150,13 +152,21 @@ class Channel:
destaddr = destaddr or self._destaddr destaddr = destaddr or self._destaddr
assert isinstance(destaddr, tuple) assert isinstance(destaddr, tuple)
stream = await trio.open_tcp_stream(*destaddr, **kwargs)
stream = await trio.open_tcp_stream(
*destaddr,
**kwargs
)
self.msgstream = MsgpackTCPStream(stream) self.msgstream = MsgpackTCPStream(stream)
log.transport(
f'Opened channel to peer {self.laddr} -> {self.raddr}'
)
return stream return stream
async def send(self, item: Any) -> None: async def send(self, item: Any) -> None:
log.trace(f"send `{item}`") # type: ignore log.transport(f"send `{item}`") # type: ignore
assert self.msgstream assert self.msgstream
await self.msgstream.send(item) await self.msgstream.send(item)
@ -175,11 +185,13 @@ class Channel:
raise raise
async def aclose(self) -> None: async def aclose(self) -> None:
log.debug(f"Closing {self}") log.transport(
f'Closing channel to {self.uid} '
f'{self.laddr} -> {self.raddr}'
)
assert self.msgstream assert self.msgstream
await self.msgstream.stream.aclose() await self.msgstream.stream.aclose()
self._closed = True self._closed = True
log.error(f'CLOSING CHAN {self}')
async def __aenter__(self): async def __aenter__(self):
await self.connect() await self.connect()
@ -202,11 +214,11 @@ class Channel:
await self.connect() await self.connect()
cancelled = cancel_scope.cancelled_caught cancelled = cancel_scope.cancelled_caught
if cancelled: if cancelled:
log.warning( log.transport(
"Reconnect timed out after 3 seconds, retrying...") "Reconnect timed out after 3 seconds, retrying...")
continue continue
else: else:
log.warning("Stream connection re-established!") log.transport("Stream connection re-established!")
# run any reconnection sequence # run any reconnection sequence
on_recon = self._recon_seq on_recon = self._recon_seq
if on_recon: if on_recon:
@ -215,7 +227,7 @@ class Channel:
except (OSError, ConnectionRefusedError): except (OSError, ConnectionRefusedError):
if not down: if not down:
down = True down = True
log.warning( log.transport(
f"Connection to {self.raddr} went down, waiting" f"Connection to {self.raddr} went down, waiting"
" for re-establishment") " for re-establishment")
await trio.sleep(1) await trio.sleep(1)