Handle broken mem chan on `Actor._push_result()`
When backpressure is used and a feeder mem chan breaks during msg delivery (usually because the IPC allocating task already terminated) instead of raising we simply warn as we do for the non-backpressure case. Also, add a proper `Actor.is_arbiter` test inside `._invoke()` to avoid doing an arbiter-registry lookup if the current actor **is** the registrar.ipc_failure_while_streaming
parent
9fd62cf71f
commit
4f977189c0
|
@ -826,7 +826,12 @@ class Actor:
|
|||
|
||||
if ctx._backpressure:
|
||||
log.warning(text)
|
||||
await send_chan.send(msg)
|
||||
try:
|
||||
await send_chan.send(msg)
|
||||
except trio.BrokenResourceError:
|
||||
# XXX: local consumer has closed their side
|
||||
# so cancel the far end streaming task
|
||||
log.warning(f"{chan} is already closed")
|
||||
else:
|
||||
try:
|
||||
raise StreamOverrun(text) from None
|
||||
|
@ -1371,8 +1376,9 @@ async def async_main(
|
|||
actor.lifetime_stack.close()
|
||||
|
||||
# Unregister actor from the arbiter
|
||||
if registered_with_arbiter and (
|
||||
actor._arb_addr is not None
|
||||
if (
|
||||
registered_with_arbiter
|
||||
and not actor.is_arbiter
|
||||
):
|
||||
failed = False
|
||||
with trio.move_on_after(0.5) as cs:
|
||||
|
|
Loading…
Reference in New Issue