forked from goodboy/tractor
1
0
Fork 0

Use `_raise_from_no_key_in_msg(allow_msgs)`

Instead of `allow_msg_keys` since we've fully flipped over to
struct-types for msgs in the runtime.

- drop the loop from `MsgStream.receive_nowait()` since
  `Yield/Return.pld` getting will handle both (instead of a loop of
  `dict`-key reads).
runtime_to_msgspec
Tyler Goodlet 2024-04-14 18:31:41 -04:00
parent f72b972348
commit 38a6483859
3 changed files with 19 additions and 33 deletions

View File

@ -2151,7 +2151,6 @@ async def open_context_from_portal(
src_err=src_error,
log=log,
expect_msg=Started,
# expect_key='started',
)
uid: tuple = portal.channel.uid

View File

@ -938,7 +938,6 @@ def _raise_from_no_key_in_msg(
src_err: KeyError,
log: StackLevelAdapter, # caller specific `log` obj
expect_key: str = 'yield',
expect_msg: str = Yield,
stream: MsgStream | None = None,
@ -1053,7 +1052,7 @@ def _raise_from_no_key_in_msg(
# is activated above.
_type: str = 'Stream' if stream else 'Context'
raise MessagingError(
f"{_type} was expecting a '{expect_key.upper()}' message"
f"{_type} was expecting a {expect_msg} message"
" BUT received a non-error msg:\n"
f'{pformat(msg)}'
) from src_err

View File

@ -44,6 +44,7 @@ from .trionics import (
BroadcastReceiver,
)
from tractor.msg import (
Return,
Stop,
Yield,
)
@ -82,7 +83,7 @@ class MsgStream(trio.abc.Channel):
self,
ctx: Context, # typing: ignore # noqa
rx_chan: trio.MemoryReceiveChannel,
_broadcaster: BroadcastReceiver | None = None,
_broadcaster: BroadcastReceiver|None = None,
) -> None:
self._ctx = ctx
@ -96,36 +97,26 @@ class MsgStream(trio.abc.Channel):
# delegate directly to underlying mem channel
def receive_nowait(
self,
allow_msg_keys: list[str] = ['yield'],
allow_msgs: list[str] = Yield,
):
# msg: dict = self._rx_chan.receive_nowait()
msg: Yield|Stop = self._rx_chan.receive_nowait()
for (
i,
key,
) in enumerate(allow_msg_keys):
# TODO: replace msg equiv of this or does the `.pld`
# interface read already satisfy it? I think so, yes?
try:
# return msg[key]
return msg.pld
# except KeyError as kerr:
except AttributeError as attrerr:
if i < (len(allow_msg_keys) - 1):
continue
_raise_from_no_key_in_msg(
ctx=self._ctx,
msg=msg,
# src_err=kerr,
src_err=attrerr,
log=log,
expect_key=key,
stream=self,
)
async def receive(
self,
hide_tb: bool = True,
hide_tb: bool = False,
):
'''
Receive a single msg from the IPC transport, the next in
@ -157,10 +148,9 @@ class MsgStream(trio.abc.Channel):
try:
try:
msg: Yield = await self._rx_chan.receive()
# return msg['yield']
return msg.pld
# except KeyError as kerr:
# TODO: implement with match: instead?
except AttributeError as attrerr:
# src_err = kerr
src_err = attrerr
@ -170,10 +160,8 @@ class MsgStream(trio.abc.Channel):
_raise_from_no_key_in_msg(
ctx=self._ctx,
msg=msg,
# src_err=kerr,
src_err=attrerr,
log=log,
expect_key='yield',
stream=self,
)
@ -304,7 +292,7 @@ class MsgStream(trio.abc.Channel):
while not drained:
try:
maybe_final_msg = self.receive_nowait(
allow_msg_keys=['yield', 'return'],
allow_msgs=[Yield, Return],
)
if maybe_final_msg:
log.debug(