`.deribit.feed`: get live quotes workin (again)
The quote-msg `'topic'` field was being set and sent as the `OptionPair.symbol: str` value instead of as the `MktPair.bs_fqme: str` as is required for matching on the `piker.data.feed` side. So change to that and simplify the actual `.bs_fqme: str` value to NOT include the ISO-format time (for now) since it's a big ugly and longer term we need a `piker`-fqme friendly-on-ze-eyes format/style anyway..fix_deribit_hist_queries_NEW
							parent
							
								
									ab9789d35e
								
							
						
					
					
						commit
						5aad43378c
					
				|  | @ -29,6 +29,7 @@ from typing import ( | |||
| # from pprint import pformat | ||||
| import time | ||||
| 
 | ||||
| import cryptofeed | ||||
| import trio | ||||
| from trio_typing import TaskStatus | ||||
| from pendulum import ( | ||||
|  | @ -52,19 +53,10 @@ from piker._cacheables import ( | |||
| ) | ||||
| from piker.log import ( | ||||
|     get_logger, | ||||
|     mk_repr, | ||||
| ) | ||||
| from piker.data.validate import FeedInit | ||||
| 
 | ||||
| # from cryptofeed import FeedHandler | ||||
| # from cryptofeed.defines import ( | ||||
| #     DERIBIT, | ||||
| #     L1_BOOK, | ||||
| #     TRADES, | ||||
| #     OPTION, | ||||
| #     CALL, | ||||
| #     PUT, | ||||
| # ) | ||||
| # from cryptofeed.symbols import Symbol | ||||
| 
 | ||||
| from .api import ( | ||||
|     Client, | ||||
|  | @ -219,51 +211,64 @@ async def get_mkt_info( | |||
|             price_tick=pair.price_tick, | ||||
|             size_tick=pair.size_tick, | ||||
|             bs_mktid=pair.symbol, | ||||
|             expiry=pair.expiry, | ||||
|             venue=mkt_mode, | ||||
|             broker='deribit', | ||||
|             _atype=mkt_mode, | ||||
|             _fqme_without_src=True, | ||||
| 
 | ||||
|             # expiry=pair.expiry, | ||||
|             # XXX TODO, currently we don't use it since it's | ||||
|             # already "described" in the `OptionPair.symbol: str` | ||||
|             # and if we slap in the ISO repr it's kinda hideous.. | ||||
|             # -[ ] figure out the best either std | ||||
|         ) | ||||
|         return mkt, pair | ||||
| 
 | ||||
| 
 | ||||
| async def stream_quotes( | ||||
| 
 | ||||
|     send_chan: trio.abc.SendChannel, | ||||
|     symbols: list[str], | ||||
|     feed_is_live: trio.Event, | ||||
|     loglevel: str = None, | ||||
| 
 | ||||
|     # startup sync | ||||
|     task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, | ||||
| 
 | ||||
| ) -> None: | ||||
|     ''' | ||||
|     Open a live quote stream for the market set defined by `symbols`. | ||||
| 
 | ||||
|     ''' | ||||
|     sym = symbols[0].split('.')[0] | ||||
| 
 | ||||
|     init_msgs: list[FeedInit] = [] | ||||
| 
 | ||||
|     # multiline nested `dict` formatter (since rn quote-msgs are | ||||
|     # just that). | ||||
|     pfmt: Callable[[str], str] = mk_repr() | ||||
| 
 | ||||
|     async with ( | ||||
|         open_cached_client('deribit') as client, | ||||
|         send_chan as send_chan | ||||
|     ): | ||||
| 
 | ||||
|         mkt: MktPair | ||||
|         pair: Pair | ||||
|         mkt, pair = await get_mkt_info(sym) | ||||
| 
 | ||||
|         # build out init msgs according to latest spec | ||||
|         init_msgs.append( | ||||
|             FeedInit(mkt_info=mkt) | ||||
|             FeedInit( | ||||
|                 mkt_info=mkt, | ||||
|             ) | ||||
|         ) | ||||
|         nsym = piker_sym_to_cb_sym(sym) | ||||
|         # build `cryptofeed` feed-handle | ||||
|         cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym) | ||||
| 
 | ||||
|         async with maybe_open_price_feed(sym) as stream: | ||||
| 
 | ||||
|             # TODO, uhh use it ?? XD | ||||
|             # cache = client._pairs | ||||
| 
 | ||||
|             last_trades = (await client.last_trades( | ||||
|                 cb_sym_to_deribit_inst(nsym), count=1)).trades | ||||
|             last_trades = ( | ||||
|                 await client.last_trades( | ||||
|                     cb_sym_to_deribit_inst(cf_sym), | ||||
|                     count=1, | ||||
|                 ) | ||||
|             ).trades | ||||
| 
 | ||||
|             if len(last_trades) == 0: | ||||
|                 last_trade = None | ||||
|  | @ -286,16 +291,25 @@ async def stream_quotes( | |||
|                     'broker_ts': last_trade.timestamp | ||||
|                 }] | ||||
|             } | ||||
|             task_status.started((init_msgs,  first_quote)) | ||||
|             task_status.started(( | ||||
|                 init_msgs, | ||||
|                 first_quote, | ||||
|             )) | ||||
| 
 | ||||
|             feed_is_live.set() | ||||
| 
 | ||||
|             # NOTE XXX, static for now! | ||||
|             # => since this only handles ONE mkt feed at a time we | ||||
|             # don't need a lookup table to map interleaved quotes | ||||
|             # from multiple possible mkt-pairs | ||||
|             topic: str = mkt.bs_fqme | ||||
| 
 | ||||
|             # deliver until cancelled | ||||
|             async for typ, quote in stream: | ||||
|                 topic: str = quote['symbol'] | ||||
|                 sym: str = quote['symbol'] | ||||
|                 log.info( | ||||
|                     f'deribit {typ!r} quote\n\n' | ||||
|                     f'{quote}\n' | ||||
|                     f'deribit {typ!r} quote for {sym!r}\n\n' | ||||
|                     f'{pfmt(quote)}\n' | ||||
|                 ) | ||||
|                 await send_chan.send({ | ||||
|                     topic: quote, | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue