Add `MsgStream._stop_msg`; use the new `PldRx` API

In particular, ensure we use `ctx._pld_rx.recv_msg_nowait()` from
`.receive_nowait()` (which is called from `.aclose()`) such that we
ALWAYS (can) set the surrounding `Context._result/._outcome_msg` attrs
on reception of a final `Return`!!

This fixes a final stream-teardown race-condition bug where previously
we didn't set `Context._result/._outcome_msg` in such cases. This is
**precisely because** `.receive_nowait()` only returns the `pld`; when
called from `.aclose()` that value is discarded, and so is its boxing
`Return`, despite it having been consumed from the underlying
`._rx_chan`..
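
As a rough illustration of the fix (stand-in types and signatures only,
not tractor's actual internals), contrast the old pld-only receive with
the new `(msg, pld)` API:

```python
from dataclasses import dataclass
from typing import Any

# Stand-ins for tractor's msg-spec types; the real `Return`/`Context`
# live in `tractor.msg`/`tractor._context` and differ in detail.
@dataclass
class Return:
    pld: Any

@dataclass
class Context:
    _result: Any = None
    _outcome_msg: Any = None

def recv_msg_nowait(q: list) -> tuple[Any, Any]:
    # new-style API: hand back BOTH the boxing msg and its payload so
    # the caller can react to the msg type (e.g. a final `Return`).
    msg = q.pop(0)
    return msg, msg.pld

def receive_nowait(ctx: Context, q: list) -> Any:
    msg, pld = recv_msg_nowait(q)
    if isinstance(msg, Return):
        # the fix: ALWAYS record the final outcome on the ctx, even
        # when the caller (`.aclose()`) discards the returned `pld`.
        ctx._result = pld
        ctx._outcome_msg = msg
    return pld

ctx, q = Context(), [Return(pld='final-value')]
receive_nowait(ctx, q)  # return value dropped, as in `.aclose()`
assert ctx._result == 'final-value'  # ..but the outcome is kept!
```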

Longer term this should be solved differently by ensuring such race
cases are handled at a higher scope, like inside `Context._deliver_msg()`
or the `Portal.open_context()` enter/exit blocks? Add a detailed warning
note and todos for all this around the special-case block!
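
A minimal sketch of that longer-term idea (hypothetical, reusing the
stand-in types from the sketch above): capture the outcome at the point
where the runtime pushes msgs into the ctx, before any consumer task
can race to pop them:

```python
class RacelessContext(Context):
    # hypothetical: what a `Context._deliver_msg()`-style hook could do
    # so no user-side teardown path can silently drop a `Return`.
    def __init__(self) -> None:
        super().__init__()
        self._msg_q: list = []

    def _deliver_msg(self, msg: Any) -> None:
        if isinstance(msg, Return):
            # record the final outcome runtime-side, exactly where
            # the msg first arrives.
            self._result = msg.pld
            self._outcome_msg = msg
        self._msg_q.append(msg)  # then deliver to consumers as usual
```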
Tyler Goodlet 2025-03-12 16:24:39 -04:00
parent 01b5955cce
commit 51f72801d2
1 changed file with 95 additions and 23 deletions


@@ -45,9 +45,11 @@ from .trionics import (
BroadcastReceiver,
)
from tractor.msg import (
# Return,
# Stop,
Error,
Return,
Stop,
MsgType,
PayloadT,
Yield,
)
@@ -70,8 +72,7 @@ class MsgStream(trio.abc.Channel):
A bidirectional message stream for receiving logically sequenced
values over an inter-actor IPC `Channel`.
This is the type returned to a local task which entered either
`Portal.open_stream_from()` or `Context.open_stream()`.
Termination rules:
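
For orientation, the typical way user code ends up holding a `MsgStream`
(a sketch based on tractor's context/stream API; actor-nursery spawning
and error handling elided):

```python
import tractor

@tractor.context
async def streamer(ctx: tractor.Context) -> str:
    await ctx.started()
    async with ctx.open_stream() as stream:
        for i in range(3):
            await stream.send(i)
    return 'final-result'  # delivered as the boxing `Return` msg

async def consumer(portal: tractor.Portal) -> None:
    async with (
        portal.open_context(streamer) as (ctx, first),
        ctx.open_stream() as stream,  # <- a `MsgStream`
    ):
        async for pld in stream:
            print(pld)
    print(await ctx.wait_for_result())  # the final `Return`'s pld
```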
@@ -94,6 +95,9 @@ class MsgStream(trio.abc.Channel):
self._rx_chan = rx_chan
self._broadcaster = _broadcaster
# any actual IPC msg which is effectively an `EndOfStream`
self._stop_msg: bool|Stop = False
# flag to denote end of stream
self._eoc: bool|trio.EndOfChannel = False
self._closed: bool|trio.ClosedResourceError = False
@@ -125,16 +129,67 @@ class MsgStream(trio.abc.Channel):
def receive_nowait(
self,
expect_msg: MsgType = Yield,
):
) -> PayloadT:
ctx: Context = self._ctx
return ctx._pld_rx.recv_pld_nowait(
(
msg,
pld,
) = ctx._pld_rx.recv_msg_nowait(
ipc=self,
expect_msg=expect_msg,
)
# ?TODO, maybe factor this into a hyper-common `unwrap_pld()`
#
match msg:
# XXX, these never seem to ever hit? cool?
case Stop():
log.cancel(
f'Msg-stream was ended via stop msg\n'
f'{msg}'
)
case Error():
log.error(
f'Msg-stream was ended via error msg\n'
f'{msg}'
)
# XXX NOTE, always set any final result on the ctx to
# avoid teardown race conditions where previously this msg
# would be consumed silently (by `.aclose()` doing its
# own "msg drain loop" but WITHOUT those `drained: lists[MsgType]`
# being post-close-processed!
#
# !!TODO, see the equiv todo-comment in `.receive()`
# around the `if drained:` where we should prolly
# ACTUALLY be doing this post-close processing??
#
case Return(pld=pld):
log.warning(
f'Msg-stream final result msg for IPC ctx?\n'
f'{msg}'
)
# XXX TODO, this **should be covered** by higher
# scoped runtime-side method calls such as
# `Context._deliver_msg()`, so you should never
# really see the warning above or else something
# racy/out-of-order is likely going on between
# actor-runtime-side push tasks and the user-app-side
# consume tasks!
# -[ ] figure out that set of race cases and fix!
# -[ ] possibly return the `msg` given an input
# arg-flag is set so we can process the `Return`
# from the `.aclose()` caller?
#
# breakpoint() # to debug this RACE CASE!
ctx._result = pld
ctx._outcome_msg = msg
return pld
async def receive(
self,
hide_tb: bool = False,
):
'''
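
Aside: the `case Return(pld=pld)` class pattern above uses Python's
structural pattern matching to capture the msg's payload attribute into
a local; a self-contained toy (stand-in type only):

```python
from dataclasses import dataclass
from typing import Any

@dataclass
class Return:  # stand-in; the real msg types are msgspec structs
    pld: Any

def unwrap_pld(msg: Any) -> Any:
    match msg:
        # keyword class-pattern: binds the `.pld` attr to a local
        case Return(pld=pld):
            return pld
        case _:
            return None

assert unwrap_pld(Return(pld=42)) == 42
```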
@@ -154,7 +209,7 @@ class MsgStream(trio.abc.Channel):
# except trio.EndOfChannel:
# raise StopAsyncIteration
#
# see ``.aclose()`` for notes on the old behaviour prior to
# see `.aclose()` for notes on the old behaviour prior to
# introducing this
if self._eoc:
raise self._eoc
@@ -165,7 +220,11 @@ class MsgStream(trio.abc.Channel):
src_err: Exception|None = None # orig tb
try:
ctx: Context = self._ctx
return await ctx._pld_rx.recv_pld(ipc=self)
pld = await ctx._pld_rx.recv_pld(
ipc=self,
expect_msg=Yield,
)
return pld
# XXX: the stream terminates on either of:
# - `self._rx_chan.receive()` raising after manual closure
@@ -174,7 +233,7 @@ class MsgStream(trio.abc.Channel):
# - via a `Stop`-msg received from remote peer task.
# NOTE
# |_ previously this was triggered by calling
# ``._rx_chan.aclose()`` on the send side of the channel
# `._rx_chan.aclose()` on the send side of the channel
# inside `Actor._deliver_ctx_payload()`, but now the 'stop'
# message handling gets delegated to `PldRx.recv_pld()`
# internals.
@@ -198,11 +257,14 @@ class MsgStream(trio.abc.Channel):
# terminated and signal this local iterator to stop
drained: list[Exception|dict] = await self.aclose()
if drained:
# ?TODO? pass these to the `._ctx._drained_msgs: deque`
# and then iterate them as part of any `.wait_for_result()` call?
#
# from .devx import pause
# await pause()
# ^^^^^^^^TODO? pass these to the `._ctx._drained_msgs:
# deque` and then iterate them as part of any
# `.wait_for_result()` call?
#
# -[ ] move the match-case processing from
# `.receive_nowait()` instead to right here, use it from
# a `for msg in drained:` post-proc loop?
#
log.warning(
'Drained context msgs during closure\n\n'
f'{drained}'
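
A sketch of that todo (hypothetical, stand-in types as in the earlier
sketches): do the outcome-capture once, in a post-close pass over
`drained`, instead of inside `receive_nowait()`:

```python
def postproc_drained(ctx: Context, drained: list) -> None:
    # hypothetical post-close processing pass: one place to handle any
    # final `Return` msgs that were consumed during stream closure.
    for msg in drained:
        match msg:
            case Return(pld=pld):
                ctx._result = pld
                ctx._outcome_msg = msg
```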
@@ -265,9 +327,6 @@ class MsgStream(trio.abc.Channel):
- more or less we try to maintain adherence to trio's `.aclose()` semantics:
https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
'''
# rx_chan = self._rx_chan
# XXX NOTE XXX
# it's SUPER IMPORTANT that we ensure we don't DOUBLE
# DRAIN msgs on closure to avoid getting stuck hanging on
@@ -279,15 +338,16 @@ class MsgStream(trio.abc.Channel):
# this stream has already been closed so silently succeed as
# per ``trio.AsyncResource`` semantics.
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
# import tractor
# await tractor.pause()
return []
ctx: Context = self._ctx
drained: list[Exception|dict] = []
while not drained:
try:
maybe_final_msg = self.receive_nowait(
# allow_msgs=[Yield, Return],
expect_msg=Yield,
maybe_final_msg: Yield|Return = self.receive_nowait(
expect_msg=Yield|Return,
)
if maybe_final_msg:
log.debug(
@@ -372,8 +432,10 @@ class MsgStream(trio.abc.Channel):
# await rx_chan.aclose()
if not self._eoc:
this_side: str = self._ctx.side
peer_side: str = self._ctx.peer_side
message: str = (
f'Stream self-closed by {self._ctx.side!r}-side before EoC\n'
f'Stream self-closed by {this_side!r}-side before EoC from {peer_side!r}\n'
# } bc a stream is a "scope"/msging-phase inside an IPC
f'x}}>\n'
f' |_{self}\n'
@@ -381,9 +443,19 @@ class MsgStream(trio.abc.Channel):
log.cancel(message)
self._eoc = trio.EndOfChannel(message)
if (
(rx_chan := self._rx_chan)
and
(stats := rx_chan.statistics()).tasks_waiting_receive
):
log.cancel(
f'Msg-stream is closing but there are still reader tasks,\n'
f'{stats}\n'
)
# ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?
# => NO, DEFINITELY NOT! <=
# if we're a bi-dir ``MsgStream`` BECAUSE this same
# if we're a bi-dir `MsgStream` BECAUSE this same
# core-msg-loop mem recv-chan is used to deliver the
# potential final result from the surrounding inter-actor
# `Context` so we don't want to close it until that
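
The `statistics()` check above is trio's real memory-channel API; a
runnable demo of how a parked reader shows up in `tasks_waiting_receive`:

```python
import trio

async def main() -> None:
    send, recv = trio.open_memory_channel(1)
    async with trio.open_nursery() as n:
        n.start_soon(recv.receive)  # reader task parks in `.receive()`
        await trio.sleep(0.1)  # give it a chance to block
        stats = recv.statistics()
        print(stats.tasks_waiting_receive)  # -> 1
        await send.send('wake the reader')  # unblock so the nursery exits

trio.run(main)
```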