forked from goodboy/tractor
1
0
Fork 0

Add `Context.peer_side: str` property, mk static-meth private.

runtime_to_msgspec
Tyler Goodlet 2024-04-25 12:38:05 -04:00
parent c25c77c573
commit 4bab998ff9
1 changed files with 15 additions and 15 deletions

View File

@ -472,13 +472,17 @@ class Context:
return 'parent' if self._portal else 'child' return 'parent' if self._portal else 'child'
@staticmethod @staticmethod
def peer_side(side: str) -> str: def _peer_side(side: str) -> str:
match side: match side:
case 'child': case 'child':
return 'parent' return 'parent'
case 'parent': case 'parent':
return 'child' return 'child'
@property
def peer_side(self) -> str:
return self._peer_side(self.side)
# TODO: remove stat! # TODO: remove stat!
# -[ ] re-implement the `.experiemental._pubsub` stuff # -[ ] re-implement the `.experiemental._pubsub` stuff
# with `MsgStream` and that should be last usage? # with `MsgStream` and that should be last usage?
@ -512,9 +516,7 @@ class Context:
equiv of a `StopIteration`. equiv of a `StopIteration`.
''' '''
await self.chan.send( await self.chan.send(Stop(cid=self.cid))
Stop(cid=self.cid)
)
def _maybe_cancel_and_set_remote_error( def _maybe_cancel_and_set_remote_error(
self, self,
@ -593,7 +595,6 @@ class Context:
# TODO: never do this right? # TODO: never do this right?
# if self._remote_error: # if self._remote_error:
# return # return
peer_side: str = self.peer_side(self.side)
# XXX: denote and set the remote side's error so that # XXX: denote and set the remote side's error so that
# after we cancel whatever task is the opener of this # after we cancel whatever task is the opener of this
@ -601,7 +602,7 @@ class Context:
# appropriately. # appropriately.
log.runtime( log.runtime(
'Setting remote error for ctx\n\n' 'Setting remote error for ctx\n\n'
f'<= {peer_side!r}: {self.chan.uid}\n' f'<= {self.peer_side!r}: {self.chan.uid}\n'
f'=> {self.side!r}\n\n' f'=> {self.side!r}\n\n'
f'{error}' f'{error}'
) )
@ -623,9 +624,8 @@ class Context:
elif isinstance(error, MsgTypeError): elif isinstance(error, MsgTypeError):
msgerr = True msgerr = True
peer_side: str = self.peer_side(self.side)
log.error( log.error(
f'IPC dialog error due to msg-type caused by {peer_side!r} side\n\n' f'IPC dialog error due to msg-type caused by {self.peer_side!r} side\n\n'
f'{error}\n' f'{error}\n'
f'{pformat(self)}\n' f'{pformat(self)}\n'
@ -1067,12 +1067,12 @@ class Context:
except trio.EndOfChannel as eoc: except trio.EndOfChannel as eoc:
if ( if (
eoc eoc
and stream.closed and
stream.closed
): ):
# sanity, can remove? # sanity, can remove?
assert eoc is stream._eoc assert eoc is stream._eoc
# from .devx import pause
# await pause()
log.warning( log.warning(
'Stream was terminated by EoC\n\n' 'Stream was terminated by EoC\n\n'
# NOTE: won't show the error <Type> but # NOTE: won't show the error <Type> but
@ -1644,10 +1644,9 @@ class Context:
side: str = self.side side: str = self.side
if side == 'child': if side == 'child':
assert not self._portal assert not self._portal
peer_side: str = self.peer_side(side)
flow_body: str = ( flow_body: str = (
f'<= peer {peer_side!r}: {from_uid}\n' f'<= peer {self.peer_side!r}: {from_uid}\n'
f' |_<{nsf}()>\n\n' f' |_<{nsf}()>\n\n'
f'=> {side!r}: {self._task}\n' f'=> {side!r}: {self._task}\n'
@ -1665,7 +1664,7 @@ class Context:
log_meth = log.runtime log_meth = log.runtime
log_meth( log_meth(
f'Delivering IPC ctx error from {peer_side!r} to {side!r} task\n\n' f'Delivering IPC ctx error from {self.peer_side!r} to {side!r} task\n\n'
f'{flow_body}' f'{flow_body}'
@ -2330,7 +2329,7 @@ async def open_context_from_portal(
and ctx.cancel_acked and ctx.cancel_acked
): ):
log.cancel( log.cancel(
'Context cancelled by caller task\n' 'Context cancelled by {ctx.side!r}-side task\n'
f'|_{ctx._task}\n\n' f'|_{ctx._task}\n\n'
f'{repr(scope_err)}\n' f'{repr(scope_err)}\n'
@ -2364,6 +2363,7 @@ async def open_context_from_portal(
None, None,
) )
def mk_context( def mk_context(
chan: Channel, chan: Channel,
cid: str, cid: str,