Compare commits

...

5 Commits

Author SHA1 Message Date
Tyler Goodlet 2aed7dd5a4 Always set the `parent_exit: trio.Event` on exit 2022-12-12 15:17:21 -05:00
Tyler Goodlet 304ee6ccd2 Enable stream backpressure by default, add `MsgStream.ctx: Context` 2022-12-12 15:17:21 -05:00
Tyler Goodlet 5dc07f0455 Don't unset actor global on root teardown 2022-12-12 15:17:21 -05:00
Tyler Goodlet f7b66ba3cb Don't raise on a broken IPC-context when sending stop msg 2022-12-12 15:17:21 -05:00
Tyler Goodlet ad78366f7d Handle broken mem chan on `Actor._push_result()`
When backpressure is used and a feeder mem chan breaks during msg
delivery (usually because the IPC allocating task already terminated)
instead of raising we simply warn as we do for the non-backpressure
case.

Also, add a proper `Actor.is_arbiter` test inside `._invoke()` to avoid
doing an arbiter-registry lookup if the current actor **is** the
registrar.
2022-12-12 15:17:21 -05:00
4 changed files with 31 additions and 15 deletions

View File

@ -243,7 +243,6 @@ async def open_root_actor(
logger.cancel("Shutting down root actor")
await actor.cancel()
finally:
_state._current_actor = None
logger.runtime("Root actor terminated")

View File

@ -826,7 +826,12 @@ class Actor:
if ctx._backpressure:
log.warning(text)
try:
await send_chan.send(msg)
except trio.BrokenResourceError:
# XXX: local consumer has closed their side
# so cancel the far end streaming task
log.warning(f"{chan} is already closed")
else:
try:
raise StreamOverrun(text) from None
@ -1371,8 +1376,9 @@ async def async_main(
actor.lifetime_stack.close()
# Unregister actor from the arbiter
if registered_with_arbiter and (
actor._arb_addr is not None
if (
registered_with_arbiter
and not actor.is_arbiter
):
failed = False
with trio.move_on_after(0.5) as cs:

View File

@ -69,7 +69,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
'''
def __init__(
self,
ctx: 'Context', # typing: ignore # noqa
ctx: Context, # typing: ignore # noqa
rx_chan: trio.MemoryReceiveChannel,
_broadcaster: Optional[BroadcastReceiver] = None,
@ -82,6 +82,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
self._eoc: bool = False
self._closed: bool = False
def ctx(self) -> Context:
return self._ctx
# delegate directly to underlying mem channel
def receive_nowait(self):
msg = self._rx_chan.receive_nowait()
@ -271,7 +274,8 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
self,
) -> AsyncIterator[BroadcastReceiver]:
'''Allocate and return a ``BroadcastReceiver`` which delegates
'''
Allocate and return a ``BroadcastReceiver`` which delegates
to this message stream.
This allows multiple local tasks to receive each their own copy
@ -380,7 +384,7 @@ class Context:
# only set on the callee side
_scope_nursery: Optional[trio.Nursery] = None
_backpressure: bool = False
_backpressure: bool = True
async def send_yield(self, data: Any) -> None:
@ -609,7 +613,14 @@ class Context:
# XXX: Make the stream "one-shot use". On exit, signal
# ``trio.EndOfChannel``/``StopAsyncIteration`` to the
# far end.
try:
await self.send_stop()
except trio.BrokenResourceError:
log.warning(
f"Couldn't close: stream already broken?\n"
f'actor: {self.chan.uid}\n'
f'ctx id: {self.cid}'
)
finally:
if self._portal:

View File

@ -133,12 +133,12 @@ async def gather_contexts(
# deliver control once all managers have started up
await all_entered.wait()
# NOTE: order *should* be preserved in the output values
# since ``dict``s are now implicitly ordered.
try:
yield tuple(unwrapped.values())
# we don't need a try/finally since cancellation will be triggered
# by the surrounding nursery on error.
finally:
# NOTE: this is ABSOLUTELY REQUIRED to avoid
# the following wacky bug:
# <tractorbugurlhere>
parent_exit.set()