diff --git a/piker/brokers/deribit/feed.py b/piker/brokers/deribit/feed.py index b3a202ba..28c1cbed 100644 --- a/piker/brokers/deribit/feed.py +++ b/piker/brokers/deribit/feed.py @@ -29,6 +29,7 @@ from typing import ( # from pprint import pformat import time +import cryptofeed import trio from trio_typing import TaskStatus from pendulum import ( @@ -52,19 +53,10 @@ from piker._cacheables import ( ) from piker.log import ( get_logger, + mk_repr, ) from piker.data.validate import FeedInit -# from cryptofeed import FeedHandler -# from cryptofeed.defines import ( -# DERIBIT, -# L1_BOOK, -# TRADES, -# OPTION, -# CALL, -# PUT, -# ) -# from cryptofeed.symbols import Symbol from .api import ( Client, @@ -219,51 +211,64 @@ async def get_mkt_info( price_tick=pair.price_tick, size_tick=pair.size_tick, bs_mktid=pair.symbol, - expiry=pair.expiry, venue=mkt_mode, broker='deribit', _atype=mkt_mode, _fqme_without_src=True, + + # expiry=pair.expiry, + # XXX TODO, currently we don't use it since it's + # already "described" in the `OptionPair.symbol: str` + # and if we slap in the ISO repr it's kinda hideous.. + # -[ ] figure out the best either std ) return mkt, pair async def stream_quotes( - send_chan: trio.abc.SendChannel, symbols: list[str], feed_is_live: trio.Event, - loglevel: str = None, # startup sync task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, ) -> None: + ''' + Open a live quote stream for the market set defined by `symbols`. + ''' sym = symbols[0].split('.')[0] - init_msgs: list[FeedInit] = [] + # multiline nested `dict` formatter (since rn quote-msgs are + # just that). + pfmt: Callable[[str], str] = mk_repr() + async with ( open_cached_client('deribit') as client, send_chan as send_chan ): - + mkt: MktPair + pair: Pair mkt, pair = await get_mkt_info(sym) # build out init msgs according to latest spec init_msgs.append( - FeedInit(mkt_info=mkt) + FeedInit( + mkt_info=mkt, + ) ) - nsym = piker_sym_to_cb_sym(sym) + # build `cryptofeed` feed-handle + cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym) async with maybe_open_price_feed(sym) as stream: - - # TODO, uhh use it ?? XD - # cache = client._pairs - - last_trades = (await client.last_trades( - cb_sym_to_deribit_inst(nsym), count=1)).trades + last_trades = ( + await client.last_trades( + cb_sym_to_deribit_inst(cf_sym), + count=1, + ) + ).trades if len(last_trades) == 0: last_trade = None @@ -286,16 +291,25 @@ async def stream_quotes( 'broker_ts': last_trade.timestamp }] } - task_status.started((init_msgs, first_quote)) + task_status.started(( + init_msgs, + first_quote, + )) feed_is_live.set() + # NOTE XXX, static for now! + # => since this only handles ONE mkt feed at a time we + # don't need a lookup table to map interleaved quotes + # from multiple possible mkt-pairs + topic: str = mkt.bs_fqme + # deliver until cancelled async for typ, quote in stream: - topic: str = quote['symbol'] + sym: str = quote['symbol'] log.info( - f'deribit {typ!r} quote\n\n' - f'{quote}\n' + f'deribit {typ!r} quote for {sym!r}\n\n' + f'{pfmt(quote)}\n' ) await send_chan.send({ topic: quote,