diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py index 7088011d..57fc0126 100644 --- a/piker/brokers/kraken/feed.py +++ b/piker/brokers/kraken/feed.py @@ -303,24 +303,6 @@ async def open_history_client( yield get_ohlc, {'erlangs': 1, 'rate': 1} -async def backfill_bars( - - sym: str, - shm: ShmArray, # type: ignore # noqa - count: int = 10, # NOTE: any more and we'll overrun the underlying buffer - task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, - -) -> None: - ''' - Fill historical bars into shared mem / storage afap. - ''' - with trio.CancelScope() as cs: - async with open_cached_client('kraken') as client: - bars = await client.bars(symbol=sym) - shm.push(bars) - task_status.started(cs) - - async def stream_quotes( send_chan: trio.abc.SendChannel, @@ -419,14 +401,15 @@ async def stream_quotes( yield # unsub from all pairs on teardown - await ws.send_msg({ - 'pair': list(ws_pairs.values()), - 'event': 'unsubscribe', - 'subscription': ['ohlc', 'spread'], - }) + if ws.connected(): + await ws.send_msg({ + 'pair': list(ws_pairs.values()), + 'event': 'unsubscribe', + 'subscription': ['ohlc', 'spread'], + }) - # XXX: do we need to ack the unsub? - # await ws.recv_msg() + # XXX: do we need to ack the unsub? + # await ws.recv_msg() # see the tips on reconnection logic: # https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds