Compare commits

...

5 Commits

Author SHA1 Message Date
Tyler Goodlet 2aed7dd5a4 Always set the `parent_exit: trio.Event` on exit 2022-12-12 15:17:21 -05:00
Tyler Goodlet 304ee6ccd2 Enable stream backpressure by default, add `MsgStream.ctx: Context` 2022-12-12 15:17:21 -05:00
Tyler Goodlet 5dc07f0455 Don't unset actor global on root teardown 2022-12-12 15:17:21 -05:00
Tyler Goodlet f7b66ba3cb Don't raise on a broken IPC-context when sending stop msg 2022-12-12 15:17:21 -05:00
Tyler Goodlet ad78366f7d Handle broken mem chan on `Actor._push_result()`
When backpressure is used and a feeder mem chan breaks during msg
delivery (usually because the IPC-allocating task already terminated),
we simply warn instead of raising, as we already do for the
non-backpressure case.

Also, add a proper `Actor.is_arbiter` test inside `._invoke()` to avoid
doing an arbiter-registry lookup if the current actor **is** the
registrar.
2022-12-12 15:17:21 -05:00
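
To make the failure mode described in that commit message concrete, here is a minimal, standalone trio sketch (not tractor code): once a memory channel's receive side has been closed, any further `send()` raises `trio.BrokenResourceError`, which is exactly the error the runtime now downgrades to a warning.

```python
import trio


async def main() -> None:
    send_chan, recv_chan = trio.open_memory_channel(1)

    # simulate the IPC-allocating task having already terminated:
    # its receive side gets closed during teardown
    await recv_chan.aclose()

    try:
        # a late msg delivery into the feeder chan now fails
        await send_chan.send({'yield': 'too-late'})
    except trio.BrokenResourceError:
        # the behaviour after this changeset: warn, don't raise
        print("feeder mem chan already closed; dropping msg")


trio.run(main)
```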
4 changed files with 31 additions and 15 deletions

View File

@@ -243,7 +243,6 @@ async def open_root_actor(
             logger.cancel("Shutting down root actor")
             await actor.cancel()
     finally:
-        _state._current_actor = None
         logger.runtime("Root actor terminated")

View File

@@ -826,7 +826,12 @@ class Actor:
                 if ctx._backpressure:
                     log.warning(text)
-                    await send_chan.send(msg)
+                    try:
+                        await send_chan.send(msg)
+                    except trio.BrokenResourceError:
+                        # XXX: local consumer has closed their side
+                        # so cancel the far end streaming task
+                        log.warning(f"{chan} is already closed")
                 else:
                     try:
                         raise StreamOverrun(text) from None
@@ -1371,8 +1376,9 @@ async def async_main(
         actor.lifetime_stack.close()

         # Unregister actor from the arbiter
-        if registered_with_arbiter and (
-            actor._arb_addr is not None
+        if (
+            registered_with_arbiter
+            and not actor.is_arbiter
         ):
             failed = False
             with trio.move_on_after(0.5) as cs:
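
The unregister branch above also shows the timeout guard (`trio.move_on_after(0.5)`) that keeps teardown from hanging on a dead registrar. A generic trio sketch of that pattern, with `do_unregister()` standing in for the actual registry RPC (a hypothetical callable, not tractor's API):

```python
import trio


async def unregister_with_timeout(do_unregister) -> bool:
    '''
    Best-effort unregister that never blocks teardown for more
    than half a second; returns ``True`` on success.

    '''
    failed = False
    with trio.move_on_after(0.5) as cs:
        try:
            await do_unregister()
        except OSError:
            # registrar connection already gone
            failed = True

    if cs.cancelled_caught:
        # the deadline fired before the RPC completed
        failed = True

    return not failed
```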

View File

@@ -69,7 +69,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
     '''
     def __init__(
         self,
-        ctx: 'Context',  # typing: ignore # noqa
+        ctx: Context,  # typing: ignore # noqa
         rx_chan: trio.MemoryReceiveChannel,
         _broadcaster: Optional[BroadcastReceiver] = None,
@@ -82,6 +82,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
         self._eoc: bool = False
         self._closed: bool = False

+    @property
+    def ctx(self) -> Context:
+        return self._ctx

     # delegate directly to underlying mem channel
     def receive_nowait(self):
         msg = self._rx_chan.receive_nowait()
@@ -271,7 +274,8 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
         self,
     ) -> AsyncIterator[BroadcastReceiver]:
-        '''Allocate and return a ``BroadcastReceiver`` which delegates
+        '''
+        Allocate and return a ``BroadcastReceiver`` which delegates
         to this message stream.

         This allows multiple local tasks to receive each their own copy
@@ -380,7 +384,7 @@ class Context:
     # only set on the callee side
     _scope_nursery: Optional[trio.Nursery] = None

-    _backpressure: bool = False
+    _backpressure: bool = True

     async def send_yield(self, data: Any) -> None:
@@ -609,7 +613,14 @@ class Context:
             # XXX: Make the stream "one-shot use". On exit, signal
             # ``trio.EndOfChannel``/``StopAsyncIteration`` to the
             # far end.
-            await self.send_stop()
+            try:
+                await self.send_stop()
+            except trio.BrokenResourceError:
+                log.warning(
+                    f"Couldn't close: stream already broken?\n"
+                    f'actor: {self.chan.uid}\n'
+                    f'ctx id: {self.cid}'
+                )

         finally:
             if self._portal:
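
Taken together, these stream changes mean backpressure is now on by default (`Context._backpressure: bool = True`) and a stream can be traced back to its IPC context via the new `MsgStream.ctx` property. Below is a hedged usage sketch written against the public `tractor` API as documented around this release (`open_nursery()`, `Portal.open_context()`, `Context.open_stream()`); exact signatures may differ, so treat it as illustrative rather than a verbatim example from the repo:

```python
import trio
import tractor


@tractor.context
async def feeder(
    ctx: tractor.Context,
) -> None:
    # callee side: open the IPC stream and push values quickly
    await ctx.started()
    async with ctx.open_stream() as stream:
        for i in range(256):
            # with backpressure now defaulting on, a slow consumer
            # makes this send buffer/block instead of triggering a
            # ``StreamOverrun`` back on the feeder side
            await stream.send(i)


async def main() -> None:
    async with tractor.open_nursery() as an:
        portal = await an.start_actor(
            'feeder',
            enable_modules=[__name__],
        )
        async with portal.open_context(feeder) as (ctx, _first):
            async with ctx.open_stream() as stream:
                async for value in stream:
                    # the new ``MsgStream.ctx`` property points back to
                    # the inter-actor context the stream was opened under
                    assert stream.ctx is ctx
                    await trio.sleep(0.001)  # deliberately slow consumer

        await portal.cancel_actor()


if __name__ == '__main__':
    trio.run(main)
```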

View File

@@ -133,13 +133,13 @@ async def gather_contexts(
     # deliver control once all managers have started up
     await all_entered.wait()

-    # NOTE: order *should* be preserved in the output values
-    # since ``dict``s are now implicitly ordered.
-    yield tuple(unwrapped.values())
-
-    # we don't need a try/finally since cancellation will be triggered
-    # by the surrounding nursery on error.
-    parent_exit.set()
+    try:
+        yield tuple(unwrapped.values())
+    finally:
+        # NOTE: this is ABSOLUTELY REQUIRED to avoid
+        # the following wacky bug:
+        # <tractorbugurlhere>
+        parent_exit.set()

 # Per actor task caching helpers.
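
The `gather_contexts()` hunk above moves `parent_exit.set()` into a `finally:` so the per-manager child tasks are always released, even when the body of the `yield` raises or is cancelled. A minimal trio sketch of the same pattern, with `gather_like()` and `_hold_open()` as illustrative stand-ins rather than tractor's actual internals:

```python
from contextlib import asynccontextmanager as acm

import trio


async def _hold_open(parent_exit: trio.Event) -> None:
    # stand-in for each per-manager task: it keeps its resources
    # entered until the parent context signals exit
    await parent_exit.wait()


@acm
async def gather_like(n: int):
    parent_exit = trio.Event()
    async with trio.open_nursery() as nursery:
        for _ in range(n):
            nursery.start_soon(_hold_open, parent_exit)

        try:
            yield
        finally:
            # the fix above: always fire the event, otherwise an error
            # (or cancellation) in the body leaves the child tasks
            # parked on ``wait()`` and the nursery join hangs forever
            parent_exit.set()


async def main() -> None:
    async with gather_like(3):
        await trio.sleep(0.1)
    print('all child tasks released on exit')


trio.run(main)
```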