forked from goodboy/tractor
`trio.ClosedResourceError is deprecated
parent
fe1c4dbc4c
commit
02e0c0e1a4
|
@ -43,6 +43,9 @@ async def fan_out_to_ctxs(
|
||||||
for ctx in topics2ctxs.get(topic, set()):
|
for ctx in topics2ctxs.get(topic, set()):
|
||||||
ctx_payloads.setdefault(ctx, {}).update(packet),
|
ctx_payloads.setdefault(ctx, {}).update(packet),
|
||||||
|
|
||||||
|
if not ctx_payloads:
|
||||||
|
log.debug(f"Unconsumed values:\n{published}")
|
||||||
|
|
||||||
# deliver to each subscriber (fan out)
|
# deliver to each subscriber (fan out)
|
||||||
if ctx_payloads:
|
if ctx_payloads:
|
||||||
for ctx, payload in ctx_payloads.items():
|
for ctx, payload in ctx_payloads.items():
|
||||||
|
@ -50,7 +53,7 @@ async def fan_out_to_ctxs(
|
||||||
await ctx.send_yield(payload)
|
await ctx.send_yield(payload)
|
||||||
except (
|
except (
|
||||||
# That's right, anything you can think of...
|
# That's right, anything you can think of...
|
||||||
trio.ClosedStreamError, ConnectionResetError,
|
trio.ClosedResourceError, ConnectionResetError,
|
||||||
ConnectionRefusedError,
|
ConnectionRefusedError,
|
||||||
):
|
):
|
||||||
log.warning(f"{ctx.chan} went down?")
|
log.warning(f"{ctx.chan} went down?")
|
||||||
|
|
Loading…
Reference in New Issue