From 389c746223b93bd1ee46956d8427c2aa5e5f9d36 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 13 Feb 2025 12:05:11 -0500 Subject: [PATCH] Catch using `Sampler.bcast_errors` where possible In all other possible IPC disconnect handling blocks. Also more comprehensive typing throughout `uniform_rate_send()`. --- piker/data/_sampling.py | 59 ++++++++++++++++++++++------------------- 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 0bb9a247..e5b87a2a 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -771,18 +771,14 @@ async def sample_and_broadcast( if lags > 10: await tractor.pause() - except ( - trio.BrokenResourceError, - trio.ClosedResourceError, - trio.EndOfChannel, - ): + except Sampler.bcast_errors as ipc_err: ctx: Context = ipc._ctx chan: Channel = ctx.chan if ctx: log.warning( - 'Dropped `brokerd`-quotes-feed connection:\n' - f'{broker_symbol}:' - f'{ctx.cid}@{chan.uid}' + f'Dropped `brokerd`-feed for {broker_symbol!r} due to,\n' + f'x>) {ctx.cid}@{chan.uid}' + f'|_{ipc_err!r}\n\n' ) if sub.throttle_rate: assert ipc._closed @@ -803,7 +799,7 @@ async def uniform_rate_send( quote_stream: trio.abc.ReceiveChannel, stream: MsgStream, - task_status: TaskStatus = trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, ) -> None: ''' @@ -821,13 +817,16 @@ async def uniform_rate_send( https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9 ''' - # TODO: compute the approx overhead latency per cycle - left_to_sleep = throttle_period = 1/rate - 0.000616 + # ?TODO? dynamically compute the **actual** approx overhead latency per cycle + # instead of this magic # bidinezz? + throttle_period: float = 1/rate - 0.000616 + left_to_sleep: float = throttle_period # send cycle state + first_quote: dict|None first_quote = last_quote = None - last_send = time.time() - diff = 0 + last_send: float = time.time() + diff: float = 0 task_status.started() ticks_by_type: dict[ @@ -839,20 +838,27 @@ async def uniform_rate_send( while True: # compute the remaining time to sleep for this throttled cycle - left_to_sleep = throttle_period - diff + left_to_sleep: float = throttle_period - diff if left_to_sleep > 0: + cs: trio.CancelScope with trio.move_on_after(left_to_sleep) as cs: + sym: str + last_quote: dict try: sym, last_quote = await quote_stream.receive() except trio.EndOfChannel: - log.exception(f"feed for {stream} ended?") + log.exception( + f'Live stream for feed for ended?\n' + f'<=c\n' + f' |_[{stream!r}\n' + ) break - diff = time.time() - last_send + diff: float = time.time() - last_send if not first_quote: - first_quote = last_quote + first_quote: float = last_quote # first_quote['tbt'] = ticks_by_type if (throttle_period - diff) > 0: @@ -913,7 +919,9 @@ async def uniform_rate_send( # TODO: now if only we could sync this to the display # rate timing exactly lul try: - await stream.send({sym: first_quote}) + await stream.send({ + sym: first_quote + }) except tractor.RemoteActorError as rme: if rme.type is not tractor._exceptions.StreamOverrun: raise @@ -924,17 +932,14 @@ async def uniform_rate_send( f'{sym}:{ctx.cid}@{chan.uid}' ) + # NOTE: any of these can be raised by `tractor`'s IPC + # transport-layer and we want to be highly resilient + # to consumers which crash or lose network connection. + # I.e. we **DO NOT** want to crash and propagate up to + # ``pikerd`` these kinds of errors! except ( - # NOTE: any of these can be raised by ``tractor``'s IPC - # transport-layer and we want to be highly resilient - # to consumers which crash or lose network connection. - # I.e. we **DO NOT** want to crash and propagate up to - # ``pikerd`` these kinds of errors! - trio.ClosedResourceError, - trio.BrokenResourceError, - trio.EndOfChannel, ConnectionResetError, - ) as ipc_err: + ) + Sampler.bcast_errors as ipc_err: match ipc_err: case trio.EndOfChannel(): log.info(