diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 10dc43f6..fda93e21 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -22,7 +22,7 @@ financial data flows. from __future__ import annotations from collections import Counter import time -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Optional, Union import tractor import trio @@ -32,6 +32,7 @@ from ..log import get_logger if TYPE_CHECKING: from ._sharedmem import ShmArray + from .feed import _FeedsBus log = get_logger(__name__) @@ -219,7 +220,7 @@ async def iter_ohlc_periods( async def sample_and_broadcast( - bus: '_FeedsBus', # noqa + bus: _FeedsBus, # noqa shm: ShmArray, quote_stream: trio.abc.ReceiveChannel, brokername: str, @@ -298,7 +299,13 @@ async def sample_and_broadcast( # end up triggering backpressure which which will # eventually block this producer end of the feed and # thus other consumers still attached. - subs = bus._subscribers[broker_symbol.lower()] + subs: list[ + tuple[ + Union[tractor.MsgStream, trio.MemorySendChannel], + tractor.Context, + Optional[float], # tick throttle in Hz + ] + ] = bus._subscribers[broker_symbol.lower()] # NOTE: by default the broker backend doesn't append # it's own "name" into the fqsn schema (but maybe it @@ -307,7 +314,7 @@ async def sample_and_broadcast( bsym = f'{broker_symbol}.{brokername}' lags: int = 0 - for (stream, tick_throttle) in subs: + for (stream, ctx, tick_throttle) in subs: try: with trio.move_on_after(0.2) as cs: @@ -319,11 +326,11 @@ async def sample_and_broadcast( (bsym, quote) ) except trio.WouldBlock: - ctx = getattr(stream, '_ctx', None) + chan = ctx.chan if ctx: log.warning( f'Feed overrun {bus.brokername} ->' - f'{ctx.channel.uid} !!!' + f'{chan.uid} !!!' ) else: key = id(stream) @@ -333,11 +340,26 @@ async def sample_and_broadcast( f'feed @ {tick_throttle} Hz' ) if overruns[key] > 6: - log.warning( - f'Dropping consumer {stream}' - ) - await stream.aclose() - raise trio.BrokenResourceError + # TODO: should we check for the + # context being cancelled? this + # could happen but the + # channel-ipc-pipe is still up. + if not chan.connected(): + log.warning( + 'Dropping broken consumer:\n' + f'{broker_symbol}:' + f'{ctx.cid}@{chan.uid}' + ) + await stream.aclose() + raise trio.BrokenResourceError + else: + log.warning( + 'Feed getting overrun bro!\n' + f'{broker_symbol}:' + f'{ctx.cid}@{chan.uid}' + ) + continue + else: await stream.send( {bsym: quote} @@ -482,6 +504,7 @@ async def uniform_rate_send( # if the feed consumer goes down then drop # out of this rate limiter log.warning(f'{stream} closed') + await stream.aclose() return # reset send cycle state diff --git a/piker/data/feed.py b/piker/data/feed.py index c49ab0fe..94c2f81d 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -33,6 +33,7 @@ from typing import ( Generator, Awaitable, TYPE_CHECKING, + Union, ) import trio @@ -117,7 +118,13 @@ class _FeedsBus(BaseModel): # https://github.com/samuelcolvin/pydantic/issues/2816 _subscribers: dict[ str, - list[tuple[tractor.MsgStream, Optional[float]]] + list[ + tuple[ + Union[tractor.MsgStream, trio.MemorySendChannel], + tractor.Context, + Optional[float], # tick throttle in Hz + ] + ] ] = {} async def start_task( @@ -1118,10 +1125,10 @@ async def open_feed_bus( recv, stream, ) - sub = (send, tick_throttle) + sub = (send, ctx, tick_throttle) else: - sub = (stream, tick_throttle) + sub = (stream, ctx, tick_throttle) subs = bus._subscribers[bfqsn] subs.append(sub)