Some more log message tweaks
- aggregate the `MsgStream.aclose()` "reader tasks" stats content into a common `message: str` before emit. - tweak an `_rpc.process_messages()` emit per new `Channel.__repr__()`.leslies_extra_appendix
parent
c2705cce68
commit
a528d45a30
|
@ -1219,8 +1219,10 @@ async def process_messages(
|
||||||
# -[ ] figure out how this will break with other transports?
|
# -[ ] figure out how this will break with other transports?
|
||||||
tc.report_n_maybe_raise(
|
tc.report_n_maybe_raise(
|
||||||
message=(
|
message=(
|
||||||
f'peer IPC channel closed abruptly?\n\n'
|
f'peer IPC channel closed abruptly?\n'
|
||||||
f'<=x {chan}\n'
|
f'\n'
|
||||||
|
f'<=x[\n'
|
||||||
|
f' {chan}\n'
|
||||||
f' |_{chan.raddr}\n\n'
|
f' |_{chan.raddr}\n\n'
|
||||||
)
|
)
|
||||||
+
|
+
|
||||||
|
|
|
@ -437,22 +437,23 @@ class MsgStream(trio.abc.Channel):
|
||||||
message: str = (
|
message: str = (
|
||||||
f'Stream self-closed by {this_side!r}-side before EoC from {peer_side!r}\n'
|
f'Stream self-closed by {this_side!r}-side before EoC from {peer_side!r}\n'
|
||||||
# } bc a stream is a "scope"/msging-phase inside an IPC
|
# } bc a stream is a "scope"/msging-phase inside an IPC
|
||||||
f'x}}>\n'
|
f'c}}>\n'
|
||||||
f' |_{self}\n'
|
f' |_{self}\n'
|
||||||
)
|
)
|
||||||
log.cancel(message)
|
|
||||||
self._eoc = trio.EndOfChannel(message)
|
|
||||||
|
|
||||||
if (
|
if (
|
||||||
(rx_chan := self._rx_chan)
|
(rx_chan := self._rx_chan)
|
||||||
and
|
and
|
||||||
(stats := rx_chan.statistics()).tasks_waiting_receive
|
(stats := rx_chan.statistics()).tasks_waiting_receive
|
||||||
):
|
):
|
||||||
log.cancel(
|
message += (
|
||||||
f'Msg-stream is closing but there is still reader tasks,\n'
|
f'AND there is still reader tasks,\n'
|
||||||
|
f'\n'
|
||||||
f'{stats}\n'
|
f'{stats}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
log.cancel(message)
|
||||||
|
self._eoc = trio.EndOfChannel(message)
|
||||||
|
|
||||||
# ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?
|
# ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?
|
||||||
# => NO, DEFINITELY NOT! <=
|
# => NO, DEFINITELY NOT! <=
|
||||||
# if we're a bi-dir `MsgStream` BECAUSE this same
|
# if we're a bi-dir `MsgStream` BECAUSE this same
|
||||||
|
@ -811,13 +812,12 @@ async def open_stream_from_ctx(
|
||||||
# sanity, can remove?
|
# sanity, can remove?
|
||||||
assert eoc is stream._eoc
|
assert eoc is stream._eoc
|
||||||
|
|
||||||
log.warning(
|
log.runtime(
|
||||||
'Stream was terminated by EoC\n\n'
|
'Stream was terminated by EoC\n\n'
|
||||||
# NOTE: won't show the error <Type> but
|
# NOTE: won't show the error <Type> but
|
||||||
# does show txt followed by IPC msg.
|
# does show txt followed by IPC msg.
|
||||||
f'{str(eoc)}\n'
|
f'{str(eoc)}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if ctx._portal:
|
if ctx._portal:
|
||||||
try:
|
try:
|
||||||
|
|
Loading…
Reference in New Issue