From a89799b6821a396f8ade033e56fe82205a7b27c6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 9 Nov 2022 19:22:33 -0500 Subject: [PATCH] Handle broken mem chan on `Actor._push_result()` When backpressure is used and a feeder mem chan breaks during msg delivery (usually because the IPC allocating task already terminated) instead of raising we simply warn as we do for the non-backpressure case. Also, add a proper `Actor.is_arbiter` test inside `._invoke()` to avoid doing an arbiter-registry lookup if the current actor **is** the registrar. --- tractor/_runtime.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 4e1083f..c989c97 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -829,7 +829,12 @@ class Actor: if ctx._backpressure: log.warning(text) - await send_chan.send(msg) + try: + await send_chan.send(msg) + except trio.BrokenResourceError: + # XXX: local consumer has closed their side + # so cancel the far end streaming task + log.warning(f"{chan} is already closed") else: try: raise StreamOverrun(text) from None @@ -1374,8 +1379,9 @@ async def async_main( actor.lifetime_stack.close() # Unregister actor from the arbiter - if registered_with_arbiter and ( - actor._arb_addr is not None + if ( + registered_with_arbiter + and not actor.is_arbiter ): failed = False with trio.move_on_after(0.5) as cs: