Compare commits

..

No commits in common. "2aed7dd5a451eb7912a7fa48602fc647678ca71e" and "588b7ca7bf32c326d87f7478d78cbde5f1754864" have entirely different histories.

4 changed files with 15 additions and 31 deletions

View File

@ -243,6 +243,7 @@ async def open_root_actor(
logger.cancel("Shutting down root actor") logger.cancel("Shutting down root actor")
await actor.cancel() await actor.cancel()
finally: finally:
_state._current_actor = None
logger.runtime("Root actor terminated") logger.runtime("Root actor terminated")

View File

@ -826,12 +826,7 @@ class Actor:
if ctx._backpressure: if ctx._backpressure:
log.warning(text) log.warning(text)
try:
await send_chan.send(msg) await send_chan.send(msg)
except trio.BrokenResourceError:
# XXX: local consumer has closed their side
# so cancel the far end streaming task
log.warning(f"{chan} is already closed")
else: else:
try: try:
raise StreamOverrun(text) from None raise StreamOverrun(text) from None
@ -1376,9 +1371,8 @@ async def async_main(
actor.lifetime_stack.close() actor.lifetime_stack.close()
# Unregister actor from the arbiter # Unregister actor from the arbiter
if ( if registered_with_arbiter and (
registered_with_arbiter actor._arb_addr is not None
and not actor.is_arbiter
): ):
failed = False failed = False
with trio.move_on_after(0.5) as cs: with trio.move_on_after(0.5) as cs:

View File

@ -69,7 +69,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
''' '''
def __init__( def __init__(
self, self,
ctx: Context, # typing: ignore # noqa ctx: 'Context', # typing: ignore # noqa
rx_chan: trio.MemoryReceiveChannel, rx_chan: trio.MemoryReceiveChannel,
_broadcaster: Optional[BroadcastReceiver] = None, _broadcaster: Optional[BroadcastReceiver] = None,
@ -82,9 +82,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
self._eoc: bool = False self._eoc: bool = False
self._closed: bool = False self._closed: bool = False
def ctx(self) -> Context:
return self._ctx
# delegate directly to underlying mem channel # delegate directly to underlying mem channel
def receive_nowait(self): def receive_nowait(self):
msg = self._rx_chan.receive_nowait() msg = self._rx_chan.receive_nowait()
@ -274,8 +271,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
self, self,
) -> AsyncIterator[BroadcastReceiver]: ) -> AsyncIterator[BroadcastReceiver]:
''' '''Allocate and return a ``BroadcastReceiver`` which delegates
Allocate and return a ``BroadcastReceiver`` which delegates
to this message stream. to this message stream.
This allows multiple local tasks to receive each their own copy This allows multiple local tasks to receive each their own copy
@ -384,7 +380,7 @@ class Context:
# only set on the callee side # only set on the callee side
_scope_nursery: Optional[trio.Nursery] = None _scope_nursery: Optional[trio.Nursery] = None
_backpressure: bool = True _backpressure: bool = False
async def send_yield(self, data: Any) -> None: async def send_yield(self, data: Any) -> None:
@ -613,14 +609,7 @@ class Context:
# XXX: Make the stream "one-shot use". On exit, signal # XXX: Make the stream "one-shot use". On exit, signal
# ``trio.EndOfChannel``/``StopAsyncIteration`` to the # ``trio.EndOfChannel``/``StopAsyncIteration`` to the
# far end. # far end.
try:
await self.send_stop() await self.send_stop()
except trio.BrokenResourceError:
log.warning(
f"Couldn't close: stream already broken?\n"
f'actor: {self.chan.uid}\n'
f'ctx id: {self.cid}'
)
finally: finally:
if self._portal: if self._portal:

View File

@ -133,12 +133,12 @@ async def gather_contexts(
# deliver control once all managers have started up # deliver control once all managers have started up
await all_entered.wait() await all_entered.wait()
try: # NOTE: order *should* be preserved in the output values
# since ``dict``s are now implicitly ordered.
yield tuple(unwrapped.values()) yield tuple(unwrapped.values())
finally:
# NOTE: this is ABSOLUTELY REQUIRED to avoid # we don't need a try/finally since cancellation will be triggered
# the following wacky bug: # by the surrounding nursery on error.
# <tractorbugurlhere>
parent_exit.set() parent_exit.set()