forked from goodboy/tractor
1
0
Fork 0

Let `MsgStream.receive_nowait()` take in msg key list

Call it `allow_msg_keys: list[str] = ['yield']` and set it to accept
`['yield', 'return']` from the drain loop in `.aclose()`. Only pass the
last key error to `_raise_from_no_key_in_msg()` in the fall-through
case.

Somehow this seems to prevent all the intermittent test failures i was
seeing in local runs including when running the entire suite all in
sequence; i ain't complaining B)
modden_spawn_from_client_req
Tyler Goodlet 2024-03-11 10:20:55 -04:00
parent f067cf48a7
commit 37ee477aee
1 changed files with 26 additions and 14 deletions

View File

@ -90,19 +90,29 @@ class MsgStream(trio.abc.Channel):
self._closed: bool|trio.ClosedResourceError = False
# delegate directly to underlying mem channel
def receive_nowait(self):
msg = self._rx_chan.receive_nowait()
try:
return msg['yield']
except KeyError as kerr:
_raise_from_no_key_in_msg(
ctx=self._ctx,
msg=msg,
src_err=kerr,
log=log,
expect_key='yield',
stream=self,
)
def receive_nowait(
self,
allow_msg_keys: list[str] = ['yield'],
):
msg: dict = self._rx_chan.receive_nowait()
for (
i,
key,
) in enumerate(allow_msg_keys):
try:
return msg[key]
except KeyError as kerr:
if i < (len(allow_msg_keys) - 1):
continue
_raise_from_no_key_in_msg(
ctx=self._ctx,
msg=msg,
src_err=kerr,
log=log,
expect_key=key,
stream=self,
)
async def receive(self):
'''
@ -263,7 +273,9 @@ class MsgStream(trio.abc.Channel):
drained: list[Exception|dict] = []
while not drained:
try:
maybe_final_msg = self.receive_nowait()
maybe_final_msg = self.receive_nowait(
allow_msg_keys=['yield', 'return'],
)
if maybe_final_msg:
log.debug(
'Drained un-processed stream msg:\n'