diff --git a/tractor/_actor.py b/tractor/_actor.py index 742d9ae..dac2105 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -663,6 +663,18 @@ class Actor: # consumer task await ctx._maybe_raise_from_remote_msg(msg) + except trio.BrokenResourceError: + # TODO: what is the right way to handle the case where the + # local task has already sent a 'stop' / StopAsyncInteration + # to the other side but and possibly has closed the local + # feeder mem chan? Do we wait for some kind of ack or just + # let this fail silently and bubble up (currently)? + + # XXX: local consumer has closed their side + # so cancel the far end streaming task + log.warning(f"{send_chan} consumer is already closed") + return + except trio.WouldBlock: # XXX: always push an error even if the local # receiver is in overrun state. @@ -691,16 +703,11 @@ class Actor: err_msg = pack_error(err) err_msg['cid'] = cid try: - # TODO: what is the right way to handle the case where the - # local task has already sent a 'stop' / StopAsyncInteration - # to the other side but and possibly has closed the local - # feeder mem chan? Do we wait for some kind of ack or just - # let this fail silently and bubble up (currently)? await chan.send(err_msg) except trio.BrokenResourceError: # XXX: local consumer has closed their side # so cancel the far end streaming task - log.warning(f"{send_chan} consumer is already closed") + log.warning(f"{chan} is already closed") def get_context( self,