diff --git a/piker/brokers/_util.py b/piker/brokers/_util.py index acd75d6c..d1b2aac5 100644 --- a/piker/brokers/_util.py +++ b/piker/brokers/_util.py @@ -51,6 +51,25 @@ class NoData(BrokerError): self.frame_size: int = 1000 +class DataUnavailable(BrokerError): + ''' + Signal storage requests to terminate. + + ''' + # TODO: add in a reason that can be displayed in the + # UI (for eg. `kraken` is bs and you should complain + # to them that you can't pull more OHLC data..) + + +class DataThrottle(BrokerError): + ''' + Broker throttled request rate for data. + + ''' + # TODO: add in throttle metrics/feedback + + + def resproc( resp: asks.response_objects.Response, log: logging.Logger, diff --git a/piker/data/feed.py b/piker/data/feed.py index 0d0156b6..b00cf70e 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -67,6 +67,10 @@ from ._sampling import ( sample_and_broadcast, uniform_rate_send, ) +from ..brokers._util import ( + NoData, + DataUnavailable, +) log = get_logger(__name__) @@ -273,7 +277,19 @@ async def start_backfill( # and count < mx_fills ): count += 1 - array, start_dt, end_dt = await hist(end_dt=start_dt) + try: + array, start_dt, end_dt = await hist(end_dt=start_dt) + + except NoData: + # decrement by the diff in time last delivered. + end_dt = start_dt.subtract(seconds=(end_dt - start_dt).seconds) + continue + + except DataUnavailable: + # broker is being a bish and we can't pull + # any more.. + break + to_push = diff_history( array, start_dt,