forked from goodboy/tractor
Compare commits
5 Commits
master
...
ipc_failwh
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 2aed7dd5a4 | |
Tyler Goodlet | 304ee6ccd2 | |
Tyler Goodlet | 5dc07f0455 | |
Tyler Goodlet | f7b66ba3cb | |
Tyler Goodlet | ad78366f7d |
|
@ -243,7 +243,6 @@ async def open_root_actor(
|
||||||
logger.cancel("Shutting down root actor")
|
logger.cancel("Shutting down root actor")
|
||||||
await actor.cancel()
|
await actor.cancel()
|
||||||
finally:
|
finally:
|
||||||
_state._current_actor = None
|
|
||||||
logger.runtime("Root actor terminated")
|
logger.runtime("Root actor terminated")
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -826,7 +826,12 @@ class Actor:
|
||||||
|
|
||||||
if ctx._backpressure:
|
if ctx._backpressure:
|
||||||
log.warning(text)
|
log.warning(text)
|
||||||
|
try:
|
||||||
await send_chan.send(msg)
|
await send_chan.send(msg)
|
||||||
|
except trio.BrokenResourceError:
|
||||||
|
# XXX: local consumer has closed their side
|
||||||
|
# so cancel the far end streaming task
|
||||||
|
log.warning(f"{chan} is already closed")
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
raise StreamOverrun(text) from None
|
raise StreamOverrun(text) from None
|
||||||
|
@ -1371,8 +1376,9 @@ async def async_main(
|
||||||
actor.lifetime_stack.close()
|
actor.lifetime_stack.close()
|
||||||
|
|
||||||
# Unregister actor from the arbiter
|
# Unregister actor from the arbiter
|
||||||
if registered_with_arbiter and (
|
if (
|
||||||
actor._arb_addr is not None
|
registered_with_arbiter
|
||||||
|
and not actor.is_arbiter
|
||||||
):
|
):
|
||||||
failed = False
|
failed = False
|
||||||
with trio.move_on_after(0.5) as cs:
|
with trio.move_on_after(0.5) as cs:
|
||||||
|
|
|
@ -69,7 +69,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
'''
|
'''
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
ctx: 'Context', # typing: ignore # noqa
|
ctx: Context, # typing: ignore # noqa
|
||||||
rx_chan: trio.MemoryReceiveChannel,
|
rx_chan: trio.MemoryReceiveChannel,
|
||||||
_broadcaster: Optional[BroadcastReceiver] = None,
|
_broadcaster: Optional[BroadcastReceiver] = None,
|
||||||
|
|
||||||
|
@ -82,6 +82,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
self._eoc: bool = False
|
self._eoc: bool = False
|
||||||
self._closed: bool = False
|
self._closed: bool = False
|
||||||
|
|
||||||
|
def ctx(self) -> Context:
|
||||||
|
return self._ctx
|
||||||
|
|
||||||
# delegate directly to underlying mem channel
|
# delegate directly to underlying mem channel
|
||||||
def receive_nowait(self):
|
def receive_nowait(self):
|
||||||
msg = self._rx_chan.receive_nowait()
|
msg = self._rx_chan.receive_nowait()
|
||||||
|
@ -271,7 +274,8 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
self,
|
self,
|
||||||
|
|
||||||
) -> AsyncIterator[BroadcastReceiver]:
|
) -> AsyncIterator[BroadcastReceiver]:
|
||||||
'''Allocate and return a ``BroadcastReceiver`` which delegates
|
'''
|
||||||
|
Allocate and return a ``BroadcastReceiver`` which delegates
|
||||||
to this message stream.
|
to this message stream.
|
||||||
|
|
||||||
This allows multiple local tasks to receive each their own copy
|
This allows multiple local tasks to receive each their own copy
|
||||||
|
@ -380,7 +384,7 @@ class Context:
|
||||||
# only set on the callee side
|
# only set on the callee side
|
||||||
_scope_nursery: Optional[trio.Nursery] = None
|
_scope_nursery: Optional[trio.Nursery] = None
|
||||||
|
|
||||||
_backpressure: bool = False
|
_backpressure: bool = True
|
||||||
|
|
||||||
async def send_yield(self, data: Any) -> None:
|
async def send_yield(self, data: Any) -> None:
|
||||||
|
|
||||||
|
@ -609,7 +613,14 @@ class Context:
|
||||||
# XXX: Make the stream "one-shot use". On exit, signal
|
# XXX: Make the stream "one-shot use". On exit, signal
|
||||||
# ``trio.EndOfChannel``/``StopAsyncIteration`` to the
|
# ``trio.EndOfChannel``/``StopAsyncIteration`` to the
|
||||||
# far end.
|
# far end.
|
||||||
|
try:
|
||||||
await self.send_stop()
|
await self.send_stop()
|
||||||
|
except trio.BrokenResourceError:
|
||||||
|
log.warning(
|
||||||
|
f"Couldn't close: stream already broken?\n"
|
||||||
|
f'actor: {self.chan.uid}\n'
|
||||||
|
f'ctx id: {self.cid}'
|
||||||
|
)
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if self._portal:
|
if self._portal:
|
||||||
|
|
|
@ -133,12 +133,12 @@ async def gather_contexts(
|
||||||
# deliver control once all managers have started up
|
# deliver control once all managers have started up
|
||||||
await all_entered.wait()
|
await all_entered.wait()
|
||||||
|
|
||||||
# NOTE: order *should* be preserved in the output values
|
try:
|
||||||
# since ``dict``s are now implicitly ordered.
|
|
||||||
yield tuple(unwrapped.values())
|
yield tuple(unwrapped.values())
|
||||||
|
finally:
|
||||||
# we don't need a try/finally since cancellation will be triggered
|
# NOTE: this is ABSOLUTELY REQUIRED to avoid
|
||||||
# by the surrounding nursery on error.
|
# the following wacky bug:
|
||||||
|
# <tractorbugurlhere>
|
||||||
parent_exit.set()
|
parent_exit.set()
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue